NAT44: asymmetrical static mapping and one-armed NAT (VPP-1138)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param dst_ip: Destination address
144         :param ttl: TTL of generated packets
145         """
146         if dst_ip is None:
147             dst_ip = out_if.remote_ip4
148
149         pkts = []
150         # TCP
151         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153              TCP(sport=self.tcp_port_in, dport=20))
154         pkts.append(p)
155
156         # UDP
157         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159              UDP(sport=self.udp_port_in, dport=20))
160         pkts.append(p)
161
162         # ICMP
163         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165              ICMP(id=self.icmp_id_in, type='echo-request'))
166         pkts.append(p)
167
168         return pkts
169
170     def compose_ip6(self, ip4, pref, plen):
171         """
172         Compose IPv4-embedded IPv6 addresses
173
174         :param ip4: IPv4 address
175         :param pref: IPv6 prefix
176         :param plen: IPv6 prefix length
177         :returns: IPv4-embedded IPv6 addresses
178         """
179         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
181         if plen == 32:
182             pref_n[4] = ip4_n[0]
183             pref_n[5] = ip4_n[1]
184             pref_n[6] = ip4_n[2]
185             pref_n[7] = ip4_n[3]
186         elif plen == 40:
187             pref_n[5] = ip4_n[0]
188             pref_n[6] = ip4_n[1]
189             pref_n[7] = ip4_n[2]
190             pref_n[9] = ip4_n[3]
191         elif plen == 48:
192             pref_n[6] = ip4_n[0]
193             pref_n[7] = ip4_n[1]
194             pref_n[9] = ip4_n[2]
195             pref_n[10] = ip4_n[3]
196         elif plen == 56:
197             pref_n[7] = ip4_n[0]
198             pref_n[9] = ip4_n[1]
199             pref_n[10] = ip4_n[2]
200             pref_n[11] = ip4_n[3]
201         elif plen == 64:
202             pref_n[9] = ip4_n[0]
203             pref_n[10] = ip4_n[1]
204             pref_n[11] = ip4_n[2]
205             pref_n[12] = ip4_n[3]
206         elif plen == 96:
207             pref_n[12] = ip4_n[0]
208             pref_n[13] = ip4_n[1]
209             pref_n[14] = ip4_n[2]
210             pref_n[15] = ip4_n[3]
211         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
212
213     def extract_ip4(self, ip6, plen):
214         """
215         Extract IPv4 address embedded in IPv6 addresses
216
217         :param ip6: IPv6 address
218         :param plen: IPv6 prefix length
219         :returns: extracted IPv4 address
220         """
221         ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
222         ip4_n = [None] * 4
223         if plen == 32:
224             ip4_n[0] = ip6_n[4]
225             ip4_n[1] = ip6_n[5]
226             ip4_n[2] = ip6_n[6]
227             ip4_n[3] = ip6_n[7]
228         elif plen == 40:
229             ip4_n[0] = ip6_n[5]
230             ip4_n[1] = ip6_n[6]
231             ip4_n[2] = ip6_n[7]
232             ip4_n[3] = ip6_n[9]
233         elif plen == 48:
234             ip4_n[0] = ip6_n[6]
235             ip4_n[1] = ip6_n[7]
236             ip4_n[2] = ip6_n[9]
237             ip4_n[3] = ip6_n[10]
238         elif plen == 56:
239             ip4_n[0] = ip6_n[7]
240             ip4_n[1] = ip6_n[9]
241             ip4_n[2] = ip6_n[10]
242             ip4_n[3] = ip6_n[11]
243         elif plen == 64:
244             ip4_n[0] = ip6_n[9]
245             ip4_n[1] = ip6_n[10]
246             ip4_n[2] = ip6_n[11]
247             ip4_n[3] = ip6_n[12]
248         elif plen == 96:
249             ip4_n[0] = ip6_n[12]
250             ip4_n[1] = ip6_n[13]
251             ip4_n[2] = ip6_n[14]
252             ip4_n[3] = ip6_n[15]
253         return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
254
255     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
256         """
257         Create IPv6 packet stream for inside network
258
259         :param in_if: Inside interface
260         :param out_if: Outside interface
261         :param ttl: Hop Limit of generated packets
262         :param pref: NAT64 prefix
263         :param plen: NAT64 prefix length
264         """
265         pkts = []
266         if pref is None:
267             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
268         else:
269             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
270
271         # TCP
272         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274              TCP(sport=self.tcp_port_in, dport=20))
275         pkts.append(p)
276
277         # UDP
278         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280              UDP(sport=self.udp_port_in, dport=20))
281         pkts.append(p)
282
283         # ICMP
284         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286              ICMPv6EchoRequest(id=self.icmp_id_in))
287         pkts.append(p)
288
289         return pkts
290
291     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292                           use_inside_ports=False):
293         """
294         Create packet stream for outside network
295
296         :param out_if: Outside interface
297         :param dst_ip: Destination IP address (Default use global NAT address)
298         :param ttl: TTL of generated packets
299         :param use_inside_ports: Use inside NAT ports as destination ports
300                instead of outside ports
301         """
302         if dst_ip is None:
303             dst_ip = self.nat_addr
304         if not use_inside_ports:
305             tcp_port = self.tcp_port_out
306             udp_port = self.udp_port_out
307             icmp_id = self.icmp_id_out
308         else:
309             tcp_port = self.tcp_port_in
310             udp_port = self.udp_port_in
311             icmp_id = self.icmp_id_in
312         pkts = []
313         # TCP
314         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316              TCP(dport=tcp_port, sport=20))
317         pkts.append(p)
318
319         # UDP
320         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322              UDP(dport=udp_port, sport=20))
323         pkts.append(p)
324
325         # ICMP
326         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328              ICMP(id=icmp_id, type='echo-reply'))
329         pkts.append(p)
330
331         return pkts
332
333     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
334         """
335         Create packet stream for outside network
336
337         :param out_if: Outside interface
338         :param dst_ip: Destination IP address (Default use global NAT address)
339         :param hl: HL of generated packets
340         """
341         pkts = []
342         # TCP
343         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345              TCP(dport=self.tcp_port_out, sport=20))
346         pkts.append(p)
347
348         # UDP
349         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351              UDP(dport=self.udp_port_out, sport=20))
352         pkts.append(p)
353
354         # ICMP
355         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357              ICMPv6EchoReply(id=self.icmp_id_out))
358         pkts.append(p)
359
360         return pkts
361
362     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363                            packet_num=3, dst_ip=None, is_ip6=False):
364         """
365         Verify captured packets on outside network
366
367         :param capture: Captured packets
368         :param nat_ip: Translated IP address (Default use global NAT address)
369         :param same_port: Sorce port number is not translated (Default False)
370         :param packet_num: Expected number of packets (Default 3)
371         :param dst_ip: Destination IP address (Default do not verify)
372         :param is_ip6: If L3 protocol is IPv6 (Default False)
373         """
374         if is_ip6:
375             IP46 = IPv6
376             ICMP46 = ICMPv6EchoRequest
377         else:
378             IP46 = IP
379             ICMP46 = ICMP
380         if nat_ip is None:
381             nat_ip = self.nat_addr
382         self.assertEqual(packet_num, len(capture))
383         for packet in capture:
384             try:
385                 if not is_ip6:
386                     self.check_ip_checksum(packet)
387                 self.assertEqual(packet[IP46].src, nat_ip)
388                 if dst_ip is not None:
389                     self.assertEqual(packet[IP46].dst, dst_ip)
390                 if packet.haslayer(TCP):
391                     if same_port:
392                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
393                     else:
394                         self.assertNotEqual(
395                             packet[TCP].sport, self.tcp_port_in)
396                     self.tcp_port_out = packet[TCP].sport
397                     self.check_tcp_checksum(packet)
398                 elif packet.haslayer(UDP):
399                     if same_port:
400                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
401                     else:
402                         self.assertNotEqual(
403                             packet[UDP].sport, self.udp_port_in)
404                     self.udp_port_out = packet[UDP].sport
405                 else:
406                     if same_port:
407                         self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
408                     else:
409                         self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410                     self.icmp_id_out = packet[ICMP46].id
411                     if is_ip6:
412                         self.check_icmpv6_checksum(packet)
413                     else:
414                         self.check_icmp_checksum(packet)
415             except:
416                 self.logger.error(ppp("Unexpected or invalid packet "
417                                       "(outside network):", packet))
418                 raise
419
420     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421                                packet_num=3, dst_ip=None):
422         """
423         Verify captured packets on outside network
424
425         :param capture: Captured packets
426         :param nat_ip: Translated IP address
427         :param same_port: Sorce port number is not translated (Default False)
428         :param packet_num: Expected number of packets (Default 3)
429         :param dst_ip: Destination IP address (Default do not verify)
430         """
431         return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
432                                        dst_ip, True)
433
434     def verify_capture_in(self, capture, in_if, packet_num=3):
435         """
436         Verify captured packets on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.check_ip_checksum(packet)
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 if packet.haslayer(TCP):
448                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449                     self.check_tcp_checksum(packet)
450                 elif packet.haslayer(UDP):
451                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
452                 else:
453                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454                     self.check_icmp_checksum(packet)
455             except:
456                 self.logger.error(ppp("Unexpected or invalid packet "
457                                       "(inside network):", packet))
458                 raise
459
460     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
461         """
462         Verify captured IPv6 packets on inside network
463
464         :param capture: Captured packets
465         :param src_ip: Source IP
466         :param dst_ip: Destination IP address
467         :param packet_num: Expected number of packets (Default 3)
468         """
469         self.assertEqual(packet_num, len(capture))
470         for packet in capture:
471             try:
472                 self.assertEqual(packet[IPv6].src, src_ip)
473                 self.assertEqual(packet[IPv6].dst, dst_ip)
474                 if packet.haslayer(TCP):
475                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476                     self.check_tcp_checksum(packet)
477                 elif packet.haslayer(UDP):
478                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
479                     self.check_udp_checksum(packet)
480                 else:
481                     self.assertEqual(packet[ICMPv6EchoReply].id,
482                                      self.icmp_id_in)
483                     self.check_icmpv6_checksum(packet)
484             except:
485                 self.logger.error(ppp("Unexpected or invalid packet "
486                                       "(inside network):", packet))
487                 raise
488
489     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
490         """
491         Verify captured packet that don't have to be translated
492
493         :param capture: Captured packets
494         :param ingress_if: Ingress interface
495         :param egress_if: Egress interface
496         """
497         for packet in capture:
498             try:
499                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501                 if packet.haslayer(TCP):
502                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503                 elif packet.haslayer(UDP):
504                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
505                 else:
506                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
507             except:
508                 self.logger.error(ppp("Unexpected or invalid packet "
509                                       "(inside network):", packet))
510                 raise
511
512     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513                                             packet_num=3, icmp_type=11):
514         """
515         Verify captured packets with ICMP errors on outside network
516
517         :param capture: Captured packets
518         :param src_ip: Translated IP address or IP address of VPP
519                        (Default use global NAT address)
520         :param packet_num: Expected number of packets (Default 3)
521         :param icmp_type: Type of error ICMP packet
522                           we are expecting (Default 11)
523         """
524         if src_ip is None:
525             src_ip = self.nat_addr
526         self.assertEqual(packet_num, len(capture))
527         for packet in capture:
528             try:
529                 self.assertEqual(packet[IP].src, src_ip)
530                 self.assertTrue(packet.haslayer(ICMP))
531                 icmp = packet[ICMP]
532                 self.assertEqual(icmp.type, icmp_type)
533                 self.assertTrue(icmp.haslayer(IPerror))
534                 inner_ip = icmp[IPerror]
535                 if inner_ip.haslayer(TCPerror):
536                     self.assertEqual(inner_ip[TCPerror].dport,
537                                      self.tcp_port_out)
538                 elif inner_ip.haslayer(UDPerror):
539                     self.assertEqual(inner_ip[UDPerror].dport,
540                                      self.udp_port_out)
541                 else:
542                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
543             except:
544                 self.logger.error(ppp("Unexpected or invalid packet "
545                                       "(outside network):", packet))
546                 raise
547
548     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
549                                            icmp_type=11):
550         """
551         Verify captured packets with ICMP errors on inside network
552
553         :param capture: Captured packets
554         :param in_if: Inside interface
555         :param packet_num: Expected number of packets (Default 3)
556         :param icmp_type: Type of error ICMP packet
557                           we are expecting (Default 11)
558         """
559         self.assertEqual(packet_num, len(capture))
560         for packet in capture:
561             try:
562                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563                 self.assertTrue(packet.haslayer(ICMP))
564                 icmp = packet[ICMP]
565                 self.assertEqual(icmp.type, icmp_type)
566                 self.assertTrue(icmp.haslayer(IPerror))
567                 inner_ip = icmp[IPerror]
568                 if inner_ip.haslayer(TCPerror):
569                     self.assertEqual(inner_ip[TCPerror].sport,
570                                      self.tcp_port_in)
571                 elif inner_ip.haslayer(UDPerror):
572                     self.assertEqual(inner_ip[UDPerror].sport,
573                                      self.udp_port_in)
574                 else:
575                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
576             except:
577                 self.logger.error(ppp("Unexpected or invalid packet "
578                                       "(inside network):", packet))
579                 raise
580
581     def create_stream_frag(self, src_if, dst, sport, dport, data):
582         """
583         Create fragmented packet stream
584
585         :param src_if: Source interface
586         :param dst: Destination IPv4 address
587         :param sport: Source TCP port
588         :param dport: Destination TCP port
589         :param data: Payload data
590         :returns: Fragmets
591         """
592         id = random.randint(0, 65535)
593         p = (IP(src=src_if.remote_ip4, dst=dst) /
594              TCP(sport=sport, dport=dport) /
595              Raw(data))
596         p = p.__class__(str(p))
597         chksum = p['TCP'].chksum
598         pkts = []
599         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601              TCP(sport=sport, dport=dport, chksum=chksum) /
602              Raw(data[0:4]))
603         pkts.append(p)
604         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606                 proto=IP_PROTOS.tcp) /
607              Raw(data[4:20]))
608         pkts.append(p)
609         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
611                 id=id) /
612              Raw(data[20:]))
613         pkts.append(p)
614         return pkts
615
616     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617                                pref=None, plen=0, frag_size=128):
618         """
619         Create fragmented packet stream
620
621         :param src_if: Source interface
622         :param dst: Destination IPv4 address
623         :param sport: Source TCP port
624         :param dport: Destination TCP port
625         :param data: Payload data
626         :param pref: NAT64 prefix
627         :param plen: NAT64 prefix length
628         :param fragsize: size of fragments
629         :returns: Fragmets
630         """
631         if pref is None:
632             dst_ip6 = ''.join(['64:ff9b::', dst])
633         else:
634             dst_ip6 = self.compose_ip6(dst, pref, plen)
635
636         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639              TCP(sport=sport, dport=dport) /
640              Raw(data))
641
642         return fragment6(p, frag_size)
643
644     def reass_frags_and_verify(self, frags, src, dst):
645         """
646         Reassemble and verify fragmented packet
647
648         :param frags: Captured fragments
649         :param src: Source IPv4 address to verify
650         :param dst: Destination IPv4 address to verify
651
652         :returns: Reassembled IPv4 packet
653         """
654         buffer = StringIO.StringIO()
655         for p in frags:
656             self.assertEqual(p[IP].src, src)
657             self.assertEqual(p[IP].dst, dst)
658             self.check_ip_checksum(p)
659             buffer.seek(p[IP].frag * 8)
660             buffer.write(p[IP].payload)
661         ip = frags[0].getlayer(IP)
662         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663                 proto=frags[0][IP].proto)
664         if ip.proto == IP_PROTOS.tcp:
665             p = (ip / TCP(buffer.getvalue()))
666             self.check_tcp_checksum(p)
667         elif ip.proto == IP_PROTOS.udp:
668             p = (ip / UDP(buffer.getvalue()))
669         return p
670
671     def reass_frags_and_verify_ip6(self, frags, src, dst):
672         """
673         Reassemble and verify fragmented packet
674
675         :param frags: Captured fragments
676         :param src: Source IPv6 address to verify
677         :param dst: Destination IPv6 address to verify
678
679         :returns: Reassembled IPv6 packet
680         """
681         buffer = StringIO.StringIO()
682         for p in frags:
683             self.assertEqual(p[IPv6].src, src)
684             self.assertEqual(p[IPv6].dst, dst)
685             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686             buffer.write(p[IPv6ExtHdrFragment].payload)
687         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688                   nh=frags[0][IPv6ExtHdrFragment].nh)
689         if ip.nh == IP_PROTOS.tcp:
690             p = (ip / TCP(buffer.getvalue()))
691             self.check_tcp_checksum(p)
692         elif ip.nh == IP_PROTOS.udp:
693             p = (ip / UDP(buffer.getvalue()))
694         return p
695
696     def verify_ipfix_nat44_ses(self, data):
697         """
698         Verify IPFIX NAT44 session create/delete event
699
700         :param data: Decoded IPFIX data records
701         """
702         nat44_ses_create_num = 0
703         nat44_ses_delete_num = 0
704         self.assertEqual(6, len(data))
705         for record in data:
706             # natEvent
707             self.assertIn(ord(record[230]), [4, 5])
708             if ord(record[230]) == 4:
709                 nat44_ses_create_num += 1
710             else:
711                 nat44_ses_delete_num += 1
712             # sourceIPv4Address
713             self.assertEqual(self.pg0.remote_ip4n, record[8])
714             # postNATSourceIPv4Address
715             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
716                              record[225])
717             # ingressVRFID
718             self.assertEqual(struct.pack("!I", 0), record[234])
719             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720             if IP_PROTOS.icmp == ord(record[4]):
721                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
723                                  record[227])
724             elif IP_PROTOS.tcp == ord(record[4]):
725                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
726                                  record[7])
727                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
728                                  record[227])
729             elif IP_PROTOS.udp == ord(record[4]):
730                 self.assertEqual(struct.pack("!H", self.udp_port_in),
731                                  record[7])
732                 self.assertEqual(struct.pack("!H", self.udp_port_out),
733                                  record[227])
734             else:
735                 self.fail("Invalid protocol")
736         self.assertEqual(3, nat44_ses_create_num)
737         self.assertEqual(3, nat44_ses_delete_num)
738
739     def verify_ipfix_addr_exhausted(self, data):
740         """
741         Verify IPFIX NAT addresses event
742
743         :param data: Decoded IPFIX data records
744         """
745         self.assertEqual(1, len(data))
746         record = data[0]
747         # natEvent
748         self.assertEqual(ord(record[230]), 3)
749         # natPoolID
750         self.assertEqual(struct.pack("!I", 0), record[283])
751
752     def verify_ipfix_max_sessions(self, data, limit):
753         """
754         Verify IPFIX maximum session entries exceeded event
755
756         :param data: Decoded IPFIX data records
757         :param limit: Number of maximum session entries that can be created.
758         """
759         self.assertEqual(1, len(data))
760         record = data[0]
761         # natEvent
762         self.assertEqual(ord(record[230]), 13)
763         # natQuotaExceededEvent
764         self.assertEqual(struct.pack("I", 1), record[466])
765         # maxSessionEntries
766         self.assertEqual(struct.pack("I", limit), record[471])
767
768     def verify_ipfix_max_bibs(self, data, limit):
769         """
770         Verify IPFIX maximum BIB entries exceeded event
771
772         :param data: Decoded IPFIX data records
773         :param limit: Number of maximum BIB entries that can be created.
774         """
775         self.assertEqual(1, len(data))
776         record = data[0]
777         # natEvent
778         self.assertEqual(ord(record[230]), 13)
779         # natQuotaExceededEvent
780         self.assertEqual(struct.pack("I", 2), record[466])
781         # maxBIBEntries
782         self.assertEqual(struct.pack("I", limit), record[472])
783
784     def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
785         """
786         Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
787
788         :param data: Decoded IPFIX data records
789         :param limit: Number of maximum fragments pending reassembly
790         :param src_addr: IPv6 source address
791         """
792         self.assertEqual(1, len(data))
793         record = data[0]
794         # natEvent
795         self.assertEqual(ord(record[230]), 13)
796         # natQuotaExceededEvent
797         self.assertEqual(struct.pack("I", 5), record[466])
798         # maxFragmentsPendingReassembly
799         self.assertEqual(struct.pack("I", limit), record[475])
800         # sourceIPv6Address
801         self.assertEqual(src_addr, record[27])
802
803     def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
804         """
805         Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
806
807         :param data: Decoded IPFIX data records
808         :param limit: Number of maximum fragments pending reassembly
809         :param src_addr: IPv4 source address
810         """
811         self.assertEqual(1, len(data))
812         record = data[0]
813         # natEvent
814         self.assertEqual(ord(record[230]), 13)
815         # natQuotaExceededEvent
816         self.assertEqual(struct.pack("I", 5), record[466])
817         # maxFragmentsPendingReassembly
818         self.assertEqual(struct.pack("I", limit), record[475])
819         # sourceIPv4Address
820         self.assertEqual(src_addr, record[8])
821
822     def verify_ipfix_bib(self, data, is_create, src_addr):
823         """
824         Verify IPFIX NAT64 BIB create and delete events
825
826         :param data: Decoded IPFIX data records
827         :param is_create: Create event if nonzero value otherwise delete event
828         :param src_addr: IPv6 source address
829         """
830         self.assertEqual(1, len(data))
831         record = data[0]
832         # natEvent
833         if is_create:
834             self.assertEqual(ord(record[230]), 10)
835         else:
836             self.assertEqual(ord(record[230]), 11)
837         # sourceIPv6Address
838         self.assertEqual(src_addr, record[27])
839         # postNATSourceIPv4Address
840         self.assertEqual(self.nat_addr_n, record[225])
841         # protocolIdentifier
842         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
843         # ingressVRFID
844         self.assertEqual(struct.pack("!I", 0), record[234])
845         # sourceTransportPort
846         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847         # postNAPTSourceTransportPort
848         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
849
850     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
851                                dst_port):
852         """
853         Verify IPFIX NAT64 session create and delete events
854
855         :param data: Decoded IPFIX data records
856         :param is_create: Create event if nonzero value otherwise delete event
857         :param src_addr: IPv6 source address
858         :param dst_addr: IPv4 destination address
859         :param dst_port: destination TCP port
860         """
861         self.assertEqual(1, len(data))
862         record = data[0]
863         # natEvent
864         if is_create:
865             self.assertEqual(ord(record[230]), 6)
866         else:
867             self.assertEqual(ord(record[230]), 7)
868         # sourceIPv6Address
869         self.assertEqual(src_addr, record[27])
870         # destinationIPv6Address
871         self.assertEqual(socket.inet_pton(socket.AF_INET6,
872                                           self.compose_ip6(dst_addr,
873                                                            '64:ff9b::',
874                                                            96)),
875                          record[28])
876         # postNATSourceIPv4Address
877         self.assertEqual(self.nat_addr_n, record[225])
878         # postNATDestinationIPv4Address
879         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
880                          record[226])
881         # protocolIdentifier
882         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
883         # ingressVRFID
884         self.assertEqual(struct.pack("!I", 0), record[234])
885         # sourceTransportPort
886         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887         # postNAPTSourceTransportPort
888         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889         # destinationTransportPort
890         self.assertEqual(struct.pack("!H", dst_port), record[11])
891         # postNAPTDestinationTransportPort
892         self.assertEqual(struct.pack("!H", dst_port), record[228])
893
894
895 class TestNAT44(MethodHolder):
896     """ NAT44 Test Cases """
897
898     @classmethod
899     def setUpClass(cls):
900         super(TestNAT44, cls).setUpClass()
901
902         try:
903             cls.tcp_port_in = 6303
904             cls.tcp_port_out = 6303
905             cls.udp_port_in = 6304
906             cls.udp_port_out = 6304
907             cls.icmp_id_in = 6305
908             cls.icmp_id_out = 6305
909             cls.nat_addr = '10.0.0.3'
910             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911             cls.ipfix_src_port = 4739
912             cls.ipfix_domain_id = 1
913
914             cls.create_pg_interfaces(range(10))
915             cls.interfaces = list(cls.pg_interfaces[0:4])
916
917             for i in cls.interfaces:
918                 i.admin_up()
919                 i.config_ip4()
920                 i.resolve_arp()
921
922             cls.pg0.generate_remote_hosts(3)
923             cls.pg0.configure_ipv4_neighbors()
924
925             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926             cls.vapi.ip_table_add_del(10, is_add=1)
927             cls.vapi.ip_table_add_del(20, is_add=1)
928
929             cls.pg4._local_ip4 = "172.16.255.1"
930             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932             cls.pg4.set_table_ip4(10)
933             cls.pg5._local_ip4 = "172.17.255.3"
934             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936             cls.pg5.set_table_ip4(10)
937             cls.pg6._local_ip4 = "172.16.255.1"
938             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940             cls.pg6.set_table_ip4(20)
941             for i in cls.overlapping_interfaces:
942                 i.config_ip4()
943                 i.admin_up()
944                 i.resolve_arp()
945
946             cls.pg7.admin_up()
947             cls.pg8.admin_up()
948
949             cls.pg9.generate_remote_hosts(2)
950             cls.pg9.config_ip4()
951             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
953                                                   ip_addr_n,
954                                                   24)
955             cls.pg9.admin_up()
956             cls.pg9.resolve_arp()
957             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959             cls.pg9.resolve_arp()
960
961         except Exception:
962             super(TestNAT44, cls).tearDownClass()
963             raise
964
965     def clear_nat44(self):
966         """
967         Clear NAT44 configuration.
968         """
969         # I found no elegant way to do this
970         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971                                    dst_address_length=32,
972                                    next_hop_address=self.pg7.remote_ip4n,
973                                    next_hop_sw_if_index=self.pg7.sw_if_index,
974                                    is_add=0)
975         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976                                    dst_address_length=32,
977                                    next_hop_address=self.pg8.remote_ip4n,
978                                    next_hop_sw_if_index=self.pg8.sw_if_index,
979                                    is_add=0)
980
981         for intf in [self.pg7, self.pg8]:
982             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
983             for n in neighbors:
984                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
985                                               n.mac_address,
986                                               n.ip_address,
987                                               is_add=0)
988
989         if self.pg7.has_ip4_config:
990             self.pg7.unconfig_ip4()
991
992         self.vapi.nat44_forwarding_enable_disable(0)
993
994         interfaces = self.vapi.nat44_interface_addr_dump()
995         for intf in interfaces:
996             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997                                                twice_nat=intf.twice_nat,
998                                                is_add=0)
999
1000         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001                             domain_id=self.ipfix_domain_id)
1002         self.ipfix_src_port = 4739
1003         self.ipfix_domain_id = 1
1004
1005         interfaces = self.vapi.nat44_interface_dump()
1006         for intf in interfaces:
1007             if intf.is_inside > 1:
1008                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1009                                                           0,
1010                                                           is_add=0)
1011             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1012                                                       intf.is_inside,
1013                                                       is_add=0)
1014
1015         interfaces = self.vapi.nat44_interface_output_feature_dump()
1016         for intf in interfaces:
1017             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1018                                                              intf.is_inside,
1019                                                              is_add=0)
1020
1021         static_mappings = self.vapi.nat44_static_mapping_dump()
1022         for sm in static_mappings:
1023             self.vapi.nat44_add_del_static_mapping(
1024                 sm.local_ip_address,
1025                 sm.external_ip_address,
1026                 local_port=sm.local_port,
1027                 external_port=sm.external_port,
1028                 addr_only=sm.addr_only,
1029                 vrf_id=sm.vrf_id,
1030                 protocol=sm.protocol,
1031                 twice_nat=sm.twice_nat,
1032                 out2in_only=sm.out2in_only,
1033                 is_add=0)
1034
1035         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1036         for lb_sm in lb_static_mappings:
1037             self.vapi.nat44_add_del_lb_static_mapping(
1038                 lb_sm.external_addr,
1039                 lb_sm.external_port,
1040                 lb_sm.protocol,
1041                 vrf_id=lb_sm.vrf_id,
1042                 twice_nat=lb_sm.twice_nat,
1043                 out2in_only=lb_sm.out2in_only,
1044                 is_add=0,
1045                 local_num=0,
1046                 locals=[])
1047
1048         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1049         for id_m in identity_mappings:
1050             self.vapi.nat44_add_del_identity_mapping(
1051                 addr_only=id_m.addr_only,
1052                 ip=id_m.ip_address,
1053                 port=id_m.port,
1054                 sw_if_index=id_m.sw_if_index,
1055                 vrf_id=id_m.vrf_id,
1056                 protocol=id_m.protocol,
1057                 is_add=0)
1058
1059         adresses = self.vapi.nat44_address_dump()
1060         for addr in adresses:
1061             self.vapi.nat44_add_del_address_range(addr.ip_address,
1062                                                   addr.ip_address,
1063                                                   twice_nat=addr.twice_nat,
1064                                                   is_add=0)
1065
1066         self.vapi.nat_set_reass()
1067         self.vapi.nat_set_reass(is_ip6=1)
1068
1069     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1070                                  local_port=0, external_port=0, vrf_id=0,
1071                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
1072                                  proto=0, twice_nat=0, out2in_only=0):
1073         """
1074         Add/delete NAT44 static mapping
1075
1076         :param local_ip: Local IP address
1077         :param external_ip: External IP address
1078         :param local_port: Local port number (Optional)
1079         :param external_port: External port number (Optional)
1080         :param vrf_id: VRF ID (Default 0)
1081         :param is_add: 1 if add, 0 if delete (Default add)
1082         :param external_sw_if_index: External interface instead of IP address
1083         :param proto: IP protocol (Mandatory if port specified)
1084         :param twice_nat: 1 if translate external host address and port
1085         :param out2in_only: if 1 rule is matching only out2in direction
1086         """
1087         addr_only = 1
1088         if local_port and external_port:
1089             addr_only = 0
1090         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1091         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1092         self.vapi.nat44_add_del_static_mapping(
1093             l_ip,
1094             e_ip,
1095             external_sw_if_index,
1096             local_port,
1097             external_port,
1098             addr_only,
1099             vrf_id,
1100             proto,
1101             twice_nat,
1102             out2in_only,
1103             is_add)
1104
1105     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1106         """
1107         Add/delete NAT44 address
1108
1109         :param ip: IP address
1110         :param is_add: 1 if add, 0 if delete (Default add)
1111         :param twice_nat: twice NAT address for extenal hosts
1112         """
1113         nat_addr = socket.inet_pton(socket.AF_INET, ip)
1114         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1115                                               vrf_id=vrf_id,
1116                                               twice_nat=twice_nat)
1117
1118     def test_dynamic(self):
1119         """ NAT44 dynamic translation test """
1120
1121         self.nat44_add_address(self.nat_addr)
1122         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1123         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1124                                                   is_inside=0)
1125
1126         # in2out
1127         pkts = self.create_stream_in(self.pg0, self.pg1)
1128         self.pg0.add_stream(pkts)
1129         self.pg_enable_capture(self.pg_interfaces)
1130         self.pg_start()
1131         capture = self.pg1.get_capture(len(pkts))
1132         self.verify_capture_out(capture)
1133
1134         # out2in
1135         pkts = self.create_stream_out(self.pg1)
1136         self.pg1.add_stream(pkts)
1137         self.pg_enable_capture(self.pg_interfaces)
1138         self.pg_start()
1139         capture = self.pg0.get_capture(len(pkts))
1140         self.verify_capture_in(capture, self.pg0)
1141
1142     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1143         """ NAT44 handling of client packets with TTL=1 """
1144
1145         self.nat44_add_address(self.nat_addr)
1146         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1147         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1148                                                   is_inside=0)
1149
1150         # Client side - generate traffic
1151         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1152         self.pg0.add_stream(pkts)
1153         self.pg_enable_capture(self.pg_interfaces)
1154         self.pg_start()
1155
1156         # Client side - verify ICMP type 11 packets
1157         capture = self.pg0.get_capture(len(pkts))
1158         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1159
1160     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1161         """ NAT44 handling of server packets with TTL=1 """
1162
1163         self.nat44_add_address(self.nat_addr)
1164         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1165         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1166                                                   is_inside=0)
1167
1168         # Client side - create sessions
1169         pkts = self.create_stream_in(self.pg0, self.pg1)
1170         self.pg0.add_stream(pkts)
1171         self.pg_enable_capture(self.pg_interfaces)
1172         self.pg_start()
1173
1174         # Server side - generate traffic
1175         capture = self.pg1.get_capture(len(pkts))
1176         self.verify_capture_out(capture)
1177         pkts = self.create_stream_out(self.pg1, ttl=1)
1178         self.pg1.add_stream(pkts)
1179         self.pg_enable_capture(self.pg_interfaces)
1180         self.pg_start()
1181
1182         # Server side - verify ICMP type 11 packets
1183         capture = self.pg1.get_capture(len(pkts))
1184         self.verify_capture_out_with_icmp_errors(capture,
1185                                                  src_ip=self.pg1.local_ip4)
1186
1187     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1188         """ NAT44 handling of error responses to client packets with TTL=2 """
1189
1190         self.nat44_add_address(self.nat_addr)
1191         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1192         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1193                                                   is_inside=0)
1194
1195         # Client side - generate traffic
1196         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1197         self.pg0.add_stream(pkts)
1198         self.pg_enable_capture(self.pg_interfaces)
1199         self.pg_start()
1200
1201         # Server side - simulate ICMP type 11 response
1202         capture = self.pg1.get_capture(len(pkts))
1203         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1204                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1205                 ICMP(type=11) / packet[IP] for packet in capture]
1206         self.pg1.add_stream(pkts)
1207         self.pg_enable_capture(self.pg_interfaces)
1208         self.pg_start()
1209
1210         # Client side - verify ICMP type 11 packets
1211         capture = self.pg0.get_capture(len(pkts))
1212         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1213
1214     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1215         """ NAT44 handling of error responses to server packets with TTL=2 """
1216
1217         self.nat44_add_address(self.nat_addr)
1218         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1219         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1220                                                   is_inside=0)
1221
1222         # Client side - create sessions
1223         pkts = self.create_stream_in(self.pg0, self.pg1)
1224         self.pg0.add_stream(pkts)
1225         self.pg_enable_capture(self.pg_interfaces)
1226         self.pg_start()
1227
1228         # Server side - generate traffic
1229         capture = self.pg1.get_capture(len(pkts))
1230         self.verify_capture_out(capture)
1231         pkts = self.create_stream_out(self.pg1, ttl=2)
1232         self.pg1.add_stream(pkts)
1233         self.pg_enable_capture(self.pg_interfaces)
1234         self.pg_start()
1235
1236         # Client side - simulate ICMP type 11 response
1237         capture = self.pg0.get_capture(len(pkts))
1238         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1239                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1240                 ICMP(type=11) / packet[IP] for packet in capture]
1241         self.pg0.add_stream(pkts)
1242         self.pg_enable_capture(self.pg_interfaces)
1243         self.pg_start()
1244
1245         # Server side - verify ICMP type 11 packets
1246         capture = self.pg1.get_capture(len(pkts))
1247         self.verify_capture_out_with_icmp_errors(capture)
1248
1249     def test_ping_out_interface_from_outside(self):
1250         """ Ping NAT44 out interface from outside network """
1251
1252         self.nat44_add_address(self.nat_addr)
1253         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1254         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1255                                                   is_inside=0)
1256
1257         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1258              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1259              ICMP(id=self.icmp_id_out, type='echo-request'))
1260         pkts = [p]
1261         self.pg1.add_stream(pkts)
1262         self.pg_enable_capture(self.pg_interfaces)
1263         self.pg_start()
1264         capture = self.pg1.get_capture(len(pkts))
1265         self.assertEqual(1, len(capture))
1266         packet = capture[0]
1267         try:
1268             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1269             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1270             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1271             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1272         except:
1273             self.logger.error(ppp("Unexpected or invalid packet "
1274                                   "(outside network):", packet))
1275             raise
1276
1277     def test_ping_internal_host_from_outside(self):
1278         """ Ping internal host from outside network """
1279
1280         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1281         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1282         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1283                                                   is_inside=0)
1284
1285         # out2in
1286         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1287                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1288                ICMP(id=self.icmp_id_out, type='echo-request'))
1289         self.pg1.add_stream(pkt)
1290         self.pg_enable_capture(self.pg_interfaces)
1291         self.pg_start()
1292         capture = self.pg0.get_capture(1)
1293         self.verify_capture_in(capture, self.pg0, packet_num=1)
1294         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1295
1296         # in2out
1297         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1298                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1299                ICMP(id=self.icmp_id_in, type='echo-reply'))
1300         self.pg0.add_stream(pkt)
1301         self.pg_enable_capture(self.pg_interfaces)
1302         self.pg_start()
1303         capture = self.pg1.get_capture(1)
1304         self.verify_capture_out(capture, same_port=True, packet_num=1)
1305         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1306
1307     def test_forwarding(self):
1308         """ NAT44 forwarding test """
1309
1310         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1311         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1312                                                   is_inside=0)
1313         self.vapi.nat44_forwarding_enable_disable(1)
1314
1315         real_ip = self.pg0.remote_ip4n
1316         alias_ip = self.nat_addr_n
1317         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1318                                                external_ip=alias_ip)
1319
1320         try:
1321             # in2out - static mapping match
1322
1323             pkts = self.create_stream_out(self.pg1)
1324             self.pg1.add_stream(pkts)
1325             self.pg_enable_capture(self.pg_interfaces)
1326             self.pg_start()
1327             capture = self.pg0.get_capture(len(pkts))
1328             self.verify_capture_in(capture, self.pg0)
1329
1330             pkts = self.create_stream_in(self.pg0, self.pg1)
1331             self.pg0.add_stream(pkts)
1332             self.pg_enable_capture(self.pg_interfaces)
1333             self.pg_start()
1334             capture = self.pg1.get_capture(len(pkts))
1335             self.verify_capture_out(capture, same_port=True)
1336
1337             # in2out - no static mapping match
1338
1339             host0 = self.pg0.remote_hosts[0]
1340             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1341             try:
1342                 pkts = self.create_stream_out(self.pg1,
1343                                               dst_ip=self.pg0.remote_ip4,
1344                                               use_inside_ports=True)
1345                 self.pg1.add_stream(pkts)
1346                 self.pg_enable_capture(self.pg_interfaces)
1347                 self.pg_start()
1348                 capture = self.pg0.get_capture(len(pkts))
1349                 self.verify_capture_in(capture, self.pg0)
1350
1351                 pkts = self.create_stream_in(self.pg0, self.pg1)
1352                 self.pg0.add_stream(pkts)
1353                 self.pg_enable_capture(self.pg_interfaces)
1354                 self.pg_start()
1355                 capture = self.pg1.get_capture(len(pkts))
1356                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1357                                         same_port=True)
1358             finally:
1359                 self.pg0.remote_hosts[0] = host0
1360
1361         finally:
1362             self.vapi.nat44_forwarding_enable_disable(0)
1363             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1364                                                    external_ip=alias_ip,
1365                                                    is_add=0)
1366
1367     def test_static_in(self):
1368         """ 1:1 NAT initialized from inside network """
1369
1370         nat_ip = "10.0.0.10"
1371         self.tcp_port_out = 6303
1372         self.udp_port_out = 6304
1373         self.icmp_id_out = 6305
1374
1375         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1376         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1377         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1378                                                   is_inside=0)
1379
1380         # in2out
1381         pkts = self.create_stream_in(self.pg0, self.pg1)
1382         self.pg0.add_stream(pkts)
1383         self.pg_enable_capture(self.pg_interfaces)
1384         self.pg_start()
1385         capture = self.pg1.get_capture(len(pkts))
1386         self.verify_capture_out(capture, nat_ip, True)
1387
1388         # out2in
1389         pkts = self.create_stream_out(self.pg1, nat_ip)
1390         self.pg1.add_stream(pkts)
1391         self.pg_enable_capture(self.pg_interfaces)
1392         self.pg_start()
1393         capture = self.pg0.get_capture(len(pkts))
1394         self.verify_capture_in(capture, self.pg0)
1395
1396     def test_static_out(self):
1397         """ 1:1 NAT initialized from outside network """
1398
1399         nat_ip = "10.0.0.20"
1400         self.tcp_port_out = 6303
1401         self.udp_port_out = 6304
1402         self.icmp_id_out = 6305
1403
1404         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1405         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1406         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1407                                                   is_inside=0)
1408
1409         # out2in
1410         pkts = self.create_stream_out(self.pg1, nat_ip)
1411         self.pg1.add_stream(pkts)
1412         self.pg_enable_capture(self.pg_interfaces)
1413         self.pg_start()
1414         capture = self.pg0.get_capture(len(pkts))
1415         self.verify_capture_in(capture, self.pg0)
1416
1417         # in2out
1418         pkts = self.create_stream_in(self.pg0, self.pg1)
1419         self.pg0.add_stream(pkts)
1420         self.pg_enable_capture(self.pg_interfaces)
1421         self.pg_start()
1422         capture = self.pg1.get_capture(len(pkts))
1423         self.verify_capture_out(capture, nat_ip, True)
1424
1425     def test_static_with_port_in(self):
1426         """ 1:1 NAPT initialized from inside network """
1427
1428         self.tcp_port_out = 3606
1429         self.udp_port_out = 3607
1430         self.icmp_id_out = 3608
1431
1432         self.nat44_add_address(self.nat_addr)
1433         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434                                       self.tcp_port_in, self.tcp_port_out,
1435                                       proto=IP_PROTOS.tcp)
1436         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1437                                       self.udp_port_in, self.udp_port_out,
1438                                       proto=IP_PROTOS.udp)
1439         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1440                                       self.icmp_id_in, self.icmp_id_out,
1441                                       proto=IP_PROTOS.icmp)
1442         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1443         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1444                                                   is_inside=0)
1445
1446         # in2out
1447         pkts = self.create_stream_in(self.pg0, self.pg1)
1448         self.pg0.add_stream(pkts)
1449         self.pg_enable_capture(self.pg_interfaces)
1450         self.pg_start()
1451         capture = self.pg1.get_capture(len(pkts))
1452         self.verify_capture_out(capture)
1453
1454         # out2in
1455         pkts = self.create_stream_out(self.pg1)
1456         self.pg1.add_stream(pkts)
1457         self.pg_enable_capture(self.pg_interfaces)
1458         self.pg_start()
1459         capture = self.pg0.get_capture(len(pkts))
1460         self.verify_capture_in(capture, self.pg0)
1461
1462     def test_static_with_port_out(self):
1463         """ 1:1 NAPT initialized from outside network """
1464
1465         self.tcp_port_out = 30606
1466         self.udp_port_out = 30607
1467         self.icmp_id_out = 30608
1468
1469         self.nat44_add_address(self.nat_addr)
1470         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1471                                       self.tcp_port_in, self.tcp_port_out,
1472                                       proto=IP_PROTOS.tcp)
1473         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1474                                       self.udp_port_in, self.udp_port_out,
1475                                       proto=IP_PROTOS.udp)
1476         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1477                                       self.icmp_id_in, self.icmp_id_out,
1478                                       proto=IP_PROTOS.icmp)
1479         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1480         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1481                                                   is_inside=0)
1482
1483         # out2in
1484         pkts = self.create_stream_out(self.pg1)
1485         self.pg1.add_stream(pkts)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488         capture = self.pg0.get_capture(len(pkts))
1489         self.verify_capture_in(capture, self.pg0)
1490
1491         # in2out
1492         pkts = self.create_stream_in(self.pg0, self.pg1)
1493         self.pg0.add_stream(pkts)
1494         self.pg_enable_capture(self.pg_interfaces)
1495         self.pg_start()
1496         capture = self.pg1.get_capture(len(pkts))
1497         self.verify_capture_out(capture)
1498
1499     def test_static_with_port_out2(self):
1500         """ 1:1 NAPT symmetrical rule """
1501
1502         external_port = 80
1503         local_port = 8080
1504
1505         self.vapi.nat44_forwarding_enable_disable(1)
1506         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1507                                       local_port, external_port,
1508                                       proto=IP_PROTOS.tcp, out2in_only=1)
1509         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1510         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1511                                                   is_inside=0)
1512
1513         # from client to service
1514         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1515              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1516              TCP(sport=12345, dport=external_port))
1517         self.pg1.add_stream(p)
1518         self.pg_enable_capture(self.pg_interfaces)
1519         self.pg_start()
1520         capture = self.pg0.get_capture(1)
1521         p = capture[0]
1522         server = None
1523         try:
1524             ip = p[IP]
1525             tcp = p[TCP]
1526             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1527             self.assertEqual(tcp.dport, local_port)
1528             self.check_tcp_checksum(p)
1529             self.check_ip_checksum(p)
1530         except:
1531             self.logger.error(ppp("Unexpected or invalid packet:", p))
1532             raise
1533
1534         # from service back to client
1535         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1536              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1537              TCP(sport=local_port, dport=12345))
1538         self.pg0.add_stream(p)
1539         self.pg_enable_capture(self.pg_interfaces)
1540         self.pg_start()
1541         capture = self.pg1.get_capture(1)
1542         p = capture[0]
1543         try:
1544             ip = p[IP]
1545             tcp = p[TCP]
1546             self.assertEqual(ip.src, self.nat_addr)
1547             self.assertEqual(tcp.sport, external_port)
1548             self.check_tcp_checksum(p)
1549             self.check_ip_checksum(p)
1550         except:
1551             self.logger.error(ppp("Unexpected or invalid packet:", p))
1552             raise
1553
1554         # from client to server (no translation)
1555         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1556              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1557              TCP(sport=12346, dport=local_port))
1558         self.pg1.add_stream(p)
1559         self.pg_enable_capture(self.pg_interfaces)
1560         self.pg_start()
1561         capture = self.pg0.get_capture(1)
1562         p = capture[0]
1563         server = None
1564         try:
1565             ip = p[IP]
1566             tcp = p[TCP]
1567             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1568             self.assertEqual(tcp.dport, local_port)
1569             self.check_tcp_checksum(p)
1570             self.check_ip_checksum(p)
1571         except:
1572             self.logger.error(ppp("Unexpected or invalid packet:", p))
1573             raise
1574
1575         # from service back to client (no translation)
1576         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1577              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1578              TCP(sport=local_port, dport=12346))
1579         self.pg0.add_stream(p)
1580         self.pg_enable_capture(self.pg_interfaces)
1581         self.pg_start()
1582         capture = self.pg1.get_capture(1)
1583         p = capture[0]
1584         try:
1585             ip = p[IP]
1586             tcp = p[TCP]
1587             self.assertEqual(ip.src, self.pg0.remote_ip4)
1588             self.assertEqual(tcp.sport, local_port)
1589             self.check_tcp_checksum(p)
1590             self.check_ip_checksum(p)
1591         except:
1592             self.logger.error(ppp("Unexpected or invalid packet:", p))
1593             raise
1594
1595     def test_static_vrf_aware(self):
1596         """ 1:1 NAT VRF awareness """
1597
1598         nat_ip1 = "10.0.0.30"
1599         nat_ip2 = "10.0.0.40"
1600         self.tcp_port_out = 6303
1601         self.udp_port_out = 6304
1602         self.icmp_id_out = 6305
1603
1604         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1605                                       vrf_id=10)
1606         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1607                                       vrf_id=10)
1608         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1609                                                   is_inside=0)
1610         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1611         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1612
1613         # inside interface VRF match NAT44 static mapping VRF
1614         pkts = self.create_stream_in(self.pg4, self.pg3)
1615         self.pg4.add_stream(pkts)
1616         self.pg_enable_capture(self.pg_interfaces)
1617         self.pg_start()
1618         capture = self.pg3.get_capture(len(pkts))
1619         self.verify_capture_out(capture, nat_ip1, True)
1620
1621         # inside interface VRF don't match NAT44 static mapping VRF (packets
1622         # are dropped)
1623         pkts = self.create_stream_in(self.pg0, self.pg3)
1624         self.pg0.add_stream(pkts)
1625         self.pg_enable_capture(self.pg_interfaces)
1626         self.pg_start()
1627         self.pg3.assert_nothing_captured()
1628
1629     def test_identity_nat(self):
1630         """ Identity NAT """
1631
1632         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1633         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1634         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1635                                                   is_inside=0)
1636
1637         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1638              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1639              TCP(sport=12345, dport=56789))
1640         self.pg1.add_stream(p)
1641         self.pg_enable_capture(self.pg_interfaces)
1642         self.pg_start()
1643         capture = self.pg0.get_capture(1)
1644         p = capture[0]
1645         try:
1646             ip = p[IP]
1647             tcp = p[TCP]
1648             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1649             self.assertEqual(ip.src, self.pg1.remote_ip4)
1650             self.assertEqual(tcp.dport, 56789)
1651             self.assertEqual(tcp.sport, 12345)
1652             self.check_tcp_checksum(p)
1653             self.check_ip_checksum(p)
1654         except:
1655             self.logger.error(ppp("Unexpected or invalid packet:", p))
1656             raise
1657
1658     def test_static_lb(self):
1659         """ NAT44 local service load balancing """
1660         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1661         external_port = 80
1662         local_port = 8080
1663         server1 = self.pg0.remote_hosts[0]
1664         server2 = self.pg0.remote_hosts[1]
1665
1666         locals = [{'addr': server1.ip4n,
1667                    'port': local_port,
1668                    'probability': 70},
1669                   {'addr': server2.ip4n,
1670                    'port': local_port,
1671                    'probability': 30}]
1672
1673         self.nat44_add_address(self.nat_addr)
1674         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1675                                                   external_port,
1676                                                   IP_PROTOS.tcp,
1677                                                   local_num=len(locals),
1678                                                   locals=locals)
1679         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1680         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1681                                                   is_inside=0)
1682
1683         # from client to service
1684         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1685              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1686              TCP(sport=12345, dport=external_port))
1687         self.pg1.add_stream(p)
1688         self.pg_enable_capture(self.pg_interfaces)
1689         self.pg_start()
1690         capture = self.pg0.get_capture(1)
1691         p = capture[0]
1692         server = None
1693         try:
1694             ip = p[IP]
1695             tcp = p[TCP]
1696             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1697             if ip.dst == server1.ip4:
1698                 server = server1
1699             else:
1700                 server = server2
1701             self.assertEqual(tcp.dport, local_port)
1702             self.check_tcp_checksum(p)
1703             self.check_ip_checksum(p)
1704         except:
1705             self.logger.error(ppp("Unexpected or invalid packet:", p))
1706             raise
1707
1708         # from service back to client
1709         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1710              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1711              TCP(sport=local_port, dport=12345))
1712         self.pg0.add_stream(p)
1713         self.pg_enable_capture(self.pg_interfaces)
1714         self.pg_start()
1715         capture = self.pg1.get_capture(1)
1716         p = capture[0]
1717         try:
1718             ip = p[IP]
1719             tcp = p[TCP]
1720             self.assertEqual(ip.src, self.nat_addr)
1721             self.assertEqual(tcp.sport, external_port)
1722             self.check_tcp_checksum(p)
1723             self.check_ip_checksum(p)
1724         except:
1725             self.logger.error(ppp("Unexpected or invalid packet:", p))
1726             raise
1727
1728         # multiple clients
1729         server1_n = 0
1730         server2_n = 0
1731         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1732         pkts = []
1733         for client in clients:
1734             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1735                  IP(src=client, dst=self.nat_addr) /
1736                  TCP(sport=12345, dport=external_port))
1737             pkts.append(p)
1738         self.pg1.add_stream(pkts)
1739         self.pg_enable_capture(self.pg_interfaces)
1740         self.pg_start()
1741         capture = self.pg0.get_capture(len(pkts))
1742         for p in capture:
1743             if p[IP].dst == server1.ip4:
1744                 server1_n += 1
1745             else:
1746                 server2_n += 1
1747         self.assertTrue(server1_n > server2_n)
1748
1749     def test_static_lb_2(self):
1750         """ NAT44 local service load balancing (asymmetrical rule) """
1751         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1752         external_port = 80
1753         local_port = 8080
1754         server1 = self.pg0.remote_hosts[0]
1755         server2 = self.pg0.remote_hosts[1]
1756
1757         locals = [{'addr': server1.ip4n,
1758                    'port': local_port,
1759                    'probability': 70},
1760                   {'addr': server2.ip4n,
1761                    'port': local_port,
1762                    'probability': 30}]
1763
1764         self.vapi.nat44_forwarding_enable_disable(1)
1765         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1766                                                   external_port,
1767                                                   IP_PROTOS.tcp,
1768                                                   out2in_only=1,
1769                                                   local_num=len(locals),
1770                                                   locals=locals)
1771         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1772         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1773                                                   is_inside=0)
1774
1775         # from client to service
1776         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1777              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1778              TCP(sport=12345, dport=external_port))
1779         self.pg1.add_stream(p)
1780         self.pg_enable_capture(self.pg_interfaces)
1781         self.pg_start()
1782         capture = self.pg0.get_capture(1)
1783         p = capture[0]
1784         server = None
1785         try:
1786             ip = p[IP]
1787             tcp = p[TCP]
1788             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1789             if ip.dst == server1.ip4:
1790                 server = server1
1791             else:
1792                 server = server2
1793             self.assertEqual(tcp.dport, local_port)
1794             self.check_tcp_checksum(p)
1795             self.check_ip_checksum(p)
1796         except:
1797             self.logger.error(ppp("Unexpected or invalid packet:", p))
1798             raise
1799
1800         # from service back to client
1801         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1802              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1803              TCP(sport=local_port, dport=12345))
1804         self.pg0.add_stream(p)
1805         self.pg_enable_capture(self.pg_interfaces)
1806         self.pg_start()
1807         capture = self.pg1.get_capture(1)
1808         p = capture[0]
1809         try:
1810             ip = p[IP]
1811             tcp = p[TCP]
1812             self.assertEqual(ip.src, self.nat_addr)
1813             self.assertEqual(tcp.sport, external_port)
1814             self.check_tcp_checksum(p)
1815             self.check_ip_checksum(p)
1816         except:
1817             self.logger.error(ppp("Unexpected or invalid packet:", p))
1818             raise
1819
1820         # from client to server (no translation)
1821         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1822              IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1823              TCP(sport=12346, dport=local_port))
1824         self.pg1.add_stream(p)
1825         self.pg_enable_capture(self.pg_interfaces)
1826         self.pg_start()
1827         capture = self.pg0.get_capture(1)
1828         p = capture[0]
1829         server = None
1830         try:
1831             ip = p[IP]
1832             tcp = p[TCP]
1833             self.assertEqual(ip.dst, server1.ip4)
1834             self.assertEqual(tcp.dport, local_port)
1835             self.check_tcp_checksum(p)
1836             self.check_ip_checksum(p)
1837         except:
1838             self.logger.error(ppp("Unexpected or invalid packet:", p))
1839             raise
1840
1841         # from service back to client (no translation)
1842         p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1843              IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1844              TCP(sport=local_port, dport=12346))
1845         self.pg0.add_stream(p)
1846         self.pg_enable_capture(self.pg_interfaces)
1847         self.pg_start()
1848         capture = self.pg1.get_capture(1)
1849         p = capture[0]
1850         try:
1851             ip = p[IP]
1852             tcp = p[TCP]
1853             self.assertEqual(ip.src, server1.ip4)
1854             self.assertEqual(tcp.sport, local_port)
1855             self.check_tcp_checksum(p)
1856             self.check_ip_checksum(p)
1857         except:
1858             self.logger.error(ppp("Unexpected or invalid packet:", p))
1859             raise
1860
1861     def test_multiple_inside_interfaces(self):
1862         """ NAT44 multiple non-overlapping address space inside interfaces """
1863
1864         self.nat44_add_address(self.nat_addr)
1865         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1866         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1867         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1868                                                   is_inside=0)
1869
1870         # between two NAT44 inside interfaces (no translation)
1871         pkts = self.create_stream_in(self.pg0, self.pg1)
1872         self.pg0.add_stream(pkts)
1873         self.pg_enable_capture(self.pg_interfaces)
1874         self.pg_start()
1875         capture = self.pg1.get_capture(len(pkts))
1876         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1877
1878         # from NAT44 inside to interface without NAT44 feature (no translation)
1879         pkts = self.create_stream_in(self.pg0, self.pg2)
1880         self.pg0.add_stream(pkts)
1881         self.pg_enable_capture(self.pg_interfaces)
1882         self.pg_start()
1883         capture = self.pg2.get_capture(len(pkts))
1884         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1885
1886         # in2out 1st interface
1887         pkts = self.create_stream_in(self.pg0, self.pg3)
1888         self.pg0.add_stream(pkts)
1889         self.pg_enable_capture(self.pg_interfaces)
1890         self.pg_start()
1891         capture = self.pg3.get_capture(len(pkts))
1892         self.verify_capture_out(capture)
1893
1894         # out2in 1st interface
1895         pkts = self.create_stream_out(self.pg3)
1896         self.pg3.add_stream(pkts)
1897         self.pg_enable_capture(self.pg_interfaces)
1898         self.pg_start()
1899         capture = self.pg0.get_capture(len(pkts))
1900         self.verify_capture_in(capture, self.pg0)
1901
1902         # in2out 2nd interface
1903         pkts = self.create_stream_in(self.pg1, self.pg3)
1904         self.pg1.add_stream(pkts)
1905         self.pg_enable_capture(self.pg_interfaces)
1906         self.pg_start()
1907         capture = self.pg3.get_capture(len(pkts))
1908         self.verify_capture_out(capture)
1909
1910         # out2in 2nd interface
1911         pkts = self.create_stream_out(self.pg3)
1912         self.pg3.add_stream(pkts)
1913         self.pg_enable_capture(self.pg_interfaces)
1914         self.pg_start()
1915         capture = self.pg1.get_capture(len(pkts))
1916         self.verify_capture_in(capture, self.pg1)
1917
1918     def test_inside_overlapping_interfaces(self):
1919         """ NAT44 multiple inside interfaces with overlapping address space """
1920
1921         static_nat_ip = "10.0.0.10"
1922         self.nat44_add_address(self.nat_addr)
1923         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1924                                                   is_inside=0)
1925         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1926         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1927         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1928         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1929                                       vrf_id=20)
1930
1931         # between NAT44 inside interfaces with same VRF (no translation)
1932         pkts = self.create_stream_in(self.pg4, self.pg5)
1933         self.pg4.add_stream(pkts)
1934         self.pg_enable_capture(self.pg_interfaces)
1935         self.pg_start()
1936         capture = self.pg5.get_capture(len(pkts))
1937         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1938
1939         # between NAT44 inside interfaces with different VRF (hairpinning)
1940         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1941              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1942              TCP(sport=1234, dport=5678))
1943         self.pg4.add_stream(p)
1944         self.pg_enable_capture(self.pg_interfaces)
1945         self.pg_start()
1946         capture = self.pg6.get_capture(1)
1947         p = capture[0]
1948         try:
1949             ip = p[IP]
1950             tcp = p[TCP]
1951             self.assertEqual(ip.src, self.nat_addr)
1952             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1953             self.assertNotEqual(tcp.sport, 1234)
1954             self.assertEqual(tcp.dport, 5678)
1955         except:
1956             self.logger.error(ppp("Unexpected or invalid packet:", p))
1957             raise
1958
1959         # in2out 1st interface
1960         pkts = self.create_stream_in(self.pg4, self.pg3)
1961         self.pg4.add_stream(pkts)
1962         self.pg_enable_capture(self.pg_interfaces)
1963         self.pg_start()
1964         capture = self.pg3.get_capture(len(pkts))
1965         self.verify_capture_out(capture)
1966
1967         # out2in 1st interface
1968         pkts = self.create_stream_out(self.pg3)
1969         self.pg3.add_stream(pkts)
1970         self.pg_enable_capture(self.pg_interfaces)
1971         self.pg_start()
1972         capture = self.pg4.get_capture(len(pkts))
1973         self.verify_capture_in(capture, self.pg4)
1974
1975         # in2out 2nd interface
1976         pkts = self.create_stream_in(self.pg5, self.pg3)
1977         self.pg5.add_stream(pkts)
1978         self.pg_enable_capture(self.pg_interfaces)
1979         self.pg_start()
1980         capture = self.pg3.get_capture(len(pkts))
1981         self.verify_capture_out(capture)
1982
1983         # out2in 2nd interface
1984         pkts = self.create_stream_out(self.pg3)
1985         self.pg3.add_stream(pkts)
1986         self.pg_enable_capture(self.pg_interfaces)
1987         self.pg_start()
1988         capture = self.pg5.get_capture(len(pkts))
1989         self.verify_capture_in(capture, self.pg5)
1990
1991         # pg5 session dump
1992         addresses = self.vapi.nat44_address_dump()
1993         self.assertEqual(len(addresses), 1)
1994         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1995         self.assertEqual(len(sessions), 3)
1996         for session in sessions:
1997             self.assertFalse(session.is_static)
1998             self.assertEqual(session.inside_ip_address[0:4],
1999                              self.pg5.remote_ip4n)
2000             self.assertEqual(session.outside_ip_address,
2001                              addresses[0].ip_address)
2002         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2003         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2004         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2005         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2006         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2007         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2008         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2009         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2010         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2011
2012         # in2out 3rd interface
2013         pkts = self.create_stream_in(self.pg6, self.pg3)
2014         self.pg6.add_stream(pkts)
2015         self.pg_enable_capture(self.pg_interfaces)
2016         self.pg_start()
2017         capture = self.pg3.get_capture(len(pkts))
2018         self.verify_capture_out(capture, static_nat_ip, True)
2019
2020         # out2in 3rd interface
2021         pkts = self.create_stream_out(self.pg3, static_nat_ip)
2022         self.pg3.add_stream(pkts)
2023         self.pg_enable_capture(self.pg_interfaces)
2024         self.pg_start()
2025         capture = self.pg6.get_capture(len(pkts))
2026         self.verify_capture_in(capture, self.pg6)
2027
2028         # general user and session dump verifications
2029         users = self.vapi.nat44_user_dump()
2030         self.assertTrue(len(users) >= 3)
2031         addresses = self.vapi.nat44_address_dump()
2032         self.assertEqual(len(addresses), 1)
2033         for user in users:
2034             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2035                                                          user.vrf_id)
2036             for session in sessions:
2037                 self.assertEqual(user.ip_address, session.inside_ip_address)
2038                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2039                 self.assertTrue(session.protocol in
2040                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
2041                                  IP_PROTOS.icmp])
2042
2043         # pg4 session dump
2044         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2045         self.assertTrue(len(sessions) >= 4)
2046         for session in sessions:
2047             self.assertFalse(session.is_static)
2048             self.assertEqual(session.inside_ip_address[0:4],
2049                              self.pg4.remote_ip4n)
2050             self.assertEqual(session.outside_ip_address,
2051                              addresses[0].ip_address)
2052
2053         # pg6 session dump
2054         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2055         self.assertTrue(len(sessions) >= 3)
2056         for session in sessions:
2057             self.assertTrue(session.is_static)
2058             self.assertEqual(session.inside_ip_address[0:4],
2059                              self.pg6.remote_ip4n)
2060             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2061                              map(int, static_nat_ip.split('.')))
2062             self.assertTrue(session.inside_port in
2063                             [self.tcp_port_in, self.udp_port_in,
2064                              self.icmp_id_in])
2065
2066     def test_hairpinning(self):
2067         """ NAT44 hairpinning - 1:1 NAPT """
2068
2069         host = self.pg0.remote_hosts[0]
2070         server = self.pg0.remote_hosts[1]
2071         host_in_port = 1234
2072         host_out_port = 0
2073         server_in_port = 5678
2074         server_out_port = 8765
2075
2076         self.nat44_add_address(self.nat_addr)
2077         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2078         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2079                                                   is_inside=0)
2080         # add static mapping for server
2081         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2082                                       server_in_port, server_out_port,
2083                                       proto=IP_PROTOS.tcp)
2084
2085         # send packet from host to server
2086         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2087              IP(src=host.ip4, dst=self.nat_addr) /
2088              TCP(sport=host_in_port, dport=server_out_port))
2089         self.pg0.add_stream(p)
2090         self.pg_enable_capture(self.pg_interfaces)
2091         self.pg_start()
2092         capture = self.pg0.get_capture(1)
2093         p = capture[0]
2094         try:
2095             ip = p[IP]
2096             tcp = p[TCP]
2097             self.assertEqual(ip.src, self.nat_addr)
2098             self.assertEqual(ip.dst, server.ip4)
2099             self.assertNotEqual(tcp.sport, host_in_port)
2100             self.assertEqual(tcp.dport, server_in_port)
2101             self.check_tcp_checksum(p)
2102             host_out_port = tcp.sport
2103         except:
2104             self.logger.error(ppp("Unexpected or invalid packet:", p))
2105             raise
2106
2107         # send reply from server to host
2108         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2109              IP(src=server.ip4, dst=self.nat_addr) /
2110              TCP(sport=server_in_port, dport=host_out_port))
2111         self.pg0.add_stream(p)
2112         self.pg_enable_capture(self.pg_interfaces)
2113         self.pg_start()
2114         capture = self.pg0.get_capture(1)
2115         p = capture[0]
2116         try:
2117             ip = p[IP]
2118             tcp = p[TCP]
2119             self.assertEqual(ip.src, self.nat_addr)
2120             self.assertEqual(ip.dst, host.ip4)
2121             self.assertEqual(tcp.sport, server_out_port)
2122             self.assertEqual(tcp.dport, host_in_port)
2123             self.check_tcp_checksum(p)
2124         except:
2125             self.logger.error(ppp("Unexpected or invalid packet:", p))
2126             raise
2127
2128     def test_hairpinning2(self):
2129         """ NAT44 hairpinning - 1:1 NAT"""
2130
2131         server1_nat_ip = "10.0.0.10"
2132         server2_nat_ip = "10.0.0.11"
2133         host = self.pg0.remote_hosts[0]
2134         server1 = self.pg0.remote_hosts[1]
2135         server2 = self.pg0.remote_hosts[2]
2136         server_tcp_port = 22
2137         server_udp_port = 20
2138
2139         self.nat44_add_address(self.nat_addr)
2140         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2141         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2142                                                   is_inside=0)
2143
2144         # add static mapping for servers
2145         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2146         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2147
2148         # host to server1
2149         pkts = []
2150         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2151              IP(src=host.ip4, dst=server1_nat_ip) /
2152              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2153         pkts.append(p)
2154         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2155              IP(src=host.ip4, dst=server1_nat_ip) /
2156              UDP(sport=self.udp_port_in, dport=server_udp_port))
2157         pkts.append(p)
2158         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2159              IP(src=host.ip4, dst=server1_nat_ip) /
2160              ICMP(id=self.icmp_id_in, type='echo-request'))
2161         pkts.append(p)
2162         self.pg0.add_stream(pkts)
2163         self.pg_enable_capture(self.pg_interfaces)
2164         self.pg_start()
2165         capture = self.pg0.get_capture(len(pkts))
2166         for packet in capture:
2167             try:
2168                 self.assertEqual(packet[IP].src, self.nat_addr)
2169                 self.assertEqual(packet[IP].dst, server1.ip4)
2170                 if packet.haslayer(TCP):
2171                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2172                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2173                     self.tcp_port_out = packet[TCP].sport
2174                     self.check_tcp_checksum(packet)
2175                 elif packet.haslayer(UDP):
2176                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2177                     self.assertEqual(packet[UDP].dport, server_udp_port)
2178                     self.udp_port_out = packet[UDP].sport
2179                 else:
2180                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2181                     self.icmp_id_out = packet[ICMP].id
2182             except:
2183                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2184                 raise
2185
2186         # server1 to host
2187         pkts = []
2188         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2189              IP(src=server1.ip4, dst=self.nat_addr) /
2190              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2191         pkts.append(p)
2192         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2193              IP(src=server1.ip4, dst=self.nat_addr) /
2194              UDP(sport=server_udp_port, dport=self.udp_port_out))
2195         pkts.append(p)
2196         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2197              IP(src=server1.ip4, dst=self.nat_addr) /
2198              ICMP(id=self.icmp_id_out, type='echo-reply'))
2199         pkts.append(p)
2200         self.pg0.add_stream(pkts)
2201         self.pg_enable_capture(self.pg_interfaces)
2202         self.pg_start()
2203         capture = self.pg0.get_capture(len(pkts))
2204         for packet in capture:
2205             try:
2206                 self.assertEqual(packet[IP].src, server1_nat_ip)
2207                 self.assertEqual(packet[IP].dst, host.ip4)
2208                 if packet.haslayer(TCP):
2209                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2210                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2211                     self.check_tcp_checksum(packet)
2212                 elif packet.haslayer(UDP):
2213                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2214                     self.assertEqual(packet[UDP].sport, server_udp_port)
2215                 else:
2216                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2217             except:
2218                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2219                 raise
2220
2221         # server2 to server1
2222         pkts = []
2223         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2224              IP(src=server2.ip4, dst=server1_nat_ip) /
2225              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2226         pkts.append(p)
2227         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2228              IP(src=server2.ip4, dst=server1_nat_ip) /
2229              UDP(sport=self.udp_port_in, dport=server_udp_port))
2230         pkts.append(p)
2231         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2232              IP(src=server2.ip4, dst=server1_nat_ip) /
2233              ICMP(id=self.icmp_id_in, type='echo-request'))
2234         pkts.append(p)
2235         self.pg0.add_stream(pkts)
2236         self.pg_enable_capture(self.pg_interfaces)
2237         self.pg_start()
2238         capture = self.pg0.get_capture(len(pkts))
2239         for packet in capture:
2240             try:
2241                 self.assertEqual(packet[IP].src, server2_nat_ip)
2242                 self.assertEqual(packet[IP].dst, server1.ip4)
2243                 if packet.haslayer(TCP):
2244                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2245                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2246                     self.tcp_port_out = packet[TCP].sport
2247                     self.check_tcp_checksum(packet)
2248                 elif packet.haslayer(UDP):
2249                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2250                     self.assertEqual(packet[UDP].dport, server_udp_port)
2251                     self.udp_port_out = packet[UDP].sport
2252                 else:
2253                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2254                     self.icmp_id_out = packet[ICMP].id
2255             except:
2256                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2257                 raise
2258
2259         # server1 to server2
2260         pkts = []
2261         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262              IP(src=server1.ip4, dst=server2_nat_ip) /
2263              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2264         pkts.append(p)
2265         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266              IP(src=server1.ip4, dst=server2_nat_ip) /
2267              UDP(sport=server_udp_port, dport=self.udp_port_out))
2268         pkts.append(p)
2269         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270              IP(src=server1.ip4, dst=server2_nat_ip) /
2271              ICMP(id=self.icmp_id_out, type='echo-reply'))
2272         pkts.append(p)
2273         self.pg0.add_stream(pkts)
2274         self.pg_enable_capture(self.pg_interfaces)
2275         self.pg_start()
2276         capture = self.pg0.get_capture(len(pkts))
2277         for packet in capture:
2278             try:
2279                 self.assertEqual(packet[IP].src, server1_nat_ip)
2280                 self.assertEqual(packet[IP].dst, server2.ip4)
2281                 if packet.haslayer(TCP):
2282                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2283                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2284                     self.check_tcp_checksum(packet)
2285                 elif packet.haslayer(UDP):
2286                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2287                     self.assertEqual(packet[UDP].sport, server_udp_port)
2288                 else:
2289                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2290             except:
2291                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2292                 raise
2293
2294     def test_max_translations_per_user(self):
2295         """ MAX translations per user - recycle the least recently used """
2296
2297         self.nat44_add_address(self.nat_addr)
2298         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2299         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2300                                                   is_inside=0)
2301
2302         # get maximum number of translations per user
2303         nat44_config = self.vapi.nat_show_config()
2304
2305         # send more than maximum number of translations per user packets
2306         pkts_num = nat44_config.max_translations_per_user + 5
2307         pkts = []
2308         for port in range(0, pkts_num):
2309             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2310                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2311                  TCP(sport=1025 + port))
2312             pkts.append(p)
2313         self.pg0.add_stream(pkts)
2314         self.pg_enable_capture(self.pg_interfaces)
2315         self.pg_start()
2316
2317         # verify number of translated packet
2318         self.pg1.get_capture(pkts_num)
2319
2320     def test_interface_addr(self):
2321         """ Acquire NAT44 addresses from interface """
2322         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2323
2324         # no address in NAT pool
2325         adresses = self.vapi.nat44_address_dump()
2326         self.assertEqual(0, len(adresses))
2327
2328         # configure interface address and check NAT address pool
2329         self.pg7.config_ip4()
2330         adresses = self.vapi.nat44_address_dump()
2331         self.assertEqual(1, len(adresses))
2332         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2333
2334         # remove interface address and check NAT address pool
2335         self.pg7.unconfig_ip4()
2336         adresses = self.vapi.nat44_address_dump()
2337         self.assertEqual(0, len(adresses))
2338
2339     def test_interface_addr_static_mapping(self):
2340         """ Static mapping with addresses from interface """
2341         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2342         self.nat44_add_static_mapping(
2343             '1.2.3.4',
2344             external_sw_if_index=self.pg7.sw_if_index)
2345
2346         # static mappings with external interface
2347         static_mappings = self.vapi.nat44_static_mapping_dump()
2348         self.assertEqual(1, len(static_mappings))
2349         self.assertEqual(self.pg7.sw_if_index,
2350                          static_mappings[0].external_sw_if_index)
2351
2352         # configure interface address and check static mappings
2353         self.pg7.config_ip4()
2354         static_mappings = self.vapi.nat44_static_mapping_dump()
2355         self.assertEqual(1, len(static_mappings))
2356         self.assertEqual(static_mappings[0].external_ip_address[0:4],
2357                          self.pg7.local_ip4n)
2358         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2359
2360         # remove interface address and check static mappings
2361         self.pg7.unconfig_ip4()
2362         static_mappings = self.vapi.nat44_static_mapping_dump()
2363         self.assertEqual(0, len(static_mappings))
2364
2365     def test_interface_addr_identity_nat(self):
2366         """ Identity NAT with addresses from interface """
2367
2368         port = 53053
2369         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2370         self.vapi.nat44_add_del_identity_mapping(
2371             sw_if_index=self.pg7.sw_if_index,
2372             port=port,
2373             protocol=IP_PROTOS.tcp,
2374             addr_only=0)
2375
2376         # identity mappings with external interface
2377         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2378         self.assertEqual(1, len(identity_mappings))
2379         self.assertEqual(self.pg7.sw_if_index,
2380                          identity_mappings[0].sw_if_index)
2381
2382         # configure interface address and check identity mappings
2383         self.pg7.config_ip4()
2384         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2385         self.assertEqual(1, len(identity_mappings))
2386         self.assertEqual(identity_mappings[0].ip_address,
2387                          self.pg7.local_ip4n)
2388         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2389         self.assertEqual(port, identity_mappings[0].port)
2390         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2391
2392         # remove interface address and check identity mappings
2393         self.pg7.unconfig_ip4()
2394         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2395         self.assertEqual(0, len(identity_mappings))
2396
2397     def test_ipfix_nat44_sess(self):
2398         """ IPFIX logging NAT44 session created/delted """
2399         self.ipfix_domain_id = 10
2400         self.ipfix_src_port = 20202
2401         colector_port = 30303
2402         bind_layers(UDP, IPFIX, dport=30303)
2403         self.nat44_add_address(self.nat_addr)
2404         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2405         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2406                                                   is_inside=0)
2407         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2408                                      src_address=self.pg3.local_ip4n,
2409                                      path_mtu=512,
2410                                      template_interval=10,
2411                                      collector_port=colector_port)
2412         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2413                             src_port=self.ipfix_src_port)
2414
2415         pkts = self.create_stream_in(self.pg0, self.pg1)
2416         self.pg0.add_stream(pkts)
2417         self.pg_enable_capture(self.pg_interfaces)
2418         self.pg_start()
2419         capture = self.pg1.get_capture(len(pkts))
2420         self.verify_capture_out(capture)
2421         self.nat44_add_address(self.nat_addr, is_add=0)
2422         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2423         capture = self.pg3.get_capture(9)
2424         ipfix = IPFIXDecoder()
2425         # first load template
2426         for p in capture:
2427             self.assertTrue(p.haslayer(IPFIX))
2428             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2429             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2430             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2431             self.assertEqual(p[UDP].dport, colector_port)
2432             self.assertEqual(p[IPFIX].observationDomainID,
2433                              self.ipfix_domain_id)
2434             if p.haslayer(Template):
2435                 ipfix.add_template(p.getlayer(Template))
2436         # verify events in data set
2437         for p in capture:
2438             if p.haslayer(Data):
2439                 data = ipfix.decode_data_set(p.getlayer(Set))
2440                 self.verify_ipfix_nat44_ses(data)
2441
2442     def test_ipfix_addr_exhausted(self):
2443         """ IPFIX logging NAT addresses exhausted """
2444         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2445         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2446                                                   is_inside=0)
2447         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2448                                      src_address=self.pg3.local_ip4n,
2449                                      path_mtu=512,
2450                                      template_interval=10)
2451         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2452                             src_port=self.ipfix_src_port)
2453
2454         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2455              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2456              TCP(sport=3025))
2457         self.pg0.add_stream(p)
2458         self.pg_enable_capture(self.pg_interfaces)
2459         self.pg_start()
2460         capture = self.pg1.get_capture(0)
2461         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2462         capture = self.pg3.get_capture(9)
2463         ipfix = IPFIXDecoder()
2464         # first load template
2465         for p in capture:
2466             self.assertTrue(p.haslayer(IPFIX))
2467             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2468             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2469             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2470             self.assertEqual(p[UDP].dport, 4739)
2471             self.assertEqual(p[IPFIX].observationDomainID,
2472                              self.ipfix_domain_id)
2473             if p.haslayer(Template):
2474                 ipfix.add_template(p.getlayer(Template))
2475         # verify events in data set
2476         for p in capture:
2477             if p.haslayer(Data):
2478                 data = ipfix.decode_data_set(p.getlayer(Set))
2479                 self.verify_ipfix_addr_exhausted(data)
2480
2481     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2482     def test_ipfix_max_sessions(self):
2483         """ IPFIX logging maximum session entries exceeded """
2484         self.nat44_add_address(self.nat_addr)
2485         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2486         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2487                                                   is_inside=0)
2488
2489         nat44_config = self.vapi.nat_show_config()
2490         max_sessions = 10 * nat44_config.translation_buckets
2491
2492         pkts = []
2493         for i in range(0, max_sessions):
2494             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2495             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2496                  IP(src=src, dst=self.pg1.remote_ip4) /
2497                  TCP(sport=1025))
2498             pkts.append(p)
2499         self.pg0.add_stream(pkts)
2500         self.pg_enable_capture(self.pg_interfaces)
2501         self.pg_start()
2502
2503         self.pg1.get_capture(max_sessions)
2504         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2505                                      src_address=self.pg3.local_ip4n,
2506                                      path_mtu=512,
2507                                      template_interval=10)
2508         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2509                             src_port=self.ipfix_src_port)
2510
2511         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2512              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2513              TCP(sport=1025))
2514         self.pg0.add_stream(p)
2515         self.pg_enable_capture(self.pg_interfaces)
2516         self.pg_start()
2517         self.pg1.get_capture(0)
2518         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2519         capture = self.pg3.get_capture(9)
2520         ipfix = IPFIXDecoder()
2521         # first load template
2522         for p in capture:
2523             self.assertTrue(p.haslayer(IPFIX))
2524             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2525             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2526             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2527             self.assertEqual(p[UDP].dport, 4739)
2528             self.assertEqual(p[IPFIX].observationDomainID,
2529                              self.ipfix_domain_id)
2530             if p.haslayer(Template):
2531                 ipfix.add_template(p.getlayer(Template))
2532         # verify events in data set
2533         for p in capture:
2534             if p.haslayer(Data):
2535                 data = ipfix.decode_data_set(p.getlayer(Set))
2536                 self.verify_ipfix_max_sessions(data, max_sessions)
2537
2538     def test_pool_addr_fib(self):
2539         """ NAT44 add pool addresses to FIB """
2540         static_addr = '10.0.0.10'
2541         self.nat44_add_address(self.nat_addr)
2542         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2543         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2544                                                   is_inside=0)
2545         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2546
2547         # NAT44 address
2548         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2549              ARP(op=ARP.who_has, pdst=self.nat_addr,
2550                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2551         self.pg1.add_stream(p)
2552         self.pg_enable_capture(self.pg_interfaces)
2553         self.pg_start()
2554         capture = self.pg1.get_capture(1)
2555         self.assertTrue(capture[0].haslayer(ARP))
2556         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2557
2558         # 1:1 NAT address
2559         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2560              ARP(op=ARP.who_has, pdst=static_addr,
2561                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2562         self.pg1.add_stream(p)
2563         self.pg_enable_capture(self.pg_interfaces)
2564         self.pg_start()
2565         capture = self.pg1.get_capture(1)
2566         self.assertTrue(capture[0].haslayer(ARP))
2567         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2568
2569         # send ARP to non-NAT44 interface
2570         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2571              ARP(op=ARP.who_has, pdst=self.nat_addr,
2572                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2573         self.pg2.add_stream(p)
2574         self.pg_enable_capture(self.pg_interfaces)
2575         self.pg_start()
2576         capture = self.pg1.get_capture(0)
2577
2578         # remove addresses and verify
2579         self.nat44_add_address(self.nat_addr, is_add=0)
2580         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2581                                       is_add=0)
2582
2583         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2584              ARP(op=ARP.who_has, pdst=self.nat_addr,
2585                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2586         self.pg1.add_stream(p)
2587         self.pg_enable_capture(self.pg_interfaces)
2588         self.pg_start()
2589         capture = self.pg1.get_capture(0)
2590
2591         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2592              ARP(op=ARP.who_has, pdst=static_addr,
2593                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2594         self.pg1.add_stream(p)
2595         self.pg_enable_capture(self.pg_interfaces)
2596         self.pg_start()
2597         capture = self.pg1.get_capture(0)
2598
2599     def test_vrf_mode(self):
2600         """ NAT44 tenant VRF aware address pool mode """
2601
2602         vrf_id1 = 1
2603         vrf_id2 = 2
2604         nat_ip1 = "10.0.0.10"
2605         nat_ip2 = "10.0.0.11"
2606
2607         self.pg0.unconfig_ip4()
2608         self.pg1.unconfig_ip4()
2609         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2610         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2611         self.pg0.set_table_ip4(vrf_id1)
2612         self.pg1.set_table_ip4(vrf_id2)
2613         self.pg0.config_ip4()
2614         self.pg1.config_ip4()
2615
2616         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2617         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2618         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2619         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2620         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2621                                                   is_inside=0)
2622
2623         # first VRF
2624         pkts = self.create_stream_in(self.pg0, self.pg2)
2625         self.pg0.add_stream(pkts)
2626         self.pg_enable_capture(self.pg_interfaces)
2627         self.pg_start()
2628         capture = self.pg2.get_capture(len(pkts))
2629         self.verify_capture_out(capture, nat_ip1)
2630
2631         # second VRF
2632         pkts = self.create_stream_in(self.pg1, self.pg2)
2633         self.pg1.add_stream(pkts)
2634         self.pg_enable_capture(self.pg_interfaces)
2635         self.pg_start()
2636         capture = self.pg2.get_capture(len(pkts))
2637         self.verify_capture_out(capture, nat_ip2)
2638
2639         self.pg0.unconfig_ip4()
2640         self.pg1.unconfig_ip4()
2641         self.pg0.set_table_ip4(0)
2642         self.pg1.set_table_ip4(0)
2643         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2644         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2645
2646     def test_vrf_feature_independent(self):
2647         """ NAT44 tenant VRF independent address pool mode """
2648
2649         nat_ip1 = "10.0.0.10"
2650         nat_ip2 = "10.0.0.11"
2651
2652         self.nat44_add_address(nat_ip1)
2653         self.nat44_add_address(nat_ip2, vrf_id=99)
2654         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2655         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2656         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2657                                                   is_inside=0)
2658
2659         # first VRF
2660         pkts = self.create_stream_in(self.pg0, self.pg2)
2661         self.pg0.add_stream(pkts)
2662         self.pg_enable_capture(self.pg_interfaces)
2663         self.pg_start()
2664         capture = self.pg2.get_capture(len(pkts))
2665         self.verify_capture_out(capture, nat_ip1)
2666
2667         # second VRF
2668         pkts = self.create_stream_in(self.pg1, self.pg2)
2669         self.pg1.add_stream(pkts)
2670         self.pg_enable_capture(self.pg_interfaces)
2671         self.pg_start()
2672         capture = self.pg2.get_capture(len(pkts))
2673         self.verify_capture_out(capture, nat_ip1)
2674
2675     def test_dynamic_ipless_interfaces(self):
2676         """ NAT44 interfaces without configured IP address """
2677
2678         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2679                                       mactobinary(self.pg7.remote_mac),
2680                                       self.pg7.remote_ip4n,
2681                                       is_static=1)
2682         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2683                                       mactobinary(self.pg8.remote_mac),
2684                                       self.pg8.remote_ip4n,
2685                                       is_static=1)
2686
2687         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2688                                    dst_address_length=32,
2689                                    next_hop_address=self.pg7.remote_ip4n,
2690                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2691         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2692                                    dst_address_length=32,
2693                                    next_hop_address=self.pg8.remote_ip4n,
2694                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2695
2696         self.nat44_add_address(self.nat_addr)
2697         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2698         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2699                                                   is_inside=0)
2700
2701         # in2out
2702         pkts = self.create_stream_in(self.pg7, self.pg8)
2703         self.pg7.add_stream(pkts)
2704         self.pg_enable_capture(self.pg_interfaces)
2705         self.pg_start()
2706         capture = self.pg8.get_capture(len(pkts))
2707         self.verify_capture_out(capture)
2708
2709         # out2in
2710         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2711         self.pg8.add_stream(pkts)
2712         self.pg_enable_capture(self.pg_interfaces)
2713         self.pg_start()
2714         capture = self.pg7.get_capture(len(pkts))
2715         self.verify_capture_in(capture, self.pg7)
2716
2717     def test_static_ipless_interfaces(self):
2718         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2719
2720         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2721                                       mactobinary(self.pg7.remote_mac),
2722                                       self.pg7.remote_ip4n,
2723                                       is_static=1)
2724         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2725                                       mactobinary(self.pg8.remote_mac),
2726                                       self.pg8.remote_ip4n,
2727                                       is_static=1)
2728
2729         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2730                                    dst_address_length=32,
2731                                    next_hop_address=self.pg7.remote_ip4n,
2732                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2733         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2734                                    dst_address_length=32,
2735                                    next_hop_address=self.pg8.remote_ip4n,
2736                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2737
2738         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2739         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2740         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2741                                                   is_inside=0)
2742
2743         # out2in
2744         pkts = self.create_stream_out(self.pg8)
2745         self.pg8.add_stream(pkts)
2746         self.pg_enable_capture(self.pg_interfaces)
2747         self.pg_start()
2748         capture = self.pg7.get_capture(len(pkts))
2749         self.verify_capture_in(capture, self.pg7)
2750
2751         # in2out
2752         pkts = self.create_stream_in(self.pg7, self.pg8)
2753         self.pg7.add_stream(pkts)
2754         self.pg_enable_capture(self.pg_interfaces)
2755         self.pg_start()
2756         capture = self.pg8.get_capture(len(pkts))
2757         self.verify_capture_out(capture, self.nat_addr, True)
2758
2759     def test_static_with_port_ipless_interfaces(self):
2760         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2761
2762         self.tcp_port_out = 30606
2763         self.udp_port_out = 30607
2764         self.icmp_id_out = 30608
2765
2766         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2767                                       mactobinary(self.pg7.remote_mac),
2768                                       self.pg7.remote_ip4n,
2769                                       is_static=1)
2770         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2771                                       mactobinary(self.pg8.remote_mac),
2772                                       self.pg8.remote_ip4n,
2773                                       is_static=1)
2774
2775         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2776                                    dst_address_length=32,
2777                                    next_hop_address=self.pg7.remote_ip4n,
2778                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2779         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2780                                    dst_address_length=32,
2781                                    next_hop_address=self.pg8.remote_ip4n,
2782                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2783
2784         self.nat44_add_address(self.nat_addr)
2785         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2786                                       self.tcp_port_in, self.tcp_port_out,
2787                                       proto=IP_PROTOS.tcp)
2788         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2789                                       self.udp_port_in, self.udp_port_out,
2790                                       proto=IP_PROTOS.udp)
2791         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2792                                       self.icmp_id_in, self.icmp_id_out,
2793                                       proto=IP_PROTOS.icmp)
2794         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2795         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2796                                                   is_inside=0)
2797
2798         # out2in
2799         pkts = self.create_stream_out(self.pg8)
2800         self.pg8.add_stream(pkts)
2801         self.pg_enable_capture(self.pg_interfaces)
2802         self.pg_start()
2803         capture = self.pg7.get_capture(len(pkts))
2804         self.verify_capture_in(capture, self.pg7)
2805
2806         # in2out
2807         pkts = self.create_stream_in(self.pg7, self.pg8)
2808         self.pg7.add_stream(pkts)
2809         self.pg_enable_capture(self.pg_interfaces)
2810         self.pg_start()
2811         capture = self.pg8.get_capture(len(pkts))
2812         self.verify_capture_out(capture)
2813
2814     def test_static_unknown_proto(self):
2815         """ 1:1 NAT translate packet with unknown protocol """
2816         nat_ip = "10.0.0.10"
2817         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2818         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2819         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2820                                                   is_inside=0)
2821
2822         # in2out
2823         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2824              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2825              GRE() /
2826              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2827              TCP(sport=1234, dport=1234))
2828         self.pg0.add_stream(p)
2829         self.pg_enable_capture(self.pg_interfaces)
2830         self.pg_start()
2831         p = self.pg1.get_capture(1)
2832         packet = p[0]
2833         try:
2834             self.assertEqual(packet[IP].src, nat_ip)
2835             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2836             self.assertTrue(packet.haslayer(GRE))
2837             self.check_ip_checksum(packet)
2838         except:
2839             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2840             raise
2841
2842         # out2in
2843         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2844              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2845              GRE() /
2846              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2847              TCP(sport=1234, dport=1234))
2848         self.pg1.add_stream(p)
2849         self.pg_enable_capture(self.pg_interfaces)
2850         self.pg_start()
2851         p = self.pg0.get_capture(1)
2852         packet = p[0]
2853         try:
2854             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2855             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2856             self.assertTrue(packet.haslayer(GRE))
2857             self.check_ip_checksum(packet)
2858         except:
2859             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2860             raise
2861
2862     def test_hairpinning_static_unknown_proto(self):
2863         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2864
2865         host = self.pg0.remote_hosts[0]
2866         server = self.pg0.remote_hosts[1]
2867
2868         host_nat_ip = "10.0.0.10"
2869         server_nat_ip = "10.0.0.11"
2870
2871         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2872         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2873         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2874         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2875                                                   is_inside=0)
2876
2877         # host to server
2878         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2879              IP(src=host.ip4, dst=server_nat_ip) /
2880              GRE() /
2881              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2882              TCP(sport=1234, dport=1234))
2883         self.pg0.add_stream(p)
2884         self.pg_enable_capture(self.pg_interfaces)
2885         self.pg_start()
2886         p = self.pg0.get_capture(1)
2887         packet = p[0]
2888         try:
2889             self.assertEqual(packet[IP].src, host_nat_ip)
2890             self.assertEqual(packet[IP].dst, server.ip4)
2891             self.assertTrue(packet.haslayer(GRE))
2892             self.check_ip_checksum(packet)
2893         except:
2894             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2895             raise
2896
2897         # server to host
2898         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2899              IP(src=server.ip4, dst=host_nat_ip) /
2900              GRE() /
2901              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2902              TCP(sport=1234, dport=1234))
2903         self.pg0.add_stream(p)
2904         self.pg_enable_capture(self.pg_interfaces)
2905         self.pg_start()
2906         p = self.pg0.get_capture(1)
2907         packet = p[0]
2908         try:
2909             self.assertEqual(packet[IP].src, server_nat_ip)
2910             self.assertEqual(packet[IP].dst, host.ip4)
2911             self.assertTrue(packet.haslayer(GRE))
2912             self.check_ip_checksum(packet)
2913         except:
2914             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2915             raise
2916
2917     def test_unknown_proto(self):
2918         """ NAT44 translate packet with unknown protocol """
2919         self.nat44_add_address(self.nat_addr)
2920         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2921         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2922                                                   is_inside=0)
2923
2924         # in2out
2925         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2926              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2927              TCP(sport=self.tcp_port_in, dport=20))
2928         self.pg0.add_stream(p)
2929         self.pg_enable_capture(self.pg_interfaces)
2930         self.pg_start()
2931         p = self.pg1.get_capture(1)
2932
2933         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2934              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2935              GRE() /
2936              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2937              TCP(sport=1234, dport=1234))
2938         self.pg0.add_stream(p)
2939         self.pg_enable_capture(self.pg_interfaces)
2940         self.pg_start()
2941         p = self.pg1.get_capture(1)
2942         packet = p[0]
2943         try:
2944             self.assertEqual(packet[IP].src, self.nat_addr)
2945             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2946             self.assertTrue(packet.haslayer(GRE))
2947             self.check_ip_checksum(packet)
2948         except:
2949             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2950             raise
2951
2952         # out2in
2953         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2954              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2955              GRE() /
2956              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2957              TCP(sport=1234, dport=1234))
2958         self.pg1.add_stream(p)
2959         self.pg_enable_capture(self.pg_interfaces)
2960         self.pg_start()
2961         p = self.pg0.get_capture(1)
2962         packet = p[0]
2963         try:
2964             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2965             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2966             self.assertTrue(packet.haslayer(GRE))
2967             self.check_ip_checksum(packet)
2968         except:
2969             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2970             raise
2971
2972     def test_hairpinning_unknown_proto(self):
2973         """ NAT44 translate packet with unknown protocol - hairpinning """
2974         host = self.pg0.remote_hosts[0]
2975         server = self.pg0.remote_hosts[1]
2976         host_in_port = 1234
2977         host_out_port = 0
2978         server_in_port = 5678
2979         server_out_port = 8765
2980         server_nat_ip = "10.0.0.11"
2981
2982         self.nat44_add_address(self.nat_addr)
2983         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2984         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2985                                                   is_inside=0)
2986
2987         # add static mapping for server
2988         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2989
2990         # host to server
2991         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2992              IP(src=host.ip4, dst=server_nat_ip) /
2993              TCP(sport=host_in_port, dport=server_out_port))
2994         self.pg0.add_stream(p)
2995         self.pg_enable_capture(self.pg_interfaces)
2996         self.pg_start()
2997         capture = self.pg0.get_capture(1)
2998
2999         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3000              IP(src=host.ip4, dst=server_nat_ip) /
3001              GRE() /
3002              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3003              TCP(sport=1234, dport=1234))
3004         self.pg0.add_stream(p)
3005         self.pg_enable_capture(self.pg_interfaces)
3006         self.pg_start()
3007         p = self.pg0.get_capture(1)
3008         packet = p[0]
3009         try:
3010             self.assertEqual(packet[IP].src, self.nat_addr)
3011             self.assertEqual(packet[IP].dst, server.ip4)
3012             self.assertTrue(packet.haslayer(GRE))
3013             self.check_ip_checksum(packet)
3014         except:
3015             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3016             raise
3017
3018         # server to host
3019         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3020              IP(src=server.ip4, dst=self.nat_addr) /
3021              GRE() /
3022              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3023              TCP(sport=1234, dport=1234))
3024         self.pg0.add_stream(p)
3025         self.pg_enable_capture(self.pg_interfaces)
3026         self.pg_start()
3027         p = self.pg0.get_capture(1)
3028         packet = p[0]
3029         try:
3030             self.assertEqual(packet[IP].src, server_nat_ip)
3031             self.assertEqual(packet[IP].dst, host.ip4)
3032             self.assertTrue(packet.haslayer(GRE))
3033             self.check_ip_checksum(packet)
3034         except:
3035             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3036             raise
3037
3038     def test_output_feature(self):
3039         """ NAT44 interface output feature (in2out postrouting) """
3040         self.nat44_add_address(self.nat_addr)
3041         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3042         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3043         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3044                                                          is_inside=0)
3045
3046         # in2out
3047         pkts = self.create_stream_in(self.pg0, self.pg3)
3048         self.pg0.add_stream(pkts)
3049         self.pg_enable_capture(self.pg_interfaces)
3050         self.pg_start()
3051         capture = self.pg3.get_capture(len(pkts))
3052         self.verify_capture_out(capture)
3053
3054         # out2in
3055         pkts = self.create_stream_out(self.pg3)
3056         self.pg3.add_stream(pkts)
3057         self.pg_enable_capture(self.pg_interfaces)
3058         self.pg_start()
3059         capture = self.pg0.get_capture(len(pkts))
3060         self.verify_capture_in(capture, self.pg0)
3061
3062         # from non-NAT interface to NAT inside interface
3063         pkts = self.create_stream_in(self.pg2, self.pg0)
3064         self.pg2.add_stream(pkts)
3065         self.pg_enable_capture(self.pg_interfaces)
3066         self.pg_start()
3067         capture = self.pg0.get_capture(len(pkts))
3068         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3069
3070     def test_output_feature_vrf_aware(self):
3071         """ NAT44 interface output feature VRF aware (in2out postrouting) """
3072         nat_ip_vrf10 = "10.0.0.10"
3073         nat_ip_vrf20 = "10.0.0.20"
3074
3075         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3076                                    dst_address_length=32,
3077                                    next_hop_address=self.pg3.remote_ip4n,
3078                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3079                                    table_id=10)
3080         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3081                                    dst_address_length=32,
3082                                    next_hop_address=self.pg3.remote_ip4n,
3083                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3084                                    table_id=20)
3085
3086         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3087         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3088         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3089         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3090         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3091                                                          is_inside=0)
3092
3093         # in2out VRF 10
3094         pkts = self.create_stream_in(self.pg4, self.pg3)
3095         self.pg4.add_stream(pkts)
3096         self.pg_enable_capture(self.pg_interfaces)
3097         self.pg_start()
3098         capture = self.pg3.get_capture(len(pkts))
3099         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3100
3101         # out2in VRF 10
3102         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3103         self.pg3.add_stream(pkts)
3104         self.pg_enable_capture(self.pg_interfaces)
3105         self.pg_start()
3106         capture = self.pg4.get_capture(len(pkts))
3107         self.verify_capture_in(capture, self.pg4)
3108
3109         # in2out VRF 20
3110         pkts = self.create_stream_in(self.pg6, self.pg3)
3111         self.pg6.add_stream(pkts)
3112         self.pg_enable_capture(self.pg_interfaces)
3113         self.pg_start()
3114         capture = self.pg3.get_capture(len(pkts))
3115         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3116
3117         # out2in VRF 20
3118         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3119         self.pg3.add_stream(pkts)
3120         self.pg_enable_capture(self.pg_interfaces)
3121         self.pg_start()
3122         capture = self.pg6.get_capture(len(pkts))
3123         self.verify_capture_in(capture, self.pg6)
3124
3125     def test_output_feature_hairpinning(self):
3126         """ NAT44 interface output feature hairpinning (in2out postrouting) """
3127         host = self.pg0.remote_hosts[0]
3128         server = self.pg0.remote_hosts[1]
3129         host_in_port = 1234
3130         host_out_port = 0
3131         server_in_port = 5678
3132         server_out_port = 8765
3133
3134         self.nat44_add_address(self.nat_addr)
3135         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3136         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3137                                                          is_inside=0)
3138
3139         # add static mapping for server
3140         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3141                                       server_in_port, server_out_port,
3142                                       proto=IP_PROTOS.tcp)
3143
3144         # send packet from host to server
3145         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3146              IP(src=host.ip4, dst=self.nat_addr) /
3147              TCP(sport=host_in_port, dport=server_out_port))
3148         self.pg0.add_stream(p)
3149         self.pg_enable_capture(self.pg_interfaces)
3150         self.pg_start()
3151         capture = self.pg0.get_capture(1)
3152         p = capture[0]
3153         try:
3154             ip = p[IP]
3155             tcp = p[TCP]
3156             self.assertEqual(ip.src, self.nat_addr)
3157             self.assertEqual(ip.dst, server.ip4)
3158             self.assertNotEqual(tcp.sport, host_in_port)
3159             self.assertEqual(tcp.dport, server_in_port)
3160             self.check_tcp_checksum(p)
3161             host_out_port = tcp.sport
3162         except:
3163             self.logger.error(ppp("Unexpected or invalid packet:", p))
3164             raise
3165
3166         # send reply from server to host
3167         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3168              IP(src=server.ip4, dst=self.nat_addr) /
3169              TCP(sport=server_in_port, dport=host_out_port))
3170         self.pg0.add_stream(p)
3171         self.pg_enable_capture(self.pg_interfaces)
3172         self.pg_start()
3173         capture = self.pg0.get_capture(1)
3174         p = capture[0]
3175         try:
3176             ip = p[IP]
3177             tcp = p[TCP]
3178             self.assertEqual(ip.src, self.nat_addr)
3179             self.assertEqual(ip.dst, host.ip4)
3180             self.assertEqual(tcp.sport, server_out_port)
3181             self.assertEqual(tcp.dport, host_in_port)
3182             self.check_tcp_checksum(p)
3183         except:
3184             self.logger.error(ppp("Unexpected or invalid packet:", p))
3185             raise
3186
3187     def test_one_armed_nat44(self):
3188         """ One armed NAT44 """
3189         remote_host = self.pg9.remote_hosts[0]
3190         local_host = self.pg9.remote_hosts[1]
3191         external_port = 0
3192
3193         self.nat44_add_address(self.nat_addr)
3194         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3195         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3196                                                   is_inside=0)
3197
3198         # in2out
3199         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3200              IP(src=local_host.ip4, dst=remote_host.ip4) /
3201              TCP(sport=12345, dport=80))
3202         self.pg9.add_stream(p)
3203         self.pg_enable_capture(self.pg_interfaces)
3204         self.pg_start()
3205         capture = self.pg9.get_capture(1)
3206         p = capture[0]
3207         try:
3208             ip = p[IP]
3209             tcp = p[TCP]
3210             self.assertEqual(ip.src, self.nat_addr)
3211             self.assertEqual(ip.dst, remote_host.ip4)
3212             self.assertNotEqual(tcp.sport, 12345)
3213             external_port = tcp.sport
3214             self.assertEqual(tcp.dport, 80)
3215             self.check_tcp_checksum(p)
3216             self.check_ip_checksum(p)
3217         except:
3218             self.logger.error(ppp("Unexpected or invalid packet:", p))
3219             raise
3220
3221         # out2in
3222         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3223              IP(src=remote_host.ip4, dst=self.nat_addr) /
3224              TCP(sport=80, dport=external_port))
3225         self.pg9.add_stream(p)
3226         self.pg_enable_capture(self.pg_interfaces)
3227         self.pg_start()
3228         capture = self.pg9.get_capture(1)
3229         p = capture[0]
3230         try:
3231             ip = p[IP]
3232             tcp = p[TCP]
3233             self.assertEqual(ip.src, remote_host.ip4)
3234             self.assertEqual(ip.dst, local_host.ip4)
3235             self.assertEqual(tcp.sport, 80)
3236             self.assertEqual(tcp.dport, 12345)
3237             self.check_tcp_checksum(p)
3238             self.check_ip_checksum(p)
3239         except:
3240             self.logger.error(ppp("Unexpected or invalid packet:", p))
3241             raise
3242
3243     def test_one_armed_nat44_static(self):
3244         """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3245         remote_host = self.pg9.remote_hosts[0]
3246         local_host = self.pg9.remote_hosts[1]
3247         external_port = 80
3248         local_port = 8080
3249         eh_port_in = 0
3250
3251         self.vapi.nat44_forwarding_enable_disable(1)
3252         self.nat44_add_address(self.nat_addr, twice_nat=1)
3253         self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3254                                       local_port, external_port,
3255                                       proto=IP_PROTOS.tcp, out2in_only=1,
3256                                       twice_nat=1)
3257         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3258         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3259                                                   is_inside=0)
3260
3261         # from client to service
3262         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3263              IP(src=remote_host.ip4, dst=self.nat_addr) /
3264              TCP(sport=12345, dport=external_port))
3265         self.pg9.add_stream(p)
3266         self.pg_enable_capture(self.pg_interfaces)
3267         self.pg_start()
3268         capture = self.pg9.get_capture(1)
3269         p = capture[0]
3270         server = None
3271         try:
3272             ip = p[IP]
3273             tcp = p[TCP]
3274             self.assertEqual(ip.dst, local_host.ip4)
3275             self.assertEqual(ip.src, self.nat_addr)
3276             self.assertEqual(tcp.dport, local_port)
3277             self.assertNotEqual(tcp.sport, 12345)
3278             eh_port_in = tcp.sport
3279             self.check_tcp_checksum(p)
3280             self.check_ip_checksum(p)
3281         except:
3282             self.logger.error(ppp("Unexpected or invalid packet:", p))
3283             raise
3284
3285         # from service back to client
3286         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3287              IP(src=local_host.ip4, dst=self.nat_addr) /
3288              TCP(sport=local_port, dport=eh_port_in))
3289         self.pg9.add_stream(p)
3290         self.pg_enable_capture(self.pg_interfaces)
3291         self.pg_start()
3292         capture = self.pg9.get_capture(1)
3293         p = capture[0]
3294         try:
3295             ip = p[IP]
3296             tcp = p[TCP]
3297             self.assertEqual(ip.src, self.nat_addr)
3298             self.assertEqual(ip.dst, remote_host.ip4)
3299             self.assertEqual(tcp.sport, external_port)
3300             self.assertEqual(tcp.dport, 12345)
3301             self.check_tcp_checksum(p)
3302             self.check_ip_checksum(p)
3303         except:
3304             self.logger.error(ppp("Unexpected or invalid packet:", p))
3305             raise
3306
3307     def test_del_session(self):
3308         """ Delete NAT44 session """
3309         self.nat44_add_address(self.nat_addr)
3310         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3311         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3312                                                   is_inside=0)
3313
3314         pkts = self.create_stream_in(self.pg0, self.pg1)
3315         self.pg0.add_stream(pkts)
3316         self.pg_enable_capture(self.pg_interfaces)
3317         self.pg_start()
3318         capture = self.pg1.get_capture(len(pkts))
3319
3320         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3321         nsessions = len(sessions)
3322
3323         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3324                                     sessions[0].inside_port,
3325                                     sessions[0].protocol)
3326         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3327                                     sessions[1].outside_port,
3328                                     sessions[1].protocol,
3329                                     is_in=0)
3330
3331         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3332         self.assertEqual(nsessions - len(sessions), 2)
3333
3334     def test_set_get_reass(self):
3335         """ NAT44 set/get virtual fragmentation reassembly """
3336         reas_cfg1 = self.vapi.nat_get_reass()
3337
3338         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3339                                 max_reass=reas_cfg1.ip4_max_reass * 2,
3340                                 max_frag=reas_cfg1.ip4_max_frag * 2)
3341
3342         reas_cfg2 = self.vapi.nat_get_reass()
3343
3344         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3345         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3346         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3347
3348         self.vapi.nat_set_reass(drop_frag=1)
3349         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3350
3351     def test_frag_in_order(self):
3352         """ NAT44 translate fragments arriving in order """
3353         self.nat44_add_address(self.nat_addr)
3354         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3355         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3356                                                   is_inside=0)
3357
3358         data = "A" * 4 + "B" * 16 + "C" * 3
3359         self.tcp_port_in = random.randint(1025, 65535)
3360
3361         reass = self.vapi.nat_reass_dump()
3362         reass_n_start = len(reass)
3363
3364         # in2out
3365         pkts = self.create_stream_frag(self.pg0,
3366                                        self.pg1.remote_ip4,
3367                                        self.tcp_port_in,
3368                                        20,
3369                                        data)
3370         self.pg0.add_stream(pkts)
3371         self.pg_enable_capture(self.pg_interfaces)
3372         self.pg_start()
3373         frags = self.pg1.get_capture(len(pkts))
3374         p = self.reass_frags_and_verify(frags,
3375                                         self.nat_addr,
3376                                         self.pg1.remote_ip4)
3377         self.assertEqual(p[TCP].dport, 20)
3378         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3379         self.tcp_port_out = p[TCP].sport
3380         self.assertEqual(data, p[Raw].load)
3381
3382         # out2in
3383         pkts = self.create_stream_frag(self.pg1,
3384                                        self.nat_addr,
3385                                        20,
3386                                        self.tcp_port_out,
3387                                        data)
3388         self.pg1.add_stream(pkts)
3389         self.pg_enable_capture(self.pg_interfaces)
3390         self.pg_start()
3391         frags = self.pg0.get_capture(len(pkts))
3392         p = self.reass_frags_and_verify(frags,
3393                                         self.pg1.remote_ip4,
3394                                         self.pg0.remote_ip4)
3395         self.assertEqual(p[TCP].sport, 20)
3396         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3397         self.assertEqual(data, p[Raw].load)
3398
3399         reass = self.vapi.nat_reass_dump()
3400         reass_n_end = len(reass)
3401
3402         self.assertEqual(reass_n_end - reass_n_start, 2)
3403
3404     def test_reass_hairpinning(self):
3405         """ NAT44 fragments hairpinning """
3406         host = self.pg0.remote_hosts[0]
3407         server = self.pg0.remote_hosts[1]
3408         host_in_port = random.randint(1025, 65535)
3409         host_out_port = 0
3410         server_in_port = random.randint(1025, 65535)
3411         server_out_port = random.randint(1025, 65535)
3412         data = "A" * 4 + "B" * 16 + "C" * 3
3413
3414         self.nat44_add_address(self.nat_addr)
3415         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3416         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3417                                                   is_inside=0)
3418         # add static mapping for server
3419         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3420                                       server_in_port, server_out_port,
3421                                       proto=IP_PROTOS.tcp)
3422
3423         # send packet from host to server
3424         pkts = self.create_stream_frag(self.pg0,
3425                                        self.nat_addr,
3426                                        host_in_port,
3427                                        server_out_port,
3428                                        data)
3429         self.pg0.add_stream(pkts)
3430         self.pg_enable_capture(self.pg_interfaces)
3431         self.pg_start()
3432         frags = self.pg0.get_capture(len(pkts))
3433         p = self.reass_frags_and_verify(frags,
3434                                         self.nat_addr,
3435                                         server.ip4)
3436         self.assertNotEqual(p[TCP].sport, host_in_port)
3437         self.assertEqual(p[TCP].dport, server_in_port)
3438         self.assertEqual(data, p[Raw].load)
3439
3440     def test_frag_out_of_order(self):
3441         """ NAT44 translate fragments arriving out of order """
3442         self.nat44_add_address(self.nat_addr)
3443         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3444         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3445                                                   is_inside=0)
3446
3447         data = "A" * 4 + "B" * 16 + "C" * 3
3448         random.randint(1025, 65535)
3449
3450         # in2out
3451         pkts = self.create_stream_frag(self.pg0,
3452                                        self.pg1.remote_ip4,
3453                                        self.tcp_port_in,
3454                                        20,
3455                                        data)
3456         pkts.reverse()
3457         self.pg0.add_stream(pkts)
3458         self.pg_enable_capture(self.pg_interfaces)
3459         self.pg_start()
3460         frags = self.pg1.get_capture(len(pkts))
3461         p = self.reass_frags_and_verify(frags,
3462                                         self.nat_addr,
3463                                         self.pg1.remote_ip4)
3464         self.assertEqual(p[TCP].dport, 20)
3465         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3466         self.tcp_port_out = p[TCP].sport
3467         self.assertEqual(data, p[Raw].load)
3468
3469         # out2in
3470         pkts = self.create_stream_frag(self.pg1,
3471                                        self.nat_addr,
3472                                        20,
3473                                        self.tcp_port_out,
3474                                        data)
3475         pkts.reverse()
3476         self.pg1.add_stream(pkts)
3477         self.pg_enable_capture(self.pg_interfaces)
3478         self.pg_start()
3479         frags = self.pg0.get_capture(len(pkts))
3480         p = self.reass_frags_and_verify(frags,
3481                                         self.pg1.remote_ip4,
3482                                         self.pg0.remote_ip4)
3483         self.assertEqual(p[TCP].sport, 20)
3484         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3485         self.assertEqual(data, p[Raw].load)
3486
3487     def test_port_restricted(self):
3488         """ Port restricted NAT44 (MAP-E CE) """
3489         self.nat44_add_address(self.nat_addr)
3490         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3491         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3492                                                   is_inside=0)
3493         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3494                       "psid-offset 6 psid-len 6")
3495
3496         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3497              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3498              TCP(sport=4567, dport=22))
3499         self.pg0.add_stream(p)
3500         self.pg_enable_capture(self.pg_interfaces)
3501         self.pg_start()
3502         capture = self.pg1.get_capture(1)
3503         p = capture[0]
3504         try:
3505             ip = p[IP]
3506             tcp = p[TCP]
3507             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3508             self.assertEqual(ip.src, self.nat_addr)
3509             self.assertEqual(tcp.dport, 22)
3510             self.assertNotEqual(tcp.sport, 4567)
3511             self.assertEqual((tcp.sport >> 6) & 63, 10)
3512             self.check_tcp_checksum(p)
3513             self.check_ip_checksum(p)
3514         except:
3515             self.logger.error(ppp("Unexpected or invalid packet:", p))
3516             raise
3517
3518     def test_twice_nat(self):
3519         """ Twice NAT44 """
3520         twice_nat_addr = '10.0.1.3'
3521         port_in = 8080
3522         port_out = 80
3523         eh_port_out = 4567
3524         eh_port_in = 0
3525         self.nat44_add_address(self.nat_addr)
3526         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3527         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3528                                       port_in, port_out, proto=IP_PROTOS.tcp,
3529                                       twice_nat=1)
3530         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3531         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3532                                                   is_inside=0)
3533
3534         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3535              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3536              TCP(sport=eh_port_out, dport=port_out))
3537         self.pg1.add_stream(p)
3538         self.pg_enable_capture(self.pg_interfaces)
3539         self.pg_start()
3540         capture = self.pg0.get_capture(1)
3541         p = capture[0]
3542         try:
3543             ip = p[IP]
3544             tcp = p[TCP]
3545             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3546             self.assertEqual(ip.src, twice_nat_addr)
3547             self.assertEqual(tcp.dport, port_in)
3548             self.assertNotEqual(tcp.sport, eh_port_out)
3549             eh_port_in = tcp.sport
3550             self.check_tcp_checksum(p)
3551             self.check_ip_checksum(p)
3552         except:
3553             self.logger.error(ppp("Unexpected or invalid packet:", p))
3554             raise
3555
3556         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3557              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3558              TCP(sport=port_in, dport=eh_port_in))
3559         self.pg0.add_stream(p)
3560         self.pg_enable_capture(self.pg_interfaces)
3561         self.pg_start()
3562         capture = self.pg1.get_capture(1)
3563         p = capture[0]
3564         try:
3565             ip = p[IP]
3566             tcp = p[TCP]
3567             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3568             self.assertEqual(ip.src, self.nat_addr)
3569             self.assertEqual(tcp.dport, eh_port_out)
3570             self.assertEqual(tcp.sport, port_out)
3571             self.check_tcp_checksum(p)
3572             self.check_ip_checksum(p)
3573         except:
3574             self.logger.error(ppp("Unexpected or invalid packet:", p))
3575             raise
3576
3577     def test_twice_nat_lb(self):
3578         """ Twice NAT44 local service load balancing """
3579         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3580         twice_nat_addr = '10.0.1.3'
3581         local_port = 8080
3582         external_port = 80
3583         eh_port_out = 4567
3584         eh_port_in = 0
3585         server1 = self.pg0.remote_hosts[0]
3586         server2 = self.pg0.remote_hosts[1]
3587
3588         locals = [{'addr': server1.ip4n,
3589                    'port': local_port,
3590                    'probability': 50},
3591                   {'addr': server2.ip4n,
3592                    'port': local_port,
3593                    'probability': 50}]
3594
3595         self.nat44_add_address(self.nat_addr)
3596         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3597
3598         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3599                                                   external_port,
3600                                                   IP_PROTOS.tcp,
3601                                                   twice_nat=1,
3602                                                   local_num=len(locals),
3603                                                   locals=locals)
3604         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3605         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3606                                                   is_inside=0)
3607
3608         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3609              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3610              TCP(sport=eh_port_out, dport=external_port))
3611         self.pg1.add_stream(p)
3612         self.pg_enable_capture(self.pg_interfaces)
3613         self.pg_start()
3614         capture = self.pg0.get_capture(1)
3615         p = capture[0]
3616         server = None
3617         try:
3618             ip = p[IP]
3619             tcp = p[TCP]
3620             self.assertEqual(ip.src, twice_nat_addr)
3621             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3622             if ip.dst == server1.ip4:
3623                 server = server1
3624             else:
3625                 server = server2
3626             self.assertNotEqual(tcp.sport, eh_port_out)
3627             eh_port_in = tcp.sport
3628             self.assertEqual(tcp.dport, local_port)
3629             self.check_tcp_checksum(p)
3630             self.check_ip_checksum(p)
3631         except:
3632             self.logger.error(ppp("Unexpected or invalid packet:", p))
3633             raise
3634
3635         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3636              IP(src=server.ip4, dst=twice_nat_addr) /
3637              TCP(sport=local_port, dport=eh_port_in))
3638         self.pg0.add_stream(p)
3639         self.pg_enable_capture(self.pg_interfaces)
3640         self.pg_start()
3641         capture = self.pg1.get_capture(1)
3642         p = capture[0]
3643         try:
3644             ip = p[IP]
3645             tcp = p[TCP]
3646             self.assertEqual(ip.src, self.nat_addr)
3647             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3648             self.assertEqual(tcp.sport, external_port)
3649             self.assertEqual(tcp.dport, eh_port_out)
3650             self.check_tcp_checksum(p)
3651             self.check_ip_checksum(p)
3652         except:
3653             self.logger.error(ppp("Unexpected or invalid packet:", p))
3654             raise
3655
3656     def test_twice_nat_interface_addr(self):
3657         """ Acquire twice NAT44 addresses from interface """
3658         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3659
3660         # no address in NAT pool
3661         adresses = self.vapi.nat44_address_dump()
3662         self.assertEqual(0, len(adresses))
3663
3664         # configure interface address and check NAT address pool
3665         self.pg7.config_ip4()
3666         adresses = self.vapi.nat44_address_dump()
3667         self.assertEqual(1, len(adresses))
3668         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3669         self.assertEqual(adresses[0].twice_nat, 1)
3670
3671         # remove interface address and check NAT address pool
3672         self.pg7.unconfig_ip4()
3673         adresses = self.vapi.nat44_address_dump()
3674         self.assertEqual(0, len(adresses))
3675
3676     def test_ipfix_max_frags(self):
3677         """ IPFIX logging maximum fragments pending reassembly exceeded """
3678         self.nat44_add_address(self.nat_addr)
3679         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3680         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3681                                                   is_inside=0)
3682         self.vapi.nat_set_reass(max_frag=0)
3683         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3684                                      src_address=self.pg3.local_ip4n,
3685                                      path_mtu=512,
3686                                      template_interval=10)
3687         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3688                             src_port=self.ipfix_src_port)
3689
3690         data = "A" * 4 + "B" * 16 + "C" * 3
3691         self.tcp_port_in = random.randint(1025, 65535)
3692         pkts = self.create_stream_frag(self.pg0,
3693                                        self.pg1.remote_ip4,
3694                                        self.tcp_port_in,
3695                                        20,
3696                                        data)
3697         self.pg0.add_stream(pkts[-1])
3698         self.pg_enable_capture(self.pg_interfaces)
3699         self.pg_start()
3700         frags = self.pg1.get_capture(0)
3701         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3702         capture = self.pg3.get_capture(9)
3703         ipfix = IPFIXDecoder()
3704         # first load template
3705         for p in capture:
3706             self.assertTrue(p.haslayer(IPFIX))
3707             self.assertEqual(p[IP].src, self.pg3.local_ip4)
3708             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3709             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3710             self.assertEqual(p[UDP].dport, 4739)
3711             self.assertEqual(p[IPFIX].observationDomainID,
3712                              self.ipfix_domain_id)
3713             if p.haslayer(Template):
3714                 ipfix.add_template(p.getlayer(Template))
3715         # verify events in data set
3716         for p in capture:
3717             if p.haslayer(Data):
3718                 data = ipfix.decode_data_set(p.getlayer(Set))
3719                 self.verify_ipfix_max_fragments_ip4(data, 0,
3720                                                     self.pg0.remote_ip4n)
3721
3722     def tearDown(self):
3723         super(TestNAT44, self).tearDown()
3724         if not self.vpp_dead:
3725             self.logger.info(self.vapi.cli("show nat44 verbose"))
3726             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3727             self.vapi.cli("nat addr-port-assignment-alg default")
3728             self.clear_nat44()
3729
3730
3731 class TestNAT44Out2InDPO(MethodHolder):
3732     """ NAT44 Test Cases using out2in DPO """
3733
3734     @classmethod
3735     def setUpConstants(cls):
3736         super(TestNAT44Out2InDPO, cls).setUpConstants()
3737         cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
3738
3739     @classmethod
3740     def setUpClass(cls):
3741         super(TestNAT44Out2InDPO, cls).setUpClass()
3742
3743         try:
3744             cls.tcp_port_in = 6303
3745             cls.tcp_port_out = 6303
3746             cls.udp_port_in = 6304
3747             cls.udp_port_out = 6304
3748             cls.icmp_id_in = 6305
3749             cls.icmp_id_out = 6305
3750             cls.nat_addr = '10.0.0.3'
3751             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3752             cls.dst_ip4 = '192.168.70.1'
3753
3754             cls.create_pg_interfaces(range(2))
3755
3756             cls.pg0.admin_up()
3757             cls.pg0.config_ip4()
3758             cls.pg0.resolve_arp()
3759
3760             cls.pg1.admin_up()
3761             cls.pg1.config_ip6()
3762             cls.pg1.resolve_ndp()
3763
3764             cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3765                                       dst_address_length=0,
3766                                       next_hop_address=cls.pg1.remote_ip6n,
3767                                       next_hop_sw_if_index=cls.pg1.sw_if_index)
3768
3769         except Exception:
3770             super(TestNAT44Out2InDPO, cls).tearDownClass()
3771             raise
3772
3773     def configure_xlat(self):
3774         self.dst_ip6_pfx = '1:2:3::'
3775         self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3776                                               self.dst_ip6_pfx)
3777         self.dst_ip6_pfx_len = 96
3778         self.src_ip6_pfx = '4:5:6::'
3779         self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3780                                               self.src_ip6_pfx)
3781         self.src_ip6_pfx_len = 96
3782         self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3783                                  self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3784                                  '\x00\x00\x00\x00', 0, is_translation=1,
3785                                  is_rfc6052=1)
3786
3787     def test_464xlat_ce(self):
3788         """ Test 464XLAT CE with NAT44 """
3789
3790         self.configure_xlat()
3791
3792         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3793         self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
3794
3795         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3796                                        self.dst_ip6_pfx_len)
3797         out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3798                                        self.src_ip6_pfx_len)
3799
3800         try:
3801             pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3802             self.pg0.add_stream(pkts)
3803             self.pg_enable_capture(self.pg_interfaces)
3804             self.pg_start()
3805             capture = self.pg1.get_capture(len(pkts))
3806             self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3807                                         dst_ip=out_src_ip6)
3808
3809             pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3810                                               out_dst_ip6)
3811             self.pg1.add_stream(pkts)
3812             self.pg_enable_capture(self.pg_interfaces)
3813             self.pg_start()
3814             capture = self.pg0.get_capture(len(pkts))
3815             self.verify_capture_in(capture, self.pg0)
3816         finally:
3817             self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3818                                                       is_add=0)
3819             self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3820                                                   self.nat_addr_n, is_add=0)
3821
3822     def test_464xlat_ce_no_nat(self):
3823         """ Test 464XLAT CE without NAT44 """
3824
3825         self.configure_xlat()
3826
3827         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3828                                        self.dst_ip6_pfx_len)
3829         out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3830                                        self.src_ip6_pfx_len)
3831
3832         pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3833         self.pg0.add_stream(pkts)
3834         self.pg_enable_capture(self.pg_interfaces)
3835         self.pg_start()
3836         capture = self.pg1.get_capture(len(pkts))
3837         self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3838                                     nat_ip=out_dst_ip6, same_port=True)
3839
3840         pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3841         self.pg1.add_stream(pkts)
3842         self.pg_enable_capture(self.pg_interfaces)
3843         self.pg_start()
3844         capture = self.pg0.get_capture(len(pkts))
3845         self.verify_capture_in(capture, self.pg0)
3846
3847
3848 class TestDeterministicNAT(MethodHolder):
3849     """ Deterministic NAT Test Cases """
3850
3851     @classmethod
3852     def setUpConstants(cls):
3853         super(TestDeterministicNAT, cls).setUpConstants()
3854         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3855
3856     @classmethod
3857     def setUpClass(cls):
3858         super(TestDeterministicNAT, cls).setUpClass()
3859
3860         try:
3861             cls.tcp_port_in = 6303
3862             cls.tcp_external_port = 6303
3863             cls.udp_port_in = 6304
3864             cls.udp_external_port = 6304
3865             cls.icmp_id_in = 6305
3866             cls.nat_addr = '10.0.0.3'
3867
3868             cls.create_pg_interfaces(range(3))
3869             cls.interfaces = list(cls.pg_interfaces)
3870
3871             for i in cls.interfaces:
3872                 i.admin_up()
3873                 i.config_ip4()
3874                 i.resolve_arp()
3875
3876             cls.pg0.generate_remote_hosts(2)
3877             cls.pg0.configure_ipv4_neighbors()
3878
3879         except Exception:
3880             super(TestDeterministicNAT, cls).tearDownClass()
3881             raise
3882
3883     def create_stream_in(self, in_if, out_if, ttl=64):
3884         """
3885         Create packet stream for inside network
3886
3887         :param in_if: Inside interface
3888         :param out_if: Outside interface
3889         :param ttl: TTL of generated packets
3890         """
3891         pkts = []
3892         # TCP
3893         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3894              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3895              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3896         pkts.append(p)
3897
3898         # UDP
3899         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3900              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3901              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3902         pkts.append(p)
3903
3904         # ICMP
3905         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3906              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3907              ICMP(id=self.icmp_id_in, type='echo-request'))
3908         pkts.append(p)
3909
3910         return pkts
3911
3912     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3913         """
3914         Create packet stream for outside network
3915
3916         :param out_if: Outside interface
3917         :param dst_ip: Destination IP address (Default use global NAT address)
3918         :param ttl: TTL of generated packets
3919         """
3920         if dst_ip is None:
3921             dst_ip = self.nat_addr
3922         pkts = []
3923         # TCP
3924         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3925              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3926              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3927         pkts.append(p)
3928
3929         # UDP
3930         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3931              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3932              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3933         pkts.append(p)
3934
3935         # ICMP
3936         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3937              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3938              ICMP(id=self.icmp_external_id, type='echo-reply'))
3939         pkts.append(p)
3940
3941         return pkts
3942
3943     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3944         """
3945         Verify captured packets on outside network
3946
3947         :param capture: Captured packets
3948         :param nat_ip: Translated IP address (Default use global NAT address)
3949         :param same_port: Sorce port number is not translated (Default False)
3950         :param packet_num: Expected number of packets (Default 3)
3951         """
3952         if nat_ip is None:
3953             nat_ip = self.nat_addr
3954         self.assertEqual(packet_num, len(capture))
3955         for packet in capture:
3956             try:
3957                 self.assertEqual(packet[IP].src, nat_ip)
3958                 if packet.haslayer(TCP):
3959                     self.tcp_port_out = packet[TCP].sport
3960                 elif packet.haslayer(UDP):
3961                     self.udp_port_out = packet[UDP].sport
3962                 else:
3963                     self.icmp_external_id = packet[ICMP].id
3964             except:
3965                 self.logger.error(ppp("Unexpected or invalid packet "
3966                                       "(outside network):", packet))
3967                 raise
3968
3969     def initiate_tcp_session(self, in_if, out_if):
3970         """
3971         Initiates TCP session
3972
3973         :param in_if: Inside interface
3974         :param out_if: Outside interface
3975         """
3976         try:
3977             # SYN packet in->out
3978             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3979                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3980                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3981                      flags="S"))
3982             in_if.add_stream(p)
3983             self.pg_enable_capture(self.pg_interfaces)
3984             self.pg_start()
3985             capture = out_if.get_capture(1)
3986             p = capture[0]
3987             self.tcp_port_out = p[TCP].sport
3988
3989             # SYN + ACK packet out->in
3990             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3991                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3992                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3993                      flags="SA"))
3994             out_if.add_stream(p)
3995             self.pg_enable_capture(self.pg_interfaces)
3996             self.pg_start()
3997             in_if.get_capture(1)
3998
3999             # ACK packet in->out
4000             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4001                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4002                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4003                      flags="A"))
4004             in_if.add_stream(p)
4005             self.pg_enable_capture(self.pg_interfaces)
4006             self.pg_start()
4007             out_if.get_capture(1)
4008
4009         except:
4010             self.logger.error("TCP 3 way handshake failed")
4011             raise
4012
4013     def verify_ipfix_max_entries_per_user(self, data):
4014         """
4015         Verify IPFIX maximum entries per user exceeded event
4016
4017         :param data: Decoded IPFIX data records
4018         """
4019         self.assertEqual(1, len(data))
4020         record = data[0]
4021         # natEvent
4022         self.assertEqual(ord(record[230]), 13)
4023         # natQuotaExceededEvent
4024         self.assertEqual('\x03\x00\x00\x00', record[466])
4025         # maxEntriesPerUser
4026         self.assertEqual('\xe8\x03\x00\x00', record[473])
4027         # sourceIPv4Address
4028         self.assertEqual(self.pg0.remote_ip4n, record[8])
4029
4030     def test_deterministic_mode(self):
4031         """ NAT plugin run deterministic mode """
4032         in_addr = '172.16.255.0'
4033         out_addr = '172.17.255.50'
4034         in_addr_t = '172.16.255.20'
4035         in_addr_n = socket.inet_aton(in_addr)
4036         out_addr_n = socket.inet_aton(out_addr)
4037         in_addr_t_n = socket.inet_aton(in_addr_t)
4038         in_plen = 24
4039         out_plen = 32
4040
4041         nat_config = self.vapi.nat_show_config()
4042         self.assertEqual(1, nat_config.deterministic)
4043
4044         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4045
4046         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4047         self.assertEqual(rep1.out_addr[:4], out_addr_n)
4048         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4049         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4050
4051         deterministic_mappings = self.vapi.nat_det_map_dump()
4052         self.assertEqual(len(deterministic_mappings), 1)
4053         dsm = deterministic_mappings[0]
4054         self.assertEqual(in_addr_n, dsm.in_addr[:4])
4055         self.assertEqual(in_plen, dsm.in_plen)
4056         self.assertEqual(out_addr_n, dsm.out_addr[:4])
4057         self.assertEqual(out_plen, dsm.out_plen)
4058
4059         self.clear_nat_det()
4060         deterministic_mappings = self.vapi.nat_det_map_dump()
4061         self.assertEqual(len(deterministic_mappings), 0)
4062
4063     def test_set_timeouts(self):
4064         """ Set deterministic NAT timeouts """
4065         timeouts_before = self.vapi.nat_det_get_timeouts()
4066
4067         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4068                                        timeouts_before.tcp_established + 10,
4069                                        timeouts_before.tcp_transitory + 10,
4070                                        timeouts_before.icmp + 10)
4071
4072         timeouts_after = self.vapi.nat_det_get_timeouts()
4073
4074         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4075         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4076         self.assertNotEqual(timeouts_before.tcp_established,
4077                             timeouts_after.tcp_established)
4078         self.assertNotEqual(timeouts_before.tcp_transitory,
4079                             timeouts_after.tcp_transitory)
4080
4081     def test_det_in(self):
4082         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4083
4084         nat_ip = "10.0.0.10"
4085
4086         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4087                                       32,
4088                                       socket.inet_aton(nat_ip),
4089                                       32)
4090         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4091         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4092                                                   is_inside=0)
4093
4094         # in2out
4095         pkts = self.create_stream_in(self.pg0, self.pg1)
4096         self.pg0.add_stream(pkts)
4097         self.pg_enable_capture(self.pg_interfaces)
4098         self.pg_start()
4099         capture = self.pg1.get_capture(len(pkts))
4100         self.verify_capture_out(capture, nat_ip)
4101
4102         # out2in
4103         pkts = self.create_stream_out(self.pg1, nat_ip)
4104         self.pg1.add_stream(pkts)
4105         self.pg_enable_capture(self.pg_interfaces)
4106         self.pg_start()
4107         capture = self.pg0.get_capture(len(pkts))
4108         self.verify_capture_in(capture, self.pg0)
4109
4110         # session dump test
4111         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4112         self.assertEqual(len(sessions), 3)
4113
4114         # TCP session
4115         s = sessions[0]
4116         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4117         self.assertEqual(s.in_port, self.tcp_port_in)
4118         self.assertEqual(s.out_port, self.tcp_port_out)
4119         self.assertEqual(s.ext_port, self.tcp_external_port)
4120
4121         # UDP session
4122         s = sessions[1]
4123         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4124         self.assertEqual(s.in_port, self.udp_port_in)
4125         self.assertEqual(s.out_port, self.udp_port_out)
4126         self.assertEqual(s.ext_port, self.udp_external_port)
4127
4128         # ICMP session
4129         s = sessions[2]
4130         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4131         self.assertEqual(s.in_port, self.icmp_id_in)
4132         self.assertEqual(s.out_port, self.icmp_external_id)
4133
4134     def test_multiple_users(self):
4135         """ Deterministic NAT multiple users """
4136
4137         nat_ip = "10.0.0.10"
4138         port_in = 80
4139         external_port = 6303
4140
4141         host0 = self.pg0.remote_hosts[0]
4142         host1 = self.pg0.remote_hosts[1]
4143
4144         self.vapi.nat_det_add_del_map(host0.ip4n,
4145                                       24,
4146                                       socket.inet_aton(nat_ip),
4147                                       32)
4148         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4149         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4150                                                   is_inside=0)
4151
4152         # host0 to out
4153         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4154              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4155              TCP(sport=port_in, dport=external_port))
4156         self.pg0.add_stream(p)
4157         self.pg_enable_capture(self.pg_interfaces)
4158         self.pg_start()
4159         capture = self.pg1.get_capture(1)
4160         p = capture[0]
4161         try:
4162             ip = p[IP]
4163             tcp = p[TCP]
4164             self.assertEqual(ip.src, nat_ip)
4165             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4166             self.assertEqual(tcp.dport, external_port)
4167             port_out0 = tcp.sport
4168         except:
4169             self.logger.error(ppp("Unexpected or invalid packet:", p))
4170             raise
4171
4172         # host1 to out
4173         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4174              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4175              TCP(sport=port_in, dport=external_port))
4176         self.pg0.add_stream(p)
4177         self.pg_enable_capture(self.pg_interfaces)
4178         self.pg_start()
4179         capture = self.pg1.get_capture(1)
4180         p = capture[0]
4181         try:
4182             ip = p[IP]
4183             tcp = p[TCP]
4184             self.assertEqual(ip.src, nat_ip)
4185             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4186             self.assertEqual(tcp.dport, external_port)
4187             port_out1 = tcp.sport
4188         except:
4189             self.logger.error(ppp("Unexpected or invalid packet:", p))
4190             raise
4191
4192         dms = self.vapi.nat_det_map_dump()
4193         self.assertEqual(1, len(dms))
4194         self.assertEqual(2, dms[0].ses_num)
4195
4196         # out to host0
4197         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4198              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4199              TCP(sport=external_port, dport=port_out0))
4200         self.pg1.add_stream(p)
4201         self.pg_enable_capture(self.pg_interfaces)
4202         self.pg_start()
4203         capture = self.pg0.get_capture(1)
4204         p = capture[0]
4205         try:
4206             ip = p[IP]
4207             tcp = p[TCP]
4208             self.assertEqual(ip.src, self.pg1.remote_ip4)
4209             self.assertEqual(ip.dst, host0.ip4)
4210             self.assertEqual(tcp.dport, port_in)
4211             self.assertEqual(tcp.sport, external_port)
4212         except:
4213             self.logger.error(ppp("Unexpected or invalid packet:", p))
4214             raise
4215
4216         # out to host1
4217         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4218              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4219              TCP(sport=external_port, dport=port_out1))
4220         self.pg1.add_stream(p)
4221         self.pg_enable_capture(self.pg_interfaces)
4222         self.pg_start()
4223         capture = self.pg0.get_capture(1)
4224         p = capture[0]
4225         try:
4226             ip = p[IP]
4227             tcp = p[TCP]
4228             self.assertEqual(ip.src, self.pg1.remote_ip4)
4229             self.assertEqual(ip.dst, host1.ip4)
4230             self.assertEqual(tcp.dport, port_in)
4231             self.assertEqual(tcp.sport, external_port)
4232         except:
4233             self.logger.error(ppp("Unexpected or invalid packet", p))
4234             raise
4235
4236         # session close api test
4237         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4238                                             port_out1,
4239                                             self.pg1.remote_ip4n,
4240                                             external_port)
4241         dms = self.vapi.nat_det_map_dump()
4242         self.assertEqual(dms[0].ses_num, 1)
4243
4244         self.vapi.nat_det_close_session_in(host0.ip4n,
4245                                            port_in,
4246                                            self.pg1.remote_ip4n,
4247                                            external_port)
4248         dms = self.vapi.nat_det_map_dump()
4249         self.assertEqual(dms[0].ses_num, 0)
4250
4251     def test_tcp_session_close_detection_in(self):
4252         """ Deterministic NAT TCP session close from inside network """
4253         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4254                                       32,
4255                                       socket.inet_aton(self.nat_addr),
4256                                       32)
4257         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4258         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4259                                                   is_inside=0)
4260
4261         self.initiate_tcp_session(self.pg0, self.pg1)
4262
4263         # close the session from inside
4264         try:
4265             # FIN packet in -> out
4266             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4267                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4268                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4269                      flags="F"))
4270             self.pg0.add_stream(p)
4271             self.pg_enable_capture(self.pg_interfaces)
4272             self.pg_start()
4273             self.pg1.get_capture(1)
4274
4275             pkts = []
4276
4277             # ACK packet out -> in
4278             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4279                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4280                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4281                      flags="A"))
4282             pkts.append(p)
4283
4284             # FIN packet out -> in
4285             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4286                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4287                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4288                      flags="F"))
4289             pkts.append(p)
4290
4291             self.pg1.add_stream(pkts)
4292             self.pg_enable_capture(self.pg_interfaces)
4293             self.pg_start()
4294             self.pg0.get_capture(2)
4295
4296             # ACK packet in -> out
4297             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4298                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4299                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4300                      flags="A"))
4301             self.pg0.add_stream(p)
4302             self.pg_enable_capture(self.pg_interfaces)
4303             self.pg_start()
4304             self.pg1.get_capture(1)
4305
4306             # Check if deterministic NAT44 closed the session
4307             dms = self.vapi.nat_det_map_dump()
4308             self.assertEqual(0, dms[0].ses_num)
4309         except:
4310             self.logger.error("TCP session termination failed")
4311             raise
4312
4313     def test_tcp_session_close_detection_out(self):
4314         """ Deterministic NAT TCP session close from outside network """
4315         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4316                                       32,
4317                                       socket.inet_aton(self.nat_addr),
4318                                       32)
4319         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4320         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4321                                                   is_inside=0)
4322
4323         self.initiate_tcp_session(self.pg0, self.pg1)
4324
4325         # close the session from outside
4326         try:
4327             # FIN packet out -> in
4328             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4329                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4330                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4331                      flags="F"))
4332             self.pg1.add_stream(p)
4333             self.pg_enable_capture(self.pg_interfaces)
4334             self.pg_start()
4335             self.pg0.get_capture(1)
4336
4337             pkts = []
4338
4339             # ACK packet in -> out
4340             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4341                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4342                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4343                      flags="A"))
4344             pkts.append(p)
4345
4346             # ACK packet in -> out
4347             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4348                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4349                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4350                      flags="F"))
4351             pkts.append(p)
4352
4353             self.pg0.add_stream(pkts)
4354             self.pg_enable_capture(self.pg_interfaces)
4355             self.pg_start()
4356             self.pg1.get_capture(2)
4357
4358             # ACK packet out -> in
4359             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4360                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4361                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4362                      flags="A"))
4363             self.pg1.add_stream(p)
4364             self.pg_enable_capture(self.pg_interfaces)
4365             self.pg_start()
4366             self.pg0.get_capture(1)
4367
4368             # Check if deterministic NAT44 closed the session
4369             dms = self.vapi.nat_det_map_dump()
4370             self.assertEqual(0, dms[0].ses_num)
4371         except:
4372             self.logger.error("TCP session termination failed")
4373             raise
4374
4375     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4376     def test_session_timeout(self):
4377         """ Deterministic NAT session timeouts """
4378         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4379                                       32,
4380                                       socket.inet_aton(self.nat_addr),
4381                                       32)
4382         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4383         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4384                                                   is_inside=0)
4385
4386         self.initiate_tcp_session(self.pg0, self.pg1)
4387         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4388         pkts = self.create_stream_in(self.pg0, self.pg1)
4389         self.pg0.add_stream(pkts)
4390         self.pg_enable_capture(self.pg_interfaces)
4391         self.pg_start()
4392         capture = self.pg1.get_capture(len(pkts))
4393         sleep(15)
4394
4395         dms = self.vapi.nat_det_map_dump()
4396         self.assertEqual(0, dms[0].ses_num)
4397
4398     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4399     def test_session_limit_per_user(self):
4400         """ Deterministic NAT maximum sessions per user limit """
4401         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4402                                       32,
4403                                       socket.inet_aton(self.nat_addr),
4404                                       32)
4405         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4406         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4407                                                   is_inside=0)
4408         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4409                                      src_address=self.pg2.local_ip4n,
4410                                      path_mtu=512,
4411                                      template_interval=10)
4412         self.vapi.nat_ipfix()
4413
4414         pkts = []
4415         for port in range(1025, 2025):
4416             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4417                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4418                  UDP(sport=port, dport=port))
4419             pkts.append(p)
4420
4421         self.pg0.add_stream(pkts)
4422         self.pg_enable_capture(self.pg_interfaces)
4423         self.pg_start()
4424         capture = self.pg1.get_capture(len(pkts))
4425
4426         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4427              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4428              UDP(sport=3001, dport=3002))
4429         self.pg0.add_stream(p)
4430         self.pg_enable_capture(self.pg_interfaces)
4431         self.pg_start()
4432         capture = self.pg1.assert_nothing_captured()
4433
4434         # verify ICMP error packet
4435         capture = self.pg0.get_capture(1)
4436         p = capture[0]
4437         self.assertTrue(p.haslayer(ICMP))
4438         icmp = p[ICMP]
4439         self.assertEqual(icmp.type, 3)
4440         self.assertEqual(icmp.code, 1)
4441         self.assertTrue(icmp.haslayer(IPerror))
4442         inner_ip = icmp[IPerror]
4443         self.assertEqual(inner_ip[UDPerror].sport, 3001)
4444         self.assertEqual(inner_ip[UDPerror].dport, 3002)
4445
4446         dms = self.vapi.nat_det_map_dump()
4447
4448         self.assertEqual(1000, dms[0].ses_num)
4449
4450         # verify IPFIX logging
4451         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
4452         sleep(1)
4453         capture = self.pg2.get_capture(2)
4454         ipfix = IPFIXDecoder()
4455         # first load template
4456         for p in capture:
4457             self.assertTrue(p.haslayer(IPFIX))
4458             if p.haslayer(Template):
4459                 ipfix.add_template(p.getlayer(Template))
4460         # verify events in data set
4461         for p in capture:
4462             if p.haslayer(Data):
4463                 data = ipfix.decode_data_set(p.getlayer(Set))
4464                 self.verify_ipfix_max_entries_per_user(data)
4465
4466     def clear_nat_det(self):
4467         """
4468         Clear deterministic NAT configuration.
4469         """
4470         self.vapi.nat_ipfix(enable=0)
4471         self.vapi.nat_det_set_timeouts()
4472         deterministic_mappings = self.vapi.nat_det_map_dump()
4473         for dsm in deterministic_mappings:
4474             self.vapi.nat_det_add_del_map(dsm.in_addr,
4475                                           dsm.in_plen,
4476                                           dsm.out_addr,
4477                                           dsm.out_plen,
4478                                           is_add=0)
4479
4480         interfaces = self.vapi.nat44_interface_dump()
4481         for intf in interfaces:
4482             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4483                                                       intf.is_inside,
4484                                                       is_add=0)
4485
4486     def tearDown(self):
4487         super(TestDeterministicNAT, self).tearDown()
4488         if not self.vpp_dead:
4489             self.logger.info(self.vapi.cli("show nat44 detail"))
4490             self.clear_nat_det()
4491
4492
4493 class TestNAT64(MethodHolder):
4494     """ NAT64 Test Cases """
4495
4496     @classmethod
4497     def setUpConstants(cls):
4498         super(TestNAT64, cls).setUpConstants()
4499         cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4500                                 "nat64 st hash buckets 256", "}"])
4501
4502     @classmethod
4503     def setUpClass(cls):
4504         super(TestNAT64, cls).setUpClass()
4505
4506         try:
4507             cls.tcp_port_in = 6303
4508             cls.tcp_port_out = 6303
4509             cls.udp_port_in = 6304
4510             cls.udp_port_out = 6304
4511             cls.icmp_id_in = 6305
4512             cls.icmp_id_out = 6305
4513             cls.nat_addr = '10.0.0.3'
4514             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4515             cls.vrf1_id = 10
4516             cls.vrf1_nat_addr = '10.0.10.3'
4517             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4518                                                    cls.vrf1_nat_addr)
4519             cls.ipfix_src_port = 4739
4520             cls.ipfix_domain_id = 1
4521
4522             cls.create_pg_interfaces(range(5))
4523             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4524             cls.ip6_interfaces.append(cls.pg_interfaces[2])
4525             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4526
4527             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4528
4529             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4530
4531             cls.pg0.generate_remote_hosts(2)
4532
4533             for i in cls.ip6_interfaces:
4534                 i.admin_up()
4535                 i.config_ip6()
4536                 i.configure_ipv6_neighbors()
4537
4538             for i in cls.ip4_interfaces:
4539                 i.admin_up()
4540                 i.config_ip4()
4541                 i.resolve_arp()
4542
4543             cls.pg3.admin_up()
4544             cls.pg3.config_ip4()
4545             cls.pg3.resolve_arp()
4546             cls.pg3.config_ip6()
4547             cls.pg3.configure_ipv6_neighbors()
4548
4549         except Exception:
4550             super(TestNAT64, cls).tearDownClass()
4551             raise
4552
4553     def test_pool(self):
4554         """ Add/delete address to NAT64 pool """
4555         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4556
4557         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4558
4559         addresses = self.vapi.nat64_pool_addr_dump()
4560         self.assertEqual(len(addresses), 1)
4561         self.assertEqual(addresses[0].address, nat_addr)
4562
4563         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4564
4565         addresses = self.vapi.nat64_pool_addr_dump()
4566         self.assertEqual(len(addresses), 0)
4567
4568     def test_interface(self):
4569         """ Enable/disable NAT64 feature on the interface """
4570         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4571         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4572
4573         interfaces = self.vapi.nat64_interface_dump()
4574         self.assertEqual(len(interfaces), 2)
4575         pg0_found = False
4576         pg1_found = False
4577         for intf in interfaces:
4578             if intf.sw_if_index == self.pg0.sw_if_index:
4579                 self.assertEqual(intf.is_inside, 1)
4580                 pg0_found = True
4581             elif intf.sw_if_index == self.pg1.sw_if_index:
4582                 self.assertEqual(intf.is_inside, 0)
4583                 pg1_found = True
4584         self.assertTrue(pg0_found)
4585         self.assertTrue(pg1_found)
4586
4587         features = self.vapi.cli("show interface features pg0")
4588         self.assertNotEqual(features.find('nat64-in2out'), -1)
4589         features = self.vapi.cli("show interface features pg1")
4590         self.assertNotEqual(features.find('nat64-out2in'), -1)
4591
4592         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4593         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4594
4595         interfaces = self.vapi.nat64_interface_dump()
4596         self.assertEqual(len(interfaces), 0)
4597
4598     def test_static_bib(self):
4599         """ Add/delete static BIB entry """
4600         in_addr = socket.inet_pton(socket.AF_INET6,
4601                                    '2001:db8:85a3::8a2e:370:7334')
4602         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4603         in_port = 1234
4604         out_port = 5678
4605         proto = IP_PROTOS.tcp
4606
4607         self.vapi.nat64_add_del_static_bib(in_addr,
4608                                            out_addr,
4609                                            in_port,
4610                                            out_port,
4611                                            proto)
4612         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4613         static_bib_num = 0
4614         for bibe in bib:
4615             if bibe.is_static:
4616                 static_bib_num += 1
4617                 self.assertEqual(bibe.i_addr, in_addr)
4618                 self.assertEqual(bibe.o_addr, out_addr)
4619                 self.assertEqual(bibe.i_port, in_port)
4620                 self.assertEqual(bibe.o_port, out_port)
4621         self.assertEqual(static_bib_num, 1)
4622
4623         self.vapi.nat64_add_del_static_bib(in_addr,
4624                                            out_addr,
4625                                            in_port,
4626                                            out_port,
4627                                            proto,
4628                                            is_add=0)
4629         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4630         static_bib_num = 0
4631         for bibe in bib:
4632             if bibe.is_static:
4633                 static_bib_num += 1
4634         self.assertEqual(static_bib_num, 0)
4635
4636     def test_set_timeouts(self):
4637         """ Set NAT64 timeouts """
4638         # verify default values
4639         timeouts = self.vapi.nat64_get_timeouts()
4640         self.assertEqual(timeouts.udp, 300)
4641         self.assertEqual(timeouts.icmp, 60)
4642         self.assertEqual(timeouts.tcp_trans, 240)
4643         self.assertEqual(timeouts.tcp_est, 7440)
4644         self.assertEqual(timeouts.tcp_incoming_syn, 6)
4645
4646         # set and verify custom values
4647         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4648                                      tcp_est=7450, tcp_incoming_syn=10)
4649         timeouts = self.vapi.nat64_get_timeouts()
4650         self.assertEqual(timeouts.udp, 200)
4651         self.assertEqual(timeouts.icmp, 30)
4652         self.assertEqual(timeouts.tcp_trans, 250)
4653         self.assertEqual(timeouts.tcp_est, 7450)
4654         self.assertEqual(timeouts.tcp_incoming_syn, 10)
4655
4656     def test_dynamic(self):
4657         """ NAT64 dynamic translation test """
4658         self.tcp_port_in = 6303
4659         self.udp_port_in = 6304
4660         self.icmp_id_in = 6305
4661
4662         ses_num_start = self.nat64_get_ses_num()
4663
4664         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4665                                                 self.nat_addr_n)
4666         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4667         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4668
4669         # in2out
4670         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4671         self.pg0.add_stream(pkts)
4672         self.pg_enable_capture(self.pg_interfaces)
4673         self.pg_start()
4674         capture = self.pg1.get_capture(len(pkts))
4675         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4676                                 dst_ip=self.pg1.remote_ip4)
4677
4678         # out2in
4679         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4680         self.pg1.add_stream(pkts)
4681         self.pg_enable_capture(self.pg_interfaces)
4682         self.pg_start()
4683         capture = self.pg0.get_capture(len(pkts))
4684         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4685         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4686
4687         # in2out
4688         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4689         self.pg0.add_stream(pkts)
4690         self.pg_enable_capture(self.pg_interfaces)
4691         self.pg_start()
4692         capture = self.pg1.get_capture(len(pkts))
4693         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4694                                 dst_ip=self.pg1.remote_ip4)
4695
4696         # out2in
4697         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4698         self.pg1.add_stream(pkts)
4699         self.pg_enable_capture(self.pg_interfaces)
4700         self.pg_start()
4701         capture = self.pg0.get_capture(len(pkts))
4702         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4703
4704         ses_num_end = self.nat64_get_ses_num()
4705
4706         self.assertEqual(ses_num_end - ses_num_start, 3)
4707
4708         # tenant with specific VRF
4709         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4710                                                 self.vrf1_nat_addr_n,
4711                                                 vrf_id=self.vrf1_id)
4712         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4713
4714         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4715         self.pg2.add_stream(pkts)
4716         self.pg_enable_capture(self.pg_interfaces)
4717         self.pg_start()
4718         capture = self.pg1.get_capture(len(pkts))
4719         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4720                                 dst_ip=self.pg1.remote_ip4)
4721
4722         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4723         self.pg1.add_stream(pkts)
4724         self.pg_enable_capture(self.pg_interfaces)
4725         self.pg_start()
4726         capture = self.pg2.get_capture(len(pkts))
4727         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4728
4729     def test_static(self):
4730         """ NAT64 static translation test """
4731         self.tcp_port_in = 60303
4732         self.udp_port_in = 60304
4733         self.icmp_id_in = 60305
4734         self.tcp_port_out = 60303
4735         self.udp_port_out = 60304
4736         self.icmp_id_out = 60305
4737
4738         ses_num_start = self.nat64_get_ses_num()
4739
4740         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4741                                                 self.nat_addr_n)
4742         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4743         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4744
4745         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4746                                            self.nat_addr_n,
4747                                            self.tcp_port_in,
4748                                            self.tcp_port_out,
4749                                            IP_PROTOS.tcp)
4750         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4751                                            self.nat_addr_n,
4752                                            self.udp_port_in,
4753                                            self.udp_port_out,
4754                                            IP_PROTOS.udp)
4755         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4756                                            self.nat_addr_n,
4757                                            self.icmp_id_in,
4758                                            self.icmp_id_out,
4759                                            IP_PROTOS.icmp)
4760
4761         # in2out
4762         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4763         self.pg0.add_stream(pkts)
4764         self.pg_enable_capture(self.pg_interfaces)
4765         self.pg_start()
4766         capture = self.pg1.get_capture(len(pkts))
4767         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4768                                 dst_ip=self.pg1.remote_ip4, same_port=True)
4769
4770         # out2in
4771         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4772         self.pg1.add_stream(pkts)
4773         self.pg_enable_capture(self.pg_interfaces)
4774         self.pg_start()
4775         capture = self.pg0.get_capture(len(pkts))
4776         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4777         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4778
4779         ses_num_end = self.nat64_get_ses_num()
4780
4781         self.assertEqual(ses_num_end - ses_num_start, 3)
4782
4783     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4784     def test_session_timeout(self):
4785         """ NAT64 session timeout """
4786         self.icmp_id_in = 1234
4787         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4788                                                 self.nat_addr_n)
4789         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4790         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4791         self.vapi.nat64_set_timeouts(icmp=5)
4792
4793         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4794         self.pg0.add_stream(pkts)
4795         self.pg_enable_capture(self.pg_interfaces)
4796         self.pg_start()
4797         capture = self.pg1.get_capture(len(pkts))
4798
4799         ses_num_before_timeout = self.nat64_get_ses_num()
4800
4801         sleep(15)
4802
4803         # ICMP session after timeout
4804         ses_num_after_timeout = self.nat64_get_ses_num()
4805         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4806
4807     def test_icmp_error(self):
4808         """ NAT64 ICMP Error message translation """
4809         self.tcp_port_in = 6303
4810         self.udp_port_in = 6304
4811         self.icmp_id_in = 6305
4812
4813         ses_num_start = self.nat64_get_ses_num()
4814
4815         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4816                                                 self.nat_addr_n)
4817         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4818         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4819
4820         # send some packets to create sessions
4821         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4822         self.pg0.add_stream(pkts)
4823         self.pg_enable_capture(self.pg_interfaces)
4824         self.pg_start()
4825         capture_ip4 = self.pg1.get_capture(len(pkts))
4826         self.verify_capture_out(capture_ip4,
4827                                 nat_ip=self.nat_addr,
4828                                 dst_ip=self.pg1.remote_ip4)
4829
4830         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4831         self.pg1.add_stream(pkts)
4832         self.pg_enable_capture(self.pg_interfaces)
4833         self.pg_start()
4834         capture_ip6 = self.pg0.get_capture(len(pkts))
4835         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4836         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4837                                    self.pg0.remote_ip6)
4838
4839         # in2out
4840         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4841                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4842                 ICMPv6DestUnreach(code=1) /
4843                 packet[IPv6] for packet in capture_ip6]
4844         self.pg0.add_stream(pkts)
4845         self.pg_enable_capture(self.pg_interfaces)
4846         self.pg_start()
4847         capture = self.pg1.get_capture(len(pkts))
4848         for packet in capture:
4849             try:
4850                 self.assertEqual(packet[IP].src, self.nat_addr)
4851                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4852                 self.assertEqual(packet[ICMP].type, 3)
4853                 self.assertEqual(packet[ICMP].code, 13)
4854                 inner = packet[IPerror]
4855                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4856                 self.assertEqual(inner.dst, self.nat_addr)
4857                 self.check_icmp_checksum(packet)
4858                 if inner.haslayer(TCPerror):
4859                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4860                 elif inner.haslayer(UDPerror):
4861                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4862                 else:
4863                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4864             except:
4865                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4866                 raise
4867
4868         # out2in
4869         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4870                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4871                 ICMP(type=3, code=13) /
4872                 packet[IP] for packet in capture_ip4]
4873         self.pg1.add_stream(pkts)
4874         self.pg_enable_capture(self.pg_interfaces)
4875         self.pg_start()
4876         capture = self.pg0.get_capture(len(pkts))
4877         for packet in capture:
4878             try:
4879                 self.assertEqual(packet[IPv6].src, ip.src)
4880                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4881                 icmp = packet[ICMPv6DestUnreach]
4882                 self.assertEqual(icmp.code, 1)
4883                 inner = icmp[IPerror6]
4884                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4885                 self.assertEqual(inner.dst, ip.src)
4886                 self.check_icmpv6_checksum(packet)
4887                 if inner.haslayer(TCPerror):
4888                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4889                 elif inner.haslayer(UDPerror):
4890                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4891                 else:
4892                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4893                                      self.icmp_id_in)
4894             except:
4895                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4896                 raise
4897
4898     def test_hairpinning(self):
4899         """ NAT64 hairpinning """
4900
4901         client = self.pg0.remote_hosts[0]
4902         server = self.pg0.remote_hosts[1]
4903         server_tcp_in_port = 22
4904         server_tcp_out_port = 4022
4905         server_udp_in_port = 23
4906         server_udp_out_port = 4023
4907         client_tcp_in_port = 1234
4908         client_udp_in_port = 1235
4909         client_tcp_out_port = 0
4910         client_udp_out_port = 0
4911         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4912         nat_addr_ip6 = ip.src
4913
4914         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4915                                                 self.nat_addr_n)
4916         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4917         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4918
4919         self.vapi.nat64_add_del_static_bib(server.ip6n,
4920                                            self.nat_addr_n,
4921                                            server_tcp_in_port,
4922                                            server_tcp_out_port,
4923                                            IP_PROTOS.tcp)
4924         self.vapi.nat64_add_del_static_bib(server.ip6n,
4925                                            self.nat_addr_n,
4926                                            server_udp_in_port,
4927                                            server_udp_out_port,
4928                                            IP_PROTOS.udp)
4929
4930         # client to server
4931         pkts = []
4932         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4933              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4934              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4935         pkts.append(p)
4936         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4937              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4938              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4939         pkts.append(p)
4940         self.pg0.add_stream(pkts)
4941         self.pg_enable_capture(self.pg_interfaces)
4942         self.pg_start()
4943         capture = self.pg0.get_capture(len(pkts))
4944         for packet in capture:
4945             try:
4946                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4947                 self.assertEqual(packet[IPv6].dst, server.ip6)
4948                 if packet.haslayer(TCP):
4949                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4950                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4951                     self.check_tcp_checksum(packet)
4952                     client_tcp_out_port = packet[TCP].sport
4953                 else:
4954                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4955                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
4956                     self.check_udp_checksum(packet)
4957                     client_udp_out_port = packet[UDP].sport
4958             except:
4959                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4960                 raise
4961
4962         # server to client
4963         pkts = []
4964         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4965              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4966              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4967         pkts.append(p)
4968         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4969              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4970              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4971         pkts.append(p)
4972         self.pg0.add_stream(pkts)
4973         self.pg_enable_capture(self.pg_interfaces)
4974         self.pg_start()
4975         capture = self.pg0.get_capture(len(pkts))
4976         for packet in capture:
4977             try:
4978                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4979                 self.assertEqual(packet[IPv6].dst, client.ip6)
4980                 if packet.haslayer(TCP):
4981                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4982                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4983                     self.check_tcp_checksum(packet)
4984                 else:
4985                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
4986                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
4987                     self.check_udp_checksum(packet)
4988             except:
4989                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4990                 raise
4991
4992         # ICMP error
4993         pkts = []
4994         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4995                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4996                 ICMPv6DestUnreach(code=1) /
4997                 packet[IPv6] for packet in capture]
4998         self.pg0.add_stream(pkts)
4999         self.pg_enable_capture(self.pg_interfaces)
5000         self.pg_start()
5001         capture = self.pg0.get_capture(len(pkts))
5002         for packet in capture:
5003             try:
5004                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5005                 self.assertEqual(packet[IPv6].dst, server.ip6)
5006                 icmp = packet[ICMPv6DestUnreach]
5007                 self.assertEqual(icmp.code, 1)
5008                 inner = icmp[IPerror6]
5009                 self.assertEqual(inner.src, server.ip6)
5010                 self.assertEqual(inner.dst, nat_addr_ip6)
5011                 self.check_icmpv6_checksum(packet)
5012                 if inner.haslayer(TCPerror):
5013                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5014                     self.assertEqual(inner[TCPerror].dport,
5015                                      client_tcp_out_port)
5016                 else:
5017                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5018                     self.assertEqual(inner[UDPerror].dport,
5019                                      client_udp_out_port)
5020             except:
5021                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5022                 raise
5023
5024     def test_prefix(self):
5025         """ NAT64 Network-Specific Prefix """
5026
5027         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5028                                                 self.nat_addr_n)
5029         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5030         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5031         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5032                                                 self.vrf1_nat_addr_n,
5033                                                 vrf_id=self.vrf1_id)
5034         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5035
5036         # Add global prefix
5037         global_pref64 = "2001:db8::"
5038         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5039         global_pref64_len = 32
5040         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5041
5042         prefix = self.vapi.nat64_prefix_dump()
5043         self.assertEqual(len(prefix), 1)
5044         self.assertEqual(prefix[0].prefix, global_pref64_n)
5045         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5046         self.assertEqual(prefix[0].vrf_id, 0)
5047
5048         # Add tenant specific prefix
5049         vrf1_pref64 = "2001:db8:122:300::"
5050         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5051         vrf1_pref64_len = 56
5052         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5053                                        vrf1_pref64_len,
5054                                        vrf_id=self.vrf1_id)
5055         prefix = self.vapi.nat64_prefix_dump()
5056         self.assertEqual(len(prefix), 2)
5057
5058         # Global prefix
5059         pkts = self.create_stream_in_ip6(self.pg0,
5060                                          self.pg1,
5061                                          pref=global_pref64,
5062                                          plen=global_pref64_len)
5063         self.pg0.add_stream(pkts)
5064         self.pg_enable_capture(self.pg_interfaces)
5065         self.pg_start()
5066         capture = self.pg1.get_capture(len(pkts))
5067         self.verify_capture_out(capture, nat_ip=self.nat_addr,
5068                                 dst_ip=self.pg1.remote_ip4)
5069
5070         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5071         self.pg1.add_stream(pkts)
5072         self.pg_enable_capture(self.pg_interfaces)
5073         self.pg_start()
5074         capture = self.pg0.get_capture(len(pkts))
5075         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5076                                   global_pref64,
5077                                   global_pref64_len)
5078         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5079
5080         # Tenant specific prefix
5081         pkts = self.create_stream_in_ip6(self.pg2,
5082                                          self.pg1,
5083                                          pref=vrf1_pref64,
5084                                          plen=vrf1_pref64_len)
5085         self.pg2.add_stream(pkts)
5086         self.pg_enable_capture(self.pg_interfaces)
5087         self.pg_start()
5088         capture = self.pg1.get_capture(len(pkts))
5089         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5090                                 dst_ip=self.pg1.remote_ip4)
5091
5092         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5093         self.pg1.add_stream(pkts)
5094         self.pg_enable_capture(self.pg_interfaces)
5095         self.pg_start()
5096         capture = self.pg2.get_capture(len(pkts))
5097         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5098                                   vrf1_pref64,
5099                                   vrf1_pref64_len)
5100         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5101
5102     def test_unknown_proto(self):
5103         """ NAT64 translate packet with unknown protocol """
5104
5105         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5106                                                 self.nat_addr_n)
5107         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5108         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5109         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5110
5111         # in2out
5112         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5113              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5114              TCP(sport=self.tcp_port_in, dport=20))
5115         self.pg0.add_stream(p)
5116         self.pg_enable_capture(self.pg_interfaces)
5117         self.pg_start()
5118         p = self.pg1.get_capture(1)
5119
5120         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5121              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5122              GRE() /
5123              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5124              TCP(sport=1234, dport=1234))
5125         self.pg0.add_stream(p)
5126         self.pg_enable_capture(self.pg_interfaces)
5127         self.pg_start()
5128         p = self.pg1.get_capture(1)
5129         packet = p[0]
5130         try:
5131             self.assertEqual(packet[IP].src, self.nat_addr)
5132             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5133             self.assertTrue(packet.haslayer(GRE))
5134             self.check_ip_checksum(packet)
5135         except:
5136             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5137             raise
5138
5139         # out2in
5140         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5141              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5142              GRE() /
5143              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5144              TCP(sport=1234, dport=1234))
5145         self.pg1.add_stream(p)
5146         self.pg_enable_capture(self.pg_interfaces)
5147         self.pg_start()
5148         p = self.pg0.get_capture(1)
5149         packet = p[0]
5150         try:
5151             self.assertEqual(packet[IPv6].src, remote_ip6)
5152             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5153             self.assertEqual(packet[IPv6].nh, 47)
5154         except:
5155             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5156             raise
5157
5158     def test_hairpinning_unknown_proto(self):
5159         """ NAT64 translate packet with unknown protocol - hairpinning """
5160
5161         client = self.pg0.remote_hosts[0]
5162         server = self.pg0.remote_hosts[1]
5163         server_tcp_in_port = 22
5164         server_tcp_out_port = 4022
5165         client_tcp_in_port = 1234
5166         client_tcp_out_port = 1235
5167         server_nat_ip = "10.0.0.100"
5168         client_nat_ip = "10.0.0.110"
5169         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5170         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5171         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5172         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5173
5174         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5175                                                 client_nat_ip_n)
5176         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5177         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5178
5179         self.vapi.nat64_add_del_static_bib(server.ip6n,
5180                                            server_nat_ip_n,
5181                                            server_tcp_in_port,
5182                                            server_tcp_out_port,
5183                                            IP_PROTOS.tcp)
5184
5185         self.vapi.nat64_add_del_static_bib(server.ip6n,
5186                                            server_nat_ip_n,
5187                                            0,
5188                                            0,
5189                                            IP_PROTOS.gre)
5190
5191         self.vapi.nat64_add_del_static_bib(client.ip6n,
5192                                            client_nat_ip_n,
5193                                            client_tcp_in_port,
5194                                            client_tcp_out_port,
5195                                            IP_PROTOS.tcp)
5196
5197         # client to server
5198         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5199              IPv6(src=client.ip6, dst=server_nat_ip6) /
5200              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5201         self.pg0.add_stream(p)
5202         self.pg_enable_capture(self.pg_interfaces)
5203         self.pg_start()
5204         p = self.pg0.get_capture(1)
5205
5206         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5207              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5208              GRE() /
5209              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5210              TCP(sport=1234, dport=1234))
5211         self.pg0.add_stream(p)
5212         self.pg_enable_capture(self.pg_interfaces)
5213         self.pg_start()
5214         p = self.pg0.get_capture(1)
5215         packet = p[0]
5216         try:
5217             self.assertEqual(packet[IPv6].src, client_nat_ip6)
5218             self.assertEqual(packet[IPv6].dst, server.ip6)
5219             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5220         except:
5221             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5222             raise
5223
5224         # server to client
5225         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5226              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5227              GRE() /
5228              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5229              TCP(sport=1234, dport=1234))
5230         self.pg0.add_stream(p)
5231         self.pg_enable_capture(self.pg_interfaces)
5232         self.pg_start()
5233         p = self.pg0.get_capture(1)
5234         packet = p[0]
5235         try:
5236             self.assertEqual(packet[IPv6].src, server_nat_ip6)
5237             self.assertEqual(packet[IPv6].dst, client.ip6)
5238             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5239         except:
5240             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5241             raise
5242
5243     def test_one_armed_nat64(self):
5244         """ One armed NAT64 """
5245         external_port = 0
5246         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5247                                            '64:ff9b::',
5248                                            96)
5249
5250         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5251                                                 self.nat_addr_n)
5252         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5253         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5254
5255         # in2out
5256         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5257              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5258              TCP(sport=12345, dport=80))
5259         self.pg3.add_stream(p)
5260         self.pg_enable_capture(self.pg_interfaces)
5261         self.pg_start()
5262         capture = self.pg3.get_capture(1)
5263         p = capture[0]
5264         try:
5265             ip = p[IP]
5266             tcp = p[TCP]
5267             self.assertEqual(ip.src, self.nat_addr)
5268             self.assertEqual(ip.dst, self.pg3.remote_ip4)
5269             self.assertNotEqual(tcp.sport, 12345)
5270             external_port = tcp.sport
5271             self.assertEqual(tcp.dport, 80)
5272             self.check_tcp_checksum(p)
5273             self.check_ip_checksum(p)
5274         except:
5275             self.logger.error(ppp("Unexpected or invalid packet:", p))
5276             raise
5277
5278         # out2in
5279         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5280              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5281              TCP(sport=80, dport=external_port))
5282         self.pg3.add_stream(p)
5283         self.pg_enable_capture(self.pg_interfaces)
5284         self.pg_start()
5285         capture = self.pg3.get_capture(1)
5286         p = capture[0]
5287         try:
5288             ip = p[IPv6]
5289             tcp = p[TCP]
5290             self.assertEqual(ip.src, remote_host_ip6)
5291             self.assertEqual(ip.dst, self.pg3.remote_ip6)
5292             self.assertEqual(tcp.sport, 80)
5293             self.assertEqual(tcp.dport, 12345)
5294             self.check_tcp_checksum(p)
5295         except:
5296             self.logger.error(ppp("Unexpected or invalid packet:", p))
5297             raise
5298
5299     def test_frag_in_order(self):
5300         """ NAT64 translate fragments arriving in order """
5301         self.tcp_port_in = random.randint(1025, 65535)
5302
5303         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5304                                                 self.nat_addr_n)
5305         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5306         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5307
5308         reass = self.vapi.nat_reass_dump()
5309         reass_n_start = len(reass)
5310
5311         # in2out
5312         data = 'a' * 200
5313         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5314                                            self.tcp_port_in, 20, data)
5315         self.pg0.add_stream(pkts)
5316         self.pg_enable_capture(self.pg_interfaces)
5317         self.pg_start()
5318         frags = self.pg1.get_capture(len(pkts))
5319         p = self.reass_frags_and_verify(frags,
5320                                         self.nat_addr,
5321                                         self.pg1.remote_ip4)
5322         self.assertEqual(p[TCP].dport, 20)
5323         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5324         self.tcp_port_out = p[TCP].sport
5325         self.assertEqual(data, p[Raw].load)
5326
5327         # out2in
5328         data = "A" * 4 + "b" * 16 + "C" * 3
5329         pkts = self.create_stream_frag(self.pg1,
5330                                        self.nat_addr,
5331                                        20,
5332                                        self.tcp_port_out,
5333                                        data)
5334         self.pg1.add_stream(pkts)
5335         self.pg_enable_capture(self.pg_interfaces)
5336         self.pg_start()
5337         frags = self.pg0.get_capture(len(pkts))
5338         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5339         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5340         self.assertEqual(p[TCP].sport, 20)
5341         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5342         self.assertEqual(data, p[Raw].load)
5343
5344         reass = self.vapi.nat_reass_dump()
5345         reass_n_end = len(reass)
5346
5347         self.assertEqual(reass_n_end - reass_n_start, 2)
5348
5349     def test_reass_hairpinning(self):
5350         """ NAT64 fragments hairpinning """
5351         data = 'a' * 200
5352         client = self.pg0.remote_hosts[0]
5353         server = self.pg0.remote_hosts[1]
5354         server_in_port = random.randint(1025, 65535)
5355         server_out_port = random.randint(1025, 65535)
5356         client_in_port = random.randint(1025, 65535)
5357         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5358         nat_addr_ip6 = ip.src
5359
5360         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5361                                                 self.nat_addr_n)
5362         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5363         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5364
5365         # add static BIB entry for server
5366         self.vapi.nat64_add_del_static_bib(server.ip6n,
5367                                            self.nat_addr_n,
5368                                            server_in_port,
5369                                            server_out_port,
5370                                            IP_PROTOS.tcp)
5371
5372         # send packet from host to server
5373         pkts = self.create_stream_frag_ip6(self.pg0,
5374                                            self.nat_addr,
5375                                            client_in_port,
5376                                            server_out_port,
5377                                            data)
5378         self.pg0.add_stream(pkts)
5379         self.pg_enable_capture(self.pg_interfaces)
5380         self.pg_start()
5381         frags = self.pg0.get_capture(len(pkts))
5382         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5383         self.assertNotEqual(p[TCP].sport, client_in_port)
5384         self.assertEqual(p[TCP].dport, server_in_port)
5385         self.assertEqual(data, p[Raw].load)
5386
5387     def test_frag_out_of_order(self):
5388         """ NAT64 translate fragments arriving out of order """
5389         self.tcp_port_in = random.randint(1025, 65535)
5390
5391         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5392                                                 self.nat_addr_n)
5393         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5394         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5395
5396         # in2out
5397         data = 'a' * 200
5398         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5399                                            self.tcp_port_in, 20, data)
5400         pkts.reverse()
5401         self.pg0.add_stream(pkts)
5402         self.pg_enable_capture(self.pg_interfaces)
5403         self.pg_start()
5404         frags = self.pg1.get_capture(len(pkts))
5405         p = self.reass_frags_and_verify(frags,
5406                                         self.nat_addr,
5407                                         self.pg1.remote_ip4)
5408         self.assertEqual(p[TCP].dport, 20)
5409         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5410         self.tcp_port_out = p[TCP].sport
5411         self.assertEqual(data, p[Raw].load)
5412
5413         # out2in
5414         data = "A" * 4 + "B" * 16 + "C" * 3
5415         pkts = self.create_stream_frag(self.pg1,
5416                                        self.nat_addr,
5417                                        20,
5418                                        self.tcp_port_out,
5419                                        data)
5420         pkts.reverse()
5421         self.pg1.add_stream(pkts)
5422         self.pg_enable_capture(self.pg_interfaces)
5423         self.pg_start()
5424         frags = self.pg0.get_capture(len(pkts))
5425         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5426         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5427         self.assertEqual(p[TCP].sport, 20)
5428         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5429         self.assertEqual(data, p[Raw].load)
5430
5431     def test_interface_addr(self):
5432         """ Acquire NAT64 pool addresses from interface """
5433         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5434
5435         # no address in NAT64 pool
5436         adresses = self.vapi.nat44_address_dump()
5437         self.assertEqual(0, len(adresses))
5438
5439         # configure interface address and check NAT64 address pool
5440         self.pg4.config_ip4()
5441         addresses = self.vapi.nat64_pool_addr_dump()
5442         self.assertEqual(len(addresses), 1)
5443         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5444
5445         # remove interface address and check NAT64 address pool
5446         self.pg4.unconfig_ip4()
5447         addresses = self.vapi.nat64_pool_addr_dump()
5448         self.assertEqual(0, len(adresses))
5449
5450     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5451     def test_ipfix_max_bibs_sessions(self):
5452         """ IPFIX logging maximum session and BIB entries exceeded """
5453         max_bibs = 1280
5454         max_sessions = 2560
5455         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5456                                            '64:ff9b::',
5457                                            96)
5458
5459         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5460                                                 self.nat_addr_n)
5461         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5462         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5463
5464         pkts = []
5465         src = ""
5466         for i in range(0, max_bibs):
5467             src = "fd01:aa::%x" % (i)
5468             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5469                  IPv6(src=src, dst=remote_host_ip6) /
5470                  TCP(sport=12345, dport=80))
5471             pkts.append(p)
5472             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5473                  IPv6(src=src, dst=remote_host_ip6) /
5474                  TCP(sport=12345, dport=22))
5475             pkts.append(p)
5476         self.pg0.add_stream(pkts)
5477         self.pg_enable_capture(self.pg_interfaces)
5478         self.pg_start()
5479         self.pg1.get_capture(max_sessions)
5480
5481         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5482                                      src_address=self.pg3.local_ip4n,
5483                                      path_mtu=512,
5484                                      template_interval=10)
5485         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5486                             src_port=self.ipfix_src_port)
5487
5488         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5489              IPv6(src=src, dst=remote_host_ip6) /
5490              TCP(sport=12345, dport=25))
5491         self.pg0.add_stream(p)
5492         self.pg_enable_capture(self.pg_interfaces)
5493         self.pg_start()
5494         self.pg1.get_capture(0)
5495         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5496         capture = self.pg3.get_capture(9)
5497         ipfix = IPFIXDecoder()
5498         # first load template
5499         for p in capture:
5500             self.assertTrue(p.haslayer(IPFIX))
5501             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5502             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5503             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5504             self.assertEqual(p[UDP].dport, 4739)
5505             self.assertEqual(p[IPFIX].observationDomainID,
5506                              self.ipfix_domain_id)
5507             if p.haslayer(Template):
5508                 ipfix.add_template(p.getlayer(Template))
5509         # verify events in data set
5510         for p in capture:
5511             if p.haslayer(Data):
5512                 data = ipfix.decode_data_set(p.getlayer(Set))
5513                 self.verify_ipfix_max_sessions(data, max_sessions)
5514
5515         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5516              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5517              TCP(sport=12345, dport=80))
5518         self.pg0.add_stream(p)
5519         self.pg_enable_capture(self.pg_interfaces)
5520         self.pg_start()
5521         self.pg1.get_capture(0)
5522         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5523         capture = self.pg3.get_capture(1)
5524         # verify events in data set
5525         for p in capture:
5526             self.assertTrue(p.haslayer(IPFIX))
5527             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5528             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5529             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5530             self.assertEqual(p[UDP].dport, 4739)
5531             self.assertEqual(p[IPFIX].observationDomainID,
5532                              self.ipfix_domain_id)
5533             if p.haslayer(Data):
5534                 data = ipfix.decode_data_set(p.getlayer(Set))
5535                 self.verify_ipfix_max_bibs(data, max_bibs)
5536
5537     def test_ipfix_max_frags(self):
5538         """ IPFIX logging maximum fragments pending reassembly exceeded """
5539         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5540                                                 self.nat_addr_n)
5541         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5542         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5543         self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5544         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5545                                      src_address=self.pg3.local_ip4n,
5546                                      path_mtu=512,
5547                                      template_interval=10)
5548         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5549                             src_port=self.ipfix_src_port)
5550
5551         data = 'a' * 200
5552         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5553                                            self.tcp_port_in, 20, data)
5554         self.pg0.add_stream(pkts[-1])
5555         self.pg_enable_capture(self.pg_interfaces)
5556         self.pg_start()
5557         self.pg1.get_capture(0)
5558         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5559         capture = self.pg3.get_capture(9)
5560         ipfix = IPFIXDecoder()
5561         # first load template
5562         for p in capture:
5563             self.assertTrue(p.haslayer(IPFIX))
5564             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5565             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5566             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5567             self.assertEqual(p[UDP].dport, 4739)
5568             self.assertEqual(p[IPFIX].observationDomainID,
5569                              self.ipfix_domain_id)
5570             if p.haslayer(Template):
5571                 ipfix.add_template(p.getlayer(Template))
5572         # verify events in data set
5573         for p in capture:
5574             if p.haslayer(Data):
5575                 data = ipfix.decode_data_set(p.getlayer(Set))
5576                 self.verify_ipfix_max_fragments_ip6(data, 0,
5577                                                     self.pg0.remote_ip6n)
5578
5579     def test_ipfix_bib_ses(self):
5580         """ IPFIX logging NAT64 BIB/session create and delete events """
5581         self.tcp_port_in = random.randint(1025, 65535)
5582         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5583                                            '64:ff9b::',
5584                                            96)
5585
5586         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5587                                                 self.nat_addr_n)
5588         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5589         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5590         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5591                                      src_address=self.pg3.local_ip4n,
5592                                      path_mtu=512,
5593                                      template_interval=10)
5594         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5595                             src_port=self.ipfix_src_port)
5596
5597         # Create
5598         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5599              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5600              TCP(sport=self.tcp_port_in, dport=25))
5601         self.pg0.add_stream(p)
5602         self.pg_enable_capture(self.pg_interfaces)
5603         self.pg_start()
5604         p = self.pg1.get_capture(1)
5605         self.tcp_port_out = p[0][TCP].sport
5606         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5607         capture = self.pg3.get_capture(10)
5608         ipfix = IPFIXDecoder()
5609         # first load template
5610         for p in capture:
5611             self.assertTrue(p.haslayer(IPFIX))
5612             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5613             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5614             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5615             self.assertEqual(p[UDP].dport, 4739)
5616             self.assertEqual(p[IPFIX].observationDomainID,
5617                              self.ipfix_domain_id)
5618             if p.haslayer(Template):
5619                 ipfix.add_template(p.getlayer(Template))
5620         # verify events in data set
5621         for p in capture:
5622             if p.haslayer(Data):
5623                 data = ipfix.decode_data_set(p.getlayer(Set))
5624                 if ord(data[0][230]) == 10:
5625                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5626                 elif ord(data[0][230]) == 6:
5627                     self.verify_ipfix_nat64_ses(data,
5628                                                 1,
5629                                                 self.pg0.remote_ip6n,
5630                                                 self.pg1.remote_ip4,
5631                                                 25)
5632                 else:
5633                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5634
5635         # Delete
5636         self.pg_enable_capture(self.pg_interfaces)
5637         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5638                                                 self.nat_addr_n,
5639                                                 is_add=0)
5640         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5641         capture = self.pg3.get_capture(2)
5642         # verify events in data set
5643         for p in capture:
5644             self.assertTrue(p.haslayer(IPFIX))
5645             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5646             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5647             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5648             self.assertEqual(p[UDP].dport, 4739)
5649             self.assertEqual(p[IPFIX].observationDomainID,
5650                              self.ipfix_domain_id)
5651             if p.haslayer(Data):
5652                 data = ipfix.decode_data_set(p.getlayer(Set))
5653                 if ord(data[0][230]) == 11:
5654                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5655                 elif ord(data[0][230]) == 7:
5656                     self.verify_ipfix_nat64_ses(data,
5657                                                 0,
5658                                                 self.pg0.remote_ip6n,
5659                                                 self.pg1.remote_ip4,
5660                                                 25)
5661                 else:
5662                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5663
5664     def nat64_get_ses_num(self):
5665         """
5666         Return number of active NAT64 sessions.
5667         """
5668         st = self.vapi.nat64_st_dump()
5669         return len(st)
5670
5671     def clear_nat64(self):
5672         """
5673         Clear NAT64 configuration.
5674         """
5675         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5676                             domain_id=self.ipfix_domain_id)
5677         self.ipfix_src_port = 4739
5678         self.ipfix_domain_id = 1
5679
5680         self.vapi.nat64_set_timeouts()
5681
5682         interfaces = self.vapi.nat64_interface_dump()
5683         for intf in interfaces:
5684             if intf.is_inside > 1:
5685                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5686                                                   0,
5687                                                   is_add=0)
5688             self.vapi.nat64_add_del_interface(intf.sw_if_index,
5689                                               intf.is_inside,
5690                                               is_add=0)
5691
5692         bib = self.vapi.nat64_bib_dump(255)
5693         for bibe in bib:
5694             if bibe.is_static:
5695                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5696                                                    bibe.o_addr,
5697                                                    bibe.i_port,
5698                                                    bibe.o_port,
5699                                                    bibe.proto,
5700                                                    bibe.vrf_id,
5701                                                    is_add=0)
5702
5703         adresses = self.vapi.nat64_pool_addr_dump()
5704         for addr in adresses:
5705             self.vapi.nat64_add_del_pool_addr_range(addr.address,
5706                                                     addr.address,
5707                                                     vrf_id=addr.vrf_id,
5708                                                     is_add=0)
5709
5710         prefixes = self.vapi.nat64_prefix_dump()
5711         for prefix in prefixes:
5712             self.vapi.nat64_add_del_prefix(prefix.prefix,
5713                                            prefix.prefix_len,
5714                                            vrf_id=prefix.vrf_id,
5715                                            is_add=0)
5716
5717     def tearDown(self):
5718         super(TestNAT64, self).tearDown()
5719         if not self.vpp_dead:
5720             self.logger.info(self.vapi.cli("show nat64 pool"))
5721             self.logger.info(self.vapi.cli("show nat64 interfaces"))
5722             self.logger.info(self.vapi.cli("show nat64 prefix"))
5723             self.logger.info(self.vapi.cli("show nat64 bib all"))
5724             self.logger.info(self.vapi.cli("show nat64 session table all"))
5725             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5726             self.clear_nat64()
5727
5728
5729 class TestDSlite(MethodHolder):
5730     """ DS-Lite Test Cases """
5731
5732     @classmethod
5733     def setUpClass(cls):
5734         super(TestDSlite, cls).setUpClass()
5735
5736         try:
5737             cls.nat_addr = '10.0.0.3'
5738             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5739
5740             cls.create_pg_interfaces(range(2))
5741             cls.pg0.admin_up()
5742             cls.pg0.config_ip4()
5743             cls.pg0.resolve_arp()
5744             cls.pg1.admin_up()
5745             cls.pg1.config_ip6()
5746             cls.pg1.generate_remote_hosts(2)
5747             cls.pg1.configure_ipv6_neighbors()
5748
5749         except Exception:
5750             super(TestDSlite, cls).tearDownClass()
5751             raise
5752
5753     def test_dslite(self):
5754         """ Test DS-Lite """
5755         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5756                                                  self.nat_addr_n)
5757         aftr_ip4 = '192.0.0.1'
5758         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5759         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5760         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5761         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5762
5763         # UDP
5764         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5765              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5766              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5767              UDP(sport=20000, dport=10000))
5768         self.pg1.add_stream(p)
5769         self.pg_enable_capture(self.pg_interfaces)
5770         self.pg_start()
5771         capture = self.pg0.get_capture(1)
5772         capture = capture[0]
5773         self.assertFalse(capture.haslayer(IPv6))
5774         self.assertEqual(capture[IP].src, self.nat_addr)
5775         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5776         self.assertNotEqual(capture[UDP].sport, 20000)
5777         self.assertEqual(capture[UDP].dport, 10000)
5778         self.check_ip_checksum(capture)
5779         out_port = capture[UDP].sport
5780
5781         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5782              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5783              UDP(sport=10000, dport=out_port))
5784         self.pg0.add_stream(p)
5785         self.pg_enable_capture(self.pg_interfaces)
5786         self.pg_start()
5787         capture = self.pg1.get_capture(1)
5788         capture = capture[0]
5789         self.assertEqual(capture[IPv6].src, aftr_ip6)
5790         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5791         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5792         self.assertEqual(capture[IP].dst, '192.168.1.1')
5793         self.assertEqual(capture[UDP].sport, 10000)
5794         self.assertEqual(capture[UDP].dport, 20000)
5795         self.check_ip_checksum(capture)
5796
5797         # TCP
5798         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5799              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5800              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5801              TCP(sport=20001, dport=10001))
5802         self.pg1.add_stream(p)
5803         self.pg_enable_capture(self.pg_interfaces)
5804         self.pg_start()
5805         capture = self.pg0.get_capture(1)
5806         capture = capture[0]
5807         self.assertFalse(capture.haslayer(IPv6))
5808         self.assertEqual(capture[IP].src, self.nat_addr)
5809         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5810         self.assertNotEqual(capture[TCP].sport, 20001)
5811         self.assertEqual(capture[TCP].dport, 10001)
5812         self.check_ip_checksum(capture)
5813         self.check_tcp_checksum(capture)
5814         out_port = capture[TCP].sport
5815
5816         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5817              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5818              TCP(sport=10001, dport=out_port))
5819         self.pg0.add_stream(p)
5820         self.pg_enable_capture(self.pg_interfaces)
5821         self.pg_start()
5822         capture = self.pg1.get_capture(1)
5823         capture = capture[0]
5824         self.assertEqual(capture[IPv6].src, aftr_ip6)
5825         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5826         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5827         self.assertEqual(capture[IP].dst, '192.168.1.1')
5828         self.assertEqual(capture[TCP].sport, 10001)
5829         self.assertEqual(capture[TCP].dport, 20001)
5830         self.check_ip_checksum(capture)
5831         self.check_tcp_checksum(capture)
5832
5833         # ICMP
5834         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5835              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5836              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5837              ICMP(id=4000, type='echo-request'))
5838         self.pg1.add_stream(p)
5839         self.pg_enable_capture(self.pg_interfaces)
5840         self.pg_start()
5841         capture = self.pg0.get_capture(1)
5842         capture = capture[0]
5843         self.assertFalse(capture.haslayer(IPv6))
5844         self.assertEqual(capture[IP].src, self.nat_addr)
5845         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5846         self.assertNotEqual(capture[ICMP].id, 4000)
5847         self.check_ip_checksum(capture)
5848         self.check_icmp_checksum(capture)
5849         out_id = capture[ICMP].id
5850
5851         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5852              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5853              ICMP(id=out_id, type='echo-reply'))
5854         self.pg0.add_stream(p)
5855         self.pg_enable_capture(self.pg_interfaces)
5856         self.pg_start()
5857         capture = self.pg1.get_capture(1)
5858         capture = capture[0]
5859         self.assertEqual(capture[IPv6].src, aftr_ip6)
5860         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5861         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5862         self.assertEqual(capture[IP].dst, '192.168.1.1')
5863         self.assertEqual(capture[ICMP].id, 4000)
5864         self.check_ip_checksum(capture)
5865         self.check_icmp_checksum(capture)
5866
5867         # ping DS-Lite AFTR tunnel endpoint address
5868         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5869              IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5870              ICMPv6EchoRequest())
5871         self.pg1.add_stream(p)
5872         self.pg_enable_capture(self.pg_interfaces)
5873         self.pg_start()
5874         capture = self.pg1.get_capture(1)
5875         self.assertEqual(1, len(capture))
5876         capture = capture[0]
5877         self.assertEqual(capture[IPv6].src, aftr_ip6)
5878         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5879         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5880
5881     def tearDown(self):
5882         super(TestDSlite, self).tearDown()
5883         if not self.vpp_dead:
5884             self.logger.info(self.vapi.cli("show dslite pool"))
5885             self.logger.info(
5886                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5887             self.logger.info(self.vapi.cli("show dslite sessions"))
5888
5889
5890 class TestDSliteCE(MethodHolder):
5891     """ DS-Lite CE Test Cases """
5892
5893     @classmethod
5894     def setUpConstants(cls):
5895         super(TestDSliteCE, cls).setUpConstants()
5896         cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
5897
5898     @classmethod
5899     def setUpClass(cls):
5900         super(TestDSliteCE, cls).setUpClass()
5901
5902         try:
5903             cls.create_pg_interfaces(range(2))
5904             cls.pg0.admin_up()
5905             cls.pg0.config_ip4()
5906             cls.pg0.resolve_arp()
5907             cls.pg1.admin_up()
5908             cls.pg1.config_ip6()
5909             cls.pg1.generate_remote_hosts(1)
5910             cls.pg1.configure_ipv6_neighbors()
5911
5912         except Exception:
5913             super(TestDSliteCE, cls).tearDownClass()
5914             raise
5915
5916     def test_dslite_ce(self):
5917         """ Test DS-Lite CE """
5918
5919         b4_ip4 = '192.0.0.2'
5920         b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
5921         b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
5922         b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
5923         self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
5924
5925         aftr_ip4 = '192.0.0.1'
5926         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5927         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5928         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5929         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5930
5931         self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
5932                                    dst_address_length=128,
5933                                    next_hop_address=self.pg1.remote_ip6n,
5934                                    next_hop_sw_if_index=self.pg1.sw_if_index,
5935                                    is_ipv6=1)
5936
5937         # UDP encapsulation
5938         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5939              IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
5940              UDP(sport=10000, dport=20000))
5941         self.pg0.add_stream(p)
5942         self.pg_enable_capture(self.pg_interfaces)
5943         self.pg_start()
5944         capture = self.pg1.get_capture(1)
5945         capture = capture[0]
5946         self.assertEqual(capture[IPv6].src, b4_ip6)
5947         self.assertEqual(capture[IPv6].dst, aftr_ip6)
5948         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5949         self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
5950         self.assertEqual(capture[UDP].sport, 10000)
5951         self.assertEqual(capture[UDP].dport, 20000)
5952         self.check_ip_checksum(capture)
5953
5954         # UDP decapsulation
5955         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5956              IPv6(dst=b4_ip6, src=aftr_ip6) /
5957              IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
5958              UDP(sport=20000, dport=10000))
5959         self.pg1.add_stream(p)
5960         self.pg_enable_capture(self.pg_interfaces)
5961         self.pg_start()
5962         capture = self.pg0.get_capture(1)
5963         capture = capture[0]
5964         self.assertFalse(capture.haslayer(IPv6))
5965         self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
5966         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5967         self.assertEqual(capture[UDP].sport, 20000)
5968         self.assertEqual(capture[UDP].dport, 10000)
5969         self.check_ip_checksum(capture)
5970
5971         # ping DS-Lite B4 tunnel endpoint address
5972         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5973              IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
5974              ICMPv6EchoRequest())
5975         self.pg1.add_stream(p)
5976         self.pg_enable_capture(self.pg_interfaces)
5977         self.pg_start()
5978         capture = self.pg1.get_capture(1)
5979         self.assertEqual(1, len(capture))
5980         capture = capture[0]
5981         self.assertEqual(capture[IPv6].src, b4_ip6)
5982         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5983         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5984
5985     def tearDown(self):
5986         super(TestDSliteCE, self).tearDown()
5987         if not self.vpp_dead:
5988             self.logger.info(
5989                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5990             self.logger.info(
5991                 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
5992
5993 if __name__ == '__main__':
5994     unittest.main(testRunner=VppTestRunner)