NAT44: nat44_static_mapping_details protocol=0 if addr_only=0 (VPP-1158)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param dst_ip: Destination address
144         :param ttl: TTL of generated packets
145         """
146         if dst_ip is None:
147             dst_ip = out_if.remote_ip4
148
149         pkts = []
150         # TCP
151         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153              TCP(sport=self.tcp_port_in, dport=20))
154         pkts.append(p)
155
156         # UDP
157         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159              UDP(sport=self.udp_port_in, dport=20))
160         pkts.append(p)
161
162         # ICMP
163         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165              ICMP(id=self.icmp_id_in, type='echo-request'))
166         pkts.append(p)
167
168         return pkts
169
170     def compose_ip6(self, ip4, pref, plen):
171         """
172         Compose IPv4-embedded IPv6 addresses
173
174         :param ip4: IPv4 address
175         :param pref: IPv6 prefix
176         :param plen: IPv6 prefix length
177         :returns: IPv4-embedded IPv6 addresses
178         """
179         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
181         if plen == 32:
182             pref_n[4] = ip4_n[0]
183             pref_n[5] = ip4_n[1]
184             pref_n[6] = ip4_n[2]
185             pref_n[7] = ip4_n[3]
186         elif plen == 40:
187             pref_n[5] = ip4_n[0]
188             pref_n[6] = ip4_n[1]
189             pref_n[7] = ip4_n[2]
190             pref_n[9] = ip4_n[3]
191         elif plen == 48:
192             pref_n[6] = ip4_n[0]
193             pref_n[7] = ip4_n[1]
194             pref_n[9] = ip4_n[2]
195             pref_n[10] = ip4_n[3]
196         elif plen == 56:
197             pref_n[7] = ip4_n[0]
198             pref_n[9] = ip4_n[1]
199             pref_n[10] = ip4_n[2]
200             pref_n[11] = ip4_n[3]
201         elif plen == 64:
202             pref_n[9] = ip4_n[0]
203             pref_n[10] = ip4_n[1]
204             pref_n[11] = ip4_n[2]
205             pref_n[12] = ip4_n[3]
206         elif plen == 96:
207             pref_n[12] = ip4_n[0]
208             pref_n[13] = ip4_n[1]
209             pref_n[14] = ip4_n[2]
210             pref_n[15] = ip4_n[3]
211         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
212
213     def extract_ip4(self, ip6, plen):
214         """
215         Extract IPv4 address embedded in IPv6 addresses
216
217         :param ip6: IPv6 address
218         :param plen: IPv6 prefix length
219         :returns: extracted IPv4 address
220         """
221         ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
222         ip4_n = [None] * 4
223         if plen == 32:
224             ip4_n[0] = ip6_n[4]
225             ip4_n[1] = ip6_n[5]
226             ip4_n[2] = ip6_n[6]
227             ip4_n[3] = ip6_n[7]
228         elif plen == 40:
229             ip4_n[0] = ip6_n[5]
230             ip4_n[1] = ip6_n[6]
231             ip4_n[2] = ip6_n[7]
232             ip4_n[3] = ip6_n[9]
233         elif plen == 48:
234             ip4_n[0] = ip6_n[6]
235             ip4_n[1] = ip6_n[7]
236             ip4_n[2] = ip6_n[9]
237             ip4_n[3] = ip6_n[10]
238         elif plen == 56:
239             ip4_n[0] = ip6_n[7]
240             ip4_n[1] = ip6_n[9]
241             ip4_n[2] = ip6_n[10]
242             ip4_n[3] = ip6_n[11]
243         elif plen == 64:
244             ip4_n[0] = ip6_n[9]
245             ip4_n[1] = ip6_n[10]
246             ip4_n[2] = ip6_n[11]
247             ip4_n[3] = ip6_n[12]
248         elif plen == 96:
249             ip4_n[0] = ip6_n[12]
250             ip4_n[1] = ip6_n[13]
251             ip4_n[2] = ip6_n[14]
252             ip4_n[3] = ip6_n[15]
253         return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
254
255     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
256         """
257         Create IPv6 packet stream for inside network
258
259         :param in_if: Inside interface
260         :param out_if: Outside interface
261         :param ttl: Hop Limit of generated packets
262         :param pref: NAT64 prefix
263         :param plen: NAT64 prefix length
264         """
265         pkts = []
266         if pref is None:
267             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
268         else:
269             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
270
271         # TCP
272         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274              TCP(sport=self.tcp_port_in, dport=20))
275         pkts.append(p)
276
277         # UDP
278         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280              UDP(sport=self.udp_port_in, dport=20))
281         pkts.append(p)
282
283         # ICMP
284         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286              ICMPv6EchoRequest(id=self.icmp_id_in))
287         pkts.append(p)
288
289         return pkts
290
291     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292                           use_inside_ports=False):
293         """
294         Create packet stream for outside network
295
296         :param out_if: Outside interface
297         :param dst_ip: Destination IP address (Default use global NAT address)
298         :param ttl: TTL of generated packets
299         :param use_inside_ports: Use inside NAT ports as destination ports
300                instead of outside ports
301         """
302         if dst_ip is None:
303             dst_ip = self.nat_addr
304         if not use_inside_ports:
305             tcp_port = self.tcp_port_out
306             udp_port = self.udp_port_out
307             icmp_id = self.icmp_id_out
308         else:
309             tcp_port = self.tcp_port_in
310             udp_port = self.udp_port_in
311             icmp_id = self.icmp_id_in
312         pkts = []
313         # TCP
314         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316              TCP(dport=tcp_port, sport=20))
317         pkts.append(p)
318
319         # UDP
320         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322              UDP(dport=udp_port, sport=20))
323         pkts.append(p)
324
325         # ICMP
326         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328              ICMP(id=icmp_id, type='echo-reply'))
329         pkts.append(p)
330
331         return pkts
332
333     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
334         """
335         Create packet stream for outside network
336
337         :param out_if: Outside interface
338         :param dst_ip: Destination IP address (Default use global NAT address)
339         :param hl: HL of generated packets
340         """
341         pkts = []
342         # TCP
343         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345              TCP(dport=self.tcp_port_out, sport=20))
346         pkts.append(p)
347
348         # UDP
349         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351              UDP(dport=self.udp_port_out, sport=20))
352         pkts.append(p)
353
354         # ICMP
355         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357              ICMPv6EchoReply(id=self.icmp_id_out))
358         pkts.append(p)
359
360         return pkts
361
362     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363                            packet_num=3, dst_ip=None, is_ip6=False):
364         """
365         Verify captured packets on outside network
366
367         :param capture: Captured packets
368         :param nat_ip: Translated IP address (Default use global NAT address)
369         :param same_port: Sorce port number is not translated (Default False)
370         :param packet_num: Expected number of packets (Default 3)
371         :param dst_ip: Destination IP address (Default do not verify)
372         :param is_ip6: If L3 protocol is IPv6 (Default False)
373         """
374         if is_ip6:
375             IP46 = IPv6
376             ICMP46 = ICMPv6EchoRequest
377         else:
378             IP46 = IP
379             ICMP46 = ICMP
380         if nat_ip is None:
381             nat_ip = self.nat_addr
382         self.assertEqual(packet_num, len(capture))
383         for packet in capture:
384             try:
385                 if not is_ip6:
386                     self.check_ip_checksum(packet)
387                 self.assertEqual(packet[IP46].src, nat_ip)
388                 if dst_ip is not None:
389                     self.assertEqual(packet[IP46].dst, dst_ip)
390                 if packet.haslayer(TCP):
391                     if same_port:
392                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
393                     else:
394                         self.assertNotEqual(
395                             packet[TCP].sport, self.tcp_port_in)
396                     self.tcp_port_out = packet[TCP].sport
397                     self.check_tcp_checksum(packet)
398                 elif packet.haslayer(UDP):
399                     if same_port:
400                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
401                     else:
402                         self.assertNotEqual(
403                             packet[UDP].sport, self.udp_port_in)
404                     self.udp_port_out = packet[UDP].sport
405                 else:
406                     if same_port:
407                         self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
408                     else:
409                         self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410                     self.icmp_id_out = packet[ICMP46].id
411                     if is_ip6:
412                         self.check_icmpv6_checksum(packet)
413                     else:
414                         self.check_icmp_checksum(packet)
415             except:
416                 self.logger.error(ppp("Unexpected or invalid packet "
417                                       "(outside network):", packet))
418                 raise
419
420     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421                                packet_num=3, dst_ip=None):
422         """
423         Verify captured packets on outside network
424
425         :param capture: Captured packets
426         :param nat_ip: Translated IP address
427         :param same_port: Sorce port number is not translated (Default False)
428         :param packet_num: Expected number of packets (Default 3)
429         :param dst_ip: Destination IP address (Default do not verify)
430         """
431         return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
432                                        dst_ip, True)
433
434     def verify_capture_in(self, capture, in_if, packet_num=3):
435         """
436         Verify captured packets on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.check_ip_checksum(packet)
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 if packet.haslayer(TCP):
448                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449                     self.check_tcp_checksum(packet)
450                 elif packet.haslayer(UDP):
451                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
452                 else:
453                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454                     self.check_icmp_checksum(packet)
455             except:
456                 self.logger.error(ppp("Unexpected or invalid packet "
457                                       "(inside network):", packet))
458                 raise
459
460     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
461         """
462         Verify captured IPv6 packets on inside network
463
464         :param capture: Captured packets
465         :param src_ip: Source IP
466         :param dst_ip: Destination IP address
467         :param packet_num: Expected number of packets (Default 3)
468         """
469         self.assertEqual(packet_num, len(capture))
470         for packet in capture:
471             try:
472                 self.assertEqual(packet[IPv6].src, src_ip)
473                 self.assertEqual(packet[IPv6].dst, dst_ip)
474                 if packet.haslayer(TCP):
475                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476                     self.check_tcp_checksum(packet)
477                 elif packet.haslayer(UDP):
478                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
479                     self.check_udp_checksum(packet)
480                 else:
481                     self.assertEqual(packet[ICMPv6EchoReply].id,
482                                      self.icmp_id_in)
483                     self.check_icmpv6_checksum(packet)
484             except:
485                 self.logger.error(ppp("Unexpected or invalid packet "
486                                       "(inside network):", packet))
487                 raise
488
489     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
490         """
491         Verify captured packet that don't have to be translated
492
493         :param capture: Captured packets
494         :param ingress_if: Ingress interface
495         :param egress_if: Egress interface
496         """
497         for packet in capture:
498             try:
499                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501                 if packet.haslayer(TCP):
502                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503                 elif packet.haslayer(UDP):
504                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
505                 else:
506                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
507             except:
508                 self.logger.error(ppp("Unexpected or invalid packet "
509                                       "(inside network):", packet))
510                 raise
511
512     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513                                             packet_num=3, icmp_type=11):
514         """
515         Verify captured packets with ICMP errors on outside network
516
517         :param capture: Captured packets
518         :param src_ip: Translated IP address or IP address of VPP
519                        (Default use global NAT address)
520         :param packet_num: Expected number of packets (Default 3)
521         :param icmp_type: Type of error ICMP packet
522                           we are expecting (Default 11)
523         """
524         if src_ip is None:
525             src_ip = self.nat_addr
526         self.assertEqual(packet_num, len(capture))
527         for packet in capture:
528             try:
529                 self.assertEqual(packet[IP].src, src_ip)
530                 self.assertTrue(packet.haslayer(ICMP))
531                 icmp = packet[ICMP]
532                 self.assertEqual(icmp.type, icmp_type)
533                 self.assertTrue(icmp.haslayer(IPerror))
534                 inner_ip = icmp[IPerror]
535                 if inner_ip.haslayer(TCPerror):
536                     self.assertEqual(inner_ip[TCPerror].dport,
537                                      self.tcp_port_out)
538                 elif inner_ip.haslayer(UDPerror):
539                     self.assertEqual(inner_ip[UDPerror].dport,
540                                      self.udp_port_out)
541                 else:
542                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
543             except:
544                 self.logger.error(ppp("Unexpected or invalid packet "
545                                       "(outside network):", packet))
546                 raise
547
548     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
549                                            icmp_type=11):
550         """
551         Verify captured packets with ICMP errors on inside network
552
553         :param capture: Captured packets
554         :param in_if: Inside interface
555         :param packet_num: Expected number of packets (Default 3)
556         :param icmp_type: Type of error ICMP packet
557                           we are expecting (Default 11)
558         """
559         self.assertEqual(packet_num, len(capture))
560         for packet in capture:
561             try:
562                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563                 self.assertTrue(packet.haslayer(ICMP))
564                 icmp = packet[ICMP]
565                 self.assertEqual(icmp.type, icmp_type)
566                 self.assertTrue(icmp.haslayer(IPerror))
567                 inner_ip = icmp[IPerror]
568                 if inner_ip.haslayer(TCPerror):
569                     self.assertEqual(inner_ip[TCPerror].sport,
570                                      self.tcp_port_in)
571                 elif inner_ip.haslayer(UDPerror):
572                     self.assertEqual(inner_ip[UDPerror].sport,
573                                      self.udp_port_in)
574                 else:
575                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
576             except:
577                 self.logger.error(ppp("Unexpected or invalid packet "
578                                       "(inside network):", packet))
579                 raise
580
581     def create_stream_frag(self, src_if, dst, sport, dport, data):
582         """
583         Create fragmented packet stream
584
585         :param src_if: Source interface
586         :param dst: Destination IPv4 address
587         :param sport: Source TCP port
588         :param dport: Destination TCP port
589         :param data: Payload data
590         :returns: Fragmets
591         """
592         id = random.randint(0, 65535)
593         p = (IP(src=src_if.remote_ip4, dst=dst) /
594              TCP(sport=sport, dport=dport) /
595              Raw(data))
596         p = p.__class__(str(p))
597         chksum = p['TCP'].chksum
598         pkts = []
599         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601              TCP(sport=sport, dport=dport, chksum=chksum) /
602              Raw(data[0:4]))
603         pkts.append(p)
604         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606                 proto=IP_PROTOS.tcp) /
607              Raw(data[4:20]))
608         pkts.append(p)
609         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
611                 id=id) /
612              Raw(data[20:]))
613         pkts.append(p)
614         return pkts
615
616     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617                                pref=None, plen=0, frag_size=128):
618         """
619         Create fragmented packet stream
620
621         :param src_if: Source interface
622         :param dst: Destination IPv4 address
623         :param sport: Source TCP port
624         :param dport: Destination TCP port
625         :param data: Payload data
626         :param pref: NAT64 prefix
627         :param plen: NAT64 prefix length
628         :param fragsize: size of fragments
629         :returns: Fragmets
630         """
631         if pref is None:
632             dst_ip6 = ''.join(['64:ff9b::', dst])
633         else:
634             dst_ip6 = self.compose_ip6(dst, pref, plen)
635
636         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639              TCP(sport=sport, dport=dport) /
640              Raw(data))
641
642         return fragment6(p, frag_size)
643
644     def reass_frags_and_verify(self, frags, src, dst):
645         """
646         Reassemble and verify fragmented packet
647
648         :param frags: Captured fragments
649         :param src: Source IPv4 address to verify
650         :param dst: Destination IPv4 address to verify
651
652         :returns: Reassembled IPv4 packet
653         """
654         buffer = StringIO.StringIO()
655         for p in frags:
656             self.assertEqual(p[IP].src, src)
657             self.assertEqual(p[IP].dst, dst)
658             self.check_ip_checksum(p)
659             buffer.seek(p[IP].frag * 8)
660             buffer.write(p[IP].payload)
661         ip = frags[0].getlayer(IP)
662         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663                 proto=frags[0][IP].proto)
664         if ip.proto == IP_PROTOS.tcp:
665             p = (ip / TCP(buffer.getvalue()))
666             self.check_tcp_checksum(p)
667         elif ip.proto == IP_PROTOS.udp:
668             p = (ip / UDP(buffer.getvalue()))
669         return p
670
671     def reass_frags_and_verify_ip6(self, frags, src, dst):
672         """
673         Reassemble and verify fragmented packet
674
675         :param frags: Captured fragments
676         :param src: Source IPv6 address to verify
677         :param dst: Destination IPv6 address to verify
678
679         :returns: Reassembled IPv6 packet
680         """
681         buffer = StringIO.StringIO()
682         for p in frags:
683             self.assertEqual(p[IPv6].src, src)
684             self.assertEqual(p[IPv6].dst, dst)
685             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686             buffer.write(p[IPv6ExtHdrFragment].payload)
687         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688                   nh=frags[0][IPv6ExtHdrFragment].nh)
689         if ip.nh == IP_PROTOS.tcp:
690             p = (ip / TCP(buffer.getvalue()))
691             self.check_tcp_checksum(p)
692         elif ip.nh == IP_PROTOS.udp:
693             p = (ip / UDP(buffer.getvalue()))
694         return p
695
696     def verify_ipfix_nat44_ses(self, data):
697         """
698         Verify IPFIX NAT44 session create/delete event
699
700         :param data: Decoded IPFIX data records
701         """
702         nat44_ses_create_num = 0
703         nat44_ses_delete_num = 0
704         self.assertEqual(6, len(data))
705         for record in data:
706             # natEvent
707             self.assertIn(ord(record[230]), [4, 5])
708             if ord(record[230]) == 4:
709                 nat44_ses_create_num += 1
710             else:
711                 nat44_ses_delete_num += 1
712             # sourceIPv4Address
713             self.assertEqual(self.pg0.remote_ip4n, record[8])
714             # postNATSourceIPv4Address
715             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
716                              record[225])
717             # ingressVRFID
718             self.assertEqual(struct.pack("!I", 0), record[234])
719             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720             if IP_PROTOS.icmp == ord(record[4]):
721                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
723                                  record[227])
724             elif IP_PROTOS.tcp == ord(record[4]):
725                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
726                                  record[7])
727                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
728                                  record[227])
729             elif IP_PROTOS.udp == ord(record[4]):
730                 self.assertEqual(struct.pack("!H", self.udp_port_in),
731                                  record[7])
732                 self.assertEqual(struct.pack("!H", self.udp_port_out),
733                                  record[227])
734             else:
735                 self.fail("Invalid protocol")
736         self.assertEqual(3, nat44_ses_create_num)
737         self.assertEqual(3, nat44_ses_delete_num)
738
739     def verify_ipfix_addr_exhausted(self, data):
740         """
741         Verify IPFIX NAT addresses event
742
743         :param data: Decoded IPFIX data records
744         """
745         self.assertEqual(1, len(data))
746         record = data[0]
747         # natEvent
748         self.assertEqual(ord(record[230]), 3)
749         # natPoolID
750         self.assertEqual(struct.pack("!I", 0), record[283])
751
752     def verify_ipfix_max_sessions(self, data, limit):
753         """
754         Verify IPFIX maximum session entries exceeded event
755
756         :param data: Decoded IPFIX data records
757         :param limit: Number of maximum session entries that can be created.
758         """
759         self.assertEqual(1, len(data))
760         record = data[0]
761         # natEvent
762         self.assertEqual(ord(record[230]), 13)
763         # natQuotaExceededEvent
764         self.assertEqual(struct.pack("I", 1), record[466])
765         # maxSessionEntries
766         self.assertEqual(struct.pack("I", limit), record[471])
767
768     def verify_ipfix_max_bibs(self, data, limit):
769         """
770         Verify IPFIX maximum BIB entries exceeded event
771
772         :param data: Decoded IPFIX data records
773         :param limit: Number of maximum BIB entries that can be created.
774         """
775         self.assertEqual(1, len(data))
776         record = data[0]
777         # natEvent
778         self.assertEqual(ord(record[230]), 13)
779         # natQuotaExceededEvent
780         self.assertEqual(struct.pack("I", 2), record[466])
781         # maxBIBEntries
782         self.assertEqual(struct.pack("I", limit), record[472])
783
784     def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
785         """
786         Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
787
788         :param data: Decoded IPFIX data records
789         :param limit: Number of maximum fragments pending reassembly
790         :param src_addr: IPv6 source address
791         """
792         self.assertEqual(1, len(data))
793         record = data[0]
794         # natEvent
795         self.assertEqual(ord(record[230]), 13)
796         # natQuotaExceededEvent
797         self.assertEqual(struct.pack("I", 5), record[466])
798         # maxFragmentsPendingReassembly
799         self.assertEqual(struct.pack("I", limit), record[475])
800         # sourceIPv6Address
801         self.assertEqual(src_addr, record[27])
802
803     def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
804         """
805         Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
806
807         :param data: Decoded IPFIX data records
808         :param limit: Number of maximum fragments pending reassembly
809         :param src_addr: IPv4 source address
810         """
811         self.assertEqual(1, len(data))
812         record = data[0]
813         # natEvent
814         self.assertEqual(ord(record[230]), 13)
815         # natQuotaExceededEvent
816         self.assertEqual(struct.pack("I", 5), record[466])
817         # maxFragmentsPendingReassembly
818         self.assertEqual(struct.pack("I", limit), record[475])
819         # sourceIPv4Address
820         self.assertEqual(src_addr, record[8])
821
822     def verify_ipfix_bib(self, data, is_create, src_addr):
823         """
824         Verify IPFIX NAT64 BIB create and delete events
825
826         :param data: Decoded IPFIX data records
827         :param is_create: Create event if nonzero value otherwise delete event
828         :param src_addr: IPv6 source address
829         """
830         self.assertEqual(1, len(data))
831         record = data[0]
832         # natEvent
833         if is_create:
834             self.assertEqual(ord(record[230]), 10)
835         else:
836             self.assertEqual(ord(record[230]), 11)
837         # sourceIPv6Address
838         self.assertEqual(src_addr, record[27])
839         # postNATSourceIPv4Address
840         self.assertEqual(self.nat_addr_n, record[225])
841         # protocolIdentifier
842         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
843         # ingressVRFID
844         self.assertEqual(struct.pack("!I", 0), record[234])
845         # sourceTransportPort
846         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847         # postNAPTSourceTransportPort
848         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
849
850     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
851                                dst_port):
852         """
853         Verify IPFIX NAT64 session create and delete events
854
855         :param data: Decoded IPFIX data records
856         :param is_create: Create event if nonzero value otherwise delete event
857         :param src_addr: IPv6 source address
858         :param dst_addr: IPv4 destination address
859         :param dst_port: destination TCP port
860         """
861         self.assertEqual(1, len(data))
862         record = data[0]
863         # natEvent
864         if is_create:
865             self.assertEqual(ord(record[230]), 6)
866         else:
867             self.assertEqual(ord(record[230]), 7)
868         # sourceIPv6Address
869         self.assertEqual(src_addr, record[27])
870         # destinationIPv6Address
871         self.assertEqual(socket.inet_pton(socket.AF_INET6,
872                                           self.compose_ip6(dst_addr,
873                                                            '64:ff9b::',
874                                                            96)),
875                          record[28])
876         # postNATSourceIPv4Address
877         self.assertEqual(self.nat_addr_n, record[225])
878         # postNATDestinationIPv4Address
879         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
880                          record[226])
881         # protocolIdentifier
882         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
883         # ingressVRFID
884         self.assertEqual(struct.pack("!I", 0), record[234])
885         # sourceTransportPort
886         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887         # postNAPTSourceTransportPort
888         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889         # destinationTransportPort
890         self.assertEqual(struct.pack("!H", dst_port), record[11])
891         # postNAPTDestinationTransportPort
892         self.assertEqual(struct.pack("!H", dst_port), record[228])
893
894
895 class TestNAT44(MethodHolder):
896     """ NAT44 Test Cases """
897
898     @classmethod
899     def setUpClass(cls):
900         super(TestNAT44, cls).setUpClass()
901
902         try:
903             cls.tcp_port_in = 6303
904             cls.tcp_port_out = 6303
905             cls.udp_port_in = 6304
906             cls.udp_port_out = 6304
907             cls.icmp_id_in = 6305
908             cls.icmp_id_out = 6305
909             cls.nat_addr = '10.0.0.3'
910             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911             cls.ipfix_src_port = 4739
912             cls.ipfix_domain_id = 1
913
914             cls.create_pg_interfaces(range(10))
915             cls.interfaces = list(cls.pg_interfaces[0:4])
916
917             for i in cls.interfaces:
918                 i.admin_up()
919                 i.config_ip4()
920                 i.resolve_arp()
921
922             cls.pg0.generate_remote_hosts(3)
923             cls.pg0.configure_ipv4_neighbors()
924
925             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926             cls.vapi.ip_table_add_del(10, is_add=1)
927             cls.vapi.ip_table_add_del(20, is_add=1)
928
929             cls.pg4._local_ip4 = "172.16.255.1"
930             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932             cls.pg4.set_table_ip4(10)
933             cls.pg5._local_ip4 = "172.17.255.3"
934             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936             cls.pg5.set_table_ip4(10)
937             cls.pg6._local_ip4 = "172.16.255.1"
938             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940             cls.pg6.set_table_ip4(20)
941             for i in cls.overlapping_interfaces:
942                 i.config_ip4()
943                 i.admin_up()
944                 i.resolve_arp()
945
946             cls.pg7.admin_up()
947             cls.pg8.admin_up()
948
949             cls.pg9.generate_remote_hosts(2)
950             cls.pg9.config_ip4()
951             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
953                                                   ip_addr_n,
954                                                   24)
955             cls.pg9.admin_up()
956             cls.pg9.resolve_arp()
957             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959             cls.pg9.resolve_arp()
960
961         except Exception:
962             super(TestNAT44, cls).tearDownClass()
963             raise
964
965     def clear_nat44(self):
966         """
967         Clear NAT44 configuration.
968         """
969         # I found no elegant way to do this
970         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971                                    dst_address_length=32,
972                                    next_hop_address=self.pg7.remote_ip4n,
973                                    next_hop_sw_if_index=self.pg7.sw_if_index,
974                                    is_add=0)
975         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976                                    dst_address_length=32,
977                                    next_hop_address=self.pg8.remote_ip4n,
978                                    next_hop_sw_if_index=self.pg8.sw_if_index,
979                                    is_add=0)
980
981         for intf in [self.pg7, self.pg8]:
982             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
983             for n in neighbors:
984                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
985                                               n.mac_address,
986                                               n.ip_address,
987                                               is_add=0)
988
989         if self.pg7.has_ip4_config:
990             self.pg7.unconfig_ip4()
991
992         self.vapi.nat44_forwarding_enable_disable(0)
993
994         interfaces = self.vapi.nat44_interface_addr_dump()
995         for intf in interfaces:
996             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997                                                twice_nat=intf.twice_nat,
998                                                is_add=0)
999
1000         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001                             domain_id=self.ipfix_domain_id)
1002         self.ipfix_src_port = 4739
1003         self.ipfix_domain_id = 1
1004
1005         interfaces = self.vapi.nat44_interface_dump()
1006         for intf in interfaces:
1007             if intf.is_inside > 1:
1008                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1009                                                           0,
1010                                                           is_add=0)
1011             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1012                                                       intf.is_inside,
1013                                                       is_add=0)
1014
1015         interfaces = self.vapi.nat44_interface_output_feature_dump()
1016         for intf in interfaces:
1017             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1018                                                              intf.is_inside,
1019                                                              is_add=0)
1020
1021         static_mappings = self.vapi.nat44_static_mapping_dump()
1022         for sm in static_mappings:
1023             self.vapi.nat44_add_del_static_mapping(
1024                 sm.local_ip_address,
1025                 sm.external_ip_address,
1026                 local_port=sm.local_port,
1027                 external_port=sm.external_port,
1028                 addr_only=sm.addr_only,
1029                 vrf_id=sm.vrf_id,
1030                 protocol=sm.protocol,
1031                 twice_nat=sm.twice_nat,
1032                 out2in_only=sm.out2in_only,
1033                 tag=sm.tag,
1034                 is_add=0)
1035
1036         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037         for lb_sm in lb_static_mappings:
1038             self.vapi.nat44_add_del_lb_static_mapping(
1039                 lb_sm.external_addr,
1040                 lb_sm.external_port,
1041                 lb_sm.protocol,
1042                 vrf_id=lb_sm.vrf_id,
1043                 twice_nat=lb_sm.twice_nat,
1044                 out2in_only=lb_sm.out2in_only,
1045                 tag=lb_sm.tag,
1046                 is_add=0,
1047                 local_num=0,
1048                 locals=[])
1049
1050         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051         for id_m in identity_mappings:
1052             self.vapi.nat44_add_del_identity_mapping(
1053                 addr_only=id_m.addr_only,
1054                 ip=id_m.ip_address,
1055                 port=id_m.port,
1056                 sw_if_index=id_m.sw_if_index,
1057                 vrf_id=id_m.vrf_id,
1058                 protocol=id_m.protocol,
1059                 is_add=0)
1060
1061         adresses = self.vapi.nat44_address_dump()
1062         for addr in adresses:
1063             self.vapi.nat44_add_del_address_range(addr.ip_address,
1064                                                   addr.ip_address,
1065                                                   twice_nat=addr.twice_nat,
1066                                                   is_add=0)
1067
1068         self.vapi.nat_set_reass()
1069         self.vapi.nat_set_reass(is_ip6=1)
1070
1071     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072                                  local_port=0, external_port=0, vrf_id=0,
1073                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
1074                                  proto=0, twice_nat=0, out2in_only=0, tag=""):
1075         """
1076         Add/delete NAT44 static mapping
1077
1078         :param local_ip: Local IP address
1079         :param external_ip: External IP address
1080         :param local_port: Local port number (Optional)
1081         :param external_port: External port number (Optional)
1082         :param vrf_id: VRF ID (Default 0)
1083         :param is_add: 1 if add, 0 if delete (Default add)
1084         :param external_sw_if_index: External interface instead of IP address
1085         :param proto: IP protocol (Mandatory if port specified)
1086         :param twice_nat: 1 if translate external host address and port
1087         :param out2in_only: if 1 rule is matching only out2in direction
1088         :param tag: Opaque string tag
1089         """
1090         addr_only = 1
1091         if local_port and external_port:
1092             addr_only = 0
1093         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095         self.vapi.nat44_add_del_static_mapping(
1096             l_ip,
1097             e_ip,
1098             external_sw_if_index,
1099             local_port,
1100             external_port,
1101             addr_only,
1102             vrf_id,
1103             proto,
1104             twice_nat,
1105             out2in_only,
1106             tag,
1107             is_add)
1108
1109     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1110         """
1111         Add/delete NAT44 address
1112
1113         :param ip: IP address
1114         :param is_add: 1 if add, 0 if delete (Default add)
1115         :param twice_nat: twice NAT address for extenal hosts
1116         """
1117         nat_addr = socket.inet_pton(socket.AF_INET, ip)
1118         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1119                                               vrf_id=vrf_id,
1120                                               twice_nat=twice_nat)
1121
1122     def test_dynamic(self):
1123         """ NAT44 dynamic translation test """
1124
1125         self.nat44_add_address(self.nat_addr)
1126         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1128                                                   is_inside=0)
1129
1130         # in2out
1131         pkts = self.create_stream_in(self.pg0, self.pg1)
1132         self.pg0.add_stream(pkts)
1133         self.pg_enable_capture(self.pg_interfaces)
1134         self.pg_start()
1135         capture = self.pg1.get_capture(len(pkts))
1136         self.verify_capture_out(capture)
1137
1138         # out2in
1139         pkts = self.create_stream_out(self.pg1)
1140         self.pg1.add_stream(pkts)
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pg_start()
1143         capture = self.pg0.get_capture(len(pkts))
1144         self.verify_capture_in(capture, self.pg0)
1145
1146     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147         """ NAT44 handling of client packets with TTL=1 """
1148
1149         self.nat44_add_address(self.nat_addr)
1150         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1152                                                   is_inside=0)
1153
1154         # Client side - generate traffic
1155         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156         self.pg0.add_stream(pkts)
1157         self.pg_enable_capture(self.pg_interfaces)
1158         self.pg_start()
1159
1160         # Client side - verify ICMP type 11 packets
1161         capture = self.pg0.get_capture(len(pkts))
1162         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1163
1164     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165         """ NAT44 handling of server packets with TTL=1 """
1166
1167         self.nat44_add_address(self.nat_addr)
1168         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1170                                                   is_inside=0)
1171
1172         # Client side - create sessions
1173         pkts = self.create_stream_in(self.pg0, self.pg1)
1174         self.pg0.add_stream(pkts)
1175         self.pg_enable_capture(self.pg_interfaces)
1176         self.pg_start()
1177
1178         # Server side - generate traffic
1179         capture = self.pg1.get_capture(len(pkts))
1180         self.verify_capture_out(capture)
1181         pkts = self.create_stream_out(self.pg1, ttl=1)
1182         self.pg1.add_stream(pkts)
1183         self.pg_enable_capture(self.pg_interfaces)
1184         self.pg_start()
1185
1186         # Server side - verify ICMP type 11 packets
1187         capture = self.pg1.get_capture(len(pkts))
1188         self.verify_capture_out_with_icmp_errors(capture,
1189                                                  src_ip=self.pg1.local_ip4)
1190
1191     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1192         """ NAT44 handling of error responses to client packets with TTL=2 """
1193
1194         self.nat44_add_address(self.nat_addr)
1195         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1196         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1197                                                   is_inside=0)
1198
1199         # Client side - generate traffic
1200         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1201         self.pg0.add_stream(pkts)
1202         self.pg_enable_capture(self.pg_interfaces)
1203         self.pg_start()
1204
1205         # Server side - simulate ICMP type 11 response
1206         capture = self.pg1.get_capture(len(pkts))
1207         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1208                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1209                 ICMP(type=11) / packet[IP] for packet in capture]
1210         self.pg1.add_stream(pkts)
1211         self.pg_enable_capture(self.pg_interfaces)
1212         self.pg_start()
1213
1214         # Client side - verify ICMP type 11 packets
1215         capture = self.pg0.get_capture(len(pkts))
1216         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1217
1218     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1219         """ NAT44 handling of error responses to server packets with TTL=2 """
1220
1221         self.nat44_add_address(self.nat_addr)
1222         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1223         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1224                                                   is_inside=0)
1225
1226         # Client side - create sessions
1227         pkts = self.create_stream_in(self.pg0, self.pg1)
1228         self.pg0.add_stream(pkts)
1229         self.pg_enable_capture(self.pg_interfaces)
1230         self.pg_start()
1231
1232         # Server side - generate traffic
1233         capture = self.pg1.get_capture(len(pkts))
1234         self.verify_capture_out(capture)
1235         pkts = self.create_stream_out(self.pg1, ttl=2)
1236         self.pg1.add_stream(pkts)
1237         self.pg_enable_capture(self.pg_interfaces)
1238         self.pg_start()
1239
1240         # Client side - simulate ICMP type 11 response
1241         capture = self.pg0.get_capture(len(pkts))
1242         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1244                 ICMP(type=11) / packet[IP] for packet in capture]
1245         self.pg0.add_stream(pkts)
1246         self.pg_enable_capture(self.pg_interfaces)
1247         self.pg_start()
1248
1249         # Server side - verify ICMP type 11 packets
1250         capture = self.pg1.get_capture(len(pkts))
1251         self.verify_capture_out_with_icmp_errors(capture)
1252
1253     def test_ping_out_interface_from_outside(self):
1254         """ Ping NAT44 out interface from outside network """
1255
1256         self.nat44_add_address(self.nat_addr)
1257         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1259                                                   is_inside=0)
1260
1261         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1263              ICMP(id=self.icmp_id_out, type='echo-request'))
1264         pkts = [p]
1265         self.pg1.add_stream(pkts)
1266         self.pg_enable_capture(self.pg_interfaces)
1267         self.pg_start()
1268         capture = self.pg1.get_capture(len(pkts))
1269         self.assertEqual(1, len(capture))
1270         packet = capture[0]
1271         try:
1272             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1273             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1274             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1275             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1276         except:
1277             self.logger.error(ppp("Unexpected or invalid packet "
1278                                   "(outside network):", packet))
1279             raise
1280
1281     def test_ping_internal_host_from_outside(self):
1282         """ Ping internal host from outside network """
1283
1284         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1285         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1287                                                   is_inside=0)
1288
1289         # out2in
1290         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1291                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1292                ICMP(id=self.icmp_id_out, type='echo-request'))
1293         self.pg1.add_stream(pkt)
1294         self.pg_enable_capture(self.pg_interfaces)
1295         self.pg_start()
1296         capture = self.pg0.get_capture(1)
1297         self.verify_capture_in(capture, self.pg0, packet_num=1)
1298         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1299
1300         # in2out
1301         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1302                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1303                ICMP(id=self.icmp_id_in, type='echo-reply'))
1304         self.pg0.add_stream(pkt)
1305         self.pg_enable_capture(self.pg_interfaces)
1306         self.pg_start()
1307         capture = self.pg1.get_capture(1)
1308         self.verify_capture_out(capture, same_port=True, packet_num=1)
1309         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1310
1311     def test_forwarding(self):
1312         """ NAT44 forwarding test """
1313
1314         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1315         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1316                                                   is_inside=0)
1317         self.vapi.nat44_forwarding_enable_disable(1)
1318
1319         real_ip = self.pg0.remote_ip4n
1320         alias_ip = self.nat_addr_n
1321         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322                                                external_ip=alias_ip)
1323
1324         try:
1325             # in2out - static mapping match
1326
1327             pkts = self.create_stream_out(self.pg1)
1328             self.pg1.add_stream(pkts)
1329             self.pg_enable_capture(self.pg_interfaces)
1330             self.pg_start()
1331             capture = self.pg0.get_capture(len(pkts))
1332             self.verify_capture_in(capture, self.pg0)
1333
1334             pkts = self.create_stream_in(self.pg0, self.pg1)
1335             self.pg0.add_stream(pkts)
1336             self.pg_enable_capture(self.pg_interfaces)
1337             self.pg_start()
1338             capture = self.pg1.get_capture(len(pkts))
1339             self.verify_capture_out(capture, same_port=True)
1340
1341             # in2out - no static mapping match
1342
1343             host0 = self.pg0.remote_hosts[0]
1344             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1345             try:
1346                 pkts = self.create_stream_out(self.pg1,
1347                                               dst_ip=self.pg0.remote_ip4,
1348                                               use_inside_ports=True)
1349                 self.pg1.add_stream(pkts)
1350                 self.pg_enable_capture(self.pg_interfaces)
1351                 self.pg_start()
1352                 capture = self.pg0.get_capture(len(pkts))
1353                 self.verify_capture_in(capture, self.pg0)
1354
1355                 pkts = self.create_stream_in(self.pg0, self.pg1)
1356                 self.pg0.add_stream(pkts)
1357                 self.pg_enable_capture(self.pg_interfaces)
1358                 self.pg_start()
1359                 capture = self.pg1.get_capture(len(pkts))
1360                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1361                                         same_port=True)
1362             finally:
1363                 self.pg0.remote_hosts[0] = host0
1364
1365         finally:
1366             self.vapi.nat44_forwarding_enable_disable(0)
1367             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1368                                                    external_ip=alias_ip,
1369                                                    is_add=0)
1370
1371     def test_static_in(self):
1372         """ 1:1 NAT initialized from inside network """
1373
1374         nat_ip = "10.0.0.10"
1375         self.tcp_port_out = 6303
1376         self.udp_port_out = 6304
1377         self.icmp_id_out = 6305
1378
1379         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1381         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1382                                                   is_inside=0)
1383         sm = self.vapi.nat44_static_mapping_dump()
1384         self.assertEqual(len(sm), 1)
1385         self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1386         self.assertEqual(sm[0].protocol, 0)
1387         self.assertEqual(sm[0].local_port, 0)
1388         self.assertEqual(sm[0].external_port, 0)
1389
1390         # in2out
1391         pkts = self.create_stream_in(self.pg0, self.pg1)
1392         self.pg0.add_stream(pkts)
1393         self.pg_enable_capture(self.pg_interfaces)
1394         self.pg_start()
1395         capture = self.pg1.get_capture(len(pkts))
1396         self.verify_capture_out(capture, nat_ip, True)
1397
1398         # out2in
1399         pkts = self.create_stream_out(self.pg1, nat_ip)
1400         self.pg1.add_stream(pkts)
1401         self.pg_enable_capture(self.pg_interfaces)
1402         self.pg_start()
1403         capture = self.pg0.get_capture(len(pkts))
1404         self.verify_capture_in(capture, self.pg0)
1405
1406     def test_static_out(self):
1407         """ 1:1 NAT initialized from outside network """
1408
1409         nat_ip = "10.0.0.20"
1410         self.tcp_port_out = 6303
1411         self.udp_port_out = 6304
1412         self.icmp_id_out = 6305
1413         tag = "testTAG"
1414
1415         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1416         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1417         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1418                                                   is_inside=0)
1419         sm = self.vapi.nat44_static_mapping_dump()
1420         self.assertEqual(len(sm), 1)
1421         self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1422
1423         # out2in
1424         pkts = self.create_stream_out(self.pg1, nat_ip)
1425         self.pg1.add_stream(pkts)
1426         self.pg_enable_capture(self.pg_interfaces)
1427         self.pg_start()
1428         capture = self.pg0.get_capture(len(pkts))
1429         self.verify_capture_in(capture, self.pg0)
1430
1431         # in2out
1432         pkts = self.create_stream_in(self.pg0, self.pg1)
1433         self.pg0.add_stream(pkts)
1434         self.pg_enable_capture(self.pg_interfaces)
1435         self.pg_start()
1436         capture = self.pg1.get_capture(len(pkts))
1437         self.verify_capture_out(capture, nat_ip, True)
1438
1439     def test_static_with_port_in(self):
1440         """ 1:1 NAPT initialized from inside network """
1441
1442         self.tcp_port_out = 3606
1443         self.udp_port_out = 3607
1444         self.icmp_id_out = 3608
1445
1446         self.nat44_add_address(self.nat_addr)
1447         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448                                       self.tcp_port_in, self.tcp_port_out,
1449                                       proto=IP_PROTOS.tcp)
1450         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1451                                       self.udp_port_in, self.udp_port_out,
1452                                       proto=IP_PROTOS.udp)
1453         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1454                                       self.icmp_id_in, self.icmp_id_out,
1455                                       proto=IP_PROTOS.icmp)
1456         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1457         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1458                                                   is_inside=0)
1459
1460         # in2out
1461         pkts = self.create_stream_in(self.pg0, self.pg1)
1462         self.pg0.add_stream(pkts)
1463         self.pg_enable_capture(self.pg_interfaces)
1464         self.pg_start()
1465         capture = self.pg1.get_capture(len(pkts))
1466         self.verify_capture_out(capture)
1467
1468         # out2in
1469         pkts = self.create_stream_out(self.pg1)
1470         self.pg1.add_stream(pkts)
1471         self.pg_enable_capture(self.pg_interfaces)
1472         self.pg_start()
1473         capture = self.pg0.get_capture(len(pkts))
1474         self.verify_capture_in(capture, self.pg0)
1475
1476     def test_static_with_port_out(self):
1477         """ 1:1 NAPT initialized from outside network """
1478
1479         self.tcp_port_out = 30606
1480         self.udp_port_out = 30607
1481         self.icmp_id_out = 30608
1482
1483         self.nat44_add_address(self.nat_addr)
1484         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1485                                       self.tcp_port_in, self.tcp_port_out,
1486                                       proto=IP_PROTOS.tcp)
1487         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1488                                       self.udp_port_in, self.udp_port_out,
1489                                       proto=IP_PROTOS.udp)
1490         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1491                                       self.icmp_id_in, self.icmp_id_out,
1492                                       proto=IP_PROTOS.icmp)
1493         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1494         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1495                                                   is_inside=0)
1496
1497         # out2in
1498         pkts = self.create_stream_out(self.pg1)
1499         self.pg1.add_stream(pkts)
1500         self.pg_enable_capture(self.pg_interfaces)
1501         self.pg_start()
1502         capture = self.pg0.get_capture(len(pkts))
1503         self.verify_capture_in(capture, self.pg0)
1504
1505         # in2out
1506         pkts = self.create_stream_in(self.pg0, self.pg1)
1507         self.pg0.add_stream(pkts)
1508         self.pg_enable_capture(self.pg_interfaces)
1509         self.pg_start()
1510         capture = self.pg1.get_capture(len(pkts))
1511         self.verify_capture_out(capture)
1512
1513     def test_static_with_port_out2(self):
1514         """ 1:1 NAPT symmetrical rule """
1515
1516         external_port = 80
1517         local_port = 8080
1518
1519         self.vapi.nat44_forwarding_enable_disable(1)
1520         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1521                                       local_port, external_port,
1522                                       proto=IP_PROTOS.tcp, out2in_only=1)
1523         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1524         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1525                                                   is_inside=0)
1526
1527         # from client to service
1528         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1529              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1530              TCP(sport=12345, dport=external_port))
1531         self.pg1.add_stream(p)
1532         self.pg_enable_capture(self.pg_interfaces)
1533         self.pg_start()
1534         capture = self.pg0.get_capture(1)
1535         p = capture[0]
1536         server = None
1537         try:
1538             ip = p[IP]
1539             tcp = p[TCP]
1540             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1541             self.assertEqual(tcp.dport, local_port)
1542             self.check_tcp_checksum(p)
1543             self.check_ip_checksum(p)
1544         except:
1545             self.logger.error(ppp("Unexpected or invalid packet:", p))
1546             raise
1547
1548         # ICMP error
1549         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1550              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1551              ICMP(type=11) / capture[0][IP])
1552         self.pg0.add_stream(p)
1553         self.pg_enable_capture(self.pg_interfaces)
1554         self.pg_start()
1555         capture = self.pg1.get_capture(1)
1556         p = capture[0]
1557         try:
1558             self.assertEqual(p[IP].src, self.nat_addr)
1559             inner = p[IPerror]
1560             self.assertEqual(inner.dst, self.nat_addr)
1561             self.assertEqual(inner[TCPerror].dport, external_port)
1562         except:
1563             self.logger.error(ppp("Unexpected or invalid packet:", p))
1564             raise
1565
1566         # from service back to client
1567         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1568              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1569              TCP(sport=local_port, dport=12345))
1570         self.pg0.add_stream(p)
1571         self.pg_enable_capture(self.pg_interfaces)
1572         self.pg_start()
1573         capture = self.pg1.get_capture(1)
1574         p = capture[0]
1575         try:
1576             ip = p[IP]
1577             tcp = p[TCP]
1578             self.assertEqual(ip.src, self.nat_addr)
1579             self.assertEqual(tcp.sport, external_port)
1580             self.check_tcp_checksum(p)
1581             self.check_ip_checksum(p)
1582         except:
1583             self.logger.error(ppp("Unexpected or invalid packet:", p))
1584             raise
1585
1586         # ICMP error
1587         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1588              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1589              ICMP(type=11) / capture[0][IP])
1590         self.pg1.add_stream(p)
1591         self.pg_enable_capture(self.pg_interfaces)
1592         self.pg_start()
1593         capture = self.pg0.get_capture(1)
1594         p = capture[0]
1595         try:
1596             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1597             inner = p[IPerror]
1598             self.assertEqual(inner.src, self.pg0.remote_ip4)
1599             self.assertEqual(inner[TCPerror].sport, local_port)
1600         except:
1601             self.logger.error(ppp("Unexpected or invalid packet:", p))
1602             raise
1603
1604         # from client to server (no translation)
1605         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1606              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1607              TCP(sport=12346, dport=local_port))
1608         self.pg1.add_stream(p)
1609         self.pg_enable_capture(self.pg_interfaces)
1610         self.pg_start()
1611         capture = self.pg0.get_capture(1)
1612         p = capture[0]
1613         server = None
1614         try:
1615             ip = p[IP]
1616             tcp = p[TCP]
1617             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1618             self.assertEqual(tcp.dport, local_port)
1619             self.check_tcp_checksum(p)
1620             self.check_ip_checksum(p)
1621         except:
1622             self.logger.error(ppp("Unexpected or invalid packet:", p))
1623             raise
1624
1625         # from service back to client (no translation)
1626         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1627              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1628              TCP(sport=local_port, dport=12346))
1629         self.pg0.add_stream(p)
1630         self.pg_enable_capture(self.pg_interfaces)
1631         self.pg_start()
1632         capture = self.pg1.get_capture(1)
1633         p = capture[0]
1634         try:
1635             ip = p[IP]
1636             tcp = p[TCP]
1637             self.assertEqual(ip.src, self.pg0.remote_ip4)
1638             self.assertEqual(tcp.sport, local_port)
1639             self.check_tcp_checksum(p)
1640             self.check_ip_checksum(p)
1641         except:
1642             self.logger.error(ppp("Unexpected or invalid packet:", p))
1643             raise
1644
1645     def test_static_vrf_aware(self):
1646         """ 1:1 NAT VRF awareness """
1647
1648         nat_ip1 = "10.0.0.30"
1649         nat_ip2 = "10.0.0.40"
1650         self.tcp_port_out = 6303
1651         self.udp_port_out = 6304
1652         self.icmp_id_out = 6305
1653
1654         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1655                                       vrf_id=10)
1656         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1657                                       vrf_id=10)
1658         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1659                                                   is_inside=0)
1660         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1661         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1662
1663         # inside interface VRF match NAT44 static mapping VRF
1664         pkts = self.create_stream_in(self.pg4, self.pg3)
1665         self.pg4.add_stream(pkts)
1666         self.pg_enable_capture(self.pg_interfaces)
1667         self.pg_start()
1668         capture = self.pg3.get_capture(len(pkts))
1669         self.verify_capture_out(capture, nat_ip1, True)
1670
1671         # inside interface VRF don't match NAT44 static mapping VRF (packets
1672         # are dropped)
1673         pkts = self.create_stream_in(self.pg0, self.pg3)
1674         self.pg0.add_stream(pkts)
1675         self.pg_enable_capture(self.pg_interfaces)
1676         self.pg_start()
1677         self.pg3.assert_nothing_captured()
1678
1679     def test_identity_nat(self):
1680         """ Identity NAT """
1681
1682         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1683         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1684         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1685                                                   is_inside=0)
1686
1687         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1688              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1689              TCP(sport=12345, dport=56789))
1690         self.pg1.add_stream(p)
1691         self.pg_enable_capture(self.pg_interfaces)
1692         self.pg_start()
1693         capture = self.pg0.get_capture(1)
1694         p = capture[0]
1695         try:
1696             ip = p[IP]
1697             tcp = p[TCP]
1698             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1699             self.assertEqual(ip.src, self.pg1.remote_ip4)
1700             self.assertEqual(tcp.dport, 56789)
1701             self.assertEqual(tcp.sport, 12345)
1702             self.check_tcp_checksum(p)
1703             self.check_ip_checksum(p)
1704         except:
1705             self.logger.error(ppp("Unexpected or invalid packet:", p))
1706             raise
1707
1708     def test_static_lb(self):
1709         """ NAT44 local service load balancing """
1710         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1711         external_port = 80
1712         local_port = 8080
1713         server1 = self.pg0.remote_hosts[0]
1714         server2 = self.pg0.remote_hosts[1]
1715
1716         locals = [{'addr': server1.ip4n,
1717                    'port': local_port,
1718                    'probability': 70},
1719                   {'addr': server2.ip4n,
1720                    'port': local_port,
1721                    'probability': 30}]
1722
1723         self.nat44_add_address(self.nat_addr)
1724         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1725                                                   external_port,
1726                                                   IP_PROTOS.tcp,
1727                                                   local_num=len(locals),
1728                                                   locals=locals)
1729         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1730         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1731                                                   is_inside=0)
1732
1733         # from client to service
1734         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1735              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1736              TCP(sport=12345, dport=external_port))
1737         self.pg1.add_stream(p)
1738         self.pg_enable_capture(self.pg_interfaces)
1739         self.pg_start()
1740         capture = self.pg0.get_capture(1)
1741         p = capture[0]
1742         server = None
1743         try:
1744             ip = p[IP]
1745             tcp = p[TCP]
1746             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1747             if ip.dst == server1.ip4:
1748                 server = server1
1749             else:
1750                 server = server2
1751             self.assertEqual(tcp.dport, local_port)
1752             self.check_tcp_checksum(p)
1753             self.check_ip_checksum(p)
1754         except:
1755             self.logger.error(ppp("Unexpected or invalid packet:", p))
1756             raise
1757
1758         # from service back to client
1759         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1760              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1761              TCP(sport=local_port, dport=12345))
1762         self.pg0.add_stream(p)
1763         self.pg_enable_capture(self.pg_interfaces)
1764         self.pg_start()
1765         capture = self.pg1.get_capture(1)
1766         p = capture[0]
1767         try:
1768             ip = p[IP]
1769             tcp = p[TCP]
1770             self.assertEqual(ip.src, self.nat_addr)
1771             self.assertEqual(tcp.sport, external_port)
1772             self.check_tcp_checksum(p)
1773             self.check_ip_checksum(p)
1774         except:
1775             self.logger.error(ppp("Unexpected or invalid packet:", p))
1776             raise
1777
1778         # multiple clients
1779         server1_n = 0
1780         server2_n = 0
1781         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1782         pkts = []
1783         for client in clients:
1784             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1785                  IP(src=client, dst=self.nat_addr) /
1786                  TCP(sport=12345, dport=external_port))
1787             pkts.append(p)
1788         self.pg1.add_stream(pkts)
1789         self.pg_enable_capture(self.pg_interfaces)
1790         self.pg_start()
1791         capture = self.pg0.get_capture(len(pkts))
1792         for p in capture:
1793             if p[IP].dst == server1.ip4:
1794                 server1_n += 1
1795             else:
1796                 server2_n += 1
1797         self.assertTrue(server1_n > server2_n)
1798
1799     def test_static_lb_2(self):
1800         """ NAT44 local service load balancing (asymmetrical rule) """
1801         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1802         external_port = 80
1803         local_port = 8080
1804         server1 = self.pg0.remote_hosts[0]
1805         server2 = self.pg0.remote_hosts[1]
1806
1807         locals = [{'addr': server1.ip4n,
1808                    'port': local_port,
1809                    'probability': 70},
1810                   {'addr': server2.ip4n,
1811                    'port': local_port,
1812                    'probability': 30}]
1813
1814         self.vapi.nat44_forwarding_enable_disable(1)
1815         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1816                                                   external_port,
1817                                                   IP_PROTOS.tcp,
1818                                                   out2in_only=1,
1819                                                   local_num=len(locals),
1820                                                   locals=locals)
1821         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1822         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1823                                                   is_inside=0)
1824
1825         # from client to service
1826         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1827              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1828              TCP(sport=12345, dport=external_port))
1829         self.pg1.add_stream(p)
1830         self.pg_enable_capture(self.pg_interfaces)
1831         self.pg_start()
1832         capture = self.pg0.get_capture(1)
1833         p = capture[0]
1834         server = None
1835         try:
1836             ip = p[IP]
1837             tcp = p[TCP]
1838             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1839             if ip.dst == server1.ip4:
1840                 server = server1
1841             else:
1842                 server = server2
1843             self.assertEqual(tcp.dport, local_port)
1844             self.check_tcp_checksum(p)
1845             self.check_ip_checksum(p)
1846         except:
1847             self.logger.error(ppp("Unexpected or invalid packet:", p))
1848             raise
1849
1850         # from service back to client
1851         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1852              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1853              TCP(sport=local_port, dport=12345))
1854         self.pg0.add_stream(p)
1855         self.pg_enable_capture(self.pg_interfaces)
1856         self.pg_start()
1857         capture = self.pg1.get_capture(1)
1858         p = capture[0]
1859         try:
1860             ip = p[IP]
1861             tcp = p[TCP]
1862             self.assertEqual(ip.src, self.nat_addr)
1863             self.assertEqual(tcp.sport, external_port)
1864             self.check_tcp_checksum(p)
1865             self.check_ip_checksum(p)
1866         except:
1867             self.logger.error(ppp("Unexpected or invalid packet:", p))
1868             raise
1869
1870         # from client to server (no translation)
1871         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1872              IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1873              TCP(sport=12346, dport=local_port))
1874         self.pg1.add_stream(p)
1875         self.pg_enable_capture(self.pg_interfaces)
1876         self.pg_start()
1877         capture = self.pg0.get_capture(1)
1878         p = capture[0]
1879         server = None
1880         try:
1881             ip = p[IP]
1882             tcp = p[TCP]
1883             self.assertEqual(ip.dst, server1.ip4)
1884             self.assertEqual(tcp.dport, local_port)
1885             self.check_tcp_checksum(p)
1886             self.check_ip_checksum(p)
1887         except:
1888             self.logger.error(ppp("Unexpected or invalid packet:", p))
1889             raise
1890
1891         # from service back to client (no translation)
1892         p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1893              IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1894              TCP(sport=local_port, dport=12346))
1895         self.pg0.add_stream(p)
1896         self.pg_enable_capture(self.pg_interfaces)
1897         self.pg_start()
1898         capture = self.pg1.get_capture(1)
1899         p = capture[0]
1900         try:
1901             ip = p[IP]
1902             tcp = p[TCP]
1903             self.assertEqual(ip.src, server1.ip4)
1904             self.assertEqual(tcp.sport, local_port)
1905             self.check_tcp_checksum(p)
1906             self.check_ip_checksum(p)
1907         except:
1908             self.logger.error(ppp("Unexpected or invalid packet:", p))
1909             raise
1910
1911     def test_multiple_inside_interfaces(self):
1912         """ NAT44 multiple non-overlapping address space inside interfaces """
1913
1914         self.nat44_add_address(self.nat_addr)
1915         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1916         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1917         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1918                                                   is_inside=0)
1919
1920         # between two NAT44 inside interfaces (no translation)
1921         pkts = self.create_stream_in(self.pg0, self.pg1)
1922         self.pg0.add_stream(pkts)
1923         self.pg_enable_capture(self.pg_interfaces)
1924         self.pg_start()
1925         capture = self.pg1.get_capture(len(pkts))
1926         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1927
1928         # from NAT44 inside to interface without NAT44 feature (no translation)
1929         pkts = self.create_stream_in(self.pg0, self.pg2)
1930         self.pg0.add_stream(pkts)
1931         self.pg_enable_capture(self.pg_interfaces)
1932         self.pg_start()
1933         capture = self.pg2.get_capture(len(pkts))
1934         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1935
1936         # in2out 1st interface
1937         pkts = self.create_stream_in(self.pg0, self.pg3)
1938         self.pg0.add_stream(pkts)
1939         self.pg_enable_capture(self.pg_interfaces)
1940         self.pg_start()
1941         capture = self.pg3.get_capture(len(pkts))
1942         self.verify_capture_out(capture)
1943
1944         # out2in 1st interface
1945         pkts = self.create_stream_out(self.pg3)
1946         self.pg3.add_stream(pkts)
1947         self.pg_enable_capture(self.pg_interfaces)
1948         self.pg_start()
1949         capture = self.pg0.get_capture(len(pkts))
1950         self.verify_capture_in(capture, self.pg0)
1951
1952         # in2out 2nd interface
1953         pkts = self.create_stream_in(self.pg1, self.pg3)
1954         self.pg1.add_stream(pkts)
1955         self.pg_enable_capture(self.pg_interfaces)
1956         self.pg_start()
1957         capture = self.pg3.get_capture(len(pkts))
1958         self.verify_capture_out(capture)
1959
1960         # out2in 2nd interface
1961         pkts = self.create_stream_out(self.pg3)
1962         self.pg3.add_stream(pkts)
1963         self.pg_enable_capture(self.pg_interfaces)
1964         self.pg_start()
1965         capture = self.pg1.get_capture(len(pkts))
1966         self.verify_capture_in(capture, self.pg1)
1967
1968     def test_inside_overlapping_interfaces(self):
1969         """ NAT44 multiple inside interfaces with overlapping address space """
1970
1971         static_nat_ip = "10.0.0.10"
1972         self.nat44_add_address(self.nat_addr)
1973         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1974                                                   is_inside=0)
1975         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1976         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1977         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1978         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1979                                       vrf_id=20)
1980
1981         # between NAT44 inside interfaces with same VRF (no translation)
1982         pkts = self.create_stream_in(self.pg4, self.pg5)
1983         self.pg4.add_stream(pkts)
1984         self.pg_enable_capture(self.pg_interfaces)
1985         self.pg_start()
1986         capture = self.pg5.get_capture(len(pkts))
1987         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1988
1989         # between NAT44 inside interfaces with different VRF (hairpinning)
1990         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1991              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1992              TCP(sport=1234, dport=5678))
1993         self.pg4.add_stream(p)
1994         self.pg_enable_capture(self.pg_interfaces)
1995         self.pg_start()
1996         capture = self.pg6.get_capture(1)
1997         p = capture[0]
1998         try:
1999             ip = p[IP]
2000             tcp = p[TCP]
2001             self.assertEqual(ip.src, self.nat_addr)
2002             self.assertEqual(ip.dst, self.pg6.remote_ip4)
2003             self.assertNotEqual(tcp.sport, 1234)
2004             self.assertEqual(tcp.dport, 5678)
2005         except:
2006             self.logger.error(ppp("Unexpected or invalid packet:", p))
2007             raise
2008
2009         # in2out 1st interface
2010         pkts = self.create_stream_in(self.pg4, self.pg3)
2011         self.pg4.add_stream(pkts)
2012         self.pg_enable_capture(self.pg_interfaces)
2013         self.pg_start()
2014         capture = self.pg3.get_capture(len(pkts))
2015         self.verify_capture_out(capture)
2016
2017         # out2in 1st interface
2018         pkts = self.create_stream_out(self.pg3)
2019         self.pg3.add_stream(pkts)
2020         self.pg_enable_capture(self.pg_interfaces)
2021         self.pg_start()
2022         capture = self.pg4.get_capture(len(pkts))
2023         self.verify_capture_in(capture, self.pg4)
2024
2025         # in2out 2nd interface
2026         pkts = self.create_stream_in(self.pg5, self.pg3)
2027         self.pg5.add_stream(pkts)
2028         self.pg_enable_capture(self.pg_interfaces)
2029         self.pg_start()
2030         capture = self.pg3.get_capture(len(pkts))
2031         self.verify_capture_out(capture)
2032
2033         # out2in 2nd interface
2034         pkts = self.create_stream_out(self.pg3)
2035         self.pg3.add_stream(pkts)
2036         self.pg_enable_capture(self.pg_interfaces)
2037         self.pg_start()
2038         capture = self.pg5.get_capture(len(pkts))
2039         self.verify_capture_in(capture, self.pg5)
2040
2041         # pg5 session dump
2042         addresses = self.vapi.nat44_address_dump()
2043         self.assertEqual(len(addresses), 1)
2044         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2045         self.assertEqual(len(sessions), 3)
2046         for session in sessions:
2047             self.assertFalse(session.is_static)
2048             self.assertEqual(session.inside_ip_address[0:4],
2049                              self.pg5.remote_ip4n)
2050             self.assertEqual(session.outside_ip_address,
2051                              addresses[0].ip_address)
2052         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2053         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2054         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2055         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2056         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2057         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2058         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2059         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2060         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2061
2062         # in2out 3rd interface
2063         pkts = self.create_stream_in(self.pg6, self.pg3)
2064         self.pg6.add_stream(pkts)
2065         self.pg_enable_capture(self.pg_interfaces)
2066         self.pg_start()
2067         capture = self.pg3.get_capture(len(pkts))
2068         self.verify_capture_out(capture, static_nat_ip, True)
2069
2070         # out2in 3rd interface
2071         pkts = self.create_stream_out(self.pg3, static_nat_ip)
2072         self.pg3.add_stream(pkts)
2073         self.pg_enable_capture(self.pg_interfaces)
2074         self.pg_start()
2075         capture = self.pg6.get_capture(len(pkts))
2076         self.verify_capture_in(capture, self.pg6)
2077
2078         # general user and session dump verifications
2079         users = self.vapi.nat44_user_dump()
2080         self.assertTrue(len(users) >= 3)
2081         addresses = self.vapi.nat44_address_dump()
2082         self.assertEqual(len(addresses), 1)
2083         for user in users:
2084             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2085                                                          user.vrf_id)
2086             for session in sessions:
2087                 self.assertEqual(user.ip_address, session.inside_ip_address)
2088                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2089                 self.assertTrue(session.protocol in
2090                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
2091                                  IP_PROTOS.icmp])
2092
2093         # pg4 session dump
2094         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2095         self.assertTrue(len(sessions) >= 4)
2096         for session in sessions:
2097             self.assertFalse(session.is_static)
2098             self.assertEqual(session.inside_ip_address[0:4],
2099                              self.pg4.remote_ip4n)
2100             self.assertEqual(session.outside_ip_address,
2101                              addresses[0].ip_address)
2102
2103         # pg6 session dump
2104         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2105         self.assertTrue(len(sessions) >= 3)
2106         for session in sessions:
2107             self.assertTrue(session.is_static)
2108             self.assertEqual(session.inside_ip_address[0:4],
2109                              self.pg6.remote_ip4n)
2110             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2111                              map(int, static_nat_ip.split('.')))
2112             self.assertTrue(session.inside_port in
2113                             [self.tcp_port_in, self.udp_port_in,
2114                              self.icmp_id_in])
2115
2116     def test_hairpinning(self):
2117         """ NAT44 hairpinning - 1:1 NAPT """
2118
2119         host = self.pg0.remote_hosts[0]
2120         server = self.pg0.remote_hosts[1]
2121         host_in_port = 1234
2122         host_out_port = 0
2123         server_in_port = 5678
2124         server_out_port = 8765
2125
2126         self.nat44_add_address(self.nat_addr)
2127         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2128         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2129                                                   is_inside=0)
2130         # add static mapping for server
2131         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2132                                       server_in_port, server_out_port,
2133                                       proto=IP_PROTOS.tcp)
2134
2135         # send packet from host to server
2136         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2137              IP(src=host.ip4, dst=self.nat_addr) /
2138              TCP(sport=host_in_port, dport=server_out_port))
2139         self.pg0.add_stream(p)
2140         self.pg_enable_capture(self.pg_interfaces)
2141         self.pg_start()
2142         capture = self.pg0.get_capture(1)
2143         p = capture[0]
2144         try:
2145             ip = p[IP]
2146             tcp = p[TCP]
2147             self.assertEqual(ip.src, self.nat_addr)
2148             self.assertEqual(ip.dst, server.ip4)
2149             self.assertNotEqual(tcp.sport, host_in_port)
2150             self.assertEqual(tcp.dport, server_in_port)
2151             self.check_tcp_checksum(p)
2152             host_out_port = tcp.sport
2153         except:
2154             self.logger.error(ppp("Unexpected or invalid packet:", p))
2155             raise
2156
2157         # send reply from server to host
2158         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2159              IP(src=server.ip4, dst=self.nat_addr) /
2160              TCP(sport=server_in_port, dport=host_out_port))
2161         self.pg0.add_stream(p)
2162         self.pg_enable_capture(self.pg_interfaces)
2163         self.pg_start()
2164         capture = self.pg0.get_capture(1)
2165         p = capture[0]
2166         try:
2167             ip = p[IP]
2168             tcp = p[TCP]
2169             self.assertEqual(ip.src, self.nat_addr)
2170             self.assertEqual(ip.dst, host.ip4)
2171             self.assertEqual(tcp.sport, server_out_port)
2172             self.assertEqual(tcp.dport, host_in_port)
2173             self.check_tcp_checksum(p)
2174         except:
2175             self.logger.error(ppp("Unexpected or invalid packet:", p))
2176             raise
2177
2178     def test_hairpinning2(self):
2179         """ NAT44 hairpinning - 1:1 NAT"""
2180
2181         server1_nat_ip = "10.0.0.10"
2182         server2_nat_ip = "10.0.0.11"
2183         host = self.pg0.remote_hosts[0]
2184         server1 = self.pg0.remote_hosts[1]
2185         server2 = self.pg0.remote_hosts[2]
2186         server_tcp_port = 22
2187         server_udp_port = 20
2188
2189         self.nat44_add_address(self.nat_addr)
2190         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2191         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2192                                                   is_inside=0)
2193
2194         # add static mapping for servers
2195         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2196         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2197
2198         # host to server1
2199         pkts = []
2200         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2201              IP(src=host.ip4, dst=server1_nat_ip) /
2202              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2203         pkts.append(p)
2204         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2205              IP(src=host.ip4, dst=server1_nat_ip) /
2206              UDP(sport=self.udp_port_in, dport=server_udp_port))
2207         pkts.append(p)
2208         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2209              IP(src=host.ip4, dst=server1_nat_ip) /
2210              ICMP(id=self.icmp_id_in, type='echo-request'))
2211         pkts.append(p)
2212         self.pg0.add_stream(pkts)
2213         self.pg_enable_capture(self.pg_interfaces)
2214         self.pg_start()
2215         capture = self.pg0.get_capture(len(pkts))
2216         for packet in capture:
2217             try:
2218                 self.assertEqual(packet[IP].src, self.nat_addr)
2219                 self.assertEqual(packet[IP].dst, server1.ip4)
2220                 if packet.haslayer(TCP):
2221                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2222                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2223                     self.tcp_port_out = packet[TCP].sport
2224                     self.check_tcp_checksum(packet)
2225                 elif packet.haslayer(UDP):
2226                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2227                     self.assertEqual(packet[UDP].dport, server_udp_port)
2228                     self.udp_port_out = packet[UDP].sport
2229                 else:
2230                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2231                     self.icmp_id_out = packet[ICMP].id
2232             except:
2233                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2234                 raise
2235
2236         # server1 to host
2237         pkts = []
2238         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2239              IP(src=server1.ip4, dst=self.nat_addr) /
2240              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2241         pkts.append(p)
2242         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2243              IP(src=server1.ip4, dst=self.nat_addr) /
2244              UDP(sport=server_udp_port, dport=self.udp_port_out))
2245         pkts.append(p)
2246         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2247              IP(src=server1.ip4, dst=self.nat_addr) /
2248              ICMP(id=self.icmp_id_out, type='echo-reply'))
2249         pkts.append(p)
2250         self.pg0.add_stream(pkts)
2251         self.pg_enable_capture(self.pg_interfaces)
2252         self.pg_start()
2253         capture = self.pg0.get_capture(len(pkts))
2254         for packet in capture:
2255             try:
2256                 self.assertEqual(packet[IP].src, server1_nat_ip)
2257                 self.assertEqual(packet[IP].dst, host.ip4)
2258                 if packet.haslayer(TCP):
2259                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2260                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2261                     self.check_tcp_checksum(packet)
2262                 elif packet.haslayer(UDP):
2263                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2264                     self.assertEqual(packet[UDP].sport, server_udp_port)
2265                 else:
2266                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2267             except:
2268                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2269                 raise
2270
2271         # server2 to server1
2272         pkts = []
2273         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2274              IP(src=server2.ip4, dst=server1_nat_ip) /
2275              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2276         pkts.append(p)
2277         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2278              IP(src=server2.ip4, dst=server1_nat_ip) /
2279              UDP(sport=self.udp_port_in, dport=server_udp_port))
2280         pkts.append(p)
2281         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2282              IP(src=server2.ip4, dst=server1_nat_ip) /
2283              ICMP(id=self.icmp_id_in, type='echo-request'))
2284         pkts.append(p)
2285         self.pg0.add_stream(pkts)
2286         self.pg_enable_capture(self.pg_interfaces)
2287         self.pg_start()
2288         capture = self.pg0.get_capture(len(pkts))
2289         for packet in capture:
2290             try:
2291                 self.assertEqual(packet[IP].src, server2_nat_ip)
2292                 self.assertEqual(packet[IP].dst, server1.ip4)
2293                 if packet.haslayer(TCP):
2294                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2295                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2296                     self.tcp_port_out = packet[TCP].sport
2297                     self.check_tcp_checksum(packet)
2298                 elif packet.haslayer(UDP):
2299                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2300                     self.assertEqual(packet[UDP].dport, server_udp_port)
2301                     self.udp_port_out = packet[UDP].sport
2302                 else:
2303                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2304                     self.icmp_id_out = packet[ICMP].id
2305             except:
2306                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2307                 raise
2308
2309         # server1 to server2
2310         pkts = []
2311         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2312              IP(src=server1.ip4, dst=server2_nat_ip) /
2313              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2314         pkts.append(p)
2315         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2316              IP(src=server1.ip4, dst=server2_nat_ip) /
2317              UDP(sport=server_udp_port, dport=self.udp_port_out))
2318         pkts.append(p)
2319         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2320              IP(src=server1.ip4, dst=server2_nat_ip) /
2321              ICMP(id=self.icmp_id_out, type='echo-reply'))
2322         pkts.append(p)
2323         self.pg0.add_stream(pkts)
2324         self.pg_enable_capture(self.pg_interfaces)
2325         self.pg_start()
2326         capture = self.pg0.get_capture(len(pkts))
2327         for packet in capture:
2328             try:
2329                 self.assertEqual(packet[IP].src, server1_nat_ip)
2330                 self.assertEqual(packet[IP].dst, server2.ip4)
2331                 if packet.haslayer(TCP):
2332                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2333                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2334                     self.check_tcp_checksum(packet)
2335                 elif packet.haslayer(UDP):
2336                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2337                     self.assertEqual(packet[UDP].sport, server_udp_port)
2338                 else:
2339                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2340             except:
2341                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2342                 raise
2343
2344     def test_max_translations_per_user(self):
2345         """ MAX translations per user - recycle the least recently used """
2346
2347         self.nat44_add_address(self.nat_addr)
2348         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2349         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2350                                                   is_inside=0)
2351
2352         # get maximum number of translations per user
2353         nat44_config = self.vapi.nat_show_config()
2354
2355         # send more than maximum number of translations per user packets
2356         pkts_num = nat44_config.max_translations_per_user + 5
2357         pkts = []
2358         for port in range(0, pkts_num):
2359             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2360                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2361                  TCP(sport=1025 + port))
2362             pkts.append(p)
2363         self.pg0.add_stream(pkts)
2364         self.pg_enable_capture(self.pg_interfaces)
2365         self.pg_start()
2366
2367         # verify number of translated packet
2368         self.pg1.get_capture(pkts_num)
2369
2370     def test_interface_addr(self):
2371         """ Acquire NAT44 addresses from interface """
2372         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2373
2374         # no address in NAT pool
2375         adresses = self.vapi.nat44_address_dump()
2376         self.assertEqual(0, len(adresses))
2377
2378         # configure interface address and check NAT address pool
2379         self.pg7.config_ip4()
2380         adresses = self.vapi.nat44_address_dump()
2381         self.assertEqual(1, len(adresses))
2382         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2383
2384         # remove interface address and check NAT address pool
2385         self.pg7.unconfig_ip4()
2386         adresses = self.vapi.nat44_address_dump()
2387         self.assertEqual(0, len(adresses))
2388
2389     def test_interface_addr_static_mapping(self):
2390         """ Static mapping with addresses from interface """
2391         tag = "testTAG"
2392
2393         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2394         self.nat44_add_static_mapping(
2395             '1.2.3.4',
2396             external_sw_if_index=self.pg7.sw_if_index,
2397             tag=tag)
2398
2399         # static mappings with external interface
2400         static_mappings = self.vapi.nat44_static_mapping_dump()
2401         self.assertEqual(1, len(static_mappings))
2402         self.assertEqual(self.pg7.sw_if_index,
2403                          static_mappings[0].external_sw_if_index)
2404         self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2405
2406         # configure interface address and check static mappings
2407         self.pg7.config_ip4()
2408         static_mappings = self.vapi.nat44_static_mapping_dump()
2409         self.assertEqual(1, len(static_mappings))
2410         self.assertEqual(static_mappings[0].external_ip_address[0:4],
2411                          self.pg7.local_ip4n)
2412         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2413         self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2414
2415         # remove interface address and check static mappings
2416         self.pg7.unconfig_ip4()
2417         static_mappings = self.vapi.nat44_static_mapping_dump()
2418         self.assertEqual(0, len(static_mappings))
2419
2420     def test_interface_addr_identity_nat(self):
2421         """ Identity NAT with addresses from interface """
2422
2423         port = 53053
2424         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2425         self.vapi.nat44_add_del_identity_mapping(
2426             sw_if_index=self.pg7.sw_if_index,
2427             port=port,
2428             protocol=IP_PROTOS.tcp,
2429             addr_only=0)
2430
2431         # identity mappings with external interface
2432         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2433         self.assertEqual(1, len(identity_mappings))
2434         self.assertEqual(self.pg7.sw_if_index,
2435                          identity_mappings[0].sw_if_index)
2436
2437         # configure interface address and check identity mappings
2438         self.pg7.config_ip4()
2439         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2440         self.assertEqual(1, len(identity_mappings))
2441         self.assertEqual(identity_mappings[0].ip_address,
2442                          self.pg7.local_ip4n)
2443         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2444         self.assertEqual(port, identity_mappings[0].port)
2445         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2446
2447         # remove interface address and check identity mappings
2448         self.pg7.unconfig_ip4()
2449         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2450         self.assertEqual(0, len(identity_mappings))
2451
2452     def test_ipfix_nat44_sess(self):
2453         """ IPFIX logging NAT44 session created/delted """
2454         self.ipfix_domain_id = 10
2455         self.ipfix_src_port = 20202
2456         colector_port = 30303
2457         bind_layers(UDP, IPFIX, dport=30303)
2458         self.nat44_add_address(self.nat_addr)
2459         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2460         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2461                                                   is_inside=0)
2462         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2463                                      src_address=self.pg3.local_ip4n,
2464                                      path_mtu=512,
2465                                      template_interval=10,
2466                                      collector_port=colector_port)
2467         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2468                             src_port=self.ipfix_src_port)
2469
2470         pkts = self.create_stream_in(self.pg0, self.pg1)
2471         self.pg0.add_stream(pkts)
2472         self.pg_enable_capture(self.pg_interfaces)
2473         self.pg_start()
2474         capture = self.pg1.get_capture(len(pkts))
2475         self.verify_capture_out(capture)
2476         self.nat44_add_address(self.nat_addr, is_add=0)
2477         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2478         capture = self.pg3.get_capture(9)
2479         ipfix = IPFIXDecoder()
2480         # first load template
2481         for p in capture:
2482             self.assertTrue(p.haslayer(IPFIX))
2483             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2484             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2485             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2486             self.assertEqual(p[UDP].dport, colector_port)
2487             self.assertEqual(p[IPFIX].observationDomainID,
2488                              self.ipfix_domain_id)
2489             if p.haslayer(Template):
2490                 ipfix.add_template(p.getlayer(Template))
2491         # verify events in data set
2492         for p in capture:
2493             if p.haslayer(Data):
2494                 data = ipfix.decode_data_set(p.getlayer(Set))
2495                 self.verify_ipfix_nat44_ses(data)
2496
2497     def test_ipfix_addr_exhausted(self):
2498         """ IPFIX logging NAT addresses exhausted """
2499         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2500         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2501                                                   is_inside=0)
2502         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2503                                      src_address=self.pg3.local_ip4n,
2504                                      path_mtu=512,
2505                                      template_interval=10)
2506         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2507                             src_port=self.ipfix_src_port)
2508
2509         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2510              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2511              TCP(sport=3025))
2512         self.pg0.add_stream(p)
2513         self.pg_enable_capture(self.pg_interfaces)
2514         self.pg_start()
2515         capture = self.pg1.get_capture(0)
2516         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2517         capture = self.pg3.get_capture(9)
2518         ipfix = IPFIXDecoder()
2519         # first load template
2520         for p in capture:
2521             self.assertTrue(p.haslayer(IPFIX))
2522             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2523             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2524             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2525             self.assertEqual(p[UDP].dport, 4739)
2526             self.assertEqual(p[IPFIX].observationDomainID,
2527                              self.ipfix_domain_id)
2528             if p.haslayer(Template):
2529                 ipfix.add_template(p.getlayer(Template))
2530         # verify events in data set
2531         for p in capture:
2532             if p.haslayer(Data):
2533                 data = ipfix.decode_data_set(p.getlayer(Set))
2534                 self.verify_ipfix_addr_exhausted(data)
2535
2536     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2537     def test_ipfix_max_sessions(self):
2538         """ IPFIX logging maximum session entries exceeded """
2539         self.nat44_add_address(self.nat_addr)
2540         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2541         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2542                                                   is_inside=0)
2543
2544         nat44_config = self.vapi.nat_show_config()
2545         max_sessions = 10 * nat44_config.translation_buckets
2546
2547         pkts = []
2548         for i in range(0, max_sessions):
2549             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2550             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2551                  IP(src=src, dst=self.pg1.remote_ip4) /
2552                  TCP(sport=1025))
2553             pkts.append(p)
2554         self.pg0.add_stream(pkts)
2555         self.pg_enable_capture(self.pg_interfaces)
2556         self.pg_start()
2557
2558         self.pg1.get_capture(max_sessions)
2559         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2560                                      src_address=self.pg3.local_ip4n,
2561                                      path_mtu=512,
2562                                      template_interval=10)
2563         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2564                             src_port=self.ipfix_src_port)
2565
2566         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2567              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2568              TCP(sport=1025))
2569         self.pg0.add_stream(p)
2570         self.pg_enable_capture(self.pg_interfaces)
2571         self.pg_start()
2572         self.pg1.get_capture(0)
2573         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2574         capture = self.pg3.get_capture(9)
2575         ipfix = IPFIXDecoder()
2576         # first load template
2577         for p in capture:
2578             self.assertTrue(p.haslayer(IPFIX))
2579             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2580             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2581             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2582             self.assertEqual(p[UDP].dport, 4739)
2583             self.assertEqual(p[IPFIX].observationDomainID,
2584                              self.ipfix_domain_id)
2585             if p.haslayer(Template):
2586                 ipfix.add_template(p.getlayer(Template))
2587         # verify events in data set
2588         for p in capture:
2589             if p.haslayer(Data):
2590                 data = ipfix.decode_data_set(p.getlayer(Set))
2591                 self.verify_ipfix_max_sessions(data, max_sessions)
2592
2593     def test_pool_addr_fib(self):
2594         """ NAT44 add pool addresses to FIB """
2595         static_addr = '10.0.0.10'
2596         self.nat44_add_address(self.nat_addr)
2597         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2598         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2599                                                   is_inside=0)
2600         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2601
2602         # NAT44 address
2603         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2604              ARP(op=ARP.who_has, pdst=self.nat_addr,
2605                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2606         self.pg1.add_stream(p)
2607         self.pg_enable_capture(self.pg_interfaces)
2608         self.pg_start()
2609         capture = self.pg1.get_capture(1)
2610         self.assertTrue(capture[0].haslayer(ARP))
2611         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2612
2613         # 1:1 NAT address
2614         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2615              ARP(op=ARP.who_has, pdst=static_addr,
2616                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2617         self.pg1.add_stream(p)
2618         self.pg_enable_capture(self.pg_interfaces)
2619         self.pg_start()
2620         capture = self.pg1.get_capture(1)
2621         self.assertTrue(capture[0].haslayer(ARP))
2622         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2623
2624         # send ARP to non-NAT44 interface
2625         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2626              ARP(op=ARP.who_has, pdst=self.nat_addr,
2627                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2628         self.pg2.add_stream(p)
2629         self.pg_enable_capture(self.pg_interfaces)
2630         self.pg_start()
2631         capture = self.pg1.get_capture(0)
2632
2633         # remove addresses and verify
2634         self.nat44_add_address(self.nat_addr, is_add=0)
2635         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2636                                       is_add=0)
2637
2638         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2639              ARP(op=ARP.who_has, pdst=self.nat_addr,
2640                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2641         self.pg1.add_stream(p)
2642         self.pg_enable_capture(self.pg_interfaces)
2643         self.pg_start()
2644         capture = self.pg1.get_capture(0)
2645
2646         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2647              ARP(op=ARP.who_has, pdst=static_addr,
2648                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2649         self.pg1.add_stream(p)
2650         self.pg_enable_capture(self.pg_interfaces)
2651         self.pg_start()
2652         capture = self.pg1.get_capture(0)
2653
2654     def test_vrf_mode(self):
2655         """ NAT44 tenant VRF aware address pool mode """
2656
2657         vrf_id1 = 1
2658         vrf_id2 = 2
2659         nat_ip1 = "10.0.0.10"
2660         nat_ip2 = "10.0.0.11"
2661
2662         self.pg0.unconfig_ip4()
2663         self.pg1.unconfig_ip4()
2664         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2665         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2666         self.pg0.set_table_ip4(vrf_id1)
2667         self.pg1.set_table_ip4(vrf_id2)
2668         self.pg0.config_ip4()
2669         self.pg1.config_ip4()
2670
2671         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2672         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2673         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2674         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2675         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2676                                                   is_inside=0)
2677
2678         # first VRF
2679         pkts = self.create_stream_in(self.pg0, self.pg2)
2680         self.pg0.add_stream(pkts)
2681         self.pg_enable_capture(self.pg_interfaces)
2682         self.pg_start()
2683         capture = self.pg2.get_capture(len(pkts))
2684         self.verify_capture_out(capture, nat_ip1)
2685
2686         # second VRF
2687         pkts = self.create_stream_in(self.pg1, self.pg2)
2688         self.pg1.add_stream(pkts)
2689         self.pg_enable_capture(self.pg_interfaces)
2690         self.pg_start()
2691         capture = self.pg2.get_capture(len(pkts))
2692         self.verify_capture_out(capture, nat_ip2)
2693
2694         self.pg0.unconfig_ip4()
2695         self.pg1.unconfig_ip4()
2696         self.pg0.set_table_ip4(0)
2697         self.pg1.set_table_ip4(0)
2698         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2699         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2700
2701     def test_vrf_feature_independent(self):
2702         """ NAT44 tenant VRF independent address pool mode """
2703
2704         nat_ip1 = "10.0.0.10"
2705         nat_ip2 = "10.0.0.11"
2706
2707         self.nat44_add_address(nat_ip1)
2708         self.nat44_add_address(nat_ip2, vrf_id=99)
2709         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2710         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2711         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2712                                                   is_inside=0)
2713
2714         # first VRF
2715         pkts = self.create_stream_in(self.pg0, self.pg2)
2716         self.pg0.add_stream(pkts)
2717         self.pg_enable_capture(self.pg_interfaces)
2718         self.pg_start()
2719         capture = self.pg2.get_capture(len(pkts))
2720         self.verify_capture_out(capture, nat_ip1)
2721
2722         # second VRF
2723         pkts = self.create_stream_in(self.pg1, self.pg2)
2724         self.pg1.add_stream(pkts)
2725         self.pg_enable_capture(self.pg_interfaces)
2726         self.pg_start()
2727         capture = self.pg2.get_capture(len(pkts))
2728         self.verify_capture_out(capture, nat_ip1)
2729
2730     def test_dynamic_ipless_interfaces(self):
2731         """ NAT44 interfaces without configured IP address """
2732
2733         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2734                                       mactobinary(self.pg7.remote_mac),
2735                                       self.pg7.remote_ip4n,
2736                                       is_static=1)
2737         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2738                                       mactobinary(self.pg8.remote_mac),
2739                                       self.pg8.remote_ip4n,
2740                                       is_static=1)
2741
2742         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2743                                    dst_address_length=32,
2744                                    next_hop_address=self.pg7.remote_ip4n,
2745                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2746         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2747                                    dst_address_length=32,
2748                                    next_hop_address=self.pg8.remote_ip4n,
2749                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2750
2751         self.nat44_add_address(self.nat_addr)
2752         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2753         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2754                                                   is_inside=0)
2755
2756         # in2out
2757         pkts = self.create_stream_in(self.pg7, self.pg8)
2758         self.pg7.add_stream(pkts)
2759         self.pg_enable_capture(self.pg_interfaces)
2760         self.pg_start()
2761         capture = self.pg8.get_capture(len(pkts))
2762         self.verify_capture_out(capture)
2763
2764         # out2in
2765         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2766         self.pg8.add_stream(pkts)
2767         self.pg_enable_capture(self.pg_interfaces)
2768         self.pg_start()
2769         capture = self.pg7.get_capture(len(pkts))
2770         self.verify_capture_in(capture, self.pg7)
2771
2772     def test_static_ipless_interfaces(self):
2773         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2774
2775         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2776                                       mactobinary(self.pg7.remote_mac),
2777                                       self.pg7.remote_ip4n,
2778                                       is_static=1)
2779         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2780                                       mactobinary(self.pg8.remote_mac),
2781                                       self.pg8.remote_ip4n,
2782                                       is_static=1)
2783
2784         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2785                                    dst_address_length=32,
2786                                    next_hop_address=self.pg7.remote_ip4n,
2787                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2788         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2789                                    dst_address_length=32,
2790                                    next_hop_address=self.pg8.remote_ip4n,
2791                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2792
2793         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2794         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2795         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2796                                                   is_inside=0)
2797
2798         # out2in
2799         pkts = self.create_stream_out(self.pg8)
2800         self.pg8.add_stream(pkts)
2801         self.pg_enable_capture(self.pg_interfaces)
2802         self.pg_start()
2803         capture = self.pg7.get_capture(len(pkts))
2804         self.verify_capture_in(capture, self.pg7)
2805
2806         # in2out
2807         pkts = self.create_stream_in(self.pg7, self.pg8)
2808         self.pg7.add_stream(pkts)
2809         self.pg_enable_capture(self.pg_interfaces)
2810         self.pg_start()
2811         capture = self.pg8.get_capture(len(pkts))
2812         self.verify_capture_out(capture, self.nat_addr, True)
2813
2814     def test_static_with_port_ipless_interfaces(self):
2815         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2816
2817         self.tcp_port_out = 30606
2818         self.udp_port_out = 30607
2819         self.icmp_id_out = 30608
2820
2821         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2822                                       mactobinary(self.pg7.remote_mac),
2823                                       self.pg7.remote_ip4n,
2824                                       is_static=1)
2825         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2826                                       mactobinary(self.pg8.remote_mac),
2827                                       self.pg8.remote_ip4n,
2828                                       is_static=1)
2829
2830         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2831                                    dst_address_length=32,
2832                                    next_hop_address=self.pg7.remote_ip4n,
2833                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2834         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2835                                    dst_address_length=32,
2836                                    next_hop_address=self.pg8.remote_ip4n,
2837                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2838
2839         self.nat44_add_address(self.nat_addr)
2840         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2841                                       self.tcp_port_in, self.tcp_port_out,
2842                                       proto=IP_PROTOS.tcp)
2843         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2844                                       self.udp_port_in, self.udp_port_out,
2845                                       proto=IP_PROTOS.udp)
2846         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2847                                       self.icmp_id_in, self.icmp_id_out,
2848                                       proto=IP_PROTOS.icmp)
2849         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2850         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2851                                                   is_inside=0)
2852
2853         # out2in
2854         pkts = self.create_stream_out(self.pg8)
2855         self.pg8.add_stream(pkts)
2856         self.pg_enable_capture(self.pg_interfaces)
2857         self.pg_start()
2858         capture = self.pg7.get_capture(len(pkts))
2859         self.verify_capture_in(capture, self.pg7)
2860
2861         # in2out
2862         pkts = self.create_stream_in(self.pg7, self.pg8)
2863         self.pg7.add_stream(pkts)
2864         self.pg_enable_capture(self.pg_interfaces)
2865         self.pg_start()
2866         capture = self.pg8.get_capture(len(pkts))
2867         self.verify_capture_out(capture)
2868
2869     def test_static_unknown_proto(self):
2870         """ 1:1 NAT translate packet with unknown protocol """
2871         nat_ip = "10.0.0.10"
2872         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2873         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2874         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2875                                                   is_inside=0)
2876
2877         # in2out
2878         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2879              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2880              GRE() /
2881              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2882              TCP(sport=1234, dport=1234))
2883         self.pg0.add_stream(p)
2884         self.pg_enable_capture(self.pg_interfaces)
2885         self.pg_start()
2886         p = self.pg1.get_capture(1)
2887         packet = p[0]
2888         try:
2889             self.assertEqual(packet[IP].src, nat_ip)
2890             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2891             self.assertTrue(packet.haslayer(GRE))
2892             self.check_ip_checksum(packet)
2893         except:
2894             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2895             raise
2896
2897         # out2in
2898         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2899              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2900              GRE() /
2901              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2902              TCP(sport=1234, dport=1234))
2903         self.pg1.add_stream(p)
2904         self.pg_enable_capture(self.pg_interfaces)
2905         self.pg_start()
2906         p = self.pg0.get_capture(1)
2907         packet = p[0]
2908         try:
2909             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2910             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2911             self.assertTrue(packet.haslayer(GRE))
2912             self.check_ip_checksum(packet)
2913         except:
2914             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2915             raise
2916
2917     def test_hairpinning_static_unknown_proto(self):
2918         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2919
2920         host = self.pg0.remote_hosts[0]
2921         server = self.pg0.remote_hosts[1]
2922
2923         host_nat_ip = "10.0.0.10"
2924         server_nat_ip = "10.0.0.11"
2925
2926         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2927         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2928         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2929         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2930                                                   is_inside=0)
2931
2932         # host to server
2933         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2934              IP(src=host.ip4, dst=server_nat_ip) /
2935              GRE() /
2936              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2937              TCP(sport=1234, dport=1234))
2938         self.pg0.add_stream(p)
2939         self.pg_enable_capture(self.pg_interfaces)
2940         self.pg_start()
2941         p = self.pg0.get_capture(1)
2942         packet = p[0]
2943         try:
2944             self.assertEqual(packet[IP].src, host_nat_ip)
2945             self.assertEqual(packet[IP].dst, server.ip4)
2946             self.assertTrue(packet.haslayer(GRE))
2947             self.check_ip_checksum(packet)
2948         except:
2949             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2950             raise
2951
2952         # server to host
2953         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2954              IP(src=server.ip4, dst=host_nat_ip) /
2955              GRE() /
2956              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2957              TCP(sport=1234, dport=1234))
2958         self.pg0.add_stream(p)
2959         self.pg_enable_capture(self.pg_interfaces)
2960         self.pg_start()
2961         p = self.pg0.get_capture(1)
2962         packet = p[0]
2963         try:
2964             self.assertEqual(packet[IP].src, server_nat_ip)
2965             self.assertEqual(packet[IP].dst, host.ip4)
2966             self.assertTrue(packet.haslayer(GRE))
2967             self.check_ip_checksum(packet)
2968         except:
2969             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2970             raise
2971
2972     def test_unknown_proto(self):
2973         """ NAT44 translate packet with unknown protocol """
2974         self.nat44_add_address(self.nat_addr)
2975         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2976         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2977                                                   is_inside=0)
2978
2979         # in2out
2980         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2981              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2982              TCP(sport=self.tcp_port_in, dport=20))
2983         self.pg0.add_stream(p)
2984         self.pg_enable_capture(self.pg_interfaces)
2985         self.pg_start()
2986         p = self.pg1.get_capture(1)
2987
2988         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2989              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2990              GRE() /
2991              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2992              TCP(sport=1234, dport=1234))
2993         self.pg0.add_stream(p)
2994         self.pg_enable_capture(self.pg_interfaces)
2995         self.pg_start()
2996         p = self.pg1.get_capture(1)
2997         packet = p[0]
2998         try:
2999             self.assertEqual(packet[IP].src, self.nat_addr)
3000             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3001             self.assertTrue(packet.haslayer(GRE))
3002             self.check_ip_checksum(packet)
3003         except:
3004             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3005             raise
3006
3007         # out2in
3008         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3009              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3010              GRE() /
3011              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3012              TCP(sport=1234, dport=1234))
3013         self.pg1.add_stream(p)
3014         self.pg_enable_capture(self.pg_interfaces)
3015         self.pg_start()
3016         p = self.pg0.get_capture(1)
3017         packet = p[0]
3018         try:
3019             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3020             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3021             self.assertTrue(packet.haslayer(GRE))
3022             self.check_ip_checksum(packet)
3023         except:
3024             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3025             raise
3026
3027     def test_hairpinning_unknown_proto(self):
3028         """ NAT44 translate packet with unknown protocol - hairpinning """
3029         host = self.pg0.remote_hosts[0]
3030         server = self.pg0.remote_hosts[1]
3031         host_in_port = 1234
3032         host_out_port = 0
3033         server_in_port = 5678
3034         server_out_port = 8765
3035         server_nat_ip = "10.0.0.11"
3036
3037         self.nat44_add_address(self.nat_addr)
3038         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3039         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3040                                                   is_inside=0)
3041
3042         # add static mapping for server
3043         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3044
3045         # host to server
3046         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3047              IP(src=host.ip4, dst=server_nat_ip) /
3048              TCP(sport=host_in_port, dport=server_out_port))
3049         self.pg0.add_stream(p)
3050         self.pg_enable_capture(self.pg_interfaces)
3051         self.pg_start()
3052         capture = self.pg0.get_capture(1)
3053
3054         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3055              IP(src=host.ip4, dst=server_nat_ip) /
3056              GRE() /
3057              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3058              TCP(sport=1234, dport=1234))
3059         self.pg0.add_stream(p)
3060         self.pg_enable_capture(self.pg_interfaces)
3061         self.pg_start()
3062         p = self.pg0.get_capture(1)
3063         packet = p[0]
3064         try:
3065             self.assertEqual(packet[IP].src, self.nat_addr)
3066             self.assertEqual(packet[IP].dst, server.ip4)
3067             self.assertTrue(packet.haslayer(GRE))
3068             self.check_ip_checksum(packet)
3069         except:
3070             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3071             raise
3072
3073         # server to host
3074         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3075              IP(src=server.ip4, dst=self.nat_addr) /
3076              GRE() /
3077              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3078              TCP(sport=1234, dport=1234))
3079         self.pg0.add_stream(p)
3080         self.pg_enable_capture(self.pg_interfaces)
3081         self.pg_start()
3082         p = self.pg0.get_capture(1)
3083         packet = p[0]
3084         try:
3085             self.assertEqual(packet[IP].src, server_nat_ip)
3086             self.assertEqual(packet[IP].dst, host.ip4)
3087             self.assertTrue(packet.haslayer(GRE))
3088             self.check_ip_checksum(packet)
3089         except:
3090             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3091             raise
3092
3093     def test_output_feature(self):
3094         """ NAT44 interface output feature (in2out postrouting) """
3095         self.nat44_add_address(self.nat_addr)
3096         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3097         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3098         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3099                                                          is_inside=0)
3100
3101         # in2out
3102         pkts = self.create_stream_in(self.pg0, self.pg3)
3103         self.pg0.add_stream(pkts)
3104         self.pg_enable_capture(self.pg_interfaces)
3105         self.pg_start()
3106         capture = self.pg3.get_capture(len(pkts))
3107         self.verify_capture_out(capture)
3108
3109         # out2in
3110         pkts = self.create_stream_out(self.pg3)
3111         self.pg3.add_stream(pkts)
3112         self.pg_enable_capture(self.pg_interfaces)
3113         self.pg_start()
3114         capture = self.pg0.get_capture(len(pkts))
3115         self.verify_capture_in(capture, self.pg0)
3116
3117         # from non-NAT interface to NAT inside interface
3118         pkts = self.create_stream_in(self.pg2, self.pg0)
3119         self.pg2.add_stream(pkts)
3120         self.pg_enable_capture(self.pg_interfaces)
3121         self.pg_start()
3122         capture = self.pg0.get_capture(len(pkts))
3123         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3124
3125     def test_output_feature_vrf_aware(self):
3126         """ NAT44 interface output feature VRF aware (in2out postrouting) """
3127         nat_ip_vrf10 = "10.0.0.10"
3128         nat_ip_vrf20 = "10.0.0.20"
3129
3130         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3131                                    dst_address_length=32,
3132                                    next_hop_address=self.pg3.remote_ip4n,
3133                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3134                                    table_id=10)
3135         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3136                                    dst_address_length=32,
3137                                    next_hop_address=self.pg3.remote_ip4n,
3138                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3139                                    table_id=20)
3140
3141         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3142         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3143         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3144         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3145         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3146                                                          is_inside=0)
3147
3148         # in2out VRF 10
3149         pkts = self.create_stream_in(self.pg4, self.pg3)
3150         self.pg4.add_stream(pkts)
3151         self.pg_enable_capture(self.pg_interfaces)
3152         self.pg_start()
3153         capture = self.pg3.get_capture(len(pkts))
3154         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3155
3156         # out2in VRF 10
3157         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3158         self.pg3.add_stream(pkts)
3159         self.pg_enable_capture(self.pg_interfaces)
3160         self.pg_start()
3161         capture = self.pg4.get_capture(len(pkts))
3162         self.verify_capture_in(capture, self.pg4)
3163
3164         # in2out VRF 20
3165         pkts = self.create_stream_in(self.pg6, self.pg3)
3166         self.pg6.add_stream(pkts)
3167         self.pg_enable_capture(self.pg_interfaces)
3168         self.pg_start()
3169         capture = self.pg3.get_capture(len(pkts))
3170         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3171
3172         # out2in VRF 20
3173         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3174         self.pg3.add_stream(pkts)
3175         self.pg_enable_capture(self.pg_interfaces)
3176         self.pg_start()
3177         capture = self.pg6.get_capture(len(pkts))
3178         self.verify_capture_in(capture, self.pg6)
3179
3180     def test_output_feature_hairpinning(self):
3181         """ NAT44 interface output feature hairpinning (in2out postrouting) """
3182         host = self.pg0.remote_hosts[0]
3183         server = self.pg0.remote_hosts[1]
3184         host_in_port = 1234
3185         host_out_port = 0
3186         server_in_port = 5678
3187         server_out_port = 8765
3188
3189         self.nat44_add_address(self.nat_addr)
3190         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3191         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3192                                                          is_inside=0)
3193
3194         # add static mapping for server
3195         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3196                                       server_in_port, server_out_port,
3197                                       proto=IP_PROTOS.tcp)
3198
3199         # send packet from host to server
3200         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3201              IP(src=host.ip4, dst=self.nat_addr) /
3202              TCP(sport=host_in_port, dport=server_out_port))
3203         self.pg0.add_stream(p)
3204         self.pg_enable_capture(self.pg_interfaces)
3205         self.pg_start()
3206         capture = self.pg0.get_capture(1)
3207         p = capture[0]
3208         try:
3209             ip = p[IP]
3210             tcp = p[TCP]
3211             self.assertEqual(ip.src, self.nat_addr)
3212             self.assertEqual(ip.dst, server.ip4)
3213             self.assertNotEqual(tcp.sport, host_in_port)
3214             self.assertEqual(tcp.dport, server_in_port)
3215             self.check_tcp_checksum(p)
3216             host_out_port = tcp.sport
3217         except:
3218             self.logger.error(ppp("Unexpected or invalid packet:", p))
3219             raise
3220
3221         # send reply from server to host
3222         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3223              IP(src=server.ip4, dst=self.nat_addr) /
3224              TCP(sport=server_in_port, dport=host_out_port))
3225         self.pg0.add_stream(p)
3226         self.pg_enable_capture(self.pg_interfaces)
3227         self.pg_start()
3228         capture = self.pg0.get_capture(1)
3229         p = capture[0]
3230         try:
3231             ip = p[IP]
3232             tcp = p[TCP]
3233             self.assertEqual(ip.src, self.nat_addr)
3234             self.assertEqual(ip.dst, host.ip4)
3235             self.assertEqual(tcp.sport, server_out_port)
3236             self.assertEqual(tcp.dport, host_in_port)
3237             self.check_tcp_checksum(p)
3238         except:
3239             self.logger.error(ppp("Unexpected or invalid packet:", p))
3240             raise
3241
3242     def test_one_armed_nat44(self):
3243         """ One armed NAT44 """
3244         remote_host = self.pg9.remote_hosts[0]
3245         local_host = self.pg9.remote_hosts[1]
3246         external_port = 0
3247
3248         self.nat44_add_address(self.nat_addr)
3249         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3250         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3251                                                   is_inside=0)
3252
3253         # in2out
3254         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3255              IP(src=local_host.ip4, dst=remote_host.ip4) /
3256              TCP(sport=12345, dport=80))
3257         self.pg9.add_stream(p)
3258         self.pg_enable_capture(self.pg_interfaces)
3259         self.pg_start()
3260         capture = self.pg9.get_capture(1)
3261         p = capture[0]
3262         try:
3263             ip = p[IP]
3264             tcp = p[TCP]
3265             self.assertEqual(ip.src, self.nat_addr)
3266             self.assertEqual(ip.dst, remote_host.ip4)
3267             self.assertNotEqual(tcp.sport, 12345)
3268             external_port = tcp.sport
3269             self.assertEqual(tcp.dport, 80)
3270             self.check_tcp_checksum(p)
3271             self.check_ip_checksum(p)
3272         except:
3273             self.logger.error(ppp("Unexpected or invalid packet:", p))
3274             raise
3275
3276         # out2in
3277         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3278              IP(src=remote_host.ip4, dst=self.nat_addr) /
3279              TCP(sport=80, dport=external_port))
3280         self.pg9.add_stream(p)
3281         self.pg_enable_capture(self.pg_interfaces)
3282         self.pg_start()
3283         capture = self.pg9.get_capture(1)
3284         p = capture[0]
3285         try:
3286             ip = p[IP]
3287             tcp = p[TCP]
3288             self.assertEqual(ip.src, remote_host.ip4)
3289             self.assertEqual(ip.dst, local_host.ip4)
3290             self.assertEqual(tcp.sport, 80)
3291             self.assertEqual(tcp.dport, 12345)
3292             self.check_tcp_checksum(p)
3293             self.check_ip_checksum(p)
3294         except:
3295             self.logger.error(ppp("Unexpected or invalid packet:", p))
3296             raise
3297
3298     def test_one_armed_nat44_static(self):
3299         """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3300         remote_host = self.pg9.remote_hosts[0]
3301         local_host = self.pg9.remote_hosts[1]
3302         external_port = 80
3303         local_port = 8080
3304         eh_port_in = 0
3305
3306         self.vapi.nat44_forwarding_enable_disable(1)
3307         self.nat44_add_address(self.nat_addr, twice_nat=1)
3308         self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3309                                       local_port, external_port,
3310                                       proto=IP_PROTOS.tcp, out2in_only=1,
3311                                       twice_nat=1)
3312         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3313         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3314                                                   is_inside=0)
3315
3316         # from client to service
3317         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3318              IP(src=remote_host.ip4, dst=self.nat_addr) /
3319              TCP(sport=12345, dport=external_port))
3320         self.pg9.add_stream(p)
3321         self.pg_enable_capture(self.pg_interfaces)
3322         self.pg_start()
3323         capture = self.pg9.get_capture(1)
3324         p = capture[0]
3325         server = None
3326         try:
3327             ip = p[IP]
3328             tcp = p[TCP]
3329             self.assertEqual(ip.dst, local_host.ip4)
3330             self.assertEqual(ip.src, self.nat_addr)
3331             self.assertEqual(tcp.dport, local_port)
3332             self.assertNotEqual(tcp.sport, 12345)
3333             eh_port_in = tcp.sport
3334             self.check_tcp_checksum(p)
3335             self.check_ip_checksum(p)
3336         except:
3337             self.logger.error(ppp("Unexpected or invalid packet:", p))
3338             raise
3339
3340         # from service back to client
3341         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3342              IP(src=local_host.ip4, dst=self.nat_addr) /
3343              TCP(sport=local_port, dport=eh_port_in))
3344         self.pg9.add_stream(p)
3345         self.pg_enable_capture(self.pg_interfaces)
3346         self.pg_start()
3347         capture = self.pg9.get_capture(1)
3348         p = capture[0]
3349         try:
3350             ip = p[IP]
3351             tcp = p[TCP]
3352             self.assertEqual(ip.src, self.nat_addr)
3353             self.assertEqual(ip.dst, remote_host.ip4)
3354             self.assertEqual(tcp.sport, external_port)
3355             self.assertEqual(tcp.dport, 12345)
3356             self.check_tcp_checksum(p)
3357             self.check_ip_checksum(p)
3358         except:
3359             self.logger.error(ppp("Unexpected or invalid packet:", p))
3360             raise
3361
3362     def test_del_session(self):
3363         """ Delete NAT44 session """
3364         self.nat44_add_address(self.nat_addr)
3365         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3366         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3367                                                   is_inside=0)
3368
3369         pkts = self.create_stream_in(self.pg0, self.pg1)
3370         self.pg0.add_stream(pkts)
3371         self.pg_enable_capture(self.pg_interfaces)
3372         self.pg_start()
3373         capture = self.pg1.get_capture(len(pkts))
3374
3375         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3376         nsessions = len(sessions)
3377
3378         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3379                                     sessions[0].inside_port,
3380                                     sessions[0].protocol)
3381         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3382                                     sessions[1].outside_port,
3383                                     sessions[1].protocol,
3384                                     is_in=0)
3385
3386         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3387         self.assertEqual(nsessions - len(sessions), 2)
3388
3389     def test_set_get_reass(self):
3390         """ NAT44 set/get virtual fragmentation reassembly """
3391         reas_cfg1 = self.vapi.nat_get_reass()
3392
3393         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3394                                 max_reass=reas_cfg1.ip4_max_reass * 2,
3395                                 max_frag=reas_cfg1.ip4_max_frag * 2)
3396
3397         reas_cfg2 = self.vapi.nat_get_reass()
3398
3399         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3400         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3401         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3402
3403         self.vapi.nat_set_reass(drop_frag=1)
3404         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3405
3406     def test_frag_in_order(self):
3407         """ NAT44 translate fragments arriving in order """
3408         self.nat44_add_address(self.nat_addr)
3409         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3410         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3411                                                   is_inside=0)
3412
3413         data = "A" * 4 + "B" * 16 + "C" * 3
3414         self.tcp_port_in = random.randint(1025, 65535)
3415
3416         reass = self.vapi.nat_reass_dump()
3417         reass_n_start = len(reass)
3418
3419         # in2out
3420         pkts = self.create_stream_frag(self.pg0,
3421                                        self.pg1.remote_ip4,
3422                                        self.tcp_port_in,
3423                                        20,
3424                                        data)
3425         self.pg0.add_stream(pkts)
3426         self.pg_enable_capture(self.pg_interfaces)
3427         self.pg_start()
3428         frags = self.pg1.get_capture(len(pkts))
3429         p = self.reass_frags_and_verify(frags,
3430                                         self.nat_addr,
3431                                         self.pg1.remote_ip4)
3432         self.assertEqual(p[TCP].dport, 20)
3433         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3434         self.tcp_port_out = p[TCP].sport
3435         self.assertEqual(data, p[Raw].load)
3436
3437         # out2in
3438         pkts = self.create_stream_frag(self.pg1,
3439                                        self.nat_addr,
3440                                        20,
3441                                        self.tcp_port_out,
3442                                        data)
3443         self.pg1.add_stream(pkts)
3444         self.pg_enable_capture(self.pg_interfaces)
3445         self.pg_start()
3446         frags = self.pg0.get_capture(len(pkts))
3447         p = self.reass_frags_and_verify(frags,
3448                                         self.pg1.remote_ip4,
3449                                         self.pg0.remote_ip4)
3450         self.assertEqual(p[TCP].sport, 20)
3451         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3452         self.assertEqual(data, p[Raw].load)
3453
3454         reass = self.vapi.nat_reass_dump()
3455         reass_n_end = len(reass)
3456
3457         self.assertEqual(reass_n_end - reass_n_start, 2)
3458
3459     def test_reass_hairpinning(self):
3460         """ NAT44 fragments hairpinning """
3461         host = self.pg0.remote_hosts[0]
3462         server = self.pg0.remote_hosts[1]
3463         host_in_port = random.randint(1025, 65535)
3464         host_out_port = 0
3465         server_in_port = random.randint(1025, 65535)
3466         server_out_port = random.randint(1025, 65535)
3467         data = "A" * 4 + "B" * 16 + "C" * 3
3468
3469         self.nat44_add_address(self.nat_addr)
3470         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3471         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3472                                                   is_inside=0)
3473         # add static mapping for server
3474         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3475                                       server_in_port, server_out_port,
3476                                       proto=IP_PROTOS.tcp)
3477
3478         # send packet from host to server
3479         pkts = self.create_stream_frag(self.pg0,
3480                                        self.nat_addr,
3481                                        host_in_port,
3482                                        server_out_port,
3483                                        data)
3484         self.pg0.add_stream(pkts)
3485         self.pg_enable_capture(self.pg_interfaces)
3486         self.pg_start()
3487         frags = self.pg0.get_capture(len(pkts))
3488         p = self.reass_frags_and_verify(frags,
3489                                         self.nat_addr,
3490                                         server.ip4)
3491         self.assertNotEqual(p[TCP].sport, host_in_port)
3492         self.assertEqual(p[TCP].dport, server_in_port)
3493         self.assertEqual(data, p[Raw].load)
3494
3495     def test_frag_out_of_order(self):
3496         """ NAT44 translate fragments arriving out of order """
3497         self.nat44_add_address(self.nat_addr)
3498         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3499         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3500                                                   is_inside=0)
3501
3502         data = "A" * 4 + "B" * 16 + "C" * 3
3503         random.randint(1025, 65535)
3504
3505         # in2out
3506         pkts = self.create_stream_frag(self.pg0,
3507                                        self.pg1.remote_ip4,
3508                                        self.tcp_port_in,
3509                                        20,
3510                                        data)
3511         pkts.reverse()
3512         self.pg0.add_stream(pkts)
3513         self.pg_enable_capture(self.pg_interfaces)
3514         self.pg_start()
3515         frags = self.pg1.get_capture(len(pkts))
3516         p = self.reass_frags_and_verify(frags,
3517                                         self.nat_addr,
3518                                         self.pg1.remote_ip4)
3519         self.assertEqual(p[TCP].dport, 20)
3520         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3521         self.tcp_port_out = p[TCP].sport
3522         self.assertEqual(data, p[Raw].load)
3523
3524         # out2in
3525         pkts = self.create_stream_frag(self.pg1,
3526                                        self.nat_addr,
3527                                        20,
3528                                        self.tcp_port_out,
3529                                        data)
3530         pkts.reverse()
3531         self.pg1.add_stream(pkts)
3532         self.pg_enable_capture(self.pg_interfaces)
3533         self.pg_start()
3534         frags = self.pg0.get_capture(len(pkts))
3535         p = self.reass_frags_and_verify(frags,
3536                                         self.pg1.remote_ip4,
3537                                         self.pg0.remote_ip4)
3538         self.assertEqual(p[TCP].sport, 20)
3539         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3540         self.assertEqual(data, p[Raw].load)
3541
3542     def test_port_restricted(self):
3543         """ Port restricted NAT44 (MAP-E CE) """
3544         self.nat44_add_address(self.nat_addr)
3545         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3546         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3547                                                   is_inside=0)
3548         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3549                       "psid-offset 6 psid-len 6")
3550
3551         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3552              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3553              TCP(sport=4567, dport=22))
3554         self.pg0.add_stream(p)
3555         self.pg_enable_capture(self.pg_interfaces)
3556         self.pg_start()
3557         capture = self.pg1.get_capture(1)
3558         p = capture[0]
3559         try:
3560             ip = p[IP]
3561             tcp = p[TCP]
3562             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3563             self.assertEqual(ip.src, self.nat_addr)
3564             self.assertEqual(tcp.dport, 22)
3565             self.assertNotEqual(tcp.sport, 4567)
3566             self.assertEqual((tcp.sport >> 6) & 63, 10)
3567             self.check_tcp_checksum(p)
3568             self.check_ip_checksum(p)
3569         except:
3570             self.logger.error(ppp("Unexpected or invalid packet:", p))
3571             raise
3572
3573     def test_twice_nat(self):
3574         """ Twice NAT44 """
3575         twice_nat_addr = '10.0.1.3'
3576         port_in = 8080
3577         port_out = 80
3578         eh_port_out = 4567
3579         eh_port_in = 0
3580         self.nat44_add_address(self.nat_addr)
3581         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3582         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3583                                       port_in, port_out, proto=IP_PROTOS.tcp,
3584                                       twice_nat=1)
3585         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3586         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3587                                                   is_inside=0)
3588
3589         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3590              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3591              TCP(sport=eh_port_out, dport=port_out))
3592         self.pg1.add_stream(p)
3593         self.pg_enable_capture(self.pg_interfaces)
3594         self.pg_start()
3595         capture = self.pg0.get_capture(1)
3596         p = capture[0]
3597         try:
3598             ip = p[IP]
3599             tcp = p[TCP]
3600             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3601             self.assertEqual(ip.src, twice_nat_addr)
3602             self.assertEqual(tcp.dport, port_in)
3603             self.assertNotEqual(tcp.sport, eh_port_out)
3604             eh_port_in = tcp.sport
3605             self.check_tcp_checksum(p)
3606             self.check_ip_checksum(p)
3607         except:
3608             self.logger.error(ppp("Unexpected or invalid packet:", p))
3609             raise
3610
3611         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3612              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3613              TCP(sport=port_in, dport=eh_port_in))
3614         self.pg0.add_stream(p)
3615         self.pg_enable_capture(self.pg_interfaces)
3616         self.pg_start()
3617         capture = self.pg1.get_capture(1)
3618         p = capture[0]
3619         try:
3620             ip = p[IP]
3621             tcp = p[TCP]
3622             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3623             self.assertEqual(ip.src, self.nat_addr)
3624             self.assertEqual(tcp.dport, eh_port_out)
3625             self.assertEqual(tcp.sport, port_out)
3626             self.check_tcp_checksum(p)
3627             self.check_ip_checksum(p)
3628         except:
3629             self.logger.error(ppp("Unexpected or invalid packet:", p))
3630             raise
3631
3632     def test_twice_nat_lb(self):
3633         """ Twice NAT44 local service load balancing """
3634         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3635         twice_nat_addr = '10.0.1.3'
3636         local_port = 8080
3637         external_port = 80
3638         eh_port_out = 4567
3639         eh_port_in = 0
3640         server1 = self.pg0.remote_hosts[0]
3641         server2 = self.pg0.remote_hosts[1]
3642
3643         locals = [{'addr': server1.ip4n,
3644                    'port': local_port,
3645                    'probability': 50},
3646                   {'addr': server2.ip4n,
3647                    'port': local_port,
3648                    'probability': 50}]
3649
3650         self.nat44_add_address(self.nat_addr)
3651         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3652
3653         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3654                                                   external_port,
3655                                                   IP_PROTOS.tcp,
3656                                                   twice_nat=1,
3657                                                   local_num=len(locals),
3658                                                   locals=locals)
3659         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3660         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3661                                                   is_inside=0)
3662
3663         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3664              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3665              TCP(sport=eh_port_out, dport=external_port))
3666         self.pg1.add_stream(p)
3667         self.pg_enable_capture(self.pg_interfaces)
3668         self.pg_start()
3669         capture = self.pg0.get_capture(1)
3670         p = capture[0]
3671         server = None
3672         try:
3673             ip = p[IP]
3674             tcp = p[TCP]
3675             self.assertEqual(ip.src, twice_nat_addr)
3676             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3677             if ip.dst == server1.ip4:
3678                 server = server1
3679             else:
3680                 server = server2
3681             self.assertNotEqual(tcp.sport, eh_port_out)
3682             eh_port_in = tcp.sport
3683             self.assertEqual(tcp.dport, local_port)
3684             self.check_tcp_checksum(p)
3685             self.check_ip_checksum(p)
3686         except:
3687             self.logger.error(ppp("Unexpected or invalid packet:", p))
3688             raise
3689
3690         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3691              IP(src=server.ip4, dst=twice_nat_addr) /
3692              TCP(sport=local_port, dport=eh_port_in))
3693         self.pg0.add_stream(p)
3694         self.pg_enable_capture(self.pg_interfaces)
3695         self.pg_start()
3696         capture = self.pg1.get_capture(1)
3697         p = capture[0]
3698         try:
3699             ip = p[IP]
3700             tcp = p[TCP]
3701             self.assertEqual(ip.src, self.nat_addr)
3702             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3703             self.assertEqual(tcp.sport, external_port)
3704             self.assertEqual(tcp.dport, eh_port_out)
3705             self.check_tcp_checksum(p)
3706             self.check_ip_checksum(p)
3707         except:
3708             self.logger.error(ppp("Unexpected or invalid packet:", p))
3709             raise
3710
3711     def test_twice_nat_interface_addr(self):
3712         """ Acquire twice NAT44 addresses from interface """
3713         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3714
3715         # no address in NAT pool
3716         adresses = self.vapi.nat44_address_dump()
3717         self.assertEqual(0, len(adresses))
3718
3719         # configure interface address and check NAT address pool
3720         self.pg7.config_ip4()
3721         adresses = self.vapi.nat44_address_dump()
3722         self.assertEqual(1, len(adresses))
3723         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3724         self.assertEqual(adresses[0].twice_nat, 1)
3725
3726         # remove interface address and check NAT address pool
3727         self.pg7.unconfig_ip4()
3728         adresses = self.vapi.nat44_address_dump()
3729         self.assertEqual(0, len(adresses))
3730
3731     def test_ipfix_max_frags(self):
3732         """ IPFIX logging maximum fragments pending reassembly exceeded """
3733         self.nat44_add_address(self.nat_addr)
3734         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3735         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3736                                                   is_inside=0)
3737         self.vapi.nat_set_reass(max_frag=0)
3738         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3739                                      src_address=self.pg3.local_ip4n,
3740                                      path_mtu=512,
3741                                      template_interval=10)
3742         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3743                             src_port=self.ipfix_src_port)
3744
3745         data = "A" * 4 + "B" * 16 + "C" * 3
3746         self.tcp_port_in = random.randint(1025, 65535)
3747         pkts = self.create_stream_frag(self.pg0,
3748                                        self.pg1.remote_ip4,
3749                                        self.tcp_port_in,
3750                                        20,
3751                                        data)
3752         self.pg0.add_stream(pkts[-1])
3753         self.pg_enable_capture(self.pg_interfaces)
3754         self.pg_start()
3755         frags = self.pg1.get_capture(0)
3756         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3757         capture = self.pg3.get_capture(9)
3758         ipfix = IPFIXDecoder()
3759         # first load template
3760         for p in capture:
3761             self.assertTrue(p.haslayer(IPFIX))
3762             self.assertEqual(p[IP].src, self.pg3.local_ip4)
3763             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3764             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3765             self.assertEqual(p[UDP].dport, 4739)
3766             self.assertEqual(p[IPFIX].observationDomainID,
3767                              self.ipfix_domain_id)
3768             if p.haslayer(Template):
3769                 ipfix.add_template(p.getlayer(Template))
3770         # verify events in data set
3771         for p in capture:
3772             if p.haslayer(Data):
3773                 data = ipfix.decode_data_set(p.getlayer(Set))
3774                 self.verify_ipfix_max_fragments_ip4(data, 0,
3775                                                     self.pg0.remote_ip4n)
3776
3777     def tearDown(self):
3778         super(TestNAT44, self).tearDown()
3779         if not self.vpp_dead:
3780             self.logger.info(self.vapi.cli("show nat44 addresses"))
3781             self.logger.info(self.vapi.cli("show nat44 interfaces"))
3782             self.logger.info(self.vapi.cli("show nat44 static mappings"))
3783             self.logger.info(self.vapi.cli("show nat44 interface address"))
3784             self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3785             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3786             self.vapi.cli("nat addr-port-assignment-alg default")
3787             self.clear_nat44()
3788
3789
class TestNAT44Out2InDPO(MethodHolder):
    """ NAT44 Test Cases using out2in DPO """

    @classmethod
    def setUpConstants(cls):
        # Append a "nat { out2in dpo }" section to the VPP command line.
        super(TestNAT44Out2InDPO, cls).setUpConstants()
        nat_section = ["nat", "{", "out2in dpo", "}"]
        cls.vpp_cmdline.extend(nat_section)

    @classmethod
    def setUpClass(cls):
        # Class fixture: pg0 gets IPv4, pg1 gets IPv6, and a ::/0 route is
        # installed via pg1's remote IPv6 address.
        super(TestNAT44Out2InDPO, cls).setUpClass()

        try:
            cls.tcp_port_in = cls.tcp_port_out = 6303
            cls.udp_port_in = cls.udp_port_out = 6304
            cls.icmp_id_in = cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.dst_ip4 = '192.168.70.1'

            cls.create_pg_interfaces(range(2))

            v4_if = cls.pg0
            v4_if.admin_up()
            v4_if.config_ip4()
            v4_if.resolve_arp()

            v6_if = cls.pg1
            v6_if.admin_up()
            v6_if.config_ip6()
            v6_if.resolve_ndp()

            cls.vapi.ip_add_del_route(
                is_ipv6=True,
                dst_address='\x00'*16,
                dst_address_length=0,
                next_hop_address=v6_if.remote_ip6n,
                next_hop_sw_if_index=v6_if.sw_if_index)

        except Exception:
            super(TestNAT44Out2InDPO, cls).tearDownClass()
            raise

    def configure_xlat(self):
        """
        Store the two /96 IPv6 prefixes on the test instance and add a MAP
        domain (is_translation=1, is_rfc6052=1) built from them
        """
        self.dst_ip6_pfx = '1:2:3::'
        self.src_ip6_pfx = '4:5:6::'
        self.dst_ip6_pfx_len = 96
        self.src_ip6_pfx_len = 96
        self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.dst_ip6_pfx)
        self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.src_ip6_pfx)
        self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
                                 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
                                 '\x00\x00\x00\x00', 0, is_translation=1,
                                 is_rfc6052=1)

    def test_464xlat_ce(self):
        """ Test 464XLAT CE with NAT44 """

        self.configure_xlat()

        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)

        # IPv6 form of the IPv4 destination and of the NAT address
        remote_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                      self.dst_ip6_pfx_len)
        nat_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
                                   self.src_ip6_pfx_len)

        try:
            # IPv4 in on pg0 -> IPv6 out on pg1
            stream = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
            self.pg0.add_stream(stream)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            rx = self.pg1.get_capture(len(stream))
            self.verify_capture_out_ip6(rx, nat_ip=nat_ip6,
                                        dst_ip=remote_ip6)

            # IPv6 in on pg1 -> IPv4 out on pg0
            stream = self.create_stream_out_ip6(self.pg1, remote_ip6,
                                                nat_ip6)
            self.pg1.add_stream(stream)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            rx = self.pg0.get_capture(len(stream))
            self.verify_capture_in(rx, self.pg0)
        finally:
            # undo the NAT44 configuration even when verification fails
            self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
                                                      is_add=0)
            self.vapi.nat44_add_del_address_range(self.nat_addr_n,
                                                  self.nat_addr_n, is_add=0)

    def test_464xlat_ce_no_nat(self):
        """ Test 464XLAT CE without NAT44 """

        self.configure_xlat()

        # without NAT44 the source is pg0's own remote IPv4 address
        remote_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                      self.dst_ip6_pfx_len)
        host_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
                                    self.src_ip6_pfx_len)

        # IPv4 in on pg0 -> IPv6 out on pg1
        stream = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
        self.pg0.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(len(stream))
        self.verify_capture_out_ip6(rx, dst_ip=remote_ip6,
                                    nat_ip=host_ip6, same_port=True)

        # IPv6 in on pg1 -> IPv4 out on pg0
        stream = self.create_stream_out_ip6(self.pg1, remote_ip6, host_ip6)
        self.pg1.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg0.get_capture(len(stream))
        self.verify_capture_in(rx, self.pg0)
3905
3906
3907 class TestDeterministicNAT(MethodHolder):
3908     """ Deterministic NAT Test Cases """
3909
3910     @classmethod
3911     def setUpConstants(cls):
3912         super(TestDeterministicNAT, cls).setUpConstants()
3913         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3914
3915     @classmethod
3916     def setUpClass(cls):
3917         super(TestDeterministicNAT, cls).setUpClass()
3918
3919         try:
3920             cls.tcp_port_in = 6303
3921             cls.tcp_external_port = 6303
3922             cls.udp_port_in = 6304
3923             cls.udp_external_port = 6304
3924             cls.icmp_id_in = 6305
3925             cls.nat_addr = '10.0.0.3'
3926
3927             cls.create_pg_interfaces(range(3))
3928             cls.interfaces = list(cls.pg_interfaces)
3929
3930             for i in cls.interfaces:
3931                 i.admin_up()
3932                 i.config_ip4()
3933                 i.resolve_arp()
3934
3935             cls.pg0.generate_remote_hosts(2)
3936             cls.pg0.configure_ipv4_neighbors()
3937
3938         except Exception:
3939             super(TestDeterministicNAT, cls).tearDownClass()
3940             raise
3941
3942     def create_stream_in(self, in_if, out_if, ttl=64):
3943         """
3944         Create packet stream for inside network
3945
3946         :param in_if: Inside interface
3947         :param out_if: Outside interface
3948         :param ttl: TTL of generated packets
3949         """
3950         pkts = []
3951         # TCP
3952         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3953              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3954              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3955         pkts.append(p)
3956
3957         # UDP
3958         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3959              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3960              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3961         pkts.append(p)
3962
3963         # ICMP
3964         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3965              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3966              ICMP(id=self.icmp_id_in, type='echo-request'))
3967         pkts.append(p)
3968
3969         return pkts
3970
3971     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3972         """
3973         Create packet stream for outside network
3974
3975         :param out_if: Outside interface
3976         :param dst_ip: Destination IP address (Default use global NAT address)
3977         :param ttl: TTL of generated packets
3978         """
3979         if dst_ip is None:
3980             dst_ip = self.nat_addr
3981         pkts = []
3982         # TCP
3983         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3984              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3985              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3986         pkts.append(p)
3987
3988         # UDP
3989         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3990              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3991              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3992         pkts.append(p)
3993
3994         # ICMP
3995         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3996              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3997              ICMP(id=self.icmp_external_id, type='echo-reply'))
3998         pkts.append(p)
3999
4000         return pkts
4001
4002     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4003         """
4004         Verify captured packets on outside network
4005
4006         :param capture: Captured packets
4007         :param nat_ip: Translated IP address (Default use global NAT address)
4008         :param same_port: Sorce port number is not translated (Default False)
4009         :param packet_num: Expected number of packets (Default 3)
4010         """
4011         if nat_ip is None:
4012             nat_ip = self.nat_addr
4013         self.assertEqual(packet_num, len(capture))
4014         for packet in capture:
4015             try:
4016                 self.assertEqual(packet[IP].src, nat_ip)
4017                 if packet.haslayer(TCP):
4018                     self.tcp_port_out = packet[TCP].sport
4019                 elif packet.haslayer(UDP):
4020                     self.udp_port_out = packet[UDP].sport
4021                 else:
4022                     self.icmp_external_id = packet[ICMP].id
4023             except:
4024                 self.logger.error(ppp("Unexpected or invalid packet "
4025                                       "(outside network):", packet))
4026                 raise
4027
4028     def initiate_tcp_session(self, in_if, out_if):
4029         """
4030         Initiates TCP session
4031
4032         :param in_if: Inside interface
4033         :param out_if: Outside interface
4034         """
4035         try:
4036             # SYN packet in->out
4037             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4038                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4039                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4040                      flags="S"))
4041             in_if.add_stream(p)
4042             self.pg_enable_capture(self.pg_interfaces)
4043             self.pg_start()
4044             capture = out_if.get_capture(1)
4045             p = capture[0]
4046             self.tcp_port_out = p[TCP].sport
4047
4048             # SYN + ACK packet out->in
4049             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4050                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4051                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4052                      flags="SA"))
4053             out_if.add_stream(p)
4054             self.pg_enable_capture(self.pg_interfaces)
4055             self.pg_start()
4056             in_if.get_capture(1)
4057
4058             # ACK packet in->out
4059             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4060                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4061                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4062                      flags="A"))
4063             in_if.add_stream(p)
4064             self.pg_enable_capture(self.pg_interfaces)
4065             self.pg_start()
4066             out_if.get_capture(1)
4067
4068         except:
4069             self.logger.error("TCP 3 way handshake failed")
4070             raise
4071
4072     def verify_ipfix_max_entries_per_user(self, data):
4073         """
4074         Verify IPFIX maximum entries per user exceeded event
4075
4076         :param data: Decoded IPFIX data records
4077         """
4078         self.assertEqual(1, len(data))
4079         record = data[0]
4080         # natEvent
4081         self.assertEqual(ord(record[230]), 13)
4082         # natQuotaExceededEvent
4083         self.assertEqual('\x03\x00\x00\x00', record[466])
4084         # maxEntriesPerUser
4085         self.assertEqual('\xe8\x03\x00\x00', record[473])
4086         # sourceIPv4Address
4087         self.assertEqual(self.pg0.remote_ip4n, record[8])
4088
4089     def test_deterministic_mode(self):
4090         """ NAT plugin run deterministic mode """
4091         in_addr = '172.16.255.0'
4092         out_addr = '172.17.255.50'
4093         in_addr_t = '172.16.255.20'
4094         in_addr_n = socket.inet_aton(in_addr)
4095         out_addr_n = socket.inet_aton(out_addr)
4096         in_addr_t_n = socket.inet_aton(in_addr_t)
4097         in_plen = 24
4098         out_plen = 32
4099
4100         nat_config = self.vapi.nat_show_config()
4101         self.assertEqual(1, nat_config.deterministic)
4102
4103         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4104
4105         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4106         self.assertEqual(rep1.out_addr[:4], out_addr_n)
4107         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4108         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4109
4110         deterministic_mappings = self.vapi.nat_det_map_dump()
4111         self.assertEqual(len(deterministic_mappings), 1)
4112         dsm = deterministic_mappings[0]
4113         self.assertEqual(in_addr_n, dsm.in_addr[:4])
4114         self.assertEqual(in_plen, dsm.in_plen)
4115         self.assertEqual(out_addr_n, dsm.out_addr[:4])
4116         self.assertEqual(out_plen, dsm.out_plen)
4117
4118         self.clear_nat_det()
4119         deterministic_mappings = self.vapi.nat_det_map_dump()
4120         self.assertEqual(len(deterministic_mappings), 0)
4121
4122     def test_set_timeouts(self):
4123         """ Set deterministic NAT timeouts """
4124         timeouts_before = self.vapi.nat_det_get_timeouts()
4125
4126         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4127                                        timeouts_before.tcp_established + 10,
4128                                        timeouts_before.tcp_transitory + 10,
4129                                        timeouts_before.icmp + 10)
4130
4131         timeouts_after = self.vapi.nat_det_get_timeouts()
4132
4133         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4134         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4135         self.assertNotEqual(timeouts_before.tcp_established,
4136                             timeouts_after.tcp_established)
4137         self.assertNotEqual(timeouts_before.tcp_transitory,
4138                             timeouts_after.tcp_transitory)
4139
4140     def test_det_in(self):
4141         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4142
4143         nat_ip = "10.0.0.10"
4144
4145         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4146                                       32,
4147                                       socket.inet_aton(nat_ip),
4148                                       32)
4149         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4150         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4151                                                   is_inside=0)
4152
4153         # in2out
4154         pkts = self.create_stream_in(self.pg0, self.pg1)
4155         self.pg0.add_stream(pkts)
4156         self.pg_enable_capture(self.pg_interfaces)
4157         self.pg_start()
4158         capture = self.pg1.get_capture(len(pkts))
4159         self.verify_capture_out(capture, nat_ip)
4160
4161         # out2in
4162         pkts = self.create_stream_out(self.pg1, nat_ip)
4163         self.pg1.add_stream(pkts)
4164         self.pg_enable_capture(self.pg_interfaces)
4165         self.pg_start()
4166         capture = self.pg0.get_capture(len(pkts))
4167         self.verify_capture_in(capture, self.pg0)
4168
4169         # session dump test
4170         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4171         self.assertEqual(len(sessions), 3)
4172
4173         # TCP session
4174         s = sessions[0]
4175         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4176         self.assertEqual(s.in_port, self.tcp_port_in)
4177         self.assertEqual(s.out_port, self.tcp_port_out)
4178         self.assertEqual(s.ext_port, self.tcp_external_port)
4179
4180         # UDP session
4181         s = sessions[1]
4182         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4183         self.assertEqual(s.in_port, self.udp_port_in)
4184         self.assertEqual(s.out_port, self.udp_port_out)
4185         self.assertEqual(s.ext_port, self.udp_external_port)
4186
4187         # ICMP session
4188         s = sessions[2]
4189         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4190         self.assertEqual(s.in_port, self.icmp_id_in)
4191         self.assertEqual(s.out_port, self.icmp_external_id)
4192
4193     def test_multiple_users(self):
4194         """ Deterministic NAT multiple users """
4195
4196         nat_ip = "10.0.0.10"
4197         port_in = 80
4198         external_port = 6303
4199
4200         host0 = self.pg0.remote_hosts[0]
4201         host1 = self.pg0.remote_hosts[1]
4202
4203         self.vapi.nat_det_add_del_map(host0.ip4n,
4204                                       24,
4205                                       socket.inet_aton(nat_ip),
4206                                       32)
4207         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4208         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4209                                                   is_inside=0)
4210
4211         # host0 to out
4212         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4213              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4214              TCP(sport=port_in, dport=external_port))
4215         self.pg0.add_stream(p)
4216         self.pg_enable_capture(self.pg_interfaces)
4217         self.pg_start()
4218         capture = self.pg1.get_capture(1)
4219         p = capture[0]
4220         try:
4221             ip = p[IP]
4222             tcp = p[TCP]
4223             self.assertEqual(ip.src, nat_ip)
4224             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4225             self.assertEqual(tcp.dport, external_port)
4226             port_out0 = tcp.sport
4227         except:
4228             self.logger.error(ppp("Unexpected or invalid packet:", p))
4229             raise
4230
4231         # host1 to out
4232         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4233              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4234              TCP(sport=port_in, dport=external_port))
4235         self.pg0.add_stream(p)
4236         self.pg_enable_capture(self.pg_interfaces)
4237         self.pg_start()
4238         capture = self.pg1.get_capture(1)
4239         p = capture[0]
4240         try:
4241             ip = p[IP]
4242             tcp = p[TCP]
4243             self.assertEqual(ip.src, nat_ip)
4244             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4245             self.assertEqual(tcp.dport, external_port)
4246             port_out1 = tcp.sport
4247         except:
4248             self.logger.error(ppp("Unexpected or invalid packet:", p))
4249             raise
4250
4251         dms = self.vapi.nat_det_map_dump()
4252         self.assertEqual(1, len(dms))
4253         self.assertEqual(2, dms[0].ses_num)
4254
4255         # out to host0
4256         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4257              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4258              TCP(sport=external_port, dport=port_out0))
4259         self.pg1.add_stream(p)
4260         self.pg_enable_capture(self.pg_interfaces)
4261         self.pg_start()
4262         capture = self.pg0.get_capture(1)
4263         p = capture[0]
4264         try:
4265             ip = p[IP]
4266             tcp = p[TCP]
4267             self.assertEqual(ip.src, self.pg1.remote_ip4)
4268             self.assertEqual(ip.dst, host0.ip4)
4269             self.assertEqual(tcp.dport, port_in)
4270             self.assertEqual(tcp.sport, external_port)
4271         except:
4272             self.logger.error(ppp("Unexpected or invalid packet:", p))
4273             raise
4274
4275         # out to host1
4276         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4277              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4278              TCP(sport=external_port, dport=port_out1))
4279         self.pg1.add_stream(p)
4280         self.pg_enable_capture(self.pg_interfaces)
4281         self.pg_start()
4282         capture = self.pg0.get_capture(1)
4283         p = capture[0]
4284         try:
4285             ip = p[IP]
4286             tcp = p[TCP]
4287             self.assertEqual(ip.src, self.pg1.remote_ip4)
4288             self.assertEqual(ip.dst, host1.ip4)
4289             self.assertEqual(tcp.dport, port_in)
4290             self.assertEqual(tcp.sport, external_port)
4291         except:
4292             self.logger.error(ppp("Unexpected or invalid packet", p))
4293             raise
4294
4295         # session close api test
4296         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4297                                             port_out1,
4298                                             self.pg1.remote_ip4n,
4299                                             external_port)
4300         dms = self.vapi.nat_det_map_dump()
4301         self.assertEqual(dms[0].ses_num, 1)
4302
4303         self.vapi.nat_det_close_session_in(host0.ip4n,
4304                                            port_in,
4305                                            self.pg1.remote_ip4n,
4306                                            external_port)
4307         dms = self.vapi.nat_det_map_dump()
4308         self.assertEqual(dms[0].ses_num, 0)
4309
4310     def test_tcp_session_close_detection_in(self):
4311         """ Deterministic NAT TCP session close from inside network """
4312         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4313                                       32,
4314                                       socket.inet_aton(self.nat_addr),
4315                                       32)
4316         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4317         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4318                                                   is_inside=0)
4319
4320         self.initiate_tcp_session(self.pg0, self.pg1)
4321
4322         # close the session from inside
4323         try:
4324             # FIN packet in -> out
4325             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4326                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4327                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4328                      flags="F"))
4329             self.pg0.add_stream(p)
4330             self.pg_enable_capture(self.pg_interfaces)
4331             self.pg_start()
4332             self.pg1.get_capture(1)
4333
4334             pkts = []
4335
4336             # ACK packet out -> in
4337             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4338                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4339                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4340                      flags="A"))
4341             pkts.append(p)
4342
4343             # FIN packet out -> in
4344             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4345                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4346                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4347                      flags="F"))
4348             pkts.append(p)
4349
4350             self.pg1.add_stream(pkts)
4351             self.pg_enable_capture(self.pg_interfaces)
4352             self.pg_start()
4353             self.pg0.get_capture(2)
4354
4355             # ACK packet in -> out
4356             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4357                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4358                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4359                      flags="A"))
4360             self.pg0.add_stream(p)
4361             self.pg_enable_capture(self.pg_interfaces)
4362             self.pg_start()
4363             self.pg1.get_capture(1)
4364
4365             # Check if deterministic NAT44 closed the session
4366             dms = self.vapi.nat_det_map_dump()
4367             self.assertEqual(0, dms[0].ses_num)
4368         except:
4369             self.logger.error("TCP session termination failed")
4370             raise
4371
4372     def test_tcp_session_close_detection_out(self):
4373         """ Deterministic NAT TCP session close from outside network """
4374         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4375                                       32,
4376                                       socket.inet_aton(self.nat_addr),
4377                                       32)
4378         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4379         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4380                                                   is_inside=0)
4381
4382         self.initiate_tcp_session(self.pg0, self.pg1)
4383
4384         # close the session from outside
4385         try:
4386             # FIN packet out -> in
4387             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4388                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4389                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4390                      flags="F"))
4391             self.pg1.add_stream(p)
4392             self.pg_enable_capture(self.pg_interfaces)
4393             self.pg_start()
4394             self.pg0.get_capture(1)
4395
4396             pkts = []
4397
4398             # ACK packet in -> out
4399             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4400                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4401                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4402                      flags="A"))
4403             pkts.append(p)
4404
4405             # ACK packet in -> out
4406             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4407                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4408                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4409                      flags="F"))
4410             pkts.append(p)
4411
4412             self.pg0.add_stream(pkts)
4413             self.pg_enable_capture(self.pg_interfaces)
4414             self.pg_start()
4415             self.pg1.get_capture(2)
4416
4417             # ACK packet out -> in
4418             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4419                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4420                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4421                      flags="A"))
4422             self.pg1.add_stream(p)
4423             self.pg_enable_capture(self.pg_interfaces)
4424             self.pg_start()
4425             self.pg0.get_capture(1)
4426
4427             # Check if deterministic NAT44 closed the session
4428             dms = self.vapi.nat_det_map_dump()
4429             self.assertEqual(0, dms[0].ses_num)
4430         except:
4431             self.logger.error("TCP session termination failed")
4432             raise
4433
4434     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4435     def test_session_timeout(self):
4436         """ Deterministic NAT session timeouts """
4437         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4438                                       32,
4439                                       socket.inet_aton(self.nat_addr),
4440                                       32)
4441         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4442         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4443                                                   is_inside=0)
4444
4445         self.initiate_tcp_session(self.pg0, self.pg1)
4446         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4447         pkts = self.create_stream_in(self.pg0, self.pg1)
4448         self.pg0.add_stream(pkts)
4449         self.pg_enable_capture(self.pg_interfaces)
4450         self.pg_start()
4451         capture = self.pg1.get_capture(len(pkts))
4452         sleep(15)
4453
4454         dms = self.vapi.nat_det_map_dump()
4455         self.assertEqual(0, dms[0].ses_num)
4456
4457     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4458     def test_session_limit_per_user(self):
4459         """ Deterministic NAT maximum sessions per user limit """
4460         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4461                                       32,
4462                                       socket.inet_aton(self.nat_addr),
4463                                       32)
4464         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4465         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4466                                                   is_inside=0)
4467         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4468                                      src_address=self.pg2.local_ip4n,
4469                                      path_mtu=512,
4470                                      template_interval=10)
4471         self.vapi.nat_ipfix()
4472
4473         pkts = []
4474         for port in range(1025, 2025):
4475             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4476                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4477                  UDP(sport=port, dport=port))
4478             pkts.append(p)
4479
4480         self.pg0.add_stream(pkts)
4481         self.pg_enable_capture(self.pg_interfaces)
4482         self.pg_start()
4483         capture = self.pg1.get_capture(len(pkts))
4484
4485         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4486              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4487              UDP(sport=3001, dport=3002))
4488         self.pg0.add_stream(p)
4489         self.pg_enable_capture(self.pg_interfaces)
4490         self.pg_start()
4491         capture = self.pg1.assert_nothing_captured()
4492
4493         # verify ICMP error packet
4494         capture = self.pg0.get_capture(1)
4495         p = capture[0]
4496         self.assertTrue(p.haslayer(ICMP))
4497         icmp = p[ICMP]
4498         self.assertEqual(icmp.type, 3)
4499         self.assertEqual(icmp.code, 1)
4500         self.assertTrue(icmp.haslayer(IPerror))
4501         inner_ip = icmp[IPerror]
4502         self.assertEqual(inner_ip[UDPerror].sport, 3001)
4503         self.assertEqual(inner_ip[UDPerror].dport, 3002)
4504
4505         dms = self.vapi.nat_det_map_dump()
4506
4507         self.assertEqual(1000, dms[0].ses_num)
4508
4509         # verify IPFIX logging
4510         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
4511         sleep(1)
4512         capture = self.pg2.get_capture(2)
4513         ipfix = IPFIXDecoder()
4514         # first load template
4515         for p in capture:
4516             self.assertTrue(p.haslayer(IPFIX))
4517             if p.haslayer(Template):
4518                 ipfix.add_template(p.getlayer(Template))
4519         # verify events in data set
4520         for p in capture:
4521             if p.haslayer(Data):
4522                 data = ipfix.decode_data_set(p.getlayer(Set))
4523                 self.verify_ipfix_max_entries_per_user(data)
4524
4525     def clear_nat_det(self):
4526         """
4527         Clear deterministic NAT configuration.
4528         """
4529         self.vapi.nat_ipfix(enable=0)
4530         self.vapi.nat_det_set_timeouts()
4531         deterministic_mappings = self.vapi.nat_det_map_dump()
4532         for dsm in deterministic_mappings:
4533             self.vapi.nat_det_add_del_map(dsm.in_addr,
4534                                           dsm.in_plen,
4535                                           dsm.out_addr,
4536                                           dsm.out_plen,
4537                                           is_add=0)
4538
4539         interfaces = self.vapi.nat44_interface_dump()
4540         for intf in interfaces:
4541             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4542                                                       intf.is_inside,
4543                                                       is_add=0)
4544
4545     def tearDown(self):
4546         super(TestDeterministicNAT, self).tearDown()
4547         if not self.vpp_dead:
4548             self.logger.info(self.vapi.cli("show nat44 interfaces"))
4549             self.logger.info(
4550                 self.vapi.cli("show nat44 deterministic mappings"))
4551             self.logger.info(
4552                 self.vapi.cli("show nat44 deterministic timeouts"))
4553             self.logger.info(
4554                 self.vapi.cli("show nat44 deterministic sessions"))
4555             self.clear_nat_det()
4556
4557
4558 class TestNAT64(MethodHolder):
4559     """ NAT64 Test Cases """
4560
4561     @classmethod
4562     def setUpConstants(cls):
4563         super(TestNAT64, cls).setUpConstants()
4564         cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4565                                 "nat64 st hash buckets 256", "}"])
4566
4567     @classmethod
4568     def setUpClass(cls):
4569         super(TestNAT64, cls).setUpClass()
4570
4571         try:
4572             cls.tcp_port_in = 6303
4573             cls.tcp_port_out = 6303
4574             cls.udp_port_in = 6304
4575             cls.udp_port_out = 6304
4576             cls.icmp_id_in = 6305
4577             cls.icmp_id_out = 6305
4578             cls.nat_addr = '10.0.0.3'
4579             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4580             cls.vrf1_id = 10
4581             cls.vrf1_nat_addr = '10.0.10.3'
4582             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4583                                                    cls.vrf1_nat_addr)
4584             cls.ipfix_src_port = 4739
4585             cls.ipfix_domain_id = 1
4586
4587             cls.create_pg_interfaces(range(5))
4588             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4589             cls.ip6_interfaces.append(cls.pg_interfaces[2])
4590             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4591
4592             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4593
4594             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4595
4596             cls.pg0.generate_remote_hosts(2)
4597
4598             for i in cls.ip6_interfaces:
4599                 i.admin_up()
4600                 i.config_ip6()
4601                 i.configure_ipv6_neighbors()
4602
4603             for i in cls.ip4_interfaces:
4604                 i.admin_up()
4605                 i.config_ip4()
4606                 i.resolve_arp()
4607
4608             cls.pg3.admin_up()
4609             cls.pg3.config_ip4()
4610             cls.pg3.resolve_arp()
4611             cls.pg3.config_ip6()
4612             cls.pg3.configure_ipv6_neighbors()
4613
4614         except Exception:
4615             super(TestNAT64, cls).tearDownClass()
4616             raise
4617
4618     def test_pool(self):
4619         """ Add/delete address to NAT64 pool """
4620         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4621
4622         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4623
4624         addresses = self.vapi.nat64_pool_addr_dump()
4625         self.assertEqual(len(addresses), 1)
4626         self.assertEqual(addresses[0].address, nat_addr)
4627
4628         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4629
4630         addresses = self.vapi.nat64_pool_addr_dump()
4631         self.assertEqual(len(addresses), 0)
4632
4633     def test_interface(self):
4634         """ Enable/disable NAT64 feature on the interface """
4635         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4636         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4637
4638         interfaces = self.vapi.nat64_interface_dump()
4639         self.assertEqual(len(interfaces), 2)
4640         pg0_found = False
4641         pg1_found = False
4642         for intf in interfaces:
4643             if intf.sw_if_index == self.pg0.sw_if_index:
4644                 self.assertEqual(intf.is_inside, 1)
4645                 pg0_found = True
4646             elif intf.sw_if_index == self.pg1.sw_if_index:
4647                 self.assertEqual(intf.is_inside, 0)
4648                 pg1_found = True
4649         self.assertTrue(pg0_found)
4650         self.assertTrue(pg1_found)
4651
4652         features = self.vapi.cli("show interface features pg0")
4653         self.assertNotEqual(features.find('nat64-in2out'), -1)
4654         features = self.vapi.cli("show interface features pg1")
4655         self.assertNotEqual(features.find('nat64-out2in'), -1)
4656
4657         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4658         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4659
4660         interfaces = self.vapi.nat64_interface_dump()
4661         self.assertEqual(len(interfaces), 0)
4662
4663     def test_static_bib(self):
4664         """ Add/delete static BIB entry """
4665         in_addr = socket.inet_pton(socket.AF_INET6,
4666                                    '2001:db8:85a3::8a2e:370:7334')
4667         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4668         in_port = 1234
4669         out_port = 5678
4670         proto = IP_PROTOS.tcp
4671
4672         self.vapi.nat64_add_del_static_bib(in_addr,
4673                                            out_addr,
4674                                            in_port,
4675                                            out_port,
4676                                            proto)
4677         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4678         static_bib_num = 0
4679         for bibe in bib:
4680             if bibe.is_static:
4681                 static_bib_num += 1
4682                 self.assertEqual(bibe.i_addr, in_addr)
4683                 self.assertEqual(bibe.o_addr, out_addr)
4684                 self.assertEqual(bibe.i_port, in_port)
4685                 self.assertEqual(bibe.o_port, out_port)
4686         self.assertEqual(static_bib_num, 1)
4687
4688         self.vapi.nat64_add_del_static_bib(in_addr,
4689                                            out_addr,
4690                                            in_port,
4691                                            out_port,
4692                                            proto,
4693                                            is_add=0)
4694         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4695         static_bib_num = 0
4696         for bibe in bib:
4697             if bibe.is_static:
4698                 static_bib_num += 1
4699         self.assertEqual(static_bib_num, 0)
4700
4701     def test_set_timeouts(self):
4702         """ Set NAT64 timeouts """
4703         # verify default values
4704         timeouts = self.vapi.nat64_get_timeouts()
4705         self.assertEqual(timeouts.udp, 300)
4706         self.assertEqual(timeouts.icmp, 60)
4707         self.assertEqual(timeouts.tcp_trans, 240)
4708         self.assertEqual(timeouts.tcp_est, 7440)
4709         self.assertEqual(timeouts.tcp_incoming_syn, 6)
4710
4711         # set and verify custom values
4712         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4713                                      tcp_est=7450, tcp_incoming_syn=10)
4714         timeouts = self.vapi.nat64_get_timeouts()
4715         self.assertEqual(timeouts.udp, 200)
4716         self.assertEqual(timeouts.icmp, 30)
4717         self.assertEqual(timeouts.tcp_trans, 250)
4718         self.assertEqual(timeouts.tcp_est, 7450)
4719         self.assertEqual(timeouts.tcp_incoming_syn, 10)
4720
4721     def test_dynamic(self):
4722         """ NAT64 dynamic translation test """
4723         self.tcp_port_in = 6303
4724         self.udp_port_in = 6304
4725         self.icmp_id_in = 6305
4726
4727         ses_num_start = self.nat64_get_ses_num()
4728
4729         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4730                                                 self.nat_addr_n)
4731         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4732         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4733
4734         # in2out
4735         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4736         self.pg0.add_stream(pkts)
4737         self.pg_enable_capture(self.pg_interfaces)
4738         self.pg_start()
4739         capture = self.pg1.get_capture(len(pkts))
4740         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4741                                 dst_ip=self.pg1.remote_ip4)
4742
4743         # out2in
4744         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4745         self.pg1.add_stream(pkts)
4746         self.pg_enable_capture(self.pg_interfaces)
4747         self.pg_start()
4748         capture = self.pg0.get_capture(len(pkts))
4749         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4750         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4751
4752         # in2out
4753         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4754         self.pg0.add_stream(pkts)
4755         self.pg_enable_capture(self.pg_interfaces)
4756         self.pg_start()
4757         capture = self.pg1.get_capture(len(pkts))
4758         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4759                                 dst_ip=self.pg1.remote_ip4)
4760
4761         # out2in
4762         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4763         self.pg1.add_stream(pkts)
4764         self.pg_enable_capture(self.pg_interfaces)
4765         self.pg_start()
4766         capture = self.pg0.get_capture(len(pkts))
4767         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4768
4769         ses_num_end = self.nat64_get_ses_num()
4770
4771         self.assertEqual(ses_num_end - ses_num_start, 3)
4772
4773         # tenant with specific VRF
4774         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4775                                                 self.vrf1_nat_addr_n,
4776                                                 vrf_id=self.vrf1_id)
4777         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4778
4779         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4780         self.pg2.add_stream(pkts)
4781         self.pg_enable_capture(self.pg_interfaces)
4782         self.pg_start()
4783         capture = self.pg1.get_capture(len(pkts))
4784         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4785                                 dst_ip=self.pg1.remote_ip4)
4786
4787         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4788         self.pg1.add_stream(pkts)
4789         self.pg_enable_capture(self.pg_interfaces)
4790         self.pg_start()
4791         capture = self.pg2.get_capture(len(pkts))
4792         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4793
4794     def test_static(self):
4795         """ NAT64 static translation test """
4796         self.tcp_port_in = 60303
4797         self.udp_port_in = 60304
4798         self.icmp_id_in = 60305
4799         self.tcp_port_out = 60303
4800         self.udp_port_out = 60304
4801         self.icmp_id_out = 60305
4802
4803         ses_num_start = self.nat64_get_ses_num()
4804
4805         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4806                                                 self.nat_addr_n)
4807         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4808         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4809
4810         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4811                                            self.nat_addr_n,
4812                                            self.tcp_port_in,
4813                                            self.tcp_port_out,
4814                                            IP_PROTOS.tcp)
4815         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4816                                            self.nat_addr_n,
4817                                            self.udp_port_in,
4818                                            self.udp_port_out,
4819                                            IP_PROTOS.udp)
4820         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4821                                            self.nat_addr_n,
4822                                            self.icmp_id_in,
4823                                            self.icmp_id_out,
4824                                            IP_PROTOS.icmp)
4825
4826         # in2out
4827         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4828         self.pg0.add_stream(pkts)
4829         self.pg_enable_capture(self.pg_interfaces)
4830         self.pg_start()
4831         capture = self.pg1.get_capture(len(pkts))
4832         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4833                                 dst_ip=self.pg1.remote_ip4, same_port=True)
4834
4835         # out2in
4836         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4837         self.pg1.add_stream(pkts)
4838         self.pg_enable_capture(self.pg_interfaces)
4839         self.pg_start()
4840         capture = self.pg0.get_capture(len(pkts))
4841         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4842         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4843
4844         ses_num_end = self.nat64_get_ses_num()
4845
4846         self.assertEqual(ses_num_end - ses_num_start, 3)
4847
4848     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4849     def test_session_timeout(self):
4850         """ NAT64 session timeout """
4851         self.icmp_id_in = 1234
4852         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4853                                                 self.nat_addr_n)
4854         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4855         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4856         self.vapi.nat64_set_timeouts(icmp=5)
4857
4858         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4859         self.pg0.add_stream(pkts)
4860         self.pg_enable_capture(self.pg_interfaces)
4861         self.pg_start()
4862         capture = self.pg1.get_capture(len(pkts))
4863
4864         ses_num_before_timeout = self.nat64_get_ses_num()
4865
4866         sleep(15)
4867
4868         # ICMP session after timeout
4869         ses_num_after_timeout = self.nat64_get_ses_num()
4870         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4871
4872     def test_icmp_error(self):
4873         """ NAT64 ICMP Error message translation """
4874         self.tcp_port_in = 6303
4875         self.udp_port_in = 6304
4876         self.icmp_id_in = 6305
4877
4878         ses_num_start = self.nat64_get_ses_num()
4879
4880         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4881                                                 self.nat_addr_n)
4882         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4883         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4884
4885         # send some packets to create sessions
4886         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4887         self.pg0.add_stream(pkts)
4888         self.pg_enable_capture(self.pg_interfaces)
4889         self.pg_start()
4890         capture_ip4 = self.pg1.get_capture(len(pkts))
4891         self.verify_capture_out(capture_ip4,
4892                                 nat_ip=self.nat_addr,
4893                                 dst_ip=self.pg1.remote_ip4)
4894
4895         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4896         self.pg1.add_stream(pkts)
4897         self.pg_enable_capture(self.pg_interfaces)
4898         self.pg_start()
4899         capture_ip6 = self.pg0.get_capture(len(pkts))
4900         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4901         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4902                                    self.pg0.remote_ip6)
4903
4904         # in2out
4905         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4906                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4907                 ICMPv6DestUnreach(code=1) /
4908                 packet[IPv6] for packet in capture_ip6]
4909         self.pg0.add_stream(pkts)
4910         self.pg_enable_capture(self.pg_interfaces)
4911         self.pg_start()
4912         capture = self.pg1.get_capture(len(pkts))
4913         for packet in capture:
4914             try:
4915                 self.assertEqual(packet[IP].src, self.nat_addr)
4916                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4917                 self.assertEqual(packet[ICMP].type, 3)
4918                 self.assertEqual(packet[ICMP].code, 13)
4919                 inner = packet[IPerror]
4920                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4921                 self.assertEqual(inner.dst, self.nat_addr)
4922                 self.check_icmp_checksum(packet)
4923                 if inner.haslayer(TCPerror):
4924                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4925                 elif inner.haslayer(UDPerror):
4926                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4927                 else:
4928                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4929             except:
4930                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4931                 raise
4932
4933         # out2in
4934         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4935                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4936                 ICMP(type=3, code=13) /
4937                 packet[IP] for packet in capture_ip4]
4938         self.pg1.add_stream(pkts)
4939         self.pg_enable_capture(self.pg_interfaces)
4940         self.pg_start()
4941         capture = self.pg0.get_capture(len(pkts))
4942         for packet in capture:
4943             try:
4944                 self.assertEqual(packet[IPv6].src, ip.src)
4945                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4946                 icmp = packet[ICMPv6DestUnreach]
4947                 self.assertEqual(icmp.code, 1)
4948                 inner = icmp[IPerror6]
4949                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4950                 self.assertEqual(inner.dst, ip.src)
4951                 self.check_icmpv6_checksum(packet)
4952                 if inner.haslayer(TCPerror):
4953                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4954                 elif inner.haslayer(UDPerror):
4955                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4956                 else:
4957                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4958                                      self.icmp_id_in)
4959             except:
4960                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4961                 raise
4962
4963     def test_hairpinning(self):
4964         """ NAT64 hairpinning """
4965
4966         client = self.pg0.remote_hosts[0]
4967         server = self.pg0.remote_hosts[1]
4968         server_tcp_in_port = 22
4969         server_tcp_out_port = 4022
4970         server_udp_in_port = 23
4971         server_udp_out_port = 4023
4972         client_tcp_in_port = 1234
4973         client_udp_in_port = 1235
4974         client_tcp_out_port = 0
4975         client_udp_out_port = 0
4976         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4977         nat_addr_ip6 = ip.src
4978
4979         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4980                                                 self.nat_addr_n)
4981         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4982         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4983
4984         self.vapi.nat64_add_del_static_bib(server.ip6n,
4985                                            self.nat_addr_n,
4986                                            server_tcp_in_port,
4987                                            server_tcp_out_port,
4988                                            IP_PROTOS.tcp)
4989         self.vapi.nat64_add_del_static_bib(server.ip6n,
4990                                            self.nat_addr_n,
4991                                            server_udp_in_port,
4992                                            server_udp_out_port,
4993                                            IP_PROTOS.udp)
4994
4995         # client to server
4996         pkts = []
4997         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4998              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4999              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5000         pkts.append(p)
5001         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5002              IPv6(src=client.ip6, dst=nat_addr_ip6) /
5003              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5004         pkts.append(p)
5005         self.pg0.add_stream(pkts)
5006         self.pg_enable_capture(self.pg_interfaces)
5007         self.pg_start()
5008         capture = self.pg0.get_capture(len(pkts))
5009         for packet in capture:
5010             try:
5011                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5012                 self.assertEqual(packet[IPv6].dst, server.ip6)
5013                 if packet.haslayer(TCP):
5014                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5015                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5016                     self.check_tcp_checksum(packet)
5017                     client_tcp_out_port = packet[TCP].sport
5018                 else:
5019                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5020                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
5021                     self.check_udp_checksum(packet)
5022                     client_udp_out_port = packet[UDP].sport
5023             except:
5024                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5025                 raise
5026
5027         # server to client
5028         pkts = []
5029         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5030              IPv6(src=server.ip6, dst=nat_addr_ip6) /
5031              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5032         pkts.append(p)
5033         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5034              IPv6(src=server.ip6, dst=nat_addr_ip6) /
5035              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5036         pkts.append(p)
5037         self.pg0.add_stream(pkts)
5038         self.pg_enable_capture(self.pg_interfaces)
5039         self.pg_start()
5040         capture = self.pg0.get_capture(len(pkts))
5041         for packet in capture:
5042             try:
5043                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5044                 self.assertEqual(packet[IPv6].dst, client.ip6)
5045                 if packet.haslayer(TCP):
5046                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5047                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5048                     self.check_tcp_checksum(packet)
5049                 else:
5050                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
5051                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
5052                     self.check_udp_checksum(packet)
5053             except:
5054                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5055                 raise
5056
5057         # ICMP error
5058         pkts = []
5059         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5060                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5061                 ICMPv6DestUnreach(code=1) /
5062                 packet[IPv6] for packet in capture]
5063         self.pg0.add_stream(pkts)
5064         self.pg_enable_capture(self.pg_interfaces)
5065         self.pg_start()
5066         capture = self.pg0.get_capture(len(pkts))
5067         for packet in capture:
5068             try:
5069                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5070                 self.assertEqual(packet[IPv6].dst, server.ip6)
5071                 icmp = packet[ICMPv6DestUnreach]
5072                 self.assertEqual(icmp.code, 1)
5073                 inner = icmp[IPerror6]
5074                 self.assertEqual(inner.src, server.ip6)
5075                 self.assertEqual(inner.dst, nat_addr_ip6)
5076                 self.check_icmpv6_checksum(packet)
5077                 if inner.haslayer(TCPerror):
5078                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5079                     self.assertEqual(inner[TCPerror].dport,
5080                                      client_tcp_out_port)
5081                 else:
5082                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5083                     self.assertEqual(inner[UDPerror].dport,
5084                                      client_udp_out_port)
5085             except:
5086                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5087                 raise
5088
5089     def test_prefix(self):
5090         """ NAT64 Network-Specific Prefix """
5091
5092         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5093                                                 self.nat_addr_n)
5094         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5095         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5096         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5097                                                 self.vrf1_nat_addr_n,
5098                                                 vrf_id=self.vrf1_id)
5099         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5100
5101         # Add global prefix
5102         global_pref64 = "2001:db8::"
5103         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5104         global_pref64_len = 32
5105         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5106
5107         prefix = self.vapi.nat64_prefix_dump()
5108         self.assertEqual(len(prefix), 1)
5109         self.assertEqual(prefix[0].prefix, global_pref64_n)
5110         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5111         self.assertEqual(prefix[0].vrf_id, 0)
5112
5113         # Add tenant specific prefix
5114         vrf1_pref64 = "2001:db8:122:300::"
5115         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5116         vrf1_pref64_len = 56
5117         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5118                                        vrf1_pref64_len,
5119                                        vrf_id=self.vrf1_id)
5120         prefix = self.vapi.nat64_prefix_dump()
5121         self.assertEqual(len(prefix), 2)
5122
5123         # Global prefix
5124         pkts = self.create_stream_in_ip6(self.pg0,
5125                                          self.pg1,
5126                                          pref=global_pref64,
5127                                          plen=global_pref64_len)
5128         self.pg0.add_stream(pkts)
5129         self.pg_enable_capture(self.pg_interfaces)
5130         self.pg_start()
5131         capture = self.pg1.get_capture(len(pkts))
5132         self.verify_capture_out(capture, nat_ip=self.nat_addr,
5133                                 dst_ip=self.pg1.remote_ip4)
5134
5135         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5136         self.pg1.add_stream(pkts)
5137         self.pg_enable_capture(self.pg_interfaces)
5138         self.pg_start()
5139         capture = self.pg0.get_capture(len(pkts))
5140         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5141                                   global_pref64,
5142                                   global_pref64_len)
5143         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5144
5145         # Tenant specific prefix
5146         pkts = self.create_stream_in_ip6(self.pg2,
5147                                          self.pg1,
5148                                          pref=vrf1_pref64,
5149                                          plen=vrf1_pref64_len)
5150         self.pg2.add_stream(pkts)
5151         self.pg_enable_capture(self.pg_interfaces)
5152         self.pg_start()
5153         capture = self.pg1.get_capture(len(pkts))
5154         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5155                                 dst_ip=self.pg1.remote_ip4)
5156
5157         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5158         self.pg1.add_stream(pkts)
5159         self.pg_enable_capture(self.pg_interfaces)
5160         self.pg_start()
5161         capture = self.pg2.get_capture(len(pkts))
5162         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5163                                   vrf1_pref64,
5164                                   vrf1_pref64_len)
5165         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5166
5167     def test_unknown_proto(self):
5168         """ NAT64 translate packet with unknown protocol """
5169
5170         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5171                                                 self.nat_addr_n)
5172         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5173         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5174         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5175
5176         # in2out
5177         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5178              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5179              TCP(sport=self.tcp_port_in, dport=20))
5180         self.pg0.add_stream(p)
5181         self.pg_enable_capture(self.pg_interfaces)
5182         self.pg_start()
5183         p = self.pg1.get_capture(1)
5184
5185         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5186              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5187              GRE() /
5188              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5189              TCP(sport=1234, dport=1234))
5190         self.pg0.add_stream(p)
5191         self.pg_enable_capture(self.pg_interfaces)
5192         self.pg_start()
5193         p = self.pg1.get_capture(1)
5194         packet = p[0]
5195         try:
5196             self.assertEqual(packet[IP].src, self.nat_addr)
5197             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5198             self.assertTrue(packet.haslayer(GRE))
5199             self.check_ip_checksum(packet)
5200         except:
5201             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5202             raise
5203
5204         # out2in
5205         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5206              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5207              GRE() /
5208              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5209              TCP(sport=1234, dport=1234))
5210         self.pg1.add_stream(p)
5211         self.pg_enable_capture(self.pg_interfaces)
5212         self.pg_start()
5213         p = self.pg0.get_capture(1)
5214         packet = p[0]
5215         try:
5216             self.assertEqual(packet[IPv6].src, remote_ip6)
5217             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5218             self.assertEqual(packet[IPv6].nh, 47)
5219         except:
5220             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5221             raise
5222
5223     def test_hairpinning_unknown_proto(self):
5224         """ NAT64 translate packet with unknown protocol - hairpinning """
5225
5226         client = self.pg0.remote_hosts[0]
5227         server = self.pg0.remote_hosts[1]
5228         server_tcp_in_port = 22
5229         server_tcp_out_port = 4022
5230         client_tcp_in_port = 1234
5231         client_tcp_out_port = 1235
5232         server_nat_ip = "10.0.0.100"
5233         client_nat_ip = "10.0.0.110"
5234         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5235         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5236         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5237         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5238
5239         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5240                                                 client_nat_ip_n)
5241         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5242         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5243
5244         self.vapi.nat64_add_del_static_bib(server.ip6n,
5245                                            server_nat_ip_n,
5246                                            server_tcp_in_port,
5247                                            server_tcp_out_port,
5248                                            IP_PROTOS.tcp)
5249
5250         self.vapi.nat64_add_del_static_bib(server.ip6n,
5251                                            server_nat_ip_n,
5252                                            0,
5253                                            0,
5254                                            IP_PROTOS.gre)
5255
5256         self.vapi.nat64_add_del_static_bib(client.ip6n,
5257                                            client_nat_ip_n,
5258                                            client_tcp_in_port,
5259                                            client_tcp_out_port,
5260                                            IP_PROTOS.tcp)
5261
5262         # client to server
5263         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5264              IPv6(src=client.ip6, dst=server_nat_ip6) /
5265              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5266         self.pg0.add_stream(p)
5267         self.pg_enable_capture(self.pg_interfaces)
5268         self.pg_start()
5269         p = self.pg0.get_capture(1)
5270
5271         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5272              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5273              GRE() /
5274              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5275              TCP(sport=1234, dport=1234))
5276         self.pg0.add_stream(p)
5277         self.pg_enable_capture(self.pg_interfaces)
5278         self.pg_start()
5279         p = self.pg0.get_capture(1)
5280         packet = p[0]
5281         try:
5282             self.assertEqual(packet[IPv6].src, client_nat_ip6)
5283             self.assertEqual(packet[IPv6].dst, server.ip6)
5284             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5285         except:
5286             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5287             raise
5288
5289         # server to client
5290         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5291              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5292              GRE() /
5293              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5294              TCP(sport=1234, dport=1234))
5295         self.pg0.add_stream(p)
5296         self.pg_enable_capture(self.pg_interfaces)
5297         self.pg_start()
5298         p = self.pg0.get_capture(1)
5299         packet = p[0]
5300         try:
5301             self.assertEqual(packet[IPv6].src, server_nat_ip6)
5302             self.assertEqual(packet[IPv6].dst, client.ip6)
5303             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5304         except:
5305             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5306             raise
5307
5308     def test_one_armed_nat64(self):
5309         """ One armed NAT64 """
5310         external_port = 0
5311         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5312                                            '64:ff9b::',
5313                                            96)
5314
5315         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5316                                                 self.nat_addr_n)
5317         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5318         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5319
5320         # in2out
5321         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5322              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5323              TCP(sport=12345, dport=80))
5324         self.pg3.add_stream(p)
5325         self.pg_enable_capture(self.pg_interfaces)
5326         self.pg_start()
5327         capture = self.pg3.get_capture(1)
5328         p = capture[0]
5329         try:
5330             ip = p[IP]
5331             tcp = p[TCP]
5332             self.assertEqual(ip.src, self.nat_addr)
5333             self.assertEqual(ip.dst, self.pg3.remote_ip4)
5334             self.assertNotEqual(tcp.sport, 12345)
5335             external_port = tcp.sport
5336             self.assertEqual(tcp.dport, 80)
5337             self.check_tcp_checksum(p)
5338             self.check_ip_checksum(p)
5339         except:
5340             self.logger.error(ppp("Unexpected or invalid packet:", p))
5341             raise
5342
5343         # out2in
5344         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5345              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5346              TCP(sport=80, dport=external_port))
5347         self.pg3.add_stream(p)
5348         self.pg_enable_capture(self.pg_interfaces)
5349         self.pg_start()
5350         capture = self.pg3.get_capture(1)
5351         p = capture[0]
5352         try:
5353             ip = p[IPv6]
5354             tcp = p[TCP]
5355             self.assertEqual(ip.src, remote_host_ip6)
5356             self.assertEqual(ip.dst, self.pg3.remote_ip6)
5357             self.assertEqual(tcp.sport, 80)
5358             self.assertEqual(tcp.dport, 12345)
5359             self.check_tcp_checksum(p)
5360         except:
5361             self.logger.error(ppp("Unexpected or invalid packet:", p))
5362             raise
5363
5364     def test_frag_in_order(self):
5365         """ NAT64 translate fragments arriving in order """
5366         self.tcp_port_in = random.randint(1025, 65535)
5367
5368         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5369                                                 self.nat_addr_n)
5370         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5371         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5372
5373         reass = self.vapi.nat_reass_dump()
5374         reass_n_start = len(reass)
5375
5376         # in2out
5377         data = 'a' * 200
5378         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5379                                            self.tcp_port_in, 20, data)
5380         self.pg0.add_stream(pkts)
5381         self.pg_enable_capture(self.pg_interfaces)
5382         self.pg_start()
5383         frags = self.pg1.get_capture(len(pkts))
5384         p = self.reass_frags_and_verify(frags,
5385                                         self.nat_addr,
5386                                         self.pg1.remote_ip4)
5387         self.assertEqual(p[TCP].dport, 20)
5388         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5389         self.tcp_port_out = p[TCP].sport
5390         self.assertEqual(data, p[Raw].load)
5391
5392         # out2in
5393         data = "A" * 4 + "b" * 16 + "C" * 3
5394         pkts = self.create_stream_frag(self.pg1,
5395                                        self.nat_addr,
5396                                        20,
5397                                        self.tcp_port_out,
5398                                        data)
5399         self.pg1.add_stream(pkts)
5400         self.pg_enable_capture(self.pg_interfaces)
5401         self.pg_start()
5402         frags = self.pg0.get_capture(len(pkts))
5403         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5404         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5405         self.assertEqual(p[TCP].sport, 20)
5406         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5407         self.assertEqual(data, p[Raw].load)
5408
5409         reass = self.vapi.nat_reass_dump()
5410         reass_n_end = len(reass)
5411
5412         self.assertEqual(reass_n_end - reass_n_start, 2)
5413
5414     def test_reass_hairpinning(self):
5415         """ NAT64 fragments hairpinning """
5416         data = 'a' * 200
5417         client = self.pg0.remote_hosts[0]
5418         server = self.pg0.remote_hosts[1]
5419         server_in_port = random.randint(1025, 65535)
5420         server_out_port = random.randint(1025, 65535)
5421         client_in_port = random.randint(1025, 65535)
5422         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5423         nat_addr_ip6 = ip.src
5424
5425         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5426                                                 self.nat_addr_n)
5427         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5428         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5429
5430         # add static BIB entry for server
5431         self.vapi.nat64_add_del_static_bib(server.ip6n,
5432                                            self.nat_addr_n,
5433                                            server_in_port,
5434                                            server_out_port,
5435                                            IP_PROTOS.tcp)
5436
5437         # send packet from host to server
5438         pkts = self.create_stream_frag_ip6(self.pg0,
5439                                            self.nat_addr,
5440                                            client_in_port,
5441                                            server_out_port,
5442                                            data)
5443         self.pg0.add_stream(pkts)
5444         self.pg_enable_capture(self.pg_interfaces)
5445         self.pg_start()
5446         frags = self.pg0.get_capture(len(pkts))
5447         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5448         self.assertNotEqual(p[TCP].sport, client_in_port)
5449         self.assertEqual(p[TCP].dport, server_in_port)
5450         self.assertEqual(data, p[Raw].load)
5451
5452     def test_frag_out_of_order(self):
5453         """ NAT64 translate fragments arriving out of order """
5454         self.tcp_port_in = random.randint(1025, 65535)
5455
5456         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5457                                                 self.nat_addr_n)
5458         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5459         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5460
5461         # in2out
5462         data = 'a' * 200
5463         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5464                                            self.tcp_port_in, 20, data)
5465         pkts.reverse()
5466         self.pg0.add_stream(pkts)
5467         self.pg_enable_capture(self.pg_interfaces)
5468         self.pg_start()
5469         frags = self.pg1.get_capture(len(pkts))
5470         p = self.reass_frags_and_verify(frags,
5471                                         self.nat_addr,
5472                                         self.pg1.remote_ip4)
5473         self.assertEqual(p[TCP].dport, 20)
5474         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5475         self.tcp_port_out = p[TCP].sport
5476         self.assertEqual(data, p[Raw].load)
5477
5478         # out2in
5479         data = "A" * 4 + "B" * 16 + "C" * 3
5480         pkts = self.create_stream_frag(self.pg1,
5481                                        self.nat_addr,
5482                                        20,
5483                                        self.tcp_port_out,
5484                                        data)
5485         pkts.reverse()
5486         self.pg1.add_stream(pkts)
5487         self.pg_enable_capture(self.pg_interfaces)
5488         self.pg_start()
5489         frags = self.pg0.get_capture(len(pkts))
5490         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5491         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5492         self.assertEqual(p[TCP].sport, 20)
5493         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5494         self.assertEqual(data, p[Raw].load)
5495
5496     def test_interface_addr(self):
5497         """ Acquire NAT64 pool addresses from interface """
5498         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5499
5500         # no address in NAT64 pool
5501         adresses = self.vapi.nat44_address_dump()
5502         self.assertEqual(0, len(adresses))
5503
5504         # configure interface address and check NAT64 address pool
5505         self.pg4.config_ip4()
5506         addresses = self.vapi.nat64_pool_addr_dump()
5507         self.assertEqual(len(addresses), 1)
5508         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5509
5510         # remove interface address and check NAT64 address pool
5511         self.pg4.unconfig_ip4()
5512         addresses = self.vapi.nat64_pool_addr_dump()
5513         self.assertEqual(0, len(adresses))
5514
5515     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5516     def test_ipfix_max_bibs_sessions(self):
5517         """ IPFIX logging maximum session and BIB entries exceeded """
5518         max_bibs = 1280
5519         max_sessions = 2560
5520         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5521                                            '64:ff9b::',
5522                                            96)
5523
5524         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5525                                                 self.nat_addr_n)
5526         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5527         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5528
5529         pkts = []
5530         src = ""
5531         for i in range(0, max_bibs):
5532             src = "fd01:aa::%x" % (i)
5533             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5534                  IPv6(src=src, dst=remote_host_ip6) /
5535                  TCP(sport=12345, dport=80))
5536             pkts.append(p)
5537             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5538                  IPv6(src=src, dst=remote_host_ip6) /
5539                  TCP(sport=12345, dport=22))
5540             pkts.append(p)
5541         self.pg0.add_stream(pkts)
5542         self.pg_enable_capture(self.pg_interfaces)
5543         self.pg_start()
5544         self.pg1.get_capture(max_sessions)
5545
5546         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5547                                      src_address=self.pg3.local_ip4n,
5548                                      path_mtu=512,
5549                                      template_interval=10)
5550         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5551                             src_port=self.ipfix_src_port)
5552
5553         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5554              IPv6(src=src, dst=remote_host_ip6) /
5555              TCP(sport=12345, dport=25))
5556         self.pg0.add_stream(p)
5557         self.pg_enable_capture(self.pg_interfaces)
5558         self.pg_start()
5559         self.pg1.get_capture(0)
5560         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5561         capture = self.pg3.get_capture(9)
5562         ipfix = IPFIXDecoder()
5563         # first load template
5564         for p in capture:
5565             self.assertTrue(p.haslayer(IPFIX))
5566             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5567             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5568             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5569             self.assertEqual(p[UDP].dport, 4739)
5570             self.assertEqual(p[IPFIX].observationDomainID,
5571                              self.ipfix_domain_id)
5572             if p.haslayer(Template):
5573                 ipfix.add_template(p.getlayer(Template))
5574         # verify events in data set
5575         for p in capture:
5576             if p.haslayer(Data):
5577                 data = ipfix.decode_data_set(p.getlayer(Set))
5578                 self.verify_ipfix_max_sessions(data, max_sessions)
5579
5580         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5581              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5582              TCP(sport=12345, dport=80))
5583         self.pg0.add_stream(p)
5584         self.pg_enable_capture(self.pg_interfaces)
5585         self.pg_start()
5586         self.pg1.get_capture(0)
5587         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5588         capture = self.pg3.get_capture(1)
5589         # verify events in data set
5590         for p in capture:
5591             self.assertTrue(p.haslayer(IPFIX))
5592             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5593             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5594             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5595             self.assertEqual(p[UDP].dport, 4739)
5596             self.assertEqual(p[IPFIX].observationDomainID,
5597                              self.ipfix_domain_id)
5598             if p.haslayer(Data):
5599                 data = ipfix.decode_data_set(p.getlayer(Set))
5600                 self.verify_ipfix_max_bibs(data, max_bibs)
5601
5602     def test_ipfix_max_frags(self):
5603         """ IPFIX logging maximum fragments pending reassembly exceeded """
5604         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5605                                                 self.nat_addr_n)
5606         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5607         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5608         self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5609         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5610                                      src_address=self.pg3.local_ip4n,
5611                                      path_mtu=512,
5612                                      template_interval=10)
5613         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5614                             src_port=self.ipfix_src_port)
5615
5616         data = 'a' * 200
5617         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5618                                            self.tcp_port_in, 20, data)
5619         self.pg0.add_stream(pkts[-1])
5620         self.pg_enable_capture(self.pg_interfaces)
5621         self.pg_start()
5622         self.pg1.get_capture(0)
5623         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5624         capture = self.pg3.get_capture(9)
5625         ipfix = IPFIXDecoder()
5626         # first load template
5627         for p in capture:
5628             self.assertTrue(p.haslayer(IPFIX))
5629             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5630             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5631             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5632             self.assertEqual(p[UDP].dport, 4739)
5633             self.assertEqual(p[IPFIX].observationDomainID,
5634                              self.ipfix_domain_id)
5635             if p.haslayer(Template):
5636                 ipfix.add_template(p.getlayer(Template))
5637         # verify events in data set
5638         for p in capture:
5639             if p.haslayer(Data):
5640                 data = ipfix.decode_data_set(p.getlayer(Set))
5641                 self.verify_ipfix_max_fragments_ip6(data, 0,
5642                                                     self.pg0.remote_ip6n)
5643
5644     def test_ipfix_bib_ses(self):
5645         """ IPFIX logging NAT64 BIB/session create and delete events """
5646         self.tcp_port_in = random.randint(1025, 65535)
5647         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5648                                            '64:ff9b::',
5649                                            96)
5650
5651         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5652                                                 self.nat_addr_n)
5653         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5654         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5655         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5656                                      src_address=self.pg3.local_ip4n,
5657                                      path_mtu=512,
5658                                      template_interval=10)
5659         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5660                             src_port=self.ipfix_src_port)
5661
5662         # Create
5663         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5664              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5665              TCP(sport=self.tcp_port_in, dport=25))
5666         self.pg0.add_stream(p)
5667         self.pg_enable_capture(self.pg_interfaces)
5668         self.pg_start()
5669         p = self.pg1.get_capture(1)
5670         self.tcp_port_out = p[0][TCP].sport
5671         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5672         capture = self.pg3.get_capture(10)
5673         ipfix = IPFIXDecoder()
5674         # first load template
5675         for p in capture:
5676             self.assertTrue(p.haslayer(IPFIX))
5677             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5678             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5679             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5680             self.assertEqual(p[UDP].dport, 4739)
5681             self.assertEqual(p[IPFIX].observationDomainID,
5682                              self.ipfix_domain_id)
5683             if p.haslayer(Template):
5684                 ipfix.add_template(p.getlayer(Template))
5685         # verify events in data set
5686         for p in capture:
5687             if p.haslayer(Data):
5688                 data = ipfix.decode_data_set(p.getlayer(Set))
5689                 if ord(data[0][230]) == 10:
5690                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5691                 elif ord(data[0][230]) == 6:
5692                     self.verify_ipfix_nat64_ses(data,
5693                                                 1,
5694                                                 self.pg0.remote_ip6n,
5695                                                 self.pg1.remote_ip4,
5696                                                 25)
5697                 else:
5698                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5699
5700         # Delete
5701         self.pg_enable_capture(self.pg_interfaces)
5702         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5703                                                 self.nat_addr_n,
5704                                                 is_add=0)
5705         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5706         capture = self.pg3.get_capture(2)
5707         # verify events in data set
5708         for p in capture:
5709             self.assertTrue(p.haslayer(IPFIX))
5710             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5711             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5712             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5713             self.assertEqual(p[UDP].dport, 4739)
5714             self.assertEqual(p[IPFIX].observationDomainID,
5715                              self.ipfix_domain_id)
5716             if p.haslayer(Data):
5717                 data = ipfix.decode_data_set(p.getlayer(Set))
5718                 if ord(data[0][230]) == 11:
5719                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5720                 elif ord(data[0][230]) == 7:
5721                     self.verify_ipfix_nat64_ses(data,
5722                                                 0,
5723                                                 self.pg0.remote_ip6n,
5724                                                 self.pg1.remote_ip4,
5725                                                 25)
5726                 else:
5727                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5728
5729     def nat64_get_ses_num(self):
5730         """
5731         Return number of active NAT64 sessions.
5732         """
5733         st = self.vapi.nat64_st_dump()
5734         return len(st)
5735
5736     def clear_nat64(self):
5737         """
5738         Clear NAT64 configuration.
5739         """
5740         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5741                             domain_id=self.ipfix_domain_id)
5742         self.ipfix_src_port = 4739
5743         self.ipfix_domain_id = 1
5744
5745         self.vapi.nat64_set_timeouts()
5746
5747         interfaces = self.vapi.nat64_interface_dump()
5748         for intf in interfaces:
5749             if intf.is_inside > 1:
5750                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5751                                                   0,
5752                                                   is_add=0)
5753             self.vapi.nat64_add_del_interface(intf.sw_if_index,
5754                                               intf.is_inside,
5755                                               is_add=0)
5756
5757         bib = self.vapi.nat64_bib_dump(255)
5758         for bibe in bib:
5759             if bibe.is_static:
5760                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5761                                                    bibe.o_addr,
5762                                                    bibe.i_port,
5763                                                    bibe.o_port,
5764                                                    bibe.proto,
5765                                                    bibe.vrf_id,
5766                                                    is_add=0)
5767
5768         adresses = self.vapi.nat64_pool_addr_dump()
5769         for addr in adresses:
5770             self.vapi.nat64_add_del_pool_addr_range(addr.address,
5771                                                     addr.address,
5772                                                     vrf_id=addr.vrf_id,
5773                                                     is_add=0)
5774
5775         prefixes = self.vapi.nat64_prefix_dump()
5776         for prefix in prefixes:
5777             self.vapi.nat64_add_del_prefix(prefix.prefix,
5778                                            prefix.prefix_len,
5779                                            vrf_id=prefix.vrf_id,
5780                                            is_add=0)
5781
5782     def tearDown(self):
5783         super(TestNAT64, self).tearDown()
5784         if not self.vpp_dead:
5785             self.logger.info(self.vapi.cli("show nat64 pool"))
5786             self.logger.info(self.vapi.cli("show nat64 interfaces"))
5787             self.logger.info(self.vapi.cli("show nat64 prefix"))
5788             self.logger.info(self.vapi.cli("show nat64 bib all"))
5789             self.logger.info(self.vapi.cli("show nat64 session table all"))
5790             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5791             self.clear_nat64()
5792
5793
class TestDSlite(MethodHolder):
    """ DS-Lite Test Cases """

    @classmethod
    def setUpClass(cls):
        super(TestDSlite, cls).setUpClass()

        try:
            # Pool address used by the test, in text and packed form.
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)

            cls.create_pg_interfaces(range(2))
            # pg0: IPv4 interface.
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            # pg1: IPv6 interface with two remote hosts.
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(2)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            # Undo the parent class setup before propagating the error.
            super(TestDSlite, cls).tearDownClass()
            raise

    def _dslite_send_one(self, tx_if, rx_if, pkt):
        # Inject pkt on tx_if, run the packet generator and return the
        # capture from rx_if, which must hold exactly one packet.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    def test_dslite(self):
        """ Test DS-Lite """
        self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
                                                 self.nat_addr_n)
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        self.vapi.dslite_set_aftr_addr(
            socket.inet_pton(socket.AF_INET6, aftr_ip6),
            socket.inet_pton(socket.AF_INET, aftr_ip4))

        inner_ip4 = '192.168.1.1'
        host0_ip6 = self.pg1.remote_hosts[0].ip6
        host1_ip6 = self.pg1.remote_hosts[1].ip6

        def tunnelled(host_ip6, l4):
            # IPv4-in-IPv6 packet from a pg1 remote host to the AFTR.
            return (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                    IPv6(dst=aftr_ip6, src=host_ip6) /
                    IP(dst=self.pg0.remote_ip4, src=inner_ip4) /
                    l4)

        def plain(l4):
            # Plain IPv4 packet from the pg0 remote host to the pool address.
            return (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                    IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
                    l4)

        def check_to_ip4(rx):
            # Towards pg0: no IPv6 header, source is the pool address.
            self.assertFalse(rx.haslayer(IPv6))
            self.assertEqual(rx[IP].src, self.nat_addr)
            self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)

        def check_to_ip6(rx, host_ip6):
            # Towards pg1: tunnelled from the AFTR address to the host,
            # carrying the original inner IPv4 addresses.
            self.assertEqual(rx[IPv6].src, aftr_ip6)
            self.assertEqual(rx[IPv6].dst, host_ip6)
            self.assertEqual(rx[IP].src, self.pg0.remote_ip4)
            self.assertEqual(rx[IP].dst, inner_ip4)

        # UDP
        rx = self._dslite_send_one(
            self.pg1, self.pg0,
            tunnelled(host0_ip6, UDP(sport=20000, dport=10000)))[0]
        check_to_ip4(rx)
        self.assertNotEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)
        out_port = rx[UDP].sport

        rx = self._dslite_send_one(
            self.pg0, self.pg1,
            plain(UDP(sport=10000, dport=out_port)))[0]
        check_to_ip6(rx, host0_ip6)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # TCP
        rx = self._dslite_send_one(
            self.pg1, self.pg0,
            tunnelled(host1_ip6, TCP(sport=20001, dport=10001)))[0]
        check_to_ip4(rx)
        self.assertNotEqual(rx[TCP].sport, 20001)
        self.assertEqual(rx[TCP].dport, 10001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)
        out_port = rx[TCP].sport

        rx = self._dslite_send_one(
            self.pg0, self.pg1,
            plain(TCP(sport=10001, dport=out_port)))[0]
        check_to_ip6(rx, host1_ip6)
        self.assertEqual(rx[TCP].sport, 10001)
        self.assertEqual(rx[TCP].dport, 20001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)

        # ICMP
        rx = self._dslite_send_one(
            self.pg1, self.pg0,
            tunnelled(host1_ip6, ICMP(id=4000, type='echo-request')))[0]
        check_to_ip4(rx)
        self.assertNotEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)
        out_id = rx[ICMP].id

        rx = self._dslite_send_one(
            self.pg0, self.pg1,
            plain(ICMP(id=out_id, type='echo-reply')))[0]
        check_to_ip6(rx, host1_ip6)
        self.assertEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)

        # ping DS-Lite AFTR tunnel endpoint address
        echo = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                IPv6(src=host1_ip6, dst=aftr_ip6) /
                ICMPv6EchoRequest())
        replies = self._dslite_send_one(self.pg1, self.pg1, echo)
        self.assertEqual(1, len(replies))
        reply = replies[0]
        self.assertEqual(reply[IPv6].src, aftr_ip6)
        self.assertEqual(reply[IPv6].dst, host1_ip6)
        self.assertTrue(reply.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        # Log the DS-Lite state unless VPP has already died.
        super(TestDSlite, self).tearDown()
        if self.vpp_dead:
            return
        for show_cmd in ("show dslite pool",
                         "show dslite aftr-tunnel-endpoint-address",
                         "show dslite sessions"):
            self.logger.info(self.vapi.cli(show_cmd))
5953
5954
class TestDSliteCE(MethodHolder):
    """ DS-Lite CE Test Cases """

    @classmethod
    def setUpConstants(cls):
        super(TestDSliteCE, cls).setUpConstants()
        # Append a 'nat { dslite ce }' section to the VPP command line.
        cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])

    @classmethod
    def setUpClass(cls):
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))
            # pg0: IPv4 interface.
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()
            # pg1: IPv6 interface with one remote host.
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(1)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            # Undo the parent class setup before propagating the error.
            super(TestDSliteCE, cls).tearDownClass()
            raise

    def _dslite_ce_send_one(self, tx_if, rx_if, pkt):
        # Inject pkt on tx_if, run the packet generator and return the
        # capture from rx_if, which must hold exactly one packet.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    def test_dslite_ce(self):
        """ Test DS-Lite CE """

        b4_ip4 = '192.0.0.2'
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        self.vapi.dslite_set_b4_addr(
            socket.inet_pton(socket.AF_INET6, b4_ip6),
            socket.inet_pton(socket.AF_INET, b4_ip4))

        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
        self.vapi.dslite_set_aftr_addr(
            aftr_ip6_n,
            socket.inet_pton(socket.AF_INET, aftr_ip4))

        # Host route to the AFTR address via the pg1 remote host.
        self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
                                   dst_address_length=128,
                                   next_hop_address=self.pg1.remote_ip6n,
                                   next_hop_sw_if_index=self.pg1.sw_if_index,
                                   is_ipv6=1)

        # UDP encapsulation
        plain = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
                 UDP(sport=10000, dport=20000))
        rx = self._dslite_ce_send_one(self.pg0, self.pg1, plain)[0]
        self.assertEqual(rx[IPv6].src, b4_ip6)
        self.assertEqual(rx[IPv6].dst, aftr_ip6)
        self.assertEqual(rx[IP].src, self.pg0.remote_ip4)
        self.assertEqual(rx[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # UDP decapsulation
        tunnelled = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                     IPv6(dst=b4_ip6, src=aftr_ip6) /
                     IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
                     UDP(sport=20000, dport=10000))
        rx = self._dslite_ce_send_one(self.pg1, self.pg0, tunnelled)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.pg1.remote_ip4)
        self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
        self.assertEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)

        # ping DS-Lite B4 tunnel endpoint address
        host_ip6 = self.pg1.remote_hosts[0].ip6
        echo = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                IPv6(src=host_ip6, dst=b4_ip6) /
                ICMPv6EchoRequest())
        replies = self._dslite_ce_send_one(self.pg1, self.pg1, echo)
        self.assertEqual(1, len(replies))
        reply = replies[0]
        self.assertEqual(reply[IPv6].src, b4_ip6)
        self.assertEqual(reply[IPv6].dst, host_ip6)
        self.assertTrue(reply.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        # Log both tunnel endpoint addresses unless VPP has already died.
        super(TestDSliteCE, self).tearDown()
        if self.vpp_dead:
            return
        for show_cmd in ("show dslite aftr-tunnel-endpoint-address",
                         "show dslite b4-tunnel-endpoint-address"):
            self.logger.info(self.vapi.cli(show_cmd))
6057
# When executed as a script, run the test cases with VppTestRunner as the
# unittest runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)