NAT: nat.c refactor (split out CLI) (VPP-1140)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
    @classmethod
    def setUpClass(cls):
        """ Delegate class-level set-up to the parent class unchanged """
        super(MethodHolder, cls).setUpClass()
32
    def tearDown(self):
        """ Delegate per-test tear-down to the parent class unchanged """
        super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param dst_ip: Destination address
144         :param ttl: TTL of generated packets
145         """
146         if dst_ip is None:
147             dst_ip = out_if.remote_ip4
148
149         pkts = []
150         # TCP
151         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153              TCP(sport=self.tcp_port_in, dport=20))
154         pkts.append(p)
155
156         # UDP
157         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159              UDP(sport=self.udp_port_in, dport=20))
160         pkts.append(p)
161
162         # ICMP
163         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165              ICMP(id=self.icmp_id_in, type='echo-request'))
166         pkts.append(p)
167
168         return pkts
169
170     def compose_ip6(self, ip4, pref, plen):
171         """
172         Compose IPv4-embedded IPv6 addresses
173
174         :param ip4: IPv4 address
175         :param pref: IPv6 prefix
176         :param plen: IPv6 prefix length
177         :returns: IPv4-embedded IPv6 addresses
178         """
179         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
181         if plen == 32:
182             pref_n[4] = ip4_n[0]
183             pref_n[5] = ip4_n[1]
184             pref_n[6] = ip4_n[2]
185             pref_n[7] = ip4_n[3]
186         elif plen == 40:
187             pref_n[5] = ip4_n[0]
188             pref_n[6] = ip4_n[1]
189             pref_n[7] = ip4_n[2]
190             pref_n[9] = ip4_n[3]
191         elif plen == 48:
192             pref_n[6] = ip4_n[0]
193             pref_n[7] = ip4_n[1]
194             pref_n[9] = ip4_n[2]
195             pref_n[10] = ip4_n[3]
196         elif plen == 56:
197             pref_n[7] = ip4_n[0]
198             pref_n[9] = ip4_n[1]
199             pref_n[10] = ip4_n[2]
200             pref_n[11] = ip4_n[3]
201         elif plen == 64:
202             pref_n[9] = ip4_n[0]
203             pref_n[10] = ip4_n[1]
204             pref_n[11] = ip4_n[2]
205             pref_n[12] = ip4_n[3]
206         elif plen == 96:
207             pref_n[12] = ip4_n[0]
208             pref_n[13] = ip4_n[1]
209             pref_n[14] = ip4_n[2]
210             pref_n[15] = ip4_n[3]
211         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
212
213     def extract_ip4(self, ip6, plen):
214         """
215         Extract IPv4 address embedded in IPv6 addresses
216
217         :param ip6: IPv6 address
218         :param plen: IPv6 prefix length
219         :returns: extracted IPv4 address
220         """
221         ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
222         ip4_n = [None] * 4
223         if plen == 32:
224             ip4_n[0] = ip6_n[4]
225             ip4_n[1] = ip6_n[5]
226             ip4_n[2] = ip6_n[6]
227             ip4_n[3] = ip6_n[7]
228         elif plen == 40:
229             ip4_n[0] = ip6_n[5]
230             ip4_n[1] = ip6_n[6]
231             ip4_n[2] = ip6_n[7]
232             ip4_n[3] = ip6_n[9]
233         elif plen == 48:
234             ip4_n[0] = ip6_n[6]
235             ip4_n[1] = ip6_n[7]
236             ip4_n[2] = ip6_n[9]
237             ip4_n[3] = ip6_n[10]
238         elif plen == 56:
239             ip4_n[0] = ip6_n[7]
240             ip4_n[1] = ip6_n[9]
241             ip4_n[2] = ip6_n[10]
242             ip4_n[3] = ip6_n[11]
243         elif plen == 64:
244             ip4_n[0] = ip6_n[9]
245             ip4_n[1] = ip6_n[10]
246             ip4_n[2] = ip6_n[11]
247             ip4_n[3] = ip6_n[12]
248         elif plen == 96:
249             ip4_n[0] = ip6_n[12]
250             ip4_n[1] = ip6_n[13]
251             ip4_n[2] = ip6_n[14]
252             ip4_n[3] = ip6_n[15]
253         return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
254
255     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
256         """
257         Create IPv6 packet stream for inside network
258
259         :param in_if: Inside interface
260         :param out_if: Outside interface
261         :param ttl: Hop Limit of generated packets
262         :param pref: NAT64 prefix
263         :param plen: NAT64 prefix length
264         """
265         pkts = []
266         if pref is None:
267             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
268         else:
269             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
270
271         # TCP
272         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274              TCP(sport=self.tcp_port_in, dport=20))
275         pkts.append(p)
276
277         # UDP
278         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280              UDP(sport=self.udp_port_in, dport=20))
281         pkts.append(p)
282
283         # ICMP
284         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286              ICMPv6EchoRequest(id=self.icmp_id_in))
287         pkts.append(p)
288
289         return pkts
290
291     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292                           use_inside_ports=False):
293         """
294         Create packet stream for outside network
295
296         :param out_if: Outside interface
297         :param dst_ip: Destination IP address (Default use global NAT address)
298         :param ttl: TTL of generated packets
299         :param use_inside_ports: Use inside NAT ports as destination ports
300                instead of outside ports
301         """
302         if dst_ip is None:
303             dst_ip = self.nat_addr
304         if not use_inside_ports:
305             tcp_port = self.tcp_port_out
306             udp_port = self.udp_port_out
307             icmp_id = self.icmp_id_out
308         else:
309             tcp_port = self.tcp_port_in
310             udp_port = self.udp_port_in
311             icmp_id = self.icmp_id_in
312         pkts = []
313         # TCP
314         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316              TCP(dport=tcp_port, sport=20))
317         pkts.append(p)
318
319         # UDP
320         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322              UDP(dport=udp_port, sport=20))
323         pkts.append(p)
324
325         # ICMP
326         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328              ICMP(id=icmp_id, type='echo-reply'))
329         pkts.append(p)
330
331         return pkts
332
333     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
334         """
335         Create packet stream for outside network
336
337         :param out_if: Outside interface
338         :param dst_ip: Destination IP address (Default use global NAT address)
339         :param hl: HL of generated packets
340         """
341         pkts = []
342         # TCP
343         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345              TCP(dport=self.tcp_port_out, sport=20))
346         pkts.append(p)
347
348         # UDP
349         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351              UDP(dport=self.udp_port_out, sport=20))
352         pkts.append(p)
353
354         # ICMP
355         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357              ICMPv6EchoReply(id=self.icmp_id_out))
358         pkts.append(p)
359
360         return pkts
361
362     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363                            packet_num=3, dst_ip=None, is_ip6=False):
364         """
365         Verify captured packets on outside network
366
367         :param capture: Captured packets
368         :param nat_ip: Translated IP address (Default use global NAT address)
369         :param same_port: Sorce port number is not translated (Default False)
370         :param packet_num: Expected number of packets (Default 3)
371         :param dst_ip: Destination IP address (Default do not verify)
372         :param is_ip6: If L3 protocol is IPv6 (Default False)
373         """
374         if is_ip6:
375             IP46 = IPv6
376             ICMP46 = ICMPv6EchoRequest
377         else:
378             IP46 = IP
379             ICMP46 = ICMP
380         if nat_ip is None:
381             nat_ip = self.nat_addr
382         self.assertEqual(packet_num, len(capture))
383         for packet in capture:
384             try:
385                 if not is_ip6:
386                     self.check_ip_checksum(packet)
387                 self.assertEqual(packet[IP46].src, nat_ip)
388                 if dst_ip is not None:
389                     self.assertEqual(packet[IP46].dst, dst_ip)
390                 if packet.haslayer(TCP):
391                     if same_port:
392                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
393                     else:
394                         self.assertNotEqual(
395                             packet[TCP].sport, self.tcp_port_in)
396                     self.tcp_port_out = packet[TCP].sport
397                     self.check_tcp_checksum(packet)
398                 elif packet.haslayer(UDP):
399                     if same_port:
400                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
401                     else:
402                         self.assertNotEqual(
403                             packet[UDP].sport, self.udp_port_in)
404                     self.udp_port_out = packet[UDP].sport
405                 else:
406                     if same_port:
407                         self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
408                     else:
409                         self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410                     self.icmp_id_out = packet[ICMP46].id
411                     if is_ip6:
412                         self.check_icmpv6_checksum(packet)
413                     else:
414                         self.check_icmp_checksum(packet)
415             except:
416                 self.logger.error(ppp("Unexpected or invalid packet "
417                                       "(outside network):", packet))
418                 raise
419
420     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421                                packet_num=3, dst_ip=None):
422         """
423         Verify captured packets on outside network
424
425         :param capture: Captured packets
426         :param nat_ip: Translated IP address
427         :param same_port: Sorce port number is not translated (Default False)
428         :param packet_num: Expected number of packets (Default 3)
429         :param dst_ip: Destination IP address (Default do not verify)
430         """
431         return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
432                                        dst_ip, True)
433
434     def verify_capture_in(self, capture, in_if, packet_num=3):
435         """
436         Verify captured packets on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.check_ip_checksum(packet)
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 if packet.haslayer(TCP):
448                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449                     self.check_tcp_checksum(packet)
450                 elif packet.haslayer(UDP):
451                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
452                 else:
453                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454                     self.check_icmp_checksum(packet)
455             except:
456                 self.logger.error(ppp("Unexpected or invalid packet "
457                                       "(inside network):", packet))
458                 raise
459
460     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
461         """
462         Verify captured IPv6 packets on inside network
463
464         :param capture: Captured packets
465         :param src_ip: Source IP
466         :param dst_ip: Destination IP address
467         :param packet_num: Expected number of packets (Default 3)
468         """
469         self.assertEqual(packet_num, len(capture))
470         for packet in capture:
471             try:
472                 self.assertEqual(packet[IPv6].src, src_ip)
473                 self.assertEqual(packet[IPv6].dst, dst_ip)
474                 if packet.haslayer(TCP):
475                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476                     self.check_tcp_checksum(packet)
477                 elif packet.haslayer(UDP):
478                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
479                     self.check_udp_checksum(packet)
480                 else:
481                     self.assertEqual(packet[ICMPv6EchoReply].id,
482                                      self.icmp_id_in)
483                     self.check_icmpv6_checksum(packet)
484             except:
485                 self.logger.error(ppp("Unexpected or invalid packet "
486                                       "(inside network):", packet))
487                 raise
488
489     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
490         """
491         Verify captured packet that don't have to be translated
492
493         :param capture: Captured packets
494         :param ingress_if: Ingress interface
495         :param egress_if: Egress interface
496         """
497         for packet in capture:
498             try:
499                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501                 if packet.haslayer(TCP):
502                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503                 elif packet.haslayer(UDP):
504                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
505                 else:
506                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
507             except:
508                 self.logger.error(ppp("Unexpected or invalid packet "
509                                       "(inside network):", packet))
510                 raise
511
512     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513                                             packet_num=3, icmp_type=11):
514         """
515         Verify captured packets with ICMP errors on outside network
516
517         :param capture: Captured packets
518         :param src_ip: Translated IP address or IP address of VPP
519                        (Default use global NAT address)
520         :param packet_num: Expected number of packets (Default 3)
521         :param icmp_type: Type of error ICMP packet
522                           we are expecting (Default 11)
523         """
524         if src_ip is None:
525             src_ip = self.nat_addr
526         self.assertEqual(packet_num, len(capture))
527         for packet in capture:
528             try:
529                 self.assertEqual(packet[IP].src, src_ip)
530                 self.assertTrue(packet.haslayer(ICMP))
531                 icmp = packet[ICMP]
532                 self.assertEqual(icmp.type, icmp_type)
533                 self.assertTrue(icmp.haslayer(IPerror))
534                 inner_ip = icmp[IPerror]
535                 if inner_ip.haslayer(TCPerror):
536                     self.assertEqual(inner_ip[TCPerror].dport,
537                                      self.tcp_port_out)
538                 elif inner_ip.haslayer(UDPerror):
539                     self.assertEqual(inner_ip[UDPerror].dport,
540                                      self.udp_port_out)
541                 else:
542                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
543             except:
544                 self.logger.error(ppp("Unexpected or invalid packet "
545                                       "(outside network):", packet))
546                 raise
547
548     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
549                                            icmp_type=11):
550         """
551         Verify captured packets with ICMP errors on inside network
552
553         :param capture: Captured packets
554         :param in_if: Inside interface
555         :param packet_num: Expected number of packets (Default 3)
556         :param icmp_type: Type of error ICMP packet
557                           we are expecting (Default 11)
558         """
559         self.assertEqual(packet_num, len(capture))
560         for packet in capture:
561             try:
562                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563                 self.assertTrue(packet.haslayer(ICMP))
564                 icmp = packet[ICMP]
565                 self.assertEqual(icmp.type, icmp_type)
566                 self.assertTrue(icmp.haslayer(IPerror))
567                 inner_ip = icmp[IPerror]
568                 if inner_ip.haslayer(TCPerror):
569                     self.assertEqual(inner_ip[TCPerror].sport,
570                                      self.tcp_port_in)
571                 elif inner_ip.haslayer(UDPerror):
572                     self.assertEqual(inner_ip[UDPerror].sport,
573                                      self.udp_port_in)
574                 else:
575                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
576             except:
577                 self.logger.error(ppp("Unexpected or invalid packet "
578                                       "(inside network):", packet))
579                 raise
580
581     def create_stream_frag(self, src_if, dst, sport, dport, data):
582         """
583         Create fragmented packet stream
584
585         :param src_if: Source interface
586         :param dst: Destination IPv4 address
587         :param sport: Source TCP port
588         :param dport: Destination TCP port
589         :param data: Payload data
590         :returns: Fragmets
591         """
592         id = random.randint(0, 65535)
593         p = (IP(src=src_if.remote_ip4, dst=dst) /
594              TCP(sport=sport, dport=dport) /
595              Raw(data))
596         p = p.__class__(str(p))
597         chksum = p['TCP'].chksum
598         pkts = []
599         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601              TCP(sport=sport, dport=dport, chksum=chksum) /
602              Raw(data[0:4]))
603         pkts.append(p)
604         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606                 proto=IP_PROTOS.tcp) /
607              Raw(data[4:20]))
608         pkts.append(p)
609         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
611                 id=id) /
612              Raw(data[20:]))
613         pkts.append(p)
614         return pkts
615
616     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617                                pref=None, plen=0, frag_size=128):
618         """
619         Create fragmented packet stream
620
621         :param src_if: Source interface
622         :param dst: Destination IPv4 address
623         :param sport: Source TCP port
624         :param dport: Destination TCP port
625         :param data: Payload data
626         :param pref: NAT64 prefix
627         :param plen: NAT64 prefix length
628         :param fragsize: size of fragments
629         :returns: Fragmets
630         """
631         if pref is None:
632             dst_ip6 = ''.join(['64:ff9b::', dst])
633         else:
634             dst_ip6 = self.compose_ip6(dst, pref, plen)
635
636         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639              TCP(sport=sport, dport=dport) /
640              Raw(data))
641
642         return fragment6(p, frag_size)
643
644     def reass_frags_and_verify(self, frags, src, dst):
645         """
646         Reassemble and verify fragmented packet
647
648         :param frags: Captured fragments
649         :param src: Source IPv4 address to verify
650         :param dst: Destination IPv4 address to verify
651
652         :returns: Reassembled IPv4 packet
653         """
654         buffer = StringIO.StringIO()
655         for p in frags:
656             self.assertEqual(p[IP].src, src)
657             self.assertEqual(p[IP].dst, dst)
658             self.check_ip_checksum(p)
659             buffer.seek(p[IP].frag * 8)
660             buffer.write(p[IP].payload)
661         ip = frags[0].getlayer(IP)
662         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663                 proto=frags[0][IP].proto)
664         if ip.proto == IP_PROTOS.tcp:
665             p = (ip / TCP(buffer.getvalue()))
666             self.check_tcp_checksum(p)
667         elif ip.proto == IP_PROTOS.udp:
668             p = (ip / UDP(buffer.getvalue()))
669         return p
670
671     def reass_frags_and_verify_ip6(self, frags, src, dst):
672         """
673         Reassemble and verify fragmented packet
674
675         :param frags: Captured fragments
676         :param src: Source IPv6 address to verify
677         :param dst: Destination IPv6 address to verify
678
679         :returns: Reassembled IPv6 packet
680         """
681         buffer = StringIO.StringIO()
682         for p in frags:
683             self.assertEqual(p[IPv6].src, src)
684             self.assertEqual(p[IPv6].dst, dst)
685             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686             buffer.write(p[IPv6ExtHdrFragment].payload)
687         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688                   nh=frags[0][IPv6ExtHdrFragment].nh)
689         if ip.nh == IP_PROTOS.tcp:
690             p = (ip / TCP(buffer.getvalue()))
691             self.check_tcp_checksum(p)
692         elif ip.nh == IP_PROTOS.udp:
693             p = (ip / UDP(buffer.getvalue()))
694         return p
695
696     def verify_ipfix_nat44_ses(self, data):
697         """
698         Verify IPFIX NAT44 session create/delete event
699
700         :param data: Decoded IPFIX data records
701         """
702         nat44_ses_create_num = 0
703         nat44_ses_delete_num = 0
704         self.assertEqual(6, len(data))
705         for record in data:
706             # natEvent
707             self.assertIn(ord(record[230]), [4, 5])
708             if ord(record[230]) == 4:
709                 nat44_ses_create_num += 1
710             else:
711                 nat44_ses_delete_num += 1
712             # sourceIPv4Address
713             self.assertEqual(self.pg0.remote_ip4n, record[8])
714             # postNATSourceIPv4Address
715             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
716                              record[225])
717             # ingressVRFID
718             self.assertEqual(struct.pack("!I", 0), record[234])
719             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720             if IP_PROTOS.icmp == ord(record[4]):
721                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
723                                  record[227])
724             elif IP_PROTOS.tcp == ord(record[4]):
725                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
726                                  record[7])
727                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
728                                  record[227])
729             elif IP_PROTOS.udp == ord(record[4]):
730                 self.assertEqual(struct.pack("!H", self.udp_port_in),
731                                  record[7])
732                 self.assertEqual(struct.pack("!H", self.udp_port_out),
733                                  record[227])
734             else:
735                 self.fail("Invalid protocol")
736         self.assertEqual(3, nat44_ses_create_num)
737         self.assertEqual(3, nat44_ses_delete_num)
738
739     def verify_ipfix_addr_exhausted(self, data):
740         """
741         Verify IPFIX NAT addresses event
742
743         :param data: Decoded IPFIX data records
744         """
745         self.assertEqual(1, len(data))
746         record = data[0]
747         # natEvent
748         self.assertEqual(ord(record[230]), 3)
749         # natPoolID
750         self.assertEqual(struct.pack("!I", 0), record[283])
751
752     def verify_ipfix_max_sessions(self, data, limit):
753         """
754         Verify IPFIX maximum session entries exceeded event
755
756         :param data: Decoded IPFIX data records
757         :param limit: Number of maximum session entries that can be created.
758         """
759         self.assertEqual(1, len(data))
760         record = data[0]
761         # natEvent
762         self.assertEqual(ord(record[230]), 13)
763         # natQuotaExceededEvent
764         self.assertEqual(struct.pack("I", 1), record[466])
765         # maxSessionEntries
766         self.assertEqual(struct.pack("I", limit), record[471])
767
768     def verify_ipfix_max_bibs(self, data, limit):
769         """
770         Verify IPFIX maximum BIB entries exceeded event
771
772         :param data: Decoded IPFIX data records
773         :param limit: Number of maximum BIB entries that can be created.
774         """
775         self.assertEqual(1, len(data))
776         record = data[0]
777         # natEvent
778         self.assertEqual(ord(record[230]), 13)
779         # natQuotaExceededEvent
780         self.assertEqual(struct.pack("I", 2), record[466])
781         # maxBIBEntries
782         self.assertEqual(struct.pack("I", limit), record[472])
783
784     def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
785         """
786         Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
787
788         :param data: Decoded IPFIX data records
789         :param limit: Number of maximum fragments pending reassembly
790         :param src_addr: IPv6 source address
791         """
792         self.assertEqual(1, len(data))
793         record = data[0]
794         # natEvent
795         self.assertEqual(ord(record[230]), 13)
796         # natQuotaExceededEvent
797         self.assertEqual(struct.pack("I", 5), record[466])
798         # maxFragmentsPendingReassembly
799         self.assertEqual(struct.pack("I", limit), record[475])
800         # sourceIPv6Address
801         self.assertEqual(src_addr, record[27])
802
803     def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
804         """
805         Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
806
807         :param data: Decoded IPFIX data records
808         :param limit: Number of maximum fragments pending reassembly
809         :param src_addr: IPv4 source address
810         """
811         self.assertEqual(1, len(data))
812         record = data[0]
813         # natEvent
814         self.assertEqual(ord(record[230]), 13)
815         # natQuotaExceededEvent
816         self.assertEqual(struct.pack("I", 5), record[466])
817         # maxFragmentsPendingReassembly
818         self.assertEqual(struct.pack("I", limit), record[475])
819         # sourceIPv4Address
820         self.assertEqual(src_addr, record[8])
821
822     def verify_ipfix_bib(self, data, is_create, src_addr):
823         """
824         Verify IPFIX NAT64 BIB create and delete events
825
826         :param data: Decoded IPFIX data records
827         :param is_create: Create event if nonzero value otherwise delete event
828         :param src_addr: IPv6 source address
829         """
830         self.assertEqual(1, len(data))
831         record = data[0]
832         # natEvent
833         if is_create:
834             self.assertEqual(ord(record[230]), 10)
835         else:
836             self.assertEqual(ord(record[230]), 11)
837         # sourceIPv6Address
838         self.assertEqual(src_addr, record[27])
839         # postNATSourceIPv4Address
840         self.assertEqual(self.nat_addr_n, record[225])
841         # protocolIdentifier
842         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
843         # ingressVRFID
844         self.assertEqual(struct.pack("!I", 0), record[234])
845         # sourceTransportPort
846         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847         # postNAPTSourceTransportPort
848         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
849
850     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
851                                dst_port):
852         """
853         Verify IPFIX NAT64 session create and delete events
854
855         :param data: Decoded IPFIX data records
856         :param is_create: Create event if nonzero value otherwise delete event
857         :param src_addr: IPv6 source address
858         :param dst_addr: IPv4 destination address
859         :param dst_port: destination TCP port
860         """
861         self.assertEqual(1, len(data))
862         record = data[0]
863         # natEvent
864         if is_create:
865             self.assertEqual(ord(record[230]), 6)
866         else:
867             self.assertEqual(ord(record[230]), 7)
868         # sourceIPv6Address
869         self.assertEqual(src_addr, record[27])
870         # destinationIPv6Address
871         self.assertEqual(socket.inet_pton(socket.AF_INET6,
872                                           self.compose_ip6(dst_addr,
873                                                            '64:ff9b::',
874                                                            96)),
875                          record[28])
876         # postNATSourceIPv4Address
877         self.assertEqual(self.nat_addr_n, record[225])
878         # postNATDestinationIPv4Address
879         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
880                          record[226])
881         # protocolIdentifier
882         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
883         # ingressVRFID
884         self.assertEqual(struct.pack("!I", 0), record[234])
885         # sourceTransportPort
886         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887         # postNAPTSourceTransportPort
888         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889         # destinationTransportPort
890         self.assertEqual(struct.pack("!H", dst_port), record[11])
891         # postNAPTDestinationTransportPort
892         self.assertEqual(struct.pack("!H", dst_port), record[228])
893
894
895 class TestNAT44(MethodHolder):
896     """ NAT44 Test Cases """
897
898     @classmethod
899     def setUpClass(cls):
900         super(TestNAT44, cls).setUpClass()
901
902         try:
903             cls.tcp_port_in = 6303
904             cls.tcp_port_out = 6303
905             cls.udp_port_in = 6304
906             cls.udp_port_out = 6304
907             cls.icmp_id_in = 6305
908             cls.icmp_id_out = 6305
909             cls.nat_addr = '10.0.0.3'
910             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911             cls.ipfix_src_port = 4739
912             cls.ipfix_domain_id = 1
913
914             cls.create_pg_interfaces(range(10))
915             cls.interfaces = list(cls.pg_interfaces[0:4])
916
917             for i in cls.interfaces:
918                 i.admin_up()
919                 i.config_ip4()
920                 i.resolve_arp()
921
922             cls.pg0.generate_remote_hosts(3)
923             cls.pg0.configure_ipv4_neighbors()
924
925             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926             cls.vapi.ip_table_add_del(10, is_add=1)
927             cls.vapi.ip_table_add_del(20, is_add=1)
928
929             cls.pg4._local_ip4 = "172.16.255.1"
930             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932             cls.pg4.set_table_ip4(10)
933             cls.pg5._local_ip4 = "172.17.255.3"
934             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936             cls.pg5.set_table_ip4(10)
937             cls.pg6._local_ip4 = "172.16.255.1"
938             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940             cls.pg6.set_table_ip4(20)
941             for i in cls.overlapping_interfaces:
942                 i.config_ip4()
943                 i.admin_up()
944                 i.resolve_arp()
945
946             cls.pg7.admin_up()
947             cls.pg8.admin_up()
948
949             cls.pg9.generate_remote_hosts(2)
950             cls.pg9.config_ip4()
951             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
953                                                   ip_addr_n,
954                                                   24)
955             cls.pg9.admin_up()
956             cls.pg9.resolve_arp()
957             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959             cls.pg9.resolve_arp()
960
961         except Exception:
962             super(TestNAT44, cls).tearDownClass()
963             raise
964
965     def clear_nat44(self):
966         """
967         Clear NAT44 configuration.
968         """
969         # I found no elegant way to do this
970         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971                                    dst_address_length=32,
972                                    next_hop_address=self.pg7.remote_ip4n,
973                                    next_hop_sw_if_index=self.pg7.sw_if_index,
974                                    is_add=0)
975         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976                                    dst_address_length=32,
977                                    next_hop_address=self.pg8.remote_ip4n,
978                                    next_hop_sw_if_index=self.pg8.sw_if_index,
979                                    is_add=0)
980
981         for intf in [self.pg7, self.pg8]:
982             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
983             for n in neighbors:
984                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
985                                               n.mac_address,
986                                               n.ip_address,
987                                               is_add=0)
988
989         if self.pg7.has_ip4_config:
990             self.pg7.unconfig_ip4()
991
992         self.vapi.nat44_forwarding_enable_disable(0)
993
994         interfaces = self.vapi.nat44_interface_addr_dump()
995         for intf in interfaces:
996             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997                                                twice_nat=intf.twice_nat,
998                                                is_add=0)
999
1000         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001                             domain_id=self.ipfix_domain_id)
1002         self.ipfix_src_port = 4739
1003         self.ipfix_domain_id = 1
1004
1005         interfaces = self.vapi.nat44_interface_dump()
1006         for intf in interfaces:
1007             if intf.is_inside > 1:
1008                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1009                                                           0,
1010                                                           is_add=0)
1011             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1012                                                       intf.is_inside,
1013                                                       is_add=0)
1014
1015         interfaces = self.vapi.nat44_interface_output_feature_dump()
1016         for intf in interfaces:
1017             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1018                                                              intf.is_inside,
1019                                                              is_add=0)
1020
1021         static_mappings = self.vapi.nat44_static_mapping_dump()
1022         for sm in static_mappings:
1023             self.vapi.nat44_add_del_static_mapping(
1024                 sm.local_ip_address,
1025                 sm.external_ip_address,
1026                 local_port=sm.local_port,
1027                 external_port=sm.external_port,
1028                 addr_only=sm.addr_only,
1029                 vrf_id=sm.vrf_id,
1030                 protocol=sm.protocol,
1031                 twice_nat=sm.twice_nat,
1032                 out2in_only=sm.out2in_only,
1033                 is_add=0)
1034
1035         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1036         for lb_sm in lb_static_mappings:
1037             self.vapi.nat44_add_del_lb_static_mapping(
1038                 lb_sm.external_addr,
1039                 lb_sm.external_port,
1040                 lb_sm.protocol,
1041                 vrf_id=lb_sm.vrf_id,
1042                 twice_nat=lb_sm.twice_nat,
1043                 out2in_only=lb_sm.out2in_only,
1044                 is_add=0,
1045                 local_num=0,
1046                 locals=[])
1047
1048         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1049         for id_m in identity_mappings:
1050             self.vapi.nat44_add_del_identity_mapping(
1051                 addr_only=id_m.addr_only,
1052                 ip=id_m.ip_address,
1053                 port=id_m.port,
1054                 sw_if_index=id_m.sw_if_index,
1055                 vrf_id=id_m.vrf_id,
1056                 protocol=id_m.protocol,
1057                 is_add=0)
1058
1059         adresses = self.vapi.nat44_address_dump()
1060         for addr in adresses:
1061             self.vapi.nat44_add_del_address_range(addr.ip_address,
1062                                                   addr.ip_address,
1063                                                   twice_nat=addr.twice_nat,
1064                                                   is_add=0)
1065
1066         self.vapi.nat_set_reass()
1067         self.vapi.nat_set_reass(is_ip6=1)
1068
1069     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1070                                  local_port=0, external_port=0, vrf_id=0,
1071                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
1072                                  proto=0, twice_nat=0, out2in_only=0):
1073         """
1074         Add/delete NAT44 static mapping
1075
1076         :param local_ip: Local IP address
1077         :param external_ip: External IP address
1078         :param local_port: Local port number (Optional)
1079         :param external_port: External port number (Optional)
1080         :param vrf_id: VRF ID (Default 0)
1081         :param is_add: 1 if add, 0 if delete (Default add)
1082         :param external_sw_if_index: External interface instead of IP address
1083         :param proto: IP protocol (Mandatory if port specified)
1084         :param twice_nat: 1 if translate external host address and port
1085         :param out2in_only: if 1 rule is matching only out2in direction
1086         """
1087         addr_only = 1
1088         if local_port and external_port:
1089             addr_only = 0
1090         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1091         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1092         self.vapi.nat44_add_del_static_mapping(
1093             l_ip,
1094             e_ip,
1095             external_sw_if_index,
1096             local_port,
1097             external_port,
1098             addr_only,
1099             vrf_id,
1100             proto,
1101             twice_nat,
1102             out2in_only,
1103             is_add)
1104
1105     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1106         """
1107         Add/delete NAT44 address
1108
1109         :param ip: IP address
1110         :param is_add: 1 if add, 0 if delete (Default add)
1111         :param twice_nat: twice NAT address for extenal hosts
1112         """
1113         nat_addr = socket.inet_pton(socket.AF_INET, ip)
1114         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1115                                               vrf_id=vrf_id,
1116                                               twice_nat=twice_nat)
1117
1118     def test_dynamic(self):
1119         """ NAT44 dynamic translation test """
1120
1121         self.nat44_add_address(self.nat_addr)
1122         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1123         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1124                                                   is_inside=0)
1125
1126         # in2out
1127         pkts = self.create_stream_in(self.pg0, self.pg1)
1128         self.pg0.add_stream(pkts)
1129         self.pg_enable_capture(self.pg_interfaces)
1130         self.pg_start()
1131         capture = self.pg1.get_capture(len(pkts))
1132         self.verify_capture_out(capture)
1133
1134         # out2in
1135         pkts = self.create_stream_out(self.pg1)
1136         self.pg1.add_stream(pkts)
1137         self.pg_enable_capture(self.pg_interfaces)
1138         self.pg_start()
1139         capture = self.pg0.get_capture(len(pkts))
1140         self.verify_capture_in(capture, self.pg0)
1141
1142     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1143         """ NAT44 handling of client packets with TTL=1 """
1144
1145         self.nat44_add_address(self.nat_addr)
1146         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1147         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1148                                                   is_inside=0)
1149
1150         # Client side - generate traffic
1151         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1152         self.pg0.add_stream(pkts)
1153         self.pg_enable_capture(self.pg_interfaces)
1154         self.pg_start()
1155
1156         # Client side - verify ICMP type 11 packets
1157         capture = self.pg0.get_capture(len(pkts))
1158         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1159
1160     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1161         """ NAT44 handling of server packets with TTL=1 """
1162
1163         self.nat44_add_address(self.nat_addr)
1164         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1165         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1166                                                   is_inside=0)
1167
1168         # Client side - create sessions
1169         pkts = self.create_stream_in(self.pg0, self.pg1)
1170         self.pg0.add_stream(pkts)
1171         self.pg_enable_capture(self.pg_interfaces)
1172         self.pg_start()
1173
1174         # Server side - generate traffic
1175         capture = self.pg1.get_capture(len(pkts))
1176         self.verify_capture_out(capture)
1177         pkts = self.create_stream_out(self.pg1, ttl=1)
1178         self.pg1.add_stream(pkts)
1179         self.pg_enable_capture(self.pg_interfaces)
1180         self.pg_start()
1181
1182         # Server side - verify ICMP type 11 packets
1183         capture = self.pg1.get_capture(len(pkts))
1184         self.verify_capture_out_with_icmp_errors(capture,
1185                                                  src_ip=self.pg1.local_ip4)
1186
1187     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1188         """ NAT44 handling of error responses to client packets with TTL=2 """
1189
1190         self.nat44_add_address(self.nat_addr)
1191         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1192         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1193                                                   is_inside=0)
1194
1195         # Client side - generate traffic
1196         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1197         self.pg0.add_stream(pkts)
1198         self.pg_enable_capture(self.pg_interfaces)
1199         self.pg_start()
1200
1201         # Server side - simulate ICMP type 11 response
1202         capture = self.pg1.get_capture(len(pkts))
1203         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1204                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1205                 ICMP(type=11) / packet[IP] for packet in capture]
1206         self.pg1.add_stream(pkts)
1207         self.pg_enable_capture(self.pg_interfaces)
1208         self.pg_start()
1209
1210         # Client side - verify ICMP type 11 packets
1211         capture = self.pg0.get_capture(len(pkts))
1212         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1213
1214     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1215         """ NAT44 handling of error responses to server packets with TTL=2 """
1216
1217         self.nat44_add_address(self.nat_addr)
1218         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1219         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1220                                                   is_inside=0)
1221
1222         # Client side - create sessions
1223         pkts = self.create_stream_in(self.pg0, self.pg1)
1224         self.pg0.add_stream(pkts)
1225         self.pg_enable_capture(self.pg_interfaces)
1226         self.pg_start()
1227
1228         # Server side - generate traffic
1229         capture = self.pg1.get_capture(len(pkts))
1230         self.verify_capture_out(capture)
1231         pkts = self.create_stream_out(self.pg1, ttl=2)
1232         self.pg1.add_stream(pkts)
1233         self.pg_enable_capture(self.pg_interfaces)
1234         self.pg_start()
1235
1236         # Client side - simulate ICMP type 11 response
1237         capture = self.pg0.get_capture(len(pkts))
1238         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1239                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1240                 ICMP(type=11) / packet[IP] for packet in capture]
1241         self.pg0.add_stream(pkts)
1242         self.pg_enable_capture(self.pg_interfaces)
1243         self.pg_start()
1244
1245         # Server side - verify ICMP type 11 packets
1246         capture = self.pg1.get_capture(len(pkts))
1247         self.verify_capture_out_with_icmp_errors(capture)
1248
1249     def test_ping_out_interface_from_outside(self):
1250         """ Ping NAT44 out interface from outside network """
1251
1252         self.nat44_add_address(self.nat_addr)
1253         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1254         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1255                                                   is_inside=0)
1256
1257         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1258              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1259              ICMP(id=self.icmp_id_out, type='echo-request'))
1260         pkts = [p]
1261         self.pg1.add_stream(pkts)
1262         self.pg_enable_capture(self.pg_interfaces)
1263         self.pg_start()
1264         capture = self.pg1.get_capture(len(pkts))
1265         self.assertEqual(1, len(capture))
1266         packet = capture[0]
1267         try:
1268             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1269             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1270             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1271             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1272         except:
1273             self.logger.error(ppp("Unexpected or invalid packet "
1274                                   "(outside network):", packet))
1275             raise
1276
1277     def test_ping_internal_host_from_outside(self):
1278         """ Ping internal host from outside network """
1279
1280         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1281         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1282         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1283                                                   is_inside=0)
1284
1285         # out2in
1286         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1287                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1288                ICMP(id=self.icmp_id_out, type='echo-request'))
1289         self.pg1.add_stream(pkt)
1290         self.pg_enable_capture(self.pg_interfaces)
1291         self.pg_start()
1292         capture = self.pg0.get_capture(1)
1293         self.verify_capture_in(capture, self.pg0, packet_num=1)
1294         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1295
1296         # in2out
1297         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1298                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1299                ICMP(id=self.icmp_id_in, type='echo-reply'))
1300         self.pg0.add_stream(pkt)
1301         self.pg_enable_capture(self.pg_interfaces)
1302         self.pg_start()
1303         capture = self.pg1.get_capture(1)
1304         self.verify_capture_out(capture, same_port=True, packet_num=1)
1305         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1306
1307     def test_forwarding(self):
1308         """ NAT44 forwarding test """
1309
1310         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1311         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1312                                                   is_inside=0)
1313         self.vapi.nat44_forwarding_enable_disable(1)
1314
1315         real_ip = self.pg0.remote_ip4n
1316         alias_ip = self.nat_addr_n
1317         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1318                                                external_ip=alias_ip)
1319
1320         try:
1321             # in2out - static mapping match
1322
1323             pkts = self.create_stream_out(self.pg1)
1324             self.pg1.add_stream(pkts)
1325             self.pg_enable_capture(self.pg_interfaces)
1326             self.pg_start()
1327             capture = self.pg0.get_capture(len(pkts))
1328             self.verify_capture_in(capture, self.pg0)
1329
1330             pkts = self.create_stream_in(self.pg0, self.pg1)
1331             self.pg0.add_stream(pkts)
1332             self.pg_enable_capture(self.pg_interfaces)
1333             self.pg_start()
1334             capture = self.pg1.get_capture(len(pkts))
1335             self.verify_capture_out(capture, same_port=True)
1336
1337             # in2out - no static mapping match
1338
1339             host0 = self.pg0.remote_hosts[0]
1340             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1341             try:
1342                 pkts = self.create_stream_out(self.pg1,
1343                                               dst_ip=self.pg0.remote_ip4,
1344                                               use_inside_ports=True)
1345                 self.pg1.add_stream(pkts)
1346                 self.pg_enable_capture(self.pg_interfaces)
1347                 self.pg_start()
1348                 capture = self.pg0.get_capture(len(pkts))
1349                 self.verify_capture_in(capture, self.pg0)
1350
1351                 pkts = self.create_stream_in(self.pg0, self.pg1)
1352                 self.pg0.add_stream(pkts)
1353                 self.pg_enable_capture(self.pg_interfaces)
1354                 self.pg_start()
1355                 capture = self.pg1.get_capture(len(pkts))
1356                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1357                                         same_port=True)
1358             finally:
1359                 self.pg0.remote_hosts[0] = host0
1360
1361         finally:
1362             self.vapi.nat44_forwarding_enable_disable(0)
1363             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1364                                                    external_ip=alias_ip,
1365                                                    is_add=0)
1366
1367     def test_static_in(self):
1368         """ 1:1 NAT initialized from inside network """
1369
1370         nat_ip = "10.0.0.10"
1371         self.tcp_port_out = 6303
1372         self.udp_port_out = 6304
1373         self.icmp_id_out = 6305
1374
1375         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1376         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1377         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1378                                                   is_inside=0)
1379
1380         # in2out
1381         pkts = self.create_stream_in(self.pg0, self.pg1)
1382         self.pg0.add_stream(pkts)
1383         self.pg_enable_capture(self.pg_interfaces)
1384         self.pg_start()
1385         capture = self.pg1.get_capture(len(pkts))
1386         self.verify_capture_out(capture, nat_ip, True)
1387
1388         # out2in
1389         pkts = self.create_stream_out(self.pg1, nat_ip)
1390         self.pg1.add_stream(pkts)
1391         self.pg_enable_capture(self.pg_interfaces)
1392         self.pg_start()
1393         capture = self.pg0.get_capture(len(pkts))
1394         self.verify_capture_in(capture, self.pg0)
1395
1396     def test_static_out(self):
1397         """ 1:1 NAT initialized from outside network """
1398
1399         nat_ip = "10.0.0.20"
1400         self.tcp_port_out = 6303
1401         self.udp_port_out = 6304
1402         self.icmp_id_out = 6305
1403
1404         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1405         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1406         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1407                                                   is_inside=0)
1408
1409         # out2in
1410         pkts = self.create_stream_out(self.pg1, nat_ip)
1411         self.pg1.add_stream(pkts)
1412         self.pg_enable_capture(self.pg_interfaces)
1413         self.pg_start()
1414         capture = self.pg0.get_capture(len(pkts))
1415         self.verify_capture_in(capture, self.pg0)
1416
1417         # in2out
1418         pkts = self.create_stream_in(self.pg0, self.pg1)
1419         self.pg0.add_stream(pkts)
1420         self.pg_enable_capture(self.pg_interfaces)
1421         self.pg_start()
1422         capture = self.pg1.get_capture(len(pkts))
1423         self.verify_capture_out(capture, nat_ip, True)
1424
1425     def test_static_with_port_in(self):
1426         """ 1:1 NAPT initialized from inside network """
1427
1428         self.tcp_port_out = 3606
1429         self.udp_port_out = 3607
1430         self.icmp_id_out = 3608
1431
1432         self.nat44_add_address(self.nat_addr)
1433         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434                                       self.tcp_port_in, self.tcp_port_out,
1435                                       proto=IP_PROTOS.tcp)
1436         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1437                                       self.udp_port_in, self.udp_port_out,
1438                                       proto=IP_PROTOS.udp)
1439         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1440                                       self.icmp_id_in, self.icmp_id_out,
1441                                       proto=IP_PROTOS.icmp)
1442         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1443         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1444                                                   is_inside=0)
1445
1446         # in2out
1447         pkts = self.create_stream_in(self.pg0, self.pg1)
1448         self.pg0.add_stream(pkts)
1449         self.pg_enable_capture(self.pg_interfaces)
1450         self.pg_start()
1451         capture = self.pg1.get_capture(len(pkts))
1452         self.verify_capture_out(capture)
1453
1454         # out2in
1455         pkts = self.create_stream_out(self.pg1)
1456         self.pg1.add_stream(pkts)
1457         self.pg_enable_capture(self.pg_interfaces)
1458         self.pg_start()
1459         capture = self.pg0.get_capture(len(pkts))
1460         self.verify_capture_in(capture, self.pg0)
1461
1462     def test_static_with_port_out(self):
1463         """ 1:1 NAPT initialized from outside network """
1464
1465         self.tcp_port_out = 30606
1466         self.udp_port_out = 30607
1467         self.icmp_id_out = 30608
1468
1469         self.nat44_add_address(self.nat_addr)
1470         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1471                                       self.tcp_port_in, self.tcp_port_out,
1472                                       proto=IP_PROTOS.tcp)
1473         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1474                                       self.udp_port_in, self.udp_port_out,
1475                                       proto=IP_PROTOS.udp)
1476         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1477                                       self.icmp_id_in, self.icmp_id_out,
1478                                       proto=IP_PROTOS.icmp)
1479         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1480         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1481                                                   is_inside=0)
1482
1483         # out2in
1484         pkts = self.create_stream_out(self.pg1)
1485         self.pg1.add_stream(pkts)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488         capture = self.pg0.get_capture(len(pkts))
1489         self.verify_capture_in(capture, self.pg0)
1490
1491         # in2out
1492         pkts = self.create_stream_in(self.pg0, self.pg1)
1493         self.pg0.add_stream(pkts)
1494         self.pg_enable_capture(self.pg_interfaces)
1495         self.pg_start()
1496         capture = self.pg1.get_capture(len(pkts))
1497         self.verify_capture_out(capture)
1498
1499     def test_static_with_port_out2(self):
1500         """ 1:1 NAPT symmetrical rule """
1501
1502         external_port = 80
1503         local_port = 8080
1504
1505         self.vapi.nat44_forwarding_enable_disable(1)
1506         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1507                                       local_port, external_port,
1508                                       proto=IP_PROTOS.tcp, out2in_only=1)
1509         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1510         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1511                                                   is_inside=0)
1512
1513         # from client to service
1514         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1515              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1516              TCP(sport=12345, dport=external_port))
1517         self.pg1.add_stream(p)
1518         self.pg_enable_capture(self.pg_interfaces)
1519         self.pg_start()
1520         capture = self.pg0.get_capture(1)
1521         p = capture[0]
1522         server = None
1523         try:
1524             ip = p[IP]
1525             tcp = p[TCP]
1526             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1527             self.assertEqual(tcp.dport, local_port)
1528             self.check_tcp_checksum(p)
1529             self.check_ip_checksum(p)
1530         except:
1531             self.logger.error(ppp("Unexpected or invalid packet:", p))
1532             raise
1533
1534         # from service back to client
1535         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1536              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1537              TCP(sport=local_port, dport=12345))
1538         self.pg0.add_stream(p)
1539         self.pg_enable_capture(self.pg_interfaces)
1540         self.pg_start()
1541         capture = self.pg1.get_capture(1)
1542         p = capture[0]
1543         try:
1544             ip = p[IP]
1545             tcp = p[TCP]
1546             self.assertEqual(ip.src, self.nat_addr)
1547             self.assertEqual(tcp.sport, external_port)
1548             self.check_tcp_checksum(p)
1549             self.check_ip_checksum(p)
1550         except:
1551             self.logger.error(ppp("Unexpected or invalid packet:", p))
1552             raise
1553
1554         # from client to server (no translation)
1555         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1556              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1557              TCP(sport=12346, dport=local_port))
1558         self.pg1.add_stream(p)
1559         self.pg_enable_capture(self.pg_interfaces)
1560         self.pg_start()
1561         capture = self.pg0.get_capture(1)
1562         p = capture[0]
1563         server = None
1564         try:
1565             ip = p[IP]
1566             tcp = p[TCP]
1567             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1568             self.assertEqual(tcp.dport, local_port)
1569             self.check_tcp_checksum(p)
1570             self.check_ip_checksum(p)
1571         except:
1572             self.logger.error(ppp("Unexpected or invalid packet:", p))
1573             raise
1574
1575         # from service back to client (no translation)
1576         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1577              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1578              TCP(sport=local_port, dport=12346))
1579         self.pg0.add_stream(p)
1580         self.pg_enable_capture(self.pg_interfaces)
1581         self.pg_start()
1582         capture = self.pg1.get_capture(1)
1583         p = capture[0]
1584         try:
1585             ip = p[IP]
1586             tcp = p[TCP]
1587             self.assertEqual(ip.src, self.pg0.remote_ip4)
1588             self.assertEqual(tcp.sport, local_port)
1589             self.check_tcp_checksum(p)
1590             self.check_ip_checksum(p)
1591         except:
1592             self.logger.error(ppp("Unexpected or invalid packet:", p))
1593             raise
1594
1595     def test_static_vrf_aware(self):
1596         """ 1:1 NAT VRF awareness """
1597
1598         nat_ip1 = "10.0.0.30"
1599         nat_ip2 = "10.0.0.40"
1600         self.tcp_port_out = 6303
1601         self.udp_port_out = 6304
1602         self.icmp_id_out = 6305
1603
1604         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1605                                       vrf_id=10)
1606         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1607                                       vrf_id=10)
1608         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1609                                                   is_inside=0)
1610         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1611         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1612
1613         # inside interface VRF match NAT44 static mapping VRF
1614         pkts = self.create_stream_in(self.pg4, self.pg3)
1615         self.pg4.add_stream(pkts)
1616         self.pg_enable_capture(self.pg_interfaces)
1617         self.pg_start()
1618         capture = self.pg3.get_capture(len(pkts))
1619         self.verify_capture_out(capture, nat_ip1, True)
1620
1621         # inside interface VRF don't match NAT44 static mapping VRF (packets
1622         # are dropped)
1623         pkts = self.create_stream_in(self.pg0, self.pg3)
1624         self.pg0.add_stream(pkts)
1625         self.pg_enable_capture(self.pg_interfaces)
1626         self.pg_start()
1627         self.pg3.assert_nothing_captured()
1628
1629     def test_identity_nat(self):
1630         """ Identity NAT """
1631
1632         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1633         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1634         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1635                                                   is_inside=0)
1636
1637         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1638              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1639              TCP(sport=12345, dport=56789))
1640         self.pg1.add_stream(p)
1641         self.pg_enable_capture(self.pg_interfaces)
1642         self.pg_start()
1643         capture = self.pg0.get_capture(1)
1644         p = capture[0]
1645         try:
1646             ip = p[IP]
1647             tcp = p[TCP]
1648             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1649             self.assertEqual(ip.src, self.pg1.remote_ip4)
1650             self.assertEqual(tcp.dport, 56789)
1651             self.assertEqual(tcp.sport, 12345)
1652             self.check_tcp_checksum(p)
1653             self.check_ip_checksum(p)
1654         except:
1655             self.logger.error(ppp("Unexpected or invalid packet:", p))
1656             raise
1657
1658     def test_static_lb(self):
1659         """ NAT44 local service load balancing """
1660         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1661         external_port = 80
1662         local_port = 8080
1663         server1 = self.pg0.remote_hosts[0]
1664         server2 = self.pg0.remote_hosts[1]
1665
1666         locals = [{'addr': server1.ip4n,
1667                    'port': local_port,
1668                    'probability': 70},
1669                   {'addr': server2.ip4n,
1670                    'port': local_port,
1671                    'probability': 30}]
1672
1673         self.nat44_add_address(self.nat_addr)
1674         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1675                                                   external_port,
1676                                                   IP_PROTOS.tcp,
1677                                                   local_num=len(locals),
1678                                                   locals=locals)
1679         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1680         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1681                                                   is_inside=0)
1682
1683         # from client to service
1684         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1685              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1686              TCP(sport=12345, dport=external_port))
1687         self.pg1.add_stream(p)
1688         self.pg_enable_capture(self.pg_interfaces)
1689         self.pg_start()
1690         capture = self.pg0.get_capture(1)
1691         p = capture[0]
1692         server = None
1693         try:
1694             ip = p[IP]
1695             tcp = p[TCP]
1696             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1697             if ip.dst == server1.ip4:
1698                 server = server1
1699             else:
1700                 server = server2
1701             self.assertEqual(tcp.dport, local_port)
1702             self.check_tcp_checksum(p)
1703             self.check_ip_checksum(p)
1704         except:
1705             self.logger.error(ppp("Unexpected or invalid packet:", p))
1706             raise
1707
1708         # from service back to client
1709         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1710              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1711              TCP(sport=local_port, dport=12345))
1712         self.pg0.add_stream(p)
1713         self.pg_enable_capture(self.pg_interfaces)
1714         self.pg_start()
1715         capture = self.pg1.get_capture(1)
1716         p = capture[0]
1717         try:
1718             ip = p[IP]
1719             tcp = p[TCP]
1720             self.assertEqual(ip.src, self.nat_addr)
1721             self.assertEqual(tcp.sport, external_port)
1722             self.check_tcp_checksum(p)
1723             self.check_ip_checksum(p)
1724         except:
1725             self.logger.error(ppp("Unexpected or invalid packet:", p))
1726             raise
1727
1728         # multiple clients
1729         server1_n = 0
1730         server2_n = 0
1731         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1732         pkts = []
1733         for client in clients:
1734             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1735                  IP(src=client, dst=self.nat_addr) /
1736                  TCP(sport=12345, dport=external_port))
1737             pkts.append(p)
1738         self.pg1.add_stream(pkts)
1739         self.pg_enable_capture(self.pg_interfaces)
1740         self.pg_start()
1741         capture = self.pg0.get_capture(len(pkts))
1742         for p in capture:
1743             if p[IP].dst == server1.ip4:
1744                 server1_n += 1
1745             else:
1746                 server2_n += 1
1747         self.assertTrue(server1_n > server2_n)
1748
1749     def test_static_lb_2(self):
1750         """ NAT44 local service load balancing (asymmetrical rule) """
1751         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1752         external_port = 80
1753         local_port = 8080
1754         server1 = self.pg0.remote_hosts[0]
1755         server2 = self.pg0.remote_hosts[1]
1756
1757         locals = [{'addr': server1.ip4n,
1758                    'port': local_port,
1759                    'probability': 70},
1760                   {'addr': server2.ip4n,
1761                    'port': local_port,
1762                    'probability': 30}]
1763
1764         self.vapi.nat44_forwarding_enable_disable(1)
1765         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1766                                                   external_port,
1767                                                   IP_PROTOS.tcp,
1768                                                   out2in_only=1,
1769                                                   local_num=len(locals),
1770                                                   locals=locals)
1771         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1772         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1773                                                   is_inside=0)
1774
1775         # from client to service
1776         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1777              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1778              TCP(sport=12345, dport=external_port))
1779         self.pg1.add_stream(p)
1780         self.pg_enable_capture(self.pg_interfaces)
1781         self.pg_start()
1782         capture = self.pg0.get_capture(1)
1783         p = capture[0]
1784         server = None
1785         try:
1786             ip = p[IP]
1787             tcp = p[TCP]
1788             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1789             if ip.dst == server1.ip4:
1790                 server = server1
1791             else:
1792                 server = server2
1793             self.assertEqual(tcp.dport, local_port)
1794             self.check_tcp_checksum(p)
1795             self.check_ip_checksum(p)
1796         except:
1797             self.logger.error(ppp("Unexpected or invalid packet:", p))
1798             raise
1799
1800         # from service back to client
1801         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1802              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1803              TCP(sport=local_port, dport=12345))
1804         self.pg0.add_stream(p)
1805         self.pg_enable_capture(self.pg_interfaces)
1806         self.pg_start()
1807         capture = self.pg1.get_capture(1)
1808         p = capture[0]
1809         try:
1810             ip = p[IP]
1811             tcp = p[TCP]
1812             self.assertEqual(ip.src, self.nat_addr)
1813             self.assertEqual(tcp.sport, external_port)
1814             self.check_tcp_checksum(p)
1815             self.check_ip_checksum(p)
1816         except:
1817             self.logger.error(ppp("Unexpected or invalid packet:", p))
1818             raise
1819
1820         # from client to server (no translation)
1821         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1822              IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1823              TCP(sport=12346, dport=local_port))
1824         self.pg1.add_stream(p)
1825         self.pg_enable_capture(self.pg_interfaces)
1826         self.pg_start()
1827         capture = self.pg0.get_capture(1)
1828         p = capture[0]
1829         server = None
1830         try:
1831             ip = p[IP]
1832             tcp = p[TCP]
1833             self.assertEqual(ip.dst, server1.ip4)
1834             self.assertEqual(tcp.dport, local_port)
1835             self.check_tcp_checksum(p)
1836             self.check_ip_checksum(p)
1837         except:
1838             self.logger.error(ppp("Unexpected or invalid packet:", p))
1839             raise
1840
1841         # from service back to client (no translation)
1842         p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1843              IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1844              TCP(sport=local_port, dport=12346))
1845         self.pg0.add_stream(p)
1846         self.pg_enable_capture(self.pg_interfaces)
1847         self.pg_start()
1848         capture = self.pg1.get_capture(1)
1849         p = capture[0]
1850         try:
1851             ip = p[IP]
1852             tcp = p[TCP]
1853             self.assertEqual(ip.src, server1.ip4)
1854             self.assertEqual(tcp.sport, local_port)
1855             self.check_tcp_checksum(p)
1856             self.check_ip_checksum(p)
1857         except:
1858             self.logger.error(ppp("Unexpected or invalid packet:", p))
1859             raise
1860
1861     def test_multiple_inside_interfaces(self):
1862         """ NAT44 multiple non-overlapping address space inside interfaces """
1863
1864         self.nat44_add_address(self.nat_addr)
1865         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1866         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1867         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1868                                                   is_inside=0)
1869
1870         # between two NAT44 inside interfaces (no translation)
1871         pkts = self.create_stream_in(self.pg0, self.pg1)
1872         self.pg0.add_stream(pkts)
1873         self.pg_enable_capture(self.pg_interfaces)
1874         self.pg_start()
1875         capture = self.pg1.get_capture(len(pkts))
1876         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1877
1878         # from NAT44 inside to interface without NAT44 feature (no translation)
1879         pkts = self.create_stream_in(self.pg0, self.pg2)
1880         self.pg0.add_stream(pkts)
1881         self.pg_enable_capture(self.pg_interfaces)
1882         self.pg_start()
1883         capture = self.pg2.get_capture(len(pkts))
1884         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1885
1886         # in2out 1st interface
1887         pkts = self.create_stream_in(self.pg0, self.pg3)
1888         self.pg0.add_stream(pkts)
1889         self.pg_enable_capture(self.pg_interfaces)
1890         self.pg_start()
1891         capture = self.pg3.get_capture(len(pkts))
1892         self.verify_capture_out(capture)
1893
1894         # out2in 1st interface
1895         pkts = self.create_stream_out(self.pg3)
1896         self.pg3.add_stream(pkts)
1897         self.pg_enable_capture(self.pg_interfaces)
1898         self.pg_start()
1899         capture = self.pg0.get_capture(len(pkts))
1900         self.verify_capture_in(capture, self.pg0)
1901
1902         # in2out 2nd interface
1903         pkts = self.create_stream_in(self.pg1, self.pg3)
1904         self.pg1.add_stream(pkts)
1905         self.pg_enable_capture(self.pg_interfaces)
1906         self.pg_start()
1907         capture = self.pg3.get_capture(len(pkts))
1908         self.verify_capture_out(capture)
1909
1910         # out2in 2nd interface
1911         pkts = self.create_stream_out(self.pg3)
1912         self.pg3.add_stream(pkts)
1913         self.pg_enable_capture(self.pg_interfaces)
1914         self.pg_start()
1915         capture = self.pg1.get_capture(len(pkts))
1916         self.verify_capture_in(capture, self.pg1)
1917
1918     def test_inside_overlapping_interfaces(self):
1919         """ NAT44 multiple inside interfaces with overlapping address space """
1920
1921         static_nat_ip = "10.0.0.10"
1922         self.nat44_add_address(self.nat_addr)
1923         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1924                                                   is_inside=0)
1925         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1926         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1927         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1928         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1929                                       vrf_id=20)
1930
1931         # between NAT44 inside interfaces with same VRF (no translation)
1932         pkts = self.create_stream_in(self.pg4, self.pg5)
1933         self.pg4.add_stream(pkts)
1934         self.pg_enable_capture(self.pg_interfaces)
1935         self.pg_start()
1936         capture = self.pg5.get_capture(len(pkts))
1937         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1938
1939         # between NAT44 inside interfaces with different VRF (hairpinning)
1940         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1941              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1942              TCP(sport=1234, dport=5678))
1943         self.pg4.add_stream(p)
1944         self.pg_enable_capture(self.pg_interfaces)
1945         self.pg_start()
1946         capture = self.pg6.get_capture(1)
1947         p = capture[0]
1948         try:
1949             ip = p[IP]
1950             tcp = p[TCP]
1951             self.assertEqual(ip.src, self.nat_addr)
1952             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1953             self.assertNotEqual(tcp.sport, 1234)
1954             self.assertEqual(tcp.dport, 5678)
1955         except:
1956             self.logger.error(ppp("Unexpected or invalid packet:", p))
1957             raise
1958
1959         # in2out 1st interface
1960         pkts = self.create_stream_in(self.pg4, self.pg3)
1961         self.pg4.add_stream(pkts)
1962         self.pg_enable_capture(self.pg_interfaces)
1963         self.pg_start()
1964         capture = self.pg3.get_capture(len(pkts))
1965         self.verify_capture_out(capture)
1966
1967         # out2in 1st interface
1968         pkts = self.create_stream_out(self.pg3)
1969         self.pg3.add_stream(pkts)
1970         self.pg_enable_capture(self.pg_interfaces)
1971         self.pg_start()
1972         capture = self.pg4.get_capture(len(pkts))
1973         self.verify_capture_in(capture, self.pg4)
1974
1975         # in2out 2nd interface
1976         pkts = self.create_stream_in(self.pg5, self.pg3)
1977         self.pg5.add_stream(pkts)
1978         self.pg_enable_capture(self.pg_interfaces)
1979         self.pg_start()
1980         capture = self.pg3.get_capture(len(pkts))
1981         self.verify_capture_out(capture)
1982
1983         # out2in 2nd interface
1984         pkts = self.create_stream_out(self.pg3)
1985         self.pg3.add_stream(pkts)
1986         self.pg_enable_capture(self.pg_interfaces)
1987         self.pg_start()
1988         capture = self.pg5.get_capture(len(pkts))
1989         self.verify_capture_in(capture, self.pg5)
1990
1991         # pg5 session dump
1992         addresses = self.vapi.nat44_address_dump()
1993         self.assertEqual(len(addresses), 1)
1994         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1995         self.assertEqual(len(sessions), 3)
1996         for session in sessions:
1997             self.assertFalse(session.is_static)
1998             self.assertEqual(session.inside_ip_address[0:4],
1999                              self.pg5.remote_ip4n)
2000             self.assertEqual(session.outside_ip_address,
2001                              addresses[0].ip_address)
2002         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2003         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2004         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2005         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2006         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2007         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2008         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2009         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2010         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2011
2012         # in2out 3rd interface
2013         pkts = self.create_stream_in(self.pg6, self.pg3)
2014         self.pg6.add_stream(pkts)
2015         self.pg_enable_capture(self.pg_interfaces)
2016         self.pg_start()
2017         capture = self.pg3.get_capture(len(pkts))
2018         self.verify_capture_out(capture, static_nat_ip, True)
2019
2020         # out2in 3rd interface
2021         pkts = self.create_stream_out(self.pg3, static_nat_ip)
2022         self.pg3.add_stream(pkts)
2023         self.pg_enable_capture(self.pg_interfaces)
2024         self.pg_start()
2025         capture = self.pg6.get_capture(len(pkts))
2026         self.verify_capture_in(capture, self.pg6)
2027
2028         # general user and session dump verifications
2029         users = self.vapi.nat44_user_dump()
2030         self.assertTrue(len(users) >= 3)
2031         addresses = self.vapi.nat44_address_dump()
2032         self.assertEqual(len(addresses), 1)
2033         for user in users:
2034             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2035                                                          user.vrf_id)
2036             for session in sessions:
2037                 self.assertEqual(user.ip_address, session.inside_ip_address)
2038                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2039                 self.assertTrue(session.protocol in
2040                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
2041                                  IP_PROTOS.icmp])
2042
2043         # pg4 session dump
2044         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2045         self.assertTrue(len(sessions) >= 4)
2046         for session in sessions:
2047             self.assertFalse(session.is_static)
2048             self.assertEqual(session.inside_ip_address[0:4],
2049                              self.pg4.remote_ip4n)
2050             self.assertEqual(session.outside_ip_address,
2051                              addresses[0].ip_address)
2052
2053         # pg6 session dump
2054         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2055         self.assertTrue(len(sessions) >= 3)
2056         for session in sessions:
2057             self.assertTrue(session.is_static)
2058             self.assertEqual(session.inside_ip_address[0:4],
2059                              self.pg6.remote_ip4n)
2060             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2061                              map(int, static_nat_ip.split('.')))
2062             self.assertTrue(session.inside_port in
2063                             [self.tcp_port_in, self.udp_port_in,
2064                              self.icmp_id_in])
2065
2066     def test_hairpinning(self):
2067         """ NAT44 hairpinning - 1:1 NAPT """
2068
2069         host = self.pg0.remote_hosts[0]
2070         server = self.pg0.remote_hosts[1]
2071         host_in_port = 1234
2072         host_out_port = 0
2073         server_in_port = 5678
2074         server_out_port = 8765
2075
2076         self.nat44_add_address(self.nat_addr)
2077         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2078         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2079                                                   is_inside=0)
2080         # add static mapping for server
2081         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2082                                       server_in_port, server_out_port,
2083                                       proto=IP_PROTOS.tcp)
2084
2085         # send packet from host to server
2086         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2087              IP(src=host.ip4, dst=self.nat_addr) /
2088              TCP(sport=host_in_port, dport=server_out_port))
2089         self.pg0.add_stream(p)
2090         self.pg_enable_capture(self.pg_interfaces)
2091         self.pg_start()
2092         capture = self.pg0.get_capture(1)
2093         p = capture[0]
2094         try:
2095             ip = p[IP]
2096             tcp = p[TCP]
2097             self.assertEqual(ip.src, self.nat_addr)
2098             self.assertEqual(ip.dst, server.ip4)
2099             self.assertNotEqual(tcp.sport, host_in_port)
2100             self.assertEqual(tcp.dport, server_in_port)
2101             self.check_tcp_checksum(p)
2102             host_out_port = tcp.sport
2103         except:
2104             self.logger.error(ppp("Unexpected or invalid packet:", p))
2105             raise
2106
2107         # send reply from server to host
2108         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2109              IP(src=server.ip4, dst=self.nat_addr) /
2110              TCP(sport=server_in_port, dport=host_out_port))
2111         self.pg0.add_stream(p)
2112         self.pg_enable_capture(self.pg_interfaces)
2113         self.pg_start()
2114         capture = self.pg0.get_capture(1)
2115         p = capture[0]
2116         try:
2117             ip = p[IP]
2118             tcp = p[TCP]
2119             self.assertEqual(ip.src, self.nat_addr)
2120             self.assertEqual(ip.dst, host.ip4)
2121             self.assertEqual(tcp.sport, server_out_port)
2122             self.assertEqual(tcp.dport, host_in_port)
2123             self.check_tcp_checksum(p)
2124         except:
2125             self.logger.error(ppp("Unexpected or invalid packet:", p))
2126             raise
2127
2128     def test_hairpinning2(self):
2129         """ NAT44 hairpinning - 1:1 NAT"""
2130
2131         server1_nat_ip = "10.0.0.10"
2132         server2_nat_ip = "10.0.0.11"
2133         host = self.pg0.remote_hosts[0]
2134         server1 = self.pg0.remote_hosts[1]
2135         server2 = self.pg0.remote_hosts[2]
2136         server_tcp_port = 22
2137         server_udp_port = 20
2138
2139         self.nat44_add_address(self.nat_addr)
2140         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2141         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2142                                                   is_inside=0)
2143
2144         # add static mapping for servers
2145         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2146         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2147
2148         # host to server1
2149         pkts = []
2150         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2151              IP(src=host.ip4, dst=server1_nat_ip) /
2152              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2153         pkts.append(p)
2154         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2155              IP(src=host.ip4, dst=server1_nat_ip) /
2156              UDP(sport=self.udp_port_in, dport=server_udp_port))
2157         pkts.append(p)
2158         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2159              IP(src=host.ip4, dst=server1_nat_ip) /
2160              ICMP(id=self.icmp_id_in, type='echo-request'))
2161         pkts.append(p)
2162         self.pg0.add_stream(pkts)
2163         self.pg_enable_capture(self.pg_interfaces)
2164         self.pg_start()
2165         capture = self.pg0.get_capture(len(pkts))
2166         for packet in capture:
2167             try:
2168                 self.assertEqual(packet[IP].src, self.nat_addr)
2169                 self.assertEqual(packet[IP].dst, server1.ip4)
2170                 if packet.haslayer(TCP):
2171                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2172                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2173                     self.tcp_port_out = packet[TCP].sport
2174                     self.check_tcp_checksum(packet)
2175                 elif packet.haslayer(UDP):
2176                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2177                     self.assertEqual(packet[UDP].dport, server_udp_port)
2178                     self.udp_port_out = packet[UDP].sport
2179                 else:
2180                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2181                     self.icmp_id_out = packet[ICMP].id
2182             except:
2183                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2184                 raise
2185
2186         # server1 to host
2187         pkts = []
2188         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2189              IP(src=server1.ip4, dst=self.nat_addr) /
2190              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2191         pkts.append(p)
2192         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2193              IP(src=server1.ip4, dst=self.nat_addr) /
2194              UDP(sport=server_udp_port, dport=self.udp_port_out))
2195         pkts.append(p)
2196         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2197              IP(src=server1.ip4, dst=self.nat_addr) /
2198              ICMP(id=self.icmp_id_out, type='echo-reply'))
2199         pkts.append(p)
2200         self.pg0.add_stream(pkts)
2201         self.pg_enable_capture(self.pg_interfaces)
2202         self.pg_start()
2203         capture = self.pg0.get_capture(len(pkts))
2204         for packet in capture:
2205             try:
2206                 self.assertEqual(packet[IP].src, server1_nat_ip)
2207                 self.assertEqual(packet[IP].dst, host.ip4)
2208                 if packet.haslayer(TCP):
2209                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2210                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2211                     self.check_tcp_checksum(packet)
2212                 elif packet.haslayer(UDP):
2213                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2214                     self.assertEqual(packet[UDP].sport, server_udp_port)
2215                 else:
2216                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2217             except:
2218                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2219                 raise
2220
2221         # server2 to server1
2222         pkts = []
2223         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2224              IP(src=server2.ip4, dst=server1_nat_ip) /
2225              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2226         pkts.append(p)
2227         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2228              IP(src=server2.ip4, dst=server1_nat_ip) /
2229              UDP(sport=self.udp_port_in, dport=server_udp_port))
2230         pkts.append(p)
2231         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2232              IP(src=server2.ip4, dst=server1_nat_ip) /
2233              ICMP(id=self.icmp_id_in, type='echo-request'))
2234         pkts.append(p)
2235         self.pg0.add_stream(pkts)
2236         self.pg_enable_capture(self.pg_interfaces)
2237         self.pg_start()
2238         capture = self.pg0.get_capture(len(pkts))
2239         for packet in capture:
2240             try:
2241                 self.assertEqual(packet[IP].src, server2_nat_ip)
2242                 self.assertEqual(packet[IP].dst, server1.ip4)
2243                 if packet.haslayer(TCP):
2244                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2245                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2246                     self.tcp_port_out = packet[TCP].sport
2247                     self.check_tcp_checksum(packet)
2248                 elif packet.haslayer(UDP):
2249                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2250                     self.assertEqual(packet[UDP].dport, server_udp_port)
2251                     self.udp_port_out = packet[UDP].sport
2252                 else:
2253                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2254                     self.icmp_id_out = packet[ICMP].id
2255             except:
2256                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2257                 raise
2258
2259         # server1 to server2
2260         pkts = []
2261         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262              IP(src=server1.ip4, dst=server2_nat_ip) /
2263              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2264         pkts.append(p)
2265         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266              IP(src=server1.ip4, dst=server2_nat_ip) /
2267              UDP(sport=server_udp_port, dport=self.udp_port_out))
2268         pkts.append(p)
2269         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270              IP(src=server1.ip4, dst=server2_nat_ip) /
2271              ICMP(id=self.icmp_id_out, type='echo-reply'))
2272         pkts.append(p)
2273         self.pg0.add_stream(pkts)
2274         self.pg_enable_capture(self.pg_interfaces)
2275         self.pg_start()
2276         capture = self.pg0.get_capture(len(pkts))
2277         for packet in capture:
2278             try:
2279                 self.assertEqual(packet[IP].src, server1_nat_ip)
2280                 self.assertEqual(packet[IP].dst, server2.ip4)
2281                 if packet.haslayer(TCP):
2282                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2283                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2284                     self.check_tcp_checksum(packet)
2285                 elif packet.haslayer(UDP):
2286                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2287                     self.assertEqual(packet[UDP].sport, server_udp_port)
2288                 else:
2289                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2290             except:
2291                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2292                 raise
2293
2294     def test_max_translations_per_user(self):
2295         """ MAX translations per user - recycle the least recently used """
2296
2297         self.nat44_add_address(self.nat_addr)
2298         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2299         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2300                                                   is_inside=0)
2301
2302         # get maximum number of translations per user
2303         nat44_config = self.vapi.nat_show_config()
2304
2305         # send more than maximum number of translations per user packets
2306         pkts_num = nat44_config.max_translations_per_user + 5
2307         pkts = []
2308         for port in range(0, pkts_num):
2309             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2310                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2311                  TCP(sport=1025 + port))
2312             pkts.append(p)
2313         self.pg0.add_stream(pkts)
2314         self.pg_enable_capture(self.pg_interfaces)
2315         self.pg_start()
2316
2317         # verify number of translated packet
2318         self.pg1.get_capture(pkts_num)
2319
2320     def test_interface_addr(self):
2321         """ Acquire NAT44 addresses from interface """
2322         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2323
2324         # no address in NAT pool
2325         adresses = self.vapi.nat44_address_dump()
2326         self.assertEqual(0, len(adresses))
2327
2328         # configure interface address and check NAT address pool
2329         self.pg7.config_ip4()
2330         adresses = self.vapi.nat44_address_dump()
2331         self.assertEqual(1, len(adresses))
2332         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2333
2334         # remove interface address and check NAT address pool
2335         self.pg7.unconfig_ip4()
2336         adresses = self.vapi.nat44_address_dump()
2337         self.assertEqual(0, len(adresses))
2338
2339     def test_interface_addr_static_mapping(self):
2340         """ Static mapping with addresses from interface """
2341         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2342         self.nat44_add_static_mapping(
2343             '1.2.3.4',
2344             external_sw_if_index=self.pg7.sw_if_index)
2345
2346         # static mappings with external interface
2347         static_mappings = self.vapi.nat44_static_mapping_dump()
2348         self.assertEqual(1, len(static_mappings))
2349         self.assertEqual(self.pg7.sw_if_index,
2350                          static_mappings[0].external_sw_if_index)
2351
2352         # configure interface address and check static mappings
2353         self.pg7.config_ip4()
2354         static_mappings = self.vapi.nat44_static_mapping_dump()
2355         self.assertEqual(1, len(static_mappings))
2356         self.assertEqual(static_mappings[0].external_ip_address[0:4],
2357                          self.pg7.local_ip4n)
2358         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2359
2360         # remove interface address and check static mappings
2361         self.pg7.unconfig_ip4()
2362         static_mappings = self.vapi.nat44_static_mapping_dump()
2363         self.assertEqual(0, len(static_mappings))
2364
2365     def test_interface_addr_identity_nat(self):
2366         """ Identity NAT with addresses from interface """
2367
2368         port = 53053
2369         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2370         self.vapi.nat44_add_del_identity_mapping(
2371             sw_if_index=self.pg7.sw_if_index,
2372             port=port,
2373             protocol=IP_PROTOS.tcp,
2374             addr_only=0)
2375
2376         # identity mappings with external interface
2377         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2378         self.assertEqual(1, len(identity_mappings))
2379         self.assertEqual(self.pg7.sw_if_index,
2380                          identity_mappings[0].sw_if_index)
2381
2382         # configure interface address and check identity mappings
2383         self.pg7.config_ip4()
2384         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2385         self.assertEqual(1, len(identity_mappings))
2386         self.assertEqual(identity_mappings[0].ip_address,
2387                          self.pg7.local_ip4n)
2388         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2389         self.assertEqual(port, identity_mappings[0].port)
2390         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2391
2392         # remove interface address and check identity mappings
2393         self.pg7.unconfig_ip4()
2394         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2395         self.assertEqual(0, len(identity_mappings))
2396
2397     def test_ipfix_nat44_sess(self):
2398         """ IPFIX logging NAT44 session created/delted """
2399         self.ipfix_domain_id = 10
2400         self.ipfix_src_port = 20202
2401         colector_port = 30303
2402         bind_layers(UDP, IPFIX, dport=30303)
2403         self.nat44_add_address(self.nat_addr)
2404         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2405         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2406                                                   is_inside=0)
2407         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2408                                      src_address=self.pg3.local_ip4n,
2409                                      path_mtu=512,
2410                                      template_interval=10,
2411                                      collector_port=colector_port)
2412         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2413                             src_port=self.ipfix_src_port)
2414
2415         pkts = self.create_stream_in(self.pg0, self.pg1)
2416         self.pg0.add_stream(pkts)
2417         self.pg_enable_capture(self.pg_interfaces)
2418         self.pg_start()
2419         capture = self.pg1.get_capture(len(pkts))
2420         self.verify_capture_out(capture)
2421         self.nat44_add_address(self.nat_addr, is_add=0)
2422         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2423         capture = self.pg3.get_capture(9)
2424         ipfix = IPFIXDecoder()
2425         # first load template
2426         for p in capture:
2427             self.assertTrue(p.haslayer(IPFIX))
2428             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2429             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2430             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2431             self.assertEqual(p[UDP].dport, colector_port)
2432             self.assertEqual(p[IPFIX].observationDomainID,
2433                              self.ipfix_domain_id)
2434             if p.haslayer(Template):
2435                 ipfix.add_template(p.getlayer(Template))
2436         # verify events in data set
2437         for p in capture:
2438             if p.haslayer(Data):
2439                 data = ipfix.decode_data_set(p.getlayer(Set))
2440                 self.verify_ipfix_nat44_ses(data)
2441
2442     def test_ipfix_addr_exhausted(self):
2443         """ IPFIX logging NAT addresses exhausted """
2444         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2445         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2446                                                   is_inside=0)
2447         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2448                                      src_address=self.pg3.local_ip4n,
2449                                      path_mtu=512,
2450                                      template_interval=10)
2451         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2452                             src_port=self.ipfix_src_port)
2453
2454         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2455              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2456              TCP(sport=3025))
2457         self.pg0.add_stream(p)
2458         self.pg_enable_capture(self.pg_interfaces)
2459         self.pg_start()
2460         capture = self.pg1.get_capture(0)
2461         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2462         capture = self.pg3.get_capture(9)
2463         ipfix = IPFIXDecoder()
2464         # first load template
2465         for p in capture:
2466             self.assertTrue(p.haslayer(IPFIX))
2467             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2468             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2469             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2470             self.assertEqual(p[UDP].dport, 4739)
2471             self.assertEqual(p[IPFIX].observationDomainID,
2472                              self.ipfix_domain_id)
2473             if p.haslayer(Template):
2474                 ipfix.add_template(p.getlayer(Template))
2475         # verify events in data set
2476         for p in capture:
2477             if p.haslayer(Data):
2478                 data = ipfix.decode_data_set(p.getlayer(Set))
2479                 self.verify_ipfix_addr_exhausted(data)
2480
2481     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2482     def test_ipfix_max_sessions(self):
2483         """ IPFIX logging maximum session entries exceeded """
2484         self.nat44_add_address(self.nat_addr)
2485         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2486         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2487                                                   is_inside=0)
2488
2489         nat44_config = self.vapi.nat_show_config()
2490         max_sessions = 10 * nat44_config.translation_buckets
2491
2492         pkts = []
2493         for i in range(0, max_sessions):
2494             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2495             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2496                  IP(src=src, dst=self.pg1.remote_ip4) /
2497                  TCP(sport=1025))
2498             pkts.append(p)
2499         self.pg0.add_stream(pkts)
2500         self.pg_enable_capture(self.pg_interfaces)
2501         self.pg_start()
2502
2503         self.pg1.get_capture(max_sessions)
2504         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2505                                      src_address=self.pg3.local_ip4n,
2506                                      path_mtu=512,
2507                                      template_interval=10)
2508         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2509                             src_port=self.ipfix_src_port)
2510
2511         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2512              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2513              TCP(sport=1025))
2514         self.pg0.add_stream(p)
2515         self.pg_enable_capture(self.pg_interfaces)
2516         self.pg_start()
2517         self.pg1.get_capture(0)
2518         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2519         capture = self.pg3.get_capture(9)
2520         ipfix = IPFIXDecoder()
2521         # first load template
2522         for p in capture:
2523             self.assertTrue(p.haslayer(IPFIX))
2524             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2525             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2526             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2527             self.assertEqual(p[UDP].dport, 4739)
2528             self.assertEqual(p[IPFIX].observationDomainID,
2529                              self.ipfix_domain_id)
2530             if p.haslayer(Template):
2531                 ipfix.add_template(p.getlayer(Template))
2532         # verify events in data set
2533         for p in capture:
2534             if p.haslayer(Data):
2535                 data = ipfix.decode_data_set(p.getlayer(Set))
2536                 self.verify_ipfix_max_sessions(data, max_sessions)
2537
2538     def test_pool_addr_fib(self):
2539         """ NAT44 add pool addresses to FIB """
2540         static_addr = '10.0.0.10'
2541         self.nat44_add_address(self.nat_addr)
2542         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2543         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2544                                                   is_inside=0)
2545         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2546
2547         # NAT44 address
2548         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2549              ARP(op=ARP.who_has, pdst=self.nat_addr,
2550                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2551         self.pg1.add_stream(p)
2552         self.pg_enable_capture(self.pg_interfaces)
2553         self.pg_start()
2554         capture = self.pg1.get_capture(1)
2555         self.assertTrue(capture[0].haslayer(ARP))
2556         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2557
2558         # 1:1 NAT address
2559         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2560              ARP(op=ARP.who_has, pdst=static_addr,
2561                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2562         self.pg1.add_stream(p)
2563         self.pg_enable_capture(self.pg_interfaces)
2564         self.pg_start()
2565         capture = self.pg1.get_capture(1)
2566         self.assertTrue(capture[0].haslayer(ARP))
2567         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2568
2569         # send ARP to non-NAT44 interface
2570         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2571              ARP(op=ARP.who_has, pdst=self.nat_addr,
2572                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2573         self.pg2.add_stream(p)
2574         self.pg_enable_capture(self.pg_interfaces)
2575         self.pg_start()
2576         capture = self.pg1.get_capture(0)
2577
2578         # remove addresses and verify
2579         self.nat44_add_address(self.nat_addr, is_add=0)
2580         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2581                                       is_add=0)
2582
2583         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2584              ARP(op=ARP.who_has, pdst=self.nat_addr,
2585                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2586         self.pg1.add_stream(p)
2587         self.pg_enable_capture(self.pg_interfaces)
2588         self.pg_start()
2589         capture = self.pg1.get_capture(0)
2590
2591         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2592              ARP(op=ARP.who_has, pdst=static_addr,
2593                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2594         self.pg1.add_stream(p)
2595         self.pg_enable_capture(self.pg_interfaces)
2596         self.pg_start()
2597         capture = self.pg1.get_capture(0)
2598
2599     def test_vrf_mode(self):
2600         """ NAT44 tenant VRF aware address pool mode """
2601
2602         vrf_id1 = 1
2603         vrf_id2 = 2
2604         nat_ip1 = "10.0.0.10"
2605         nat_ip2 = "10.0.0.11"
2606
2607         self.pg0.unconfig_ip4()
2608         self.pg1.unconfig_ip4()
2609         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2610         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2611         self.pg0.set_table_ip4(vrf_id1)
2612         self.pg1.set_table_ip4(vrf_id2)
2613         self.pg0.config_ip4()
2614         self.pg1.config_ip4()
2615
2616         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2617         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2618         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2619         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2620         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2621                                                   is_inside=0)
2622
2623         # first VRF
2624         pkts = self.create_stream_in(self.pg0, self.pg2)
2625         self.pg0.add_stream(pkts)
2626         self.pg_enable_capture(self.pg_interfaces)
2627         self.pg_start()
2628         capture = self.pg2.get_capture(len(pkts))
2629         self.verify_capture_out(capture, nat_ip1)
2630
2631         # second VRF
2632         pkts = self.create_stream_in(self.pg1, self.pg2)
2633         self.pg1.add_stream(pkts)
2634         self.pg_enable_capture(self.pg_interfaces)
2635         self.pg_start()
2636         capture = self.pg2.get_capture(len(pkts))
2637         self.verify_capture_out(capture, nat_ip2)
2638
2639         self.pg0.unconfig_ip4()
2640         self.pg1.unconfig_ip4()
2641         self.pg0.set_table_ip4(0)
2642         self.pg1.set_table_ip4(0)
2643         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2644         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2645
2646     def test_vrf_feature_independent(self):
2647         """ NAT44 tenant VRF independent address pool mode """
2648
2649         nat_ip1 = "10.0.0.10"
2650         nat_ip2 = "10.0.0.11"
2651
2652         self.nat44_add_address(nat_ip1)
2653         self.nat44_add_address(nat_ip2, vrf_id=99)
2654         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2655         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2656         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2657                                                   is_inside=0)
2658
2659         # first VRF
2660         pkts = self.create_stream_in(self.pg0, self.pg2)
2661         self.pg0.add_stream(pkts)
2662         self.pg_enable_capture(self.pg_interfaces)
2663         self.pg_start()
2664         capture = self.pg2.get_capture(len(pkts))
2665         self.verify_capture_out(capture, nat_ip1)
2666
2667         # second VRF
2668         pkts = self.create_stream_in(self.pg1, self.pg2)
2669         self.pg1.add_stream(pkts)
2670         self.pg_enable_capture(self.pg_interfaces)
2671         self.pg_start()
2672         capture = self.pg2.get_capture(len(pkts))
2673         self.verify_capture_out(capture, nat_ip1)
2674
2675     def test_dynamic_ipless_interfaces(self):
2676         """ NAT44 interfaces without configured IP address """
2677
2678         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2679                                       mactobinary(self.pg7.remote_mac),
2680                                       self.pg7.remote_ip4n,
2681                                       is_static=1)
2682         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2683                                       mactobinary(self.pg8.remote_mac),
2684                                       self.pg8.remote_ip4n,
2685                                       is_static=1)
2686
2687         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2688                                    dst_address_length=32,
2689                                    next_hop_address=self.pg7.remote_ip4n,
2690                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2691         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2692                                    dst_address_length=32,
2693                                    next_hop_address=self.pg8.remote_ip4n,
2694                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2695
2696         self.nat44_add_address(self.nat_addr)
2697         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2698         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2699                                                   is_inside=0)
2700
2701         # in2out
2702         pkts = self.create_stream_in(self.pg7, self.pg8)
2703         self.pg7.add_stream(pkts)
2704         self.pg_enable_capture(self.pg_interfaces)
2705         self.pg_start()
2706         capture = self.pg8.get_capture(len(pkts))
2707         self.verify_capture_out(capture)
2708
2709         # out2in
2710         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2711         self.pg8.add_stream(pkts)
2712         self.pg_enable_capture(self.pg_interfaces)
2713         self.pg_start()
2714         capture = self.pg7.get_capture(len(pkts))
2715         self.verify_capture_in(capture, self.pg7)
2716
2717     def test_static_ipless_interfaces(self):
2718         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2719
2720         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2721                                       mactobinary(self.pg7.remote_mac),
2722                                       self.pg7.remote_ip4n,
2723                                       is_static=1)
2724         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2725                                       mactobinary(self.pg8.remote_mac),
2726                                       self.pg8.remote_ip4n,
2727                                       is_static=1)
2728
2729         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2730                                    dst_address_length=32,
2731                                    next_hop_address=self.pg7.remote_ip4n,
2732                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2733         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2734                                    dst_address_length=32,
2735                                    next_hop_address=self.pg8.remote_ip4n,
2736                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2737
2738         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2739         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2740         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2741                                                   is_inside=0)
2742
2743         # out2in
2744         pkts = self.create_stream_out(self.pg8)
2745         self.pg8.add_stream(pkts)
2746         self.pg_enable_capture(self.pg_interfaces)
2747         self.pg_start()
2748         capture = self.pg7.get_capture(len(pkts))
2749         self.verify_capture_in(capture, self.pg7)
2750
2751         # in2out
2752         pkts = self.create_stream_in(self.pg7, self.pg8)
2753         self.pg7.add_stream(pkts)
2754         self.pg_enable_capture(self.pg_interfaces)
2755         self.pg_start()
2756         capture = self.pg8.get_capture(len(pkts))
2757         self.verify_capture_out(capture, self.nat_addr, True)
2758
2759     def test_static_with_port_ipless_interfaces(self):
2760         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2761
2762         self.tcp_port_out = 30606
2763         self.udp_port_out = 30607
2764         self.icmp_id_out = 30608
2765
2766         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2767                                       mactobinary(self.pg7.remote_mac),
2768                                       self.pg7.remote_ip4n,
2769                                       is_static=1)
2770         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2771                                       mactobinary(self.pg8.remote_mac),
2772                                       self.pg8.remote_ip4n,
2773                                       is_static=1)
2774
2775         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2776                                    dst_address_length=32,
2777                                    next_hop_address=self.pg7.remote_ip4n,
2778                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2779         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2780                                    dst_address_length=32,
2781                                    next_hop_address=self.pg8.remote_ip4n,
2782                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2783
2784         self.nat44_add_address(self.nat_addr)
2785         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2786                                       self.tcp_port_in, self.tcp_port_out,
2787                                       proto=IP_PROTOS.tcp)
2788         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2789                                       self.udp_port_in, self.udp_port_out,
2790                                       proto=IP_PROTOS.udp)
2791         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2792                                       self.icmp_id_in, self.icmp_id_out,
2793                                       proto=IP_PROTOS.icmp)
2794         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2795         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2796                                                   is_inside=0)
2797
2798         # out2in
2799         pkts = self.create_stream_out(self.pg8)
2800         self.pg8.add_stream(pkts)
2801         self.pg_enable_capture(self.pg_interfaces)
2802         self.pg_start()
2803         capture = self.pg7.get_capture(len(pkts))
2804         self.verify_capture_in(capture, self.pg7)
2805
2806         # in2out
2807         pkts = self.create_stream_in(self.pg7, self.pg8)
2808         self.pg7.add_stream(pkts)
2809         self.pg_enable_capture(self.pg_interfaces)
2810         self.pg_start()
2811         capture = self.pg8.get_capture(len(pkts))
2812         self.verify_capture_out(capture)
2813
2814     def test_static_unknown_proto(self):
2815         """ 1:1 NAT translate packet with unknown protocol """
2816         nat_ip = "10.0.0.10"
2817         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2818         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2819         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2820                                                   is_inside=0)
2821
2822         # in2out
2823         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2824              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2825              GRE() /
2826              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2827              TCP(sport=1234, dport=1234))
2828         self.pg0.add_stream(p)
2829         self.pg_enable_capture(self.pg_interfaces)
2830         self.pg_start()
2831         p = self.pg1.get_capture(1)
2832         packet = p[0]
2833         try:
2834             self.assertEqual(packet[IP].src, nat_ip)
2835             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2836             self.assertTrue(packet.haslayer(GRE))
2837             self.check_ip_checksum(packet)
2838         except:
2839             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2840             raise
2841
2842         # out2in
2843         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2844              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2845              GRE() /
2846              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2847              TCP(sport=1234, dport=1234))
2848         self.pg1.add_stream(p)
2849         self.pg_enable_capture(self.pg_interfaces)
2850         self.pg_start()
2851         p = self.pg0.get_capture(1)
2852         packet = p[0]
2853         try:
2854             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2855             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2856             self.assertTrue(packet.haslayer(GRE))
2857             self.check_ip_checksum(packet)
2858         except:
2859             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2860             raise
2861
2862     def test_hairpinning_static_unknown_proto(self):
2863         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2864
2865         host = self.pg0.remote_hosts[0]
2866         server = self.pg0.remote_hosts[1]
2867
2868         host_nat_ip = "10.0.0.10"
2869         server_nat_ip = "10.0.0.11"
2870
2871         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2872         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2873         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2874         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2875                                                   is_inside=0)
2876
2877         # host to server
2878         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2879              IP(src=host.ip4, dst=server_nat_ip) /
2880              GRE() /
2881              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2882              TCP(sport=1234, dport=1234))
2883         self.pg0.add_stream(p)
2884         self.pg_enable_capture(self.pg_interfaces)
2885         self.pg_start()
2886         p = self.pg0.get_capture(1)
2887         packet = p[0]
2888         try:
2889             self.assertEqual(packet[IP].src, host_nat_ip)
2890             self.assertEqual(packet[IP].dst, server.ip4)
2891             self.assertTrue(packet.haslayer(GRE))
2892             self.check_ip_checksum(packet)
2893         except:
2894             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2895             raise
2896
2897         # server to host
2898         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2899              IP(src=server.ip4, dst=host_nat_ip) /
2900              GRE() /
2901              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2902              TCP(sport=1234, dport=1234))
2903         self.pg0.add_stream(p)
2904         self.pg_enable_capture(self.pg_interfaces)
2905         self.pg_start()
2906         p = self.pg0.get_capture(1)
2907         packet = p[0]
2908         try:
2909             self.assertEqual(packet[IP].src, server_nat_ip)
2910             self.assertEqual(packet[IP].dst, host.ip4)
2911             self.assertTrue(packet.haslayer(GRE))
2912             self.check_ip_checksum(packet)
2913         except:
2914             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2915             raise
2916
2917     def test_unknown_proto(self):
2918         """ NAT44 translate packet with unknown protocol """
2919         self.nat44_add_address(self.nat_addr)
2920         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2921         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2922                                                   is_inside=0)
2923
2924         # in2out
2925         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2926              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2927              TCP(sport=self.tcp_port_in, dport=20))
2928         self.pg0.add_stream(p)
2929         self.pg_enable_capture(self.pg_interfaces)
2930         self.pg_start()
2931         p = self.pg1.get_capture(1)
2932
2933         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2934              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2935              GRE() /
2936              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2937              TCP(sport=1234, dport=1234))
2938         self.pg0.add_stream(p)
2939         self.pg_enable_capture(self.pg_interfaces)
2940         self.pg_start()
2941         p = self.pg1.get_capture(1)
2942         packet = p[0]
2943         try:
2944             self.assertEqual(packet[IP].src, self.nat_addr)
2945             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2946             self.assertTrue(packet.haslayer(GRE))
2947             self.check_ip_checksum(packet)
2948         except:
2949             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2950             raise
2951
2952         # out2in
2953         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2954              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2955              GRE() /
2956              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2957              TCP(sport=1234, dport=1234))
2958         self.pg1.add_stream(p)
2959         self.pg_enable_capture(self.pg_interfaces)
2960         self.pg_start()
2961         p = self.pg0.get_capture(1)
2962         packet = p[0]
2963         try:
2964             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2965             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2966             self.assertTrue(packet.haslayer(GRE))
2967             self.check_ip_checksum(packet)
2968         except:
2969             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2970             raise
2971
2972     def test_hairpinning_unknown_proto(self):
2973         """ NAT44 translate packet with unknown protocol - hairpinning """
2974         host = self.pg0.remote_hosts[0]
2975         server = self.pg0.remote_hosts[1]
2976         host_in_port = 1234
2977         host_out_port = 0
2978         server_in_port = 5678
2979         server_out_port = 8765
2980         server_nat_ip = "10.0.0.11"
2981
2982         self.nat44_add_address(self.nat_addr)
2983         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2984         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2985                                                   is_inside=0)
2986
2987         # add static mapping for server
2988         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2989
2990         # host to server
2991         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2992              IP(src=host.ip4, dst=server_nat_ip) /
2993              TCP(sport=host_in_port, dport=server_out_port))
2994         self.pg0.add_stream(p)
2995         self.pg_enable_capture(self.pg_interfaces)
2996         self.pg_start()
2997         capture = self.pg0.get_capture(1)
2998
2999         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3000              IP(src=host.ip4, dst=server_nat_ip) /
3001              GRE() /
3002              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3003              TCP(sport=1234, dport=1234))
3004         self.pg0.add_stream(p)
3005         self.pg_enable_capture(self.pg_interfaces)
3006         self.pg_start()
3007         p = self.pg0.get_capture(1)
3008         packet = p[0]
3009         try:
3010             self.assertEqual(packet[IP].src, self.nat_addr)
3011             self.assertEqual(packet[IP].dst, server.ip4)
3012             self.assertTrue(packet.haslayer(GRE))
3013             self.check_ip_checksum(packet)
3014         except:
3015             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3016             raise
3017
3018         # server to host
3019         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3020              IP(src=server.ip4, dst=self.nat_addr) /
3021              GRE() /
3022              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3023              TCP(sport=1234, dport=1234))
3024         self.pg0.add_stream(p)
3025         self.pg_enable_capture(self.pg_interfaces)
3026         self.pg_start()
3027         p = self.pg0.get_capture(1)
3028         packet = p[0]
3029         try:
3030             self.assertEqual(packet[IP].src, server_nat_ip)
3031             self.assertEqual(packet[IP].dst, host.ip4)
3032             self.assertTrue(packet.haslayer(GRE))
3033             self.check_ip_checksum(packet)
3034         except:
3035             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3036             raise
3037
3038     def test_output_feature(self):
3039         """ NAT44 interface output feature (in2out postrouting) """
3040         self.nat44_add_address(self.nat_addr)
3041         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3042         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3043         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3044                                                          is_inside=0)
3045
3046         # in2out
3047         pkts = self.create_stream_in(self.pg0, self.pg3)
3048         self.pg0.add_stream(pkts)
3049         self.pg_enable_capture(self.pg_interfaces)
3050         self.pg_start()
3051         capture = self.pg3.get_capture(len(pkts))
3052         self.verify_capture_out(capture)
3053
3054         # out2in
3055         pkts = self.create_stream_out(self.pg3)
3056         self.pg3.add_stream(pkts)
3057         self.pg_enable_capture(self.pg_interfaces)
3058         self.pg_start()
3059         capture = self.pg0.get_capture(len(pkts))
3060         self.verify_capture_in(capture, self.pg0)
3061
3062         # from non-NAT interface to NAT inside interface
3063         pkts = self.create_stream_in(self.pg2, self.pg0)
3064         self.pg2.add_stream(pkts)
3065         self.pg_enable_capture(self.pg_interfaces)
3066         self.pg_start()
3067         capture = self.pg0.get_capture(len(pkts))
3068         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3069
3070     def test_output_feature_vrf_aware(self):
3071         """ NAT44 interface output feature VRF aware (in2out postrouting) """
3072         nat_ip_vrf10 = "10.0.0.10"
3073         nat_ip_vrf20 = "10.0.0.20"
3074
3075         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3076                                    dst_address_length=32,
3077                                    next_hop_address=self.pg3.remote_ip4n,
3078                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3079                                    table_id=10)
3080         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3081                                    dst_address_length=32,
3082                                    next_hop_address=self.pg3.remote_ip4n,
3083                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3084                                    table_id=20)
3085
3086         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3087         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3088         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3089         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3090         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3091                                                          is_inside=0)
3092
3093         # in2out VRF 10
3094         pkts = self.create_stream_in(self.pg4, self.pg3)
3095         self.pg4.add_stream(pkts)
3096         self.pg_enable_capture(self.pg_interfaces)
3097         self.pg_start()
3098         capture = self.pg3.get_capture(len(pkts))
3099         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3100
3101         # out2in VRF 10
3102         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3103         self.pg3.add_stream(pkts)
3104         self.pg_enable_capture(self.pg_interfaces)
3105         self.pg_start()
3106         capture = self.pg4.get_capture(len(pkts))
3107         self.verify_capture_in(capture, self.pg4)
3108
3109         # in2out VRF 20
3110         pkts = self.create_stream_in(self.pg6, self.pg3)
3111         self.pg6.add_stream(pkts)
3112         self.pg_enable_capture(self.pg_interfaces)
3113         self.pg_start()
3114         capture = self.pg3.get_capture(len(pkts))
3115         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3116
3117         # out2in VRF 20
3118         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3119         self.pg3.add_stream(pkts)
3120         self.pg_enable_capture(self.pg_interfaces)
3121         self.pg_start()
3122         capture = self.pg6.get_capture(len(pkts))
3123         self.verify_capture_in(capture, self.pg6)
3124
3125     def test_output_feature_hairpinning(self):
3126         """ NAT44 interface output feature hairpinning (in2out postrouting) """
3127         host = self.pg0.remote_hosts[0]
3128         server = self.pg0.remote_hosts[1]
3129         host_in_port = 1234
3130         host_out_port = 0
3131         server_in_port = 5678
3132         server_out_port = 8765
3133
3134         self.nat44_add_address(self.nat_addr)
3135         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3136         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3137                                                          is_inside=0)
3138
3139         # add static mapping for server
3140         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3141                                       server_in_port, server_out_port,
3142                                       proto=IP_PROTOS.tcp)
3143
3144         # send packet from host to server
3145         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3146              IP(src=host.ip4, dst=self.nat_addr) /
3147              TCP(sport=host_in_port, dport=server_out_port))
3148         self.pg0.add_stream(p)
3149         self.pg_enable_capture(self.pg_interfaces)
3150         self.pg_start()
3151         capture = self.pg0.get_capture(1)
3152         p = capture[0]
3153         try:
3154             ip = p[IP]
3155             tcp = p[TCP]
3156             self.assertEqual(ip.src, self.nat_addr)
3157             self.assertEqual(ip.dst, server.ip4)
3158             self.assertNotEqual(tcp.sport, host_in_port)
3159             self.assertEqual(tcp.dport, server_in_port)
3160             self.check_tcp_checksum(p)
3161             host_out_port = tcp.sport
3162         except:
3163             self.logger.error(ppp("Unexpected or invalid packet:", p))
3164             raise
3165
3166         # send reply from server to host
3167         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3168              IP(src=server.ip4, dst=self.nat_addr) /
3169              TCP(sport=server_in_port, dport=host_out_port))
3170         self.pg0.add_stream(p)
3171         self.pg_enable_capture(self.pg_interfaces)
3172         self.pg_start()
3173         capture = self.pg0.get_capture(1)
3174         p = capture[0]
3175         try:
3176             ip = p[IP]
3177             tcp = p[TCP]
3178             self.assertEqual(ip.src, self.nat_addr)
3179             self.assertEqual(ip.dst, host.ip4)
3180             self.assertEqual(tcp.sport, server_out_port)
3181             self.assertEqual(tcp.dport, host_in_port)
3182             self.check_tcp_checksum(p)
3183         except:
3184             self.logger.error(ppp("Unexpected or invalid packet:", p))
3185             raise
3186
3187     def test_one_armed_nat44(self):
3188         """ One armed NAT44 """
3189         remote_host = self.pg9.remote_hosts[0]
3190         local_host = self.pg9.remote_hosts[1]
3191         external_port = 0
3192
3193         self.nat44_add_address(self.nat_addr)
3194         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3195         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3196                                                   is_inside=0)
3197
3198         # in2out
3199         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3200              IP(src=local_host.ip4, dst=remote_host.ip4) /
3201              TCP(sport=12345, dport=80))
3202         self.pg9.add_stream(p)
3203         self.pg_enable_capture(self.pg_interfaces)
3204         self.pg_start()
3205         capture = self.pg9.get_capture(1)
3206         p = capture[0]
3207         try:
3208             ip = p[IP]
3209             tcp = p[TCP]
3210             self.assertEqual(ip.src, self.nat_addr)
3211             self.assertEqual(ip.dst, remote_host.ip4)
3212             self.assertNotEqual(tcp.sport, 12345)
3213             external_port = tcp.sport
3214             self.assertEqual(tcp.dport, 80)
3215             self.check_tcp_checksum(p)
3216             self.check_ip_checksum(p)
3217         except:
3218             self.logger.error(ppp("Unexpected or invalid packet:", p))
3219             raise
3220
3221         # out2in
3222         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3223              IP(src=remote_host.ip4, dst=self.nat_addr) /
3224              TCP(sport=80, dport=external_port))
3225         self.pg9.add_stream(p)
3226         self.pg_enable_capture(self.pg_interfaces)
3227         self.pg_start()
3228         capture = self.pg9.get_capture(1)
3229         p = capture[0]
3230         try:
3231             ip = p[IP]
3232             tcp = p[TCP]
3233             self.assertEqual(ip.src, remote_host.ip4)
3234             self.assertEqual(ip.dst, local_host.ip4)
3235             self.assertEqual(tcp.sport, 80)
3236             self.assertEqual(tcp.dport, 12345)
3237             self.check_tcp_checksum(p)
3238             self.check_ip_checksum(p)
3239         except:
3240             self.logger.error(ppp("Unexpected or invalid packet:", p))
3241             raise
3242
3243     def test_one_armed_nat44_static(self):
3244         """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3245         remote_host = self.pg9.remote_hosts[0]
3246         local_host = self.pg9.remote_hosts[1]
3247         external_port = 80
3248         local_port = 8080
3249         eh_port_in = 0
3250
3251         self.vapi.nat44_forwarding_enable_disable(1)
3252         self.nat44_add_address(self.nat_addr, twice_nat=1)
3253         self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3254                                       local_port, external_port,
3255                                       proto=IP_PROTOS.tcp, out2in_only=1,
3256                                       twice_nat=1)
3257         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3258         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3259                                                   is_inside=0)
3260
3261         # from client to service
3262         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3263              IP(src=remote_host.ip4, dst=self.nat_addr) /
3264              TCP(sport=12345, dport=external_port))
3265         self.pg9.add_stream(p)
3266         self.pg_enable_capture(self.pg_interfaces)
3267         self.pg_start()
3268         capture = self.pg9.get_capture(1)
3269         p = capture[0]
3270         server = None
3271         try:
3272             ip = p[IP]
3273             tcp = p[TCP]
3274             self.assertEqual(ip.dst, local_host.ip4)
3275             self.assertEqual(ip.src, self.nat_addr)
3276             self.assertEqual(tcp.dport, local_port)
3277             self.assertNotEqual(tcp.sport, 12345)
3278             eh_port_in = tcp.sport
3279             self.check_tcp_checksum(p)
3280             self.check_ip_checksum(p)
3281         except:
3282             self.logger.error(ppp("Unexpected or invalid packet:", p))
3283             raise
3284
3285         # from service back to client
3286         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3287              IP(src=local_host.ip4, dst=self.nat_addr) /
3288              TCP(sport=local_port, dport=eh_port_in))
3289         self.pg9.add_stream(p)
3290         self.pg_enable_capture(self.pg_interfaces)
3291         self.pg_start()
3292         capture = self.pg9.get_capture(1)
3293         p = capture[0]
3294         try:
3295             ip = p[IP]
3296             tcp = p[TCP]
3297             self.assertEqual(ip.src, self.nat_addr)
3298             self.assertEqual(ip.dst, remote_host.ip4)
3299             self.assertEqual(tcp.sport, external_port)
3300             self.assertEqual(tcp.dport, 12345)
3301             self.check_tcp_checksum(p)
3302             self.check_ip_checksum(p)
3303         except:
3304             self.logger.error(ppp("Unexpected or invalid packet:", p))
3305             raise
3306
3307     def test_del_session(self):
3308         """ Delete NAT44 session """
3309         self.nat44_add_address(self.nat_addr)
3310         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3311         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3312                                                   is_inside=0)
3313
3314         pkts = self.create_stream_in(self.pg0, self.pg1)
3315         self.pg0.add_stream(pkts)
3316         self.pg_enable_capture(self.pg_interfaces)
3317         self.pg_start()
3318         capture = self.pg1.get_capture(len(pkts))
3319
3320         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3321         nsessions = len(sessions)
3322
3323         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3324                                     sessions[0].inside_port,
3325                                     sessions[0].protocol)
3326         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3327                                     sessions[1].outside_port,
3328                                     sessions[1].protocol,
3329                                     is_in=0)
3330
3331         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3332         self.assertEqual(nsessions - len(sessions), 2)
3333
3334     def test_set_get_reass(self):
3335         """ NAT44 set/get virtual fragmentation reassembly """
3336         reas_cfg1 = self.vapi.nat_get_reass()
3337
3338         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3339                                 max_reass=reas_cfg1.ip4_max_reass * 2,
3340                                 max_frag=reas_cfg1.ip4_max_frag * 2)
3341
3342         reas_cfg2 = self.vapi.nat_get_reass()
3343
3344         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3345         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3346         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3347
3348         self.vapi.nat_set_reass(drop_frag=1)
3349         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3350
3351     def test_frag_in_order(self):
3352         """ NAT44 translate fragments arriving in order """
3353         self.nat44_add_address(self.nat_addr)
3354         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3355         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3356                                                   is_inside=0)
3357
3358         data = "A" * 4 + "B" * 16 + "C" * 3
3359         self.tcp_port_in = random.randint(1025, 65535)
3360
3361         reass = self.vapi.nat_reass_dump()
3362         reass_n_start = len(reass)
3363
3364         # in2out
3365         pkts = self.create_stream_frag(self.pg0,
3366                                        self.pg1.remote_ip4,
3367                                        self.tcp_port_in,
3368                                        20,
3369                                        data)
3370         self.pg0.add_stream(pkts)
3371         self.pg_enable_capture(self.pg_interfaces)
3372         self.pg_start()
3373         frags = self.pg1.get_capture(len(pkts))
3374         p = self.reass_frags_and_verify(frags,
3375                                         self.nat_addr,
3376                                         self.pg1.remote_ip4)
3377         self.assertEqual(p[TCP].dport, 20)
3378         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3379         self.tcp_port_out = p[TCP].sport
3380         self.assertEqual(data, p[Raw].load)
3381
3382         # out2in
3383         pkts = self.create_stream_frag(self.pg1,
3384                                        self.nat_addr,
3385                                        20,
3386                                        self.tcp_port_out,
3387                                        data)
3388         self.pg1.add_stream(pkts)
3389         self.pg_enable_capture(self.pg_interfaces)
3390         self.pg_start()
3391         frags = self.pg0.get_capture(len(pkts))
3392         p = self.reass_frags_and_verify(frags,
3393                                         self.pg1.remote_ip4,
3394                                         self.pg0.remote_ip4)
3395         self.assertEqual(p[TCP].sport, 20)
3396         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3397         self.assertEqual(data, p[Raw].load)
3398
3399         reass = self.vapi.nat_reass_dump()
3400         reass_n_end = len(reass)
3401
3402         self.assertEqual(reass_n_end - reass_n_start, 2)
3403
3404     def test_reass_hairpinning(self):
3405         """ NAT44 fragments hairpinning """
3406         host = self.pg0.remote_hosts[0]
3407         server = self.pg0.remote_hosts[1]
3408         host_in_port = random.randint(1025, 65535)
3409         host_out_port = 0
3410         server_in_port = random.randint(1025, 65535)
3411         server_out_port = random.randint(1025, 65535)
3412         data = "A" * 4 + "B" * 16 + "C" * 3
3413
3414         self.nat44_add_address(self.nat_addr)
3415         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3416         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3417                                                   is_inside=0)
3418         # add static mapping for server
3419         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3420                                       server_in_port, server_out_port,
3421                                       proto=IP_PROTOS.tcp)
3422
3423         # send packet from host to server
3424         pkts = self.create_stream_frag(self.pg0,
3425                                        self.nat_addr,
3426                                        host_in_port,
3427                                        server_out_port,
3428                                        data)
3429         self.pg0.add_stream(pkts)
3430         self.pg_enable_capture(self.pg_interfaces)
3431         self.pg_start()
3432         frags = self.pg0.get_capture(len(pkts))
3433         p = self.reass_frags_and_verify(frags,
3434                                         self.nat_addr,
3435                                         server.ip4)
3436         self.assertNotEqual(p[TCP].sport, host_in_port)
3437         self.assertEqual(p[TCP].dport, server_in_port)
3438         self.assertEqual(data, p[Raw].load)
3439
3440     def test_frag_out_of_order(self):
3441         """ NAT44 translate fragments arriving out of order """
3442         self.nat44_add_address(self.nat_addr)
3443         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3444         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3445                                                   is_inside=0)
3446
3447         data = "A" * 4 + "B" * 16 + "C" * 3
3448         random.randint(1025, 65535)
3449
3450         # in2out
3451         pkts = self.create_stream_frag(self.pg0,
3452                                        self.pg1.remote_ip4,
3453                                        self.tcp_port_in,
3454                                        20,
3455                                        data)
3456         pkts.reverse()
3457         self.pg0.add_stream(pkts)
3458         self.pg_enable_capture(self.pg_interfaces)
3459         self.pg_start()
3460         frags = self.pg1.get_capture(len(pkts))
3461         p = self.reass_frags_and_verify(frags,
3462                                         self.nat_addr,
3463                                         self.pg1.remote_ip4)
3464         self.assertEqual(p[TCP].dport, 20)
3465         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3466         self.tcp_port_out = p[TCP].sport
3467         self.assertEqual(data, p[Raw].load)
3468
3469         # out2in
3470         pkts = self.create_stream_frag(self.pg1,
3471                                        self.nat_addr,
3472                                        20,
3473                                        self.tcp_port_out,
3474                                        data)
3475         pkts.reverse()
3476         self.pg1.add_stream(pkts)
3477         self.pg_enable_capture(self.pg_interfaces)
3478         self.pg_start()
3479         frags = self.pg0.get_capture(len(pkts))
3480         p = self.reass_frags_and_verify(frags,
3481                                         self.pg1.remote_ip4,
3482                                         self.pg0.remote_ip4)
3483         self.assertEqual(p[TCP].sport, 20)
3484         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3485         self.assertEqual(data, p[Raw].load)
3486
3487     def test_port_restricted(self):
3488         """ Port restricted NAT44 (MAP-E CE) """
3489         self.nat44_add_address(self.nat_addr)
3490         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3491         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3492                                                   is_inside=0)
3493         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3494                       "psid-offset 6 psid-len 6")
3495
3496         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3497              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3498              TCP(sport=4567, dport=22))
3499         self.pg0.add_stream(p)
3500         self.pg_enable_capture(self.pg_interfaces)
3501         self.pg_start()
3502         capture = self.pg1.get_capture(1)
3503         p = capture[0]
3504         try:
3505             ip = p[IP]
3506             tcp = p[TCP]
3507             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3508             self.assertEqual(ip.src, self.nat_addr)
3509             self.assertEqual(tcp.dport, 22)
3510             self.assertNotEqual(tcp.sport, 4567)
3511             self.assertEqual((tcp.sport >> 6) & 63, 10)
3512             self.check_tcp_checksum(p)
3513             self.check_ip_checksum(p)
3514         except:
3515             self.logger.error(ppp("Unexpected or invalid packet:", p))
3516             raise
3517
3518     def test_twice_nat(self):
3519         """ Twice NAT44 """
3520         twice_nat_addr = '10.0.1.3'
3521         port_in = 8080
3522         port_out = 80
3523         eh_port_out = 4567
3524         eh_port_in = 0
3525         self.nat44_add_address(self.nat_addr)
3526         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3527         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3528                                       port_in, port_out, proto=IP_PROTOS.tcp,
3529                                       twice_nat=1)
3530         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3531         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3532                                                   is_inside=0)
3533
3534         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3535              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3536              TCP(sport=eh_port_out, dport=port_out))
3537         self.pg1.add_stream(p)
3538         self.pg_enable_capture(self.pg_interfaces)
3539         self.pg_start()
3540         capture = self.pg0.get_capture(1)
3541         p = capture[0]
3542         try:
3543             ip = p[IP]
3544             tcp = p[TCP]
3545             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3546             self.assertEqual(ip.src, twice_nat_addr)
3547             self.assertEqual(tcp.dport, port_in)
3548             self.assertNotEqual(tcp.sport, eh_port_out)
3549             eh_port_in = tcp.sport
3550             self.check_tcp_checksum(p)
3551             self.check_ip_checksum(p)
3552         except:
3553             self.logger.error(ppp("Unexpected or invalid packet:", p))
3554             raise
3555
3556         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3557              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3558              TCP(sport=port_in, dport=eh_port_in))
3559         self.pg0.add_stream(p)
3560         self.pg_enable_capture(self.pg_interfaces)
3561         self.pg_start()
3562         capture = self.pg1.get_capture(1)
3563         p = capture[0]
3564         try:
3565             ip = p[IP]
3566             tcp = p[TCP]
3567             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3568             self.assertEqual(ip.src, self.nat_addr)
3569             self.assertEqual(tcp.dport, eh_port_out)
3570             self.assertEqual(tcp.sport, port_out)
3571             self.check_tcp_checksum(p)
3572             self.check_ip_checksum(p)
3573         except:
3574             self.logger.error(ppp("Unexpected or invalid packet:", p))
3575             raise
3576
3577     def test_twice_nat_lb(self):
3578         """ Twice NAT44 local service load balancing """
3579         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3580         twice_nat_addr = '10.0.1.3'
3581         local_port = 8080
3582         external_port = 80
3583         eh_port_out = 4567
3584         eh_port_in = 0
3585         server1 = self.pg0.remote_hosts[0]
3586         server2 = self.pg0.remote_hosts[1]
3587
3588         locals = [{'addr': server1.ip4n,
3589                    'port': local_port,
3590                    'probability': 50},
3591                   {'addr': server2.ip4n,
3592                    'port': local_port,
3593                    'probability': 50}]
3594
3595         self.nat44_add_address(self.nat_addr)
3596         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3597
3598         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3599                                                   external_port,
3600                                                   IP_PROTOS.tcp,
3601                                                   twice_nat=1,
3602                                                   local_num=len(locals),
3603                                                   locals=locals)
3604         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3605         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3606                                                   is_inside=0)
3607
3608         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3609              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3610              TCP(sport=eh_port_out, dport=external_port))
3611         self.pg1.add_stream(p)
3612         self.pg_enable_capture(self.pg_interfaces)
3613         self.pg_start()
3614         capture = self.pg0.get_capture(1)
3615         p = capture[0]
3616         server = None
3617         try:
3618             ip = p[IP]
3619             tcp = p[TCP]
3620             self.assertEqual(ip.src, twice_nat_addr)
3621             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3622             if ip.dst == server1.ip4:
3623                 server = server1
3624             else:
3625                 server = server2
3626             self.assertNotEqual(tcp.sport, eh_port_out)
3627             eh_port_in = tcp.sport
3628             self.assertEqual(tcp.dport, local_port)
3629             self.check_tcp_checksum(p)
3630             self.check_ip_checksum(p)
3631         except:
3632             self.logger.error(ppp("Unexpected or invalid packet:", p))
3633             raise
3634
3635         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3636              IP(src=server.ip4, dst=twice_nat_addr) /
3637              TCP(sport=local_port, dport=eh_port_in))
3638         self.pg0.add_stream(p)
3639         self.pg_enable_capture(self.pg_interfaces)
3640         self.pg_start()
3641         capture = self.pg1.get_capture(1)
3642         p = capture[0]
3643         try:
3644             ip = p[IP]
3645             tcp = p[TCP]
3646             self.assertEqual(ip.src, self.nat_addr)
3647             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3648             self.assertEqual(tcp.sport, external_port)
3649             self.assertEqual(tcp.dport, eh_port_out)
3650             self.check_tcp_checksum(p)
3651             self.check_ip_checksum(p)
3652         except:
3653             self.logger.error(ppp("Unexpected or invalid packet:", p))
3654             raise
3655
3656     def test_twice_nat_interface_addr(self):
3657         """ Acquire twice NAT44 addresses from interface """
3658         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3659
3660         # no address in NAT pool
3661         adresses = self.vapi.nat44_address_dump()
3662         self.assertEqual(0, len(adresses))
3663
3664         # configure interface address and check NAT address pool
3665         self.pg7.config_ip4()
3666         adresses = self.vapi.nat44_address_dump()
3667         self.assertEqual(1, len(adresses))
3668         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3669         self.assertEqual(adresses[0].twice_nat, 1)
3670
3671         # remove interface address and check NAT address pool
3672         self.pg7.unconfig_ip4()
3673         adresses = self.vapi.nat44_address_dump()
3674         self.assertEqual(0, len(adresses))
3675
3676     def test_ipfix_max_frags(self):
3677         """ IPFIX logging maximum fragments pending reassembly exceeded """
3678         self.nat44_add_address(self.nat_addr)
3679         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3680         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3681                                                   is_inside=0)
3682         self.vapi.nat_set_reass(max_frag=0)
3683         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3684                                      src_address=self.pg3.local_ip4n,
3685                                      path_mtu=512,
3686                                      template_interval=10)
3687         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3688                             src_port=self.ipfix_src_port)
3689
3690         data = "A" * 4 + "B" * 16 + "C" * 3
3691         self.tcp_port_in = random.randint(1025, 65535)
3692         pkts = self.create_stream_frag(self.pg0,
3693                                        self.pg1.remote_ip4,
3694                                        self.tcp_port_in,
3695                                        20,
3696                                        data)
3697         self.pg0.add_stream(pkts[-1])
3698         self.pg_enable_capture(self.pg_interfaces)
3699         self.pg_start()
3700         frags = self.pg1.get_capture(0)
3701         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3702         capture = self.pg3.get_capture(9)
3703         ipfix = IPFIXDecoder()
3704         # first load template
3705         for p in capture:
3706             self.assertTrue(p.haslayer(IPFIX))
3707             self.assertEqual(p[IP].src, self.pg3.local_ip4)
3708             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3709             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3710             self.assertEqual(p[UDP].dport, 4739)
3711             self.assertEqual(p[IPFIX].observationDomainID,
3712                              self.ipfix_domain_id)
3713             if p.haslayer(Template):
3714                 ipfix.add_template(p.getlayer(Template))
3715         # verify events in data set
3716         for p in capture:
3717             if p.haslayer(Data):
3718                 data = ipfix.decode_data_set(p.getlayer(Set))
3719                 self.verify_ipfix_max_fragments_ip4(data, 0,
3720                                                     self.pg0.remote_ip4n)
3721
3722     def tearDown(self):
3723         super(TestNAT44, self).tearDown()
3724         if not self.vpp_dead:
3725             self.logger.info(self.vapi.cli("show nat44 addresses"))
3726             self.logger.info(self.vapi.cli("show nat44 interfaces"))
3727             self.logger.info(self.vapi.cli("show nat44 static mappings"))
3728             self.logger.info(self.vapi.cli("show nat44 interface address"))
3729             self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3730             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3731             self.vapi.cli("nat addr-port-assignment-alg default")
3732             self.clear_nat44()
3733
3734
class TestNAT44Out2InDPO(MethodHolder):
    """ NAT44 Test Cases using out2in DPO """

    @classmethod
    def setUpConstants(cls):
        """ Append the "nat { out2in dpo }" section to the VPP command line """
        super(TestNAT44Out2InDPO, cls).setUpConstants()
        nat_section = ["nat", "{", "out2in dpo", "}"]
        cls.vpp_cmdline.extend(nat_section)

    @classmethod
    def setUpClass(cls):
        """
        Set the class-wide ports and addresses and bring up two pg
        interfaces: pg0 with IPv4, pg1 with IPv6.
        """
        super(TestNAT44Out2InDPO, cls).setUpClass()

        try:
            cls.tcp_port_in = cls.tcp_port_out = 6303
            cls.udp_port_in = cls.udp_port_out = 6304
            cls.icmp_id_in = cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.dst_ip4 = '192.168.70.1'

            cls.create_pg_interfaces(range(2))

            ip4_if, ip6_if = cls.pg0, cls.pg1

            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.resolve_ndp()

            # zero-length IPv6 prefix (::/0) routed via pg1's remote address
            all_zero_prefix = '\x00' * 16
            cls.vapi.ip_add_del_route(is_ipv6=True,
                                      dst_address=all_zero_prefix,
                                      dst_address_length=0,
                                      next_hop_address=ip6_if.remote_ip6n,
                                      next_hop_sw_if_index=ip6_if.sw_if_index)

        except Exception:
            # undo whatever the parent setUpClass created, then re-raise
            super(TestNAT44Out2InDPO, cls).tearDownClass()
            raise

    def configure_xlat(self):
        """
        Store the /96 destination and source IPv6 prefixes on the test
        instance and add a translating MAP domain that uses them.
        """
        to_binary = socket.inet_pton

        self.dst_ip6_pfx = '1:2:3::'
        self.dst_ip6_pfx_len = 96
        self.dst_ip6_pfx_n = to_binary(socket.AF_INET6, self.dst_ip6_pfx)

        self.src_ip6_pfx = '4:5:6::'
        self.src_ip6_pfx_len = 96
        self.src_ip6_pfx_n = to_binary(socket.AF_INET6, self.src_ip6_pfx)

        self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
                                 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
                                 '\x00\x00\x00\x00', 0, is_translation=1,
                                 is_rfc6052=1)

    def test_464xlat_ce(self):
        """ Test 464XLAT CE with NAT44 """

        def run_stream(tx_if, rx_if, stream):
            # send the stream and capture the same number of packets
            tx_if.add_stream(stream)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            return rx_if.get_capture(len(stream))

        self.configure_xlat()

        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)

        out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                       self.dst_ip6_pfx_len)
        out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
                                       self.src_ip6_pfx_len)

        try:
            # IPv4 in on pg0, IPv6 out on pg1
            stream = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
            received = run_stream(self.pg0, self.pg1, stream)
            self.verify_capture_out_ip6(received, nat_ip=out_dst_ip6,
                                        dst_ip=out_src_ip6)

            # IPv6 in on pg1, IPv4 out on pg0
            stream = self.create_stream_out_ip6(self.pg1, out_src_ip6,
                                                out_dst_ip6)
            received = run_stream(self.pg1, self.pg0, stream)
            self.verify_capture_in(received, self.pg0)
        finally:
            # always remove the NAT44 feature and address added above
            self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
                                                      is_add=0)
            self.vapi.nat44_add_del_address_range(self.nat_addr_n,
                                                  self.nat_addr_n, is_add=0)

    def test_464xlat_ce_no_nat(self):
        """ Test 464XLAT CE without NAT44 """

        def run_stream(tx_if, rx_if, stream):
            # send the stream and capture the same number of packets
            tx_if.add_stream(stream)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            return rx_if.get_capture(len(stream))

        self.configure_xlat()

        out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                       self.dst_ip6_pfx_len)
        out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
                                       self.src_ip6_pfx_len)

        # IPv4 in on pg0, IPv6 out on pg1
        stream = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
        received = run_stream(self.pg0, self.pg1, stream)
        self.verify_capture_out_ip6(received, dst_ip=out_src_ip6,
                                    nat_ip=out_dst_ip6, same_port=True)

        # IPv6 in on pg1, IPv4 out on pg0
        stream = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
        received = run_stream(self.pg1, self.pg0, stream)
        self.verify_capture_in(received, self.pg0)
3850
3851
3852 class TestDeterministicNAT(MethodHolder):
3853     """ Deterministic NAT Test Cases """
3854
3855     @classmethod
3856     def setUpConstants(cls):
3857         super(TestDeterministicNAT, cls).setUpConstants()
3858         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3859
3860     @classmethod
3861     def setUpClass(cls):
3862         super(TestDeterministicNAT, cls).setUpClass()
3863
3864         try:
3865             cls.tcp_port_in = 6303
3866             cls.tcp_external_port = 6303
3867             cls.udp_port_in = 6304
3868             cls.udp_external_port = 6304
3869             cls.icmp_id_in = 6305
3870             cls.nat_addr = '10.0.0.3'
3871
3872             cls.create_pg_interfaces(range(3))
3873             cls.interfaces = list(cls.pg_interfaces)
3874
3875             for i in cls.interfaces:
3876                 i.admin_up()
3877                 i.config_ip4()
3878                 i.resolve_arp()
3879
3880             cls.pg0.generate_remote_hosts(2)
3881             cls.pg0.configure_ipv4_neighbors()
3882
3883         except Exception:
3884             super(TestDeterministicNAT, cls).tearDownClass()
3885             raise
3886
3887     def create_stream_in(self, in_if, out_if, ttl=64):
3888         """
3889         Create packet stream for inside network
3890
3891         :param in_if: Inside interface
3892         :param out_if: Outside interface
3893         :param ttl: TTL of generated packets
3894         """
3895         pkts = []
3896         # TCP
3897         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3898              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3899              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3900         pkts.append(p)
3901
3902         # UDP
3903         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3904              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3905              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3906         pkts.append(p)
3907
3908         # ICMP
3909         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3910              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3911              ICMP(id=self.icmp_id_in, type='echo-request'))
3912         pkts.append(p)
3913
3914         return pkts
3915
3916     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3917         """
3918         Create packet stream for outside network
3919
3920         :param out_if: Outside interface
3921         :param dst_ip: Destination IP address (Default use global NAT address)
3922         :param ttl: TTL of generated packets
3923         """
3924         if dst_ip is None:
3925             dst_ip = self.nat_addr
3926         pkts = []
3927         # TCP
3928         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3929              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3930              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3931         pkts.append(p)
3932
3933         # UDP
3934         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3935              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3936              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3937         pkts.append(p)
3938
3939         # ICMP
3940         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3941              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3942              ICMP(id=self.icmp_external_id, type='echo-reply'))
3943         pkts.append(p)
3944
3945         return pkts
3946
3947     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3948         """
3949         Verify captured packets on outside network
3950
3951         :param capture: Captured packets
3952         :param nat_ip: Translated IP address (Default use global NAT address)
3953         :param same_port: Sorce port number is not translated (Default False)
3954         :param packet_num: Expected number of packets (Default 3)
3955         """
3956         if nat_ip is None:
3957             nat_ip = self.nat_addr
3958         self.assertEqual(packet_num, len(capture))
3959         for packet in capture:
3960             try:
3961                 self.assertEqual(packet[IP].src, nat_ip)
3962                 if packet.haslayer(TCP):
3963                     self.tcp_port_out = packet[TCP].sport
3964                 elif packet.haslayer(UDP):
3965                     self.udp_port_out = packet[UDP].sport
3966                 else:
3967                     self.icmp_external_id = packet[ICMP].id
3968             except:
3969                 self.logger.error(ppp("Unexpected or invalid packet "
3970                                       "(outside network):", packet))
3971                 raise
3972
3973     def initiate_tcp_session(self, in_if, out_if):
3974         """
3975         Initiates TCP session
3976
3977         :param in_if: Inside interface
3978         :param out_if: Outside interface
3979         """
3980         try:
3981             # SYN packet in->out
3982             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3983                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3984                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3985                      flags="S"))
3986             in_if.add_stream(p)
3987             self.pg_enable_capture(self.pg_interfaces)
3988             self.pg_start()
3989             capture = out_if.get_capture(1)
3990             p = capture[0]
3991             self.tcp_port_out = p[TCP].sport
3992
3993             # SYN + ACK packet out->in
3994             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3995                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3996                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3997                      flags="SA"))
3998             out_if.add_stream(p)
3999             self.pg_enable_capture(self.pg_interfaces)
4000             self.pg_start()
4001             in_if.get_capture(1)
4002
4003             # ACK packet in->out
4004             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4005                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4006                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4007                      flags="A"))
4008             in_if.add_stream(p)
4009             self.pg_enable_capture(self.pg_interfaces)
4010             self.pg_start()
4011             out_if.get_capture(1)
4012
4013         except:
4014             self.logger.error("TCP 3 way handshake failed")
4015             raise
4016
4017     def verify_ipfix_max_entries_per_user(self, data):
4018         """
4019         Verify IPFIX maximum entries per user exceeded event
4020
4021         :param data: Decoded IPFIX data records
4022         """
4023         self.assertEqual(1, len(data))
4024         record = data[0]
4025         # natEvent
4026         self.assertEqual(ord(record[230]), 13)
4027         # natQuotaExceededEvent
4028         self.assertEqual('\x03\x00\x00\x00', record[466])
4029         # maxEntriesPerUser
4030         self.assertEqual('\xe8\x03\x00\x00', record[473])
4031         # sourceIPv4Address
4032         self.assertEqual(self.pg0.remote_ip4n, record[8])
4033
4034     def test_deterministic_mode(self):
4035         """ NAT plugin run deterministic mode """
4036         in_addr = '172.16.255.0'
4037         out_addr = '172.17.255.50'
4038         in_addr_t = '172.16.255.20'
4039         in_addr_n = socket.inet_aton(in_addr)
4040         out_addr_n = socket.inet_aton(out_addr)
4041         in_addr_t_n = socket.inet_aton(in_addr_t)
4042         in_plen = 24
4043         out_plen = 32
4044
4045         nat_config = self.vapi.nat_show_config()
4046         self.assertEqual(1, nat_config.deterministic)
4047
4048         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4049
4050         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4051         self.assertEqual(rep1.out_addr[:4], out_addr_n)
4052         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4053         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4054
4055         deterministic_mappings = self.vapi.nat_det_map_dump()
4056         self.assertEqual(len(deterministic_mappings), 1)
4057         dsm = deterministic_mappings[0]
4058         self.assertEqual(in_addr_n, dsm.in_addr[:4])
4059         self.assertEqual(in_plen, dsm.in_plen)
4060         self.assertEqual(out_addr_n, dsm.out_addr[:4])
4061         self.assertEqual(out_plen, dsm.out_plen)
4062
4063         self.clear_nat_det()
4064         deterministic_mappings = self.vapi.nat_det_map_dump()
4065         self.assertEqual(len(deterministic_mappings), 0)
4066
4067     def test_set_timeouts(self):
4068         """ Set deterministic NAT timeouts """
4069         timeouts_before = self.vapi.nat_det_get_timeouts()
4070
4071         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4072                                        timeouts_before.tcp_established + 10,
4073                                        timeouts_before.tcp_transitory + 10,
4074                                        timeouts_before.icmp + 10)
4075
4076         timeouts_after = self.vapi.nat_det_get_timeouts()
4077
4078         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4079         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4080         self.assertNotEqual(timeouts_before.tcp_established,
4081                             timeouts_after.tcp_established)
4082         self.assertNotEqual(timeouts_before.tcp_transitory,
4083                             timeouts_after.tcp_transitory)
4084
4085     def test_det_in(self):
4086         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4087
4088         nat_ip = "10.0.0.10"
4089
4090         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4091                                       32,
4092                                       socket.inet_aton(nat_ip),
4093                                       32)
4094         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4095         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4096                                                   is_inside=0)
4097
4098         # in2out
4099         pkts = self.create_stream_in(self.pg0, self.pg1)
4100         self.pg0.add_stream(pkts)
4101         self.pg_enable_capture(self.pg_interfaces)
4102         self.pg_start()
4103         capture = self.pg1.get_capture(len(pkts))
4104         self.verify_capture_out(capture, nat_ip)
4105
4106         # out2in
4107         pkts = self.create_stream_out(self.pg1, nat_ip)
4108         self.pg1.add_stream(pkts)
4109         self.pg_enable_capture(self.pg_interfaces)
4110         self.pg_start()
4111         capture = self.pg0.get_capture(len(pkts))
4112         self.verify_capture_in(capture, self.pg0)
4113
4114         # session dump test
4115         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4116         self.assertEqual(len(sessions), 3)
4117
4118         # TCP session
4119         s = sessions[0]
4120         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4121         self.assertEqual(s.in_port, self.tcp_port_in)
4122         self.assertEqual(s.out_port, self.tcp_port_out)
4123         self.assertEqual(s.ext_port, self.tcp_external_port)
4124
4125         # UDP session
4126         s = sessions[1]
4127         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4128         self.assertEqual(s.in_port, self.udp_port_in)
4129         self.assertEqual(s.out_port, self.udp_port_out)
4130         self.assertEqual(s.ext_port, self.udp_external_port)
4131
4132         # ICMP session
4133         s = sessions[2]
4134         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4135         self.assertEqual(s.in_port, self.icmp_id_in)
4136         self.assertEqual(s.out_port, self.icmp_external_id)
4137
4138     def test_multiple_users(self):
4139         """ Deterministic NAT multiple users """
4140
4141         nat_ip = "10.0.0.10"
4142         port_in = 80
4143         external_port = 6303
4144
4145         host0 = self.pg0.remote_hosts[0]
4146         host1 = self.pg0.remote_hosts[1]
4147
4148         self.vapi.nat_det_add_del_map(host0.ip4n,
4149                                       24,
4150                                       socket.inet_aton(nat_ip),
4151                                       32)
4152         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4153         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4154                                                   is_inside=0)
4155
4156         # host0 to out
4157         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4158              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4159              TCP(sport=port_in, dport=external_port))
4160         self.pg0.add_stream(p)
4161         self.pg_enable_capture(self.pg_interfaces)
4162         self.pg_start()
4163         capture = self.pg1.get_capture(1)
4164         p = capture[0]
4165         try:
4166             ip = p[IP]
4167             tcp = p[TCP]
4168             self.assertEqual(ip.src, nat_ip)
4169             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4170             self.assertEqual(tcp.dport, external_port)
4171             port_out0 = tcp.sport
4172         except:
4173             self.logger.error(ppp("Unexpected or invalid packet:", p))
4174             raise
4175
4176         # host1 to out
4177         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4178              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4179              TCP(sport=port_in, dport=external_port))
4180         self.pg0.add_stream(p)
4181         self.pg_enable_capture(self.pg_interfaces)
4182         self.pg_start()
4183         capture = self.pg1.get_capture(1)
4184         p = capture[0]
4185         try:
4186             ip = p[IP]
4187             tcp = p[TCP]
4188             self.assertEqual(ip.src, nat_ip)
4189             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4190             self.assertEqual(tcp.dport, external_port)
4191             port_out1 = tcp.sport
4192         except:
4193             self.logger.error(ppp("Unexpected or invalid packet:", p))
4194             raise
4195
4196         dms = self.vapi.nat_det_map_dump()
4197         self.assertEqual(1, len(dms))
4198         self.assertEqual(2, dms[0].ses_num)
4199
4200         # out to host0
4201         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4202              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4203              TCP(sport=external_port, dport=port_out0))
4204         self.pg1.add_stream(p)
4205         self.pg_enable_capture(self.pg_interfaces)
4206         self.pg_start()
4207         capture = self.pg0.get_capture(1)
4208         p = capture[0]
4209         try:
4210             ip = p[IP]
4211             tcp = p[TCP]
4212             self.assertEqual(ip.src, self.pg1.remote_ip4)
4213             self.assertEqual(ip.dst, host0.ip4)
4214             self.assertEqual(tcp.dport, port_in)
4215             self.assertEqual(tcp.sport, external_port)
4216         except:
4217             self.logger.error(ppp("Unexpected or invalid packet:", p))
4218             raise
4219
4220         # out to host1
4221         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4222              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4223              TCP(sport=external_port, dport=port_out1))
4224         self.pg1.add_stream(p)
4225         self.pg_enable_capture(self.pg_interfaces)
4226         self.pg_start()
4227         capture = self.pg0.get_capture(1)
4228         p = capture[0]
4229         try:
4230             ip = p[IP]
4231             tcp = p[TCP]
4232             self.assertEqual(ip.src, self.pg1.remote_ip4)
4233             self.assertEqual(ip.dst, host1.ip4)
4234             self.assertEqual(tcp.dport, port_in)
4235             self.assertEqual(tcp.sport, external_port)
4236         except:
4237             self.logger.error(ppp("Unexpected or invalid packet", p))
4238             raise
4239
4240         # session close api test
4241         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4242                                             port_out1,
4243                                             self.pg1.remote_ip4n,
4244                                             external_port)
4245         dms = self.vapi.nat_det_map_dump()
4246         self.assertEqual(dms[0].ses_num, 1)
4247
4248         self.vapi.nat_det_close_session_in(host0.ip4n,
4249                                            port_in,
4250                                            self.pg1.remote_ip4n,
4251                                            external_port)
4252         dms = self.vapi.nat_det_map_dump()
4253         self.assertEqual(dms[0].ses_num, 0)
4254
4255     def test_tcp_session_close_detection_in(self):
4256         """ Deterministic NAT TCP session close from inside network """
4257         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4258                                       32,
4259                                       socket.inet_aton(self.nat_addr),
4260                                       32)
4261         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4262         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4263                                                   is_inside=0)
4264
4265         self.initiate_tcp_session(self.pg0, self.pg1)
4266
4267         # close the session from inside
4268         try:
4269             # FIN packet in -> out
4270             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4271                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4272                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4273                      flags="F"))
4274             self.pg0.add_stream(p)
4275             self.pg_enable_capture(self.pg_interfaces)
4276             self.pg_start()
4277             self.pg1.get_capture(1)
4278
4279             pkts = []
4280
4281             # ACK packet out -> in
4282             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4283                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4284                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4285                      flags="A"))
4286             pkts.append(p)
4287
4288             # FIN packet out -> in
4289             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4290                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4291                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4292                      flags="F"))
4293             pkts.append(p)
4294
4295             self.pg1.add_stream(pkts)
4296             self.pg_enable_capture(self.pg_interfaces)
4297             self.pg_start()
4298             self.pg0.get_capture(2)
4299
4300             # ACK packet in -> out
4301             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4302                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4303                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4304                      flags="A"))
4305             self.pg0.add_stream(p)
4306             self.pg_enable_capture(self.pg_interfaces)
4307             self.pg_start()
4308             self.pg1.get_capture(1)
4309
4310             # Check if deterministic NAT44 closed the session
4311             dms = self.vapi.nat_det_map_dump()
4312             self.assertEqual(0, dms[0].ses_num)
4313         except:
4314             self.logger.error("TCP session termination failed")
4315             raise
4316
4317     def test_tcp_session_close_detection_out(self):
4318         """ Deterministic NAT TCP session close from outside network """
4319         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4320                                       32,
4321                                       socket.inet_aton(self.nat_addr),
4322                                       32)
4323         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4324         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4325                                                   is_inside=0)
4326
4327         self.initiate_tcp_session(self.pg0, self.pg1)
4328
4329         # close the session from outside
4330         try:
4331             # FIN packet out -> in
4332             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4333                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4334                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4335                      flags="F"))
4336             self.pg1.add_stream(p)
4337             self.pg_enable_capture(self.pg_interfaces)
4338             self.pg_start()
4339             self.pg0.get_capture(1)
4340
4341             pkts = []
4342
4343             # ACK packet in -> out
4344             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4345                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4346                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4347                      flags="A"))
4348             pkts.append(p)
4349
4350             # ACK packet in -> out
4351             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4352                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4353                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4354                      flags="F"))
4355             pkts.append(p)
4356
4357             self.pg0.add_stream(pkts)
4358             self.pg_enable_capture(self.pg_interfaces)
4359             self.pg_start()
4360             self.pg1.get_capture(2)
4361
4362             # ACK packet out -> in
4363             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4364                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4365                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4366                      flags="A"))
4367             self.pg1.add_stream(p)
4368             self.pg_enable_capture(self.pg_interfaces)
4369             self.pg_start()
4370             self.pg0.get_capture(1)
4371
4372             # Check if deterministic NAT44 closed the session
4373             dms = self.vapi.nat_det_map_dump()
4374             self.assertEqual(0, dms[0].ses_num)
4375         except:
4376             self.logger.error("TCP session termination failed")
4377             raise
4378
4379     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4380     def test_session_timeout(self):
4381         """ Deterministic NAT session timeouts """
4382         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4383                                       32,
4384                                       socket.inet_aton(self.nat_addr),
4385                                       32)
4386         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4387         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4388                                                   is_inside=0)
4389
4390         self.initiate_tcp_session(self.pg0, self.pg1)
4391         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4392         pkts = self.create_stream_in(self.pg0, self.pg1)
4393         self.pg0.add_stream(pkts)
4394         self.pg_enable_capture(self.pg_interfaces)
4395         self.pg_start()
4396         capture = self.pg1.get_capture(len(pkts))
4397         sleep(15)
4398
4399         dms = self.vapi.nat_det_map_dump()
4400         self.assertEqual(0, dms[0].ses_num)
4401
4402     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4403     def test_session_limit_per_user(self):
4404         """ Deterministic NAT maximum sessions per user limit """
4405         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4406                                       32,
4407                                       socket.inet_aton(self.nat_addr),
4408                                       32)
4409         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4410         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4411                                                   is_inside=0)
4412         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4413                                      src_address=self.pg2.local_ip4n,
4414                                      path_mtu=512,
4415                                      template_interval=10)
4416         self.vapi.nat_ipfix()
4417
4418         pkts = []
4419         for port in range(1025, 2025):
4420             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4421                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4422                  UDP(sport=port, dport=port))
4423             pkts.append(p)
4424
4425         self.pg0.add_stream(pkts)
4426         self.pg_enable_capture(self.pg_interfaces)
4427         self.pg_start()
4428         capture = self.pg1.get_capture(len(pkts))
4429
4430         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4431              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4432              UDP(sport=3001, dport=3002))
4433         self.pg0.add_stream(p)
4434         self.pg_enable_capture(self.pg_interfaces)
4435         self.pg_start()
4436         capture = self.pg1.assert_nothing_captured()
4437
4438         # verify ICMP error packet
4439         capture = self.pg0.get_capture(1)
4440         p = capture[0]
4441         self.assertTrue(p.haslayer(ICMP))
4442         icmp = p[ICMP]
4443         self.assertEqual(icmp.type, 3)
4444         self.assertEqual(icmp.code, 1)
4445         self.assertTrue(icmp.haslayer(IPerror))
4446         inner_ip = icmp[IPerror]
4447         self.assertEqual(inner_ip[UDPerror].sport, 3001)
4448         self.assertEqual(inner_ip[UDPerror].dport, 3002)
4449
4450         dms = self.vapi.nat_det_map_dump()
4451
4452         self.assertEqual(1000, dms[0].ses_num)
4453
4454         # verify IPFIX logging
4455         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
4456         sleep(1)
4457         capture = self.pg2.get_capture(2)
4458         ipfix = IPFIXDecoder()
4459         # first load template
4460         for p in capture:
4461             self.assertTrue(p.haslayer(IPFIX))
4462             if p.haslayer(Template):
4463                 ipfix.add_template(p.getlayer(Template))
4464         # verify events in data set
4465         for p in capture:
4466             if p.haslayer(Data):
4467                 data = ipfix.decode_data_set(p.getlayer(Set))
4468                 self.verify_ipfix_max_entries_per_user(data)
4469
4470     def clear_nat_det(self):
4471         """
4472         Clear deterministic NAT configuration.
4473         """
4474         self.vapi.nat_ipfix(enable=0)
4475         self.vapi.nat_det_set_timeouts()
4476         deterministic_mappings = self.vapi.nat_det_map_dump()
4477         for dsm in deterministic_mappings:
4478             self.vapi.nat_det_add_del_map(dsm.in_addr,
4479                                           dsm.in_plen,
4480                                           dsm.out_addr,
4481                                           dsm.out_plen,
4482                                           is_add=0)
4483
4484         interfaces = self.vapi.nat44_interface_dump()
4485         for intf in interfaces:
4486             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4487                                                       intf.is_inside,
4488                                                       is_add=0)
4489
4490     def tearDown(self):
4491         super(TestDeterministicNAT, self).tearDown()
4492         if not self.vpp_dead:
4493             self.logger.info(self.vapi.cli("show nat44 interfaces"))
4494             self.logger.info(
4495                 self.vapi.cli("show nat44 deterministic mappings"))
4496             self.logger.info(
4497                 self.vapi.cli("show nat44 deterministic timeouts"))
4498             self.logger.info(
4499                 self.vapi.cli("show nat44 deterministic sessions"))
4500             self.clear_nat_det()
4501
4502
4503 class TestNAT64(MethodHolder):
4504     """ NAT64 Test Cases """
4505
4506     @classmethod
4507     def setUpConstants(cls):
4508         super(TestNAT64, cls).setUpConstants()
4509         cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4510                                 "nat64 st hash buckets 256", "}"])
4511
4512     @classmethod
4513     def setUpClass(cls):
4514         super(TestNAT64, cls).setUpClass()
4515
4516         try:
4517             cls.tcp_port_in = 6303
4518             cls.tcp_port_out = 6303
4519             cls.udp_port_in = 6304
4520             cls.udp_port_out = 6304
4521             cls.icmp_id_in = 6305
4522             cls.icmp_id_out = 6305
4523             cls.nat_addr = '10.0.0.3'
4524             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4525             cls.vrf1_id = 10
4526             cls.vrf1_nat_addr = '10.0.10.3'
4527             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4528                                                    cls.vrf1_nat_addr)
4529             cls.ipfix_src_port = 4739
4530             cls.ipfix_domain_id = 1
4531
4532             cls.create_pg_interfaces(range(5))
4533             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4534             cls.ip6_interfaces.append(cls.pg_interfaces[2])
4535             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4536
4537             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4538
4539             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4540
4541             cls.pg0.generate_remote_hosts(2)
4542
4543             for i in cls.ip6_interfaces:
4544                 i.admin_up()
4545                 i.config_ip6()
4546                 i.configure_ipv6_neighbors()
4547
4548             for i in cls.ip4_interfaces:
4549                 i.admin_up()
4550                 i.config_ip4()
4551                 i.resolve_arp()
4552
4553             cls.pg3.admin_up()
4554             cls.pg3.config_ip4()
4555             cls.pg3.resolve_arp()
4556             cls.pg3.config_ip6()
4557             cls.pg3.configure_ipv6_neighbors()
4558
4559         except Exception:
4560             super(TestNAT64, cls).tearDownClass()
4561             raise
4562
4563     def test_pool(self):
4564         """ Add/delete address to NAT64 pool """
4565         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4566
4567         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4568
4569         addresses = self.vapi.nat64_pool_addr_dump()
4570         self.assertEqual(len(addresses), 1)
4571         self.assertEqual(addresses[0].address, nat_addr)
4572
4573         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4574
4575         addresses = self.vapi.nat64_pool_addr_dump()
4576         self.assertEqual(len(addresses), 0)
4577
4578     def test_interface(self):
4579         """ Enable/disable NAT64 feature on the interface """
4580         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4581         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4582
4583         interfaces = self.vapi.nat64_interface_dump()
4584         self.assertEqual(len(interfaces), 2)
4585         pg0_found = False
4586         pg1_found = False
4587         for intf in interfaces:
4588             if intf.sw_if_index == self.pg0.sw_if_index:
4589                 self.assertEqual(intf.is_inside, 1)
4590                 pg0_found = True
4591             elif intf.sw_if_index == self.pg1.sw_if_index:
4592                 self.assertEqual(intf.is_inside, 0)
4593                 pg1_found = True
4594         self.assertTrue(pg0_found)
4595         self.assertTrue(pg1_found)
4596
4597         features = self.vapi.cli("show interface features pg0")
4598         self.assertNotEqual(features.find('nat64-in2out'), -1)
4599         features = self.vapi.cli("show interface features pg1")
4600         self.assertNotEqual(features.find('nat64-out2in'), -1)
4601
4602         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4603         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4604
4605         interfaces = self.vapi.nat64_interface_dump()
4606         self.assertEqual(len(interfaces), 0)
4607
4608     def test_static_bib(self):
4609         """ Add/delete static BIB entry """
4610         in_addr = socket.inet_pton(socket.AF_INET6,
4611                                    '2001:db8:85a3::8a2e:370:7334')
4612         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4613         in_port = 1234
4614         out_port = 5678
4615         proto = IP_PROTOS.tcp
4616
4617         self.vapi.nat64_add_del_static_bib(in_addr,
4618                                            out_addr,
4619                                            in_port,
4620                                            out_port,
4621                                            proto)
4622         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4623         static_bib_num = 0
4624         for bibe in bib:
4625             if bibe.is_static:
4626                 static_bib_num += 1
4627                 self.assertEqual(bibe.i_addr, in_addr)
4628                 self.assertEqual(bibe.o_addr, out_addr)
4629                 self.assertEqual(bibe.i_port, in_port)
4630                 self.assertEqual(bibe.o_port, out_port)
4631         self.assertEqual(static_bib_num, 1)
4632
4633         self.vapi.nat64_add_del_static_bib(in_addr,
4634                                            out_addr,
4635                                            in_port,
4636                                            out_port,
4637                                            proto,
4638                                            is_add=0)
4639         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4640         static_bib_num = 0
4641         for bibe in bib:
4642             if bibe.is_static:
4643                 static_bib_num += 1
4644         self.assertEqual(static_bib_num, 0)
4645
4646     def test_set_timeouts(self):
4647         """ Set NAT64 timeouts """
4648         # verify default values
4649         timeouts = self.vapi.nat64_get_timeouts()
4650         self.assertEqual(timeouts.udp, 300)
4651         self.assertEqual(timeouts.icmp, 60)
4652         self.assertEqual(timeouts.tcp_trans, 240)
4653         self.assertEqual(timeouts.tcp_est, 7440)
4654         self.assertEqual(timeouts.tcp_incoming_syn, 6)
4655
4656         # set and verify custom values
4657         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4658                                      tcp_est=7450, tcp_incoming_syn=10)
4659         timeouts = self.vapi.nat64_get_timeouts()
4660         self.assertEqual(timeouts.udp, 200)
4661         self.assertEqual(timeouts.icmp, 30)
4662         self.assertEqual(timeouts.tcp_trans, 250)
4663         self.assertEqual(timeouts.tcp_est, 7450)
4664         self.assertEqual(timeouts.tcp_incoming_syn, 10)
4665
4666     def test_dynamic(self):
4667         """ NAT64 dynamic translation test """
4668         self.tcp_port_in = 6303
4669         self.udp_port_in = 6304
4670         self.icmp_id_in = 6305
4671
4672         ses_num_start = self.nat64_get_ses_num()
4673
4674         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4675                                                 self.nat_addr_n)
4676         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4677         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4678
4679         # in2out
4680         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4681         self.pg0.add_stream(pkts)
4682         self.pg_enable_capture(self.pg_interfaces)
4683         self.pg_start()
4684         capture = self.pg1.get_capture(len(pkts))
4685         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4686                                 dst_ip=self.pg1.remote_ip4)
4687
4688         # out2in
4689         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4690         self.pg1.add_stream(pkts)
4691         self.pg_enable_capture(self.pg_interfaces)
4692         self.pg_start()
4693         capture = self.pg0.get_capture(len(pkts))
4694         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4695         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4696
4697         # in2out
4698         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4699         self.pg0.add_stream(pkts)
4700         self.pg_enable_capture(self.pg_interfaces)
4701         self.pg_start()
4702         capture = self.pg1.get_capture(len(pkts))
4703         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4704                                 dst_ip=self.pg1.remote_ip4)
4705
4706         # out2in
4707         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4708         self.pg1.add_stream(pkts)
4709         self.pg_enable_capture(self.pg_interfaces)
4710         self.pg_start()
4711         capture = self.pg0.get_capture(len(pkts))
4712         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4713
4714         ses_num_end = self.nat64_get_ses_num()
4715
4716         self.assertEqual(ses_num_end - ses_num_start, 3)
4717
4718         # tenant with specific VRF
4719         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4720                                                 self.vrf1_nat_addr_n,
4721                                                 vrf_id=self.vrf1_id)
4722         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4723
4724         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4725         self.pg2.add_stream(pkts)
4726         self.pg_enable_capture(self.pg_interfaces)
4727         self.pg_start()
4728         capture = self.pg1.get_capture(len(pkts))
4729         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4730                                 dst_ip=self.pg1.remote_ip4)
4731
4732         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4733         self.pg1.add_stream(pkts)
4734         self.pg_enable_capture(self.pg_interfaces)
4735         self.pg_start()
4736         capture = self.pg2.get_capture(len(pkts))
4737         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4738
4739     def test_static(self):
4740         """ NAT64 static translation test """
4741         self.tcp_port_in = 60303
4742         self.udp_port_in = 60304
4743         self.icmp_id_in = 60305
4744         self.tcp_port_out = 60303
4745         self.udp_port_out = 60304
4746         self.icmp_id_out = 60305
4747
4748         ses_num_start = self.nat64_get_ses_num()
4749
4750         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4751                                                 self.nat_addr_n)
4752         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4753         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4754
4755         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4756                                            self.nat_addr_n,
4757                                            self.tcp_port_in,
4758                                            self.tcp_port_out,
4759                                            IP_PROTOS.tcp)
4760         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4761                                            self.nat_addr_n,
4762                                            self.udp_port_in,
4763                                            self.udp_port_out,
4764                                            IP_PROTOS.udp)
4765         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4766                                            self.nat_addr_n,
4767                                            self.icmp_id_in,
4768                                            self.icmp_id_out,
4769                                            IP_PROTOS.icmp)
4770
4771         # in2out
4772         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4773         self.pg0.add_stream(pkts)
4774         self.pg_enable_capture(self.pg_interfaces)
4775         self.pg_start()
4776         capture = self.pg1.get_capture(len(pkts))
4777         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4778                                 dst_ip=self.pg1.remote_ip4, same_port=True)
4779
4780         # out2in
4781         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4782         self.pg1.add_stream(pkts)
4783         self.pg_enable_capture(self.pg_interfaces)
4784         self.pg_start()
4785         capture = self.pg0.get_capture(len(pkts))
4786         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4787         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4788
4789         ses_num_end = self.nat64_get_ses_num()
4790
4791         self.assertEqual(ses_num_end - ses_num_start, 3)
4792
4793     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4794     def test_session_timeout(self):
4795         """ NAT64 session timeout """
4796         self.icmp_id_in = 1234
4797         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4798                                                 self.nat_addr_n)
4799         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4800         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4801         self.vapi.nat64_set_timeouts(icmp=5)
4802
4803         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4804         self.pg0.add_stream(pkts)
4805         self.pg_enable_capture(self.pg_interfaces)
4806         self.pg_start()
4807         capture = self.pg1.get_capture(len(pkts))
4808
4809         ses_num_before_timeout = self.nat64_get_ses_num()
4810
4811         sleep(15)
4812
4813         # ICMP session after timeout
4814         ses_num_after_timeout = self.nat64_get_ses_num()
4815         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4816
4817     def test_icmp_error(self):
4818         """ NAT64 ICMP Error message translation """
4819         self.tcp_port_in = 6303
4820         self.udp_port_in = 6304
4821         self.icmp_id_in = 6305
4822
4823         ses_num_start = self.nat64_get_ses_num()
4824
4825         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4826                                                 self.nat_addr_n)
4827         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4828         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4829
4830         # send some packets to create sessions
4831         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4832         self.pg0.add_stream(pkts)
4833         self.pg_enable_capture(self.pg_interfaces)
4834         self.pg_start()
4835         capture_ip4 = self.pg1.get_capture(len(pkts))
4836         self.verify_capture_out(capture_ip4,
4837                                 nat_ip=self.nat_addr,
4838                                 dst_ip=self.pg1.remote_ip4)
4839
4840         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4841         self.pg1.add_stream(pkts)
4842         self.pg_enable_capture(self.pg_interfaces)
4843         self.pg_start()
4844         capture_ip6 = self.pg0.get_capture(len(pkts))
4845         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4846         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4847                                    self.pg0.remote_ip6)
4848
4849         # in2out
4850         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4851                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4852                 ICMPv6DestUnreach(code=1) /
4853                 packet[IPv6] for packet in capture_ip6]
4854         self.pg0.add_stream(pkts)
4855         self.pg_enable_capture(self.pg_interfaces)
4856         self.pg_start()
4857         capture = self.pg1.get_capture(len(pkts))
4858         for packet in capture:
4859             try:
4860                 self.assertEqual(packet[IP].src, self.nat_addr)
4861                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4862                 self.assertEqual(packet[ICMP].type, 3)
4863                 self.assertEqual(packet[ICMP].code, 13)
4864                 inner = packet[IPerror]
4865                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4866                 self.assertEqual(inner.dst, self.nat_addr)
4867                 self.check_icmp_checksum(packet)
4868                 if inner.haslayer(TCPerror):
4869                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4870                 elif inner.haslayer(UDPerror):
4871                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4872                 else:
4873                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4874             except:
4875                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4876                 raise
4877
4878         # out2in
4879         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4880                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4881                 ICMP(type=3, code=13) /
4882                 packet[IP] for packet in capture_ip4]
4883         self.pg1.add_stream(pkts)
4884         self.pg_enable_capture(self.pg_interfaces)
4885         self.pg_start()
4886         capture = self.pg0.get_capture(len(pkts))
4887         for packet in capture:
4888             try:
4889                 self.assertEqual(packet[IPv6].src, ip.src)
4890                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4891                 icmp = packet[ICMPv6DestUnreach]
4892                 self.assertEqual(icmp.code, 1)
4893                 inner = icmp[IPerror6]
4894                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4895                 self.assertEqual(inner.dst, ip.src)
4896                 self.check_icmpv6_checksum(packet)
4897                 if inner.haslayer(TCPerror):
4898                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4899                 elif inner.haslayer(UDPerror):
4900                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4901                 else:
4902                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4903                                      self.icmp_id_in)
4904             except:
4905                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4906                 raise
4907
4908     def test_hairpinning(self):
4909         """ NAT64 hairpinning """
4910
4911         client = self.pg0.remote_hosts[0]
4912         server = self.pg0.remote_hosts[1]
4913         server_tcp_in_port = 22
4914         server_tcp_out_port = 4022
4915         server_udp_in_port = 23
4916         server_udp_out_port = 4023
4917         client_tcp_in_port = 1234
4918         client_udp_in_port = 1235
4919         client_tcp_out_port = 0
4920         client_udp_out_port = 0
4921         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4922         nat_addr_ip6 = ip.src
4923
4924         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4925                                                 self.nat_addr_n)
4926         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4927         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4928
4929         self.vapi.nat64_add_del_static_bib(server.ip6n,
4930                                            self.nat_addr_n,
4931                                            server_tcp_in_port,
4932                                            server_tcp_out_port,
4933                                            IP_PROTOS.tcp)
4934         self.vapi.nat64_add_del_static_bib(server.ip6n,
4935                                            self.nat_addr_n,
4936                                            server_udp_in_port,
4937                                            server_udp_out_port,
4938                                            IP_PROTOS.udp)
4939
4940         # client to server
4941         pkts = []
4942         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4943              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4944              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4945         pkts.append(p)
4946         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4947              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4948              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4949         pkts.append(p)
4950         self.pg0.add_stream(pkts)
4951         self.pg_enable_capture(self.pg_interfaces)
4952         self.pg_start()
4953         capture = self.pg0.get_capture(len(pkts))
4954         for packet in capture:
4955             try:
4956                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4957                 self.assertEqual(packet[IPv6].dst, server.ip6)
4958                 if packet.haslayer(TCP):
4959                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4960                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4961                     self.check_tcp_checksum(packet)
4962                     client_tcp_out_port = packet[TCP].sport
4963                 else:
4964                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4965                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
4966                     self.check_udp_checksum(packet)
4967                     client_udp_out_port = packet[UDP].sport
4968             except:
4969                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4970                 raise
4971
4972         # server to client
4973         pkts = []
4974         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4975              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4976              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4977         pkts.append(p)
4978         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4979              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4980              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4981         pkts.append(p)
4982         self.pg0.add_stream(pkts)
4983         self.pg_enable_capture(self.pg_interfaces)
4984         self.pg_start()
4985         capture = self.pg0.get_capture(len(pkts))
4986         for packet in capture:
4987             try:
4988                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4989                 self.assertEqual(packet[IPv6].dst, client.ip6)
4990                 if packet.haslayer(TCP):
4991                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4992                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4993                     self.check_tcp_checksum(packet)
4994                 else:
4995                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
4996                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
4997                     self.check_udp_checksum(packet)
4998             except:
4999                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5000                 raise
5001
5002         # ICMP error
5003         pkts = []
5004         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5005                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5006                 ICMPv6DestUnreach(code=1) /
5007                 packet[IPv6] for packet in capture]
5008         self.pg0.add_stream(pkts)
5009         self.pg_enable_capture(self.pg_interfaces)
5010         self.pg_start()
5011         capture = self.pg0.get_capture(len(pkts))
5012         for packet in capture:
5013             try:
5014                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5015                 self.assertEqual(packet[IPv6].dst, server.ip6)
5016                 icmp = packet[ICMPv6DestUnreach]
5017                 self.assertEqual(icmp.code, 1)
5018                 inner = icmp[IPerror6]
5019                 self.assertEqual(inner.src, server.ip6)
5020                 self.assertEqual(inner.dst, nat_addr_ip6)
5021                 self.check_icmpv6_checksum(packet)
5022                 if inner.haslayer(TCPerror):
5023                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5024                     self.assertEqual(inner[TCPerror].dport,
5025                                      client_tcp_out_port)
5026                 else:
5027                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5028                     self.assertEqual(inner[UDPerror].dport,
5029                                      client_udp_out_port)
5030             except:
5031                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5032                 raise
5033
5034     def test_prefix(self):
5035         """ NAT64 Network-Specific Prefix """
5036
5037         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5038                                                 self.nat_addr_n)
5039         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5040         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5041         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5042                                                 self.vrf1_nat_addr_n,
5043                                                 vrf_id=self.vrf1_id)
5044         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5045
5046         # Add global prefix
5047         global_pref64 = "2001:db8::"
5048         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5049         global_pref64_len = 32
5050         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5051
5052         prefix = self.vapi.nat64_prefix_dump()
5053         self.assertEqual(len(prefix), 1)
5054         self.assertEqual(prefix[0].prefix, global_pref64_n)
5055         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5056         self.assertEqual(prefix[0].vrf_id, 0)
5057
5058         # Add tenant specific prefix
5059         vrf1_pref64 = "2001:db8:122:300::"
5060         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5061         vrf1_pref64_len = 56
5062         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5063                                        vrf1_pref64_len,
5064                                        vrf_id=self.vrf1_id)
5065         prefix = self.vapi.nat64_prefix_dump()
5066         self.assertEqual(len(prefix), 2)
5067
5068         # Global prefix
5069         pkts = self.create_stream_in_ip6(self.pg0,
5070                                          self.pg1,
5071                                          pref=global_pref64,
5072                                          plen=global_pref64_len)
5073         self.pg0.add_stream(pkts)
5074         self.pg_enable_capture(self.pg_interfaces)
5075         self.pg_start()
5076         capture = self.pg1.get_capture(len(pkts))
5077         self.verify_capture_out(capture, nat_ip=self.nat_addr,
5078                                 dst_ip=self.pg1.remote_ip4)
5079
5080         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5081         self.pg1.add_stream(pkts)
5082         self.pg_enable_capture(self.pg_interfaces)
5083         self.pg_start()
5084         capture = self.pg0.get_capture(len(pkts))
5085         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5086                                   global_pref64,
5087                                   global_pref64_len)
5088         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5089
5090         # Tenant specific prefix
5091         pkts = self.create_stream_in_ip6(self.pg2,
5092                                          self.pg1,
5093                                          pref=vrf1_pref64,
5094                                          plen=vrf1_pref64_len)
5095         self.pg2.add_stream(pkts)
5096         self.pg_enable_capture(self.pg_interfaces)
5097         self.pg_start()
5098         capture = self.pg1.get_capture(len(pkts))
5099         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5100                                 dst_ip=self.pg1.remote_ip4)
5101
5102         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5103         self.pg1.add_stream(pkts)
5104         self.pg_enable_capture(self.pg_interfaces)
5105         self.pg_start()
5106         capture = self.pg2.get_capture(len(pkts))
5107         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5108                                   vrf1_pref64,
5109                                   vrf1_pref64_len)
5110         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5111
5112     def test_unknown_proto(self):
5113         """ NAT64 translate packet with unknown protocol """
5114
5115         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5116                                                 self.nat_addr_n)
5117         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5118         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5119         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5120
5121         # in2out
5122         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5123              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5124              TCP(sport=self.tcp_port_in, dport=20))
5125         self.pg0.add_stream(p)
5126         self.pg_enable_capture(self.pg_interfaces)
5127         self.pg_start()
5128         p = self.pg1.get_capture(1)
5129
5130         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5131              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5132              GRE() /
5133              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5134              TCP(sport=1234, dport=1234))
5135         self.pg0.add_stream(p)
5136         self.pg_enable_capture(self.pg_interfaces)
5137         self.pg_start()
5138         p = self.pg1.get_capture(1)
5139         packet = p[0]
5140         try:
5141             self.assertEqual(packet[IP].src, self.nat_addr)
5142             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5143             self.assertTrue(packet.haslayer(GRE))
5144             self.check_ip_checksum(packet)
5145         except:
5146             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5147             raise
5148
5149         # out2in
5150         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5151              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5152              GRE() /
5153              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5154              TCP(sport=1234, dport=1234))
5155         self.pg1.add_stream(p)
5156         self.pg_enable_capture(self.pg_interfaces)
5157         self.pg_start()
5158         p = self.pg0.get_capture(1)
5159         packet = p[0]
5160         try:
5161             self.assertEqual(packet[IPv6].src, remote_ip6)
5162             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5163             self.assertEqual(packet[IPv6].nh, 47)
5164         except:
5165             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5166             raise
5167
5168     def test_hairpinning_unknown_proto(self):
5169         """ NAT64 translate packet with unknown protocol - hairpinning """
5170
5171         client = self.pg0.remote_hosts[0]
5172         server = self.pg0.remote_hosts[1]
5173         server_tcp_in_port = 22
5174         server_tcp_out_port = 4022
5175         client_tcp_in_port = 1234
5176         client_tcp_out_port = 1235
5177         server_nat_ip = "10.0.0.100"
5178         client_nat_ip = "10.0.0.110"
5179         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5180         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5181         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5182         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5183
5184         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5185                                                 client_nat_ip_n)
5186         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5187         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5188
5189         self.vapi.nat64_add_del_static_bib(server.ip6n,
5190                                            server_nat_ip_n,
5191                                            server_tcp_in_port,
5192                                            server_tcp_out_port,
5193                                            IP_PROTOS.tcp)
5194
5195         self.vapi.nat64_add_del_static_bib(server.ip6n,
5196                                            server_nat_ip_n,
5197                                            0,
5198                                            0,
5199                                            IP_PROTOS.gre)
5200
5201         self.vapi.nat64_add_del_static_bib(client.ip6n,
5202                                            client_nat_ip_n,
5203                                            client_tcp_in_port,
5204                                            client_tcp_out_port,
5205                                            IP_PROTOS.tcp)
5206
5207         # client to server
5208         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5209              IPv6(src=client.ip6, dst=server_nat_ip6) /
5210              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5211         self.pg0.add_stream(p)
5212         self.pg_enable_capture(self.pg_interfaces)
5213         self.pg_start()
5214         p = self.pg0.get_capture(1)
5215
5216         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5217              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5218              GRE() /
5219              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5220              TCP(sport=1234, dport=1234))
5221         self.pg0.add_stream(p)
5222         self.pg_enable_capture(self.pg_interfaces)
5223         self.pg_start()
5224         p = self.pg0.get_capture(1)
5225         packet = p[0]
5226         try:
5227             self.assertEqual(packet[IPv6].src, client_nat_ip6)
5228             self.assertEqual(packet[IPv6].dst, server.ip6)
5229             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5230         except:
5231             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5232             raise
5233
5234         # server to client
5235         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5236              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5237              GRE() /
5238              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5239              TCP(sport=1234, dport=1234))
5240         self.pg0.add_stream(p)
5241         self.pg_enable_capture(self.pg_interfaces)
5242         self.pg_start()
5243         p = self.pg0.get_capture(1)
5244         packet = p[0]
5245         try:
5246             self.assertEqual(packet[IPv6].src, server_nat_ip6)
5247             self.assertEqual(packet[IPv6].dst, client.ip6)
5248             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5249         except:
5250             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5251             raise
5252
5253     def test_one_armed_nat64(self):
5254         """ One armed NAT64 """
5255         external_port = 0
5256         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5257                                            '64:ff9b::',
5258                                            96)
5259
5260         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5261                                                 self.nat_addr_n)
5262         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5263         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5264
5265         # in2out
5266         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5267              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5268              TCP(sport=12345, dport=80))
5269         self.pg3.add_stream(p)
5270         self.pg_enable_capture(self.pg_interfaces)
5271         self.pg_start()
5272         capture = self.pg3.get_capture(1)
5273         p = capture[0]
5274         try:
5275             ip = p[IP]
5276             tcp = p[TCP]
5277             self.assertEqual(ip.src, self.nat_addr)
5278             self.assertEqual(ip.dst, self.pg3.remote_ip4)
5279             self.assertNotEqual(tcp.sport, 12345)
5280             external_port = tcp.sport
5281             self.assertEqual(tcp.dport, 80)
5282             self.check_tcp_checksum(p)
5283             self.check_ip_checksum(p)
5284         except:
5285             self.logger.error(ppp("Unexpected or invalid packet:", p))
5286             raise
5287
5288         # out2in
5289         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5290              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5291              TCP(sport=80, dport=external_port))
5292         self.pg3.add_stream(p)
5293         self.pg_enable_capture(self.pg_interfaces)
5294         self.pg_start()
5295         capture = self.pg3.get_capture(1)
5296         p = capture[0]
5297         try:
5298             ip = p[IPv6]
5299             tcp = p[TCP]
5300             self.assertEqual(ip.src, remote_host_ip6)
5301             self.assertEqual(ip.dst, self.pg3.remote_ip6)
5302             self.assertEqual(tcp.sport, 80)
5303             self.assertEqual(tcp.dport, 12345)
5304             self.check_tcp_checksum(p)
5305         except:
5306             self.logger.error(ppp("Unexpected or invalid packet:", p))
5307             raise
5308
5309     def test_frag_in_order(self):
5310         """ NAT64 translate fragments arriving in order """
5311         self.tcp_port_in = random.randint(1025, 65535)
5312
5313         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5314                                                 self.nat_addr_n)
5315         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5316         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5317
5318         reass = self.vapi.nat_reass_dump()
5319         reass_n_start = len(reass)
5320
5321         # in2out
5322         data = 'a' * 200
5323         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5324                                            self.tcp_port_in, 20, data)
5325         self.pg0.add_stream(pkts)
5326         self.pg_enable_capture(self.pg_interfaces)
5327         self.pg_start()
5328         frags = self.pg1.get_capture(len(pkts))
5329         p = self.reass_frags_and_verify(frags,
5330                                         self.nat_addr,
5331                                         self.pg1.remote_ip4)
5332         self.assertEqual(p[TCP].dport, 20)
5333         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5334         self.tcp_port_out = p[TCP].sport
5335         self.assertEqual(data, p[Raw].load)
5336
5337         # out2in
5338         data = "A" * 4 + "b" * 16 + "C" * 3
5339         pkts = self.create_stream_frag(self.pg1,
5340                                        self.nat_addr,
5341                                        20,
5342                                        self.tcp_port_out,
5343                                        data)
5344         self.pg1.add_stream(pkts)
5345         self.pg_enable_capture(self.pg_interfaces)
5346         self.pg_start()
5347         frags = self.pg0.get_capture(len(pkts))
5348         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5349         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5350         self.assertEqual(p[TCP].sport, 20)
5351         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5352         self.assertEqual(data, p[Raw].load)
5353
5354         reass = self.vapi.nat_reass_dump()
5355         reass_n_end = len(reass)
5356
5357         self.assertEqual(reass_n_end - reass_n_start, 2)
5358
5359     def test_reass_hairpinning(self):
5360         """ NAT64 fragments hairpinning """
5361         data = 'a' * 200
5362         client = self.pg0.remote_hosts[0]
5363         server = self.pg0.remote_hosts[1]
5364         server_in_port = random.randint(1025, 65535)
5365         server_out_port = random.randint(1025, 65535)
5366         client_in_port = random.randint(1025, 65535)
5367         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5368         nat_addr_ip6 = ip.src
5369
5370         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5371                                                 self.nat_addr_n)
5372         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5373         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5374
5375         # add static BIB entry for server
5376         self.vapi.nat64_add_del_static_bib(server.ip6n,
5377                                            self.nat_addr_n,
5378                                            server_in_port,
5379                                            server_out_port,
5380                                            IP_PROTOS.tcp)
5381
5382         # send packet from host to server
5383         pkts = self.create_stream_frag_ip6(self.pg0,
5384                                            self.nat_addr,
5385                                            client_in_port,
5386                                            server_out_port,
5387                                            data)
5388         self.pg0.add_stream(pkts)
5389         self.pg_enable_capture(self.pg_interfaces)
5390         self.pg_start()
5391         frags = self.pg0.get_capture(len(pkts))
5392         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5393         self.assertNotEqual(p[TCP].sport, client_in_port)
5394         self.assertEqual(p[TCP].dport, server_in_port)
5395         self.assertEqual(data, p[Raw].load)
5396
5397     def test_frag_out_of_order(self):
5398         """ NAT64 translate fragments arriving out of order """
5399         self.tcp_port_in = random.randint(1025, 65535)
5400
5401         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5402                                                 self.nat_addr_n)
5403         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5404         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5405
5406         # in2out
5407         data = 'a' * 200
5408         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5409                                            self.tcp_port_in, 20, data)
5410         pkts.reverse()
5411         self.pg0.add_stream(pkts)
5412         self.pg_enable_capture(self.pg_interfaces)
5413         self.pg_start()
5414         frags = self.pg1.get_capture(len(pkts))
5415         p = self.reass_frags_and_verify(frags,
5416                                         self.nat_addr,
5417                                         self.pg1.remote_ip4)
5418         self.assertEqual(p[TCP].dport, 20)
5419         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5420         self.tcp_port_out = p[TCP].sport
5421         self.assertEqual(data, p[Raw].load)
5422
5423         # out2in
5424         data = "A" * 4 + "B" * 16 + "C" * 3
5425         pkts = self.create_stream_frag(self.pg1,
5426                                        self.nat_addr,
5427                                        20,
5428                                        self.tcp_port_out,
5429                                        data)
5430         pkts.reverse()
5431         self.pg1.add_stream(pkts)
5432         self.pg_enable_capture(self.pg_interfaces)
5433         self.pg_start()
5434         frags = self.pg0.get_capture(len(pkts))
5435         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5436         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5437         self.assertEqual(p[TCP].sport, 20)
5438         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5439         self.assertEqual(data, p[Raw].load)
5440
5441     def test_interface_addr(self):
5442         """ Acquire NAT64 pool addresses from interface """
5443         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5444
5445         # no address in NAT64 pool
5446         adresses = self.vapi.nat44_address_dump()
5447         self.assertEqual(0, len(adresses))
5448
5449         # configure interface address and check NAT64 address pool
5450         self.pg4.config_ip4()
5451         addresses = self.vapi.nat64_pool_addr_dump()
5452         self.assertEqual(len(addresses), 1)
5453         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5454
5455         # remove interface address and check NAT64 address pool
5456         self.pg4.unconfig_ip4()
5457         addresses = self.vapi.nat64_pool_addr_dump()
5458         self.assertEqual(0, len(adresses))
5459
5460     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5461     def test_ipfix_max_bibs_sessions(self):
5462         """ IPFIX logging maximum session and BIB entries exceeded """
5463         max_bibs = 1280
5464         max_sessions = 2560
5465         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5466                                            '64:ff9b::',
5467                                            96)
5468
5469         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5470                                                 self.nat_addr_n)
5471         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5472         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5473
5474         pkts = []
5475         src = ""
5476         for i in range(0, max_bibs):
5477             src = "fd01:aa::%x" % (i)
5478             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5479                  IPv6(src=src, dst=remote_host_ip6) /
5480                  TCP(sport=12345, dport=80))
5481             pkts.append(p)
5482             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5483                  IPv6(src=src, dst=remote_host_ip6) /
5484                  TCP(sport=12345, dport=22))
5485             pkts.append(p)
5486         self.pg0.add_stream(pkts)
5487         self.pg_enable_capture(self.pg_interfaces)
5488         self.pg_start()
5489         self.pg1.get_capture(max_sessions)
5490
5491         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5492                                      src_address=self.pg3.local_ip4n,
5493                                      path_mtu=512,
5494                                      template_interval=10)
5495         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5496                             src_port=self.ipfix_src_port)
5497
5498         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5499              IPv6(src=src, dst=remote_host_ip6) /
5500              TCP(sport=12345, dport=25))
5501         self.pg0.add_stream(p)
5502         self.pg_enable_capture(self.pg_interfaces)
5503         self.pg_start()
5504         self.pg1.get_capture(0)
5505         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5506         capture = self.pg3.get_capture(9)
5507         ipfix = IPFIXDecoder()
5508         # first load template
5509         for p in capture:
5510             self.assertTrue(p.haslayer(IPFIX))
5511             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5512             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5513             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5514             self.assertEqual(p[UDP].dport, 4739)
5515             self.assertEqual(p[IPFIX].observationDomainID,
5516                              self.ipfix_domain_id)
5517             if p.haslayer(Template):
5518                 ipfix.add_template(p.getlayer(Template))
5519         # verify events in data set
5520         for p in capture:
5521             if p.haslayer(Data):
5522                 data = ipfix.decode_data_set(p.getlayer(Set))
5523                 self.verify_ipfix_max_sessions(data, max_sessions)
5524
5525         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5526              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5527              TCP(sport=12345, dport=80))
5528         self.pg0.add_stream(p)
5529         self.pg_enable_capture(self.pg_interfaces)
5530         self.pg_start()
5531         self.pg1.get_capture(0)
5532         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5533         capture = self.pg3.get_capture(1)
5534         # verify events in data set
5535         for p in capture:
5536             self.assertTrue(p.haslayer(IPFIX))
5537             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5538             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5539             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5540             self.assertEqual(p[UDP].dport, 4739)
5541             self.assertEqual(p[IPFIX].observationDomainID,
5542                              self.ipfix_domain_id)
5543             if p.haslayer(Data):
5544                 data = ipfix.decode_data_set(p.getlayer(Set))
5545                 self.verify_ipfix_max_bibs(data, max_bibs)
5546
5547     def test_ipfix_max_frags(self):
5548         """ IPFIX logging maximum fragments pending reassembly exceeded """
5549         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5550                                                 self.nat_addr_n)
5551         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5552         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5553         self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5554         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5555                                      src_address=self.pg3.local_ip4n,
5556                                      path_mtu=512,
5557                                      template_interval=10)
5558         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5559                             src_port=self.ipfix_src_port)
5560
5561         data = 'a' * 200
5562         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5563                                            self.tcp_port_in, 20, data)
5564         self.pg0.add_stream(pkts[-1])
5565         self.pg_enable_capture(self.pg_interfaces)
5566         self.pg_start()
5567         self.pg1.get_capture(0)
5568         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5569         capture = self.pg3.get_capture(9)
5570         ipfix = IPFIXDecoder()
5571         # first load template
5572         for p in capture:
5573             self.assertTrue(p.haslayer(IPFIX))
5574             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5575             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5576             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5577             self.assertEqual(p[UDP].dport, 4739)
5578             self.assertEqual(p[IPFIX].observationDomainID,
5579                              self.ipfix_domain_id)
5580             if p.haslayer(Template):
5581                 ipfix.add_template(p.getlayer(Template))
5582         # verify events in data set
5583         for p in capture:
5584             if p.haslayer(Data):
5585                 data = ipfix.decode_data_set(p.getlayer(Set))
5586                 self.verify_ipfix_max_fragments_ip6(data, 0,
5587                                                     self.pg0.remote_ip6n)
5588
5589     def test_ipfix_bib_ses(self):
5590         """ IPFIX logging NAT64 BIB/session create and delete events """
5591         self.tcp_port_in = random.randint(1025, 65535)
5592         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5593                                            '64:ff9b::',
5594                                            96)
5595
5596         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5597                                                 self.nat_addr_n)
5598         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5599         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5600         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5601                                      src_address=self.pg3.local_ip4n,
5602                                      path_mtu=512,
5603                                      template_interval=10)
5604         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5605                             src_port=self.ipfix_src_port)
5606
5607         # Create
5608         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5609              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5610              TCP(sport=self.tcp_port_in, dport=25))
5611         self.pg0.add_stream(p)
5612         self.pg_enable_capture(self.pg_interfaces)
5613         self.pg_start()
5614         p = self.pg1.get_capture(1)
5615         self.tcp_port_out = p[0][TCP].sport
5616         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5617         capture = self.pg3.get_capture(10)
5618         ipfix = IPFIXDecoder()
5619         # first load template
5620         for p in capture:
5621             self.assertTrue(p.haslayer(IPFIX))
5622             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5623             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5624             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5625             self.assertEqual(p[UDP].dport, 4739)
5626             self.assertEqual(p[IPFIX].observationDomainID,
5627                              self.ipfix_domain_id)
5628             if p.haslayer(Template):
5629                 ipfix.add_template(p.getlayer(Template))
5630         # verify events in data set
5631         for p in capture:
5632             if p.haslayer(Data):
5633                 data = ipfix.decode_data_set(p.getlayer(Set))
5634                 if ord(data[0][230]) == 10:
5635                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5636                 elif ord(data[0][230]) == 6:
5637                     self.verify_ipfix_nat64_ses(data,
5638                                                 1,
5639                                                 self.pg0.remote_ip6n,
5640                                                 self.pg1.remote_ip4,
5641                                                 25)
5642                 else:
5643                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5644
5645         # Delete
5646         self.pg_enable_capture(self.pg_interfaces)
5647         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5648                                                 self.nat_addr_n,
5649                                                 is_add=0)
5650         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5651         capture = self.pg3.get_capture(2)
5652         # verify events in data set
5653         for p in capture:
5654             self.assertTrue(p.haslayer(IPFIX))
5655             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5656             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5657             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5658             self.assertEqual(p[UDP].dport, 4739)
5659             self.assertEqual(p[IPFIX].observationDomainID,
5660                              self.ipfix_domain_id)
5661             if p.haslayer(Data):
5662                 data = ipfix.decode_data_set(p.getlayer(Set))
5663                 if ord(data[0][230]) == 11:
5664                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5665                 elif ord(data[0][230]) == 7:
5666                     self.verify_ipfix_nat64_ses(data,
5667                                                 0,
5668                                                 self.pg0.remote_ip6n,
5669                                                 self.pg1.remote_ip4,
5670                                                 25)
5671                 else:
5672                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5673
5674     def nat64_get_ses_num(self):
5675         """
5676         Return number of active NAT64 sessions.
5677         """
5678         st = self.vapi.nat64_st_dump()
5679         return len(st)
5680
5681     def clear_nat64(self):
5682         """
5683         Clear NAT64 configuration.
5684         """
5685         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5686                             domain_id=self.ipfix_domain_id)
5687         self.ipfix_src_port = 4739
5688         self.ipfix_domain_id = 1
5689
5690         self.vapi.nat64_set_timeouts()
5691
5692         interfaces = self.vapi.nat64_interface_dump()
5693         for intf in interfaces:
5694             if intf.is_inside > 1:
5695                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5696                                                   0,
5697                                                   is_add=0)
5698             self.vapi.nat64_add_del_interface(intf.sw_if_index,
5699                                               intf.is_inside,
5700                                               is_add=0)
5701
5702         bib = self.vapi.nat64_bib_dump(255)
5703         for bibe in bib:
5704             if bibe.is_static:
5705                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5706                                                    bibe.o_addr,
5707                                                    bibe.i_port,
5708                                                    bibe.o_port,
5709                                                    bibe.proto,
5710                                                    bibe.vrf_id,
5711                                                    is_add=0)
5712
5713         adresses = self.vapi.nat64_pool_addr_dump()
5714         for addr in adresses:
5715             self.vapi.nat64_add_del_pool_addr_range(addr.address,
5716                                                     addr.address,
5717                                                     vrf_id=addr.vrf_id,
5718                                                     is_add=0)
5719
5720         prefixes = self.vapi.nat64_prefix_dump()
5721         for prefix in prefixes:
5722             self.vapi.nat64_add_del_prefix(prefix.prefix,
5723                                            prefix.prefix_len,
5724                                            vrf_id=prefix.vrf_id,
5725                                            is_add=0)
5726
5727     def tearDown(self):
5728         super(TestNAT64, self).tearDown()
5729         if not self.vpp_dead:
5730             self.logger.info(self.vapi.cli("show nat64 pool"))
5731             self.logger.info(self.vapi.cli("show nat64 interfaces"))
5732             self.logger.info(self.vapi.cli("show nat64 prefix"))
5733             self.logger.info(self.vapi.cli("show nat64 bib all"))
5734             self.logger.info(self.vapi.cli("show nat64 session table all"))
5735             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5736             self.clear_nat64()
5737
5738
class TestDSlite(MethodHolder):
    """ DS-Lite Test Cases """

    # Fixture layout: pg0 is configured with IPv4, pg1 with IPv6 and two
    # generated remote hosts.  10.0.0.3 is the address put into the DS-Lite
    # pool by test_dslite.

    @classmethod
    def setUpClass(cls):
        """
        Create and configure the two packet-generator interfaces.

        On any failure the parent tearDownClass is invoked and the
        exception is re-raised.
        """
        super(TestDSlite, cls).setUpClass()

        try:
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)

            cls.create_pg_interfaces(range(2))

            ip4_if = cls.pg0
            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

            ip6_if = cls.pg1
            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.generate_remote_hosts(2)
            ip6_if.configure_ipv6_neighbors()

        except Exception:
            super(TestDSlite, cls).tearDownClass()
            raise

    def test_dslite(self):
        """ Test DS-Lite

        Sends IPv4-in-IPv6 UDP, TCP and ICMP packets to the AFTR address on
        pg1 and checks the plain IPv4 packets seen on pg0, sends the replies
        on pg0 and checks the IPv4-in-IPv6 packets seen on pg1, and finally
        pings the AFTR tunnel endpoint address.
        """
        ip4_if, ip6_if = self.pg0, self.pg1
        inner_src_ip4 = '192.168.1.1'

        self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
                                                 self.nat_addr_n)
        aftr_ip4 = '192.0.0.1'
        aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
        self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)

        def send_and_capture(tx_if, pkt, rx_if):
            # Inject pkt on tx_if, run the packet generator and return the
            # result of rx_if.get_capture(1).
            tx_if.add_stream(pkt)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            return rx_if.get_capture(1)

        def check_decapsulated(pkt):
            # IPv4 side: no IPv6 header, source rewritten to the pool
            # address, destination untouched.
            self.assertFalse(pkt.haslayer(IPv6))
            self.assertEqual(pkt[IP].src, self.nat_addr)
            self.assertEqual(pkt[IP].dst, ip4_if.remote_ip4)

        def check_encapsulated(pkt, tunnel_host):
            # IPv6 side: tunnel runs from the AFTR address to the host that
            # opened the session; the inner header carries the original
            # IPv4 addresses.
            self.assertEqual(pkt[IPv6].src, aftr_ip6)
            self.assertEqual(pkt[IPv6].dst, tunnel_host.ip6)
            self.assertEqual(pkt[IP].src, ip4_if.remote_ip4)
            self.assertEqual(pkt[IP].dst, inner_src_ip4)

        # UDP
        udp_host = ip6_if.remote_hosts[0]
        request = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
                   IPv6(dst=aftr_ip6, src=udp_host.ip6) /
                   IP(dst=ip4_if.remote_ip4, src=inner_src_ip4) /
                   UDP(sport=20000, dport=10000))
        rx = send_and_capture(ip6_if, request, ip4_if)[0]
        check_decapsulated(rx)
        self.assertNotEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)
        out_port = rx[UDP].sport

        reply = (Ether(dst=ip4_if.local_mac, src=ip4_if.remote_mac) /
                 IP(dst=self.nat_addr, src=ip4_if.remote_ip4) /
                 UDP(sport=10000, dport=out_port))
        rx = send_and_capture(ip4_if, reply, ip6_if)[0]
        check_encapsulated(rx, udp_host)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # TCP (and everything below) uses the second remote host
        other_host = ip6_if.remote_hosts[1]
        request = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
                   IPv6(dst=aftr_ip6, src=other_host.ip6) /
                   IP(dst=ip4_if.remote_ip4, src=inner_src_ip4) /
                   TCP(sport=20001, dport=10001))
        rx = send_and_capture(ip6_if, request, ip4_if)[0]
        check_decapsulated(rx)
        self.assertNotEqual(rx[TCP].sport, 20001)
        self.assertEqual(rx[TCP].dport, 10001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)
        out_port = rx[TCP].sport

        reply = (Ether(dst=ip4_if.local_mac, src=ip4_if.remote_mac) /
                 IP(dst=self.nat_addr, src=ip4_if.remote_ip4) /
                 TCP(sport=10001, dport=out_port))
        rx = send_and_capture(ip4_if, reply, ip6_if)[0]
        check_encapsulated(rx, other_host)
        self.assertEqual(rx[TCP].sport, 10001)
        self.assertEqual(rx[TCP].dport, 20001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)

        # ICMP
        request = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
                   IPv6(dst=aftr_ip6, src=other_host.ip6) /
                   IP(dst=ip4_if.remote_ip4, src=inner_src_ip4) /
                   ICMP(id=4000, type='echo-request'))
        rx = send_and_capture(ip6_if, request, ip4_if)[0]
        check_decapsulated(rx)
        self.assertNotEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)
        out_id = rx[ICMP].id

        reply = (Ether(dst=ip4_if.local_mac, src=ip4_if.remote_mac) /
                 IP(dst=self.nat_addr, src=ip4_if.remote_ip4) /
                 ICMP(id=out_id, type='echo-reply'))
        rx = send_and_capture(ip4_if, reply, ip6_if)[0]
        check_encapsulated(rx, other_host)
        self.assertEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)

        # ping DS-Lite AFTR tunnel endpoint address
        ping = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
                IPv6(src=other_host.ip6, dst=aftr_ip6) /
                ICMPv6EchoRequest())
        replies = send_and_capture(ip6_if, ping, ip6_if)
        self.assertEqual(1, len(replies))
        rx = replies[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, other_host.ip6)
        self.assertTrue(rx.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """
        Log the DS-Lite state unless vpp_dead is set.
        """
        super(TestDSlite, self).tearDown()
        if self.vpp_dead:
            return

        show_commands = (
            "show dslite pool",
            "show dslite aftr-tunnel-endpoint-address",
            "show dslite sessions",
        )
        for command in show_commands:
            self.logger.info(self.vapi.cli(command))
5898
5899
class TestDSliteCE(MethodHolder):
    """ DS-Lite CE Test Cases """

    # Fixture layout: pg0 is configured with IPv4, pg1 with IPv6 and one
    # generated remote host.  VPP is started with "nat { dslite ce }"
    # appended to its command line.

    @classmethod
    def setUpConstants(cls):
        """
        Extend the VPP command line with the "nat { dslite ce }" section.
        """
        super(TestDSliteCE, cls).setUpConstants()
        startup_section = ["nat", "{", "dslite ce", "}"]
        cls.vpp_cmdline.extend(startup_section)

    @classmethod
    def setUpClass(cls):
        """
        Create and configure the two packet-generator interfaces.

        On any failure the parent tearDownClass is invoked and the
        exception is re-raised.
        """
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))

            ip4_if = cls.pg0
            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

            ip6_if = cls.pg1
            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.generate_remote_hosts(1)
            ip6_if.configure_ipv6_neighbors()

        except Exception:
            super(TestDSliteCE, cls).tearDownClass()
            raise

    def test_dslite_ce(self):
        """ Test DS-Lite CE

        Checks that an IPv4 UDP packet sent on pg0 leaves pg1 wrapped in
        IPv6 from the B4 address to the AFTR address, that the reverse
        tunnel packet is unwrapped towards pg0, and that the B4 tunnel
        endpoint address answers an ICMPv6 echo request.
        """
        ip4_if, ip6_if = self.pg0, self.pg1

        def send_and_capture(tx_if, pkt, rx_if):
            # Inject pkt on tx_if, run the packet generator and return the
            # result of rx_if.get_capture(1).
            tx_if.add_stream(pkt)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            return rx_if.get_capture(1)

        b4_ip4 = '192.0.0.2'
        b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
        self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)

        aftr_ip4 = '192.0.0.1'
        aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
        self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)

        # Host route for the AFTR address via pg1's remote peer.
        self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
                                   dst_address_length=128,
                                   next_hop_address=ip6_if.remote_ip6n,
                                   next_hop_sw_if_index=ip6_if.sw_if_index,
                                   is_ipv6=1)

        # UDP encapsulation
        plain = (Ether(dst=ip4_if.local_mac, src=ip4_if.remote_mac) /
                 IP(dst=ip6_if.remote_ip4, src=ip4_if.remote_ip4) /
                 UDP(sport=10000, dport=20000))
        rx = send_and_capture(ip4_if, plain, ip6_if)[0]
        self.assertEqual(rx[IPv6].src, b4_ip6)
        self.assertEqual(rx[IPv6].dst, aftr_ip6)
        self.assertEqual(rx[IP].src, ip4_if.remote_ip4)
        self.assertEqual(rx[IP].dst, ip6_if.remote_ip4)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # UDP decapsulation
        tunneled = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
                    IPv6(dst=b4_ip6, src=aftr_ip6) /
                    IP(dst=ip4_if.remote_ip4, src=ip6_if.remote_ip4) /
                    UDP(sport=20000, dport=10000))
        rx = send_and_capture(ip6_if, tunneled, ip4_if)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, ip6_if.remote_ip4)
        self.assertEqual(rx[IP].dst, ip4_if.remote_ip4)
        self.assertEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)

        # ping DS-Lite B4 tunnel endpoint address
        pinger = ip6_if.remote_hosts[0]
        ping = (Ether(dst=ip6_if.local_mac, src=ip6_if.remote_mac) /
                IPv6(src=pinger.ip6, dst=b4_ip6) /
                ICMPv6EchoRequest())
        replies = send_and_capture(ip6_if, ping, ip6_if)
        self.assertEqual(1, len(replies))
        rx = replies[0]
        self.assertEqual(rx[IPv6].src, b4_ip6)
        self.assertEqual(rx[IPv6].dst, pinger.ip6)
        self.assertTrue(rx.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """
        Log the configured tunnel endpoint addresses unless vpp_dead is set.
        """
        super(TestDSliteCE, self).tearDown()
        if self.vpp_dead:
            return

        show_commands = (
            "show dslite aftr-tunnel-endpoint-address",
            "show dslite b4-tunnel-endpoint-address",
        )
        for command in show_commands:
            self.logger.info(self.vapi.cli(command))
6002
# When the module is executed directly, run its test cases through
# unittest with VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)