NAT44: add opaque string tag to static mapping APIs (VPP-1147)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param dst_ip: Destination address
144         :param ttl: TTL of generated packets
145         """
146         if dst_ip is None:
147             dst_ip = out_if.remote_ip4
148
149         pkts = []
150         # TCP
151         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153              TCP(sport=self.tcp_port_in, dport=20))
154         pkts.append(p)
155
156         # UDP
157         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159              UDP(sport=self.udp_port_in, dport=20))
160         pkts.append(p)
161
162         # ICMP
163         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165              ICMP(id=self.icmp_id_in, type='echo-request'))
166         pkts.append(p)
167
168         return pkts
169
170     def compose_ip6(self, ip4, pref, plen):
171         """
172         Compose IPv4-embedded IPv6 addresses
173
174         :param ip4: IPv4 address
175         :param pref: IPv6 prefix
176         :param plen: IPv6 prefix length
177         :returns: IPv4-embedded IPv6 addresses
178         """
179         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
181         if plen == 32:
182             pref_n[4] = ip4_n[0]
183             pref_n[5] = ip4_n[1]
184             pref_n[6] = ip4_n[2]
185             pref_n[7] = ip4_n[3]
186         elif plen == 40:
187             pref_n[5] = ip4_n[0]
188             pref_n[6] = ip4_n[1]
189             pref_n[7] = ip4_n[2]
190             pref_n[9] = ip4_n[3]
191         elif plen == 48:
192             pref_n[6] = ip4_n[0]
193             pref_n[7] = ip4_n[1]
194             pref_n[9] = ip4_n[2]
195             pref_n[10] = ip4_n[3]
196         elif plen == 56:
197             pref_n[7] = ip4_n[0]
198             pref_n[9] = ip4_n[1]
199             pref_n[10] = ip4_n[2]
200             pref_n[11] = ip4_n[3]
201         elif plen == 64:
202             pref_n[9] = ip4_n[0]
203             pref_n[10] = ip4_n[1]
204             pref_n[11] = ip4_n[2]
205             pref_n[12] = ip4_n[3]
206         elif plen == 96:
207             pref_n[12] = ip4_n[0]
208             pref_n[13] = ip4_n[1]
209             pref_n[14] = ip4_n[2]
210             pref_n[15] = ip4_n[3]
211         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
212
213     def extract_ip4(self, ip6, plen):
214         """
215         Extract IPv4 address embedded in IPv6 addresses
216
217         :param ip6: IPv6 address
218         :param plen: IPv6 prefix length
219         :returns: extracted IPv4 address
220         """
221         ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
222         ip4_n = [None] * 4
223         if plen == 32:
224             ip4_n[0] = ip6_n[4]
225             ip4_n[1] = ip6_n[5]
226             ip4_n[2] = ip6_n[6]
227             ip4_n[3] = ip6_n[7]
228         elif plen == 40:
229             ip4_n[0] = ip6_n[5]
230             ip4_n[1] = ip6_n[6]
231             ip4_n[2] = ip6_n[7]
232             ip4_n[3] = ip6_n[9]
233         elif plen == 48:
234             ip4_n[0] = ip6_n[6]
235             ip4_n[1] = ip6_n[7]
236             ip4_n[2] = ip6_n[9]
237             ip4_n[3] = ip6_n[10]
238         elif plen == 56:
239             ip4_n[0] = ip6_n[7]
240             ip4_n[1] = ip6_n[9]
241             ip4_n[2] = ip6_n[10]
242             ip4_n[3] = ip6_n[11]
243         elif plen == 64:
244             ip4_n[0] = ip6_n[9]
245             ip4_n[1] = ip6_n[10]
246             ip4_n[2] = ip6_n[11]
247             ip4_n[3] = ip6_n[12]
248         elif plen == 96:
249             ip4_n[0] = ip6_n[12]
250             ip4_n[1] = ip6_n[13]
251             ip4_n[2] = ip6_n[14]
252             ip4_n[3] = ip6_n[15]
253         return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
254
255     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
256         """
257         Create IPv6 packet stream for inside network
258
259         :param in_if: Inside interface
260         :param out_if: Outside interface
261         :param ttl: Hop Limit of generated packets
262         :param pref: NAT64 prefix
263         :param plen: NAT64 prefix length
264         """
265         pkts = []
266         if pref is None:
267             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
268         else:
269             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
270
271         # TCP
272         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274              TCP(sport=self.tcp_port_in, dport=20))
275         pkts.append(p)
276
277         # UDP
278         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280              UDP(sport=self.udp_port_in, dport=20))
281         pkts.append(p)
282
283         # ICMP
284         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286              ICMPv6EchoRequest(id=self.icmp_id_in))
287         pkts.append(p)
288
289         return pkts
290
291     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292                           use_inside_ports=False):
293         """
294         Create packet stream for outside network
295
296         :param out_if: Outside interface
297         :param dst_ip: Destination IP address (Default use global NAT address)
298         :param ttl: TTL of generated packets
299         :param use_inside_ports: Use inside NAT ports as destination ports
300                instead of outside ports
301         """
302         if dst_ip is None:
303             dst_ip = self.nat_addr
304         if not use_inside_ports:
305             tcp_port = self.tcp_port_out
306             udp_port = self.udp_port_out
307             icmp_id = self.icmp_id_out
308         else:
309             tcp_port = self.tcp_port_in
310             udp_port = self.udp_port_in
311             icmp_id = self.icmp_id_in
312         pkts = []
313         # TCP
314         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316              TCP(dport=tcp_port, sport=20))
317         pkts.append(p)
318
319         # UDP
320         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322              UDP(dport=udp_port, sport=20))
323         pkts.append(p)
324
325         # ICMP
326         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328              ICMP(id=icmp_id, type='echo-reply'))
329         pkts.append(p)
330
331         return pkts
332
333     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
334         """
335         Create packet stream for outside network
336
337         :param out_if: Outside interface
338         :param dst_ip: Destination IP address (Default use global NAT address)
339         :param hl: HL of generated packets
340         """
341         pkts = []
342         # TCP
343         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345              TCP(dport=self.tcp_port_out, sport=20))
346         pkts.append(p)
347
348         # UDP
349         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351              UDP(dport=self.udp_port_out, sport=20))
352         pkts.append(p)
353
354         # ICMP
355         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357              ICMPv6EchoReply(id=self.icmp_id_out))
358         pkts.append(p)
359
360         return pkts
361
362     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363                            packet_num=3, dst_ip=None, is_ip6=False):
364         """
365         Verify captured packets on outside network
366
367         :param capture: Captured packets
368         :param nat_ip: Translated IP address (Default use global NAT address)
369         :param same_port: Sorce port number is not translated (Default False)
370         :param packet_num: Expected number of packets (Default 3)
371         :param dst_ip: Destination IP address (Default do not verify)
372         :param is_ip6: If L3 protocol is IPv6 (Default False)
373         """
374         if is_ip6:
375             IP46 = IPv6
376             ICMP46 = ICMPv6EchoRequest
377         else:
378             IP46 = IP
379             ICMP46 = ICMP
380         if nat_ip is None:
381             nat_ip = self.nat_addr
382         self.assertEqual(packet_num, len(capture))
383         for packet in capture:
384             try:
385                 if not is_ip6:
386                     self.check_ip_checksum(packet)
387                 self.assertEqual(packet[IP46].src, nat_ip)
388                 if dst_ip is not None:
389                     self.assertEqual(packet[IP46].dst, dst_ip)
390                 if packet.haslayer(TCP):
391                     if same_port:
392                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
393                     else:
394                         self.assertNotEqual(
395                             packet[TCP].sport, self.tcp_port_in)
396                     self.tcp_port_out = packet[TCP].sport
397                     self.check_tcp_checksum(packet)
398                 elif packet.haslayer(UDP):
399                     if same_port:
400                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
401                     else:
402                         self.assertNotEqual(
403                             packet[UDP].sport, self.udp_port_in)
404                     self.udp_port_out = packet[UDP].sport
405                 else:
406                     if same_port:
407                         self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
408                     else:
409                         self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410                     self.icmp_id_out = packet[ICMP46].id
411                     if is_ip6:
412                         self.check_icmpv6_checksum(packet)
413                     else:
414                         self.check_icmp_checksum(packet)
415             except:
416                 self.logger.error(ppp("Unexpected or invalid packet "
417                                       "(outside network):", packet))
418                 raise
419
420     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421                                packet_num=3, dst_ip=None):
422         """
423         Verify captured packets on outside network
424
425         :param capture: Captured packets
426         :param nat_ip: Translated IP address
427         :param same_port: Sorce port number is not translated (Default False)
428         :param packet_num: Expected number of packets (Default 3)
429         :param dst_ip: Destination IP address (Default do not verify)
430         """
431         return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
432                                        dst_ip, True)
433
434     def verify_capture_in(self, capture, in_if, packet_num=3):
435         """
436         Verify captured packets on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.check_ip_checksum(packet)
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 if packet.haslayer(TCP):
448                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449                     self.check_tcp_checksum(packet)
450                 elif packet.haslayer(UDP):
451                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
452                 else:
453                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454                     self.check_icmp_checksum(packet)
455             except:
456                 self.logger.error(ppp("Unexpected or invalid packet "
457                                       "(inside network):", packet))
458                 raise
459
460     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
461         """
462         Verify captured IPv6 packets on inside network
463
464         :param capture: Captured packets
465         :param src_ip: Source IP
466         :param dst_ip: Destination IP address
467         :param packet_num: Expected number of packets (Default 3)
468         """
469         self.assertEqual(packet_num, len(capture))
470         for packet in capture:
471             try:
472                 self.assertEqual(packet[IPv6].src, src_ip)
473                 self.assertEqual(packet[IPv6].dst, dst_ip)
474                 if packet.haslayer(TCP):
475                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476                     self.check_tcp_checksum(packet)
477                 elif packet.haslayer(UDP):
478                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
479                     self.check_udp_checksum(packet)
480                 else:
481                     self.assertEqual(packet[ICMPv6EchoReply].id,
482                                      self.icmp_id_in)
483                     self.check_icmpv6_checksum(packet)
484             except:
485                 self.logger.error(ppp("Unexpected or invalid packet "
486                                       "(inside network):", packet))
487                 raise
488
489     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
490         """
491         Verify captured packet that don't have to be translated
492
493         :param capture: Captured packets
494         :param ingress_if: Ingress interface
495         :param egress_if: Egress interface
496         """
497         for packet in capture:
498             try:
499                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501                 if packet.haslayer(TCP):
502                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503                 elif packet.haslayer(UDP):
504                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
505                 else:
506                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
507             except:
508                 self.logger.error(ppp("Unexpected or invalid packet "
509                                       "(inside network):", packet))
510                 raise
511
512     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513                                             packet_num=3, icmp_type=11):
514         """
515         Verify captured packets with ICMP errors on outside network
516
517         :param capture: Captured packets
518         :param src_ip: Translated IP address or IP address of VPP
519                        (Default use global NAT address)
520         :param packet_num: Expected number of packets (Default 3)
521         :param icmp_type: Type of error ICMP packet
522                           we are expecting (Default 11)
523         """
524         if src_ip is None:
525             src_ip = self.nat_addr
526         self.assertEqual(packet_num, len(capture))
527         for packet in capture:
528             try:
529                 self.assertEqual(packet[IP].src, src_ip)
530                 self.assertTrue(packet.haslayer(ICMP))
531                 icmp = packet[ICMP]
532                 self.assertEqual(icmp.type, icmp_type)
533                 self.assertTrue(icmp.haslayer(IPerror))
534                 inner_ip = icmp[IPerror]
535                 if inner_ip.haslayer(TCPerror):
536                     self.assertEqual(inner_ip[TCPerror].dport,
537                                      self.tcp_port_out)
538                 elif inner_ip.haslayer(UDPerror):
539                     self.assertEqual(inner_ip[UDPerror].dport,
540                                      self.udp_port_out)
541                 else:
542                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
543             except:
544                 self.logger.error(ppp("Unexpected or invalid packet "
545                                       "(outside network):", packet))
546                 raise
547
548     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
549                                            icmp_type=11):
550         """
551         Verify captured packets with ICMP errors on inside network
552
553         :param capture: Captured packets
554         :param in_if: Inside interface
555         :param packet_num: Expected number of packets (Default 3)
556         :param icmp_type: Type of error ICMP packet
557                           we are expecting (Default 11)
558         """
559         self.assertEqual(packet_num, len(capture))
560         for packet in capture:
561             try:
562                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563                 self.assertTrue(packet.haslayer(ICMP))
564                 icmp = packet[ICMP]
565                 self.assertEqual(icmp.type, icmp_type)
566                 self.assertTrue(icmp.haslayer(IPerror))
567                 inner_ip = icmp[IPerror]
568                 if inner_ip.haslayer(TCPerror):
569                     self.assertEqual(inner_ip[TCPerror].sport,
570                                      self.tcp_port_in)
571                 elif inner_ip.haslayer(UDPerror):
572                     self.assertEqual(inner_ip[UDPerror].sport,
573                                      self.udp_port_in)
574                 else:
575                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
576             except:
577                 self.logger.error(ppp("Unexpected or invalid packet "
578                                       "(inside network):", packet))
579                 raise
580
581     def create_stream_frag(self, src_if, dst, sport, dport, data):
582         """
583         Create fragmented packet stream
584
585         :param src_if: Source interface
586         :param dst: Destination IPv4 address
587         :param sport: Source TCP port
588         :param dport: Destination TCP port
589         :param data: Payload data
590         :returns: Fragmets
591         """
592         id = random.randint(0, 65535)
593         p = (IP(src=src_if.remote_ip4, dst=dst) /
594              TCP(sport=sport, dport=dport) /
595              Raw(data))
596         p = p.__class__(str(p))
597         chksum = p['TCP'].chksum
598         pkts = []
599         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601              TCP(sport=sport, dport=dport, chksum=chksum) /
602              Raw(data[0:4]))
603         pkts.append(p)
604         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606                 proto=IP_PROTOS.tcp) /
607              Raw(data[4:20]))
608         pkts.append(p)
609         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
611                 id=id) /
612              Raw(data[20:]))
613         pkts.append(p)
614         return pkts
615
616     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617                                pref=None, plen=0, frag_size=128):
618         """
619         Create fragmented packet stream
620
621         :param src_if: Source interface
622         :param dst: Destination IPv4 address
623         :param sport: Source TCP port
624         :param dport: Destination TCP port
625         :param data: Payload data
626         :param pref: NAT64 prefix
627         :param plen: NAT64 prefix length
628         :param fragsize: size of fragments
629         :returns: Fragmets
630         """
631         if pref is None:
632             dst_ip6 = ''.join(['64:ff9b::', dst])
633         else:
634             dst_ip6 = self.compose_ip6(dst, pref, plen)
635
636         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639              TCP(sport=sport, dport=dport) /
640              Raw(data))
641
642         return fragment6(p, frag_size)
643
644     def reass_frags_and_verify(self, frags, src, dst):
645         """
646         Reassemble and verify fragmented packet
647
648         :param frags: Captured fragments
649         :param src: Source IPv4 address to verify
650         :param dst: Destination IPv4 address to verify
651
652         :returns: Reassembled IPv4 packet
653         """
654         buffer = StringIO.StringIO()
655         for p in frags:
656             self.assertEqual(p[IP].src, src)
657             self.assertEqual(p[IP].dst, dst)
658             self.check_ip_checksum(p)
659             buffer.seek(p[IP].frag * 8)
660             buffer.write(p[IP].payload)
661         ip = frags[0].getlayer(IP)
662         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663                 proto=frags[0][IP].proto)
664         if ip.proto == IP_PROTOS.tcp:
665             p = (ip / TCP(buffer.getvalue()))
666             self.check_tcp_checksum(p)
667         elif ip.proto == IP_PROTOS.udp:
668             p = (ip / UDP(buffer.getvalue()))
669         return p
670
671     def reass_frags_and_verify_ip6(self, frags, src, dst):
672         """
673         Reassemble and verify fragmented packet
674
675         :param frags: Captured fragments
676         :param src: Source IPv6 address to verify
677         :param dst: Destination IPv6 address to verify
678
679         :returns: Reassembled IPv6 packet
680         """
681         buffer = StringIO.StringIO()
682         for p in frags:
683             self.assertEqual(p[IPv6].src, src)
684             self.assertEqual(p[IPv6].dst, dst)
685             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686             buffer.write(p[IPv6ExtHdrFragment].payload)
687         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688                   nh=frags[0][IPv6ExtHdrFragment].nh)
689         if ip.nh == IP_PROTOS.tcp:
690             p = (ip / TCP(buffer.getvalue()))
691             self.check_tcp_checksum(p)
692         elif ip.nh == IP_PROTOS.udp:
693             p = (ip / UDP(buffer.getvalue()))
694         return p
695
696     def verify_ipfix_nat44_ses(self, data):
697         """
698         Verify IPFIX NAT44 session create/delete event
699
700         :param data: Decoded IPFIX data records
701         """
702         nat44_ses_create_num = 0
703         nat44_ses_delete_num = 0
704         self.assertEqual(6, len(data))
705         for record in data:
706             # natEvent
707             self.assertIn(ord(record[230]), [4, 5])
708             if ord(record[230]) == 4:
709                 nat44_ses_create_num += 1
710             else:
711                 nat44_ses_delete_num += 1
712             # sourceIPv4Address
713             self.assertEqual(self.pg0.remote_ip4n, record[8])
714             # postNATSourceIPv4Address
715             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
716                              record[225])
717             # ingressVRFID
718             self.assertEqual(struct.pack("!I", 0), record[234])
719             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720             if IP_PROTOS.icmp == ord(record[4]):
721                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
723                                  record[227])
724             elif IP_PROTOS.tcp == ord(record[4]):
725                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
726                                  record[7])
727                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
728                                  record[227])
729             elif IP_PROTOS.udp == ord(record[4]):
730                 self.assertEqual(struct.pack("!H", self.udp_port_in),
731                                  record[7])
732                 self.assertEqual(struct.pack("!H", self.udp_port_out),
733                                  record[227])
734             else:
735                 self.fail("Invalid protocol")
736         self.assertEqual(3, nat44_ses_create_num)
737         self.assertEqual(3, nat44_ses_delete_num)
738
739     def verify_ipfix_addr_exhausted(self, data):
740         """
741         Verify IPFIX NAT addresses event
742
743         :param data: Decoded IPFIX data records
744         """
745         self.assertEqual(1, len(data))
746         record = data[0]
747         # natEvent
748         self.assertEqual(ord(record[230]), 3)
749         # natPoolID
750         self.assertEqual(struct.pack("!I", 0), record[283])
751
752     def verify_ipfix_max_sessions(self, data, limit):
753         """
754         Verify IPFIX maximum session entries exceeded event
755
756         :param data: Decoded IPFIX data records
757         :param limit: Number of maximum session entries that can be created.
758         """
759         self.assertEqual(1, len(data))
760         record = data[0]
761         # natEvent
762         self.assertEqual(ord(record[230]), 13)
763         # natQuotaExceededEvent
764         self.assertEqual(struct.pack("I", 1), record[466])
765         # maxSessionEntries
766         self.assertEqual(struct.pack("I", limit), record[471])
767
768     def verify_ipfix_max_bibs(self, data, limit):
769         """
770         Verify IPFIX maximum BIB entries exceeded event
771
772         :param data: Decoded IPFIX data records
773         :param limit: Number of maximum BIB entries that can be created.
774         """
775         self.assertEqual(1, len(data))
776         record = data[0]
777         # natEvent
778         self.assertEqual(ord(record[230]), 13)
779         # natQuotaExceededEvent
780         self.assertEqual(struct.pack("I", 2), record[466])
781         # maxBIBEntries
782         self.assertEqual(struct.pack("I", limit), record[472])
783
784     def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
785         """
786         Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
787
788         :param data: Decoded IPFIX data records
789         :param limit: Number of maximum fragments pending reassembly
790         :param src_addr: IPv6 source address
791         """
792         self.assertEqual(1, len(data))
793         record = data[0]
794         # natEvent
795         self.assertEqual(ord(record[230]), 13)
796         # natQuotaExceededEvent
797         self.assertEqual(struct.pack("I", 5), record[466])
798         # maxFragmentsPendingReassembly
799         self.assertEqual(struct.pack("I", limit), record[475])
800         # sourceIPv6Address
801         self.assertEqual(src_addr, record[27])
802
803     def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
804         """
805         Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
806
807         :param data: Decoded IPFIX data records
808         :param limit: Number of maximum fragments pending reassembly
809         :param src_addr: IPv4 source address
810         """
811         self.assertEqual(1, len(data))
812         record = data[0]
813         # natEvent
814         self.assertEqual(ord(record[230]), 13)
815         # natQuotaExceededEvent
816         self.assertEqual(struct.pack("I", 5), record[466])
817         # maxFragmentsPendingReassembly
818         self.assertEqual(struct.pack("I", limit), record[475])
819         # sourceIPv4Address
820         self.assertEqual(src_addr, record[8])
821
822     def verify_ipfix_bib(self, data, is_create, src_addr):
823         """
824         Verify IPFIX NAT64 BIB create and delete events
825
826         :param data: Decoded IPFIX data records
827         :param is_create: Create event if nonzero value otherwise delete event
828         :param src_addr: IPv6 source address
829         """
830         self.assertEqual(1, len(data))
831         record = data[0]
832         # natEvent
833         if is_create:
834             self.assertEqual(ord(record[230]), 10)
835         else:
836             self.assertEqual(ord(record[230]), 11)
837         # sourceIPv6Address
838         self.assertEqual(src_addr, record[27])
839         # postNATSourceIPv4Address
840         self.assertEqual(self.nat_addr_n, record[225])
841         # protocolIdentifier
842         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
843         # ingressVRFID
844         self.assertEqual(struct.pack("!I", 0), record[234])
845         # sourceTransportPort
846         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847         # postNAPTSourceTransportPort
848         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
849
850     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
851                                dst_port):
852         """
853         Verify IPFIX NAT64 session create and delete events
854
855         :param data: Decoded IPFIX data records
856         :param is_create: Create event if nonzero value otherwise delete event
857         :param src_addr: IPv6 source address
858         :param dst_addr: IPv4 destination address
859         :param dst_port: destination TCP port
860         """
861         self.assertEqual(1, len(data))
862         record = data[0]
863         # natEvent
864         if is_create:
865             self.assertEqual(ord(record[230]), 6)
866         else:
867             self.assertEqual(ord(record[230]), 7)
868         # sourceIPv6Address
869         self.assertEqual(src_addr, record[27])
870         # destinationIPv6Address
871         self.assertEqual(socket.inet_pton(socket.AF_INET6,
872                                           self.compose_ip6(dst_addr,
873                                                            '64:ff9b::',
874                                                            96)),
875                          record[28])
876         # postNATSourceIPv4Address
877         self.assertEqual(self.nat_addr_n, record[225])
878         # postNATDestinationIPv4Address
879         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
880                          record[226])
881         # protocolIdentifier
882         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
883         # ingressVRFID
884         self.assertEqual(struct.pack("!I", 0), record[234])
885         # sourceTransportPort
886         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887         # postNAPTSourceTransportPort
888         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889         # destinationTransportPort
890         self.assertEqual(struct.pack("!H", dst_port), record[11])
891         # postNAPTDestinationTransportPort
892         self.assertEqual(struct.pack("!H", dst_port), record[228])
893
894
895 class TestNAT44(MethodHolder):
896     """ NAT44 Test Cases """
897
898     @classmethod
899     def setUpClass(cls):
900         super(TestNAT44, cls).setUpClass()
901
902         try:
903             cls.tcp_port_in = 6303
904             cls.tcp_port_out = 6303
905             cls.udp_port_in = 6304
906             cls.udp_port_out = 6304
907             cls.icmp_id_in = 6305
908             cls.icmp_id_out = 6305
909             cls.nat_addr = '10.0.0.3'
910             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911             cls.ipfix_src_port = 4739
912             cls.ipfix_domain_id = 1
913
914             cls.create_pg_interfaces(range(10))
915             cls.interfaces = list(cls.pg_interfaces[0:4])
916
917             for i in cls.interfaces:
918                 i.admin_up()
919                 i.config_ip4()
920                 i.resolve_arp()
921
922             cls.pg0.generate_remote_hosts(3)
923             cls.pg0.configure_ipv4_neighbors()
924
925             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926             cls.vapi.ip_table_add_del(10, is_add=1)
927             cls.vapi.ip_table_add_del(20, is_add=1)
928
929             cls.pg4._local_ip4 = "172.16.255.1"
930             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932             cls.pg4.set_table_ip4(10)
933             cls.pg5._local_ip4 = "172.17.255.3"
934             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936             cls.pg5.set_table_ip4(10)
937             cls.pg6._local_ip4 = "172.16.255.1"
938             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940             cls.pg6.set_table_ip4(20)
941             for i in cls.overlapping_interfaces:
942                 i.config_ip4()
943                 i.admin_up()
944                 i.resolve_arp()
945
946             cls.pg7.admin_up()
947             cls.pg8.admin_up()
948
949             cls.pg9.generate_remote_hosts(2)
950             cls.pg9.config_ip4()
951             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
953                                                   ip_addr_n,
954                                                   24)
955             cls.pg9.admin_up()
956             cls.pg9.resolve_arp()
957             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959             cls.pg9.resolve_arp()
960
961         except Exception:
962             super(TestNAT44, cls).tearDownClass()
963             raise
964
965     def clear_nat44(self):
966         """
967         Clear NAT44 configuration.
968         """
969         # I found no elegant way to do this
970         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971                                    dst_address_length=32,
972                                    next_hop_address=self.pg7.remote_ip4n,
973                                    next_hop_sw_if_index=self.pg7.sw_if_index,
974                                    is_add=0)
975         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976                                    dst_address_length=32,
977                                    next_hop_address=self.pg8.remote_ip4n,
978                                    next_hop_sw_if_index=self.pg8.sw_if_index,
979                                    is_add=0)
980
981         for intf in [self.pg7, self.pg8]:
982             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
983             for n in neighbors:
984                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
985                                               n.mac_address,
986                                               n.ip_address,
987                                               is_add=0)
988
989         if self.pg7.has_ip4_config:
990             self.pg7.unconfig_ip4()
991
992         self.vapi.nat44_forwarding_enable_disable(0)
993
994         interfaces = self.vapi.nat44_interface_addr_dump()
995         for intf in interfaces:
996             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997                                                twice_nat=intf.twice_nat,
998                                                is_add=0)
999
1000         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001                             domain_id=self.ipfix_domain_id)
1002         self.ipfix_src_port = 4739
1003         self.ipfix_domain_id = 1
1004
1005         interfaces = self.vapi.nat44_interface_dump()
1006         for intf in interfaces:
1007             if intf.is_inside > 1:
1008                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1009                                                           0,
1010                                                           is_add=0)
1011             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1012                                                       intf.is_inside,
1013                                                       is_add=0)
1014
1015         interfaces = self.vapi.nat44_interface_output_feature_dump()
1016         for intf in interfaces:
1017             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1018                                                              intf.is_inside,
1019                                                              is_add=0)
1020
1021         static_mappings = self.vapi.nat44_static_mapping_dump()
1022         for sm in static_mappings:
1023             self.vapi.nat44_add_del_static_mapping(
1024                 sm.local_ip_address,
1025                 sm.external_ip_address,
1026                 local_port=sm.local_port,
1027                 external_port=sm.external_port,
1028                 addr_only=sm.addr_only,
1029                 vrf_id=sm.vrf_id,
1030                 protocol=sm.protocol,
1031                 twice_nat=sm.twice_nat,
1032                 out2in_only=sm.out2in_only,
1033                 tag=sm.tag,
1034                 is_add=0)
1035
1036         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037         for lb_sm in lb_static_mappings:
1038             self.vapi.nat44_add_del_lb_static_mapping(
1039                 lb_sm.external_addr,
1040                 lb_sm.external_port,
1041                 lb_sm.protocol,
1042                 vrf_id=lb_sm.vrf_id,
1043                 twice_nat=lb_sm.twice_nat,
1044                 out2in_only=lb_sm.out2in_only,
1045                 tag=lb_sm.tag,
1046                 is_add=0,
1047                 local_num=0,
1048                 locals=[])
1049
1050         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051         for id_m in identity_mappings:
1052             self.vapi.nat44_add_del_identity_mapping(
1053                 addr_only=id_m.addr_only,
1054                 ip=id_m.ip_address,
1055                 port=id_m.port,
1056                 sw_if_index=id_m.sw_if_index,
1057                 vrf_id=id_m.vrf_id,
1058                 protocol=id_m.protocol,
1059                 is_add=0)
1060
1061         adresses = self.vapi.nat44_address_dump()
1062         for addr in adresses:
1063             self.vapi.nat44_add_del_address_range(addr.ip_address,
1064                                                   addr.ip_address,
1065                                                   twice_nat=addr.twice_nat,
1066                                                   is_add=0)
1067
1068         self.vapi.nat_set_reass()
1069         self.vapi.nat_set_reass(is_ip6=1)
1070
1071     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072                                  local_port=0, external_port=0, vrf_id=0,
1073                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
1074                                  proto=0, twice_nat=0, out2in_only=0, tag=""):
1075         """
1076         Add/delete NAT44 static mapping
1077
1078         :param local_ip: Local IP address
1079         :param external_ip: External IP address
1080         :param local_port: Local port number (Optional)
1081         :param external_port: External port number (Optional)
1082         :param vrf_id: VRF ID (Default 0)
1083         :param is_add: 1 if add, 0 if delete (Default add)
1084         :param external_sw_if_index: External interface instead of IP address
1085         :param proto: IP protocol (Mandatory if port specified)
1086         :param twice_nat: 1 if translate external host address and port
1087         :param out2in_only: if 1 rule is matching only out2in direction
1088         :param tag: Opaque string tag
1089         """
1090         addr_only = 1
1091         if local_port and external_port:
1092             addr_only = 0
1093         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095         self.vapi.nat44_add_del_static_mapping(
1096             l_ip,
1097             e_ip,
1098             external_sw_if_index,
1099             local_port,
1100             external_port,
1101             addr_only,
1102             vrf_id,
1103             proto,
1104             twice_nat,
1105             out2in_only,
1106             tag,
1107             is_add)
1108
1109     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1110         """
1111         Add/delete NAT44 address
1112
1113         :param ip: IP address
1114         :param is_add: 1 if add, 0 if delete (Default add)
1115         :param twice_nat: twice NAT address for extenal hosts
1116         """
1117         nat_addr = socket.inet_pton(socket.AF_INET, ip)
1118         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1119                                               vrf_id=vrf_id,
1120                                               twice_nat=twice_nat)
1121
1122     def test_dynamic(self):
1123         """ NAT44 dynamic translation test """
1124
1125         self.nat44_add_address(self.nat_addr)
1126         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1128                                                   is_inside=0)
1129
1130         # in2out
1131         pkts = self.create_stream_in(self.pg0, self.pg1)
1132         self.pg0.add_stream(pkts)
1133         self.pg_enable_capture(self.pg_interfaces)
1134         self.pg_start()
1135         capture = self.pg1.get_capture(len(pkts))
1136         self.verify_capture_out(capture)
1137
1138         # out2in
1139         pkts = self.create_stream_out(self.pg1)
1140         self.pg1.add_stream(pkts)
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pg_start()
1143         capture = self.pg0.get_capture(len(pkts))
1144         self.verify_capture_in(capture, self.pg0)
1145
1146     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147         """ NAT44 handling of client packets with TTL=1 """
1148
1149         self.nat44_add_address(self.nat_addr)
1150         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1152                                                   is_inside=0)
1153
1154         # Client side - generate traffic
1155         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156         self.pg0.add_stream(pkts)
1157         self.pg_enable_capture(self.pg_interfaces)
1158         self.pg_start()
1159
1160         # Client side - verify ICMP type 11 packets
1161         capture = self.pg0.get_capture(len(pkts))
1162         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1163
1164     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165         """ NAT44 handling of server packets with TTL=1 """
1166
1167         self.nat44_add_address(self.nat_addr)
1168         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1170                                                   is_inside=0)
1171
1172         # Client side - create sessions
1173         pkts = self.create_stream_in(self.pg0, self.pg1)
1174         self.pg0.add_stream(pkts)
1175         self.pg_enable_capture(self.pg_interfaces)
1176         self.pg_start()
1177
1178         # Server side - generate traffic
1179         capture = self.pg1.get_capture(len(pkts))
1180         self.verify_capture_out(capture)
1181         pkts = self.create_stream_out(self.pg1, ttl=1)
1182         self.pg1.add_stream(pkts)
1183         self.pg_enable_capture(self.pg_interfaces)
1184         self.pg_start()
1185
1186         # Server side - verify ICMP type 11 packets
1187         capture = self.pg1.get_capture(len(pkts))
1188         self.verify_capture_out_with_icmp_errors(capture,
1189                                                  src_ip=self.pg1.local_ip4)
1190
1191     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1192         """ NAT44 handling of error responses to client packets with TTL=2 """
1193
1194         self.nat44_add_address(self.nat_addr)
1195         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1196         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1197                                                   is_inside=0)
1198
1199         # Client side - generate traffic
1200         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1201         self.pg0.add_stream(pkts)
1202         self.pg_enable_capture(self.pg_interfaces)
1203         self.pg_start()
1204
1205         # Server side - simulate ICMP type 11 response
1206         capture = self.pg1.get_capture(len(pkts))
1207         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1208                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1209                 ICMP(type=11) / packet[IP] for packet in capture]
1210         self.pg1.add_stream(pkts)
1211         self.pg_enable_capture(self.pg_interfaces)
1212         self.pg_start()
1213
1214         # Client side - verify ICMP type 11 packets
1215         capture = self.pg0.get_capture(len(pkts))
1216         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1217
1218     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1219         """ NAT44 handling of error responses to server packets with TTL=2 """
1220
1221         self.nat44_add_address(self.nat_addr)
1222         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1223         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1224                                                   is_inside=0)
1225
1226         # Client side - create sessions
1227         pkts = self.create_stream_in(self.pg0, self.pg1)
1228         self.pg0.add_stream(pkts)
1229         self.pg_enable_capture(self.pg_interfaces)
1230         self.pg_start()
1231
1232         # Server side - generate traffic
1233         capture = self.pg1.get_capture(len(pkts))
1234         self.verify_capture_out(capture)
1235         pkts = self.create_stream_out(self.pg1, ttl=2)
1236         self.pg1.add_stream(pkts)
1237         self.pg_enable_capture(self.pg_interfaces)
1238         self.pg_start()
1239
1240         # Client side - simulate ICMP type 11 response
1241         capture = self.pg0.get_capture(len(pkts))
1242         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1244                 ICMP(type=11) / packet[IP] for packet in capture]
1245         self.pg0.add_stream(pkts)
1246         self.pg_enable_capture(self.pg_interfaces)
1247         self.pg_start()
1248
1249         # Server side - verify ICMP type 11 packets
1250         capture = self.pg1.get_capture(len(pkts))
1251         self.verify_capture_out_with_icmp_errors(capture)
1252
1253     def test_ping_out_interface_from_outside(self):
1254         """ Ping NAT44 out interface from outside network """
1255
1256         self.nat44_add_address(self.nat_addr)
1257         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1259                                                   is_inside=0)
1260
1261         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1263              ICMP(id=self.icmp_id_out, type='echo-request'))
1264         pkts = [p]
1265         self.pg1.add_stream(pkts)
1266         self.pg_enable_capture(self.pg_interfaces)
1267         self.pg_start()
1268         capture = self.pg1.get_capture(len(pkts))
1269         self.assertEqual(1, len(capture))
1270         packet = capture[0]
1271         try:
1272             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1273             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1274             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1275             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1276         except:
1277             self.logger.error(ppp("Unexpected or invalid packet "
1278                                   "(outside network):", packet))
1279             raise
1280
1281     def test_ping_internal_host_from_outside(self):
1282         """ Ping internal host from outside network """
1283
1284         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1285         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1287                                                   is_inside=0)
1288
1289         # out2in
1290         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1291                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1292                ICMP(id=self.icmp_id_out, type='echo-request'))
1293         self.pg1.add_stream(pkt)
1294         self.pg_enable_capture(self.pg_interfaces)
1295         self.pg_start()
1296         capture = self.pg0.get_capture(1)
1297         self.verify_capture_in(capture, self.pg0, packet_num=1)
1298         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1299
1300         # in2out
1301         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1302                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1303                ICMP(id=self.icmp_id_in, type='echo-reply'))
1304         self.pg0.add_stream(pkt)
1305         self.pg_enable_capture(self.pg_interfaces)
1306         self.pg_start()
1307         capture = self.pg1.get_capture(1)
1308         self.verify_capture_out(capture, same_port=True, packet_num=1)
1309         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1310
1311     def test_forwarding(self):
1312         """ NAT44 forwarding test """
1313
1314         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1315         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1316                                                   is_inside=0)
1317         self.vapi.nat44_forwarding_enable_disable(1)
1318
1319         real_ip = self.pg0.remote_ip4n
1320         alias_ip = self.nat_addr_n
1321         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322                                                external_ip=alias_ip)
1323
1324         try:
1325             # in2out - static mapping match
1326
1327             pkts = self.create_stream_out(self.pg1)
1328             self.pg1.add_stream(pkts)
1329             self.pg_enable_capture(self.pg_interfaces)
1330             self.pg_start()
1331             capture = self.pg0.get_capture(len(pkts))
1332             self.verify_capture_in(capture, self.pg0)
1333
1334             pkts = self.create_stream_in(self.pg0, self.pg1)
1335             self.pg0.add_stream(pkts)
1336             self.pg_enable_capture(self.pg_interfaces)
1337             self.pg_start()
1338             capture = self.pg1.get_capture(len(pkts))
1339             self.verify_capture_out(capture, same_port=True)
1340
1341             # in2out - no static mapping match
1342
1343             host0 = self.pg0.remote_hosts[0]
1344             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1345             try:
1346                 pkts = self.create_stream_out(self.pg1,
1347                                               dst_ip=self.pg0.remote_ip4,
1348                                               use_inside_ports=True)
1349                 self.pg1.add_stream(pkts)
1350                 self.pg_enable_capture(self.pg_interfaces)
1351                 self.pg_start()
1352                 capture = self.pg0.get_capture(len(pkts))
1353                 self.verify_capture_in(capture, self.pg0)
1354
1355                 pkts = self.create_stream_in(self.pg0, self.pg1)
1356                 self.pg0.add_stream(pkts)
1357                 self.pg_enable_capture(self.pg_interfaces)
1358                 self.pg_start()
1359                 capture = self.pg1.get_capture(len(pkts))
1360                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1361                                         same_port=True)
1362             finally:
1363                 self.pg0.remote_hosts[0] = host0
1364
1365         finally:
1366             self.vapi.nat44_forwarding_enable_disable(0)
1367             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1368                                                    external_ip=alias_ip,
1369                                                    is_add=0)
1370
1371     def test_static_in(self):
1372         """ 1:1 NAT initialized from inside network """
1373
1374         nat_ip = "10.0.0.10"
1375         self.tcp_port_out = 6303
1376         self.udp_port_out = 6304
1377         self.icmp_id_out = 6305
1378
1379         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1381         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1382                                                   is_inside=0)
1383         sm = self.vapi.nat44_static_mapping_dump()
1384         self.assertEqual(len(sm), 1)
1385         self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1386
1387         # in2out
1388         pkts = self.create_stream_in(self.pg0, self.pg1)
1389         self.pg0.add_stream(pkts)
1390         self.pg_enable_capture(self.pg_interfaces)
1391         self.pg_start()
1392         capture = self.pg1.get_capture(len(pkts))
1393         self.verify_capture_out(capture, nat_ip, True)
1394
1395         # out2in
1396         pkts = self.create_stream_out(self.pg1, nat_ip)
1397         self.pg1.add_stream(pkts)
1398         self.pg_enable_capture(self.pg_interfaces)
1399         self.pg_start()
1400         capture = self.pg0.get_capture(len(pkts))
1401         self.verify_capture_in(capture, self.pg0)
1402
1403     def test_static_out(self):
1404         """ 1:1 NAT initialized from outside network """
1405
1406         nat_ip = "10.0.0.20"
1407         self.tcp_port_out = 6303
1408         self.udp_port_out = 6304
1409         self.icmp_id_out = 6305
1410         tag = "testTAG"
1411
1412         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1413         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1414         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1415                                                   is_inside=0)
1416         sm = self.vapi.nat44_static_mapping_dump()
1417         self.assertEqual(len(sm), 1)
1418         self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1419
1420         # out2in
1421         pkts = self.create_stream_out(self.pg1, nat_ip)
1422         self.pg1.add_stream(pkts)
1423         self.pg_enable_capture(self.pg_interfaces)
1424         self.pg_start()
1425         capture = self.pg0.get_capture(len(pkts))
1426         self.verify_capture_in(capture, self.pg0)
1427
1428         # in2out
1429         pkts = self.create_stream_in(self.pg0, self.pg1)
1430         self.pg0.add_stream(pkts)
1431         self.pg_enable_capture(self.pg_interfaces)
1432         self.pg_start()
1433         capture = self.pg1.get_capture(len(pkts))
1434         self.verify_capture_out(capture, nat_ip, True)
1435
1436     def test_static_with_port_in(self):
1437         """ 1:1 NAPT initialized from inside network """
1438
1439         self.tcp_port_out = 3606
1440         self.udp_port_out = 3607
1441         self.icmp_id_out = 3608
1442
1443         self.nat44_add_address(self.nat_addr)
1444         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1445                                       self.tcp_port_in, self.tcp_port_out,
1446                                       proto=IP_PROTOS.tcp)
1447         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448                                       self.udp_port_in, self.udp_port_out,
1449                                       proto=IP_PROTOS.udp)
1450         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1451                                       self.icmp_id_in, self.icmp_id_out,
1452                                       proto=IP_PROTOS.icmp)
1453         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1454         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1455                                                   is_inside=0)
1456
1457         # in2out
1458         pkts = self.create_stream_in(self.pg0, self.pg1)
1459         self.pg0.add_stream(pkts)
1460         self.pg_enable_capture(self.pg_interfaces)
1461         self.pg_start()
1462         capture = self.pg1.get_capture(len(pkts))
1463         self.verify_capture_out(capture)
1464
1465         # out2in
1466         pkts = self.create_stream_out(self.pg1)
1467         self.pg1.add_stream(pkts)
1468         self.pg_enable_capture(self.pg_interfaces)
1469         self.pg_start()
1470         capture = self.pg0.get_capture(len(pkts))
1471         self.verify_capture_in(capture, self.pg0)
1472
1473     def test_static_with_port_out(self):
1474         """ 1:1 NAPT initialized from outside network """
1475
1476         self.tcp_port_out = 30606
1477         self.udp_port_out = 30607
1478         self.icmp_id_out = 30608
1479
1480         self.nat44_add_address(self.nat_addr)
1481         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1482                                       self.tcp_port_in, self.tcp_port_out,
1483                                       proto=IP_PROTOS.tcp)
1484         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1485                                       self.udp_port_in, self.udp_port_out,
1486                                       proto=IP_PROTOS.udp)
1487         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1488                                       self.icmp_id_in, self.icmp_id_out,
1489                                       proto=IP_PROTOS.icmp)
1490         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1491         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1492                                                   is_inside=0)
1493
1494         # out2in
1495         pkts = self.create_stream_out(self.pg1)
1496         self.pg1.add_stream(pkts)
1497         self.pg_enable_capture(self.pg_interfaces)
1498         self.pg_start()
1499         capture = self.pg0.get_capture(len(pkts))
1500         self.verify_capture_in(capture, self.pg0)
1501
1502         # in2out
1503         pkts = self.create_stream_in(self.pg0, self.pg1)
1504         self.pg0.add_stream(pkts)
1505         self.pg_enable_capture(self.pg_interfaces)
1506         self.pg_start()
1507         capture = self.pg1.get_capture(len(pkts))
1508         self.verify_capture_out(capture)
1509
1510     def test_static_with_port_out2(self):
1511         """ 1:1 NAPT symmetrical rule """
1512
1513         external_port = 80
1514         local_port = 8080
1515
1516         self.vapi.nat44_forwarding_enable_disable(1)
1517         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1518                                       local_port, external_port,
1519                                       proto=IP_PROTOS.tcp, out2in_only=1)
1520         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1521         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1522                                                   is_inside=0)
1523
1524         # from client to service
1525         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1526              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1527              TCP(sport=12345, dport=external_port))
1528         self.pg1.add_stream(p)
1529         self.pg_enable_capture(self.pg_interfaces)
1530         self.pg_start()
1531         capture = self.pg0.get_capture(1)
1532         p = capture[0]
1533         server = None
1534         try:
1535             ip = p[IP]
1536             tcp = p[TCP]
1537             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1538             self.assertEqual(tcp.dport, local_port)
1539             self.check_tcp_checksum(p)
1540             self.check_ip_checksum(p)
1541         except:
1542             self.logger.error(ppp("Unexpected or invalid packet:", p))
1543             raise
1544
1545         # from service back to client
1546         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1547              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1548              TCP(sport=local_port, dport=12345))
1549         self.pg0.add_stream(p)
1550         self.pg_enable_capture(self.pg_interfaces)
1551         self.pg_start()
1552         capture = self.pg1.get_capture(1)
1553         p = capture[0]
1554         try:
1555             ip = p[IP]
1556             tcp = p[TCP]
1557             self.assertEqual(ip.src, self.nat_addr)
1558             self.assertEqual(tcp.sport, external_port)
1559             self.check_tcp_checksum(p)
1560             self.check_ip_checksum(p)
1561         except:
1562             self.logger.error(ppp("Unexpected or invalid packet:", p))
1563             raise
1564
1565         # from client to server (no translation)
1566         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1567              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1568              TCP(sport=12346, dport=local_port))
1569         self.pg1.add_stream(p)
1570         self.pg_enable_capture(self.pg_interfaces)
1571         self.pg_start()
1572         capture = self.pg0.get_capture(1)
1573         p = capture[0]
1574         server = None
1575         try:
1576             ip = p[IP]
1577             tcp = p[TCP]
1578             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1579             self.assertEqual(tcp.dport, local_port)
1580             self.check_tcp_checksum(p)
1581             self.check_ip_checksum(p)
1582         except:
1583             self.logger.error(ppp("Unexpected or invalid packet:", p))
1584             raise
1585
1586         # from service back to client (no translation)
1587         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1588              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1589              TCP(sport=local_port, dport=12346))
1590         self.pg0.add_stream(p)
1591         self.pg_enable_capture(self.pg_interfaces)
1592         self.pg_start()
1593         capture = self.pg1.get_capture(1)
1594         p = capture[0]
1595         try:
1596             ip = p[IP]
1597             tcp = p[TCP]
1598             self.assertEqual(ip.src, self.pg0.remote_ip4)
1599             self.assertEqual(tcp.sport, local_port)
1600             self.check_tcp_checksum(p)
1601             self.check_ip_checksum(p)
1602         except:
1603             self.logger.error(ppp("Unexpected or invalid packet:", p))
1604             raise
1605
1606     def test_static_vrf_aware(self):
1607         """ 1:1 NAT VRF awareness """
1608
1609         nat_ip1 = "10.0.0.30"
1610         nat_ip2 = "10.0.0.40"
1611         self.tcp_port_out = 6303
1612         self.udp_port_out = 6304
1613         self.icmp_id_out = 6305
1614
1615         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1616                                       vrf_id=10)
1617         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1618                                       vrf_id=10)
1619         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1620                                                   is_inside=0)
1621         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1622         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1623
1624         # inside interface VRF match NAT44 static mapping VRF
1625         pkts = self.create_stream_in(self.pg4, self.pg3)
1626         self.pg4.add_stream(pkts)
1627         self.pg_enable_capture(self.pg_interfaces)
1628         self.pg_start()
1629         capture = self.pg3.get_capture(len(pkts))
1630         self.verify_capture_out(capture, nat_ip1, True)
1631
1632         # inside interface VRF don't match NAT44 static mapping VRF (packets
1633         # are dropped)
1634         pkts = self.create_stream_in(self.pg0, self.pg3)
1635         self.pg0.add_stream(pkts)
1636         self.pg_enable_capture(self.pg_interfaces)
1637         self.pg_start()
1638         self.pg3.assert_nothing_captured()
1639
1640     def test_identity_nat(self):
1641         """ Identity NAT """
1642
1643         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1644         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1645         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1646                                                   is_inside=0)
1647
1648         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1649              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1650              TCP(sport=12345, dport=56789))
1651         self.pg1.add_stream(p)
1652         self.pg_enable_capture(self.pg_interfaces)
1653         self.pg_start()
1654         capture = self.pg0.get_capture(1)
1655         p = capture[0]
1656         try:
1657             ip = p[IP]
1658             tcp = p[TCP]
1659             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1660             self.assertEqual(ip.src, self.pg1.remote_ip4)
1661             self.assertEqual(tcp.dport, 56789)
1662             self.assertEqual(tcp.sport, 12345)
1663             self.check_tcp_checksum(p)
1664             self.check_ip_checksum(p)
1665         except:
1666             self.logger.error(ppp("Unexpected or invalid packet:", p))
1667             raise
1668
1669     def test_static_lb(self):
1670         """ NAT44 local service load balancing """
1671         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1672         external_port = 80
1673         local_port = 8080
1674         server1 = self.pg0.remote_hosts[0]
1675         server2 = self.pg0.remote_hosts[1]
1676
1677         locals = [{'addr': server1.ip4n,
1678                    'port': local_port,
1679                    'probability': 70},
1680                   {'addr': server2.ip4n,
1681                    'port': local_port,
1682                    'probability': 30}]
1683
1684         self.nat44_add_address(self.nat_addr)
1685         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1686                                                   external_port,
1687                                                   IP_PROTOS.tcp,
1688                                                   local_num=len(locals),
1689                                                   locals=locals)
1690         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1691         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1692                                                   is_inside=0)
1693
1694         # from client to service
1695         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1696              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1697              TCP(sport=12345, dport=external_port))
1698         self.pg1.add_stream(p)
1699         self.pg_enable_capture(self.pg_interfaces)
1700         self.pg_start()
1701         capture = self.pg0.get_capture(1)
1702         p = capture[0]
1703         server = None
1704         try:
1705             ip = p[IP]
1706             tcp = p[TCP]
1707             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1708             if ip.dst == server1.ip4:
1709                 server = server1
1710             else:
1711                 server = server2
1712             self.assertEqual(tcp.dport, local_port)
1713             self.check_tcp_checksum(p)
1714             self.check_ip_checksum(p)
1715         except:
1716             self.logger.error(ppp("Unexpected or invalid packet:", p))
1717             raise
1718
1719         # from service back to client
1720         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1721              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1722              TCP(sport=local_port, dport=12345))
1723         self.pg0.add_stream(p)
1724         self.pg_enable_capture(self.pg_interfaces)
1725         self.pg_start()
1726         capture = self.pg1.get_capture(1)
1727         p = capture[0]
1728         try:
1729             ip = p[IP]
1730             tcp = p[TCP]
1731             self.assertEqual(ip.src, self.nat_addr)
1732             self.assertEqual(tcp.sport, external_port)
1733             self.check_tcp_checksum(p)
1734             self.check_ip_checksum(p)
1735         except:
1736             self.logger.error(ppp("Unexpected or invalid packet:", p))
1737             raise
1738
1739         # multiple clients
1740         server1_n = 0
1741         server2_n = 0
1742         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1743         pkts = []
1744         for client in clients:
1745             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1746                  IP(src=client, dst=self.nat_addr) /
1747                  TCP(sport=12345, dport=external_port))
1748             pkts.append(p)
1749         self.pg1.add_stream(pkts)
1750         self.pg_enable_capture(self.pg_interfaces)
1751         self.pg_start()
1752         capture = self.pg0.get_capture(len(pkts))
1753         for p in capture:
1754             if p[IP].dst == server1.ip4:
1755                 server1_n += 1
1756             else:
1757                 server2_n += 1
1758         self.assertTrue(server1_n > server2_n)
1759
1760     def test_static_lb_2(self):
1761         """ NAT44 local service load balancing (asymmetrical rule) """
1762         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1763         external_port = 80
1764         local_port = 8080
1765         server1 = self.pg0.remote_hosts[0]
1766         server2 = self.pg0.remote_hosts[1]
1767
1768         locals = [{'addr': server1.ip4n,
1769                    'port': local_port,
1770                    'probability': 70},
1771                   {'addr': server2.ip4n,
1772                    'port': local_port,
1773                    'probability': 30}]
1774
1775         self.vapi.nat44_forwarding_enable_disable(1)
1776         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1777                                                   external_port,
1778                                                   IP_PROTOS.tcp,
1779                                                   out2in_only=1,
1780                                                   local_num=len(locals),
1781                                                   locals=locals)
1782         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1783         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1784                                                   is_inside=0)
1785
1786         # from client to service
1787         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1788              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1789              TCP(sport=12345, dport=external_port))
1790         self.pg1.add_stream(p)
1791         self.pg_enable_capture(self.pg_interfaces)
1792         self.pg_start()
1793         capture = self.pg0.get_capture(1)
1794         p = capture[0]
1795         server = None
1796         try:
1797             ip = p[IP]
1798             tcp = p[TCP]
1799             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1800             if ip.dst == server1.ip4:
1801                 server = server1
1802             else:
1803                 server = server2
1804             self.assertEqual(tcp.dport, local_port)
1805             self.check_tcp_checksum(p)
1806             self.check_ip_checksum(p)
1807         except:
1808             self.logger.error(ppp("Unexpected or invalid packet:", p))
1809             raise
1810
1811         # from service back to client
1812         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1813              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1814              TCP(sport=local_port, dport=12345))
1815         self.pg0.add_stream(p)
1816         self.pg_enable_capture(self.pg_interfaces)
1817         self.pg_start()
1818         capture = self.pg1.get_capture(1)
1819         p = capture[0]
1820         try:
1821             ip = p[IP]
1822             tcp = p[TCP]
1823             self.assertEqual(ip.src, self.nat_addr)
1824             self.assertEqual(tcp.sport, external_port)
1825             self.check_tcp_checksum(p)
1826             self.check_ip_checksum(p)
1827         except:
1828             self.logger.error(ppp("Unexpected or invalid packet:", p))
1829             raise
1830
1831         # from client to server (no translation)
1832         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1833              IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1834              TCP(sport=12346, dport=local_port))
1835         self.pg1.add_stream(p)
1836         self.pg_enable_capture(self.pg_interfaces)
1837         self.pg_start()
1838         capture = self.pg0.get_capture(1)
1839         p = capture[0]
1840         server = None
1841         try:
1842             ip = p[IP]
1843             tcp = p[TCP]
1844             self.assertEqual(ip.dst, server1.ip4)
1845             self.assertEqual(tcp.dport, local_port)
1846             self.check_tcp_checksum(p)
1847             self.check_ip_checksum(p)
1848         except:
1849             self.logger.error(ppp("Unexpected or invalid packet:", p))
1850             raise
1851
1852         # from service back to client (no translation)
1853         p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1854              IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1855              TCP(sport=local_port, dport=12346))
1856         self.pg0.add_stream(p)
1857         self.pg_enable_capture(self.pg_interfaces)
1858         self.pg_start()
1859         capture = self.pg1.get_capture(1)
1860         p = capture[0]
1861         try:
1862             ip = p[IP]
1863             tcp = p[TCP]
1864             self.assertEqual(ip.src, server1.ip4)
1865             self.assertEqual(tcp.sport, local_port)
1866             self.check_tcp_checksum(p)
1867             self.check_ip_checksum(p)
1868         except:
1869             self.logger.error(ppp("Unexpected or invalid packet:", p))
1870             raise
1871
1872     def test_multiple_inside_interfaces(self):
1873         """ NAT44 multiple non-overlapping address space inside interfaces """
1874
1875         self.nat44_add_address(self.nat_addr)
1876         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1877         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1878         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1879                                                   is_inside=0)
1880
1881         # between two NAT44 inside interfaces (no translation)
1882         pkts = self.create_stream_in(self.pg0, self.pg1)
1883         self.pg0.add_stream(pkts)
1884         self.pg_enable_capture(self.pg_interfaces)
1885         self.pg_start()
1886         capture = self.pg1.get_capture(len(pkts))
1887         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1888
1889         # from NAT44 inside to interface without NAT44 feature (no translation)
1890         pkts = self.create_stream_in(self.pg0, self.pg2)
1891         self.pg0.add_stream(pkts)
1892         self.pg_enable_capture(self.pg_interfaces)
1893         self.pg_start()
1894         capture = self.pg2.get_capture(len(pkts))
1895         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1896
1897         # in2out 1st interface
1898         pkts = self.create_stream_in(self.pg0, self.pg3)
1899         self.pg0.add_stream(pkts)
1900         self.pg_enable_capture(self.pg_interfaces)
1901         self.pg_start()
1902         capture = self.pg3.get_capture(len(pkts))
1903         self.verify_capture_out(capture)
1904
1905         # out2in 1st interface
1906         pkts = self.create_stream_out(self.pg3)
1907         self.pg3.add_stream(pkts)
1908         self.pg_enable_capture(self.pg_interfaces)
1909         self.pg_start()
1910         capture = self.pg0.get_capture(len(pkts))
1911         self.verify_capture_in(capture, self.pg0)
1912
1913         # in2out 2nd interface
1914         pkts = self.create_stream_in(self.pg1, self.pg3)
1915         self.pg1.add_stream(pkts)
1916         self.pg_enable_capture(self.pg_interfaces)
1917         self.pg_start()
1918         capture = self.pg3.get_capture(len(pkts))
1919         self.verify_capture_out(capture)
1920
1921         # out2in 2nd interface
1922         pkts = self.create_stream_out(self.pg3)
1923         self.pg3.add_stream(pkts)
1924         self.pg_enable_capture(self.pg_interfaces)
1925         self.pg_start()
1926         capture = self.pg1.get_capture(len(pkts))
1927         self.verify_capture_in(capture, self.pg1)
1928
1929     def test_inside_overlapping_interfaces(self):
1930         """ NAT44 multiple inside interfaces with overlapping address space """
1931
1932         static_nat_ip = "10.0.0.10"
1933         self.nat44_add_address(self.nat_addr)
1934         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1935                                                   is_inside=0)
1936         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1937         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1938         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1939         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1940                                       vrf_id=20)
1941
1942         # between NAT44 inside interfaces with same VRF (no translation)
1943         pkts = self.create_stream_in(self.pg4, self.pg5)
1944         self.pg4.add_stream(pkts)
1945         self.pg_enable_capture(self.pg_interfaces)
1946         self.pg_start()
1947         capture = self.pg5.get_capture(len(pkts))
1948         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1949
1950         # between NAT44 inside interfaces with different VRF (hairpinning)
1951         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1952              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1953              TCP(sport=1234, dport=5678))
1954         self.pg4.add_stream(p)
1955         self.pg_enable_capture(self.pg_interfaces)
1956         self.pg_start()
1957         capture = self.pg6.get_capture(1)
1958         p = capture[0]
1959         try:
1960             ip = p[IP]
1961             tcp = p[TCP]
1962             self.assertEqual(ip.src, self.nat_addr)
1963             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1964             self.assertNotEqual(tcp.sport, 1234)
1965             self.assertEqual(tcp.dport, 5678)
1966         except:
1967             self.logger.error(ppp("Unexpected or invalid packet:", p))
1968             raise
1969
1970         # in2out 1st interface
1971         pkts = self.create_stream_in(self.pg4, self.pg3)
1972         self.pg4.add_stream(pkts)
1973         self.pg_enable_capture(self.pg_interfaces)
1974         self.pg_start()
1975         capture = self.pg3.get_capture(len(pkts))
1976         self.verify_capture_out(capture)
1977
1978         # out2in 1st interface
1979         pkts = self.create_stream_out(self.pg3)
1980         self.pg3.add_stream(pkts)
1981         self.pg_enable_capture(self.pg_interfaces)
1982         self.pg_start()
1983         capture = self.pg4.get_capture(len(pkts))
1984         self.verify_capture_in(capture, self.pg4)
1985
1986         # in2out 2nd interface
1987         pkts = self.create_stream_in(self.pg5, self.pg3)
1988         self.pg5.add_stream(pkts)
1989         self.pg_enable_capture(self.pg_interfaces)
1990         self.pg_start()
1991         capture = self.pg3.get_capture(len(pkts))
1992         self.verify_capture_out(capture)
1993
1994         # out2in 2nd interface
1995         pkts = self.create_stream_out(self.pg3)
1996         self.pg3.add_stream(pkts)
1997         self.pg_enable_capture(self.pg_interfaces)
1998         self.pg_start()
1999         capture = self.pg5.get_capture(len(pkts))
2000         self.verify_capture_in(capture, self.pg5)
2001
2002         # pg5 session dump
2003         addresses = self.vapi.nat44_address_dump()
2004         self.assertEqual(len(addresses), 1)
2005         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2006         self.assertEqual(len(sessions), 3)
2007         for session in sessions:
2008             self.assertFalse(session.is_static)
2009             self.assertEqual(session.inside_ip_address[0:4],
2010                              self.pg5.remote_ip4n)
2011             self.assertEqual(session.outside_ip_address,
2012                              addresses[0].ip_address)
2013         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2014         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2015         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2016         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2017         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2018         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2019         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2020         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2021         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2022
2023         # in2out 3rd interface
2024         pkts = self.create_stream_in(self.pg6, self.pg3)
2025         self.pg6.add_stream(pkts)
2026         self.pg_enable_capture(self.pg_interfaces)
2027         self.pg_start()
2028         capture = self.pg3.get_capture(len(pkts))
2029         self.verify_capture_out(capture, static_nat_ip, True)
2030
2031         # out2in 3rd interface
2032         pkts = self.create_stream_out(self.pg3, static_nat_ip)
2033         self.pg3.add_stream(pkts)
2034         self.pg_enable_capture(self.pg_interfaces)
2035         self.pg_start()
2036         capture = self.pg6.get_capture(len(pkts))
2037         self.verify_capture_in(capture, self.pg6)
2038
2039         # general user and session dump verifications
2040         users = self.vapi.nat44_user_dump()
2041         self.assertTrue(len(users) >= 3)
2042         addresses = self.vapi.nat44_address_dump()
2043         self.assertEqual(len(addresses), 1)
2044         for user in users:
2045             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2046                                                          user.vrf_id)
2047             for session in sessions:
2048                 self.assertEqual(user.ip_address, session.inside_ip_address)
2049                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2050                 self.assertTrue(session.protocol in
2051                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
2052                                  IP_PROTOS.icmp])
2053
2054         # pg4 session dump
2055         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2056         self.assertTrue(len(sessions) >= 4)
2057         for session in sessions:
2058             self.assertFalse(session.is_static)
2059             self.assertEqual(session.inside_ip_address[0:4],
2060                              self.pg4.remote_ip4n)
2061             self.assertEqual(session.outside_ip_address,
2062                              addresses[0].ip_address)
2063
2064         # pg6 session dump
2065         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2066         self.assertTrue(len(sessions) >= 3)
2067         for session in sessions:
2068             self.assertTrue(session.is_static)
2069             self.assertEqual(session.inside_ip_address[0:4],
2070                              self.pg6.remote_ip4n)
2071             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2072                              map(int, static_nat_ip.split('.')))
2073             self.assertTrue(session.inside_port in
2074                             [self.tcp_port_in, self.udp_port_in,
2075                              self.icmp_id_in])
2076
2077     def test_hairpinning(self):
2078         """ NAT44 hairpinning - 1:1 NAPT """
2079
2080         host = self.pg0.remote_hosts[0]
2081         server = self.pg0.remote_hosts[1]
2082         host_in_port = 1234
2083         host_out_port = 0
2084         server_in_port = 5678
2085         server_out_port = 8765
2086
2087         self.nat44_add_address(self.nat_addr)
2088         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2089         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2090                                                   is_inside=0)
2091         # add static mapping for server
2092         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2093                                       server_in_port, server_out_port,
2094                                       proto=IP_PROTOS.tcp)
2095
2096         # send packet from host to server
2097         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2098              IP(src=host.ip4, dst=self.nat_addr) /
2099              TCP(sport=host_in_port, dport=server_out_port))
2100         self.pg0.add_stream(p)
2101         self.pg_enable_capture(self.pg_interfaces)
2102         self.pg_start()
2103         capture = self.pg0.get_capture(1)
2104         p = capture[0]
2105         try:
2106             ip = p[IP]
2107             tcp = p[TCP]
2108             self.assertEqual(ip.src, self.nat_addr)
2109             self.assertEqual(ip.dst, server.ip4)
2110             self.assertNotEqual(tcp.sport, host_in_port)
2111             self.assertEqual(tcp.dport, server_in_port)
2112             self.check_tcp_checksum(p)
2113             host_out_port = tcp.sport
2114         except:
2115             self.logger.error(ppp("Unexpected or invalid packet:", p))
2116             raise
2117
2118         # send reply from server to host
2119         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2120              IP(src=server.ip4, dst=self.nat_addr) /
2121              TCP(sport=server_in_port, dport=host_out_port))
2122         self.pg0.add_stream(p)
2123         self.pg_enable_capture(self.pg_interfaces)
2124         self.pg_start()
2125         capture = self.pg0.get_capture(1)
2126         p = capture[0]
2127         try:
2128             ip = p[IP]
2129             tcp = p[TCP]
2130             self.assertEqual(ip.src, self.nat_addr)
2131             self.assertEqual(ip.dst, host.ip4)
2132             self.assertEqual(tcp.sport, server_out_port)
2133             self.assertEqual(tcp.dport, host_in_port)
2134             self.check_tcp_checksum(p)
2135         except:
2136             self.logger.error(ppp("Unexpected or invalid packet:", p))
2137             raise
2138
2139     def test_hairpinning2(self):
2140         """ NAT44 hairpinning - 1:1 NAT"""
2141
2142         server1_nat_ip = "10.0.0.10"
2143         server2_nat_ip = "10.0.0.11"
2144         host = self.pg0.remote_hosts[0]
2145         server1 = self.pg0.remote_hosts[1]
2146         server2 = self.pg0.remote_hosts[2]
2147         server_tcp_port = 22
2148         server_udp_port = 20
2149
2150         self.nat44_add_address(self.nat_addr)
2151         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2152         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2153                                                   is_inside=0)
2154
2155         # add static mapping for servers
2156         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2157         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2158
2159         # host to server1
2160         pkts = []
2161         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2162              IP(src=host.ip4, dst=server1_nat_ip) /
2163              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2164         pkts.append(p)
2165         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2166              IP(src=host.ip4, dst=server1_nat_ip) /
2167              UDP(sport=self.udp_port_in, dport=server_udp_port))
2168         pkts.append(p)
2169         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2170              IP(src=host.ip4, dst=server1_nat_ip) /
2171              ICMP(id=self.icmp_id_in, type='echo-request'))
2172         pkts.append(p)
2173         self.pg0.add_stream(pkts)
2174         self.pg_enable_capture(self.pg_interfaces)
2175         self.pg_start()
2176         capture = self.pg0.get_capture(len(pkts))
2177         for packet in capture:
2178             try:
2179                 self.assertEqual(packet[IP].src, self.nat_addr)
2180                 self.assertEqual(packet[IP].dst, server1.ip4)
2181                 if packet.haslayer(TCP):
2182                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2183                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2184                     self.tcp_port_out = packet[TCP].sport
2185                     self.check_tcp_checksum(packet)
2186                 elif packet.haslayer(UDP):
2187                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2188                     self.assertEqual(packet[UDP].dport, server_udp_port)
2189                     self.udp_port_out = packet[UDP].sport
2190                 else:
2191                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2192                     self.icmp_id_out = packet[ICMP].id
2193             except:
2194                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2195                 raise
2196
2197         # server1 to host
2198         pkts = []
2199         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2200              IP(src=server1.ip4, dst=self.nat_addr) /
2201              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2202         pkts.append(p)
2203         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2204              IP(src=server1.ip4, dst=self.nat_addr) /
2205              UDP(sport=server_udp_port, dport=self.udp_port_out))
2206         pkts.append(p)
2207         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2208              IP(src=server1.ip4, dst=self.nat_addr) /
2209              ICMP(id=self.icmp_id_out, type='echo-reply'))
2210         pkts.append(p)
2211         self.pg0.add_stream(pkts)
2212         self.pg_enable_capture(self.pg_interfaces)
2213         self.pg_start()
2214         capture = self.pg0.get_capture(len(pkts))
2215         for packet in capture:
2216             try:
2217                 self.assertEqual(packet[IP].src, server1_nat_ip)
2218                 self.assertEqual(packet[IP].dst, host.ip4)
2219                 if packet.haslayer(TCP):
2220                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2221                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2222                     self.check_tcp_checksum(packet)
2223                 elif packet.haslayer(UDP):
2224                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2225                     self.assertEqual(packet[UDP].sport, server_udp_port)
2226                 else:
2227                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2228             except:
2229                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2230                 raise
2231
2232         # server2 to server1
2233         pkts = []
2234         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2235              IP(src=server2.ip4, dst=server1_nat_ip) /
2236              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2237         pkts.append(p)
2238         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2239              IP(src=server2.ip4, dst=server1_nat_ip) /
2240              UDP(sport=self.udp_port_in, dport=server_udp_port))
2241         pkts.append(p)
2242         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2243              IP(src=server2.ip4, dst=server1_nat_ip) /
2244              ICMP(id=self.icmp_id_in, type='echo-request'))
2245         pkts.append(p)
2246         self.pg0.add_stream(pkts)
2247         self.pg_enable_capture(self.pg_interfaces)
2248         self.pg_start()
2249         capture = self.pg0.get_capture(len(pkts))
2250         for packet in capture:
2251             try:
2252                 self.assertEqual(packet[IP].src, server2_nat_ip)
2253                 self.assertEqual(packet[IP].dst, server1.ip4)
2254                 if packet.haslayer(TCP):
2255                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2256                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2257                     self.tcp_port_out = packet[TCP].sport
2258                     self.check_tcp_checksum(packet)
2259                 elif packet.haslayer(UDP):
2260                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2261                     self.assertEqual(packet[UDP].dport, server_udp_port)
2262                     self.udp_port_out = packet[UDP].sport
2263                 else:
2264                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2265                     self.icmp_id_out = packet[ICMP].id
2266             except:
2267                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2268                 raise
2269
2270         # server1 to server2
2271         pkts = []
2272         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2273              IP(src=server1.ip4, dst=server2_nat_ip) /
2274              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2275         pkts.append(p)
2276         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2277              IP(src=server1.ip4, dst=server2_nat_ip) /
2278              UDP(sport=server_udp_port, dport=self.udp_port_out))
2279         pkts.append(p)
2280         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2281              IP(src=server1.ip4, dst=server2_nat_ip) /
2282              ICMP(id=self.icmp_id_out, type='echo-reply'))
2283         pkts.append(p)
2284         self.pg0.add_stream(pkts)
2285         self.pg_enable_capture(self.pg_interfaces)
2286         self.pg_start()
2287         capture = self.pg0.get_capture(len(pkts))
2288         for packet in capture:
2289             try:
2290                 self.assertEqual(packet[IP].src, server1_nat_ip)
2291                 self.assertEqual(packet[IP].dst, server2.ip4)
2292                 if packet.haslayer(TCP):
2293                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2294                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2295                     self.check_tcp_checksum(packet)
2296                 elif packet.haslayer(UDP):
2297                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2298                     self.assertEqual(packet[UDP].sport, server_udp_port)
2299                 else:
2300                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2301             except:
2302                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2303                 raise
2304
2305     def test_max_translations_per_user(self):
2306         """ MAX translations per user - recycle the least recently used """
2307
2308         self.nat44_add_address(self.nat_addr)
2309         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2310         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2311                                                   is_inside=0)
2312
2313         # get maximum number of translations per user
2314         nat44_config = self.vapi.nat_show_config()
2315
2316         # send more than maximum number of translations per user packets
2317         pkts_num = nat44_config.max_translations_per_user + 5
2318         pkts = []
2319         for port in range(0, pkts_num):
2320             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2321                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2322                  TCP(sport=1025 + port))
2323             pkts.append(p)
2324         self.pg0.add_stream(pkts)
2325         self.pg_enable_capture(self.pg_interfaces)
2326         self.pg_start()
2327
2328         # verify number of translated packet
2329         self.pg1.get_capture(pkts_num)
2330
2331     def test_interface_addr(self):
2332         """ Acquire NAT44 addresses from interface """
2333         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2334
2335         # no address in NAT pool
2336         adresses = self.vapi.nat44_address_dump()
2337         self.assertEqual(0, len(adresses))
2338
2339         # configure interface address and check NAT address pool
2340         self.pg7.config_ip4()
2341         adresses = self.vapi.nat44_address_dump()
2342         self.assertEqual(1, len(adresses))
2343         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2344
2345         # remove interface address and check NAT address pool
2346         self.pg7.unconfig_ip4()
2347         adresses = self.vapi.nat44_address_dump()
2348         self.assertEqual(0, len(adresses))
2349
2350     def test_interface_addr_static_mapping(self):
2351         """ Static mapping with addresses from interface """
2352         tag = "testTAG"
2353
2354         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2355         self.nat44_add_static_mapping(
2356             '1.2.3.4',
2357             external_sw_if_index=self.pg7.sw_if_index,
2358             tag=tag)
2359
2360         # static mappings with external interface
2361         static_mappings = self.vapi.nat44_static_mapping_dump()
2362         self.assertEqual(1, len(static_mappings))
2363         self.assertEqual(self.pg7.sw_if_index,
2364                          static_mappings[0].external_sw_if_index)
2365         self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2366
2367         # configure interface address and check static mappings
2368         self.pg7.config_ip4()
2369         static_mappings = self.vapi.nat44_static_mapping_dump()
2370         self.assertEqual(1, len(static_mappings))
2371         self.assertEqual(static_mappings[0].external_ip_address[0:4],
2372                          self.pg7.local_ip4n)
2373         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2374         self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2375
2376         # remove interface address and check static mappings
2377         self.pg7.unconfig_ip4()
2378         static_mappings = self.vapi.nat44_static_mapping_dump()
2379         self.assertEqual(0, len(static_mappings))
2380
2381     def test_interface_addr_identity_nat(self):
2382         """ Identity NAT with addresses from interface """
2383
2384         port = 53053
2385         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2386         self.vapi.nat44_add_del_identity_mapping(
2387             sw_if_index=self.pg7.sw_if_index,
2388             port=port,
2389             protocol=IP_PROTOS.tcp,
2390             addr_only=0)
2391
2392         # identity mappings with external interface
2393         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2394         self.assertEqual(1, len(identity_mappings))
2395         self.assertEqual(self.pg7.sw_if_index,
2396                          identity_mappings[0].sw_if_index)
2397
2398         # configure interface address and check identity mappings
2399         self.pg7.config_ip4()
2400         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2401         self.assertEqual(1, len(identity_mappings))
2402         self.assertEqual(identity_mappings[0].ip_address,
2403                          self.pg7.local_ip4n)
2404         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2405         self.assertEqual(port, identity_mappings[0].port)
2406         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2407
2408         # remove interface address and check identity mappings
2409         self.pg7.unconfig_ip4()
2410         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2411         self.assertEqual(0, len(identity_mappings))
2412
2413     def test_ipfix_nat44_sess(self):
2414         """ IPFIX logging NAT44 session created/delted """
2415         self.ipfix_domain_id = 10
2416         self.ipfix_src_port = 20202
2417         colector_port = 30303
2418         bind_layers(UDP, IPFIX, dport=30303)
2419         self.nat44_add_address(self.nat_addr)
2420         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2421         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2422                                                   is_inside=0)
2423         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2424                                      src_address=self.pg3.local_ip4n,
2425                                      path_mtu=512,
2426                                      template_interval=10,
2427                                      collector_port=colector_port)
2428         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2429                             src_port=self.ipfix_src_port)
2430
2431         pkts = self.create_stream_in(self.pg0, self.pg1)
2432         self.pg0.add_stream(pkts)
2433         self.pg_enable_capture(self.pg_interfaces)
2434         self.pg_start()
2435         capture = self.pg1.get_capture(len(pkts))
2436         self.verify_capture_out(capture)
2437         self.nat44_add_address(self.nat_addr, is_add=0)
2438         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2439         capture = self.pg3.get_capture(9)
2440         ipfix = IPFIXDecoder()
2441         # first load template
2442         for p in capture:
2443             self.assertTrue(p.haslayer(IPFIX))
2444             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2445             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2446             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2447             self.assertEqual(p[UDP].dport, colector_port)
2448             self.assertEqual(p[IPFIX].observationDomainID,
2449                              self.ipfix_domain_id)
2450             if p.haslayer(Template):
2451                 ipfix.add_template(p.getlayer(Template))
2452         # verify events in data set
2453         for p in capture:
2454             if p.haslayer(Data):
2455                 data = ipfix.decode_data_set(p.getlayer(Set))
2456                 self.verify_ipfix_nat44_ses(data)
2457
2458     def test_ipfix_addr_exhausted(self):
2459         """ IPFIX logging NAT addresses exhausted """
2460         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2461         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2462                                                   is_inside=0)
2463         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2464                                      src_address=self.pg3.local_ip4n,
2465                                      path_mtu=512,
2466                                      template_interval=10)
2467         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2468                             src_port=self.ipfix_src_port)
2469
2470         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2471              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2472              TCP(sport=3025))
2473         self.pg0.add_stream(p)
2474         self.pg_enable_capture(self.pg_interfaces)
2475         self.pg_start()
2476         capture = self.pg1.get_capture(0)
2477         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2478         capture = self.pg3.get_capture(9)
2479         ipfix = IPFIXDecoder()
2480         # first load template
2481         for p in capture:
2482             self.assertTrue(p.haslayer(IPFIX))
2483             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2484             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2485             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2486             self.assertEqual(p[UDP].dport, 4739)
2487             self.assertEqual(p[IPFIX].observationDomainID,
2488                              self.ipfix_domain_id)
2489             if p.haslayer(Template):
2490                 ipfix.add_template(p.getlayer(Template))
2491         # verify events in data set
2492         for p in capture:
2493             if p.haslayer(Data):
2494                 data = ipfix.decode_data_set(p.getlayer(Set))
2495                 self.verify_ipfix_addr_exhausted(data)
2496
2497     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2498     def test_ipfix_max_sessions(self):
2499         """ IPFIX logging maximum session entries exceeded """
2500         self.nat44_add_address(self.nat_addr)
2501         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2502         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2503                                                   is_inside=0)
2504
2505         nat44_config = self.vapi.nat_show_config()
2506         max_sessions = 10 * nat44_config.translation_buckets
2507
2508         pkts = []
2509         for i in range(0, max_sessions):
2510             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2511             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2512                  IP(src=src, dst=self.pg1.remote_ip4) /
2513                  TCP(sport=1025))
2514             pkts.append(p)
2515         self.pg0.add_stream(pkts)
2516         self.pg_enable_capture(self.pg_interfaces)
2517         self.pg_start()
2518
2519         self.pg1.get_capture(max_sessions)
2520         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2521                                      src_address=self.pg3.local_ip4n,
2522                                      path_mtu=512,
2523                                      template_interval=10)
2524         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2525                             src_port=self.ipfix_src_port)
2526
2527         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2528              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2529              TCP(sport=1025))
2530         self.pg0.add_stream(p)
2531         self.pg_enable_capture(self.pg_interfaces)
2532         self.pg_start()
2533         self.pg1.get_capture(0)
2534         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2535         capture = self.pg3.get_capture(9)
2536         ipfix = IPFIXDecoder()
2537         # first load template
2538         for p in capture:
2539             self.assertTrue(p.haslayer(IPFIX))
2540             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2541             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2542             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2543             self.assertEqual(p[UDP].dport, 4739)
2544             self.assertEqual(p[IPFIX].observationDomainID,
2545                              self.ipfix_domain_id)
2546             if p.haslayer(Template):
2547                 ipfix.add_template(p.getlayer(Template))
2548         # verify events in data set
2549         for p in capture:
2550             if p.haslayer(Data):
2551                 data = ipfix.decode_data_set(p.getlayer(Set))
2552                 self.verify_ipfix_max_sessions(data, max_sessions)
2553
2554     def test_pool_addr_fib(self):
2555         """ NAT44 add pool addresses to FIB """
2556         static_addr = '10.0.0.10'
2557         self.nat44_add_address(self.nat_addr)
2558         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2559         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2560                                                   is_inside=0)
2561         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2562
2563         # NAT44 address
2564         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2565              ARP(op=ARP.who_has, pdst=self.nat_addr,
2566                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2567         self.pg1.add_stream(p)
2568         self.pg_enable_capture(self.pg_interfaces)
2569         self.pg_start()
2570         capture = self.pg1.get_capture(1)
2571         self.assertTrue(capture[0].haslayer(ARP))
2572         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2573
2574         # 1:1 NAT address
2575         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2576              ARP(op=ARP.who_has, pdst=static_addr,
2577                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2578         self.pg1.add_stream(p)
2579         self.pg_enable_capture(self.pg_interfaces)
2580         self.pg_start()
2581         capture = self.pg1.get_capture(1)
2582         self.assertTrue(capture[0].haslayer(ARP))
2583         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2584
2585         # send ARP to non-NAT44 interface
2586         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2587              ARP(op=ARP.who_has, pdst=self.nat_addr,
2588                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2589         self.pg2.add_stream(p)
2590         self.pg_enable_capture(self.pg_interfaces)
2591         self.pg_start()
2592         capture = self.pg1.get_capture(0)
2593
2594         # remove addresses and verify
2595         self.nat44_add_address(self.nat_addr, is_add=0)
2596         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2597                                       is_add=0)
2598
2599         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2600              ARP(op=ARP.who_has, pdst=self.nat_addr,
2601                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2602         self.pg1.add_stream(p)
2603         self.pg_enable_capture(self.pg_interfaces)
2604         self.pg_start()
2605         capture = self.pg1.get_capture(0)
2606
2607         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2608              ARP(op=ARP.who_has, pdst=static_addr,
2609                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2610         self.pg1.add_stream(p)
2611         self.pg_enable_capture(self.pg_interfaces)
2612         self.pg_start()
2613         capture = self.pg1.get_capture(0)
2614
2615     def test_vrf_mode(self):
2616         """ NAT44 tenant VRF aware address pool mode """
2617
2618         vrf_id1 = 1
2619         vrf_id2 = 2
2620         nat_ip1 = "10.0.0.10"
2621         nat_ip2 = "10.0.0.11"
2622
2623         self.pg0.unconfig_ip4()
2624         self.pg1.unconfig_ip4()
2625         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2626         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2627         self.pg0.set_table_ip4(vrf_id1)
2628         self.pg1.set_table_ip4(vrf_id2)
2629         self.pg0.config_ip4()
2630         self.pg1.config_ip4()
2631
2632         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2633         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2634         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2635         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2636         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2637                                                   is_inside=0)
2638
2639         # first VRF
2640         pkts = self.create_stream_in(self.pg0, self.pg2)
2641         self.pg0.add_stream(pkts)
2642         self.pg_enable_capture(self.pg_interfaces)
2643         self.pg_start()
2644         capture = self.pg2.get_capture(len(pkts))
2645         self.verify_capture_out(capture, nat_ip1)
2646
2647         # second VRF
2648         pkts = self.create_stream_in(self.pg1, self.pg2)
2649         self.pg1.add_stream(pkts)
2650         self.pg_enable_capture(self.pg_interfaces)
2651         self.pg_start()
2652         capture = self.pg2.get_capture(len(pkts))
2653         self.verify_capture_out(capture, nat_ip2)
2654
2655         self.pg0.unconfig_ip4()
2656         self.pg1.unconfig_ip4()
2657         self.pg0.set_table_ip4(0)
2658         self.pg1.set_table_ip4(0)
2659         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2660         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2661
2662     def test_vrf_feature_independent(self):
2663         """ NAT44 tenant VRF independent address pool mode """
2664
2665         nat_ip1 = "10.0.0.10"
2666         nat_ip2 = "10.0.0.11"
2667
2668         self.nat44_add_address(nat_ip1)
2669         self.nat44_add_address(nat_ip2, vrf_id=99)
2670         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2671         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2672         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2673                                                   is_inside=0)
2674
2675         # first VRF
2676         pkts = self.create_stream_in(self.pg0, self.pg2)
2677         self.pg0.add_stream(pkts)
2678         self.pg_enable_capture(self.pg_interfaces)
2679         self.pg_start()
2680         capture = self.pg2.get_capture(len(pkts))
2681         self.verify_capture_out(capture, nat_ip1)
2682
2683         # second VRF
2684         pkts = self.create_stream_in(self.pg1, self.pg2)
2685         self.pg1.add_stream(pkts)
2686         self.pg_enable_capture(self.pg_interfaces)
2687         self.pg_start()
2688         capture = self.pg2.get_capture(len(pkts))
2689         self.verify_capture_out(capture, nat_ip1)
2690
2691     def test_dynamic_ipless_interfaces(self):
2692         """ NAT44 interfaces without configured IP address """
2693
2694         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2695                                       mactobinary(self.pg7.remote_mac),
2696                                       self.pg7.remote_ip4n,
2697                                       is_static=1)
2698         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2699                                       mactobinary(self.pg8.remote_mac),
2700                                       self.pg8.remote_ip4n,
2701                                       is_static=1)
2702
2703         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2704                                    dst_address_length=32,
2705                                    next_hop_address=self.pg7.remote_ip4n,
2706                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2707         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2708                                    dst_address_length=32,
2709                                    next_hop_address=self.pg8.remote_ip4n,
2710                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2711
2712         self.nat44_add_address(self.nat_addr)
2713         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2714         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2715                                                   is_inside=0)
2716
2717         # in2out
2718         pkts = self.create_stream_in(self.pg7, self.pg8)
2719         self.pg7.add_stream(pkts)
2720         self.pg_enable_capture(self.pg_interfaces)
2721         self.pg_start()
2722         capture = self.pg8.get_capture(len(pkts))
2723         self.verify_capture_out(capture)
2724
2725         # out2in
2726         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2727         self.pg8.add_stream(pkts)
2728         self.pg_enable_capture(self.pg_interfaces)
2729         self.pg_start()
2730         capture = self.pg7.get_capture(len(pkts))
2731         self.verify_capture_in(capture, self.pg7)
2732
2733     def test_static_ipless_interfaces(self):
2734         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2735
2736         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2737                                       mactobinary(self.pg7.remote_mac),
2738                                       self.pg7.remote_ip4n,
2739                                       is_static=1)
2740         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2741                                       mactobinary(self.pg8.remote_mac),
2742                                       self.pg8.remote_ip4n,
2743                                       is_static=1)
2744
2745         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2746                                    dst_address_length=32,
2747                                    next_hop_address=self.pg7.remote_ip4n,
2748                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2749         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2750                                    dst_address_length=32,
2751                                    next_hop_address=self.pg8.remote_ip4n,
2752                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2753
2754         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2755         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2756         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2757                                                   is_inside=0)
2758
2759         # out2in
2760         pkts = self.create_stream_out(self.pg8)
2761         self.pg8.add_stream(pkts)
2762         self.pg_enable_capture(self.pg_interfaces)
2763         self.pg_start()
2764         capture = self.pg7.get_capture(len(pkts))
2765         self.verify_capture_in(capture, self.pg7)
2766
2767         # in2out
2768         pkts = self.create_stream_in(self.pg7, self.pg8)
2769         self.pg7.add_stream(pkts)
2770         self.pg_enable_capture(self.pg_interfaces)
2771         self.pg_start()
2772         capture = self.pg8.get_capture(len(pkts))
2773         self.verify_capture_out(capture, self.nat_addr, True)
2774
2775     def test_static_with_port_ipless_interfaces(self):
2776         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2777
2778         self.tcp_port_out = 30606
2779         self.udp_port_out = 30607
2780         self.icmp_id_out = 30608
2781
2782         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2783                                       mactobinary(self.pg7.remote_mac),
2784                                       self.pg7.remote_ip4n,
2785                                       is_static=1)
2786         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2787                                       mactobinary(self.pg8.remote_mac),
2788                                       self.pg8.remote_ip4n,
2789                                       is_static=1)
2790
2791         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2792                                    dst_address_length=32,
2793                                    next_hop_address=self.pg7.remote_ip4n,
2794                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2795         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2796                                    dst_address_length=32,
2797                                    next_hop_address=self.pg8.remote_ip4n,
2798                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2799
2800         self.nat44_add_address(self.nat_addr)
2801         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2802                                       self.tcp_port_in, self.tcp_port_out,
2803                                       proto=IP_PROTOS.tcp)
2804         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2805                                       self.udp_port_in, self.udp_port_out,
2806                                       proto=IP_PROTOS.udp)
2807         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2808                                       self.icmp_id_in, self.icmp_id_out,
2809                                       proto=IP_PROTOS.icmp)
2810         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2811         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2812                                                   is_inside=0)
2813
2814         # out2in
2815         pkts = self.create_stream_out(self.pg8)
2816         self.pg8.add_stream(pkts)
2817         self.pg_enable_capture(self.pg_interfaces)
2818         self.pg_start()
2819         capture = self.pg7.get_capture(len(pkts))
2820         self.verify_capture_in(capture, self.pg7)
2821
2822         # in2out
2823         pkts = self.create_stream_in(self.pg7, self.pg8)
2824         self.pg7.add_stream(pkts)
2825         self.pg_enable_capture(self.pg_interfaces)
2826         self.pg_start()
2827         capture = self.pg8.get_capture(len(pkts))
2828         self.verify_capture_out(capture)
2829
2830     def test_static_unknown_proto(self):
2831         """ 1:1 NAT translate packet with unknown protocol """
2832         nat_ip = "10.0.0.10"
2833         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2834         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2835         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2836                                                   is_inside=0)
2837
2838         # in2out
2839         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2840              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2841              GRE() /
2842              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2843              TCP(sport=1234, dport=1234))
2844         self.pg0.add_stream(p)
2845         self.pg_enable_capture(self.pg_interfaces)
2846         self.pg_start()
2847         p = self.pg1.get_capture(1)
2848         packet = p[0]
2849         try:
2850             self.assertEqual(packet[IP].src, nat_ip)
2851             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2852             self.assertTrue(packet.haslayer(GRE))
2853             self.check_ip_checksum(packet)
2854         except:
2855             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2856             raise
2857
2858         # out2in
2859         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2860              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2861              GRE() /
2862              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2863              TCP(sport=1234, dport=1234))
2864         self.pg1.add_stream(p)
2865         self.pg_enable_capture(self.pg_interfaces)
2866         self.pg_start()
2867         p = self.pg0.get_capture(1)
2868         packet = p[0]
2869         try:
2870             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2871             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2872             self.assertTrue(packet.haslayer(GRE))
2873             self.check_ip_checksum(packet)
2874         except:
2875             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2876             raise
2877
2878     def test_hairpinning_static_unknown_proto(self):
2879         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2880
2881         host = self.pg0.remote_hosts[0]
2882         server = self.pg0.remote_hosts[1]
2883
2884         host_nat_ip = "10.0.0.10"
2885         server_nat_ip = "10.0.0.11"
2886
2887         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2888         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2889         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2890         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2891                                                   is_inside=0)
2892
2893         # host to server
2894         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2895              IP(src=host.ip4, dst=server_nat_ip) /
2896              GRE() /
2897              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2898              TCP(sport=1234, dport=1234))
2899         self.pg0.add_stream(p)
2900         self.pg_enable_capture(self.pg_interfaces)
2901         self.pg_start()
2902         p = self.pg0.get_capture(1)
2903         packet = p[0]
2904         try:
2905             self.assertEqual(packet[IP].src, host_nat_ip)
2906             self.assertEqual(packet[IP].dst, server.ip4)
2907             self.assertTrue(packet.haslayer(GRE))
2908             self.check_ip_checksum(packet)
2909         except:
2910             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2911             raise
2912
2913         # server to host
2914         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2915              IP(src=server.ip4, dst=host_nat_ip) /
2916              GRE() /
2917              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2918              TCP(sport=1234, dport=1234))
2919         self.pg0.add_stream(p)
2920         self.pg_enable_capture(self.pg_interfaces)
2921         self.pg_start()
2922         p = self.pg0.get_capture(1)
2923         packet = p[0]
2924         try:
2925             self.assertEqual(packet[IP].src, server_nat_ip)
2926             self.assertEqual(packet[IP].dst, host.ip4)
2927             self.assertTrue(packet.haslayer(GRE))
2928             self.check_ip_checksum(packet)
2929         except:
2930             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2931             raise
2932
2933     def test_unknown_proto(self):
2934         """ NAT44 translate packet with unknown protocol """
2935         self.nat44_add_address(self.nat_addr)
2936         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2937         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2938                                                   is_inside=0)
2939
2940         # in2out
2941         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2942              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2943              TCP(sport=self.tcp_port_in, dport=20))
2944         self.pg0.add_stream(p)
2945         self.pg_enable_capture(self.pg_interfaces)
2946         self.pg_start()
2947         p = self.pg1.get_capture(1)
2948
2949         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2950              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2951              GRE() /
2952              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2953              TCP(sport=1234, dport=1234))
2954         self.pg0.add_stream(p)
2955         self.pg_enable_capture(self.pg_interfaces)
2956         self.pg_start()
2957         p = self.pg1.get_capture(1)
2958         packet = p[0]
2959         try:
2960             self.assertEqual(packet[IP].src, self.nat_addr)
2961             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2962             self.assertTrue(packet.haslayer(GRE))
2963             self.check_ip_checksum(packet)
2964         except:
2965             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2966             raise
2967
2968         # out2in
2969         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2970              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2971              GRE() /
2972              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2973              TCP(sport=1234, dport=1234))
2974         self.pg1.add_stream(p)
2975         self.pg_enable_capture(self.pg_interfaces)
2976         self.pg_start()
2977         p = self.pg0.get_capture(1)
2978         packet = p[0]
2979         try:
2980             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2981             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2982             self.assertTrue(packet.haslayer(GRE))
2983             self.check_ip_checksum(packet)
2984         except:
2985             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2986             raise
2987
2988     def test_hairpinning_unknown_proto(self):
2989         """ NAT44 translate packet with unknown protocol - hairpinning """
2990         host = self.pg0.remote_hosts[0]
2991         server = self.pg0.remote_hosts[1]
2992         host_in_port = 1234
2993         host_out_port = 0
2994         server_in_port = 5678
2995         server_out_port = 8765
2996         server_nat_ip = "10.0.0.11"
2997
2998         self.nat44_add_address(self.nat_addr)
2999         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3000         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3001                                                   is_inside=0)
3002
3003         # add static mapping for server
3004         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3005
3006         # host to server
3007         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3008              IP(src=host.ip4, dst=server_nat_ip) /
3009              TCP(sport=host_in_port, dport=server_out_port))
3010         self.pg0.add_stream(p)
3011         self.pg_enable_capture(self.pg_interfaces)
3012         self.pg_start()
3013         capture = self.pg0.get_capture(1)
3014
3015         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3016              IP(src=host.ip4, dst=server_nat_ip) /
3017              GRE() /
3018              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3019              TCP(sport=1234, dport=1234))
3020         self.pg0.add_stream(p)
3021         self.pg_enable_capture(self.pg_interfaces)
3022         self.pg_start()
3023         p = self.pg0.get_capture(1)
3024         packet = p[0]
3025         try:
3026             self.assertEqual(packet[IP].src, self.nat_addr)
3027             self.assertEqual(packet[IP].dst, server.ip4)
3028             self.assertTrue(packet.haslayer(GRE))
3029             self.check_ip_checksum(packet)
3030         except:
3031             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3032             raise
3033
3034         # server to host
3035         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3036              IP(src=server.ip4, dst=self.nat_addr) /
3037              GRE() /
3038              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3039              TCP(sport=1234, dport=1234))
3040         self.pg0.add_stream(p)
3041         self.pg_enable_capture(self.pg_interfaces)
3042         self.pg_start()
3043         p = self.pg0.get_capture(1)
3044         packet = p[0]
3045         try:
3046             self.assertEqual(packet[IP].src, server_nat_ip)
3047             self.assertEqual(packet[IP].dst, host.ip4)
3048             self.assertTrue(packet.haslayer(GRE))
3049             self.check_ip_checksum(packet)
3050         except:
3051             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3052             raise
3053
3054     def test_output_feature(self):
3055         """ NAT44 interface output feature (in2out postrouting) """
3056         self.nat44_add_address(self.nat_addr)
3057         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3058         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3059         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3060                                                          is_inside=0)
3061
3062         # in2out
3063         pkts = self.create_stream_in(self.pg0, self.pg3)
3064         self.pg0.add_stream(pkts)
3065         self.pg_enable_capture(self.pg_interfaces)
3066         self.pg_start()
3067         capture = self.pg3.get_capture(len(pkts))
3068         self.verify_capture_out(capture)
3069
3070         # out2in
3071         pkts = self.create_stream_out(self.pg3)
3072         self.pg3.add_stream(pkts)
3073         self.pg_enable_capture(self.pg_interfaces)
3074         self.pg_start()
3075         capture = self.pg0.get_capture(len(pkts))
3076         self.verify_capture_in(capture, self.pg0)
3077
3078         # from non-NAT interface to NAT inside interface
3079         pkts = self.create_stream_in(self.pg2, self.pg0)
3080         self.pg2.add_stream(pkts)
3081         self.pg_enable_capture(self.pg_interfaces)
3082         self.pg_start()
3083         capture = self.pg0.get_capture(len(pkts))
3084         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3085
3086     def test_output_feature_vrf_aware(self):
3087         """ NAT44 interface output feature VRF aware (in2out postrouting) """
3088         nat_ip_vrf10 = "10.0.0.10"
3089         nat_ip_vrf20 = "10.0.0.20"
3090
3091         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3092                                    dst_address_length=32,
3093                                    next_hop_address=self.pg3.remote_ip4n,
3094                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3095                                    table_id=10)
3096         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3097                                    dst_address_length=32,
3098                                    next_hop_address=self.pg3.remote_ip4n,
3099                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3100                                    table_id=20)
3101
3102         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3103         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3104         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3105         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3106         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3107                                                          is_inside=0)
3108
3109         # in2out VRF 10
3110         pkts = self.create_stream_in(self.pg4, self.pg3)
3111         self.pg4.add_stream(pkts)
3112         self.pg_enable_capture(self.pg_interfaces)
3113         self.pg_start()
3114         capture = self.pg3.get_capture(len(pkts))
3115         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3116
3117         # out2in VRF 10
3118         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3119         self.pg3.add_stream(pkts)
3120         self.pg_enable_capture(self.pg_interfaces)
3121         self.pg_start()
3122         capture = self.pg4.get_capture(len(pkts))
3123         self.verify_capture_in(capture, self.pg4)
3124
3125         # in2out VRF 20
3126         pkts = self.create_stream_in(self.pg6, self.pg3)
3127         self.pg6.add_stream(pkts)
3128         self.pg_enable_capture(self.pg_interfaces)
3129         self.pg_start()
3130         capture = self.pg3.get_capture(len(pkts))
3131         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3132
3133         # out2in VRF 20
3134         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3135         self.pg3.add_stream(pkts)
3136         self.pg_enable_capture(self.pg_interfaces)
3137         self.pg_start()
3138         capture = self.pg6.get_capture(len(pkts))
3139         self.verify_capture_in(capture, self.pg6)
3140
3141     def test_output_feature_hairpinning(self):
3142         """ NAT44 interface output feature hairpinning (in2out postrouting) """
3143         host = self.pg0.remote_hosts[0]
3144         server = self.pg0.remote_hosts[1]
3145         host_in_port = 1234
3146         host_out_port = 0
3147         server_in_port = 5678
3148         server_out_port = 8765
3149
3150         self.nat44_add_address(self.nat_addr)
3151         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3152         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3153                                                          is_inside=0)
3154
3155         # add static mapping for server
3156         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3157                                       server_in_port, server_out_port,
3158                                       proto=IP_PROTOS.tcp)
3159
3160         # send packet from host to server
3161         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3162              IP(src=host.ip4, dst=self.nat_addr) /
3163              TCP(sport=host_in_port, dport=server_out_port))
3164         self.pg0.add_stream(p)
3165         self.pg_enable_capture(self.pg_interfaces)
3166         self.pg_start()
3167         capture = self.pg0.get_capture(1)
3168         p = capture[0]
3169         try:
3170             ip = p[IP]
3171             tcp = p[TCP]
3172             self.assertEqual(ip.src, self.nat_addr)
3173             self.assertEqual(ip.dst, server.ip4)
3174             self.assertNotEqual(tcp.sport, host_in_port)
3175             self.assertEqual(tcp.dport, server_in_port)
3176             self.check_tcp_checksum(p)
3177             host_out_port = tcp.sport
3178         except:
3179             self.logger.error(ppp("Unexpected or invalid packet:", p))
3180             raise
3181
3182         # send reply from server to host
3183         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3184              IP(src=server.ip4, dst=self.nat_addr) /
3185              TCP(sport=server_in_port, dport=host_out_port))
3186         self.pg0.add_stream(p)
3187         self.pg_enable_capture(self.pg_interfaces)
3188         self.pg_start()
3189         capture = self.pg0.get_capture(1)
3190         p = capture[0]
3191         try:
3192             ip = p[IP]
3193             tcp = p[TCP]
3194             self.assertEqual(ip.src, self.nat_addr)
3195             self.assertEqual(ip.dst, host.ip4)
3196             self.assertEqual(tcp.sport, server_out_port)
3197             self.assertEqual(tcp.dport, host_in_port)
3198             self.check_tcp_checksum(p)
3199         except:
3200             self.logger.error(ppp("Unexpected or invalid packet:", p))
3201             raise
3202
3203     def test_one_armed_nat44(self):
3204         """ One armed NAT44 """
3205         remote_host = self.pg9.remote_hosts[0]
3206         local_host = self.pg9.remote_hosts[1]
3207         external_port = 0
3208
3209         self.nat44_add_address(self.nat_addr)
3210         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3211         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3212                                                   is_inside=0)
3213
3214         # in2out
3215         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3216              IP(src=local_host.ip4, dst=remote_host.ip4) /
3217              TCP(sport=12345, dport=80))
3218         self.pg9.add_stream(p)
3219         self.pg_enable_capture(self.pg_interfaces)
3220         self.pg_start()
3221         capture = self.pg9.get_capture(1)
3222         p = capture[0]
3223         try:
3224             ip = p[IP]
3225             tcp = p[TCP]
3226             self.assertEqual(ip.src, self.nat_addr)
3227             self.assertEqual(ip.dst, remote_host.ip4)
3228             self.assertNotEqual(tcp.sport, 12345)
3229             external_port = tcp.sport
3230             self.assertEqual(tcp.dport, 80)
3231             self.check_tcp_checksum(p)
3232             self.check_ip_checksum(p)
3233         except:
3234             self.logger.error(ppp("Unexpected or invalid packet:", p))
3235             raise
3236
3237         # out2in
3238         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3239              IP(src=remote_host.ip4, dst=self.nat_addr) /
3240              TCP(sport=80, dport=external_port))
3241         self.pg9.add_stream(p)
3242         self.pg_enable_capture(self.pg_interfaces)
3243         self.pg_start()
3244         capture = self.pg9.get_capture(1)
3245         p = capture[0]
3246         try:
3247             ip = p[IP]
3248             tcp = p[TCP]
3249             self.assertEqual(ip.src, remote_host.ip4)
3250             self.assertEqual(ip.dst, local_host.ip4)
3251             self.assertEqual(tcp.sport, 80)
3252             self.assertEqual(tcp.dport, 12345)
3253             self.check_tcp_checksum(p)
3254             self.check_ip_checksum(p)
3255         except:
3256             self.logger.error(ppp("Unexpected or invalid packet:", p))
3257             raise
3258
3259     def test_one_armed_nat44_static(self):
3260         """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3261         remote_host = self.pg9.remote_hosts[0]
3262         local_host = self.pg9.remote_hosts[1]
3263         external_port = 80
3264         local_port = 8080
3265         eh_port_in = 0
3266
3267         self.vapi.nat44_forwarding_enable_disable(1)
3268         self.nat44_add_address(self.nat_addr, twice_nat=1)
3269         self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3270                                       local_port, external_port,
3271                                       proto=IP_PROTOS.tcp, out2in_only=1,
3272                                       twice_nat=1)
3273         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3274         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3275                                                   is_inside=0)
3276
3277         # from client to service
3278         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3279              IP(src=remote_host.ip4, dst=self.nat_addr) /
3280              TCP(sport=12345, dport=external_port))
3281         self.pg9.add_stream(p)
3282         self.pg_enable_capture(self.pg_interfaces)
3283         self.pg_start()
3284         capture = self.pg9.get_capture(1)
3285         p = capture[0]
3286         server = None
3287         try:
3288             ip = p[IP]
3289             tcp = p[TCP]
3290             self.assertEqual(ip.dst, local_host.ip4)
3291             self.assertEqual(ip.src, self.nat_addr)
3292             self.assertEqual(tcp.dport, local_port)
3293             self.assertNotEqual(tcp.sport, 12345)
3294             eh_port_in = tcp.sport
3295             self.check_tcp_checksum(p)
3296             self.check_ip_checksum(p)
3297         except:
3298             self.logger.error(ppp("Unexpected or invalid packet:", p))
3299             raise
3300
3301         # from service back to client
3302         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3303              IP(src=local_host.ip4, dst=self.nat_addr) /
3304              TCP(sport=local_port, dport=eh_port_in))
3305         self.pg9.add_stream(p)
3306         self.pg_enable_capture(self.pg_interfaces)
3307         self.pg_start()
3308         capture = self.pg9.get_capture(1)
3309         p = capture[0]
3310         try:
3311             ip = p[IP]
3312             tcp = p[TCP]
3313             self.assertEqual(ip.src, self.nat_addr)
3314             self.assertEqual(ip.dst, remote_host.ip4)
3315             self.assertEqual(tcp.sport, external_port)
3316             self.assertEqual(tcp.dport, 12345)
3317             self.check_tcp_checksum(p)
3318             self.check_ip_checksum(p)
3319         except:
3320             self.logger.error(ppp("Unexpected or invalid packet:", p))
3321             raise
3322
3323     def test_del_session(self):
3324         """ Delete NAT44 session """
3325         self.nat44_add_address(self.nat_addr)
3326         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3327         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3328                                                   is_inside=0)
3329
3330         pkts = self.create_stream_in(self.pg0, self.pg1)
3331         self.pg0.add_stream(pkts)
3332         self.pg_enable_capture(self.pg_interfaces)
3333         self.pg_start()
3334         capture = self.pg1.get_capture(len(pkts))
3335
3336         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3337         nsessions = len(sessions)
3338
3339         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3340                                     sessions[0].inside_port,
3341                                     sessions[0].protocol)
3342         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3343                                     sessions[1].outside_port,
3344                                     sessions[1].protocol,
3345                                     is_in=0)
3346
3347         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3348         self.assertEqual(nsessions - len(sessions), 2)
3349
3350     def test_set_get_reass(self):
3351         """ NAT44 set/get virtual fragmentation reassembly """
3352         reas_cfg1 = self.vapi.nat_get_reass()
3353
3354         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3355                                 max_reass=reas_cfg1.ip4_max_reass * 2,
3356                                 max_frag=reas_cfg1.ip4_max_frag * 2)
3357
3358         reas_cfg2 = self.vapi.nat_get_reass()
3359
3360         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3361         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3362         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3363
3364         self.vapi.nat_set_reass(drop_frag=1)
3365         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3366
3367     def test_frag_in_order(self):
3368         """ NAT44 translate fragments arriving in order """
3369         self.nat44_add_address(self.nat_addr)
3370         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3371         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3372                                                   is_inside=0)
3373
3374         data = "A" * 4 + "B" * 16 + "C" * 3
3375         self.tcp_port_in = random.randint(1025, 65535)
3376
3377         reass = self.vapi.nat_reass_dump()
3378         reass_n_start = len(reass)
3379
3380         # in2out
3381         pkts = self.create_stream_frag(self.pg0,
3382                                        self.pg1.remote_ip4,
3383                                        self.tcp_port_in,
3384                                        20,
3385                                        data)
3386         self.pg0.add_stream(pkts)
3387         self.pg_enable_capture(self.pg_interfaces)
3388         self.pg_start()
3389         frags = self.pg1.get_capture(len(pkts))
3390         p = self.reass_frags_and_verify(frags,
3391                                         self.nat_addr,
3392                                         self.pg1.remote_ip4)
3393         self.assertEqual(p[TCP].dport, 20)
3394         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3395         self.tcp_port_out = p[TCP].sport
3396         self.assertEqual(data, p[Raw].load)
3397
3398         # out2in
3399         pkts = self.create_stream_frag(self.pg1,
3400                                        self.nat_addr,
3401                                        20,
3402                                        self.tcp_port_out,
3403                                        data)
3404         self.pg1.add_stream(pkts)
3405         self.pg_enable_capture(self.pg_interfaces)
3406         self.pg_start()
3407         frags = self.pg0.get_capture(len(pkts))
3408         p = self.reass_frags_and_verify(frags,
3409                                         self.pg1.remote_ip4,
3410                                         self.pg0.remote_ip4)
3411         self.assertEqual(p[TCP].sport, 20)
3412         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3413         self.assertEqual(data, p[Raw].load)
3414
3415         reass = self.vapi.nat_reass_dump()
3416         reass_n_end = len(reass)
3417
3418         self.assertEqual(reass_n_end - reass_n_start, 2)
3419
3420     def test_reass_hairpinning(self):
3421         """ NAT44 fragments hairpinning """
3422         host = self.pg0.remote_hosts[0]
3423         server = self.pg0.remote_hosts[1]
3424         host_in_port = random.randint(1025, 65535)
3425         host_out_port = 0
3426         server_in_port = random.randint(1025, 65535)
3427         server_out_port = random.randint(1025, 65535)
3428         data = "A" * 4 + "B" * 16 + "C" * 3
3429
3430         self.nat44_add_address(self.nat_addr)
3431         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3432         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3433                                                   is_inside=0)
3434         # add static mapping for server
3435         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3436                                       server_in_port, server_out_port,
3437                                       proto=IP_PROTOS.tcp)
3438
3439         # send packet from host to server
3440         pkts = self.create_stream_frag(self.pg0,
3441                                        self.nat_addr,
3442                                        host_in_port,
3443                                        server_out_port,
3444                                        data)
3445         self.pg0.add_stream(pkts)
3446         self.pg_enable_capture(self.pg_interfaces)
3447         self.pg_start()
3448         frags = self.pg0.get_capture(len(pkts))
3449         p = self.reass_frags_and_verify(frags,
3450                                         self.nat_addr,
3451                                         server.ip4)
3452         self.assertNotEqual(p[TCP].sport, host_in_port)
3453         self.assertEqual(p[TCP].dport, server_in_port)
3454         self.assertEqual(data, p[Raw].load)
3455
3456     def test_frag_out_of_order(self):
3457         """ NAT44 translate fragments arriving out of order """
3458         self.nat44_add_address(self.nat_addr)
3459         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3460         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3461                                                   is_inside=0)
3462
3463         data = "A" * 4 + "B" * 16 + "C" * 3
3464         random.randint(1025, 65535)
3465
3466         # in2out
3467         pkts = self.create_stream_frag(self.pg0,
3468                                        self.pg1.remote_ip4,
3469                                        self.tcp_port_in,
3470                                        20,
3471                                        data)
3472         pkts.reverse()
3473         self.pg0.add_stream(pkts)
3474         self.pg_enable_capture(self.pg_interfaces)
3475         self.pg_start()
3476         frags = self.pg1.get_capture(len(pkts))
3477         p = self.reass_frags_and_verify(frags,
3478                                         self.nat_addr,
3479                                         self.pg1.remote_ip4)
3480         self.assertEqual(p[TCP].dport, 20)
3481         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3482         self.tcp_port_out = p[TCP].sport
3483         self.assertEqual(data, p[Raw].load)
3484
3485         # out2in
3486         pkts = self.create_stream_frag(self.pg1,
3487                                        self.nat_addr,
3488                                        20,
3489                                        self.tcp_port_out,
3490                                        data)
3491         pkts.reverse()
3492         self.pg1.add_stream(pkts)
3493         self.pg_enable_capture(self.pg_interfaces)
3494         self.pg_start()
3495         frags = self.pg0.get_capture(len(pkts))
3496         p = self.reass_frags_and_verify(frags,
3497                                         self.pg1.remote_ip4,
3498                                         self.pg0.remote_ip4)
3499         self.assertEqual(p[TCP].sport, 20)
3500         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3501         self.assertEqual(data, p[Raw].load)
3502
3503     def test_port_restricted(self):
3504         """ Port restricted NAT44 (MAP-E CE) """
3505         self.nat44_add_address(self.nat_addr)
3506         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3507         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3508                                                   is_inside=0)
3509         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3510                       "psid-offset 6 psid-len 6")
3511
3512         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3513              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3514              TCP(sport=4567, dport=22))
3515         self.pg0.add_stream(p)
3516         self.pg_enable_capture(self.pg_interfaces)
3517         self.pg_start()
3518         capture = self.pg1.get_capture(1)
3519         p = capture[0]
3520         try:
3521             ip = p[IP]
3522             tcp = p[TCP]
3523             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3524             self.assertEqual(ip.src, self.nat_addr)
3525             self.assertEqual(tcp.dport, 22)
3526             self.assertNotEqual(tcp.sport, 4567)
3527             self.assertEqual((tcp.sport >> 6) & 63, 10)
3528             self.check_tcp_checksum(p)
3529             self.check_ip_checksum(p)
3530         except:
3531             self.logger.error(ppp("Unexpected or invalid packet:", p))
3532             raise
3533
3534     def test_twice_nat(self):
3535         """ Twice NAT44 """
3536         twice_nat_addr = '10.0.1.3'
3537         port_in = 8080
3538         port_out = 80
3539         eh_port_out = 4567
3540         eh_port_in = 0
3541         self.nat44_add_address(self.nat_addr)
3542         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3543         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3544                                       port_in, port_out, proto=IP_PROTOS.tcp,
3545                                       twice_nat=1)
3546         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3547         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3548                                                   is_inside=0)
3549
3550         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3551              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3552              TCP(sport=eh_port_out, dport=port_out))
3553         self.pg1.add_stream(p)
3554         self.pg_enable_capture(self.pg_interfaces)
3555         self.pg_start()
3556         capture = self.pg0.get_capture(1)
3557         p = capture[0]
3558         try:
3559             ip = p[IP]
3560             tcp = p[TCP]
3561             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3562             self.assertEqual(ip.src, twice_nat_addr)
3563             self.assertEqual(tcp.dport, port_in)
3564             self.assertNotEqual(tcp.sport, eh_port_out)
3565             eh_port_in = tcp.sport
3566             self.check_tcp_checksum(p)
3567             self.check_ip_checksum(p)
3568         except:
3569             self.logger.error(ppp("Unexpected or invalid packet:", p))
3570             raise
3571
3572         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3573              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3574              TCP(sport=port_in, dport=eh_port_in))
3575         self.pg0.add_stream(p)
3576         self.pg_enable_capture(self.pg_interfaces)
3577         self.pg_start()
3578         capture = self.pg1.get_capture(1)
3579         p = capture[0]
3580         try:
3581             ip = p[IP]
3582             tcp = p[TCP]
3583             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3584             self.assertEqual(ip.src, self.nat_addr)
3585             self.assertEqual(tcp.dport, eh_port_out)
3586             self.assertEqual(tcp.sport, port_out)
3587             self.check_tcp_checksum(p)
3588             self.check_ip_checksum(p)
3589         except:
3590             self.logger.error(ppp("Unexpected or invalid packet:", p))
3591             raise
3592
3593     def test_twice_nat_lb(self):
3594         """ Twice NAT44 local service load balancing """
3595         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3596         twice_nat_addr = '10.0.1.3'
3597         local_port = 8080
3598         external_port = 80
3599         eh_port_out = 4567
3600         eh_port_in = 0
3601         server1 = self.pg0.remote_hosts[0]
3602         server2 = self.pg0.remote_hosts[1]
3603
3604         locals = [{'addr': server1.ip4n,
3605                    'port': local_port,
3606                    'probability': 50},
3607                   {'addr': server2.ip4n,
3608                    'port': local_port,
3609                    'probability': 50}]
3610
3611         self.nat44_add_address(self.nat_addr)
3612         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3613
3614         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3615                                                   external_port,
3616                                                   IP_PROTOS.tcp,
3617                                                   twice_nat=1,
3618                                                   local_num=len(locals),
3619                                                   locals=locals)
3620         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3621         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3622                                                   is_inside=0)
3623
3624         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3625              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3626              TCP(sport=eh_port_out, dport=external_port))
3627         self.pg1.add_stream(p)
3628         self.pg_enable_capture(self.pg_interfaces)
3629         self.pg_start()
3630         capture = self.pg0.get_capture(1)
3631         p = capture[0]
3632         server = None
3633         try:
3634             ip = p[IP]
3635             tcp = p[TCP]
3636             self.assertEqual(ip.src, twice_nat_addr)
3637             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3638             if ip.dst == server1.ip4:
3639                 server = server1
3640             else:
3641                 server = server2
3642             self.assertNotEqual(tcp.sport, eh_port_out)
3643             eh_port_in = tcp.sport
3644             self.assertEqual(tcp.dport, local_port)
3645             self.check_tcp_checksum(p)
3646             self.check_ip_checksum(p)
3647         except:
3648             self.logger.error(ppp("Unexpected or invalid packet:", p))
3649             raise
3650
3651         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3652              IP(src=server.ip4, dst=twice_nat_addr) /
3653              TCP(sport=local_port, dport=eh_port_in))
3654         self.pg0.add_stream(p)
3655         self.pg_enable_capture(self.pg_interfaces)
3656         self.pg_start()
3657         capture = self.pg1.get_capture(1)
3658         p = capture[0]
3659         try:
3660             ip = p[IP]
3661             tcp = p[TCP]
3662             self.assertEqual(ip.src, self.nat_addr)
3663             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3664             self.assertEqual(tcp.sport, external_port)
3665             self.assertEqual(tcp.dport, eh_port_out)
3666             self.check_tcp_checksum(p)
3667             self.check_ip_checksum(p)
3668         except:
3669             self.logger.error(ppp("Unexpected or invalid packet:", p))
3670             raise
3671
3672     def test_twice_nat_interface_addr(self):
3673         """ Acquire twice NAT44 addresses from interface """
3674         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3675
3676         # no address in NAT pool
3677         adresses = self.vapi.nat44_address_dump()
3678         self.assertEqual(0, len(adresses))
3679
3680         # configure interface address and check NAT address pool
3681         self.pg7.config_ip4()
3682         adresses = self.vapi.nat44_address_dump()
3683         self.assertEqual(1, len(adresses))
3684         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3685         self.assertEqual(adresses[0].twice_nat, 1)
3686
3687         # remove interface address and check NAT address pool
3688         self.pg7.unconfig_ip4()
3689         adresses = self.vapi.nat44_address_dump()
3690         self.assertEqual(0, len(adresses))
3691
3692     def test_ipfix_max_frags(self):
3693         """ IPFIX logging maximum fragments pending reassembly exceeded """
3694         self.nat44_add_address(self.nat_addr)
3695         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3696         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3697                                                   is_inside=0)
3698         self.vapi.nat_set_reass(max_frag=0)
3699         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3700                                      src_address=self.pg3.local_ip4n,
3701                                      path_mtu=512,
3702                                      template_interval=10)
3703         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3704                             src_port=self.ipfix_src_port)
3705
3706         data = "A" * 4 + "B" * 16 + "C" * 3
3707         self.tcp_port_in = random.randint(1025, 65535)
3708         pkts = self.create_stream_frag(self.pg0,
3709                                        self.pg1.remote_ip4,
3710                                        self.tcp_port_in,
3711                                        20,
3712                                        data)
3713         self.pg0.add_stream(pkts[-1])
3714         self.pg_enable_capture(self.pg_interfaces)
3715         self.pg_start()
3716         frags = self.pg1.get_capture(0)
3717         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3718         capture = self.pg3.get_capture(9)
3719         ipfix = IPFIXDecoder()
3720         # first load template
3721         for p in capture:
3722             self.assertTrue(p.haslayer(IPFIX))
3723             self.assertEqual(p[IP].src, self.pg3.local_ip4)
3724             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3725             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3726             self.assertEqual(p[UDP].dport, 4739)
3727             self.assertEqual(p[IPFIX].observationDomainID,
3728                              self.ipfix_domain_id)
3729             if p.haslayer(Template):
3730                 ipfix.add_template(p.getlayer(Template))
3731         # verify events in data set
3732         for p in capture:
3733             if p.haslayer(Data):
3734                 data = ipfix.decode_data_set(p.getlayer(Set))
3735                 self.verify_ipfix_max_fragments_ip4(data, 0,
3736                                                     self.pg0.remote_ip4n)
3737
3738     def tearDown(self):
3739         super(TestNAT44, self).tearDown()
3740         if not self.vpp_dead:
3741             self.logger.info(self.vapi.cli("show nat44 addresses"))
3742             self.logger.info(self.vapi.cli("show nat44 interfaces"))
3743             self.logger.info(self.vapi.cli("show nat44 static mappings"))
3744             self.logger.info(self.vapi.cli("show nat44 interface address"))
3745             self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3746             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3747             self.vapi.cli("nat addr-port-assignment-alg default")
3748             self.clear_nat44()
3749
3750
class TestNAT44Out2InDPO(MethodHolder):
    """ NAT44 Test Cases using out2in DPO """

    @classmethod
    def setUpConstants(cls):
        """ Append the NAT "out2in dpo" section to the VPP command line """
        super(TestNAT44Out2InDPO, cls).setUpConstants()
        nat_section = ["nat", "{", "out2in dpo", "}"]
        cls.vpp_cmdline.extend(nat_section)

    @classmethod
    def setUpClass(cls):
        """
        Set the class-wide ports/ids and addresses, bring up pg0 with IPv4
        and pg1 with IPv6, and add an IPv6 route with prefix length 0 via
        pg1. Any failure triggers tearDownClass before being re-raised.
        """
        super(TestNAT44Out2InDPO, cls).setUpClass()

        try:
            cls.tcp_port_in = cls.tcp_port_out = 6303
            cls.udp_port_in = cls.udp_port_out = 6304
            cls.icmp_id_in = cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.dst_ip4 = '192.168.70.1'

            cls.create_pg_interfaces(range(2))

            # pg0: IPv4 side
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

            # pg1: IPv6 side
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.resolve_ndp()

            # all-zero IPv6 destination with length 0, next hop on pg1
            ip6_any = '\x00'*16
            cls.vapi.ip_add_del_route(is_ipv6=True, dst_address=ip6_any,
                                      dst_address_length=0,
                                      next_hop_address=cls.pg1.remote_ip6n,
                                      next_hop_sw_if_index=cls.pg1.sw_if_index)

        except Exception:
            super(TestNAT44Out2InDPO, cls).tearDownClass()
            raise

    def configure_xlat(self):
        """
        Store the destination/source IPv6 prefixes (text, packed and
        length) on the instance and add a MAP domain built from them with
        is_translation=1 and is_rfc6052=1
        """
        self.dst_ip6_pfx = '1:2:3::'
        self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.dst_ip6_pfx)
        self.dst_ip6_pfx_len = 96
        self.src_ip6_pfx = '4:5:6::'
        self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.src_ip6_pfx)
        self.src_ip6_pfx_len = 96
        ip4_zero = '\x00\x00\x00\x00'
        self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
                                 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
                                 ip4_zero, 0, is_translation=1,
                                 is_rfc6052=1)

    def test_464xlat_ce(self):
        """ Test 464XLAT CE with NAT44

        IPv4 traffic from pg0 is expected on pg1 as IPv6 whose source is
        composed from the NAT address; IPv6 traffic sent back on pg1 is
        expected on pg0 as IPv4. The NAT44 feature and address are removed
        again whether or not the checks pass.
        """

        self.configure_xlat()

        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)

        remote_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                      self.dst_ip6_pfx_len)
        nat_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
                                   self.src_ip6_pfx_len)

        try:
            # IPv4 in -> IPv6 out
            stream = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
            self.pg0.add_stream(stream)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            received = self.pg1.get_capture(len(stream))
            self.verify_capture_out_ip6(received, nat_ip=nat_ip6,
                                        dst_ip=remote_ip6)

            # IPv6 out -> IPv4 in
            stream = self.create_stream_out_ip6(self.pg1, remote_ip6,
                                                nat_ip6)
            self.pg1.add_stream(stream)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            received = self.pg0.get_capture(len(stream))
            self.verify_capture_in(received, self.pg0)
        finally:
            self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
                                                      is_add=0)
            self.vapi.nat44_add_del_address_range(self.nat_addr_n,
                                                  self.nat_addr_n, is_add=0)

    def test_464xlat_ce_no_nat(self):
        """ Test 464XLAT CE without NAT44

        Same exchange as the NAT44 variant, but with no NAT44 configured:
        the IPv6 source is composed from the inside host's own IPv4
        address and the capture is verified with same_port=True.
        """

        self.configure_xlat()

        remote_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                      self.dst_ip6_pfx_len)
        host_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
                                    self.src_ip6_pfx_len)

        # IPv4 in -> IPv6 out
        stream = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = self.pg1.get_capture(len(stream))
        self.verify_capture_out_ip6(received, dst_ip=remote_ip6,
                                    nat_ip=host_ip6, same_port=True)

        # IPv6 out -> IPv4 in
        stream = self.create_stream_out_ip6(self.pg1, remote_ip6, host_ip6)
        self.pg1.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = self.pg0.get_capture(len(stream))
        self.verify_capture_in(received, self.pg0)
3866
3867
3868 class TestDeterministicNAT(MethodHolder):
3869     """ Deterministic NAT Test Cases """
3870
3871     @classmethod
3872     def setUpConstants(cls):
3873         super(TestDeterministicNAT, cls).setUpConstants()
3874         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3875
3876     @classmethod
3877     def setUpClass(cls):
3878         super(TestDeterministicNAT, cls).setUpClass()
3879
3880         try:
3881             cls.tcp_port_in = 6303
3882             cls.tcp_external_port = 6303
3883             cls.udp_port_in = 6304
3884             cls.udp_external_port = 6304
3885             cls.icmp_id_in = 6305
3886             cls.nat_addr = '10.0.0.3'
3887
3888             cls.create_pg_interfaces(range(3))
3889             cls.interfaces = list(cls.pg_interfaces)
3890
3891             for i in cls.interfaces:
3892                 i.admin_up()
3893                 i.config_ip4()
3894                 i.resolve_arp()
3895
3896             cls.pg0.generate_remote_hosts(2)
3897             cls.pg0.configure_ipv4_neighbors()
3898
3899         except Exception:
3900             super(TestDeterministicNAT, cls).tearDownClass()
3901             raise
3902
3903     def create_stream_in(self, in_if, out_if, ttl=64):
3904         """
3905         Create packet stream for inside network
3906
3907         :param in_if: Inside interface
3908         :param out_if: Outside interface
3909         :param ttl: TTL of generated packets
3910         """
3911         pkts = []
3912         # TCP
3913         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3914              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3915              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3916         pkts.append(p)
3917
3918         # UDP
3919         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3920              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3921              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3922         pkts.append(p)
3923
3924         # ICMP
3925         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3926              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3927              ICMP(id=self.icmp_id_in, type='echo-request'))
3928         pkts.append(p)
3929
3930         return pkts
3931
3932     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3933         """
3934         Create packet stream for outside network
3935
3936         :param out_if: Outside interface
3937         :param dst_ip: Destination IP address (Default use global NAT address)
3938         :param ttl: TTL of generated packets
3939         """
3940         if dst_ip is None:
3941             dst_ip = self.nat_addr
3942         pkts = []
3943         # TCP
3944         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3945              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3946              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3947         pkts.append(p)
3948
3949         # UDP
3950         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3951              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3952              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3953         pkts.append(p)
3954
3955         # ICMP
3956         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3957              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3958              ICMP(id=self.icmp_external_id, type='echo-reply'))
3959         pkts.append(p)
3960
3961         return pkts
3962
3963     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3964         """
3965         Verify captured packets on outside network
3966
3967         :param capture: Captured packets
3968         :param nat_ip: Translated IP address (Default use global NAT address)
3969         :param same_port: Sorce port number is not translated (Default False)
3970         :param packet_num: Expected number of packets (Default 3)
3971         """
3972         if nat_ip is None:
3973             nat_ip = self.nat_addr
3974         self.assertEqual(packet_num, len(capture))
3975         for packet in capture:
3976             try:
3977                 self.assertEqual(packet[IP].src, nat_ip)
3978                 if packet.haslayer(TCP):
3979                     self.tcp_port_out = packet[TCP].sport
3980                 elif packet.haslayer(UDP):
3981                     self.udp_port_out = packet[UDP].sport
3982                 else:
3983                     self.icmp_external_id = packet[ICMP].id
3984             except:
3985                 self.logger.error(ppp("Unexpected or invalid packet "
3986                                       "(outside network):", packet))
3987                 raise
3988
3989     def initiate_tcp_session(self, in_if, out_if):
3990         """
3991         Initiates TCP session
3992
3993         :param in_if: Inside interface
3994         :param out_if: Outside interface
3995         """
3996         try:
3997             # SYN packet in->out
3998             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3999                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4000                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4001                      flags="S"))
4002             in_if.add_stream(p)
4003             self.pg_enable_capture(self.pg_interfaces)
4004             self.pg_start()
4005             capture = out_if.get_capture(1)
4006             p = capture[0]
4007             self.tcp_port_out = p[TCP].sport
4008
4009             # SYN + ACK packet out->in
4010             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4011                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4012                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4013                      flags="SA"))
4014             out_if.add_stream(p)
4015             self.pg_enable_capture(self.pg_interfaces)
4016             self.pg_start()
4017             in_if.get_capture(1)
4018
4019             # ACK packet in->out
4020             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4021                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4022                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4023                      flags="A"))
4024             in_if.add_stream(p)
4025             self.pg_enable_capture(self.pg_interfaces)
4026             self.pg_start()
4027             out_if.get_capture(1)
4028
4029         except:
4030             self.logger.error("TCP 3 way handshake failed")
4031             raise
4032
4033     def verify_ipfix_max_entries_per_user(self, data):
4034         """
4035         Verify IPFIX maximum entries per user exceeded event
4036
4037         :param data: Decoded IPFIX data records
4038         """
4039         self.assertEqual(1, len(data))
4040         record = data[0]
4041         # natEvent
4042         self.assertEqual(ord(record[230]), 13)
4043         # natQuotaExceededEvent
4044         self.assertEqual('\x03\x00\x00\x00', record[466])
4045         # maxEntriesPerUser
4046         self.assertEqual('\xe8\x03\x00\x00', record[473])
4047         # sourceIPv4Address
4048         self.assertEqual(self.pg0.remote_ip4n, record[8])
4049
4050     def test_deterministic_mode(self):
4051         """ NAT plugin run deterministic mode """
4052         in_addr = '172.16.255.0'
4053         out_addr = '172.17.255.50'
4054         in_addr_t = '172.16.255.20'
4055         in_addr_n = socket.inet_aton(in_addr)
4056         out_addr_n = socket.inet_aton(out_addr)
4057         in_addr_t_n = socket.inet_aton(in_addr_t)
4058         in_plen = 24
4059         out_plen = 32
4060
4061         nat_config = self.vapi.nat_show_config()
4062         self.assertEqual(1, nat_config.deterministic)
4063
4064         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4065
4066         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4067         self.assertEqual(rep1.out_addr[:4], out_addr_n)
4068         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4069         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4070
4071         deterministic_mappings = self.vapi.nat_det_map_dump()
4072         self.assertEqual(len(deterministic_mappings), 1)
4073         dsm = deterministic_mappings[0]
4074         self.assertEqual(in_addr_n, dsm.in_addr[:4])
4075         self.assertEqual(in_plen, dsm.in_plen)
4076         self.assertEqual(out_addr_n, dsm.out_addr[:4])
4077         self.assertEqual(out_plen, dsm.out_plen)
4078
4079         self.clear_nat_det()
4080         deterministic_mappings = self.vapi.nat_det_map_dump()
4081         self.assertEqual(len(deterministic_mappings), 0)
4082
4083     def test_set_timeouts(self):
4084         """ Set deterministic NAT timeouts """
4085         timeouts_before = self.vapi.nat_det_get_timeouts()
4086
4087         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4088                                        timeouts_before.tcp_established + 10,
4089                                        timeouts_before.tcp_transitory + 10,
4090                                        timeouts_before.icmp + 10)
4091
4092         timeouts_after = self.vapi.nat_det_get_timeouts()
4093
4094         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4095         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4096         self.assertNotEqual(timeouts_before.tcp_established,
4097                             timeouts_after.tcp_established)
4098         self.assertNotEqual(timeouts_before.tcp_transitory,
4099                             timeouts_after.tcp_transitory)
4100
4101     def test_det_in(self):
4102         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4103
4104         nat_ip = "10.0.0.10"
4105
4106         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4107                                       32,
4108                                       socket.inet_aton(nat_ip),
4109                                       32)
4110         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4111         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4112                                                   is_inside=0)
4113
4114         # in2out
4115         pkts = self.create_stream_in(self.pg0, self.pg1)
4116         self.pg0.add_stream(pkts)
4117         self.pg_enable_capture(self.pg_interfaces)
4118         self.pg_start()
4119         capture = self.pg1.get_capture(len(pkts))
4120         self.verify_capture_out(capture, nat_ip)
4121
4122         # out2in
4123         pkts = self.create_stream_out(self.pg1, nat_ip)
4124         self.pg1.add_stream(pkts)
4125         self.pg_enable_capture(self.pg_interfaces)
4126         self.pg_start()
4127         capture = self.pg0.get_capture(len(pkts))
4128         self.verify_capture_in(capture, self.pg0)
4129
4130         # session dump test
4131         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4132         self.assertEqual(len(sessions), 3)
4133
4134         # TCP session
4135         s = sessions[0]
4136         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4137         self.assertEqual(s.in_port, self.tcp_port_in)
4138         self.assertEqual(s.out_port, self.tcp_port_out)
4139         self.assertEqual(s.ext_port, self.tcp_external_port)
4140
4141         # UDP session
4142         s = sessions[1]
4143         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4144         self.assertEqual(s.in_port, self.udp_port_in)
4145         self.assertEqual(s.out_port, self.udp_port_out)
4146         self.assertEqual(s.ext_port, self.udp_external_port)
4147
4148         # ICMP session
4149         s = sessions[2]
4150         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4151         self.assertEqual(s.in_port, self.icmp_id_in)
4152         self.assertEqual(s.out_port, self.icmp_external_id)
4153
4154     def test_multiple_users(self):
4155         """ Deterministic NAT multiple users """
4156
4157         nat_ip = "10.0.0.10"
4158         port_in = 80
4159         external_port = 6303
4160
4161         host0 = self.pg0.remote_hosts[0]
4162         host1 = self.pg0.remote_hosts[1]
4163
4164         self.vapi.nat_det_add_del_map(host0.ip4n,
4165                                       24,
4166                                       socket.inet_aton(nat_ip),
4167                                       32)
4168         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4169         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4170                                                   is_inside=0)
4171
4172         # host0 to out
4173         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4174              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4175              TCP(sport=port_in, dport=external_port))
4176         self.pg0.add_stream(p)
4177         self.pg_enable_capture(self.pg_interfaces)
4178         self.pg_start()
4179         capture = self.pg1.get_capture(1)
4180         p = capture[0]
4181         try:
4182             ip = p[IP]
4183             tcp = p[TCP]
4184             self.assertEqual(ip.src, nat_ip)
4185             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4186             self.assertEqual(tcp.dport, external_port)
4187             port_out0 = tcp.sport
4188         except:
4189             self.logger.error(ppp("Unexpected or invalid packet:", p))
4190             raise
4191
4192         # host1 to out
4193         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4194              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4195              TCP(sport=port_in, dport=external_port))
4196         self.pg0.add_stream(p)
4197         self.pg_enable_capture(self.pg_interfaces)
4198         self.pg_start()
4199         capture = self.pg1.get_capture(1)
4200         p = capture[0]
4201         try:
4202             ip = p[IP]
4203             tcp = p[TCP]
4204             self.assertEqual(ip.src, nat_ip)
4205             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4206             self.assertEqual(tcp.dport, external_port)
4207             port_out1 = tcp.sport
4208         except:
4209             self.logger.error(ppp("Unexpected or invalid packet:", p))
4210             raise
4211
4212         dms = self.vapi.nat_det_map_dump()
4213         self.assertEqual(1, len(dms))
4214         self.assertEqual(2, dms[0].ses_num)
4215
4216         # out to host0
4217         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4218              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4219              TCP(sport=external_port, dport=port_out0))
4220         self.pg1.add_stream(p)
4221         self.pg_enable_capture(self.pg_interfaces)
4222         self.pg_start()
4223         capture = self.pg0.get_capture(1)
4224         p = capture[0]
4225         try:
4226             ip = p[IP]
4227             tcp = p[TCP]
4228             self.assertEqual(ip.src, self.pg1.remote_ip4)
4229             self.assertEqual(ip.dst, host0.ip4)
4230             self.assertEqual(tcp.dport, port_in)
4231             self.assertEqual(tcp.sport, external_port)
4232         except:
4233             self.logger.error(ppp("Unexpected or invalid packet:", p))
4234             raise
4235
4236         # out to host1
4237         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4238              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4239              TCP(sport=external_port, dport=port_out1))
4240         self.pg1.add_stream(p)
4241         self.pg_enable_capture(self.pg_interfaces)
4242         self.pg_start()
4243         capture = self.pg0.get_capture(1)
4244         p = capture[0]
4245         try:
4246             ip = p[IP]
4247             tcp = p[TCP]
4248             self.assertEqual(ip.src, self.pg1.remote_ip4)
4249             self.assertEqual(ip.dst, host1.ip4)
4250             self.assertEqual(tcp.dport, port_in)
4251             self.assertEqual(tcp.sport, external_port)
4252         except:
4253             self.logger.error(ppp("Unexpected or invalid packet", p))
4254             raise
4255
4256         # session close api test
4257         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4258                                             port_out1,
4259                                             self.pg1.remote_ip4n,
4260                                             external_port)
4261         dms = self.vapi.nat_det_map_dump()
4262         self.assertEqual(dms[0].ses_num, 1)
4263
4264         self.vapi.nat_det_close_session_in(host0.ip4n,
4265                                            port_in,
4266                                            self.pg1.remote_ip4n,
4267                                            external_port)
4268         dms = self.vapi.nat_det_map_dump()
4269         self.assertEqual(dms[0].ses_num, 0)
4270
4271     def test_tcp_session_close_detection_in(self):
4272         """ Deterministic NAT TCP session close from inside network """
4273         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4274                                       32,
4275                                       socket.inet_aton(self.nat_addr),
4276                                       32)
4277         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4278         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4279                                                   is_inside=0)
4280
4281         self.initiate_tcp_session(self.pg0, self.pg1)
4282
4283         # close the session from inside
4284         try:
4285             # FIN packet in -> out
4286             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4287                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4288                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4289                      flags="F"))
4290             self.pg0.add_stream(p)
4291             self.pg_enable_capture(self.pg_interfaces)
4292             self.pg_start()
4293             self.pg1.get_capture(1)
4294
4295             pkts = []
4296
4297             # ACK packet out -> in
4298             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4299                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4300                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4301                      flags="A"))
4302             pkts.append(p)
4303
4304             # FIN packet out -> in
4305             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4306                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4307                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4308                      flags="F"))
4309             pkts.append(p)
4310
4311             self.pg1.add_stream(pkts)
4312             self.pg_enable_capture(self.pg_interfaces)
4313             self.pg_start()
4314             self.pg0.get_capture(2)
4315
4316             # ACK packet in -> out
4317             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4318                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4319                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4320                      flags="A"))
4321             self.pg0.add_stream(p)
4322             self.pg_enable_capture(self.pg_interfaces)
4323             self.pg_start()
4324             self.pg1.get_capture(1)
4325
4326             # Check if deterministic NAT44 closed the session
4327             dms = self.vapi.nat_det_map_dump()
4328             self.assertEqual(0, dms[0].ses_num)
4329         except:
4330             self.logger.error("TCP session termination failed")
4331             raise
4332
4333     def test_tcp_session_close_detection_out(self):
4334         """ Deterministic NAT TCP session close from outside network """
4335         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4336                                       32,
4337                                       socket.inet_aton(self.nat_addr),
4338                                       32)
4339         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4340         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4341                                                   is_inside=0)
4342
4343         self.initiate_tcp_session(self.pg0, self.pg1)
4344
4345         # close the session from outside
4346         try:
4347             # FIN packet out -> in
4348             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4349                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4350                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4351                      flags="F"))
4352             self.pg1.add_stream(p)
4353             self.pg_enable_capture(self.pg_interfaces)
4354             self.pg_start()
4355             self.pg0.get_capture(1)
4356
4357             pkts = []
4358
4359             # ACK packet in -> out
4360             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4361                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4362                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4363                      flags="A"))
4364             pkts.append(p)
4365
4366             # ACK packet in -> out
4367             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4368                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4369                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4370                      flags="F"))
4371             pkts.append(p)
4372
4373             self.pg0.add_stream(pkts)
4374             self.pg_enable_capture(self.pg_interfaces)
4375             self.pg_start()
4376             self.pg1.get_capture(2)
4377
4378             # ACK packet out -> in
4379             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4380                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4381                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4382                      flags="A"))
4383             self.pg1.add_stream(p)
4384             self.pg_enable_capture(self.pg_interfaces)
4385             self.pg_start()
4386             self.pg0.get_capture(1)
4387
4388             # Check if deterministic NAT44 closed the session
4389             dms = self.vapi.nat_det_map_dump()
4390             self.assertEqual(0, dms[0].ses_num)
4391         except:
4392             self.logger.error("TCP session termination failed")
4393             raise
4394
4395     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4396     def test_session_timeout(self):
4397         """ Deterministic NAT session timeouts """
4398         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4399                                       32,
4400                                       socket.inet_aton(self.nat_addr),
4401                                       32)
4402         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4403         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4404                                                   is_inside=0)
4405
4406         self.initiate_tcp_session(self.pg0, self.pg1)
4407         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4408         pkts = self.create_stream_in(self.pg0, self.pg1)
4409         self.pg0.add_stream(pkts)
4410         self.pg_enable_capture(self.pg_interfaces)
4411         self.pg_start()
4412         capture = self.pg1.get_capture(len(pkts))
4413         sleep(15)
4414
4415         dms = self.vapi.nat_det_map_dump()
4416         self.assertEqual(0, dms[0].ses_num)
4417
4418     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4419     def test_session_limit_per_user(self):
4420         """ Deterministic NAT maximum sessions per user limit """
4421         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4422                                       32,
4423                                       socket.inet_aton(self.nat_addr),
4424                                       32)
4425         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4426         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4427                                                   is_inside=0)
4428         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4429                                      src_address=self.pg2.local_ip4n,
4430                                      path_mtu=512,
4431                                      template_interval=10)
4432         self.vapi.nat_ipfix()
4433
4434         pkts = []
4435         for port in range(1025, 2025):
4436             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4437                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4438                  UDP(sport=port, dport=port))
4439             pkts.append(p)
4440
4441         self.pg0.add_stream(pkts)
4442         self.pg_enable_capture(self.pg_interfaces)
4443         self.pg_start()
4444         capture = self.pg1.get_capture(len(pkts))
4445
4446         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4447              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4448              UDP(sport=3001, dport=3002))
4449         self.pg0.add_stream(p)
4450         self.pg_enable_capture(self.pg_interfaces)
4451         self.pg_start()
4452         capture = self.pg1.assert_nothing_captured()
4453
4454         # verify ICMP error packet
4455         capture = self.pg0.get_capture(1)
4456         p = capture[0]
4457         self.assertTrue(p.haslayer(ICMP))
4458         icmp = p[ICMP]
4459         self.assertEqual(icmp.type, 3)
4460         self.assertEqual(icmp.code, 1)
4461         self.assertTrue(icmp.haslayer(IPerror))
4462         inner_ip = icmp[IPerror]
4463         self.assertEqual(inner_ip[UDPerror].sport, 3001)
4464         self.assertEqual(inner_ip[UDPerror].dport, 3002)
4465
4466         dms = self.vapi.nat_det_map_dump()
4467
4468         self.assertEqual(1000, dms[0].ses_num)
4469
4470         # verify IPFIX logging
4471         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
4472         sleep(1)
4473         capture = self.pg2.get_capture(2)
4474         ipfix = IPFIXDecoder()
4475         # first load template
4476         for p in capture:
4477             self.assertTrue(p.haslayer(IPFIX))
4478             if p.haslayer(Template):
4479                 ipfix.add_template(p.getlayer(Template))
4480         # verify events in data set
4481         for p in capture:
4482             if p.haslayer(Data):
4483                 data = ipfix.decode_data_set(p.getlayer(Set))
4484                 self.verify_ipfix_max_entries_per_user(data)
4485
4486     def clear_nat_det(self):
4487         """
4488         Clear deterministic NAT configuration.
4489         """
4490         self.vapi.nat_ipfix(enable=0)
4491         self.vapi.nat_det_set_timeouts()
4492         deterministic_mappings = self.vapi.nat_det_map_dump()
4493         for dsm in deterministic_mappings:
4494             self.vapi.nat_det_add_del_map(dsm.in_addr,
4495                                           dsm.in_plen,
4496                                           dsm.out_addr,
4497                                           dsm.out_plen,
4498                                           is_add=0)
4499
4500         interfaces = self.vapi.nat44_interface_dump()
4501         for intf in interfaces:
4502             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4503                                                       intf.is_inside,
4504                                                       is_add=0)
4505
4506     def tearDown(self):
4507         super(TestDeterministicNAT, self).tearDown()
4508         if not self.vpp_dead:
4509             self.logger.info(self.vapi.cli("show nat44 interfaces"))
4510             self.logger.info(
4511                 self.vapi.cli("show nat44 deterministic mappings"))
4512             self.logger.info(
4513                 self.vapi.cli("show nat44 deterministic timeouts"))
4514             self.logger.info(
4515                 self.vapi.cli("show nat44 deterministic sessions"))
4516             self.clear_nat_det()
4517
4518
4519 class TestNAT64(MethodHolder):
4520     """ NAT64 Test Cases """
4521
4522     @classmethod
4523     def setUpConstants(cls):
4524         super(TestNAT64, cls).setUpConstants()
4525         cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4526                                 "nat64 st hash buckets 256", "}"])
4527
4528     @classmethod
4529     def setUpClass(cls):
4530         super(TestNAT64, cls).setUpClass()
4531
4532         try:
4533             cls.tcp_port_in = 6303
4534             cls.tcp_port_out = 6303
4535             cls.udp_port_in = 6304
4536             cls.udp_port_out = 6304
4537             cls.icmp_id_in = 6305
4538             cls.icmp_id_out = 6305
4539             cls.nat_addr = '10.0.0.3'
4540             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4541             cls.vrf1_id = 10
4542             cls.vrf1_nat_addr = '10.0.10.3'
4543             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4544                                                    cls.vrf1_nat_addr)
4545             cls.ipfix_src_port = 4739
4546             cls.ipfix_domain_id = 1
4547
4548             cls.create_pg_interfaces(range(5))
4549             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4550             cls.ip6_interfaces.append(cls.pg_interfaces[2])
4551             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4552
4553             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4554
4555             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4556
4557             cls.pg0.generate_remote_hosts(2)
4558
4559             for i in cls.ip6_interfaces:
4560                 i.admin_up()
4561                 i.config_ip6()
4562                 i.configure_ipv6_neighbors()
4563
4564             for i in cls.ip4_interfaces:
4565                 i.admin_up()
4566                 i.config_ip4()
4567                 i.resolve_arp()
4568
4569             cls.pg3.admin_up()
4570             cls.pg3.config_ip4()
4571             cls.pg3.resolve_arp()
4572             cls.pg3.config_ip6()
4573             cls.pg3.configure_ipv6_neighbors()
4574
4575         except Exception:
4576             super(TestNAT64, cls).tearDownClass()
4577             raise
4578
4579     def test_pool(self):
4580         """ Add/delete address to NAT64 pool """
4581         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4582
4583         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4584
4585         addresses = self.vapi.nat64_pool_addr_dump()
4586         self.assertEqual(len(addresses), 1)
4587         self.assertEqual(addresses[0].address, nat_addr)
4588
4589         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4590
4591         addresses = self.vapi.nat64_pool_addr_dump()
4592         self.assertEqual(len(addresses), 0)
4593
4594     def test_interface(self):
4595         """ Enable/disable NAT64 feature on the interface """
4596         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4597         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4598
4599         interfaces = self.vapi.nat64_interface_dump()
4600         self.assertEqual(len(interfaces), 2)
4601         pg0_found = False
4602         pg1_found = False
4603         for intf in interfaces:
4604             if intf.sw_if_index == self.pg0.sw_if_index:
4605                 self.assertEqual(intf.is_inside, 1)
4606                 pg0_found = True
4607             elif intf.sw_if_index == self.pg1.sw_if_index:
4608                 self.assertEqual(intf.is_inside, 0)
4609                 pg1_found = True
4610         self.assertTrue(pg0_found)
4611         self.assertTrue(pg1_found)
4612
4613         features = self.vapi.cli("show interface features pg0")
4614         self.assertNotEqual(features.find('nat64-in2out'), -1)
4615         features = self.vapi.cli("show interface features pg1")
4616         self.assertNotEqual(features.find('nat64-out2in'), -1)
4617
4618         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4619         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4620
4621         interfaces = self.vapi.nat64_interface_dump()
4622         self.assertEqual(len(interfaces), 0)
4623
4624     def test_static_bib(self):
4625         """ Add/delete static BIB entry """
4626         in_addr = socket.inet_pton(socket.AF_INET6,
4627                                    '2001:db8:85a3::8a2e:370:7334')
4628         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4629         in_port = 1234
4630         out_port = 5678
4631         proto = IP_PROTOS.tcp
4632
4633         self.vapi.nat64_add_del_static_bib(in_addr,
4634                                            out_addr,
4635                                            in_port,
4636                                            out_port,
4637                                            proto)
4638         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4639         static_bib_num = 0
4640         for bibe in bib:
4641             if bibe.is_static:
4642                 static_bib_num += 1
4643                 self.assertEqual(bibe.i_addr, in_addr)
4644                 self.assertEqual(bibe.o_addr, out_addr)
4645                 self.assertEqual(bibe.i_port, in_port)
4646                 self.assertEqual(bibe.o_port, out_port)
4647         self.assertEqual(static_bib_num, 1)
4648
4649         self.vapi.nat64_add_del_static_bib(in_addr,
4650                                            out_addr,
4651                                            in_port,
4652                                            out_port,
4653                                            proto,
4654                                            is_add=0)
4655         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4656         static_bib_num = 0
4657         for bibe in bib:
4658             if bibe.is_static:
4659                 static_bib_num += 1
4660         self.assertEqual(static_bib_num, 0)
4661
4662     def test_set_timeouts(self):
4663         """ Set NAT64 timeouts """
4664         # verify default values
4665         timeouts = self.vapi.nat64_get_timeouts()
4666         self.assertEqual(timeouts.udp, 300)
4667         self.assertEqual(timeouts.icmp, 60)
4668         self.assertEqual(timeouts.tcp_trans, 240)
4669         self.assertEqual(timeouts.tcp_est, 7440)
4670         self.assertEqual(timeouts.tcp_incoming_syn, 6)
4671
4672         # set and verify custom values
4673         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4674                                      tcp_est=7450, tcp_incoming_syn=10)
4675         timeouts = self.vapi.nat64_get_timeouts()
4676         self.assertEqual(timeouts.udp, 200)
4677         self.assertEqual(timeouts.icmp, 30)
4678         self.assertEqual(timeouts.tcp_trans, 250)
4679         self.assertEqual(timeouts.tcp_est, 7450)
4680         self.assertEqual(timeouts.tcp_incoming_syn, 10)
4681
4682     def test_dynamic(self):
4683         """ NAT64 dynamic translation test """
4684         self.tcp_port_in = 6303
4685         self.udp_port_in = 6304
4686         self.icmp_id_in = 6305
4687
4688         ses_num_start = self.nat64_get_ses_num()
4689
4690         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4691                                                 self.nat_addr_n)
4692         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4693         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4694
4695         # in2out
4696         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4697         self.pg0.add_stream(pkts)
4698         self.pg_enable_capture(self.pg_interfaces)
4699         self.pg_start()
4700         capture = self.pg1.get_capture(len(pkts))
4701         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4702                                 dst_ip=self.pg1.remote_ip4)
4703
4704         # out2in
4705         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4706         self.pg1.add_stream(pkts)
4707         self.pg_enable_capture(self.pg_interfaces)
4708         self.pg_start()
4709         capture = self.pg0.get_capture(len(pkts))
4710         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4711         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4712
4713         # in2out
4714         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4715         self.pg0.add_stream(pkts)
4716         self.pg_enable_capture(self.pg_interfaces)
4717         self.pg_start()
4718         capture = self.pg1.get_capture(len(pkts))
4719         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4720                                 dst_ip=self.pg1.remote_ip4)
4721
4722         # out2in
4723         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4724         self.pg1.add_stream(pkts)
4725         self.pg_enable_capture(self.pg_interfaces)
4726         self.pg_start()
4727         capture = self.pg0.get_capture(len(pkts))
4728         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4729
4730         ses_num_end = self.nat64_get_ses_num()
4731
4732         self.assertEqual(ses_num_end - ses_num_start, 3)
4733
4734         # tenant with specific VRF
4735         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4736                                                 self.vrf1_nat_addr_n,
4737                                                 vrf_id=self.vrf1_id)
4738         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4739
4740         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4741         self.pg2.add_stream(pkts)
4742         self.pg_enable_capture(self.pg_interfaces)
4743         self.pg_start()
4744         capture = self.pg1.get_capture(len(pkts))
4745         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4746                                 dst_ip=self.pg1.remote_ip4)
4747
4748         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4749         self.pg1.add_stream(pkts)
4750         self.pg_enable_capture(self.pg_interfaces)
4751         self.pg_start()
4752         capture = self.pg2.get_capture(len(pkts))
4753         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4754
4755     def test_static(self):
4756         """ NAT64 static translation test """
4757         self.tcp_port_in = 60303
4758         self.udp_port_in = 60304
4759         self.icmp_id_in = 60305
4760         self.tcp_port_out = 60303
4761         self.udp_port_out = 60304
4762         self.icmp_id_out = 60305
4763
4764         ses_num_start = self.nat64_get_ses_num()
4765
4766         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4767                                                 self.nat_addr_n)
4768         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4769         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4770
4771         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4772                                            self.nat_addr_n,
4773                                            self.tcp_port_in,
4774                                            self.tcp_port_out,
4775                                            IP_PROTOS.tcp)
4776         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4777                                            self.nat_addr_n,
4778                                            self.udp_port_in,
4779                                            self.udp_port_out,
4780                                            IP_PROTOS.udp)
4781         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4782                                            self.nat_addr_n,
4783                                            self.icmp_id_in,
4784                                            self.icmp_id_out,
4785                                            IP_PROTOS.icmp)
4786
4787         # in2out
4788         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4789         self.pg0.add_stream(pkts)
4790         self.pg_enable_capture(self.pg_interfaces)
4791         self.pg_start()
4792         capture = self.pg1.get_capture(len(pkts))
4793         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4794                                 dst_ip=self.pg1.remote_ip4, same_port=True)
4795
4796         # out2in
4797         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4798         self.pg1.add_stream(pkts)
4799         self.pg_enable_capture(self.pg_interfaces)
4800         self.pg_start()
4801         capture = self.pg0.get_capture(len(pkts))
4802         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4803         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4804
4805         ses_num_end = self.nat64_get_ses_num()
4806
4807         self.assertEqual(ses_num_end - ses_num_start, 3)
4808
4809     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4810     def test_session_timeout(self):
4811         """ NAT64 session timeout """
4812         self.icmp_id_in = 1234
4813         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4814                                                 self.nat_addr_n)
4815         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4816         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4817         self.vapi.nat64_set_timeouts(icmp=5)
4818
4819         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4820         self.pg0.add_stream(pkts)
4821         self.pg_enable_capture(self.pg_interfaces)
4822         self.pg_start()
4823         capture = self.pg1.get_capture(len(pkts))
4824
4825         ses_num_before_timeout = self.nat64_get_ses_num()
4826
4827         sleep(15)
4828
4829         # ICMP session after timeout
4830         ses_num_after_timeout = self.nat64_get_ses_num()
4831         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4832
4833     def test_icmp_error(self):
4834         """ NAT64 ICMP Error message translation """
4835         self.tcp_port_in = 6303
4836         self.udp_port_in = 6304
4837         self.icmp_id_in = 6305
4838
4839         ses_num_start = self.nat64_get_ses_num()
4840
4841         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4842                                                 self.nat_addr_n)
4843         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4844         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4845
4846         # send some packets to create sessions
4847         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4848         self.pg0.add_stream(pkts)
4849         self.pg_enable_capture(self.pg_interfaces)
4850         self.pg_start()
4851         capture_ip4 = self.pg1.get_capture(len(pkts))
4852         self.verify_capture_out(capture_ip4,
4853                                 nat_ip=self.nat_addr,
4854                                 dst_ip=self.pg1.remote_ip4)
4855
4856         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4857         self.pg1.add_stream(pkts)
4858         self.pg_enable_capture(self.pg_interfaces)
4859         self.pg_start()
4860         capture_ip6 = self.pg0.get_capture(len(pkts))
4861         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4862         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4863                                    self.pg0.remote_ip6)
4864
4865         # in2out
4866         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4867                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4868                 ICMPv6DestUnreach(code=1) /
4869                 packet[IPv6] for packet in capture_ip6]
4870         self.pg0.add_stream(pkts)
4871         self.pg_enable_capture(self.pg_interfaces)
4872         self.pg_start()
4873         capture = self.pg1.get_capture(len(pkts))
4874         for packet in capture:
4875             try:
4876                 self.assertEqual(packet[IP].src, self.nat_addr)
4877                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4878                 self.assertEqual(packet[ICMP].type, 3)
4879                 self.assertEqual(packet[ICMP].code, 13)
4880                 inner = packet[IPerror]
4881                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4882                 self.assertEqual(inner.dst, self.nat_addr)
4883                 self.check_icmp_checksum(packet)
4884                 if inner.haslayer(TCPerror):
4885                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4886                 elif inner.haslayer(UDPerror):
4887                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4888                 else:
4889                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4890             except:
4891                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4892                 raise
4893
4894         # out2in
4895         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4896                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4897                 ICMP(type=3, code=13) /
4898                 packet[IP] for packet in capture_ip4]
4899         self.pg1.add_stream(pkts)
4900         self.pg_enable_capture(self.pg_interfaces)
4901         self.pg_start()
4902         capture = self.pg0.get_capture(len(pkts))
4903         for packet in capture:
4904             try:
4905                 self.assertEqual(packet[IPv6].src, ip.src)
4906                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4907                 icmp = packet[ICMPv6DestUnreach]
4908                 self.assertEqual(icmp.code, 1)
4909                 inner = icmp[IPerror6]
4910                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4911                 self.assertEqual(inner.dst, ip.src)
4912                 self.check_icmpv6_checksum(packet)
4913                 if inner.haslayer(TCPerror):
4914                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4915                 elif inner.haslayer(UDPerror):
4916                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4917                 else:
4918                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4919                                      self.icmp_id_in)
4920             except:
4921                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4922                 raise
4923
4924     def test_hairpinning(self):
4925         """ NAT64 hairpinning """
4926
4927         client = self.pg0.remote_hosts[0]
4928         server = self.pg0.remote_hosts[1]
4929         server_tcp_in_port = 22
4930         server_tcp_out_port = 4022
4931         server_udp_in_port = 23
4932         server_udp_out_port = 4023
4933         client_tcp_in_port = 1234
4934         client_udp_in_port = 1235
4935         client_tcp_out_port = 0
4936         client_udp_out_port = 0
4937         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4938         nat_addr_ip6 = ip.src
4939
4940         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4941                                                 self.nat_addr_n)
4942         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4943         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4944
4945         self.vapi.nat64_add_del_static_bib(server.ip6n,
4946                                            self.nat_addr_n,
4947                                            server_tcp_in_port,
4948                                            server_tcp_out_port,
4949                                            IP_PROTOS.tcp)
4950         self.vapi.nat64_add_del_static_bib(server.ip6n,
4951                                            self.nat_addr_n,
4952                                            server_udp_in_port,
4953                                            server_udp_out_port,
4954                                            IP_PROTOS.udp)
4955
4956         # client to server
4957         pkts = []
4958         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4959              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4960              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4961         pkts.append(p)
4962         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4963              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4964              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4965         pkts.append(p)
4966         self.pg0.add_stream(pkts)
4967         self.pg_enable_capture(self.pg_interfaces)
4968         self.pg_start()
4969         capture = self.pg0.get_capture(len(pkts))
4970         for packet in capture:
4971             try:
4972                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4973                 self.assertEqual(packet[IPv6].dst, server.ip6)
4974                 if packet.haslayer(TCP):
4975                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4976                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4977                     self.check_tcp_checksum(packet)
4978                     client_tcp_out_port = packet[TCP].sport
4979                 else:
4980                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4981                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
4982                     self.check_udp_checksum(packet)
4983                     client_udp_out_port = packet[UDP].sport
4984             except:
4985                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4986                 raise
4987
4988         # server to client
4989         pkts = []
4990         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4991              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4992              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4993         pkts.append(p)
4994         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4995              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4996              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4997         pkts.append(p)
4998         self.pg0.add_stream(pkts)
4999         self.pg_enable_capture(self.pg_interfaces)
5000         self.pg_start()
5001         capture = self.pg0.get_capture(len(pkts))
5002         for packet in capture:
5003             try:
5004                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5005                 self.assertEqual(packet[IPv6].dst, client.ip6)
5006                 if packet.haslayer(TCP):
5007                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5008                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5009                     self.check_tcp_checksum(packet)
5010                 else:
5011                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
5012                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
5013                     self.check_udp_checksum(packet)
5014             except:
5015                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5016                 raise
5017
5018         # ICMP error
5019         pkts = []
5020         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5021                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5022                 ICMPv6DestUnreach(code=1) /
5023                 packet[IPv6] for packet in capture]
5024         self.pg0.add_stream(pkts)
5025         self.pg_enable_capture(self.pg_interfaces)
5026         self.pg_start()
5027         capture = self.pg0.get_capture(len(pkts))
5028         for packet in capture:
5029             try:
5030                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5031                 self.assertEqual(packet[IPv6].dst, server.ip6)
5032                 icmp = packet[ICMPv6DestUnreach]
5033                 self.assertEqual(icmp.code, 1)
5034                 inner = icmp[IPerror6]
5035                 self.assertEqual(inner.src, server.ip6)
5036                 self.assertEqual(inner.dst, nat_addr_ip6)
5037                 self.check_icmpv6_checksum(packet)
5038                 if inner.haslayer(TCPerror):
5039                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5040                     self.assertEqual(inner[TCPerror].dport,
5041                                      client_tcp_out_port)
5042                 else:
5043                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5044                     self.assertEqual(inner[UDPerror].dport,
5045                                      client_udp_out_port)
5046             except:
5047                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5048                 raise
5049
5050     def test_prefix(self):
5051         """ NAT64 Network-Specific Prefix """
5052
5053         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5054                                                 self.nat_addr_n)
5055         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5056         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5057         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5058                                                 self.vrf1_nat_addr_n,
5059                                                 vrf_id=self.vrf1_id)
5060         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5061
5062         # Add global prefix
5063         global_pref64 = "2001:db8::"
5064         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5065         global_pref64_len = 32
5066         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5067
5068         prefix = self.vapi.nat64_prefix_dump()
5069         self.assertEqual(len(prefix), 1)
5070         self.assertEqual(prefix[0].prefix, global_pref64_n)
5071         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5072         self.assertEqual(prefix[0].vrf_id, 0)
5073
5074         # Add tenant specific prefix
5075         vrf1_pref64 = "2001:db8:122:300::"
5076         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5077         vrf1_pref64_len = 56
5078         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5079                                        vrf1_pref64_len,
5080                                        vrf_id=self.vrf1_id)
5081         prefix = self.vapi.nat64_prefix_dump()
5082         self.assertEqual(len(prefix), 2)
5083
5084         # Global prefix
5085         pkts = self.create_stream_in_ip6(self.pg0,
5086                                          self.pg1,
5087                                          pref=global_pref64,
5088                                          plen=global_pref64_len)
5089         self.pg0.add_stream(pkts)
5090         self.pg_enable_capture(self.pg_interfaces)
5091         self.pg_start()
5092         capture = self.pg1.get_capture(len(pkts))
5093         self.verify_capture_out(capture, nat_ip=self.nat_addr,
5094                                 dst_ip=self.pg1.remote_ip4)
5095
5096         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5097         self.pg1.add_stream(pkts)
5098         self.pg_enable_capture(self.pg_interfaces)
5099         self.pg_start()
5100         capture = self.pg0.get_capture(len(pkts))
5101         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5102                                   global_pref64,
5103                                   global_pref64_len)
5104         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5105
5106         # Tenant specific prefix
5107         pkts = self.create_stream_in_ip6(self.pg2,
5108                                          self.pg1,
5109                                          pref=vrf1_pref64,
5110                                          plen=vrf1_pref64_len)
5111         self.pg2.add_stream(pkts)
5112         self.pg_enable_capture(self.pg_interfaces)
5113         self.pg_start()
5114         capture = self.pg1.get_capture(len(pkts))
5115         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5116                                 dst_ip=self.pg1.remote_ip4)
5117
5118         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5119         self.pg1.add_stream(pkts)
5120         self.pg_enable_capture(self.pg_interfaces)
5121         self.pg_start()
5122         capture = self.pg2.get_capture(len(pkts))
5123         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5124                                   vrf1_pref64,
5125                                   vrf1_pref64_len)
5126         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5127
5128     def test_unknown_proto(self):
5129         """ NAT64 translate packet with unknown protocol """
5130
5131         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5132                                                 self.nat_addr_n)
5133         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5134         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5135         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5136
5137         # in2out
5138         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5139              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5140              TCP(sport=self.tcp_port_in, dport=20))
5141         self.pg0.add_stream(p)
5142         self.pg_enable_capture(self.pg_interfaces)
5143         self.pg_start()
5144         p = self.pg1.get_capture(1)
5145
5146         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5147              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5148              GRE() /
5149              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5150              TCP(sport=1234, dport=1234))
5151         self.pg0.add_stream(p)
5152         self.pg_enable_capture(self.pg_interfaces)
5153         self.pg_start()
5154         p = self.pg1.get_capture(1)
5155         packet = p[0]
5156         try:
5157             self.assertEqual(packet[IP].src, self.nat_addr)
5158             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5159             self.assertTrue(packet.haslayer(GRE))
5160             self.check_ip_checksum(packet)
5161         except:
5162             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5163             raise
5164
5165         # out2in
5166         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5167              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5168              GRE() /
5169              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5170              TCP(sport=1234, dport=1234))
5171         self.pg1.add_stream(p)
5172         self.pg_enable_capture(self.pg_interfaces)
5173         self.pg_start()
5174         p = self.pg0.get_capture(1)
5175         packet = p[0]
5176         try:
5177             self.assertEqual(packet[IPv6].src, remote_ip6)
5178             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5179             self.assertEqual(packet[IPv6].nh, 47)
5180         except:
5181             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5182             raise
5183
5184     def test_hairpinning_unknown_proto(self):
5185         """ NAT64 translate packet with unknown protocol - hairpinning """
5186
5187         client = self.pg0.remote_hosts[0]
5188         server = self.pg0.remote_hosts[1]
5189         server_tcp_in_port = 22
5190         server_tcp_out_port = 4022
5191         client_tcp_in_port = 1234
5192         client_tcp_out_port = 1235
5193         server_nat_ip = "10.0.0.100"
5194         client_nat_ip = "10.0.0.110"
5195         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5196         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5197         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5198         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5199
5200         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5201                                                 client_nat_ip_n)
5202         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5203         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5204
5205         self.vapi.nat64_add_del_static_bib(server.ip6n,
5206                                            server_nat_ip_n,
5207                                            server_tcp_in_port,
5208                                            server_tcp_out_port,
5209                                            IP_PROTOS.tcp)
5210
5211         self.vapi.nat64_add_del_static_bib(server.ip6n,
5212                                            server_nat_ip_n,
5213                                            0,
5214                                            0,
5215                                            IP_PROTOS.gre)
5216
5217         self.vapi.nat64_add_del_static_bib(client.ip6n,
5218                                            client_nat_ip_n,
5219                                            client_tcp_in_port,
5220                                            client_tcp_out_port,
5221                                            IP_PROTOS.tcp)
5222
5223         # client to server
5224         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5225              IPv6(src=client.ip6, dst=server_nat_ip6) /
5226              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5227         self.pg0.add_stream(p)
5228         self.pg_enable_capture(self.pg_interfaces)
5229         self.pg_start()
5230         p = self.pg0.get_capture(1)
5231
5232         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5233              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5234              GRE() /
5235              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5236              TCP(sport=1234, dport=1234))
5237         self.pg0.add_stream(p)
5238         self.pg_enable_capture(self.pg_interfaces)
5239         self.pg_start()
5240         p = self.pg0.get_capture(1)
5241         packet = p[0]
5242         try:
5243             self.assertEqual(packet[IPv6].src, client_nat_ip6)
5244             self.assertEqual(packet[IPv6].dst, server.ip6)
5245             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5246         except:
5247             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5248             raise
5249
5250         # server to client
5251         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5252              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5253              GRE() /
5254              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5255              TCP(sport=1234, dport=1234))
5256         self.pg0.add_stream(p)
5257         self.pg_enable_capture(self.pg_interfaces)
5258         self.pg_start()
5259         p = self.pg0.get_capture(1)
5260         packet = p[0]
5261         try:
5262             self.assertEqual(packet[IPv6].src, server_nat_ip6)
5263             self.assertEqual(packet[IPv6].dst, client.ip6)
5264             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5265         except:
5266             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5267             raise
5268
5269     def test_one_armed_nat64(self):
5270         """ One armed NAT64 """
5271         external_port = 0
5272         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5273                                            '64:ff9b::',
5274                                            96)
5275
5276         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5277                                                 self.nat_addr_n)
5278         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5279         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5280
5281         # in2out
5282         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5283              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5284              TCP(sport=12345, dport=80))
5285         self.pg3.add_stream(p)
5286         self.pg_enable_capture(self.pg_interfaces)
5287         self.pg_start()
5288         capture = self.pg3.get_capture(1)
5289         p = capture[0]
5290         try:
5291             ip = p[IP]
5292             tcp = p[TCP]
5293             self.assertEqual(ip.src, self.nat_addr)
5294             self.assertEqual(ip.dst, self.pg3.remote_ip4)
5295             self.assertNotEqual(tcp.sport, 12345)
5296             external_port = tcp.sport
5297             self.assertEqual(tcp.dport, 80)
5298             self.check_tcp_checksum(p)
5299             self.check_ip_checksum(p)
5300         except:
5301             self.logger.error(ppp("Unexpected or invalid packet:", p))
5302             raise
5303
5304         # out2in
5305         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5306              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5307              TCP(sport=80, dport=external_port))
5308         self.pg3.add_stream(p)
5309         self.pg_enable_capture(self.pg_interfaces)
5310         self.pg_start()
5311         capture = self.pg3.get_capture(1)
5312         p = capture[0]
5313         try:
5314             ip = p[IPv6]
5315             tcp = p[TCP]
5316             self.assertEqual(ip.src, remote_host_ip6)
5317             self.assertEqual(ip.dst, self.pg3.remote_ip6)
5318             self.assertEqual(tcp.sport, 80)
5319             self.assertEqual(tcp.dport, 12345)
5320             self.check_tcp_checksum(p)
5321         except:
5322             self.logger.error(ppp("Unexpected or invalid packet:", p))
5323             raise
5324
5325     def test_frag_in_order(self):
5326         """ NAT64 translate fragments arriving in order """
5327         self.tcp_port_in = random.randint(1025, 65535)
5328
5329         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5330                                                 self.nat_addr_n)
5331         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5332         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5333
5334         reass = self.vapi.nat_reass_dump()
5335         reass_n_start = len(reass)
5336
5337         # in2out
5338         data = 'a' * 200
5339         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5340                                            self.tcp_port_in, 20, data)
5341         self.pg0.add_stream(pkts)
5342         self.pg_enable_capture(self.pg_interfaces)
5343         self.pg_start()
5344         frags = self.pg1.get_capture(len(pkts))
5345         p = self.reass_frags_and_verify(frags,
5346                                         self.nat_addr,
5347                                         self.pg1.remote_ip4)
5348         self.assertEqual(p[TCP].dport, 20)
5349         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5350         self.tcp_port_out = p[TCP].sport
5351         self.assertEqual(data, p[Raw].load)
5352
5353         # out2in
5354         data = "A" * 4 + "b" * 16 + "C" * 3
5355         pkts = self.create_stream_frag(self.pg1,
5356                                        self.nat_addr,
5357                                        20,
5358                                        self.tcp_port_out,
5359                                        data)
5360         self.pg1.add_stream(pkts)
5361         self.pg_enable_capture(self.pg_interfaces)
5362         self.pg_start()
5363         frags = self.pg0.get_capture(len(pkts))
5364         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5365         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5366         self.assertEqual(p[TCP].sport, 20)
5367         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5368         self.assertEqual(data, p[Raw].load)
5369
5370         reass = self.vapi.nat_reass_dump()
5371         reass_n_end = len(reass)
5372
5373         self.assertEqual(reass_n_end - reass_n_start, 2)
5374
5375     def test_reass_hairpinning(self):
5376         """ NAT64 fragments hairpinning """
5377         data = 'a' * 200
5378         client = self.pg0.remote_hosts[0]
5379         server = self.pg0.remote_hosts[1]
5380         server_in_port = random.randint(1025, 65535)
5381         server_out_port = random.randint(1025, 65535)
5382         client_in_port = random.randint(1025, 65535)
5383         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5384         nat_addr_ip6 = ip.src
5385
5386         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5387                                                 self.nat_addr_n)
5388         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5389         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5390
5391         # add static BIB entry for server
5392         self.vapi.nat64_add_del_static_bib(server.ip6n,
5393                                            self.nat_addr_n,
5394                                            server_in_port,
5395                                            server_out_port,
5396                                            IP_PROTOS.tcp)
5397
5398         # send packet from host to server
5399         pkts = self.create_stream_frag_ip6(self.pg0,
5400                                            self.nat_addr,
5401                                            client_in_port,
5402                                            server_out_port,
5403                                            data)
5404         self.pg0.add_stream(pkts)
5405         self.pg_enable_capture(self.pg_interfaces)
5406         self.pg_start()
5407         frags = self.pg0.get_capture(len(pkts))
5408         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5409         self.assertNotEqual(p[TCP].sport, client_in_port)
5410         self.assertEqual(p[TCP].dport, server_in_port)
5411         self.assertEqual(data, p[Raw].load)
5412
5413     def test_frag_out_of_order(self):
5414         """ NAT64 translate fragments arriving out of order """
5415         self.tcp_port_in = random.randint(1025, 65535)
5416
5417         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5418                                                 self.nat_addr_n)
5419         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5420         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5421
5422         # in2out
5423         data = 'a' * 200
5424         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5425                                            self.tcp_port_in, 20, data)
5426         pkts.reverse()
5427         self.pg0.add_stream(pkts)
5428         self.pg_enable_capture(self.pg_interfaces)
5429         self.pg_start()
5430         frags = self.pg1.get_capture(len(pkts))
5431         p = self.reass_frags_and_verify(frags,
5432                                         self.nat_addr,
5433                                         self.pg1.remote_ip4)
5434         self.assertEqual(p[TCP].dport, 20)
5435         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5436         self.tcp_port_out = p[TCP].sport
5437         self.assertEqual(data, p[Raw].load)
5438
5439         # out2in
5440         data = "A" * 4 + "B" * 16 + "C" * 3
5441         pkts = self.create_stream_frag(self.pg1,
5442                                        self.nat_addr,
5443                                        20,
5444                                        self.tcp_port_out,
5445                                        data)
5446         pkts.reverse()
5447         self.pg1.add_stream(pkts)
5448         self.pg_enable_capture(self.pg_interfaces)
5449         self.pg_start()
5450         frags = self.pg0.get_capture(len(pkts))
5451         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5452         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5453         self.assertEqual(p[TCP].sport, 20)
5454         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5455         self.assertEqual(data, p[Raw].load)
5456
5457     def test_interface_addr(self):
5458         """ Acquire NAT64 pool addresses from interface """
5459         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5460
5461         # no address in NAT64 pool
5462         adresses = self.vapi.nat44_address_dump()
5463         self.assertEqual(0, len(adresses))
5464
5465         # configure interface address and check NAT64 address pool
5466         self.pg4.config_ip4()
5467         addresses = self.vapi.nat64_pool_addr_dump()
5468         self.assertEqual(len(addresses), 1)
5469         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5470
5471         # remove interface address and check NAT64 address pool
5472         self.pg4.unconfig_ip4()
5473         addresses = self.vapi.nat64_pool_addr_dump()
5474         self.assertEqual(0, len(adresses))
5475
5476     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5477     def test_ipfix_max_bibs_sessions(self):
5478         """ IPFIX logging maximum session and BIB entries exceeded """
5479         max_bibs = 1280
5480         max_sessions = 2560
5481         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5482                                            '64:ff9b::',
5483                                            96)
5484
5485         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5486                                                 self.nat_addr_n)
5487         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5488         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5489
5490         pkts = []
5491         src = ""
5492         for i in range(0, max_bibs):
5493             src = "fd01:aa::%x" % (i)
5494             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5495                  IPv6(src=src, dst=remote_host_ip6) /
5496                  TCP(sport=12345, dport=80))
5497             pkts.append(p)
5498             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5499                  IPv6(src=src, dst=remote_host_ip6) /
5500                  TCP(sport=12345, dport=22))
5501             pkts.append(p)
5502         self.pg0.add_stream(pkts)
5503         self.pg_enable_capture(self.pg_interfaces)
5504         self.pg_start()
5505         self.pg1.get_capture(max_sessions)
5506
5507         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5508                                      src_address=self.pg3.local_ip4n,
5509                                      path_mtu=512,
5510                                      template_interval=10)
5511         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5512                             src_port=self.ipfix_src_port)
5513
5514         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5515              IPv6(src=src, dst=remote_host_ip6) /
5516              TCP(sport=12345, dport=25))
5517         self.pg0.add_stream(p)
5518         self.pg_enable_capture(self.pg_interfaces)
5519         self.pg_start()
5520         self.pg1.get_capture(0)
5521         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5522         capture = self.pg3.get_capture(9)
5523         ipfix = IPFIXDecoder()
5524         # first load template
5525         for p in capture:
5526             self.assertTrue(p.haslayer(IPFIX))
5527             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5528             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5529             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5530             self.assertEqual(p[UDP].dport, 4739)
5531             self.assertEqual(p[IPFIX].observationDomainID,
5532                              self.ipfix_domain_id)
5533             if p.haslayer(Template):
5534                 ipfix.add_template(p.getlayer(Template))
5535         # verify events in data set
5536         for p in capture:
5537             if p.haslayer(Data):
5538                 data = ipfix.decode_data_set(p.getlayer(Set))
5539                 self.verify_ipfix_max_sessions(data, max_sessions)
5540
5541         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5542              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5543              TCP(sport=12345, dport=80))
5544         self.pg0.add_stream(p)
5545         self.pg_enable_capture(self.pg_interfaces)
5546         self.pg_start()
5547         self.pg1.get_capture(0)
5548         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5549         capture = self.pg3.get_capture(1)
5550         # verify events in data set
5551         for p in capture:
5552             self.assertTrue(p.haslayer(IPFIX))
5553             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5554             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5555             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5556             self.assertEqual(p[UDP].dport, 4739)
5557             self.assertEqual(p[IPFIX].observationDomainID,
5558                              self.ipfix_domain_id)
5559             if p.haslayer(Data):
5560                 data = ipfix.decode_data_set(p.getlayer(Set))
5561                 self.verify_ipfix_max_bibs(data, max_bibs)
5562
5563     def test_ipfix_max_frags(self):
5564         """ IPFIX logging maximum fragments pending reassembly exceeded """
5565         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5566                                                 self.nat_addr_n)
5567         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5568         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5569         self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5570         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5571                                      src_address=self.pg3.local_ip4n,
5572                                      path_mtu=512,
5573                                      template_interval=10)
5574         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5575                             src_port=self.ipfix_src_port)
5576
5577         data = 'a' * 200
5578         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5579                                            self.tcp_port_in, 20, data)
5580         self.pg0.add_stream(pkts[-1])
5581         self.pg_enable_capture(self.pg_interfaces)
5582         self.pg_start()
5583         self.pg1.get_capture(0)
5584         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5585         capture = self.pg3.get_capture(9)
5586         ipfix = IPFIXDecoder()
5587         # first load template
5588         for p in capture:
5589             self.assertTrue(p.haslayer(IPFIX))
5590             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5591             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5592             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5593             self.assertEqual(p[UDP].dport, 4739)
5594             self.assertEqual(p[IPFIX].observationDomainID,
5595                              self.ipfix_domain_id)
5596             if p.haslayer(Template):
5597                 ipfix.add_template(p.getlayer(Template))
5598         # verify events in data set
5599         for p in capture:
5600             if p.haslayer(Data):
5601                 data = ipfix.decode_data_set(p.getlayer(Set))
5602                 self.verify_ipfix_max_fragments_ip6(data, 0,
5603                                                     self.pg0.remote_ip6n)
5604
5605     def test_ipfix_bib_ses(self):
5606         """ IPFIX logging NAT64 BIB/session create and delete events """
5607         self.tcp_port_in = random.randint(1025, 65535)
5608         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5609                                            '64:ff9b::',
5610                                            96)
5611
5612         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5613                                                 self.nat_addr_n)
5614         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5615         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5616         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5617                                      src_address=self.pg3.local_ip4n,
5618                                      path_mtu=512,
5619                                      template_interval=10)
5620         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5621                             src_port=self.ipfix_src_port)
5622
5623         # Create
5624         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5625              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5626              TCP(sport=self.tcp_port_in, dport=25))
5627         self.pg0.add_stream(p)
5628         self.pg_enable_capture(self.pg_interfaces)
5629         self.pg_start()
5630         p = self.pg1.get_capture(1)
5631         self.tcp_port_out = p[0][TCP].sport
5632         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5633         capture = self.pg3.get_capture(10)
5634         ipfix = IPFIXDecoder()
5635         # first load template
5636         for p in capture:
5637             self.assertTrue(p.haslayer(IPFIX))
5638             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5639             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5640             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5641             self.assertEqual(p[UDP].dport, 4739)
5642             self.assertEqual(p[IPFIX].observationDomainID,
5643                              self.ipfix_domain_id)
5644             if p.haslayer(Template):
5645                 ipfix.add_template(p.getlayer(Template))
5646         # verify events in data set
5647         for p in capture:
5648             if p.haslayer(Data):
5649                 data = ipfix.decode_data_set(p.getlayer(Set))
5650                 if ord(data[0][230]) == 10:
5651                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5652                 elif ord(data[0][230]) == 6:
5653                     self.verify_ipfix_nat64_ses(data,
5654                                                 1,
5655                                                 self.pg0.remote_ip6n,
5656                                                 self.pg1.remote_ip4,
5657                                                 25)
5658                 else:
5659                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5660
5661         # Delete
5662         self.pg_enable_capture(self.pg_interfaces)
5663         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5664                                                 self.nat_addr_n,
5665                                                 is_add=0)
5666         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5667         capture = self.pg3.get_capture(2)
5668         # verify events in data set
5669         for p in capture:
5670             self.assertTrue(p.haslayer(IPFIX))
5671             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5672             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5673             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5674             self.assertEqual(p[UDP].dport, 4739)
5675             self.assertEqual(p[IPFIX].observationDomainID,
5676                              self.ipfix_domain_id)
5677             if p.haslayer(Data):
5678                 data = ipfix.decode_data_set(p.getlayer(Set))
5679                 if ord(data[0][230]) == 11:
5680                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5681                 elif ord(data[0][230]) == 7:
5682                     self.verify_ipfix_nat64_ses(data,
5683                                                 0,
5684                                                 self.pg0.remote_ip6n,
5685                                                 self.pg1.remote_ip4,
5686                                                 25)
5687                 else:
5688                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5689
5690     def nat64_get_ses_num(self):
5691         """
5692         Return number of active NAT64 sessions.
5693         """
5694         st = self.vapi.nat64_st_dump()
5695         return len(st)
5696
5697     def clear_nat64(self):
5698         """
5699         Clear NAT64 configuration.
5700         """
5701         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5702                             domain_id=self.ipfix_domain_id)
5703         self.ipfix_src_port = 4739
5704         self.ipfix_domain_id = 1
5705
5706         self.vapi.nat64_set_timeouts()
5707
5708         interfaces = self.vapi.nat64_interface_dump()
5709         for intf in interfaces:
5710             if intf.is_inside > 1:
5711                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5712                                                   0,
5713                                                   is_add=0)
5714             self.vapi.nat64_add_del_interface(intf.sw_if_index,
5715                                               intf.is_inside,
5716                                               is_add=0)
5717
5718         bib = self.vapi.nat64_bib_dump(255)
5719         for bibe in bib:
5720             if bibe.is_static:
5721                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5722                                                    bibe.o_addr,
5723                                                    bibe.i_port,
5724                                                    bibe.o_port,
5725                                                    bibe.proto,
5726                                                    bibe.vrf_id,
5727                                                    is_add=0)
5728
5729         adresses = self.vapi.nat64_pool_addr_dump()
5730         for addr in adresses:
5731             self.vapi.nat64_add_del_pool_addr_range(addr.address,
5732                                                     addr.address,
5733                                                     vrf_id=addr.vrf_id,
5734                                                     is_add=0)
5735
5736         prefixes = self.vapi.nat64_prefix_dump()
5737         for prefix in prefixes:
5738             self.vapi.nat64_add_del_prefix(prefix.prefix,
5739                                            prefix.prefix_len,
5740                                            vrf_id=prefix.vrf_id,
5741                                            is_add=0)
5742
5743     def tearDown(self):
5744         super(TestNAT64, self).tearDown()
5745         if not self.vpp_dead:
5746             self.logger.info(self.vapi.cli("show nat64 pool"))
5747             self.logger.info(self.vapi.cli("show nat64 interfaces"))
5748             self.logger.info(self.vapi.cli("show nat64 prefix"))
5749             self.logger.info(self.vapi.cli("show nat64 bib all"))
5750             self.logger.info(self.vapi.cli("show nat64 session table all"))
5751             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5752             self.clear_nat64()
5753
5754
class TestDSlite(MethodHolder):
    """ DS-Lite Test Cases """

    @classmethod
    def setUpClass(cls):
        super(TestDSlite, cls).setUpClass()

        try:
            # Address the test puts into the DS-Lite pool, as text and in
            # packed form.
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)

            cls.create_pg_interfaces(range(2))

            # pg0 gets IPv4 configuration only.
            ip4_if = cls.pg0
            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

            # pg1 gets IPv6 configuration and two remote hosts.
            ip6_if = cls.pg1
            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.generate_remote_hosts(2)
            ip6_if.configure_ipv6_neighbors()

        except Exception:
            # Run the class-level teardown before re-raising.
            super(TestDSlite, cls).tearDownClass()
            raise

    def test_dslite(self):
        """ Test DS-Lite """
        ip4_if = self.pg0
        ip6_if = self.pg1
        # Inner IPv4 source address of every tunnelled request below.
        inner_src = '192.168.1.1'

        self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
                                                 self.nat_addr_n)
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(
            socket.inet_pton(socket.AF_INET6, aftr_ip6),
            socket.inet_pton(socket.AF_INET, aftr_ip4))

        # UDP
        # IPv4-in-IPv6 from remote host 0 to the AFTR address; a plain
        # IPv4 packet sourced from the pool address is expected on pg0.
        host_ip6 = ip6_if.remote_hosts[0].ip6
        tx = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
              IPv6(dst=aftr_ip6, src=host_ip6) /
              IP(dst=ip4_if.remote_ip4, src=inner_src) /
              UDP(sport=20000, dport=10000))
        ip6_if.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = ip4_if.get_capture(1)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, ip4_if.remote_ip4)
        self.assertNotEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)
        out_port = rx[UDP].sport

        # Reply to the translated port; it has to come back on pg1 inside
        # IPv6 with the original inner address and port.
        tx = (Ether(dst=ip4_if.local_mac, src=ip4_if.remote_mac) /
              IP(dst=self.nat_addr, src=ip4_if.remote_ip4) /
              UDP(sport=10000, dport=out_port))
        ip4_if.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = ip6_if.get_capture(1)[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, host_ip6)
        self.assertEqual(rx[IP].src, ip4_if.remote_ip4)
        self.assertEqual(rx[IP].dst, inner_src)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # TCP
        # Same exchange from remote host 1, which reuses the inner IPv4
        # source address of host 0.
        host_ip6 = ip6_if.remote_hosts[1].ip6
        tx = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
              IPv6(dst=aftr_ip6, src=host_ip6) /
              IP(dst=ip4_if.remote_ip4, src=inner_src) /
              TCP(sport=20001, dport=10001))
        ip6_if.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = ip4_if.get_capture(1)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, ip4_if.remote_ip4)
        self.assertNotEqual(rx[TCP].sport, 20001)
        self.assertEqual(rx[TCP].dport, 10001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)
        out_port = rx[TCP].sport

        tx = (Ether(dst=ip4_if.local_mac, src=ip4_if.remote_mac) /
              IP(dst=self.nat_addr, src=ip4_if.remote_ip4) /
              TCP(sport=10001, dport=out_port))
        ip4_if.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = ip6_if.get_capture(1)[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, host_ip6)
        self.assertEqual(rx[IP].src, ip4_if.remote_ip4)
        self.assertEqual(rx[IP].dst, inner_src)
        self.assertEqual(rx[TCP].sport, 10001)
        self.assertEqual(rx[TCP].dport, 20001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)

        # ICMP
        # Echo request from remote host 1; the ICMP id plays the role the
        # port played above.
        tx = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
              IPv6(dst=aftr_ip6, src=host_ip6) /
              IP(dst=ip4_if.remote_ip4, src=inner_src) /
              ICMP(id=4000, type='echo-request'))
        ip6_if.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = ip4_if.get_capture(1)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, ip4_if.remote_ip4)
        self.assertNotEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)
        out_id = rx[ICMP].id

        tx = (Ether(dst=ip4_if.local_mac, src=ip4_if.remote_mac) /
              IP(dst=self.nat_addr, src=ip4_if.remote_ip4) /
              ICMP(id=out_id, type='echo-reply'))
        ip4_if.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = ip6_if.get_capture(1)[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, host_ip6)
        self.assertEqual(rx[IP].src, ip4_if.remote_ip4)
        self.assertEqual(rx[IP].dst, inner_src)
        self.assertEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)

        # ping DS-Lite AFTR tunnel endpoint address
        tx = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
              IPv6(src=host_ip6, dst=aftr_ip6) /
              ICMPv6EchoRequest())
        ip6_if.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        replies = ip6_if.get_capture(1)
        self.assertEqual(1, len(replies))
        rx = replies[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, host_ip6)
        self.assertTrue(rx.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        super(TestDSlite, self).tearDown()
        if self.vpp_dead:
            return
        # Dump the DS-Lite state into the test log.
        for show_cmd in ("show dslite pool",
                         "show dslite aftr-tunnel-endpoint-address",
                         "show dslite sessions"):
            self.logger.info(self.vapi.cli(show_cmd))
5914
5915
class TestDSliteCE(MethodHolder):
    """ DS-Lite CE Test Cases """

    @classmethod
    def setUpConstants(cls):
        super(TestDSliteCE, cls).setUpConstants()
        # Extra startup configuration stanza: nat { dslite ce }
        nat_stanza = ["nat", "{", "dslite ce", "}"]
        cls.vpp_cmdline.extend(nat_stanza)

    @classmethod
    def setUpClass(cls):
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))

            # pg0 gets IPv4 configuration only.
            ip4_if = cls.pg0
            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

            # pg1 gets IPv6 configuration and one remote host.
            ip6_if = cls.pg1
            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.generate_remote_hosts(1)
            ip6_if.configure_ipv6_neighbors()

        except Exception:
            # Run the class-level teardown before re-raising.
            super(TestDSliteCE, cls).tearDownClass()
            raise

    def test_dslite_ce(self):
        """ Test DS-Lite CE """
        ip4_if = self.pg0
        ip6_if = self.pg1

        # B4 (local) tunnel endpoint addresses
        b4_ip4 = '192.0.0.2'
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        self.vapi.dslite_set_b4_addr(
            socket.inet_pton(socket.AF_INET6, b4_ip6),
            socket.inet_pton(socket.AF_INET, b4_ip4))

        # AFTR (remote) tunnel endpoint addresses
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
        self.vapi.dslite_set_aftr_addr(
            aftr_ip6_n,
            socket.inet_pton(socket.AF_INET, aftr_ip4))

        # /128 route for the AFTR address via the pg1 remote host
        self.vapi.ip_add_del_route(
            dst_address=aftr_ip6_n,
            dst_address_length=128,
            next_hop_address=ip6_if.remote_ip6n,
            next_hop_sw_if_index=ip6_if.sw_if_index,
            is_ipv6=1)

        # UDP encapsulation
        # Plain IPv4 in on pg0; expected out on pg1 wrapped in IPv6 from
        # the B4 address to the AFTR address, inner packet untouched.
        tx = (Ether(dst=ip4_if.local_mac, src=ip4_if.remote_mac) /
              IP(dst=ip6_if.remote_ip4, src=ip4_if.remote_ip4) /
              UDP(sport=10000, dport=20000))
        ip4_if.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = ip6_if.get_capture(1)[0]
        self.assertEqual(rx[IPv6].src, b4_ip6)
        self.assertEqual(rx[IPv6].dst, aftr_ip6)
        self.assertEqual(rx[IP].src, ip4_if.remote_ip4)
        self.assertEqual(rx[IP].dst, ip6_if.remote_ip4)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # UDP decapsulation
        # IPv4-in-IPv6 in on pg1 addressed to the B4 address; expected out
        # on pg0 without the IPv6 header.
        tx = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
              IPv6(dst=b4_ip6, src=aftr_ip6) /
              IP(dst=ip4_if.remote_ip4, src=ip6_if.remote_ip4) /
              UDP(sport=20000, dport=10000))
        ip6_if.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = ip4_if.get_capture(1)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, ip6_if.remote_ip4)
        self.assertEqual(rx[IP].dst, ip4_if.remote_ip4)
        self.assertEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)

        # ping DS-Lite B4 tunnel endpoint address
        host_ip6 = ip6_if.remote_hosts[0].ip6
        tx = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
              IPv6(src=host_ip6, dst=b4_ip6) /
              ICMPv6EchoRequest())
        ip6_if.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        replies = ip6_if.get_capture(1)
        self.assertEqual(1, len(replies))
        rx = replies[0]
        self.assertEqual(rx[IPv6].src, b4_ip6)
        self.assertEqual(rx[IPv6].dst, host_ip6)
        self.assertTrue(rx.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        super(TestDSliteCE, self).tearDown()
        if self.vpp_dead:
            return
        # Dump both tunnel endpoint addresses into the test log.
        for show_cmd in ("show dslite aftr-tunnel-endpoint-address",
                         "show dslite b4-tunnel-endpoint-address"):
            self.logger.info(self.vapi.cli(show_cmd))
6018
if __name__ == '__main__':
    # Executed as a script: hand control to unittest's command-line
    # entry point, using VppTestRunner as the test runner.
    unittest.main(testRunner=VppTestRunner)