NAT64: IPFix (VPP-1106)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param dst_ip: Destination address
144         :param ttl: TTL of generated packets
145         """
146         if dst_ip is None:
147             dst_ip = out_if.remote_ip4
148
149         pkts = []
150         # TCP
151         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153              TCP(sport=self.tcp_port_in, dport=20))
154         pkts.append(p)
155
156         # UDP
157         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159              UDP(sport=self.udp_port_in, dport=20))
160         pkts.append(p)
161
162         # ICMP
163         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165              ICMP(id=self.icmp_id_in, type='echo-request'))
166         pkts.append(p)
167
168         return pkts
169
170     def compose_ip6(self, ip4, pref, plen):
171         """
172         Compose IPv4-embedded IPv6 addresses
173
174         :param ip4: IPv4 address
175         :param pref: IPv6 prefix
176         :param plen: IPv6 prefix length
177         :returns: IPv4-embedded IPv6 addresses
178         """
179         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
181         if plen == 32:
182             pref_n[4] = ip4_n[0]
183             pref_n[5] = ip4_n[1]
184             pref_n[6] = ip4_n[2]
185             pref_n[7] = ip4_n[3]
186         elif plen == 40:
187             pref_n[5] = ip4_n[0]
188             pref_n[6] = ip4_n[1]
189             pref_n[7] = ip4_n[2]
190             pref_n[9] = ip4_n[3]
191         elif plen == 48:
192             pref_n[6] = ip4_n[0]
193             pref_n[7] = ip4_n[1]
194             pref_n[9] = ip4_n[2]
195             pref_n[10] = ip4_n[3]
196         elif plen == 56:
197             pref_n[7] = ip4_n[0]
198             pref_n[9] = ip4_n[1]
199             pref_n[10] = ip4_n[2]
200             pref_n[11] = ip4_n[3]
201         elif plen == 64:
202             pref_n[9] = ip4_n[0]
203             pref_n[10] = ip4_n[1]
204             pref_n[11] = ip4_n[2]
205             pref_n[12] = ip4_n[3]
206         elif plen == 96:
207             pref_n[12] = ip4_n[0]
208             pref_n[13] = ip4_n[1]
209             pref_n[14] = ip4_n[2]
210             pref_n[15] = ip4_n[3]
211         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
212
213     def extract_ip4(self, ip6, plen):
214         """
215         Extract IPv4 address embedded in IPv6 addresses
216
217         :param ip6: IPv6 address
218         :param plen: IPv6 prefix length
219         :returns: extracted IPv4 address
220         """
221         ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
222         ip4_n = [None] * 4
223         if plen == 32:
224             ip4_n[0] = ip6_n[4]
225             ip4_n[1] = ip6_n[5]
226             ip4_n[2] = ip6_n[6]
227             ip4_n[3] = ip6_n[7]
228         elif plen == 40:
229             ip4_n[0] = ip6_n[5]
230             ip4_n[1] = ip6_n[6]
231             ip4_n[2] = ip6_n[7]
232             ip4_n[3] = ip6_n[9]
233         elif plen == 48:
234             ip4_n[0] = ip6_n[6]
235             ip4_n[1] = ip6_n[7]
236             ip4_n[2] = ip6_n[9]
237             ip4_n[3] = ip6_n[10]
238         elif plen == 56:
239             ip4_n[0] = ip6_n[7]
240             ip4_n[1] = ip6_n[9]
241             ip4_n[2] = ip6_n[10]
242             ip4_n[3] = ip6_n[11]
243         elif plen == 64:
244             ip4_n[0] = ip6_n[9]
245             ip4_n[1] = ip6_n[10]
246             ip4_n[2] = ip6_n[11]
247             ip4_n[3] = ip6_n[12]
248         elif plen == 96:
249             ip4_n[0] = ip6_n[12]
250             ip4_n[1] = ip6_n[13]
251             ip4_n[2] = ip6_n[14]
252             ip4_n[3] = ip6_n[15]
253         return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
254
255     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
256         """
257         Create IPv6 packet stream for inside network
258
259         :param in_if: Inside interface
260         :param out_if: Outside interface
261         :param ttl: Hop Limit of generated packets
262         :param pref: NAT64 prefix
263         :param plen: NAT64 prefix length
264         """
265         pkts = []
266         if pref is None:
267             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
268         else:
269             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
270
271         # TCP
272         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274              TCP(sport=self.tcp_port_in, dport=20))
275         pkts.append(p)
276
277         # UDP
278         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280              UDP(sport=self.udp_port_in, dport=20))
281         pkts.append(p)
282
283         # ICMP
284         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286              ICMPv6EchoRequest(id=self.icmp_id_in))
287         pkts.append(p)
288
289         return pkts
290
291     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292                           use_inside_ports=False):
293         """
294         Create packet stream for outside network
295
296         :param out_if: Outside interface
297         :param dst_ip: Destination IP address (Default use global NAT address)
298         :param ttl: TTL of generated packets
299         :param use_inside_ports: Use inside NAT ports as destination ports
300                instead of outside ports
301         """
302         if dst_ip is None:
303             dst_ip = self.nat_addr
304         if not use_inside_ports:
305             tcp_port = self.tcp_port_out
306             udp_port = self.udp_port_out
307             icmp_id = self.icmp_id_out
308         else:
309             tcp_port = self.tcp_port_in
310             udp_port = self.udp_port_in
311             icmp_id = self.icmp_id_in
312         pkts = []
313         # TCP
314         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316              TCP(dport=tcp_port, sport=20))
317         pkts.append(p)
318
319         # UDP
320         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322              UDP(dport=udp_port, sport=20))
323         pkts.append(p)
324
325         # ICMP
326         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328              ICMP(id=icmp_id, type='echo-reply'))
329         pkts.append(p)
330
331         return pkts
332
333     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
334         """
335         Create packet stream for outside network
336
337         :param out_if: Outside interface
338         :param dst_ip: Destination IP address (Default use global NAT address)
339         :param hl: HL of generated packets
340         """
341         pkts = []
342         # TCP
343         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345              TCP(dport=self.tcp_port_out, sport=20))
346         pkts.append(p)
347
348         # UDP
349         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351              UDP(dport=self.udp_port_out, sport=20))
352         pkts.append(p)
353
354         # ICMP
355         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357              ICMPv6EchoReply(id=self.icmp_id_out))
358         pkts.append(p)
359
360         return pkts
361
362     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363                            packet_num=3, dst_ip=None, is_ip6=False):
364         """
365         Verify captured packets on outside network
366
367         :param capture: Captured packets
368         :param nat_ip: Translated IP address (Default use global NAT address)
369         :param same_port: Sorce port number is not translated (Default False)
370         :param packet_num: Expected number of packets (Default 3)
371         :param dst_ip: Destination IP address (Default do not verify)
372         :param is_ip6: If L3 protocol is IPv6 (Default False)
373         """
374         if is_ip6:
375             IP46 = IPv6
376             ICMP46 = ICMPv6EchoRequest
377         else:
378             IP46 = IP
379             ICMP46 = ICMP
380         if nat_ip is None:
381             nat_ip = self.nat_addr
382         self.assertEqual(packet_num, len(capture))
383         for packet in capture:
384             try:
385                 if not is_ip6:
386                     self.check_ip_checksum(packet)
387                 self.assertEqual(packet[IP46].src, nat_ip)
388                 if dst_ip is not None:
389                     self.assertEqual(packet[IP46].dst, dst_ip)
390                 if packet.haslayer(TCP):
391                     if same_port:
392                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
393                     else:
394                         self.assertNotEqual(
395                             packet[TCP].sport, self.tcp_port_in)
396                     self.tcp_port_out = packet[TCP].sport
397                     self.check_tcp_checksum(packet)
398                 elif packet.haslayer(UDP):
399                     if same_port:
400                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
401                     else:
402                         self.assertNotEqual(
403                             packet[UDP].sport, self.udp_port_in)
404                     self.udp_port_out = packet[UDP].sport
405                 else:
406                     if same_port:
407                         self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
408                     else:
409                         self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410                     self.icmp_id_out = packet[ICMP46].id
411                     if is_ip6:
412                         self.check_icmpv6_checksum(packet)
413                     else:
414                         self.check_icmp_checksum(packet)
415             except:
416                 self.logger.error(ppp("Unexpected or invalid packet "
417                                       "(outside network):", packet))
418                 raise
419
420     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421                                packet_num=3, dst_ip=None):
422         """
423         Verify captured packets on outside network
424
425         :param capture: Captured packets
426         :param nat_ip: Translated IP address
427         :param same_port: Sorce port number is not translated (Default False)
428         :param packet_num: Expected number of packets (Default 3)
429         :param dst_ip: Destination IP address (Default do not verify)
430         """
431         return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
432                                        dst_ip, True)
433
434     def verify_capture_in(self, capture, in_if, packet_num=3):
435         """
436         Verify captured packets on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.check_ip_checksum(packet)
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 if packet.haslayer(TCP):
448                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449                     self.check_tcp_checksum(packet)
450                 elif packet.haslayer(UDP):
451                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
452                 else:
453                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454                     self.check_icmp_checksum(packet)
455             except:
456                 self.logger.error(ppp("Unexpected or invalid packet "
457                                       "(inside network):", packet))
458                 raise
459
460     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
461         """
462         Verify captured IPv6 packets on inside network
463
464         :param capture: Captured packets
465         :param src_ip: Source IP
466         :param dst_ip: Destination IP address
467         :param packet_num: Expected number of packets (Default 3)
468         """
469         self.assertEqual(packet_num, len(capture))
470         for packet in capture:
471             try:
472                 self.assertEqual(packet[IPv6].src, src_ip)
473                 self.assertEqual(packet[IPv6].dst, dst_ip)
474                 if packet.haslayer(TCP):
475                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476                     self.check_tcp_checksum(packet)
477                 elif packet.haslayer(UDP):
478                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
479                     self.check_udp_checksum(packet)
480                 else:
481                     self.assertEqual(packet[ICMPv6EchoReply].id,
482                                      self.icmp_id_in)
483                     self.check_icmpv6_checksum(packet)
484             except:
485                 self.logger.error(ppp("Unexpected or invalid packet "
486                                       "(inside network):", packet))
487                 raise
488
489     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
490         """
491         Verify captured packet that don't have to be translated
492
493         :param capture: Captured packets
494         :param ingress_if: Ingress interface
495         :param egress_if: Egress interface
496         """
497         for packet in capture:
498             try:
499                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501                 if packet.haslayer(TCP):
502                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503                 elif packet.haslayer(UDP):
504                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
505                 else:
506                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
507             except:
508                 self.logger.error(ppp("Unexpected or invalid packet "
509                                       "(inside network):", packet))
510                 raise
511
512     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513                                             packet_num=3, icmp_type=11):
514         """
515         Verify captured packets with ICMP errors on outside network
516
517         :param capture: Captured packets
518         :param src_ip: Translated IP address or IP address of VPP
519                        (Default use global NAT address)
520         :param packet_num: Expected number of packets (Default 3)
521         :param icmp_type: Type of error ICMP packet
522                           we are expecting (Default 11)
523         """
524         if src_ip is None:
525             src_ip = self.nat_addr
526         self.assertEqual(packet_num, len(capture))
527         for packet in capture:
528             try:
529                 self.assertEqual(packet[IP].src, src_ip)
530                 self.assertTrue(packet.haslayer(ICMP))
531                 icmp = packet[ICMP]
532                 self.assertEqual(icmp.type, icmp_type)
533                 self.assertTrue(icmp.haslayer(IPerror))
534                 inner_ip = icmp[IPerror]
535                 if inner_ip.haslayer(TCPerror):
536                     self.assertEqual(inner_ip[TCPerror].dport,
537                                      self.tcp_port_out)
538                 elif inner_ip.haslayer(UDPerror):
539                     self.assertEqual(inner_ip[UDPerror].dport,
540                                      self.udp_port_out)
541                 else:
542                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
543             except:
544                 self.logger.error(ppp("Unexpected or invalid packet "
545                                       "(outside network):", packet))
546                 raise
547
548     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
549                                            icmp_type=11):
550         """
551         Verify captured packets with ICMP errors on inside network
552
553         :param capture: Captured packets
554         :param in_if: Inside interface
555         :param packet_num: Expected number of packets (Default 3)
556         :param icmp_type: Type of error ICMP packet
557                           we are expecting (Default 11)
558         """
559         self.assertEqual(packet_num, len(capture))
560         for packet in capture:
561             try:
562                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563                 self.assertTrue(packet.haslayer(ICMP))
564                 icmp = packet[ICMP]
565                 self.assertEqual(icmp.type, icmp_type)
566                 self.assertTrue(icmp.haslayer(IPerror))
567                 inner_ip = icmp[IPerror]
568                 if inner_ip.haslayer(TCPerror):
569                     self.assertEqual(inner_ip[TCPerror].sport,
570                                      self.tcp_port_in)
571                 elif inner_ip.haslayer(UDPerror):
572                     self.assertEqual(inner_ip[UDPerror].sport,
573                                      self.udp_port_in)
574                 else:
575                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
576             except:
577                 self.logger.error(ppp("Unexpected or invalid packet "
578                                       "(inside network):", packet))
579                 raise
580
581     def create_stream_frag(self, src_if, dst, sport, dport, data):
582         """
583         Create fragmented packet stream
584
585         :param src_if: Source interface
586         :param dst: Destination IPv4 address
587         :param sport: Source TCP port
588         :param dport: Destination TCP port
589         :param data: Payload data
590         :returns: Fragmets
591         """
592         id = random.randint(0, 65535)
593         p = (IP(src=src_if.remote_ip4, dst=dst) /
594              TCP(sport=sport, dport=dport) /
595              Raw(data))
596         p = p.__class__(str(p))
597         chksum = p['TCP'].chksum
598         pkts = []
599         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601              TCP(sport=sport, dport=dport, chksum=chksum) /
602              Raw(data[0:4]))
603         pkts.append(p)
604         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606                 proto=IP_PROTOS.tcp) /
607              Raw(data[4:20]))
608         pkts.append(p)
609         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
611                 id=id) /
612              Raw(data[20:]))
613         pkts.append(p)
614         return pkts
615
616     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617                                pref=None, plen=0, frag_size=128):
618         """
619         Create fragmented packet stream
620
621         :param src_if: Source interface
622         :param dst: Destination IPv4 address
623         :param sport: Source TCP port
624         :param dport: Destination TCP port
625         :param data: Payload data
626         :param pref: NAT64 prefix
627         :param plen: NAT64 prefix length
628         :param fragsize: size of fragments
629         :returns: Fragmets
630         """
631         if pref is None:
632             dst_ip6 = ''.join(['64:ff9b::', dst])
633         else:
634             dst_ip6 = self.compose_ip6(dst, pref, plen)
635
636         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639              TCP(sport=sport, dport=dport) /
640              Raw(data))
641
642         return fragment6(p, frag_size)
643
644     def reass_frags_and_verify(self, frags, src, dst):
645         """
646         Reassemble and verify fragmented packet
647
648         :param frags: Captured fragments
649         :param src: Source IPv4 address to verify
650         :param dst: Destination IPv4 address to verify
651
652         :returns: Reassembled IPv4 packet
653         """
654         buffer = StringIO.StringIO()
655         for p in frags:
656             self.assertEqual(p[IP].src, src)
657             self.assertEqual(p[IP].dst, dst)
658             self.check_ip_checksum(p)
659             buffer.seek(p[IP].frag * 8)
660             buffer.write(p[IP].payload)
661         ip = frags[0].getlayer(IP)
662         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663                 proto=frags[0][IP].proto)
664         if ip.proto == IP_PROTOS.tcp:
665             p = (ip / TCP(buffer.getvalue()))
666             self.check_tcp_checksum(p)
667         elif ip.proto == IP_PROTOS.udp:
668             p = (ip / UDP(buffer.getvalue()))
669         return p
670
671     def reass_frags_and_verify_ip6(self, frags, src, dst):
672         """
673         Reassemble and verify fragmented packet
674
675         :param frags: Captured fragments
676         :param src: Source IPv6 address to verify
677         :param dst: Destination IPv6 address to verify
678
679         :returns: Reassembled IPv6 packet
680         """
681         buffer = StringIO.StringIO()
682         for p in frags:
683             self.assertEqual(p[IPv6].src, src)
684             self.assertEqual(p[IPv6].dst, dst)
685             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686             buffer.write(p[IPv6ExtHdrFragment].payload)
687         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688                   nh=frags[0][IPv6ExtHdrFragment].nh)
689         if ip.nh == IP_PROTOS.tcp:
690             p = (ip / TCP(buffer.getvalue()))
691             self.check_tcp_checksum(p)
692         elif ip.nh == IP_PROTOS.udp:
693             p = (ip / UDP(buffer.getvalue()))
694         return p
695
696     def verify_ipfix_nat44_ses(self, data):
697         """
698         Verify IPFIX NAT44 session create/delete event
699
700         :param data: Decoded IPFIX data records
701         """
702         nat44_ses_create_num = 0
703         nat44_ses_delete_num = 0
704         self.assertEqual(6, len(data))
705         for record in data:
706             # natEvent
707             self.assertIn(ord(record[230]), [4, 5])
708             if ord(record[230]) == 4:
709                 nat44_ses_create_num += 1
710             else:
711                 nat44_ses_delete_num += 1
712             # sourceIPv4Address
713             self.assertEqual(self.pg0.remote_ip4n, record[8])
714             # postNATSourceIPv4Address
715             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
716                              record[225])
717             # ingressVRFID
718             self.assertEqual(struct.pack("!I", 0), record[234])
719             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720             if IP_PROTOS.icmp == ord(record[4]):
721                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
723                                  record[227])
724             elif IP_PROTOS.tcp == ord(record[4]):
725                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
726                                  record[7])
727                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
728                                  record[227])
729             elif IP_PROTOS.udp == ord(record[4]):
730                 self.assertEqual(struct.pack("!H", self.udp_port_in),
731                                  record[7])
732                 self.assertEqual(struct.pack("!H", self.udp_port_out),
733                                  record[227])
734             else:
735                 self.fail("Invalid protocol")
736         self.assertEqual(3, nat44_ses_create_num)
737         self.assertEqual(3, nat44_ses_delete_num)
738
739     def verify_ipfix_addr_exhausted(self, data):
740         """
741         Verify IPFIX NAT addresses event
742
743         :param data: Decoded IPFIX data records
744         """
745         self.assertEqual(1, len(data))
746         record = data[0]
747         # natEvent
748         self.assertEqual(ord(record[230]), 3)
749         # natPoolID
750         self.assertEqual(struct.pack("!I", 0), record[283])
751
752     def verify_ipfix_max_sessions(self, data, limit):
753         """
754         Verify IPFIX maximum session entries exceeded event
755
756         :param data: Decoded IPFIX data records
757         :param limit: Number of maximum session entries that can be created.
758         """
759         self.assertEqual(1, len(data))
760         record = data[0]
761         # natEvent
762         self.assertEqual(ord(record[230]), 13)
763         # natQuotaExceededEvent
764         self.assertEqual(struct.pack("I", 1), record[466])
765         # maxSessionEntries
766         self.assertEqual(struct.pack("I", limit), record[471])
767
768     def verify_ipfix_max_bibs(self, data, limit):
769         """
770         Verify IPFIX maximum BIB entries exceeded event
771
772         :param data: Decoded IPFIX data records
773         :param limit: Number of maximum BIB entries that can be created.
774         """
775         self.assertEqual(1, len(data))
776         record = data[0]
777         # natEvent
778         self.assertEqual(ord(record[230]), 13)
779         # natQuotaExceededEvent
780         self.assertEqual(struct.pack("I", 2), record[466])
781         # maxBIBEntries
782         self.assertEqual(struct.pack("I", limit), record[472])
783
784     def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
785         """
786         Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
787
788         :param data: Decoded IPFIX data records
789         :param limit: Number of maximum fragments pending reassembly
790         :param src_addr: IPv6 source address
791         """
792         self.assertEqual(1, len(data))
793         record = data[0]
794         # natEvent
795         self.assertEqual(ord(record[230]), 13)
796         # natQuotaExceededEvent
797         self.assertEqual(struct.pack("I", 5), record[466])
798         # maxFragmentsPendingReassembly
799         self.assertEqual(struct.pack("I", limit), record[475])
800         # sourceIPv6Address
801         self.assertEqual(src_addr, record[27])
802
803     def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
804         """
805         Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
806
807         :param data: Decoded IPFIX data records
808         :param limit: Number of maximum fragments pending reassembly
809         :param src_addr: IPv4 source address
810         """
811         self.assertEqual(1, len(data))
812         record = data[0]
813         # natEvent
814         self.assertEqual(ord(record[230]), 13)
815         # natQuotaExceededEvent
816         self.assertEqual(struct.pack("I", 5), record[466])
817         # maxFragmentsPendingReassembly
818         self.assertEqual(struct.pack("I", limit), record[475])
819         # sourceIPv4Address
820         self.assertEqual(src_addr, record[8])
821
822     def verify_ipfix_bib(self, data, is_create, src_addr):
823         """
824         Verify IPFIX NAT64 BIB create and delete events
825
826         :param data: Decoded IPFIX data records
827         :param is_create: Create event if nonzero value otherwise delete event
828         :param src_addr: IPv6 source address
829         """
830         self.assertEqual(1, len(data))
831         record = data[0]
832         # natEvent
833         if is_create:
834             self.assertEqual(ord(record[230]), 10)
835         else:
836             self.assertEqual(ord(record[230]), 11)
837         # sourceIPv6Address
838         self.assertEqual(src_addr, record[27])
839         # postNATSourceIPv4Address
840         self.assertEqual(self.nat_addr_n, record[225])
841         # protocolIdentifier
842         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
843         # ingressVRFID
844         self.assertEqual(struct.pack("!I", 0), record[234])
845         # sourceTransportPort
846         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847         # postNAPTSourceTransportPort
848         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
849
850     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
851                                dst_port):
852         """
853         Verify IPFIX NAT64 session create and delete events
854
855         :param data: Decoded IPFIX data records
856         :param is_create: Create event if nonzero value otherwise delete event
857         :param src_addr: IPv6 source address
858         :param dst_addr: IPv4 destination address
859         :param dst_port: destination TCP port
860         """
861         self.assertEqual(1, len(data))
862         record = data[0]
863         # natEvent
864         if is_create:
865             self.assertEqual(ord(record[230]), 6)
866         else:
867             self.assertEqual(ord(record[230]), 7)
868         # sourceIPv6Address
869         self.assertEqual(src_addr, record[27])
870         # destinationIPv6Address
871         self.assertEqual(socket.inet_pton(socket.AF_INET6,
872                                           self.compose_ip6(dst_addr,
873                                                            '64:ff9b::',
874                                                            96)),
875                          record[28])
876         # postNATSourceIPv4Address
877         self.assertEqual(self.nat_addr_n, record[225])
878         # postNATDestinationIPv4Address
879         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
880                          record[226])
881         # protocolIdentifier
882         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
883         # ingressVRFID
884         self.assertEqual(struct.pack("!I", 0), record[234])
885         # sourceTransportPort
886         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887         # postNAPTSourceTransportPort
888         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889         # destinationTransportPort
890         self.assertEqual(struct.pack("!H", dst_port), record[11])
891         # postNAPTDestinationTransportPort
892         self.assertEqual(struct.pack("!H", dst_port), record[228])
893
894
895 class TestNAT44(MethodHolder):
896     """ NAT44 Test Cases """
897
898     @classmethod
899     def setUpClass(cls):
900         super(TestNAT44, cls).setUpClass()
901
902         try:
903             cls.tcp_port_in = 6303
904             cls.tcp_port_out = 6303
905             cls.udp_port_in = 6304
906             cls.udp_port_out = 6304
907             cls.icmp_id_in = 6305
908             cls.icmp_id_out = 6305
909             cls.nat_addr = '10.0.0.3'
910             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911             cls.ipfix_src_port = 4739
912             cls.ipfix_domain_id = 1
913
914             cls.create_pg_interfaces(range(10))
915             cls.interfaces = list(cls.pg_interfaces[0:4])
916
917             for i in cls.interfaces:
918                 i.admin_up()
919                 i.config_ip4()
920                 i.resolve_arp()
921
922             cls.pg0.generate_remote_hosts(3)
923             cls.pg0.configure_ipv4_neighbors()
924
925             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926             cls.vapi.ip_table_add_del(10, is_add=1)
927             cls.vapi.ip_table_add_del(20, is_add=1)
928
929             cls.pg4._local_ip4 = "172.16.255.1"
930             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932             cls.pg4.set_table_ip4(10)
933             cls.pg5._local_ip4 = "172.17.255.3"
934             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936             cls.pg5.set_table_ip4(10)
937             cls.pg6._local_ip4 = "172.16.255.1"
938             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940             cls.pg6.set_table_ip4(20)
941             for i in cls.overlapping_interfaces:
942                 i.config_ip4()
943                 i.admin_up()
944                 i.resolve_arp()
945
946             cls.pg7.admin_up()
947             cls.pg8.admin_up()
948
949             cls.pg9.generate_remote_hosts(2)
950             cls.pg9.config_ip4()
951             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
953                                                   ip_addr_n,
954                                                   24)
955             cls.pg9.admin_up()
956             cls.pg9.resolve_arp()
957             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959             cls.pg9.resolve_arp()
960
961         except Exception:
962             super(TestNAT44, cls).tearDownClass()
963             raise
964
965     def clear_nat44(self):
966         """
967         Clear NAT44 configuration.
968         """
969         # I found no elegant way to do this
970         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971                                    dst_address_length=32,
972                                    next_hop_address=self.pg7.remote_ip4n,
973                                    next_hop_sw_if_index=self.pg7.sw_if_index,
974                                    is_add=0)
975         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976                                    dst_address_length=32,
977                                    next_hop_address=self.pg8.remote_ip4n,
978                                    next_hop_sw_if_index=self.pg8.sw_if_index,
979                                    is_add=0)
980
981         for intf in [self.pg7, self.pg8]:
982             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
983             for n in neighbors:
984                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
985                                               n.mac_address,
986                                               n.ip_address,
987                                               is_add=0)
988
989         if self.pg7.has_ip4_config:
990             self.pg7.unconfig_ip4()
991
992         interfaces = self.vapi.nat44_interface_addr_dump()
993         for intf in interfaces:
994             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
995                                                twice_nat=intf.twice_nat,
996                                                is_add=0)
997
998         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
999                             domain_id=self.ipfix_domain_id)
1000         self.ipfix_src_port = 4739
1001         self.ipfix_domain_id = 1
1002
1003         interfaces = self.vapi.nat44_interface_dump()
1004         for intf in interfaces:
1005             if intf.is_inside > 1:
1006                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1007                                                           0,
1008                                                           is_add=0)
1009             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1010                                                       intf.is_inside,
1011                                                       is_add=0)
1012
1013         interfaces = self.vapi.nat44_interface_output_feature_dump()
1014         for intf in interfaces:
1015             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1016                                                              intf.is_inside,
1017                                                              is_add=0)
1018
1019         static_mappings = self.vapi.nat44_static_mapping_dump()
1020         for sm in static_mappings:
1021             self.vapi.nat44_add_del_static_mapping(
1022                 sm.local_ip_address,
1023                 sm.external_ip_address,
1024                 local_port=sm.local_port,
1025                 external_port=sm.external_port,
1026                 addr_only=sm.addr_only,
1027                 vrf_id=sm.vrf_id,
1028                 protocol=sm.protocol,
1029                 twice_nat=sm.twice_nat,
1030                 is_add=0)
1031
1032         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1033         for lb_sm in lb_static_mappings:
1034             self.vapi.nat44_add_del_lb_static_mapping(
1035                 lb_sm.external_addr,
1036                 lb_sm.external_port,
1037                 lb_sm.protocol,
1038                 vrf_id=lb_sm.vrf_id,
1039                 twice_nat=lb_sm.twice_nat,
1040                 is_add=0,
1041                 local_num=0,
1042                 locals=[])
1043
1044         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1045         for id_m in identity_mappings:
1046             self.vapi.nat44_add_del_identity_mapping(
1047                 addr_only=id_m.addr_only,
1048                 ip=id_m.ip_address,
1049                 port=id_m.port,
1050                 sw_if_index=id_m.sw_if_index,
1051                 vrf_id=id_m.vrf_id,
1052                 protocol=id_m.protocol,
1053                 is_add=0)
1054
1055         adresses = self.vapi.nat44_address_dump()
1056         for addr in adresses:
1057             self.vapi.nat44_add_del_address_range(addr.ip_address,
1058                                                   addr.ip_address,
1059                                                   twice_nat=addr.twice_nat,
1060                                                   is_add=0)
1061
1062         self.vapi.nat_set_reass()
1063         self.vapi.nat_set_reass(is_ip6=1)
1064
1065     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1066                                  local_port=0, external_port=0, vrf_id=0,
1067                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
1068                                  proto=0, twice_nat=0):
1069         """
1070         Add/delete NAT44 static mapping
1071
1072         :param local_ip: Local IP address
1073         :param external_ip: External IP address
1074         :param local_port: Local port number (Optional)
1075         :param external_port: External port number (Optional)
1076         :param vrf_id: VRF ID (Default 0)
1077         :param is_add: 1 if add, 0 if delete (Default add)
1078         :param external_sw_if_index: External interface instead of IP address
1079         :param proto: IP protocol (Mandatory if port specified)
1080         :param twice_nat: 1 if translate external host address and port
1081         """
1082         addr_only = 1
1083         if local_port and external_port:
1084             addr_only = 0
1085         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1086         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1087         self.vapi.nat44_add_del_static_mapping(
1088             l_ip,
1089             e_ip,
1090             external_sw_if_index,
1091             local_port,
1092             external_port,
1093             addr_only,
1094             vrf_id,
1095             proto,
1096             twice_nat,
1097             is_add)
1098
1099     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1100         """
1101         Add/delete NAT44 address
1102
1103         :param ip: IP address
1104         :param is_add: 1 if add, 0 if delete (Default add)
1105         :param twice_nat: twice NAT address for extenal hosts
1106         """
1107         nat_addr = socket.inet_pton(socket.AF_INET, ip)
1108         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1109                                               vrf_id=vrf_id,
1110                                               twice_nat=twice_nat)
1111
1112     def test_dynamic(self):
1113         """ NAT44 dynamic translation test """
1114
1115         self.nat44_add_address(self.nat_addr)
1116         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1117         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1118                                                   is_inside=0)
1119
1120         # in2out
1121         pkts = self.create_stream_in(self.pg0, self.pg1)
1122         self.pg0.add_stream(pkts)
1123         self.pg_enable_capture(self.pg_interfaces)
1124         self.pg_start()
1125         capture = self.pg1.get_capture(len(pkts))
1126         self.verify_capture_out(capture)
1127
1128         # out2in
1129         pkts = self.create_stream_out(self.pg1)
1130         self.pg1.add_stream(pkts)
1131         self.pg_enable_capture(self.pg_interfaces)
1132         self.pg_start()
1133         capture = self.pg0.get_capture(len(pkts))
1134         self.verify_capture_in(capture, self.pg0)
1135
1136     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1137         """ NAT44 handling of client packets with TTL=1 """
1138
1139         self.nat44_add_address(self.nat_addr)
1140         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1141         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1142                                                   is_inside=0)
1143
1144         # Client side - generate traffic
1145         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1146         self.pg0.add_stream(pkts)
1147         self.pg_enable_capture(self.pg_interfaces)
1148         self.pg_start()
1149
1150         # Client side - verify ICMP type 11 packets
1151         capture = self.pg0.get_capture(len(pkts))
1152         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1153
1154     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1155         """ NAT44 handling of server packets with TTL=1 """
1156
1157         self.nat44_add_address(self.nat_addr)
1158         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1159         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1160                                                   is_inside=0)
1161
1162         # Client side - create sessions
1163         pkts = self.create_stream_in(self.pg0, self.pg1)
1164         self.pg0.add_stream(pkts)
1165         self.pg_enable_capture(self.pg_interfaces)
1166         self.pg_start()
1167
1168         # Server side - generate traffic
1169         capture = self.pg1.get_capture(len(pkts))
1170         self.verify_capture_out(capture)
1171         pkts = self.create_stream_out(self.pg1, ttl=1)
1172         self.pg1.add_stream(pkts)
1173         self.pg_enable_capture(self.pg_interfaces)
1174         self.pg_start()
1175
1176         # Server side - verify ICMP type 11 packets
1177         capture = self.pg1.get_capture(len(pkts))
1178         self.verify_capture_out_with_icmp_errors(capture,
1179                                                  src_ip=self.pg1.local_ip4)
1180
1181     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1182         """ NAT44 handling of error responses to client packets with TTL=2 """
1183
1184         self.nat44_add_address(self.nat_addr)
1185         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1186         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1187                                                   is_inside=0)
1188
1189         # Client side - generate traffic
1190         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1191         self.pg0.add_stream(pkts)
1192         self.pg_enable_capture(self.pg_interfaces)
1193         self.pg_start()
1194
1195         # Server side - simulate ICMP type 11 response
1196         capture = self.pg1.get_capture(len(pkts))
1197         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1198                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1199                 ICMP(type=11) / packet[IP] for packet in capture]
1200         self.pg1.add_stream(pkts)
1201         self.pg_enable_capture(self.pg_interfaces)
1202         self.pg_start()
1203
1204         # Client side - verify ICMP type 11 packets
1205         capture = self.pg0.get_capture(len(pkts))
1206         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1207
1208     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1209         """ NAT44 handling of error responses to server packets with TTL=2 """
1210
1211         self.nat44_add_address(self.nat_addr)
1212         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1213         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1214                                                   is_inside=0)
1215
1216         # Client side - create sessions
1217         pkts = self.create_stream_in(self.pg0, self.pg1)
1218         self.pg0.add_stream(pkts)
1219         self.pg_enable_capture(self.pg_interfaces)
1220         self.pg_start()
1221
1222         # Server side - generate traffic
1223         capture = self.pg1.get_capture(len(pkts))
1224         self.verify_capture_out(capture)
1225         pkts = self.create_stream_out(self.pg1, ttl=2)
1226         self.pg1.add_stream(pkts)
1227         self.pg_enable_capture(self.pg_interfaces)
1228         self.pg_start()
1229
1230         # Client side - simulate ICMP type 11 response
1231         capture = self.pg0.get_capture(len(pkts))
1232         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1233                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1234                 ICMP(type=11) / packet[IP] for packet in capture]
1235         self.pg0.add_stream(pkts)
1236         self.pg_enable_capture(self.pg_interfaces)
1237         self.pg_start()
1238
1239         # Server side - verify ICMP type 11 packets
1240         capture = self.pg1.get_capture(len(pkts))
1241         self.verify_capture_out_with_icmp_errors(capture)
1242
1243     def test_ping_out_interface_from_outside(self):
1244         """ Ping NAT44 out interface from outside network """
1245
1246         self.nat44_add_address(self.nat_addr)
1247         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1248         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1249                                                   is_inside=0)
1250
1251         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1252              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1253              ICMP(id=self.icmp_id_out, type='echo-request'))
1254         pkts = [p]
1255         self.pg1.add_stream(pkts)
1256         self.pg_enable_capture(self.pg_interfaces)
1257         self.pg_start()
1258         capture = self.pg1.get_capture(len(pkts))
1259         self.assertEqual(1, len(capture))
1260         packet = capture[0]
1261         try:
1262             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1263             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1264             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1265             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1266         except:
1267             self.logger.error(ppp("Unexpected or invalid packet "
1268                                   "(outside network):", packet))
1269             raise
1270
1271     def test_ping_internal_host_from_outside(self):
1272         """ Ping internal host from outside network """
1273
1274         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1275         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1276         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1277                                                   is_inside=0)
1278
1279         # out2in
1280         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1281                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1282                ICMP(id=self.icmp_id_out, type='echo-request'))
1283         self.pg1.add_stream(pkt)
1284         self.pg_enable_capture(self.pg_interfaces)
1285         self.pg_start()
1286         capture = self.pg0.get_capture(1)
1287         self.verify_capture_in(capture, self.pg0, packet_num=1)
1288         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1289
1290         # in2out
1291         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1292                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1293                ICMP(id=self.icmp_id_in, type='echo-reply'))
1294         self.pg0.add_stream(pkt)
1295         self.pg_enable_capture(self.pg_interfaces)
1296         self.pg_start()
1297         capture = self.pg1.get_capture(1)
1298         self.verify_capture_out(capture, same_port=True, packet_num=1)
1299         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1300
1301     def test_forwarding(self):
1302         """ NAT44 forwarding test """
1303
1304         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1305         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1306                                                   is_inside=0)
1307         self.vapi.nat44_forwarding_enable_disable(1)
1308
1309         real_ip = self.pg0.remote_ip4n
1310         alias_ip = self.nat_addr_n
1311         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1312                                                external_ip=alias_ip)
1313
1314         try:
1315             # in2out - static mapping match
1316
1317             pkts = self.create_stream_out(self.pg1)
1318             self.pg1.add_stream(pkts)
1319             self.pg_enable_capture(self.pg_interfaces)
1320             self.pg_start()
1321             capture = self.pg0.get_capture(len(pkts))
1322             self.verify_capture_in(capture, self.pg0)
1323
1324             pkts = self.create_stream_in(self.pg0, self.pg1)
1325             self.pg0.add_stream(pkts)
1326             self.pg_enable_capture(self.pg_interfaces)
1327             self.pg_start()
1328             capture = self.pg1.get_capture(len(pkts))
1329             self.verify_capture_out(capture, same_port=True)
1330
1331             # in2out - no static mapping match
1332
1333             host0 = self.pg0.remote_hosts[0]
1334             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1335             try:
1336                 pkts = self.create_stream_out(self.pg1,
1337                                               dst_ip=self.pg0.remote_ip4,
1338                                               use_inside_ports=True)
1339                 self.pg1.add_stream(pkts)
1340                 self.pg_enable_capture(self.pg_interfaces)
1341                 self.pg_start()
1342                 capture = self.pg0.get_capture(len(pkts))
1343                 self.verify_capture_in(capture, self.pg0)
1344
1345                 pkts = self.create_stream_in(self.pg0, self.pg1)
1346                 self.pg0.add_stream(pkts)
1347                 self.pg_enable_capture(self.pg_interfaces)
1348                 self.pg_start()
1349                 capture = self.pg1.get_capture(len(pkts))
1350                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1351                                         same_port=True)
1352             finally:
1353                 self.pg0.remote_hosts[0] = host0
1354
1355         finally:
1356             self.vapi.nat44_forwarding_enable_disable(0)
1357             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1358                                                    external_ip=alias_ip,
1359                                                    is_add=0)
1360
1361     def test_static_in(self):
1362         """ 1:1 NAT initialized from inside network """
1363
1364         nat_ip = "10.0.0.10"
1365         self.tcp_port_out = 6303
1366         self.udp_port_out = 6304
1367         self.icmp_id_out = 6305
1368
1369         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1370         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1371         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1372                                                   is_inside=0)
1373
1374         # in2out
1375         pkts = self.create_stream_in(self.pg0, self.pg1)
1376         self.pg0.add_stream(pkts)
1377         self.pg_enable_capture(self.pg_interfaces)
1378         self.pg_start()
1379         capture = self.pg1.get_capture(len(pkts))
1380         self.verify_capture_out(capture, nat_ip, True)
1381
1382         # out2in
1383         pkts = self.create_stream_out(self.pg1, nat_ip)
1384         self.pg1.add_stream(pkts)
1385         self.pg_enable_capture(self.pg_interfaces)
1386         self.pg_start()
1387         capture = self.pg0.get_capture(len(pkts))
1388         self.verify_capture_in(capture, self.pg0)
1389
1390     def test_static_out(self):
1391         """ 1:1 NAT initialized from outside network """
1392
1393         nat_ip = "10.0.0.20"
1394         self.tcp_port_out = 6303
1395         self.udp_port_out = 6304
1396         self.icmp_id_out = 6305
1397
1398         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1399         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1400         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1401                                                   is_inside=0)
1402
1403         # out2in
1404         pkts = self.create_stream_out(self.pg1, nat_ip)
1405         self.pg1.add_stream(pkts)
1406         self.pg_enable_capture(self.pg_interfaces)
1407         self.pg_start()
1408         capture = self.pg0.get_capture(len(pkts))
1409         self.verify_capture_in(capture, self.pg0)
1410
1411         # in2out
1412         pkts = self.create_stream_in(self.pg0, self.pg1)
1413         self.pg0.add_stream(pkts)
1414         self.pg_enable_capture(self.pg_interfaces)
1415         self.pg_start()
1416         capture = self.pg1.get_capture(len(pkts))
1417         self.verify_capture_out(capture, nat_ip, True)
1418
1419     def test_static_with_port_in(self):
1420         """ 1:1 NAPT initialized from inside network """
1421
1422         self.tcp_port_out = 3606
1423         self.udp_port_out = 3607
1424         self.icmp_id_out = 3608
1425
1426         self.nat44_add_address(self.nat_addr)
1427         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1428                                       self.tcp_port_in, self.tcp_port_out,
1429                                       proto=IP_PROTOS.tcp)
1430         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1431                                       self.udp_port_in, self.udp_port_out,
1432                                       proto=IP_PROTOS.udp)
1433         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434                                       self.icmp_id_in, self.icmp_id_out,
1435                                       proto=IP_PROTOS.icmp)
1436         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1437         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1438                                                   is_inside=0)
1439
1440         # in2out
1441         pkts = self.create_stream_in(self.pg0, self.pg1)
1442         self.pg0.add_stream(pkts)
1443         self.pg_enable_capture(self.pg_interfaces)
1444         self.pg_start()
1445         capture = self.pg1.get_capture(len(pkts))
1446         self.verify_capture_out(capture)
1447
1448         # out2in
1449         pkts = self.create_stream_out(self.pg1)
1450         self.pg1.add_stream(pkts)
1451         self.pg_enable_capture(self.pg_interfaces)
1452         self.pg_start()
1453         capture = self.pg0.get_capture(len(pkts))
1454         self.verify_capture_in(capture, self.pg0)
1455
1456     def test_static_with_port_out(self):
1457         """ 1:1 NAPT initialized from outside network """
1458
1459         self.tcp_port_out = 30606
1460         self.udp_port_out = 30607
1461         self.icmp_id_out = 30608
1462
1463         self.nat44_add_address(self.nat_addr)
1464         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1465                                       self.tcp_port_in, self.tcp_port_out,
1466                                       proto=IP_PROTOS.tcp)
1467         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1468                                       self.udp_port_in, self.udp_port_out,
1469                                       proto=IP_PROTOS.udp)
1470         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1471                                       self.icmp_id_in, self.icmp_id_out,
1472                                       proto=IP_PROTOS.icmp)
1473         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1474         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1475                                                   is_inside=0)
1476
1477         # out2in
1478         pkts = self.create_stream_out(self.pg1)
1479         self.pg1.add_stream(pkts)
1480         self.pg_enable_capture(self.pg_interfaces)
1481         self.pg_start()
1482         capture = self.pg0.get_capture(len(pkts))
1483         self.verify_capture_in(capture, self.pg0)
1484
1485         # in2out
1486         pkts = self.create_stream_in(self.pg0, self.pg1)
1487         self.pg0.add_stream(pkts)
1488         self.pg_enable_capture(self.pg_interfaces)
1489         self.pg_start()
1490         capture = self.pg1.get_capture(len(pkts))
1491         self.verify_capture_out(capture)
1492
1493     def test_static_vrf_aware(self):
1494         """ 1:1 NAT VRF awareness """
1495
1496         nat_ip1 = "10.0.0.30"
1497         nat_ip2 = "10.0.0.40"
1498         self.tcp_port_out = 6303
1499         self.udp_port_out = 6304
1500         self.icmp_id_out = 6305
1501
1502         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1503                                       vrf_id=10)
1504         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1505                                       vrf_id=10)
1506         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1507                                                   is_inside=0)
1508         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1509         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1510
1511         # inside interface VRF match NAT44 static mapping VRF
1512         pkts = self.create_stream_in(self.pg4, self.pg3)
1513         self.pg4.add_stream(pkts)
1514         self.pg_enable_capture(self.pg_interfaces)
1515         self.pg_start()
1516         capture = self.pg3.get_capture(len(pkts))
1517         self.verify_capture_out(capture, nat_ip1, True)
1518
1519         # inside interface VRF don't match NAT44 static mapping VRF (packets
1520         # are dropped)
1521         pkts = self.create_stream_in(self.pg0, self.pg3)
1522         self.pg0.add_stream(pkts)
1523         self.pg_enable_capture(self.pg_interfaces)
1524         self.pg_start()
1525         self.pg3.assert_nothing_captured()
1526
1527     def test_identity_nat(self):
1528         """ Identity NAT """
1529
1530         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1531         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1532         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1533                                                   is_inside=0)
1534
1535         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1536              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1537              TCP(sport=12345, dport=56789))
1538         self.pg1.add_stream(p)
1539         self.pg_enable_capture(self.pg_interfaces)
1540         self.pg_start()
1541         capture = self.pg0.get_capture(1)
1542         p = capture[0]
1543         try:
1544             ip = p[IP]
1545             tcp = p[TCP]
1546             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1547             self.assertEqual(ip.src, self.pg1.remote_ip4)
1548             self.assertEqual(tcp.dport, 56789)
1549             self.assertEqual(tcp.sport, 12345)
1550             self.check_tcp_checksum(p)
1551             self.check_ip_checksum(p)
1552         except:
1553             self.logger.error(ppp("Unexpected or invalid packet:", p))
1554             raise
1555
1556     def test_static_lb(self):
1557         """ NAT44 local service load balancing """
1558         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1559         external_port = 80
1560         local_port = 8080
1561         server1 = self.pg0.remote_hosts[0]
1562         server2 = self.pg0.remote_hosts[1]
1563
1564         locals = [{'addr': server1.ip4n,
1565                    'port': local_port,
1566                    'probability': 70},
1567                   {'addr': server2.ip4n,
1568                    'port': local_port,
1569                    'probability': 30}]
1570
1571         self.nat44_add_address(self.nat_addr)
1572         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1573                                                   external_port,
1574                                                   IP_PROTOS.tcp,
1575                                                   local_num=len(locals),
1576                                                   locals=locals)
1577         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1578         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1579                                                   is_inside=0)
1580
1581         # from client to service
1582         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1583              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1584              TCP(sport=12345, dport=external_port))
1585         self.pg1.add_stream(p)
1586         self.pg_enable_capture(self.pg_interfaces)
1587         self.pg_start()
1588         capture = self.pg0.get_capture(1)
1589         p = capture[0]
1590         server = None
1591         try:
1592             ip = p[IP]
1593             tcp = p[TCP]
1594             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1595             if ip.dst == server1.ip4:
1596                 server = server1
1597             else:
1598                 server = server2
1599             self.assertEqual(tcp.dport, local_port)
1600             self.check_tcp_checksum(p)
1601             self.check_ip_checksum(p)
1602         except:
1603             self.logger.error(ppp("Unexpected or invalid packet:", p))
1604             raise
1605
1606         # from service back to client
1607         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1608              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1609              TCP(sport=local_port, dport=12345))
1610         self.pg0.add_stream(p)
1611         self.pg_enable_capture(self.pg_interfaces)
1612         self.pg_start()
1613         capture = self.pg1.get_capture(1)
1614         p = capture[0]
1615         try:
1616             ip = p[IP]
1617             tcp = p[TCP]
1618             self.assertEqual(ip.src, self.nat_addr)
1619             self.assertEqual(tcp.sport, external_port)
1620             self.check_tcp_checksum(p)
1621             self.check_ip_checksum(p)
1622         except:
1623             self.logger.error(ppp("Unexpected or invalid packet:", p))
1624             raise
1625
1626         # multiple clients
1627         server1_n = 0
1628         server2_n = 0
1629         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1630         pkts = []
1631         for client in clients:
1632             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1633                  IP(src=client, dst=self.nat_addr) /
1634                  TCP(sport=12345, dport=external_port))
1635             pkts.append(p)
1636         self.pg1.add_stream(pkts)
1637         self.pg_enable_capture(self.pg_interfaces)
1638         self.pg_start()
1639         capture = self.pg0.get_capture(len(pkts))
1640         for p in capture:
1641             if p[IP].dst == server1.ip4:
1642                 server1_n += 1
1643             else:
1644                 server2_n += 1
1645         self.assertTrue(server1_n > server2_n)
1646
1647     def test_multiple_inside_interfaces(self):
1648         """ NAT44 multiple non-overlapping address space inside interfaces """
1649
1650         self.nat44_add_address(self.nat_addr)
1651         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1652         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1653         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1654                                                   is_inside=0)
1655
1656         # between two NAT44 inside interfaces (no translation)
1657         pkts = self.create_stream_in(self.pg0, self.pg1)
1658         self.pg0.add_stream(pkts)
1659         self.pg_enable_capture(self.pg_interfaces)
1660         self.pg_start()
1661         capture = self.pg1.get_capture(len(pkts))
1662         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1663
1664         # from NAT44 inside to interface without NAT44 feature (no translation)
1665         pkts = self.create_stream_in(self.pg0, self.pg2)
1666         self.pg0.add_stream(pkts)
1667         self.pg_enable_capture(self.pg_interfaces)
1668         self.pg_start()
1669         capture = self.pg2.get_capture(len(pkts))
1670         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1671
1672         # in2out 1st interface
1673         pkts = self.create_stream_in(self.pg0, self.pg3)
1674         self.pg0.add_stream(pkts)
1675         self.pg_enable_capture(self.pg_interfaces)
1676         self.pg_start()
1677         capture = self.pg3.get_capture(len(pkts))
1678         self.verify_capture_out(capture)
1679
1680         # out2in 1st interface
1681         pkts = self.create_stream_out(self.pg3)
1682         self.pg3.add_stream(pkts)
1683         self.pg_enable_capture(self.pg_interfaces)
1684         self.pg_start()
1685         capture = self.pg0.get_capture(len(pkts))
1686         self.verify_capture_in(capture, self.pg0)
1687
1688         # in2out 2nd interface
1689         pkts = self.create_stream_in(self.pg1, self.pg3)
1690         self.pg1.add_stream(pkts)
1691         self.pg_enable_capture(self.pg_interfaces)
1692         self.pg_start()
1693         capture = self.pg3.get_capture(len(pkts))
1694         self.verify_capture_out(capture)
1695
1696         # out2in 2nd interface
1697         pkts = self.create_stream_out(self.pg3)
1698         self.pg3.add_stream(pkts)
1699         self.pg_enable_capture(self.pg_interfaces)
1700         self.pg_start()
1701         capture = self.pg1.get_capture(len(pkts))
1702         self.verify_capture_in(capture, self.pg1)
1703
1704     def test_inside_overlapping_interfaces(self):
1705         """ NAT44 multiple inside interfaces with overlapping address space """
1706
1707         static_nat_ip = "10.0.0.10"
1708         self.nat44_add_address(self.nat_addr)
1709         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1710                                                   is_inside=0)
1711         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1712         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1713         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1714         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1715                                       vrf_id=20)
1716
1717         # between NAT44 inside interfaces with same VRF (no translation)
1718         pkts = self.create_stream_in(self.pg4, self.pg5)
1719         self.pg4.add_stream(pkts)
1720         self.pg_enable_capture(self.pg_interfaces)
1721         self.pg_start()
1722         capture = self.pg5.get_capture(len(pkts))
1723         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1724
1725         # between NAT44 inside interfaces with different VRF (hairpinning)
1726         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1727              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1728              TCP(sport=1234, dport=5678))
1729         self.pg4.add_stream(p)
1730         self.pg_enable_capture(self.pg_interfaces)
1731         self.pg_start()
1732         capture = self.pg6.get_capture(1)
1733         p = capture[0]
1734         try:
1735             ip = p[IP]
1736             tcp = p[TCP]
1737             self.assertEqual(ip.src, self.nat_addr)
1738             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1739             self.assertNotEqual(tcp.sport, 1234)
1740             self.assertEqual(tcp.dport, 5678)
1741         except:
1742             self.logger.error(ppp("Unexpected or invalid packet:", p))
1743             raise
1744
1745         # in2out 1st interface
1746         pkts = self.create_stream_in(self.pg4, self.pg3)
1747         self.pg4.add_stream(pkts)
1748         self.pg_enable_capture(self.pg_interfaces)
1749         self.pg_start()
1750         capture = self.pg3.get_capture(len(pkts))
1751         self.verify_capture_out(capture)
1752
1753         # out2in 1st interface
1754         pkts = self.create_stream_out(self.pg3)
1755         self.pg3.add_stream(pkts)
1756         self.pg_enable_capture(self.pg_interfaces)
1757         self.pg_start()
1758         capture = self.pg4.get_capture(len(pkts))
1759         self.verify_capture_in(capture, self.pg4)
1760
1761         # in2out 2nd interface
1762         pkts = self.create_stream_in(self.pg5, self.pg3)
1763         self.pg5.add_stream(pkts)
1764         self.pg_enable_capture(self.pg_interfaces)
1765         self.pg_start()
1766         capture = self.pg3.get_capture(len(pkts))
1767         self.verify_capture_out(capture)
1768
1769         # out2in 2nd interface
1770         pkts = self.create_stream_out(self.pg3)
1771         self.pg3.add_stream(pkts)
1772         self.pg_enable_capture(self.pg_interfaces)
1773         self.pg_start()
1774         capture = self.pg5.get_capture(len(pkts))
1775         self.verify_capture_in(capture, self.pg5)
1776
1777         # pg5 session dump
1778         addresses = self.vapi.nat44_address_dump()
1779         self.assertEqual(len(addresses), 1)
1780         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1781         self.assertEqual(len(sessions), 3)
1782         for session in sessions:
1783             self.assertFalse(session.is_static)
1784             self.assertEqual(session.inside_ip_address[0:4],
1785                              self.pg5.remote_ip4n)
1786             self.assertEqual(session.outside_ip_address,
1787                              addresses[0].ip_address)
1788         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1789         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1790         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1791         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1792         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1793         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1794         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1795         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1796         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1797
1798         # in2out 3rd interface
1799         pkts = self.create_stream_in(self.pg6, self.pg3)
1800         self.pg6.add_stream(pkts)
1801         self.pg_enable_capture(self.pg_interfaces)
1802         self.pg_start()
1803         capture = self.pg3.get_capture(len(pkts))
1804         self.verify_capture_out(capture, static_nat_ip, True)
1805
1806         # out2in 3rd interface
1807         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1808         self.pg3.add_stream(pkts)
1809         self.pg_enable_capture(self.pg_interfaces)
1810         self.pg_start()
1811         capture = self.pg6.get_capture(len(pkts))
1812         self.verify_capture_in(capture, self.pg6)
1813
1814         # general user and session dump verifications
1815         users = self.vapi.nat44_user_dump()
1816         self.assertTrue(len(users) >= 3)
1817         addresses = self.vapi.nat44_address_dump()
1818         self.assertEqual(len(addresses), 1)
1819         for user in users:
1820             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1821                                                          user.vrf_id)
1822             for session in sessions:
1823                 self.assertEqual(user.ip_address, session.inside_ip_address)
1824                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1825                 self.assertTrue(session.protocol in
1826                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1827                                  IP_PROTOS.icmp])
1828
1829         # pg4 session dump
1830         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1831         self.assertTrue(len(sessions) >= 4)
1832         for session in sessions:
1833             self.assertFalse(session.is_static)
1834             self.assertEqual(session.inside_ip_address[0:4],
1835                              self.pg4.remote_ip4n)
1836             self.assertEqual(session.outside_ip_address,
1837                              addresses[0].ip_address)
1838
1839         # pg6 session dump
1840         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1841         self.assertTrue(len(sessions) >= 3)
1842         for session in sessions:
1843             self.assertTrue(session.is_static)
1844             self.assertEqual(session.inside_ip_address[0:4],
1845                              self.pg6.remote_ip4n)
1846             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1847                              map(int, static_nat_ip.split('.')))
1848             self.assertTrue(session.inside_port in
1849                             [self.tcp_port_in, self.udp_port_in,
1850                              self.icmp_id_in])
1851
1852     def test_hairpinning(self):
1853         """ NAT44 hairpinning - 1:1 NAPT """
1854
1855         host = self.pg0.remote_hosts[0]
1856         server = self.pg0.remote_hosts[1]
1857         host_in_port = 1234
1858         host_out_port = 0
1859         server_in_port = 5678
1860         server_out_port = 8765
1861
1862         self.nat44_add_address(self.nat_addr)
1863         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1864         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1865                                                   is_inside=0)
1866         # add static mapping for server
1867         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1868                                       server_in_port, server_out_port,
1869                                       proto=IP_PROTOS.tcp)
1870
1871         # send packet from host to server
1872         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1873              IP(src=host.ip4, dst=self.nat_addr) /
1874              TCP(sport=host_in_port, dport=server_out_port))
1875         self.pg0.add_stream(p)
1876         self.pg_enable_capture(self.pg_interfaces)
1877         self.pg_start()
1878         capture = self.pg0.get_capture(1)
1879         p = capture[0]
1880         try:
1881             ip = p[IP]
1882             tcp = p[TCP]
1883             self.assertEqual(ip.src, self.nat_addr)
1884             self.assertEqual(ip.dst, server.ip4)
1885             self.assertNotEqual(tcp.sport, host_in_port)
1886             self.assertEqual(tcp.dport, server_in_port)
1887             self.check_tcp_checksum(p)
1888             host_out_port = tcp.sport
1889         except:
1890             self.logger.error(ppp("Unexpected or invalid packet:", p))
1891             raise
1892
1893         # send reply from server to host
1894         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1895              IP(src=server.ip4, dst=self.nat_addr) /
1896              TCP(sport=server_in_port, dport=host_out_port))
1897         self.pg0.add_stream(p)
1898         self.pg_enable_capture(self.pg_interfaces)
1899         self.pg_start()
1900         capture = self.pg0.get_capture(1)
1901         p = capture[0]
1902         try:
1903             ip = p[IP]
1904             tcp = p[TCP]
1905             self.assertEqual(ip.src, self.nat_addr)
1906             self.assertEqual(ip.dst, host.ip4)
1907             self.assertEqual(tcp.sport, server_out_port)
1908             self.assertEqual(tcp.dport, host_in_port)
1909             self.check_tcp_checksum(p)
1910         except:
1911             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1912             raise
1913
1914     def test_hairpinning2(self):
1915         """ NAT44 hairpinning - 1:1 NAT"""
1916
1917         server1_nat_ip = "10.0.0.10"
1918         server2_nat_ip = "10.0.0.11"
1919         host = self.pg0.remote_hosts[0]
1920         server1 = self.pg0.remote_hosts[1]
1921         server2 = self.pg0.remote_hosts[2]
1922         server_tcp_port = 22
1923         server_udp_port = 20
1924
1925         self.nat44_add_address(self.nat_addr)
1926         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1927         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1928                                                   is_inside=0)
1929
1930         # add static mapping for servers
1931         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1932         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1933
1934         # host to server1
1935         pkts = []
1936         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1937              IP(src=host.ip4, dst=server1_nat_ip) /
1938              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1939         pkts.append(p)
1940         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1941              IP(src=host.ip4, dst=server1_nat_ip) /
1942              UDP(sport=self.udp_port_in, dport=server_udp_port))
1943         pkts.append(p)
1944         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1945              IP(src=host.ip4, dst=server1_nat_ip) /
1946              ICMP(id=self.icmp_id_in, type='echo-request'))
1947         pkts.append(p)
1948         self.pg0.add_stream(pkts)
1949         self.pg_enable_capture(self.pg_interfaces)
1950         self.pg_start()
1951         capture = self.pg0.get_capture(len(pkts))
1952         for packet in capture:
1953             try:
1954                 self.assertEqual(packet[IP].src, self.nat_addr)
1955                 self.assertEqual(packet[IP].dst, server1.ip4)
1956                 if packet.haslayer(TCP):
1957                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1958                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1959                     self.tcp_port_out = packet[TCP].sport
1960                     self.check_tcp_checksum(packet)
1961                 elif packet.haslayer(UDP):
1962                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1963                     self.assertEqual(packet[UDP].dport, server_udp_port)
1964                     self.udp_port_out = packet[UDP].sport
1965                 else:
1966                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1967                     self.icmp_id_out = packet[ICMP].id
1968             except:
1969                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1970                 raise
1971
1972         # server1 to host
1973         pkts = []
1974         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1975              IP(src=server1.ip4, dst=self.nat_addr) /
1976              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1977         pkts.append(p)
1978         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1979              IP(src=server1.ip4, dst=self.nat_addr) /
1980              UDP(sport=server_udp_port, dport=self.udp_port_out))
1981         pkts.append(p)
1982         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1983              IP(src=server1.ip4, dst=self.nat_addr) /
1984              ICMP(id=self.icmp_id_out, type='echo-reply'))
1985         pkts.append(p)
1986         self.pg0.add_stream(pkts)
1987         self.pg_enable_capture(self.pg_interfaces)
1988         self.pg_start()
1989         capture = self.pg0.get_capture(len(pkts))
1990         for packet in capture:
1991             try:
1992                 self.assertEqual(packet[IP].src, server1_nat_ip)
1993                 self.assertEqual(packet[IP].dst, host.ip4)
1994                 if packet.haslayer(TCP):
1995                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1996                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1997                     self.check_tcp_checksum(packet)
1998                 elif packet.haslayer(UDP):
1999                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2000                     self.assertEqual(packet[UDP].sport, server_udp_port)
2001                 else:
2002                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2003             except:
2004                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2005                 raise
2006
2007         # server2 to server1
2008         pkts = []
2009         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2010              IP(src=server2.ip4, dst=server1_nat_ip) /
2011              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2012         pkts.append(p)
2013         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2014              IP(src=server2.ip4, dst=server1_nat_ip) /
2015              UDP(sport=self.udp_port_in, dport=server_udp_port))
2016         pkts.append(p)
2017         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2018              IP(src=server2.ip4, dst=server1_nat_ip) /
2019              ICMP(id=self.icmp_id_in, type='echo-request'))
2020         pkts.append(p)
2021         self.pg0.add_stream(pkts)
2022         self.pg_enable_capture(self.pg_interfaces)
2023         self.pg_start()
2024         capture = self.pg0.get_capture(len(pkts))
2025         for packet in capture:
2026             try:
2027                 self.assertEqual(packet[IP].src, server2_nat_ip)
2028                 self.assertEqual(packet[IP].dst, server1.ip4)
2029                 if packet.haslayer(TCP):
2030                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2031                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2032                     self.tcp_port_out = packet[TCP].sport
2033                     self.check_tcp_checksum(packet)
2034                 elif packet.haslayer(UDP):
2035                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2036                     self.assertEqual(packet[UDP].dport, server_udp_port)
2037                     self.udp_port_out = packet[UDP].sport
2038                 else:
2039                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2040                     self.icmp_id_out = packet[ICMP].id
2041             except:
2042                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2043                 raise
2044
2045         # server1 to server2
2046         pkts = []
2047         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2048              IP(src=server1.ip4, dst=server2_nat_ip) /
2049              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2050         pkts.append(p)
2051         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2052              IP(src=server1.ip4, dst=server2_nat_ip) /
2053              UDP(sport=server_udp_port, dport=self.udp_port_out))
2054         pkts.append(p)
2055         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2056              IP(src=server1.ip4, dst=server2_nat_ip) /
2057              ICMP(id=self.icmp_id_out, type='echo-reply'))
2058         pkts.append(p)
2059         self.pg0.add_stream(pkts)
2060         self.pg_enable_capture(self.pg_interfaces)
2061         self.pg_start()
2062         capture = self.pg0.get_capture(len(pkts))
2063         for packet in capture:
2064             try:
2065                 self.assertEqual(packet[IP].src, server1_nat_ip)
2066                 self.assertEqual(packet[IP].dst, server2.ip4)
2067                 if packet.haslayer(TCP):
2068                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2069                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2070                     self.check_tcp_checksum(packet)
2071                 elif packet.haslayer(UDP):
2072                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2073                     self.assertEqual(packet[UDP].sport, server_udp_port)
2074                 else:
2075                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2076             except:
2077                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2078                 raise
2079
2080     def test_max_translations_per_user(self):
2081         """ MAX translations per user - recycle the least recently used """
2082
2083         self.nat44_add_address(self.nat_addr)
2084         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2085         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2086                                                   is_inside=0)
2087
2088         # get maximum number of translations per user
2089         nat44_config = self.vapi.nat_show_config()
2090
2091         # send more than maximum number of translations per user packets
2092         pkts_num = nat44_config.max_translations_per_user + 5
2093         pkts = []
2094         for port in range(0, pkts_num):
2095             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2096                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2097                  TCP(sport=1025 + port))
2098             pkts.append(p)
2099         self.pg0.add_stream(pkts)
2100         self.pg_enable_capture(self.pg_interfaces)
2101         self.pg_start()
2102
2103         # verify number of translated packet
2104         self.pg1.get_capture(pkts_num)
2105
2106     def test_interface_addr(self):
2107         """ Acquire NAT44 addresses from interface """
2108         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2109
2110         # no address in NAT pool
2111         adresses = self.vapi.nat44_address_dump()
2112         self.assertEqual(0, len(adresses))
2113
2114         # configure interface address and check NAT address pool
2115         self.pg7.config_ip4()
2116         adresses = self.vapi.nat44_address_dump()
2117         self.assertEqual(1, len(adresses))
2118         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2119
2120         # remove interface address and check NAT address pool
2121         self.pg7.unconfig_ip4()
2122         adresses = self.vapi.nat44_address_dump()
2123         self.assertEqual(0, len(adresses))
2124
2125     def test_interface_addr_static_mapping(self):
2126         """ Static mapping with addresses from interface """
2127         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2128         self.nat44_add_static_mapping(
2129             '1.2.3.4',
2130             external_sw_if_index=self.pg7.sw_if_index)
2131
2132         # static mappings with external interface
2133         static_mappings = self.vapi.nat44_static_mapping_dump()
2134         self.assertEqual(1, len(static_mappings))
2135         self.assertEqual(self.pg7.sw_if_index,
2136                          static_mappings[0].external_sw_if_index)
2137
2138         # configure interface address and check static mappings
2139         self.pg7.config_ip4()
2140         static_mappings = self.vapi.nat44_static_mapping_dump()
2141         self.assertEqual(1, len(static_mappings))
2142         self.assertEqual(static_mappings[0].external_ip_address[0:4],
2143                          self.pg7.local_ip4n)
2144         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2145
2146         # remove interface address and check static mappings
2147         self.pg7.unconfig_ip4()
2148         static_mappings = self.vapi.nat44_static_mapping_dump()
2149         self.assertEqual(0, len(static_mappings))
2150
2151     def test_interface_addr_identity_nat(self):
2152         """ Identity NAT with addresses from interface """
2153
2154         port = 53053
2155         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2156         self.vapi.nat44_add_del_identity_mapping(
2157             sw_if_index=self.pg7.sw_if_index,
2158             port=port,
2159             protocol=IP_PROTOS.tcp,
2160             addr_only=0)
2161
2162         # identity mappings with external interface
2163         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2164         self.assertEqual(1, len(identity_mappings))
2165         self.assertEqual(self.pg7.sw_if_index,
2166                          identity_mappings[0].sw_if_index)
2167
2168         # configure interface address and check identity mappings
2169         self.pg7.config_ip4()
2170         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2171         self.assertEqual(1, len(identity_mappings))
2172         self.assertEqual(identity_mappings[0].ip_address,
2173                          self.pg7.local_ip4n)
2174         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2175         self.assertEqual(port, identity_mappings[0].port)
2176         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2177
2178         # remove interface address and check identity mappings
2179         self.pg7.unconfig_ip4()
2180         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2181         self.assertEqual(0, len(identity_mappings))
2182
2183     def test_ipfix_nat44_sess(self):
2184         """ IPFIX logging NAT44 session created/delted """
2185         self.ipfix_domain_id = 10
2186         self.ipfix_src_port = 20202
2187         colector_port = 30303
2188         bind_layers(UDP, IPFIX, dport=30303)
2189         self.nat44_add_address(self.nat_addr)
2190         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2191         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2192                                                   is_inside=0)
2193         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2194                                      src_address=self.pg3.local_ip4n,
2195                                      path_mtu=512,
2196                                      template_interval=10,
2197                                      collector_port=colector_port)
2198         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2199                             src_port=self.ipfix_src_port)
2200
2201         pkts = self.create_stream_in(self.pg0, self.pg1)
2202         self.pg0.add_stream(pkts)
2203         self.pg_enable_capture(self.pg_interfaces)
2204         self.pg_start()
2205         capture = self.pg1.get_capture(len(pkts))
2206         self.verify_capture_out(capture)
2207         self.nat44_add_address(self.nat_addr, is_add=0)
2208         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2209         capture = self.pg3.get_capture(9)
2210         ipfix = IPFIXDecoder()
2211         # first load template
2212         for p in capture:
2213             self.assertTrue(p.haslayer(IPFIX))
2214             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2215             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2216             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2217             self.assertEqual(p[UDP].dport, colector_port)
2218             self.assertEqual(p[IPFIX].observationDomainID,
2219                              self.ipfix_domain_id)
2220             if p.haslayer(Template):
2221                 ipfix.add_template(p.getlayer(Template))
2222         # verify events in data set
2223         for p in capture:
2224             if p.haslayer(Data):
2225                 data = ipfix.decode_data_set(p.getlayer(Set))
2226                 self.verify_ipfix_nat44_ses(data)
2227
2228     def test_ipfix_addr_exhausted(self):
2229         """ IPFIX logging NAT addresses exhausted """
2230         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2231         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2232                                                   is_inside=0)
2233         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2234                                      src_address=self.pg3.local_ip4n,
2235                                      path_mtu=512,
2236                                      template_interval=10)
2237         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2238                             src_port=self.ipfix_src_port)
2239
2240         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2241              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2242              TCP(sport=3025))
2243         self.pg0.add_stream(p)
2244         self.pg_enable_capture(self.pg_interfaces)
2245         self.pg_start()
2246         capture = self.pg1.get_capture(0)
2247         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2248         capture = self.pg3.get_capture(9)
2249         ipfix = IPFIXDecoder()
2250         # first load template
2251         for p in capture:
2252             self.assertTrue(p.haslayer(IPFIX))
2253             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2254             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2255             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2256             self.assertEqual(p[UDP].dport, 4739)
2257             self.assertEqual(p[IPFIX].observationDomainID,
2258                              self.ipfix_domain_id)
2259             if p.haslayer(Template):
2260                 ipfix.add_template(p.getlayer(Template))
2261         # verify events in data set
2262         for p in capture:
2263             if p.haslayer(Data):
2264                 data = ipfix.decode_data_set(p.getlayer(Set))
2265                 self.verify_ipfix_addr_exhausted(data)
2266
2267     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2268     def test_ipfix_max_sessions(self):
2269         """ IPFIX logging maximum session entries exceeded """
2270         self.nat44_add_address(self.nat_addr)
2271         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2272         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2273                                                   is_inside=0)
2274
2275         nat44_config = self.vapi.nat_show_config()
2276         max_sessions = 10 * nat44_config.translation_buckets
2277
2278         pkts = []
2279         for i in range(0, max_sessions):
2280             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2281             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2282                  IP(src=src, dst=self.pg1.remote_ip4) /
2283                  TCP(sport=1025))
2284             pkts.append(p)
2285         self.pg0.add_stream(pkts)
2286         self.pg_enable_capture(self.pg_interfaces)
2287         self.pg_start()
2288
2289         self.pg1.get_capture(max_sessions)
2290         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2291                                      src_address=self.pg3.local_ip4n,
2292                                      path_mtu=512,
2293                                      template_interval=10)
2294         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2295                             src_port=self.ipfix_src_port)
2296
2297         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2298              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2299              TCP(sport=1025))
2300         self.pg0.add_stream(p)
2301         self.pg_enable_capture(self.pg_interfaces)
2302         self.pg_start()
2303         self.pg1.get_capture(0)
2304         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2305         capture = self.pg3.get_capture(9)
2306         ipfix = IPFIXDecoder()
2307         # first load template
2308         for p in capture:
2309             self.assertTrue(p.haslayer(IPFIX))
2310             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2311             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2312             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2313             self.assertEqual(p[UDP].dport, 4739)
2314             self.assertEqual(p[IPFIX].observationDomainID,
2315                              self.ipfix_domain_id)
2316             if p.haslayer(Template):
2317                 ipfix.add_template(p.getlayer(Template))
2318         # verify events in data set
2319         for p in capture:
2320             if p.haslayer(Data):
2321                 data = ipfix.decode_data_set(p.getlayer(Set))
2322                 self.verify_ipfix_max_sessions(data, max_sessions)
2323
2324     def test_pool_addr_fib(self):
2325         """ NAT44 add pool addresses to FIB """
2326         static_addr = '10.0.0.10'
2327         self.nat44_add_address(self.nat_addr)
2328         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2329         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2330                                                   is_inside=0)
2331         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2332
2333         # NAT44 address
2334         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2335              ARP(op=ARP.who_has, pdst=self.nat_addr,
2336                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2337         self.pg1.add_stream(p)
2338         self.pg_enable_capture(self.pg_interfaces)
2339         self.pg_start()
2340         capture = self.pg1.get_capture(1)
2341         self.assertTrue(capture[0].haslayer(ARP))
2342         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2343
2344         # 1:1 NAT address
2345         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2346              ARP(op=ARP.who_has, pdst=static_addr,
2347                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2348         self.pg1.add_stream(p)
2349         self.pg_enable_capture(self.pg_interfaces)
2350         self.pg_start()
2351         capture = self.pg1.get_capture(1)
2352         self.assertTrue(capture[0].haslayer(ARP))
2353         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2354
2355         # send ARP to non-NAT44 interface
2356         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2357              ARP(op=ARP.who_has, pdst=self.nat_addr,
2358                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2359         self.pg2.add_stream(p)
2360         self.pg_enable_capture(self.pg_interfaces)
2361         self.pg_start()
2362         capture = self.pg1.get_capture(0)
2363
2364         # remove addresses and verify
2365         self.nat44_add_address(self.nat_addr, is_add=0)
2366         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2367                                       is_add=0)
2368
2369         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2370              ARP(op=ARP.who_has, pdst=self.nat_addr,
2371                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2372         self.pg1.add_stream(p)
2373         self.pg_enable_capture(self.pg_interfaces)
2374         self.pg_start()
2375         capture = self.pg1.get_capture(0)
2376
2377         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2378              ARP(op=ARP.who_has, pdst=static_addr,
2379                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2380         self.pg1.add_stream(p)
2381         self.pg_enable_capture(self.pg_interfaces)
2382         self.pg_start()
2383         capture = self.pg1.get_capture(0)
2384
2385     def test_vrf_mode(self):
2386         """ NAT44 tenant VRF aware address pool mode """
2387
2388         vrf_id1 = 1
2389         vrf_id2 = 2
2390         nat_ip1 = "10.0.0.10"
2391         nat_ip2 = "10.0.0.11"
2392
2393         self.pg0.unconfig_ip4()
2394         self.pg1.unconfig_ip4()
2395         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2396         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2397         self.pg0.set_table_ip4(vrf_id1)
2398         self.pg1.set_table_ip4(vrf_id2)
2399         self.pg0.config_ip4()
2400         self.pg1.config_ip4()
2401
2402         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2403         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2404         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2405         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2406         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2407                                                   is_inside=0)
2408
2409         # first VRF
2410         pkts = self.create_stream_in(self.pg0, self.pg2)
2411         self.pg0.add_stream(pkts)
2412         self.pg_enable_capture(self.pg_interfaces)
2413         self.pg_start()
2414         capture = self.pg2.get_capture(len(pkts))
2415         self.verify_capture_out(capture, nat_ip1)
2416
2417         # second VRF
2418         pkts = self.create_stream_in(self.pg1, self.pg2)
2419         self.pg1.add_stream(pkts)
2420         self.pg_enable_capture(self.pg_interfaces)
2421         self.pg_start()
2422         capture = self.pg2.get_capture(len(pkts))
2423         self.verify_capture_out(capture, nat_ip2)
2424
2425         self.pg0.unconfig_ip4()
2426         self.pg1.unconfig_ip4()
2427         self.pg0.set_table_ip4(0)
2428         self.pg1.set_table_ip4(0)
2429         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2430         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2431
2432     def test_vrf_feature_independent(self):
2433         """ NAT44 tenant VRF independent address pool mode """
2434
2435         nat_ip1 = "10.0.0.10"
2436         nat_ip2 = "10.0.0.11"
2437
2438         self.nat44_add_address(nat_ip1)
2439         self.nat44_add_address(nat_ip2, vrf_id=99)
2440         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2441         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2442         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2443                                                   is_inside=0)
2444
2445         # first VRF
2446         pkts = self.create_stream_in(self.pg0, self.pg2)
2447         self.pg0.add_stream(pkts)
2448         self.pg_enable_capture(self.pg_interfaces)
2449         self.pg_start()
2450         capture = self.pg2.get_capture(len(pkts))
2451         self.verify_capture_out(capture, nat_ip1)
2452
2453         # second VRF
2454         pkts = self.create_stream_in(self.pg1, self.pg2)
2455         self.pg1.add_stream(pkts)
2456         self.pg_enable_capture(self.pg_interfaces)
2457         self.pg_start()
2458         capture = self.pg2.get_capture(len(pkts))
2459         self.verify_capture_out(capture, nat_ip1)
2460
2461     def test_dynamic_ipless_interfaces(self):
2462         """ NAT44 interfaces without configured IP address """
2463
2464         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2465                                       mactobinary(self.pg7.remote_mac),
2466                                       self.pg7.remote_ip4n,
2467                                       is_static=1)
2468         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2469                                       mactobinary(self.pg8.remote_mac),
2470                                       self.pg8.remote_ip4n,
2471                                       is_static=1)
2472
2473         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2474                                    dst_address_length=32,
2475                                    next_hop_address=self.pg7.remote_ip4n,
2476                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2477         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2478                                    dst_address_length=32,
2479                                    next_hop_address=self.pg8.remote_ip4n,
2480                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2481
2482         self.nat44_add_address(self.nat_addr)
2483         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2484         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2485                                                   is_inside=0)
2486
2487         # in2out
2488         pkts = self.create_stream_in(self.pg7, self.pg8)
2489         self.pg7.add_stream(pkts)
2490         self.pg_enable_capture(self.pg_interfaces)
2491         self.pg_start()
2492         capture = self.pg8.get_capture(len(pkts))
2493         self.verify_capture_out(capture)
2494
2495         # out2in
2496         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2497         self.pg8.add_stream(pkts)
2498         self.pg_enable_capture(self.pg_interfaces)
2499         self.pg_start()
2500         capture = self.pg7.get_capture(len(pkts))
2501         self.verify_capture_in(capture, self.pg7)
2502
2503     def test_static_ipless_interfaces(self):
2504         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2505
2506         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2507                                       mactobinary(self.pg7.remote_mac),
2508                                       self.pg7.remote_ip4n,
2509                                       is_static=1)
2510         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2511                                       mactobinary(self.pg8.remote_mac),
2512                                       self.pg8.remote_ip4n,
2513                                       is_static=1)
2514
2515         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2516                                    dst_address_length=32,
2517                                    next_hop_address=self.pg7.remote_ip4n,
2518                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2519         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2520                                    dst_address_length=32,
2521                                    next_hop_address=self.pg8.remote_ip4n,
2522                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2523
2524         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2525         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2526         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2527                                                   is_inside=0)
2528
2529         # out2in
2530         pkts = self.create_stream_out(self.pg8)
2531         self.pg8.add_stream(pkts)
2532         self.pg_enable_capture(self.pg_interfaces)
2533         self.pg_start()
2534         capture = self.pg7.get_capture(len(pkts))
2535         self.verify_capture_in(capture, self.pg7)
2536
2537         # in2out
2538         pkts = self.create_stream_in(self.pg7, self.pg8)
2539         self.pg7.add_stream(pkts)
2540         self.pg_enable_capture(self.pg_interfaces)
2541         self.pg_start()
2542         capture = self.pg8.get_capture(len(pkts))
2543         self.verify_capture_out(capture, self.nat_addr, True)
2544
2545     def test_static_with_port_ipless_interfaces(self):
2546         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2547
2548         self.tcp_port_out = 30606
2549         self.udp_port_out = 30607
2550         self.icmp_id_out = 30608
2551
2552         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2553                                       mactobinary(self.pg7.remote_mac),
2554                                       self.pg7.remote_ip4n,
2555                                       is_static=1)
2556         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2557                                       mactobinary(self.pg8.remote_mac),
2558                                       self.pg8.remote_ip4n,
2559                                       is_static=1)
2560
2561         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2562                                    dst_address_length=32,
2563                                    next_hop_address=self.pg7.remote_ip4n,
2564                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2565         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2566                                    dst_address_length=32,
2567                                    next_hop_address=self.pg8.remote_ip4n,
2568                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2569
2570         self.nat44_add_address(self.nat_addr)
2571         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2572                                       self.tcp_port_in, self.tcp_port_out,
2573                                       proto=IP_PROTOS.tcp)
2574         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2575                                       self.udp_port_in, self.udp_port_out,
2576                                       proto=IP_PROTOS.udp)
2577         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2578                                       self.icmp_id_in, self.icmp_id_out,
2579                                       proto=IP_PROTOS.icmp)
2580         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2581         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2582                                                   is_inside=0)
2583
2584         # out2in
2585         pkts = self.create_stream_out(self.pg8)
2586         self.pg8.add_stream(pkts)
2587         self.pg_enable_capture(self.pg_interfaces)
2588         self.pg_start()
2589         capture = self.pg7.get_capture(len(pkts))
2590         self.verify_capture_in(capture, self.pg7)
2591
2592         # in2out
2593         pkts = self.create_stream_in(self.pg7, self.pg8)
2594         self.pg7.add_stream(pkts)
2595         self.pg_enable_capture(self.pg_interfaces)
2596         self.pg_start()
2597         capture = self.pg8.get_capture(len(pkts))
2598         self.verify_capture_out(capture)
2599
2600     def test_static_unknown_proto(self):
2601         """ 1:1 NAT translate packet with unknown protocol """
2602         nat_ip = "10.0.0.10"
2603         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2604         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2605         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2606                                                   is_inside=0)
2607
2608         # in2out
2609         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2610              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2611              GRE() /
2612              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2613              TCP(sport=1234, dport=1234))
2614         self.pg0.add_stream(p)
2615         self.pg_enable_capture(self.pg_interfaces)
2616         self.pg_start()
2617         p = self.pg1.get_capture(1)
2618         packet = p[0]
2619         try:
2620             self.assertEqual(packet[IP].src, nat_ip)
2621             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2622             self.assertTrue(packet.haslayer(GRE))
2623             self.check_ip_checksum(packet)
2624         except:
2625             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2626             raise
2627
2628         # out2in
2629         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2630              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2631              GRE() /
2632              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2633              TCP(sport=1234, dport=1234))
2634         self.pg1.add_stream(p)
2635         self.pg_enable_capture(self.pg_interfaces)
2636         self.pg_start()
2637         p = self.pg0.get_capture(1)
2638         packet = p[0]
2639         try:
2640             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2641             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2642             self.assertTrue(packet.haslayer(GRE))
2643             self.check_ip_checksum(packet)
2644         except:
2645             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2646             raise
2647
2648     def test_hairpinning_static_unknown_proto(self):
2649         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2650
2651         host = self.pg0.remote_hosts[0]
2652         server = self.pg0.remote_hosts[1]
2653
2654         host_nat_ip = "10.0.0.10"
2655         server_nat_ip = "10.0.0.11"
2656
2657         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2658         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2659         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2660         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2661                                                   is_inside=0)
2662
2663         # host to server
2664         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2665              IP(src=host.ip4, dst=server_nat_ip) /
2666              GRE() /
2667              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2668              TCP(sport=1234, dport=1234))
2669         self.pg0.add_stream(p)
2670         self.pg_enable_capture(self.pg_interfaces)
2671         self.pg_start()
2672         p = self.pg0.get_capture(1)
2673         packet = p[0]
2674         try:
2675             self.assertEqual(packet[IP].src, host_nat_ip)
2676             self.assertEqual(packet[IP].dst, server.ip4)
2677             self.assertTrue(packet.haslayer(GRE))
2678             self.check_ip_checksum(packet)
2679         except:
2680             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2681             raise
2682
2683         # server to host
2684         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2685              IP(src=server.ip4, dst=host_nat_ip) /
2686              GRE() /
2687              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2688              TCP(sport=1234, dport=1234))
2689         self.pg0.add_stream(p)
2690         self.pg_enable_capture(self.pg_interfaces)
2691         self.pg_start()
2692         p = self.pg0.get_capture(1)
2693         packet = p[0]
2694         try:
2695             self.assertEqual(packet[IP].src, server_nat_ip)
2696             self.assertEqual(packet[IP].dst, host.ip4)
2697             self.assertTrue(packet.haslayer(GRE))
2698             self.check_ip_checksum(packet)
2699         except:
2700             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2701             raise
2702
2703     def test_unknown_proto(self):
2704         """ NAT44 translate packet with unknown protocol """
2705         self.nat44_add_address(self.nat_addr)
2706         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2707         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2708                                                   is_inside=0)
2709
2710         # in2out
2711         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2712              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2713              TCP(sport=self.tcp_port_in, dport=20))
2714         self.pg0.add_stream(p)
2715         self.pg_enable_capture(self.pg_interfaces)
2716         self.pg_start()
2717         p = self.pg1.get_capture(1)
2718
2719         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2720              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2721              GRE() /
2722              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2723              TCP(sport=1234, dport=1234))
2724         self.pg0.add_stream(p)
2725         self.pg_enable_capture(self.pg_interfaces)
2726         self.pg_start()
2727         p = self.pg1.get_capture(1)
2728         packet = p[0]
2729         try:
2730             self.assertEqual(packet[IP].src, self.nat_addr)
2731             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2732             self.assertTrue(packet.haslayer(GRE))
2733             self.check_ip_checksum(packet)
2734         except:
2735             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2736             raise
2737
2738         # out2in
2739         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2740              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2741              GRE() /
2742              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2743              TCP(sport=1234, dport=1234))
2744         self.pg1.add_stream(p)
2745         self.pg_enable_capture(self.pg_interfaces)
2746         self.pg_start()
2747         p = self.pg0.get_capture(1)
2748         packet = p[0]
2749         try:
2750             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2751             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2752             self.assertTrue(packet.haslayer(GRE))
2753             self.check_ip_checksum(packet)
2754         except:
2755             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2756             raise
2757
2758     def test_hairpinning_unknown_proto(self):
2759         """ NAT44 translate packet with unknown protocol - hairpinning """
2760         host = self.pg0.remote_hosts[0]
2761         server = self.pg0.remote_hosts[1]
2762         host_in_port = 1234
2763         host_out_port = 0
2764         server_in_port = 5678
2765         server_out_port = 8765
2766         server_nat_ip = "10.0.0.11"
2767
2768         self.nat44_add_address(self.nat_addr)
2769         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2770         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2771                                                   is_inside=0)
2772
2773         # add static mapping for server
2774         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2775
2776         # host to server
2777         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2778              IP(src=host.ip4, dst=server_nat_ip) /
2779              TCP(sport=host_in_port, dport=server_out_port))
2780         self.pg0.add_stream(p)
2781         self.pg_enable_capture(self.pg_interfaces)
2782         self.pg_start()
2783         capture = self.pg0.get_capture(1)
2784
2785         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2786              IP(src=host.ip4, dst=server_nat_ip) /
2787              GRE() /
2788              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2789              TCP(sport=1234, dport=1234))
2790         self.pg0.add_stream(p)
2791         self.pg_enable_capture(self.pg_interfaces)
2792         self.pg_start()
2793         p = self.pg0.get_capture(1)
2794         packet = p[0]
2795         try:
2796             self.assertEqual(packet[IP].src, self.nat_addr)
2797             self.assertEqual(packet[IP].dst, server.ip4)
2798             self.assertTrue(packet.haslayer(GRE))
2799             self.check_ip_checksum(packet)
2800         except:
2801             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2802             raise
2803
2804         # server to host
2805         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2806              IP(src=server.ip4, dst=self.nat_addr) /
2807              GRE() /
2808              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2809              TCP(sport=1234, dport=1234))
2810         self.pg0.add_stream(p)
2811         self.pg_enable_capture(self.pg_interfaces)
2812         self.pg_start()
2813         p = self.pg0.get_capture(1)
2814         packet = p[0]
2815         try:
2816             self.assertEqual(packet[IP].src, server_nat_ip)
2817             self.assertEqual(packet[IP].dst, host.ip4)
2818             self.assertTrue(packet.haslayer(GRE))
2819             self.check_ip_checksum(packet)
2820         except:
2821             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2822             raise
2823
2824     def test_output_feature(self):
2825         """ NAT44 interface output feature (in2out postrouting) """
2826         self.nat44_add_address(self.nat_addr)
2827         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2828         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2829         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2830                                                          is_inside=0)
2831
2832         # in2out
2833         pkts = self.create_stream_in(self.pg0, self.pg3)
2834         self.pg0.add_stream(pkts)
2835         self.pg_enable_capture(self.pg_interfaces)
2836         self.pg_start()
2837         capture = self.pg3.get_capture(len(pkts))
2838         self.verify_capture_out(capture)
2839
2840         # out2in
2841         pkts = self.create_stream_out(self.pg3)
2842         self.pg3.add_stream(pkts)
2843         self.pg_enable_capture(self.pg_interfaces)
2844         self.pg_start()
2845         capture = self.pg0.get_capture(len(pkts))
2846         self.verify_capture_in(capture, self.pg0)
2847
2848         # from non-NAT interface to NAT inside interface
2849         pkts = self.create_stream_in(self.pg2, self.pg0)
2850         self.pg2.add_stream(pkts)
2851         self.pg_enable_capture(self.pg_interfaces)
2852         self.pg_start()
2853         capture = self.pg0.get_capture(len(pkts))
2854         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2855
2856     def test_output_feature_vrf_aware(self):
2857         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2858         nat_ip_vrf10 = "10.0.0.10"
2859         nat_ip_vrf20 = "10.0.0.20"
2860
2861         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2862                                    dst_address_length=32,
2863                                    next_hop_address=self.pg3.remote_ip4n,
2864                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2865                                    table_id=10)
2866         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2867                                    dst_address_length=32,
2868                                    next_hop_address=self.pg3.remote_ip4n,
2869                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2870                                    table_id=20)
2871
2872         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2873         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2874         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2875         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2876         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2877                                                          is_inside=0)
2878
2879         # in2out VRF 10
2880         pkts = self.create_stream_in(self.pg4, self.pg3)
2881         self.pg4.add_stream(pkts)
2882         self.pg_enable_capture(self.pg_interfaces)
2883         self.pg_start()
2884         capture = self.pg3.get_capture(len(pkts))
2885         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2886
2887         # out2in VRF 10
2888         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2889         self.pg3.add_stream(pkts)
2890         self.pg_enable_capture(self.pg_interfaces)
2891         self.pg_start()
2892         capture = self.pg4.get_capture(len(pkts))
2893         self.verify_capture_in(capture, self.pg4)
2894
2895         # in2out VRF 20
2896         pkts = self.create_stream_in(self.pg6, self.pg3)
2897         self.pg6.add_stream(pkts)
2898         self.pg_enable_capture(self.pg_interfaces)
2899         self.pg_start()
2900         capture = self.pg3.get_capture(len(pkts))
2901         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2902
2903         # out2in VRF 20
2904         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2905         self.pg3.add_stream(pkts)
2906         self.pg_enable_capture(self.pg_interfaces)
2907         self.pg_start()
2908         capture = self.pg6.get_capture(len(pkts))
2909         self.verify_capture_in(capture, self.pg6)
2910
2911     def test_output_feature_hairpinning(self):
2912         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2913         host = self.pg0.remote_hosts[0]
2914         server = self.pg0.remote_hosts[1]
2915         host_in_port = 1234
2916         host_out_port = 0
2917         server_in_port = 5678
2918         server_out_port = 8765
2919
2920         self.nat44_add_address(self.nat_addr)
2921         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2922         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2923                                                          is_inside=0)
2924
2925         # add static mapping for server
2926         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2927                                       server_in_port, server_out_port,
2928                                       proto=IP_PROTOS.tcp)
2929
2930         # send packet from host to server
2931         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2932              IP(src=host.ip4, dst=self.nat_addr) /
2933              TCP(sport=host_in_port, dport=server_out_port))
2934         self.pg0.add_stream(p)
2935         self.pg_enable_capture(self.pg_interfaces)
2936         self.pg_start()
2937         capture = self.pg0.get_capture(1)
2938         p = capture[0]
2939         try:
2940             ip = p[IP]
2941             tcp = p[TCP]
2942             self.assertEqual(ip.src, self.nat_addr)
2943             self.assertEqual(ip.dst, server.ip4)
2944             self.assertNotEqual(tcp.sport, host_in_port)
2945             self.assertEqual(tcp.dport, server_in_port)
2946             self.check_tcp_checksum(p)
2947             host_out_port = tcp.sport
2948         except:
2949             self.logger.error(ppp("Unexpected or invalid packet:", p))
2950             raise
2951
2952         # send reply from server to host
2953         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2954              IP(src=server.ip4, dst=self.nat_addr) /
2955              TCP(sport=server_in_port, dport=host_out_port))
2956         self.pg0.add_stream(p)
2957         self.pg_enable_capture(self.pg_interfaces)
2958         self.pg_start()
2959         capture = self.pg0.get_capture(1)
2960         p = capture[0]
2961         try:
2962             ip = p[IP]
2963             tcp = p[TCP]
2964             self.assertEqual(ip.src, self.nat_addr)
2965             self.assertEqual(ip.dst, host.ip4)
2966             self.assertEqual(tcp.sport, server_out_port)
2967             self.assertEqual(tcp.dport, host_in_port)
2968             self.check_tcp_checksum(p)
2969         except:
2970             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2971             raise
2972
2973     def test_one_armed_nat44(self):
2974         """ One armed NAT44 """
2975         remote_host = self.pg9.remote_hosts[0]
2976         local_host = self.pg9.remote_hosts[1]
2977         external_port = 0
2978
2979         self.nat44_add_address(self.nat_addr)
2980         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2981         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2982                                                   is_inside=0)
2983
2984         # in2out
2985         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2986              IP(src=local_host.ip4, dst=remote_host.ip4) /
2987              TCP(sport=12345, dport=80))
2988         self.pg9.add_stream(p)
2989         self.pg_enable_capture(self.pg_interfaces)
2990         self.pg_start()
2991         capture = self.pg9.get_capture(1)
2992         p = capture[0]
2993         try:
2994             ip = p[IP]
2995             tcp = p[TCP]
2996             self.assertEqual(ip.src, self.nat_addr)
2997             self.assertEqual(ip.dst, remote_host.ip4)
2998             self.assertNotEqual(tcp.sport, 12345)
2999             external_port = tcp.sport
3000             self.assertEqual(tcp.dport, 80)
3001             self.check_tcp_checksum(p)
3002             self.check_ip_checksum(p)
3003         except:
3004             self.logger.error(ppp("Unexpected or invalid packet:", p))
3005             raise
3006
3007         # out2in
3008         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3009              IP(src=remote_host.ip4, dst=self.nat_addr) /
3010              TCP(sport=80, dport=external_port))
3011         self.pg9.add_stream(p)
3012         self.pg_enable_capture(self.pg_interfaces)
3013         self.pg_start()
3014         capture = self.pg9.get_capture(1)
3015         p = capture[0]
3016         try:
3017             ip = p[IP]
3018             tcp = p[TCP]
3019             self.assertEqual(ip.src, remote_host.ip4)
3020             self.assertEqual(ip.dst, local_host.ip4)
3021             self.assertEqual(tcp.sport, 80)
3022             self.assertEqual(tcp.dport, 12345)
3023             self.check_tcp_checksum(p)
3024             self.check_ip_checksum(p)
3025         except:
3026             self.logger.error(ppp("Unexpected or invalid packet:", p))
3027             raise
3028
3029     def test_del_session(self):
3030         """ Delete NAT44 session """
3031         self.nat44_add_address(self.nat_addr)
3032         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3033         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3034                                                   is_inside=0)
3035
3036         pkts = self.create_stream_in(self.pg0, self.pg1)
3037         self.pg0.add_stream(pkts)
3038         self.pg_enable_capture(self.pg_interfaces)
3039         self.pg_start()
3040         capture = self.pg1.get_capture(len(pkts))
3041
3042         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3043         nsessions = len(sessions)
3044
3045         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3046                                     sessions[0].inside_port,
3047                                     sessions[0].protocol)
3048         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3049                                     sessions[1].outside_port,
3050                                     sessions[1].protocol,
3051                                     is_in=0)
3052
3053         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3054         self.assertEqual(nsessions - len(sessions), 2)
3055
3056     def test_set_get_reass(self):
3057         """ NAT44 set/get virtual fragmentation reassembly """
3058         reas_cfg1 = self.vapi.nat_get_reass()
3059
3060         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3061                                 max_reass=reas_cfg1.ip4_max_reass * 2,
3062                                 max_frag=reas_cfg1.ip4_max_frag * 2)
3063
3064         reas_cfg2 = self.vapi.nat_get_reass()
3065
3066         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3067         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3068         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3069
3070         self.vapi.nat_set_reass(drop_frag=1)
3071         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3072
3073     def test_frag_in_order(self):
3074         """ NAT44 translate fragments arriving in order """
3075         self.nat44_add_address(self.nat_addr)
3076         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3077         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3078                                                   is_inside=0)
3079
3080         data = "A" * 4 + "B" * 16 + "C" * 3
3081         self.tcp_port_in = random.randint(1025, 65535)
3082
3083         reass = self.vapi.nat_reass_dump()
3084         reass_n_start = len(reass)
3085
3086         # in2out
3087         pkts = self.create_stream_frag(self.pg0,
3088                                        self.pg1.remote_ip4,
3089                                        self.tcp_port_in,
3090                                        20,
3091                                        data)
3092         self.pg0.add_stream(pkts)
3093         self.pg_enable_capture(self.pg_interfaces)
3094         self.pg_start()
3095         frags = self.pg1.get_capture(len(pkts))
3096         p = self.reass_frags_and_verify(frags,
3097                                         self.nat_addr,
3098                                         self.pg1.remote_ip4)
3099         self.assertEqual(p[TCP].dport, 20)
3100         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3101         self.tcp_port_out = p[TCP].sport
3102         self.assertEqual(data, p[Raw].load)
3103
3104         # out2in
3105         pkts = self.create_stream_frag(self.pg1,
3106                                        self.nat_addr,
3107                                        20,
3108                                        self.tcp_port_out,
3109                                        data)
3110         self.pg1.add_stream(pkts)
3111         self.pg_enable_capture(self.pg_interfaces)
3112         self.pg_start()
3113         frags = self.pg0.get_capture(len(pkts))
3114         p = self.reass_frags_and_verify(frags,
3115                                         self.pg1.remote_ip4,
3116                                         self.pg0.remote_ip4)
3117         self.assertEqual(p[TCP].sport, 20)
3118         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3119         self.assertEqual(data, p[Raw].load)
3120
3121         reass = self.vapi.nat_reass_dump()
3122         reass_n_end = len(reass)
3123
3124         self.assertEqual(reass_n_end - reass_n_start, 2)
3125
3126     def test_reass_hairpinning(self):
3127         """ NAT44 fragments hairpinning """
3128         host = self.pg0.remote_hosts[0]
3129         server = self.pg0.remote_hosts[1]
3130         host_in_port = random.randint(1025, 65535)
3131         host_out_port = 0
3132         server_in_port = random.randint(1025, 65535)
3133         server_out_port = random.randint(1025, 65535)
3134         data = "A" * 4 + "B" * 16 + "C" * 3
3135
3136         self.nat44_add_address(self.nat_addr)
3137         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3138         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3139                                                   is_inside=0)
3140         # add static mapping for server
3141         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3142                                       server_in_port, server_out_port,
3143                                       proto=IP_PROTOS.tcp)
3144
3145         # send packet from host to server
3146         pkts = self.create_stream_frag(self.pg0,
3147                                        self.nat_addr,
3148                                        host_in_port,
3149                                        server_out_port,
3150                                        data)
3151         self.pg0.add_stream(pkts)
3152         self.pg_enable_capture(self.pg_interfaces)
3153         self.pg_start()
3154         frags = self.pg0.get_capture(len(pkts))
3155         p = self.reass_frags_and_verify(frags,
3156                                         self.nat_addr,
3157                                         server.ip4)
3158         self.assertNotEqual(p[TCP].sport, host_in_port)
3159         self.assertEqual(p[TCP].dport, server_in_port)
3160         self.assertEqual(data, p[Raw].load)
3161
3162     def test_frag_out_of_order(self):
3163         """ NAT44 translate fragments arriving out of order """
3164         self.nat44_add_address(self.nat_addr)
3165         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3166         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3167                                                   is_inside=0)
3168
3169         data = "A" * 4 + "B" * 16 + "C" * 3
3170         random.randint(1025, 65535)
3171
3172         # in2out
3173         pkts = self.create_stream_frag(self.pg0,
3174                                        self.pg1.remote_ip4,
3175                                        self.tcp_port_in,
3176                                        20,
3177                                        data)
3178         pkts.reverse()
3179         self.pg0.add_stream(pkts)
3180         self.pg_enable_capture(self.pg_interfaces)
3181         self.pg_start()
3182         frags = self.pg1.get_capture(len(pkts))
3183         p = self.reass_frags_and_verify(frags,
3184                                         self.nat_addr,
3185                                         self.pg1.remote_ip4)
3186         self.assertEqual(p[TCP].dport, 20)
3187         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3188         self.tcp_port_out = p[TCP].sport
3189         self.assertEqual(data, p[Raw].load)
3190
3191         # out2in
3192         pkts = self.create_stream_frag(self.pg1,
3193                                        self.nat_addr,
3194                                        20,
3195                                        self.tcp_port_out,
3196                                        data)
3197         pkts.reverse()
3198         self.pg1.add_stream(pkts)
3199         self.pg_enable_capture(self.pg_interfaces)
3200         self.pg_start()
3201         frags = self.pg0.get_capture(len(pkts))
3202         p = self.reass_frags_and_verify(frags,
3203                                         self.pg1.remote_ip4,
3204                                         self.pg0.remote_ip4)
3205         self.assertEqual(p[TCP].sport, 20)
3206         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3207         self.assertEqual(data, p[Raw].load)
3208
3209     def test_port_restricted(self):
3210         """ Port restricted NAT44 (MAP-E CE) """
3211         self.nat44_add_address(self.nat_addr)
3212         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3213         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3214                                                   is_inside=0)
3215         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3216                       "psid-offset 6 psid-len 6")
3217
3218         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3219              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3220              TCP(sport=4567, dport=22))
3221         self.pg0.add_stream(p)
3222         self.pg_enable_capture(self.pg_interfaces)
3223         self.pg_start()
3224         capture = self.pg1.get_capture(1)
3225         p = capture[0]
3226         try:
3227             ip = p[IP]
3228             tcp = p[TCP]
3229             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3230             self.assertEqual(ip.src, self.nat_addr)
3231             self.assertEqual(tcp.dport, 22)
3232             self.assertNotEqual(tcp.sport, 4567)
3233             self.assertEqual((tcp.sport >> 6) & 63, 10)
3234             self.check_tcp_checksum(p)
3235             self.check_ip_checksum(p)
3236         except:
3237             self.logger.error(ppp("Unexpected or invalid packet:", p))
3238             raise
3239
3240     def test_twice_nat(self):
3241         """ Twice NAT44 """
3242         twice_nat_addr = '10.0.1.3'
3243         port_in = 8080
3244         port_out = 80
3245         eh_port_out = 4567
3246         eh_port_in = 0
3247         self.nat44_add_address(self.nat_addr)
3248         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3249         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3250                                       port_in, port_out, proto=IP_PROTOS.tcp,
3251                                       twice_nat=1)
3252         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3253         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3254                                                   is_inside=0)
3255
3256         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3257              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3258              TCP(sport=eh_port_out, dport=port_out))
3259         self.pg1.add_stream(p)
3260         self.pg_enable_capture(self.pg_interfaces)
3261         self.pg_start()
3262         capture = self.pg0.get_capture(1)
3263         p = capture[0]
3264         try:
3265             ip = p[IP]
3266             tcp = p[TCP]
3267             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3268             self.assertEqual(ip.src, twice_nat_addr)
3269             self.assertEqual(tcp.dport, port_in)
3270             self.assertNotEqual(tcp.sport, eh_port_out)
3271             eh_port_in = tcp.sport
3272             self.check_tcp_checksum(p)
3273             self.check_ip_checksum(p)
3274         except:
3275             self.logger.error(ppp("Unexpected or invalid packet:", p))
3276             raise
3277
3278         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3279              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3280              TCP(sport=port_in, dport=eh_port_in))
3281         self.pg0.add_stream(p)
3282         self.pg_enable_capture(self.pg_interfaces)
3283         self.pg_start()
3284         capture = self.pg1.get_capture(1)
3285         p = capture[0]
3286         try:
3287             ip = p[IP]
3288             tcp = p[TCP]
3289             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3290             self.assertEqual(ip.src, self.nat_addr)
3291             self.assertEqual(tcp.dport, eh_port_out)
3292             self.assertEqual(tcp.sport, port_out)
3293             self.check_tcp_checksum(p)
3294             self.check_ip_checksum(p)
3295         except:
3296             self.logger.error(ppp("Unexpected or invalid packet:", p))
3297             raise
3298
3299     def test_twice_nat_lb(self):
3300         """ Twice NAT44 local service load balancing """
3301         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3302         twice_nat_addr = '10.0.1.3'
3303         local_port = 8080
3304         external_port = 80
3305         eh_port_out = 4567
3306         eh_port_in = 0
3307         server1 = self.pg0.remote_hosts[0]
3308         server2 = self.pg0.remote_hosts[1]
3309
3310         locals = [{'addr': server1.ip4n,
3311                    'port': local_port,
3312                    'probability': 50},
3313                   {'addr': server2.ip4n,
3314                    'port': local_port,
3315                    'probability': 50}]
3316
3317         self.nat44_add_address(self.nat_addr)
3318         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3319
3320         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3321                                                   external_port,
3322                                                   IP_PROTOS.tcp,
3323                                                   twice_nat=1,
3324                                                   local_num=len(locals),
3325                                                   locals=locals)
3326         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3327         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3328                                                   is_inside=0)
3329
3330         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3331              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3332              TCP(sport=eh_port_out, dport=external_port))
3333         self.pg1.add_stream(p)
3334         self.pg_enable_capture(self.pg_interfaces)
3335         self.pg_start()
3336         capture = self.pg0.get_capture(1)
3337         p = capture[0]
3338         server = None
3339         try:
3340             ip = p[IP]
3341             tcp = p[TCP]
3342             self.assertEqual(ip.src, twice_nat_addr)
3343             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3344             if ip.dst == server1.ip4:
3345                 server = server1
3346             else:
3347                 server = server2
3348             self.assertNotEqual(tcp.sport, eh_port_out)
3349             eh_port_in = tcp.sport
3350             self.assertEqual(tcp.dport, local_port)
3351             self.check_tcp_checksum(p)
3352             self.check_ip_checksum(p)
3353         except:
3354             self.logger.error(ppp("Unexpected or invalid packet:", p))
3355             raise
3356
3357         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3358              IP(src=server.ip4, dst=twice_nat_addr) /
3359              TCP(sport=local_port, dport=eh_port_in))
3360         self.pg0.add_stream(p)
3361         self.pg_enable_capture(self.pg_interfaces)
3362         self.pg_start()
3363         capture = self.pg1.get_capture(1)
3364         p = capture[0]
3365         try:
3366             ip = p[IP]
3367             tcp = p[TCP]
3368             self.assertEqual(ip.src, self.nat_addr)
3369             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3370             self.assertEqual(tcp.sport, external_port)
3371             self.assertEqual(tcp.dport, eh_port_out)
3372             self.check_tcp_checksum(p)
3373             self.check_ip_checksum(p)
3374         except:
3375             self.logger.error(ppp("Unexpected or invalid packet:", p))
3376             raise
3377
3378     def test_twice_nat_interface_addr(self):
3379         """ Acquire twice NAT44 addresses from interface """
3380         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3381
3382         # no address in NAT pool
3383         adresses = self.vapi.nat44_address_dump()
3384         self.assertEqual(0, len(adresses))
3385
3386         # configure interface address and check NAT address pool
3387         self.pg7.config_ip4()
3388         adresses = self.vapi.nat44_address_dump()
3389         self.assertEqual(1, len(adresses))
3390         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3391         self.assertEqual(adresses[0].twice_nat, 1)
3392
3393         # remove interface address and check NAT address pool
3394         self.pg7.unconfig_ip4()
3395         adresses = self.vapi.nat44_address_dump()
3396         self.assertEqual(0, len(adresses))
3397
3398     def test_ipfix_max_frags(self):
3399         """ IPFIX logging maximum fragments pending reassembly exceeded """
3400         self.nat44_add_address(self.nat_addr)
3401         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3402         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3403                                                   is_inside=0)
3404         self.vapi.nat_set_reass(max_frag=0)
3405         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3406                                      src_address=self.pg3.local_ip4n,
3407                                      path_mtu=512,
3408                                      template_interval=10)
3409         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3410                             src_port=self.ipfix_src_port)
3411
3412         data = "A" * 4 + "B" * 16 + "C" * 3
3413         self.tcp_port_in = random.randint(1025, 65535)
3414         pkts = self.create_stream_frag(self.pg0,
3415                                        self.pg1.remote_ip4,
3416                                        self.tcp_port_in,
3417                                        20,
3418                                        data)
3419         self.pg0.add_stream(pkts[-1])
3420         self.pg_enable_capture(self.pg_interfaces)
3421         self.pg_start()
3422         frags = self.pg1.get_capture(0)
3423         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3424         capture = self.pg3.get_capture(9)
3425         ipfix = IPFIXDecoder()
3426         # first load template
3427         for p in capture:
3428             self.assertTrue(p.haslayer(IPFIX))
3429             self.assertEqual(p[IP].src, self.pg3.local_ip4)
3430             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3431             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3432             self.assertEqual(p[UDP].dport, 4739)
3433             self.assertEqual(p[IPFIX].observationDomainID,
3434                              self.ipfix_domain_id)
3435             if p.haslayer(Template):
3436                 ipfix.add_template(p.getlayer(Template))
3437         # verify events in data set
3438         for p in capture:
3439             if p.haslayer(Data):
3440                 data = ipfix.decode_data_set(p.getlayer(Set))
3441                 self.verify_ipfix_max_fragments_ip4(data, 0,
3442                                                     self.pg0.remote_ip4n)
3443
3444     def tearDown(self):
3445         super(TestNAT44, self).tearDown()
3446         if not self.vpp_dead:
3447             self.logger.info(self.vapi.cli("show nat44 verbose"))
3448             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3449             self.vapi.cli("nat addr-port-assignment-alg default")
3450             self.clear_nat44()
3451
3452
3453 class TestNAT44Out2InDPO(MethodHolder):
3454     """ NAT44 Test Cases using out2in DPO """
3455
3456     @classmethod
3457     def setUpConstants(cls):
3458         super(TestNAT44Out2InDPO, cls).setUpConstants()
3459         cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
3460
3461     @classmethod
3462     def setUpClass(cls):
3463         super(TestNAT44Out2InDPO, cls).setUpClass()
3464
3465         try:
3466             cls.tcp_port_in = 6303
3467             cls.tcp_port_out = 6303
3468             cls.udp_port_in = 6304
3469             cls.udp_port_out = 6304
3470             cls.icmp_id_in = 6305
3471             cls.icmp_id_out = 6305
3472             cls.nat_addr = '10.0.0.3'
3473             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3474             cls.dst_ip4 = '192.168.70.1'
3475
3476             cls.create_pg_interfaces(range(2))
3477
3478             cls.pg0.admin_up()
3479             cls.pg0.config_ip4()
3480             cls.pg0.resolve_arp()
3481
3482             cls.pg1.admin_up()
3483             cls.pg1.config_ip6()
3484             cls.pg1.resolve_ndp()
3485
3486             cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3487                                       dst_address_length=0,
3488                                       next_hop_address=cls.pg1.remote_ip6n,
3489                                       next_hop_sw_if_index=cls.pg1.sw_if_index)
3490
3491         except Exception:
3492             super(TestNAT44Out2InDPO, cls).tearDownClass()
3493             raise
3494
3495     def configure_xlat(self):
3496         self.dst_ip6_pfx = '1:2:3::'
3497         self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3498                                               self.dst_ip6_pfx)
3499         self.dst_ip6_pfx_len = 96
3500         self.src_ip6_pfx = '4:5:6::'
3501         self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3502                                               self.src_ip6_pfx)
3503         self.src_ip6_pfx_len = 96
3504         self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3505                                  self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3506                                  '\x00\x00\x00\x00', 0, is_translation=1,
3507                                  is_rfc6052=1)
3508
3509     def test_464xlat_ce(self):
3510         """ Test 464XLAT CE with NAT44 """
3511
3512         self.configure_xlat()
3513
3514         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3515         self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
3516
3517         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3518                                        self.dst_ip6_pfx_len)
3519         out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3520                                        self.src_ip6_pfx_len)
3521
3522         try:
3523             pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3524             self.pg0.add_stream(pkts)
3525             self.pg_enable_capture(self.pg_interfaces)
3526             self.pg_start()
3527             capture = self.pg1.get_capture(len(pkts))
3528             self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3529                                         dst_ip=out_src_ip6)
3530
3531             pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3532                                               out_dst_ip6)
3533             self.pg1.add_stream(pkts)
3534             self.pg_enable_capture(self.pg_interfaces)
3535             self.pg_start()
3536             capture = self.pg0.get_capture(len(pkts))
3537             self.verify_capture_in(capture, self.pg0)
3538         finally:
3539             self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3540                                                       is_add=0)
3541             self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3542                                                   self.nat_addr_n, is_add=0)
3543
3544     def test_464xlat_ce_no_nat(self):
3545         """ Test 464XLAT CE without NAT44 """
3546
3547         self.configure_xlat()
3548
3549         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3550                                        self.dst_ip6_pfx_len)
3551         out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3552                                        self.src_ip6_pfx_len)
3553
3554         pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3555         self.pg0.add_stream(pkts)
3556         self.pg_enable_capture(self.pg_interfaces)
3557         self.pg_start()
3558         capture = self.pg1.get_capture(len(pkts))
3559         self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3560                                     nat_ip=out_dst_ip6, same_port=True)
3561
3562         pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3563         self.pg1.add_stream(pkts)
3564         self.pg_enable_capture(self.pg_interfaces)
3565         self.pg_start()
3566         capture = self.pg0.get_capture(len(pkts))
3567         self.verify_capture_in(capture, self.pg0)
3568
3569
3570 class TestDeterministicNAT(MethodHolder):
3571     """ Deterministic NAT Test Cases """
3572
3573     @classmethod
3574     def setUpConstants(cls):
3575         super(TestDeterministicNAT, cls).setUpConstants()
3576         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3577
3578     @classmethod
3579     def setUpClass(cls):
3580         super(TestDeterministicNAT, cls).setUpClass()
3581
3582         try:
3583             cls.tcp_port_in = 6303
3584             cls.tcp_external_port = 6303
3585             cls.udp_port_in = 6304
3586             cls.udp_external_port = 6304
3587             cls.icmp_id_in = 6305
3588             cls.nat_addr = '10.0.0.3'
3589
3590             cls.create_pg_interfaces(range(3))
3591             cls.interfaces = list(cls.pg_interfaces)
3592
3593             for i in cls.interfaces:
3594                 i.admin_up()
3595                 i.config_ip4()
3596                 i.resolve_arp()
3597
3598             cls.pg0.generate_remote_hosts(2)
3599             cls.pg0.configure_ipv4_neighbors()
3600
3601         except Exception:
3602             super(TestDeterministicNAT, cls).tearDownClass()
3603             raise
3604
3605     def create_stream_in(self, in_if, out_if, ttl=64):
3606         """
3607         Create packet stream for inside network
3608
3609         :param in_if: Inside interface
3610         :param out_if: Outside interface
3611         :param ttl: TTL of generated packets
3612         """
3613         pkts = []
3614         # TCP
3615         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3616              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3617              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3618         pkts.append(p)
3619
3620         # UDP
3621         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3622              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3623              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3624         pkts.append(p)
3625
3626         # ICMP
3627         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3628              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3629              ICMP(id=self.icmp_id_in, type='echo-request'))
3630         pkts.append(p)
3631
3632         return pkts
3633
3634     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3635         """
3636         Create packet stream for outside network
3637
3638         :param out_if: Outside interface
3639         :param dst_ip: Destination IP address (Default use global NAT address)
3640         :param ttl: TTL of generated packets
3641         """
3642         if dst_ip is None:
3643             dst_ip = self.nat_addr
3644         pkts = []
3645         # TCP
3646         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3647              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3648              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3649         pkts.append(p)
3650
3651         # UDP
3652         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3653              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3654              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3655         pkts.append(p)
3656
3657         # ICMP
3658         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3659              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3660              ICMP(id=self.icmp_external_id, type='echo-reply'))
3661         pkts.append(p)
3662
3663         return pkts
3664
3665     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3666         """
3667         Verify captured packets on outside network
3668
3669         :param capture: Captured packets
3670         :param nat_ip: Translated IP address (Default use global NAT address)
3671         :param same_port: Sorce port number is not translated (Default False)
3672         :param packet_num: Expected number of packets (Default 3)
3673         """
3674         if nat_ip is None:
3675             nat_ip = self.nat_addr
3676         self.assertEqual(packet_num, len(capture))
3677         for packet in capture:
3678             try:
3679                 self.assertEqual(packet[IP].src, nat_ip)
3680                 if packet.haslayer(TCP):
3681                     self.tcp_port_out = packet[TCP].sport
3682                 elif packet.haslayer(UDP):
3683                     self.udp_port_out = packet[UDP].sport
3684                 else:
3685                     self.icmp_external_id = packet[ICMP].id
3686             except:
3687                 self.logger.error(ppp("Unexpected or invalid packet "
3688                                       "(outside network):", packet))
3689                 raise
3690
3691     def initiate_tcp_session(self, in_if, out_if):
3692         """
3693         Initiates TCP session
3694
3695         :param in_if: Inside interface
3696         :param out_if: Outside interface
3697         """
3698         try:
3699             # SYN packet in->out
3700             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3701                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3702                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3703                      flags="S"))
3704             in_if.add_stream(p)
3705             self.pg_enable_capture(self.pg_interfaces)
3706             self.pg_start()
3707             capture = out_if.get_capture(1)
3708             p = capture[0]
3709             self.tcp_port_out = p[TCP].sport
3710
3711             # SYN + ACK packet out->in
3712             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3713                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3714                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3715                      flags="SA"))
3716             out_if.add_stream(p)
3717             self.pg_enable_capture(self.pg_interfaces)
3718             self.pg_start()
3719             in_if.get_capture(1)
3720
3721             # ACK packet in->out
3722             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3723                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3724                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3725                      flags="A"))
3726             in_if.add_stream(p)
3727             self.pg_enable_capture(self.pg_interfaces)
3728             self.pg_start()
3729             out_if.get_capture(1)
3730
3731         except:
3732             self.logger.error("TCP 3 way handshake failed")
3733             raise
3734
3735     def verify_ipfix_max_entries_per_user(self, data):
3736         """
3737         Verify IPFIX maximum entries per user exceeded event
3738
3739         :param data: Decoded IPFIX data records
3740         """
3741         self.assertEqual(1, len(data))
3742         record = data[0]
3743         # natEvent
3744         self.assertEqual(ord(record[230]), 13)
3745         # natQuotaExceededEvent
3746         self.assertEqual('\x03\x00\x00\x00', record[466])
3747         # maxEntriesPerUser
3748         self.assertEqual('\xe8\x03\x00\x00', record[473])
3749         # sourceIPv4Address
3750         self.assertEqual(self.pg0.remote_ip4n, record[8])
3751
3752     def test_deterministic_mode(self):
3753         """ NAT plugin run deterministic mode """
3754         in_addr = '172.16.255.0'
3755         out_addr = '172.17.255.50'
3756         in_addr_t = '172.16.255.20'
3757         in_addr_n = socket.inet_aton(in_addr)
3758         out_addr_n = socket.inet_aton(out_addr)
3759         in_addr_t_n = socket.inet_aton(in_addr_t)
3760         in_plen = 24
3761         out_plen = 32
3762
3763         nat_config = self.vapi.nat_show_config()
3764         self.assertEqual(1, nat_config.deterministic)
3765
3766         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3767
3768         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3769         self.assertEqual(rep1.out_addr[:4], out_addr_n)
3770         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3771         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3772
3773         deterministic_mappings = self.vapi.nat_det_map_dump()
3774         self.assertEqual(len(deterministic_mappings), 1)
3775         dsm = deterministic_mappings[0]
3776         self.assertEqual(in_addr_n, dsm.in_addr[:4])
3777         self.assertEqual(in_plen, dsm.in_plen)
3778         self.assertEqual(out_addr_n, dsm.out_addr[:4])
3779         self.assertEqual(out_plen, dsm.out_plen)
3780
3781         self.clear_nat_det()
3782         deterministic_mappings = self.vapi.nat_det_map_dump()
3783         self.assertEqual(len(deterministic_mappings), 0)
3784
3785     def test_set_timeouts(self):
3786         """ Set deterministic NAT timeouts """
3787         timeouts_before = self.vapi.nat_det_get_timeouts()
3788
3789         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3790                                        timeouts_before.tcp_established + 10,
3791                                        timeouts_before.tcp_transitory + 10,
3792                                        timeouts_before.icmp + 10)
3793
3794         timeouts_after = self.vapi.nat_det_get_timeouts()
3795
3796         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3797         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3798         self.assertNotEqual(timeouts_before.tcp_established,
3799                             timeouts_after.tcp_established)
3800         self.assertNotEqual(timeouts_before.tcp_transitory,
3801                             timeouts_after.tcp_transitory)
3802
3803     def test_det_in(self):
3804         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3805
3806         nat_ip = "10.0.0.10"
3807
3808         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3809                                       32,
3810                                       socket.inet_aton(nat_ip),
3811                                       32)
3812         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3813         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3814                                                   is_inside=0)
3815
3816         # in2out
3817         pkts = self.create_stream_in(self.pg0, self.pg1)
3818         self.pg0.add_stream(pkts)
3819         self.pg_enable_capture(self.pg_interfaces)
3820         self.pg_start()
3821         capture = self.pg1.get_capture(len(pkts))
3822         self.verify_capture_out(capture, nat_ip)
3823
3824         # out2in
3825         pkts = self.create_stream_out(self.pg1, nat_ip)
3826         self.pg1.add_stream(pkts)
3827         self.pg_enable_capture(self.pg_interfaces)
3828         self.pg_start()
3829         capture = self.pg0.get_capture(len(pkts))
3830         self.verify_capture_in(capture, self.pg0)
3831
3832         # session dump test
3833         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3834         self.assertEqual(len(sessions), 3)
3835
3836         # TCP session
3837         s = sessions[0]
3838         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3839         self.assertEqual(s.in_port, self.tcp_port_in)
3840         self.assertEqual(s.out_port, self.tcp_port_out)
3841         self.assertEqual(s.ext_port, self.tcp_external_port)
3842
3843         # UDP session
3844         s = sessions[1]
3845         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3846         self.assertEqual(s.in_port, self.udp_port_in)
3847         self.assertEqual(s.out_port, self.udp_port_out)
3848         self.assertEqual(s.ext_port, self.udp_external_port)
3849
3850         # ICMP session
3851         s = sessions[2]
3852         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3853         self.assertEqual(s.in_port, self.icmp_id_in)
3854         self.assertEqual(s.out_port, self.icmp_external_id)
3855
3856     def test_multiple_users(self):
3857         """ Deterministic NAT multiple users """
3858
3859         nat_ip = "10.0.0.10"
3860         port_in = 80
3861         external_port = 6303
3862
3863         host0 = self.pg0.remote_hosts[0]
3864         host1 = self.pg0.remote_hosts[1]
3865
3866         self.vapi.nat_det_add_del_map(host0.ip4n,
3867                                       24,
3868                                       socket.inet_aton(nat_ip),
3869                                       32)
3870         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3871         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3872                                                   is_inside=0)
3873
3874         # host0 to out
3875         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3876              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3877              TCP(sport=port_in, dport=external_port))
3878         self.pg0.add_stream(p)
3879         self.pg_enable_capture(self.pg_interfaces)
3880         self.pg_start()
3881         capture = self.pg1.get_capture(1)
3882         p = capture[0]
3883         try:
3884             ip = p[IP]
3885             tcp = p[TCP]
3886             self.assertEqual(ip.src, nat_ip)
3887             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3888             self.assertEqual(tcp.dport, external_port)
3889             port_out0 = tcp.sport
3890         except:
3891             self.logger.error(ppp("Unexpected or invalid packet:", p))
3892             raise
3893
3894         # host1 to out
3895         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3896              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3897              TCP(sport=port_in, dport=external_port))
3898         self.pg0.add_stream(p)
3899         self.pg_enable_capture(self.pg_interfaces)
3900         self.pg_start()
3901         capture = self.pg1.get_capture(1)
3902         p = capture[0]
3903         try:
3904             ip = p[IP]
3905             tcp = p[TCP]
3906             self.assertEqual(ip.src, nat_ip)
3907             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3908             self.assertEqual(tcp.dport, external_port)
3909             port_out1 = tcp.sport
3910         except:
3911             self.logger.error(ppp("Unexpected or invalid packet:", p))
3912             raise
3913
3914         dms = self.vapi.nat_det_map_dump()
3915         self.assertEqual(1, len(dms))
3916         self.assertEqual(2, dms[0].ses_num)
3917
3918         # out to host0
3919         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3920              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3921              TCP(sport=external_port, dport=port_out0))
3922         self.pg1.add_stream(p)
3923         self.pg_enable_capture(self.pg_interfaces)
3924         self.pg_start()
3925         capture = self.pg0.get_capture(1)
3926         p = capture[0]
3927         try:
3928             ip = p[IP]
3929             tcp = p[TCP]
3930             self.assertEqual(ip.src, self.pg1.remote_ip4)
3931             self.assertEqual(ip.dst, host0.ip4)
3932             self.assertEqual(tcp.dport, port_in)
3933             self.assertEqual(tcp.sport, external_port)
3934         except:
3935             self.logger.error(ppp("Unexpected or invalid packet:", p))
3936             raise
3937
3938         # out to host1
3939         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3940              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3941              TCP(sport=external_port, dport=port_out1))
3942         self.pg1.add_stream(p)
3943         self.pg_enable_capture(self.pg_interfaces)
3944         self.pg_start()
3945         capture = self.pg0.get_capture(1)
3946         p = capture[0]
3947         try:
3948             ip = p[IP]
3949             tcp = p[TCP]
3950             self.assertEqual(ip.src, self.pg1.remote_ip4)
3951             self.assertEqual(ip.dst, host1.ip4)
3952             self.assertEqual(tcp.dport, port_in)
3953             self.assertEqual(tcp.sport, external_port)
3954         except:
3955             self.logger.error(ppp("Unexpected or invalid packet", p))
3956             raise
3957
3958         # session close api test
3959         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3960                                             port_out1,
3961                                             self.pg1.remote_ip4n,
3962                                             external_port)
3963         dms = self.vapi.nat_det_map_dump()
3964         self.assertEqual(dms[0].ses_num, 1)
3965
3966         self.vapi.nat_det_close_session_in(host0.ip4n,
3967                                            port_in,
3968                                            self.pg1.remote_ip4n,
3969                                            external_port)
3970         dms = self.vapi.nat_det_map_dump()
3971         self.assertEqual(dms[0].ses_num, 0)
3972
3973     def test_tcp_session_close_detection_in(self):
3974         """ Deterministic NAT TCP session close from inside network """
3975         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3976                                       32,
3977                                       socket.inet_aton(self.nat_addr),
3978                                       32)
3979         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3980         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3981                                                   is_inside=0)
3982
3983         self.initiate_tcp_session(self.pg0, self.pg1)
3984
3985         # close the session from inside
3986         try:
3987             # FIN packet in -> out
3988             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3989                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3990                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3991                      flags="F"))
3992             self.pg0.add_stream(p)
3993             self.pg_enable_capture(self.pg_interfaces)
3994             self.pg_start()
3995             self.pg1.get_capture(1)
3996
3997             pkts = []
3998
3999             # ACK packet out -> in
4000             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4001                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4002                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4003                      flags="A"))
4004             pkts.append(p)
4005
4006             # FIN packet out -> in
4007             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4008                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4009                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4010                      flags="F"))
4011             pkts.append(p)
4012
4013             self.pg1.add_stream(pkts)
4014             self.pg_enable_capture(self.pg_interfaces)
4015             self.pg_start()
4016             self.pg0.get_capture(2)
4017
4018             # ACK packet in -> out
4019             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4020                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4021                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4022                      flags="A"))
4023             self.pg0.add_stream(p)
4024             self.pg_enable_capture(self.pg_interfaces)
4025             self.pg_start()
4026             self.pg1.get_capture(1)
4027
4028             # Check if deterministic NAT44 closed the session
4029             dms = self.vapi.nat_det_map_dump()
4030             self.assertEqual(0, dms[0].ses_num)
4031         except:
4032             self.logger.error("TCP session termination failed")
4033             raise
4034
4035     def test_tcp_session_close_detection_out(self):
4036         """ Deterministic NAT TCP session close from outside network """
4037         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4038                                       32,
4039                                       socket.inet_aton(self.nat_addr),
4040                                       32)
4041         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4042         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4043                                                   is_inside=0)
4044
4045         self.initiate_tcp_session(self.pg0, self.pg1)
4046
4047         # close the session from outside
4048         try:
4049             # FIN packet out -> in
4050             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4051                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4052                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4053                      flags="F"))
4054             self.pg1.add_stream(p)
4055             self.pg_enable_capture(self.pg_interfaces)
4056             self.pg_start()
4057             self.pg0.get_capture(1)
4058
4059             pkts = []
4060
4061             # ACK packet in -> out
4062             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4063                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4064                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4065                      flags="A"))
4066             pkts.append(p)
4067
4068             # ACK packet in -> out
4069             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4070                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4071                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4072                      flags="F"))
4073             pkts.append(p)
4074
4075             self.pg0.add_stream(pkts)
4076             self.pg_enable_capture(self.pg_interfaces)
4077             self.pg_start()
4078             self.pg1.get_capture(2)
4079
4080             # ACK packet out -> in
4081             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4082                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4083                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4084                      flags="A"))
4085             self.pg1.add_stream(p)
4086             self.pg_enable_capture(self.pg_interfaces)
4087             self.pg_start()
4088             self.pg0.get_capture(1)
4089
4090             # Check if deterministic NAT44 closed the session
4091             dms = self.vapi.nat_det_map_dump()
4092             self.assertEqual(0, dms[0].ses_num)
4093         except:
4094             self.logger.error("TCP session termination failed")
4095             raise
4096
4097     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4098     def test_session_timeout(self):
4099         """ Deterministic NAT session timeouts """
4100         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4101                                       32,
4102                                       socket.inet_aton(self.nat_addr),
4103                                       32)
4104         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4105         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4106                                                   is_inside=0)
4107
4108         self.initiate_tcp_session(self.pg0, self.pg1)
4109         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4110         pkts = self.create_stream_in(self.pg0, self.pg1)
4111         self.pg0.add_stream(pkts)
4112         self.pg_enable_capture(self.pg_interfaces)
4113         self.pg_start()
4114         capture = self.pg1.get_capture(len(pkts))
4115         sleep(15)
4116
4117         dms = self.vapi.nat_det_map_dump()
4118         self.assertEqual(0, dms[0].ses_num)
4119
4120     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4121     def test_session_limit_per_user(self):
4122         """ Deterministic NAT maximum sessions per user limit """
4123         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4124                                       32,
4125                                       socket.inet_aton(self.nat_addr),
4126                                       32)
4127         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4128         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4129                                                   is_inside=0)
4130         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4131                                      src_address=self.pg2.local_ip4n,
4132                                      path_mtu=512,
4133                                      template_interval=10)
4134         self.vapi.nat_ipfix()
4135
4136         pkts = []
4137         for port in range(1025, 2025):
4138             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4139                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4140                  UDP(sport=port, dport=port))
4141             pkts.append(p)
4142
4143         self.pg0.add_stream(pkts)
4144         self.pg_enable_capture(self.pg_interfaces)
4145         self.pg_start()
4146         capture = self.pg1.get_capture(len(pkts))
4147
4148         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4149              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4150              UDP(sport=3001, dport=3002))
4151         self.pg0.add_stream(p)
4152         self.pg_enable_capture(self.pg_interfaces)
4153         self.pg_start()
4154         capture = self.pg1.assert_nothing_captured()
4155
4156         # verify ICMP error packet
4157         capture = self.pg0.get_capture(1)
4158         p = capture[0]
4159         self.assertTrue(p.haslayer(ICMP))
4160         icmp = p[ICMP]
4161         self.assertEqual(icmp.type, 3)
4162         self.assertEqual(icmp.code, 1)
4163         self.assertTrue(icmp.haslayer(IPerror))
4164         inner_ip = icmp[IPerror]
4165         self.assertEqual(inner_ip[UDPerror].sport, 3001)
4166         self.assertEqual(inner_ip[UDPerror].dport, 3002)
4167
4168         dms = self.vapi.nat_det_map_dump()
4169
4170         self.assertEqual(1000, dms[0].ses_num)
4171
4172         # verify IPFIX logging
4173         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
4174         sleep(1)
4175         capture = self.pg2.get_capture(2)
4176         ipfix = IPFIXDecoder()
4177         # first load template
4178         for p in capture:
4179             self.assertTrue(p.haslayer(IPFIX))
4180             if p.haslayer(Template):
4181                 ipfix.add_template(p.getlayer(Template))
4182         # verify events in data set
4183         for p in capture:
4184             if p.haslayer(Data):
4185                 data = ipfix.decode_data_set(p.getlayer(Set))
4186                 self.verify_ipfix_max_entries_per_user(data)
4187
4188     def clear_nat_det(self):
4189         """
4190         Clear deterministic NAT configuration.
4191         """
4192         self.vapi.nat_ipfix(enable=0)
4193         self.vapi.nat_det_set_timeouts()
4194         deterministic_mappings = self.vapi.nat_det_map_dump()
4195         for dsm in deterministic_mappings:
4196             self.vapi.nat_det_add_del_map(dsm.in_addr,
4197                                           dsm.in_plen,
4198                                           dsm.out_addr,
4199                                           dsm.out_plen,
4200                                           is_add=0)
4201
4202         interfaces = self.vapi.nat44_interface_dump()
4203         for intf in interfaces:
4204             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4205                                                       intf.is_inside,
4206                                                       is_add=0)
4207
4208     def tearDown(self):
4209         super(TestDeterministicNAT, self).tearDown()
4210         if not self.vpp_dead:
4211             self.logger.info(self.vapi.cli("show nat44 detail"))
4212             self.clear_nat_det()
4213
4214
4215 class TestNAT64(MethodHolder):
4216     """ NAT64 Test Cases """
4217
4218     @classmethod
4219     def setUpConstants(cls):
4220         super(TestNAT64, cls).setUpConstants()
4221         cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4222                                 "nat64 st hash buckets 256", "}"])
4223
4224     @classmethod
4225     def setUpClass(cls):
4226         super(TestNAT64, cls).setUpClass()
4227
4228         try:
4229             cls.tcp_port_in = 6303
4230             cls.tcp_port_out = 6303
4231             cls.udp_port_in = 6304
4232             cls.udp_port_out = 6304
4233             cls.icmp_id_in = 6305
4234             cls.icmp_id_out = 6305
4235             cls.nat_addr = '10.0.0.3'
4236             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4237             cls.vrf1_id = 10
4238             cls.vrf1_nat_addr = '10.0.10.3'
4239             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4240                                                    cls.vrf1_nat_addr)
4241             cls.ipfix_src_port = 4739
4242             cls.ipfix_domain_id = 1
4243
4244             cls.create_pg_interfaces(range(5))
4245             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4246             cls.ip6_interfaces.append(cls.pg_interfaces[2])
4247             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4248
4249             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4250
4251             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4252
4253             cls.pg0.generate_remote_hosts(2)
4254
4255             for i in cls.ip6_interfaces:
4256                 i.admin_up()
4257                 i.config_ip6()
4258                 i.configure_ipv6_neighbors()
4259
4260             for i in cls.ip4_interfaces:
4261                 i.admin_up()
4262                 i.config_ip4()
4263                 i.resolve_arp()
4264
4265             cls.pg3.admin_up()
4266             cls.pg3.config_ip4()
4267             cls.pg3.resolve_arp()
4268             cls.pg3.config_ip6()
4269             cls.pg3.configure_ipv6_neighbors()
4270
4271         except Exception:
4272             super(TestNAT64, cls).tearDownClass()
4273             raise
4274
4275     def test_pool(self):
4276         """ Add/delete address to NAT64 pool """
4277         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4278
4279         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4280
4281         addresses = self.vapi.nat64_pool_addr_dump()
4282         self.assertEqual(len(addresses), 1)
4283         self.assertEqual(addresses[0].address, nat_addr)
4284
4285         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4286
4287         addresses = self.vapi.nat64_pool_addr_dump()
4288         self.assertEqual(len(addresses), 0)
4289
4290     def test_interface(self):
4291         """ Enable/disable NAT64 feature on the interface """
4292         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4293         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4294
4295         interfaces = self.vapi.nat64_interface_dump()
4296         self.assertEqual(len(interfaces), 2)
4297         pg0_found = False
4298         pg1_found = False
4299         for intf in interfaces:
4300             if intf.sw_if_index == self.pg0.sw_if_index:
4301                 self.assertEqual(intf.is_inside, 1)
4302                 pg0_found = True
4303             elif intf.sw_if_index == self.pg1.sw_if_index:
4304                 self.assertEqual(intf.is_inside, 0)
4305                 pg1_found = True
4306         self.assertTrue(pg0_found)
4307         self.assertTrue(pg1_found)
4308
4309         features = self.vapi.cli("show interface features pg0")
4310         self.assertNotEqual(features.find('nat64-in2out'), -1)
4311         features = self.vapi.cli("show interface features pg1")
4312         self.assertNotEqual(features.find('nat64-out2in'), -1)
4313
4314         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4315         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4316
4317         interfaces = self.vapi.nat64_interface_dump()
4318         self.assertEqual(len(interfaces), 0)
4319
4320     def test_static_bib(self):
4321         """ Add/delete static BIB entry """
4322         in_addr = socket.inet_pton(socket.AF_INET6,
4323                                    '2001:db8:85a3::8a2e:370:7334')
4324         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4325         in_port = 1234
4326         out_port = 5678
4327         proto = IP_PROTOS.tcp
4328
4329         self.vapi.nat64_add_del_static_bib(in_addr,
4330                                            out_addr,
4331                                            in_port,
4332                                            out_port,
4333                                            proto)
4334         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4335         static_bib_num = 0
4336         for bibe in bib:
4337             if bibe.is_static:
4338                 static_bib_num += 1
4339                 self.assertEqual(bibe.i_addr, in_addr)
4340                 self.assertEqual(bibe.o_addr, out_addr)
4341                 self.assertEqual(bibe.i_port, in_port)
4342                 self.assertEqual(bibe.o_port, out_port)
4343         self.assertEqual(static_bib_num, 1)
4344
4345         self.vapi.nat64_add_del_static_bib(in_addr,
4346                                            out_addr,
4347                                            in_port,
4348                                            out_port,
4349                                            proto,
4350                                            is_add=0)
4351         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4352         static_bib_num = 0
4353         for bibe in bib:
4354             if bibe.is_static:
4355                 static_bib_num += 1
4356         self.assertEqual(static_bib_num, 0)
4357
4358     def test_set_timeouts(self):
4359         """ Set NAT64 timeouts """
4360         # verify default values
4361         timeouts = self.vapi.nat64_get_timeouts()
4362         self.assertEqual(timeouts.udp, 300)
4363         self.assertEqual(timeouts.icmp, 60)
4364         self.assertEqual(timeouts.tcp_trans, 240)
4365         self.assertEqual(timeouts.tcp_est, 7440)
4366         self.assertEqual(timeouts.tcp_incoming_syn, 6)
4367
4368         # set and verify custom values
4369         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4370                                      tcp_est=7450, tcp_incoming_syn=10)
4371         timeouts = self.vapi.nat64_get_timeouts()
4372         self.assertEqual(timeouts.udp, 200)
4373         self.assertEqual(timeouts.icmp, 30)
4374         self.assertEqual(timeouts.tcp_trans, 250)
4375         self.assertEqual(timeouts.tcp_est, 7450)
4376         self.assertEqual(timeouts.tcp_incoming_syn, 10)
4377
4378     def test_dynamic(self):
4379         """ NAT64 dynamic translation test """
4380         self.tcp_port_in = 6303
4381         self.udp_port_in = 6304
4382         self.icmp_id_in = 6305
4383
4384         ses_num_start = self.nat64_get_ses_num()
4385
4386         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4387                                                 self.nat_addr_n)
4388         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4389         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4390
4391         # in2out
4392         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4393         self.pg0.add_stream(pkts)
4394         self.pg_enable_capture(self.pg_interfaces)
4395         self.pg_start()
4396         capture = self.pg1.get_capture(len(pkts))
4397         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4398                                 dst_ip=self.pg1.remote_ip4)
4399
4400         # out2in
4401         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4402         self.pg1.add_stream(pkts)
4403         self.pg_enable_capture(self.pg_interfaces)
4404         self.pg_start()
4405         capture = self.pg0.get_capture(len(pkts))
4406         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4407         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4408
4409         # in2out
4410         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4411         self.pg0.add_stream(pkts)
4412         self.pg_enable_capture(self.pg_interfaces)
4413         self.pg_start()
4414         capture = self.pg1.get_capture(len(pkts))
4415         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4416                                 dst_ip=self.pg1.remote_ip4)
4417
4418         # out2in
4419         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4420         self.pg1.add_stream(pkts)
4421         self.pg_enable_capture(self.pg_interfaces)
4422         self.pg_start()
4423         capture = self.pg0.get_capture(len(pkts))
4424         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4425
4426         ses_num_end = self.nat64_get_ses_num()
4427
4428         self.assertEqual(ses_num_end - ses_num_start, 3)
4429
4430         # tenant with specific VRF
4431         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4432                                                 self.vrf1_nat_addr_n,
4433                                                 vrf_id=self.vrf1_id)
4434         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4435
4436         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4437         self.pg2.add_stream(pkts)
4438         self.pg_enable_capture(self.pg_interfaces)
4439         self.pg_start()
4440         capture = self.pg1.get_capture(len(pkts))
4441         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4442                                 dst_ip=self.pg1.remote_ip4)
4443
4444         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4445         self.pg1.add_stream(pkts)
4446         self.pg_enable_capture(self.pg_interfaces)
4447         self.pg_start()
4448         capture = self.pg2.get_capture(len(pkts))
4449         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4450
4451     def test_static(self):
4452         """ NAT64 static translation test """
4453         self.tcp_port_in = 60303
4454         self.udp_port_in = 60304
4455         self.icmp_id_in = 60305
4456         self.tcp_port_out = 60303
4457         self.udp_port_out = 60304
4458         self.icmp_id_out = 60305
4459
4460         ses_num_start = self.nat64_get_ses_num()
4461
4462         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4463                                                 self.nat_addr_n)
4464         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4465         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4466
4467         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4468                                            self.nat_addr_n,
4469                                            self.tcp_port_in,
4470                                            self.tcp_port_out,
4471                                            IP_PROTOS.tcp)
4472         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4473                                            self.nat_addr_n,
4474                                            self.udp_port_in,
4475                                            self.udp_port_out,
4476                                            IP_PROTOS.udp)
4477         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4478                                            self.nat_addr_n,
4479                                            self.icmp_id_in,
4480                                            self.icmp_id_out,
4481                                            IP_PROTOS.icmp)
4482
4483         # in2out
4484         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4485         self.pg0.add_stream(pkts)
4486         self.pg_enable_capture(self.pg_interfaces)
4487         self.pg_start()
4488         capture = self.pg1.get_capture(len(pkts))
4489         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4490                                 dst_ip=self.pg1.remote_ip4, same_port=True)
4491
4492         # out2in
4493         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4494         self.pg1.add_stream(pkts)
4495         self.pg_enable_capture(self.pg_interfaces)
4496         self.pg_start()
4497         capture = self.pg0.get_capture(len(pkts))
4498         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4499         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4500
4501         ses_num_end = self.nat64_get_ses_num()
4502
4503         self.assertEqual(ses_num_end - ses_num_start, 3)
4504
4505     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4506     def test_session_timeout(self):
4507         """ NAT64 session timeout """
4508         self.icmp_id_in = 1234
4509         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4510                                                 self.nat_addr_n)
4511         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4512         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4513         self.vapi.nat64_set_timeouts(icmp=5)
4514
4515         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4516         self.pg0.add_stream(pkts)
4517         self.pg_enable_capture(self.pg_interfaces)
4518         self.pg_start()
4519         capture = self.pg1.get_capture(len(pkts))
4520
4521         ses_num_before_timeout = self.nat64_get_ses_num()
4522
4523         sleep(15)
4524
4525         # ICMP session after timeout
4526         ses_num_after_timeout = self.nat64_get_ses_num()
4527         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4528
4529     def test_icmp_error(self):
4530         """ NAT64 ICMP Error message translation """
4531         self.tcp_port_in = 6303
4532         self.udp_port_in = 6304
4533         self.icmp_id_in = 6305
4534
4535         ses_num_start = self.nat64_get_ses_num()
4536
4537         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4538                                                 self.nat_addr_n)
4539         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4540         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4541
4542         # send some packets to create sessions
4543         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4544         self.pg0.add_stream(pkts)
4545         self.pg_enable_capture(self.pg_interfaces)
4546         self.pg_start()
4547         capture_ip4 = self.pg1.get_capture(len(pkts))
4548         self.verify_capture_out(capture_ip4,
4549                                 nat_ip=self.nat_addr,
4550                                 dst_ip=self.pg1.remote_ip4)
4551
4552         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4553         self.pg1.add_stream(pkts)
4554         self.pg_enable_capture(self.pg_interfaces)
4555         self.pg_start()
4556         capture_ip6 = self.pg0.get_capture(len(pkts))
4557         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4558         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4559                                    self.pg0.remote_ip6)
4560
4561         # in2out
4562         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4563                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4564                 ICMPv6DestUnreach(code=1) /
4565                 packet[IPv6] for packet in capture_ip6]
4566         self.pg0.add_stream(pkts)
4567         self.pg_enable_capture(self.pg_interfaces)
4568         self.pg_start()
4569         capture = self.pg1.get_capture(len(pkts))
4570         for packet in capture:
4571             try:
4572                 self.assertEqual(packet[IP].src, self.nat_addr)
4573                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4574                 self.assertEqual(packet[ICMP].type, 3)
4575                 self.assertEqual(packet[ICMP].code, 13)
4576                 inner = packet[IPerror]
4577                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4578                 self.assertEqual(inner.dst, self.nat_addr)
4579                 self.check_icmp_checksum(packet)
4580                 if inner.haslayer(TCPerror):
4581                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4582                 elif inner.haslayer(UDPerror):
4583                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4584                 else:
4585                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4586             except:
4587                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4588                 raise
4589
4590         # out2in
4591         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4592                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4593                 ICMP(type=3, code=13) /
4594                 packet[IP] for packet in capture_ip4]
4595         self.pg1.add_stream(pkts)
4596         self.pg_enable_capture(self.pg_interfaces)
4597         self.pg_start()
4598         capture = self.pg0.get_capture(len(pkts))
4599         for packet in capture:
4600             try:
4601                 self.assertEqual(packet[IPv6].src, ip.src)
4602                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4603                 icmp = packet[ICMPv6DestUnreach]
4604                 self.assertEqual(icmp.code, 1)
4605                 inner = icmp[IPerror6]
4606                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4607                 self.assertEqual(inner.dst, ip.src)
4608                 self.check_icmpv6_checksum(packet)
4609                 if inner.haslayer(TCPerror):
4610                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4611                 elif inner.haslayer(UDPerror):
4612                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4613                 else:
4614                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4615                                      self.icmp_id_in)
4616             except:
4617                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4618                 raise
4619
4620     def test_hairpinning(self):
4621         """ NAT64 hairpinning """
4622
4623         client = self.pg0.remote_hosts[0]
4624         server = self.pg0.remote_hosts[1]
4625         server_tcp_in_port = 22
4626         server_tcp_out_port = 4022
4627         server_udp_in_port = 23
4628         server_udp_out_port = 4023
4629         client_tcp_in_port = 1234
4630         client_udp_in_port = 1235
4631         client_tcp_out_port = 0
4632         client_udp_out_port = 0
4633         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4634         nat_addr_ip6 = ip.src
4635
4636         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4637                                                 self.nat_addr_n)
4638         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4639         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4640
4641         self.vapi.nat64_add_del_static_bib(server.ip6n,
4642                                            self.nat_addr_n,
4643                                            server_tcp_in_port,
4644                                            server_tcp_out_port,
4645                                            IP_PROTOS.tcp)
4646         self.vapi.nat64_add_del_static_bib(server.ip6n,
4647                                            self.nat_addr_n,
4648                                            server_udp_in_port,
4649                                            server_udp_out_port,
4650                                            IP_PROTOS.udp)
4651
4652         # client to server
4653         pkts = []
4654         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4655              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4656              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4657         pkts.append(p)
4658         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4659              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4660              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4661         pkts.append(p)
4662         self.pg0.add_stream(pkts)
4663         self.pg_enable_capture(self.pg_interfaces)
4664         self.pg_start()
4665         capture = self.pg0.get_capture(len(pkts))
4666         for packet in capture:
4667             try:
4668                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4669                 self.assertEqual(packet[IPv6].dst, server.ip6)
4670                 if packet.haslayer(TCP):
4671                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4672                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4673                     self.check_tcp_checksum(packet)
4674                     client_tcp_out_port = packet[TCP].sport
4675                 else:
4676                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4677                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
4678                     self.check_udp_checksum(packet)
4679                     client_udp_out_port = packet[UDP].sport
4680             except:
4681                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4682                 raise
4683
4684         # server to client
4685         pkts = []
4686         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4687              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4688              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4689         pkts.append(p)
4690         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4691              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4692              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4693         pkts.append(p)
4694         self.pg0.add_stream(pkts)
4695         self.pg_enable_capture(self.pg_interfaces)
4696         self.pg_start()
4697         capture = self.pg0.get_capture(len(pkts))
4698         for packet in capture:
4699             try:
4700                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4701                 self.assertEqual(packet[IPv6].dst, client.ip6)
4702                 if packet.haslayer(TCP):
4703                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4704                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4705                     self.check_tcp_checksum(packet)
4706                 else:
4707                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
4708                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
4709                     self.check_udp_checksum(packet)
4710             except:
4711                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4712                 raise
4713
4714         # ICMP error
4715         pkts = []
4716         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4717                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4718                 ICMPv6DestUnreach(code=1) /
4719                 packet[IPv6] for packet in capture]
4720         self.pg0.add_stream(pkts)
4721         self.pg_enable_capture(self.pg_interfaces)
4722         self.pg_start()
4723         capture = self.pg0.get_capture(len(pkts))
4724         for packet in capture:
4725             try:
4726                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4727                 self.assertEqual(packet[IPv6].dst, server.ip6)
4728                 icmp = packet[ICMPv6DestUnreach]
4729                 self.assertEqual(icmp.code, 1)
4730                 inner = icmp[IPerror6]
4731                 self.assertEqual(inner.src, server.ip6)
4732                 self.assertEqual(inner.dst, nat_addr_ip6)
4733                 self.check_icmpv6_checksum(packet)
4734                 if inner.haslayer(TCPerror):
4735                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4736                     self.assertEqual(inner[TCPerror].dport,
4737                                      client_tcp_out_port)
4738                 else:
4739                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4740                     self.assertEqual(inner[UDPerror].dport,
4741                                      client_udp_out_port)
4742             except:
4743                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4744                 raise
4745
4746     def test_prefix(self):
4747         """ NAT64 Network-Specific Prefix """
4748
4749         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4750                                                 self.nat_addr_n)
4751         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4752         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4753         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4754                                                 self.vrf1_nat_addr_n,
4755                                                 vrf_id=self.vrf1_id)
4756         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4757
4758         # Add global prefix
4759         global_pref64 = "2001:db8::"
4760         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4761         global_pref64_len = 32
4762         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4763
4764         prefix = self.vapi.nat64_prefix_dump()
4765         self.assertEqual(len(prefix), 1)
4766         self.assertEqual(prefix[0].prefix, global_pref64_n)
4767         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4768         self.assertEqual(prefix[0].vrf_id, 0)
4769
4770         # Add tenant specific prefix
4771         vrf1_pref64 = "2001:db8:122:300::"
4772         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4773         vrf1_pref64_len = 56
4774         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4775                                        vrf1_pref64_len,
4776                                        vrf_id=self.vrf1_id)
4777         prefix = self.vapi.nat64_prefix_dump()
4778         self.assertEqual(len(prefix), 2)
4779
4780         # Global prefix
4781         pkts = self.create_stream_in_ip6(self.pg0,
4782                                          self.pg1,
4783                                          pref=global_pref64,
4784                                          plen=global_pref64_len)
4785         self.pg0.add_stream(pkts)
4786         self.pg_enable_capture(self.pg_interfaces)
4787         self.pg_start()
4788         capture = self.pg1.get_capture(len(pkts))
4789         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4790                                 dst_ip=self.pg1.remote_ip4)
4791
4792         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4793         self.pg1.add_stream(pkts)
4794         self.pg_enable_capture(self.pg_interfaces)
4795         self.pg_start()
4796         capture = self.pg0.get_capture(len(pkts))
4797         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4798                                   global_pref64,
4799                                   global_pref64_len)
4800         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4801
4802         # Tenant specific prefix
4803         pkts = self.create_stream_in_ip6(self.pg2,
4804                                          self.pg1,
4805                                          pref=vrf1_pref64,
4806                                          plen=vrf1_pref64_len)
4807         self.pg2.add_stream(pkts)
4808         self.pg_enable_capture(self.pg_interfaces)
4809         self.pg_start()
4810         capture = self.pg1.get_capture(len(pkts))
4811         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4812                                 dst_ip=self.pg1.remote_ip4)
4813
4814         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4815         self.pg1.add_stream(pkts)
4816         self.pg_enable_capture(self.pg_interfaces)
4817         self.pg_start()
4818         capture = self.pg2.get_capture(len(pkts))
4819         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4820                                   vrf1_pref64,
4821                                   vrf1_pref64_len)
4822         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4823
4824     def test_unknown_proto(self):
4825         """ NAT64 translate packet with unknown protocol """
4826
4827         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4828                                                 self.nat_addr_n)
4829         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4830         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4831         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4832
4833         # in2out
4834         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4835              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4836              TCP(sport=self.tcp_port_in, dport=20))
4837         self.pg0.add_stream(p)
4838         self.pg_enable_capture(self.pg_interfaces)
4839         self.pg_start()
4840         p = self.pg1.get_capture(1)
4841
4842         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4843              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4844              GRE() /
4845              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4846              TCP(sport=1234, dport=1234))
4847         self.pg0.add_stream(p)
4848         self.pg_enable_capture(self.pg_interfaces)
4849         self.pg_start()
4850         p = self.pg1.get_capture(1)
4851         packet = p[0]
4852         try:
4853             self.assertEqual(packet[IP].src, self.nat_addr)
4854             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4855             self.assertTrue(packet.haslayer(GRE))
4856             self.check_ip_checksum(packet)
4857         except:
4858             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4859             raise
4860
4861         # out2in
4862         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4863              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4864              GRE() /
4865              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4866              TCP(sport=1234, dport=1234))
4867         self.pg1.add_stream(p)
4868         self.pg_enable_capture(self.pg_interfaces)
4869         self.pg_start()
4870         p = self.pg0.get_capture(1)
4871         packet = p[0]
4872         try:
4873             self.assertEqual(packet[IPv6].src, remote_ip6)
4874             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4875             self.assertEqual(packet[IPv6].nh, 47)
4876         except:
4877             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4878             raise
4879
4880     def test_hairpinning_unknown_proto(self):
4881         """ NAT64 translate packet with unknown protocol - hairpinning """
4882
4883         client = self.pg0.remote_hosts[0]
4884         server = self.pg0.remote_hosts[1]
4885         server_tcp_in_port = 22
4886         server_tcp_out_port = 4022
4887         client_tcp_in_port = 1234
4888         client_tcp_out_port = 1235
4889         server_nat_ip = "10.0.0.100"
4890         client_nat_ip = "10.0.0.110"
4891         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4892         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4893         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4894         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4895
4896         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4897                                                 client_nat_ip_n)
4898         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4899         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4900
4901         self.vapi.nat64_add_del_static_bib(server.ip6n,
4902                                            server_nat_ip_n,
4903                                            server_tcp_in_port,
4904                                            server_tcp_out_port,
4905                                            IP_PROTOS.tcp)
4906
4907         self.vapi.nat64_add_del_static_bib(server.ip6n,
4908                                            server_nat_ip_n,
4909                                            0,
4910                                            0,
4911                                            IP_PROTOS.gre)
4912
4913         self.vapi.nat64_add_del_static_bib(client.ip6n,
4914                                            client_nat_ip_n,
4915                                            client_tcp_in_port,
4916                                            client_tcp_out_port,
4917                                            IP_PROTOS.tcp)
4918
4919         # client to server
4920         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4921              IPv6(src=client.ip6, dst=server_nat_ip6) /
4922              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4923         self.pg0.add_stream(p)
4924         self.pg_enable_capture(self.pg_interfaces)
4925         self.pg_start()
4926         p = self.pg0.get_capture(1)
4927
4928         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4929              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4930              GRE() /
4931              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4932              TCP(sport=1234, dport=1234))
4933         self.pg0.add_stream(p)
4934         self.pg_enable_capture(self.pg_interfaces)
4935         self.pg_start()
4936         p = self.pg0.get_capture(1)
4937         packet = p[0]
4938         try:
4939             self.assertEqual(packet[IPv6].src, client_nat_ip6)
4940             self.assertEqual(packet[IPv6].dst, server.ip6)
4941             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4942         except:
4943             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4944             raise
4945
4946         # server to client
4947         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4948              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4949              GRE() /
4950              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4951              TCP(sport=1234, dport=1234))
4952         self.pg0.add_stream(p)
4953         self.pg_enable_capture(self.pg_interfaces)
4954         self.pg_start()
4955         p = self.pg0.get_capture(1)
4956         packet = p[0]
4957         try:
4958             self.assertEqual(packet[IPv6].src, server_nat_ip6)
4959             self.assertEqual(packet[IPv6].dst, client.ip6)
4960             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4961         except:
4962             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4963             raise
4964
4965     def test_one_armed_nat64(self):
4966         """ One armed NAT64 """
4967         external_port = 0
4968         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4969                                            '64:ff9b::',
4970                                            96)
4971
4972         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4973                                                 self.nat_addr_n)
4974         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4975         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4976
4977         # in2out
4978         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4979              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4980              TCP(sport=12345, dport=80))
4981         self.pg3.add_stream(p)
4982         self.pg_enable_capture(self.pg_interfaces)
4983         self.pg_start()
4984         capture = self.pg3.get_capture(1)
4985         p = capture[0]
4986         try:
4987             ip = p[IP]
4988             tcp = p[TCP]
4989             self.assertEqual(ip.src, self.nat_addr)
4990             self.assertEqual(ip.dst, self.pg3.remote_ip4)
4991             self.assertNotEqual(tcp.sport, 12345)
4992             external_port = tcp.sport
4993             self.assertEqual(tcp.dport, 80)
4994             self.check_tcp_checksum(p)
4995             self.check_ip_checksum(p)
4996         except:
4997             self.logger.error(ppp("Unexpected or invalid packet:", p))
4998             raise
4999
5000         # out2in
5001         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5002              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5003              TCP(sport=80, dport=external_port))
5004         self.pg3.add_stream(p)
5005         self.pg_enable_capture(self.pg_interfaces)
5006         self.pg_start()
5007         capture = self.pg3.get_capture(1)
5008         p = capture[0]
5009         try:
5010             ip = p[IPv6]
5011             tcp = p[TCP]
5012             self.assertEqual(ip.src, remote_host_ip6)
5013             self.assertEqual(ip.dst, self.pg3.remote_ip6)
5014             self.assertEqual(tcp.sport, 80)
5015             self.assertEqual(tcp.dport, 12345)
5016             self.check_tcp_checksum(p)
5017         except:
5018             self.logger.error(ppp("Unexpected or invalid packet:", p))
5019             raise
5020
5021     def test_frag_in_order(self):
5022         """ NAT64 translate fragments arriving in order """
5023         self.tcp_port_in = random.randint(1025, 65535)
5024
5025         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5026                                                 self.nat_addr_n)
5027         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5028         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5029
5030         reass = self.vapi.nat_reass_dump()
5031         reass_n_start = len(reass)
5032
5033         # in2out
5034         data = 'a' * 200
5035         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5036                                            self.tcp_port_in, 20, data)
5037         self.pg0.add_stream(pkts)
5038         self.pg_enable_capture(self.pg_interfaces)
5039         self.pg_start()
5040         frags = self.pg1.get_capture(len(pkts))
5041         p = self.reass_frags_and_verify(frags,
5042                                         self.nat_addr,
5043                                         self.pg1.remote_ip4)
5044         self.assertEqual(p[TCP].dport, 20)
5045         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5046         self.tcp_port_out = p[TCP].sport
5047         self.assertEqual(data, p[Raw].load)
5048
5049         # out2in
5050         data = "A" * 4 + "b" * 16 + "C" * 3
5051         pkts = self.create_stream_frag(self.pg1,
5052                                        self.nat_addr,
5053                                        20,
5054                                        self.tcp_port_out,
5055                                        data)
5056         self.pg1.add_stream(pkts)
5057         self.pg_enable_capture(self.pg_interfaces)
5058         self.pg_start()
5059         frags = self.pg0.get_capture(len(pkts))
5060         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5061         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5062         self.assertEqual(p[TCP].sport, 20)
5063         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5064         self.assertEqual(data, p[Raw].load)
5065
5066         reass = self.vapi.nat_reass_dump()
5067         reass_n_end = len(reass)
5068
5069         self.assertEqual(reass_n_end - reass_n_start, 2)
5070
5071     def test_reass_hairpinning(self):
5072         """ NAT64 fragments hairpinning """
5073         data = 'a' * 200
5074         client = self.pg0.remote_hosts[0]
5075         server = self.pg0.remote_hosts[1]
5076         server_in_port = random.randint(1025, 65535)
5077         server_out_port = random.randint(1025, 65535)
5078         client_in_port = random.randint(1025, 65535)
5079         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5080         nat_addr_ip6 = ip.src
5081
5082         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5083                                                 self.nat_addr_n)
5084         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5085         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5086
5087         # add static BIB entry for server
5088         self.vapi.nat64_add_del_static_bib(server.ip6n,
5089                                            self.nat_addr_n,
5090                                            server_in_port,
5091                                            server_out_port,
5092                                            IP_PROTOS.tcp)
5093
5094         # send packet from host to server
5095         pkts = self.create_stream_frag_ip6(self.pg0,
5096                                            self.nat_addr,
5097                                            client_in_port,
5098                                            server_out_port,
5099                                            data)
5100         self.pg0.add_stream(pkts)
5101         self.pg_enable_capture(self.pg_interfaces)
5102         self.pg_start()
5103         frags = self.pg0.get_capture(len(pkts))
5104         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5105         self.assertNotEqual(p[TCP].sport, client_in_port)
5106         self.assertEqual(p[TCP].dport, server_in_port)
5107         self.assertEqual(data, p[Raw].load)
5108
5109     def test_frag_out_of_order(self):
5110         """ NAT64 translate fragments arriving out of order """
5111         self.tcp_port_in = random.randint(1025, 65535)
5112
5113         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5114                                                 self.nat_addr_n)
5115         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5116         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5117
5118         # in2out
5119         data = 'a' * 200
5120         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5121                                            self.tcp_port_in, 20, data)
5122         pkts.reverse()
5123         self.pg0.add_stream(pkts)
5124         self.pg_enable_capture(self.pg_interfaces)
5125         self.pg_start()
5126         frags = self.pg1.get_capture(len(pkts))
5127         p = self.reass_frags_and_verify(frags,
5128                                         self.nat_addr,
5129                                         self.pg1.remote_ip4)
5130         self.assertEqual(p[TCP].dport, 20)
5131         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5132         self.tcp_port_out = p[TCP].sport
5133         self.assertEqual(data, p[Raw].load)
5134
5135         # out2in
5136         data = "A" * 4 + "B" * 16 + "C" * 3
5137         pkts = self.create_stream_frag(self.pg1,
5138                                        self.nat_addr,
5139                                        20,
5140                                        self.tcp_port_out,
5141                                        data)
5142         pkts.reverse()
5143         self.pg1.add_stream(pkts)
5144         self.pg_enable_capture(self.pg_interfaces)
5145         self.pg_start()
5146         frags = self.pg0.get_capture(len(pkts))
5147         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5148         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5149         self.assertEqual(p[TCP].sport, 20)
5150         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5151         self.assertEqual(data, p[Raw].load)
5152
5153     def test_interface_addr(self):
5154         """ Acquire NAT64 pool addresses from interface """
5155         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5156
5157         # no address in NAT64 pool
5158         adresses = self.vapi.nat44_address_dump()
5159         self.assertEqual(0, len(adresses))
5160
5161         # configure interface address and check NAT64 address pool
5162         self.pg4.config_ip4()
5163         addresses = self.vapi.nat64_pool_addr_dump()
5164         self.assertEqual(len(addresses), 1)
5165         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5166
5167         # remove interface address and check NAT64 address pool
5168         self.pg4.unconfig_ip4()
5169         addresses = self.vapi.nat64_pool_addr_dump()
5170         self.assertEqual(0, len(adresses))
5171
5172     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5173     def test_ipfix_max_bibs_sessions(self):
5174         """ IPFIX logging maximum session and BIB entries exceeded """
5175         max_bibs = 1280
5176         max_sessions = 2560
5177         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5178                                            '64:ff9b::',
5179                                            96)
5180
5181         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5182                                                 self.nat_addr_n)
5183         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5184         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5185
5186         pkts = []
5187         src = ""
5188         for i in range(0, max_bibs):
5189             src = "fd01:aa::%x" % (i)
5190             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5191                  IPv6(src=src, dst=remote_host_ip6) /
5192                  TCP(sport=12345, dport=80))
5193             pkts.append(p)
5194             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5195                  IPv6(src=src, dst=remote_host_ip6) /
5196                  TCP(sport=12345, dport=22))
5197             pkts.append(p)
5198         self.pg0.add_stream(pkts)
5199         self.pg_enable_capture(self.pg_interfaces)
5200         self.pg_start()
5201         self.pg1.get_capture(max_sessions)
5202
5203         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5204                                      src_address=self.pg3.local_ip4n,
5205                                      path_mtu=512,
5206                                      template_interval=10)
5207         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5208                             src_port=self.ipfix_src_port)
5209
5210         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5211              IPv6(src=src, dst=remote_host_ip6) /
5212              TCP(sport=12345, dport=25))
5213         self.pg0.add_stream(p)
5214         self.pg_enable_capture(self.pg_interfaces)
5215         self.pg_start()
5216         self.pg1.get_capture(0)
5217         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5218         capture = self.pg3.get_capture(9)
5219         ipfix = IPFIXDecoder()
5220         # first load template
5221         for p in capture:
5222             self.assertTrue(p.haslayer(IPFIX))
5223             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5224             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5225             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5226             self.assertEqual(p[UDP].dport, 4739)
5227             self.assertEqual(p[IPFIX].observationDomainID,
5228                              self.ipfix_domain_id)
5229             if p.haslayer(Template):
5230                 ipfix.add_template(p.getlayer(Template))
5231         # verify events in data set
5232         for p in capture:
5233             if p.haslayer(Data):
5234                 data = ipfix.decode_data_set(p.getlayer(Set))
5235                 self.verify_ipfix_max_sessions(data, max_sessions)
5236
5237         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5238              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5239              TCP(sport=12345, dport=80))
5240         self.pg0.add_stream(p)
5241         self.pg_enable_capture(self.pg_interfaces)
5242         self.pg_start()
5243         self.pg1.get_capture(0)
5244         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5245         capture = self.pg3.get_capture(1)
5246         # verify events in data set
5247         for p in capture:
5248             self.assertTrue(p.haslayer(IPFIX))
5249             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5250             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5251             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5252             self.assertEqual(p[UDP].dport, 4739)
5253             self.assertEqual(p[IPFIX].observationDomainID,
5254                              self.ipfix_domain_id)
5255             if p.haslayer(Data):
5256                 data = ipfix.decode_data_set(p.getlayer(Set))
5257                 self.verify_ipfix_max_bibs(data, max_bibs)
5258
5259     def test_ipfix_max_frags(self):
5260         """ IPFIX logging maximum fragments pending reassembly exceeded """
5261         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5262                                                 self.nat_addr_n)
5263         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5264         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5265         self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5266         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5267                                      src_address=self.pg3.local_ip4n,
5268                                      path_mtu=512,
5269                                      template_interval=10)
5270         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5271                             src_port=self.ipfix_src_port)
5272
5273         data = 'a' * 200
5274         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5275                                            self.tcp_port_in, 20, data)
5276         self.pg0.add_stream(pkts[-1])
5277         self.pg_enable_capture(self.pg_interfaces)
5278         self.pg_start()
5279         self.pg1.get_capture(0)
5280         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5281         capture = self.pg3.get_capture(9)
5282         ipfix = IPFIXDecoder()
5283         # first load template
5284         for p in capture:
5285             self.assertTrue(p.haslayer(IPFIX))
5286             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5287             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5288             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5289             self.assertEqual(p[UDP].dport, 4739)
5290             self.assertEqual(p[IPFIX].observationDomainID,
5291                              self.ipfix_domain_id)
5292             if p.haslayer(Template):
5293                 ipfix.add_template(p.getlayer(Template))
5294         # verify events in data set
5295         for p in capture:
5296             if p.haslayer(Data):
5297                 data = ipfix.decode_data_set(p.getlayer(Set))
5298                 self.verify_ipfix_max_fragments_ip6(data, 0,
5299                                                     self.pg0.remote_ip6n)
5300
5301     def test_ipfix_bib_ses(self):
5302         """ IPFIX logging NAT64 BIB/session create and delete events """
5303         self.tcp_port_in = random.randint(1025, 65535)
5304         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5305                                            '64:ff9b::',
5306                                            96)
5307
5308         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5309                                                 self.nat_addr_n)
5310         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5311         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5312         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5313                                      src_address=self.pg3.local_ip4n,
5314                                      path_mtu=512,
5315                                      template_interval=10)
5316         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5317                             src_port=self.ipfix_src_port)
5318
5319         # Create
5320         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5321              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5322              TCP(sport=self.tcp_port_in, dport=25))
5323         self.pg0.add_stream(p)
5324         self.pg_enable_capture(self.pg_interfaces)
5325         self.pg_start()
5326         p = self.pg1.get_capture(1)
5327         self.tcp_port_out = p[0][TCP].sport
5328         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5329         capture = self.pg3.get_capture(10)
5330         ipfix = IPFIXDecoder()
5331         # first load template
5332         for p in capture:
5333             self.assertTrue(p.haslayer(IPFIX))
5334             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5335             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5336             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5337             self.assertEqual(p[UDP].dport, 4739)
5338             self.assertEqual(p[IPFIX].observationDomainID,
5339                              self.ipfix_domain_id)
5340             if p.haslayer(Template):
5341                 ipfix.add_template(p.getlayer(Template))
5342         # verify events in data set
5343         for p in capture:
5344             if p.haslayer(Data):
5345                 data = ipfix.decode_data_set(p.getlayer(Set))
5346                 if ord(data[0][230]) == 10:
5347                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5348                 elif ord(data[0][230]) == 6:
5349                     self.verify_ipfix_nat64_ses(data,
5350                                                 1,
5351                                                 self.pg0.remote_ip6n,
5352                                                 self.pg1.remote_ip4,
5353                                                 25)
5354                 else:
5355                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5356
5357         # Delete
5358         self.pg_enable_capture(self.pg_interfaces)
5359         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5360                                                 self.nat_addr_n,
5361                                                 is_add=0)
5362         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5363         capture = self.pg3.get_capture(2)
5364         # verify events in data set
5365         for p in capture:
5366             self.assertTrue(p.haslayer(IPFIX))
5367             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5368             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5369             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5370             self.assertEqual(p[UDP].dport, 4739)
5371             self.assertEqual(p[IPFIX].observationDomainID,
5372                              self.ipfix_domain_id)
5373             if p.haslayer(Data):
5374                 data = ipfix.decode_data_set(p.getlayer(Set))
5375                 if ord(data[0][230]) == 11:
5376                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5377                 elif ord(data[0][230]) == 7:
5378                     self.verify_ipfix_nat64_ses(data,
5379                                                 0,
5380                                                 self.pg0.remote_ip6n,
5381                                                 self.pg1.remote_ip4,
5382                                                 25)
5383                 else:
5384                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5385
5386     def nat64_get_ses_num(self):
5387         """
5388         Return number of active NAT64 sessions.
5389         """
5390         st = self.vapi.nat64_st_dump()
5391         return len(st)
5392
5393     def clear_nat64(self):
5394         """
5395         Clear NAT64 configuration.
5396         """
5397         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5398                             domain_id=self.ipfix_domain_id)
5399         self.ipfix_src_port = 4739
5400         self.ipfix_domain_id = 1
5401
5402         self.vapi.nat64_set_timeouts()
5403
5404         interfaces = self.vapi.nat64_interface_dump()
5405         for intf in interfaces:
5406             if intf.is_inside > 1:
5407                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5408                                                   0,
5409                                                   is_add=0)
5410             self.vapi.nat64_add_del_interface(intf.sw_if_index,
5411                                               intf.is_inside,
5412                                               is_add=0)
5413
5414         bib = self.vapi.nat64_bib_dump(255)
5415         for bibe in bib:
5416             if bibe.is_static:
5417                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5418                                                    bibe.o_addr,
5419                                                    bibe.i_port,
5420                                                    bibe.o_port,
5421                                                    bibe.proto,
5422                                                    bibe.vrf_id,
5423                                                    is_add=0)
5424
5425         adresses = self.vapi.nat64_pool_addr_dump()
5426         for addr in adresses:
5427             self.vapi.nat64_add_del_pool_addr_range(addr.address,
5428                                                     addr.address,
5429                                                     vrf_id=addr.vrf_id,
5430                                                     is_add=0)
5431
5432         prefixes = self.vapi.nat64_prefix_dump()
5433         for prefix in prefixes:
5434             self.vapi.nat64_add_del_prefix(prefix.prefix,
5435                                            prefix.prefix_len,
5436                                            vrf_id=prefix.vrf_id,
5437                                            is_add=0)
5438
5439     def tearDown(self):
5440         super(TestNAT64, self).tearDown()
5441         if not self.vpp_dead:
5442             self.logger.info(self.vapi.cli("show nat64 pool"))
5443             self.logger.info(self.vapi.cli("show nat64 interfaces"))
5444             self.logger.info(self.vapi.cli("show nat64 prefix"))
5445             self.logger.info(self.vapi.cli("show nat64 bib all"))
5446             self.logger.info(self.vapi.cli("show nat64 session table all"))
5447             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5448             self.clear_nat64()
5449
5450
5451 class TestDSlite(MethodHolder):
5452     """ DS-Lite Test Cases """
5453
5454     @classmethod
5455     def setUpClass(cls):
5456         super(TestDSlite, cls).setUpClass()
5457
5458         try:
5459             cls.nat_addr = '10.0.0.3'
5460             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5461
5462             cls.create_pg_interfaces(range(2))
5463             cls.pg0.admin_up()
5464             cls.pg0.config_ip4()
5465             cls.pg0.resolve_arp()
5466             cls.pg1.admin_up()
5467             cls.pg1.config_ip6()
5468             cls.pg1.generate_remote_hosts(2)
5469             cls.pg1.configure_ipv6_neighbors()
5470
5471         except Exception:
5472             super(TestDSlite, cls).tearDownClass()
5473             raise
5474
5475     def test_dslite(self):
5476         """ Test DS-Lite """
5477         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5478                                                  self.nat_addr_n)
5479         aftr_ip4 = '192.0.0.1'
5480         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5481         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5482         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5483         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5484
5485         # UDP
5486         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5487              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5488              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5489              UDP(sport=20000, dport=10000))
5490         self.pg1.add_stream(p)
5491         self.pg_enable_capture(self.pg_interfaces)
5492         self.pg_start()
5493         capture = self.pg0.get_capture(1)
5494         capture = capture[0]
5495         self.assertFalse(capture.haslayer(IPv6))
5496         self.assertEqual(capture[IP].src, self.nat_addr)
5497         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5498         self.assertNotEqual(capture[UDP].sport, 20000)
5499         self.assertEqual(capture[UDP].dport, 10000)
5500         self.check_ip_checksum(capture)
5501         out_port = capture[UDP].sport
5502
5503         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5504              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5505              UDP(sport=10000, dport=out_port))
5506         self.pg0.add_stream(p)
5507         self.pg_enable_capture(self.pg_interfaces)
5508         self.pg_start()
5509         capture = self.pg1.get_capture(1)
5510         capture = capture[0]
5511         self.assertEqual(capture[IPv6].src, aftr_ip6)
5512         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5513         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5514         self.assertEqual(capture[IP].dst, '192.168.1.1')
5515         self.assertEqual(capture[UDP].sport, 10000)
5516         self.assertEqual(capture[UDP].dport, 20000)
5517         self.check_ip_checksum(capture)
5518
5519         # TCP
5520         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5521              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5522              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5523              TCP(sport=20001, dport=10001))
5524         self.pg1.add_stream(p)
5525         self.pg_enable_capture(self.pg_interfaces)
5526         self.pg_start()
5527         capture = self.pg0.get_capture(1)
5528         capture = capture[0]
5529         self.assertFalse(capture.haslayer(IPv6))
5530         self.assertEqual(capture[IP].src, self.nat_addr)
5531         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5532         self.assertNotEqual(capture[TCP].sport, 20001)
5533         self.assertEqual(capture[TCP].dport, 10001)
5534         self.check_ip_checksum(capture)
5535         self.check_tcp_checksum(capture)
5536         out_port = capture[TCP].sport
5537
5538         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5539              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5540              TCP(sport=10001, dport=out_port))
5541         self.pg0.add_stream(p)
5542         self.pg_enable_capture(self.pg_interfaces)
5543         self.pg_start()
5544         capture = self.pg1.get_capture(1)
5545         capture = capture[0]
5546         self.assertEqual(capture[IPv6].src, aftr_ip6)
5547         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5548         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5549         self.assertEqual(capture[IP].dst, '192.168.1.1')
5550         self.assertEqual(capture[TCP].sport, 10001)
5551         self.assertEqual(capture[TCP].dport, 20001)
5552         self.check_ip_checksum(capture)
5553         self.check_tcp_checksum(capture)
5554
5555         # ICMP
5556         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5557              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5558              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5559              ICMP(id=4000, type='echo-request'))
5560         self.pg1.add_stream(p)
5561         self.pg_enable_capture(self.pg_interfaces)
5562         self.pg_start()
5563         capture = self.pg0.get_capture(1)
5564         capture = capture[0]
5565         self.assertFalse(capture.haslayer(IPv6))
5566         self.assertEqual(capture[IP].src, self.nat_addr)
5567         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5568         self.assertNotEqual(capture[ICMP].id, 4000)
5569         self.check_ip_checksum(capture)
5570         self.check_icmp_checksum(capture)
5571         out_id = capture[ICMP].id
5572
5573         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5574              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5575              ICMP(id=out_id, type='echo-reply'))
5576         self.pg0.add_stream(p)
5577         self.pg_enable_capture(self.pg_interfaces)
5578         self.pg_start()
5579         capture = self.pg1.get_capture(1)
5580         capture = capture[0]
5581         self.assertEqual(capture[IPv6].src, aftr_ip6)
5582         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5583         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5584         self.assertEqual(capture[IP].dst, '192.168.1.1')
5585         self.assertEqual(capture[ICMP].id, 4000)
5586         self.check_ip_checksum(capture)
5587         self.check_icmp_checksum(capture)
5588
5589         # ping DS-Lite AFTR tunnel endpoint address
5590         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5591              IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5592              ICMPv6EchoRequest())
5593         self.pg1.add_stream(p)
5594         self.pg_enable_capture(self.pg_interfaces)
5595         self.pg_start()
5596         capture = self.pg1.get_capture(1)
5597         self.assertEqual(1, len(capture))
5598         capture = capture[0]
5599         self.assertEqual(capture[IPv6].src, aftr_ip6)
5600         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5601         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5602
5603     def tearDown(self):
5604         super(TestDSlite, self).tearDown()
5605         if not self.vpp_dead:
5606             self.logger.info(self.vapi.cli("show dslite pool"))
5607             self.logger.info(
5608                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5609             self.logger.info(self.vapi.cli("show dslite sessions"))
5610
5611 if __name__ == '__main__':
5612     unittest.main(testRunner=VppTestRunner)