NAT44: asymmetrical static mapping rule (VPP-1135)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param dst_ip: Destination address
144         :param ttl: TTL of generated packets
145         """
146         if dst_ip is None:
147             dst_ip = out_if.remote_ip4
148
149         pkts = []
150         # TCP
151         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153              TCP(sport=self.tcp_port_in, dport=20))
154         pkts.append(p)
155
156         # UDP
157         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159              UDP(sport=self.udp_port_in, dport=20))
160         pkts.append(p)
161
162         # ICMP
163         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165              ICMP(id=self.icmp_id_in, type='echo-request'))
166         pkts.append(p)
167
168         return pkts
169
170     def compose_ip6(self, ip4, pref, plen):
171         """
172         Compose IPv4-embedded IPv6 addresses
173
174         :param ip4: IPv4 address
175         :param pref: IPv6 prefix
176         :param plen: IPv6 prefix length
177         :returns: IPv4-embedded IPv6 addresses
178         """
179         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
181         if plen == 32:
182             pref_n[4] = ip4_n[0]
183             pref_n[5] = ip4_n[1]
184             pref_n[6] = ip4_n[2]
185             pref_n[7] = ip4_n[3]
186         elif plen == 40:
187             pref_n[5] = ip4_n[0]
188             pref_n[6] = ip4_n[1]
189             pref_n[7] = ip4_n[2]
190             pref_n[9] = ip4_n[3]
191         elif plen == 48:
192             pref_n[6] = ip4_n[0]
193             pref_n[7] = ip4_n[1]
194             pref_n[9] = ip4_n[2]
195             pref_n[10] = ip4_n[3]
196         elif plen == 56:
197             pref_n[7] = ip4_n[0]
198             pref_n[9] = ip4_n[1]
199             pref_n[10] = ip4_n[2]
200             pref_n[11] = ip4_n[3]
201         elif plen == 64:
202             pref_n[9] = ip4_n[0]
203             pref_n[10] = ip4_n[1]
204             pref_n[11] = ip4_n[2]
205             pref_n[12] = ip4_n[3]
206         elif plen == 96:
207             pref_n[12] = ip4_n[0]
208             pref_n[13] = ip4_n[1]
209             pref_n[14] = ip4_n[2]
210             pref_n[15] = ip4_n[3]
211         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
212
213     def extract_ip4(self, ip6, plen):
214         """
215         Extract IPv4 address embedded in IPv6 addresses
216
217         :param ip6: IPv6 address
218         :param plen: IPv6 prefix length
219         :returns: extracted IPv4 address
220         """
221         ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
222         ip4_n = [None] * 4
223         if plen == 32:
224             ip4_n[0] = ip6_n[4]
225             ip4_n[1] = ip6_n[5]
226             ip4_n[2] = ip6_n[6]
227             ip4_n[3] = ip6_n[7]
228         elif plen == 40:
229             ip4_n[0] = ip6_n[5]
230             ip4_n[1] = ip6_n[6]
231             ip4_n[2] = ip6_n[7]
232             ip4_n[3] = ip6_n[9]
233         elif plen == 48:
234             ip4_n[0] = ip6_n[6]
235             ip4_n[1] = ip6_n[7]
236             ip4_n[2] = ip6_n[9]
237             ip4_n[3] = ip6_n[10]
238         elif plen == 56:
239             ip4_n[0] = ip6_n[7]
240             ip4_n[1] = ip6_n[9]
241             ip4_n[2] = ip6_n[10]
242             ip4_n[3] = ip6_n[11]
243         elif plen == 64:
244             ip4_n[0] = ip6_n[9]
245             ip4_n[1] = ip6_n[10]
246             ip4_n[2] = ip6_n[11]
247             ip4_n[3] = ip6_n[12]
248         elif plen == 96:
249             ip4_n[0] = ip6_n[12]
250             ip4_n[1] = ip6_n[13]
251             ip4_n[2] = ip6_n[14]
252             ip4_n[3] = ip6_n[15]
253         return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
254
255     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
256         """
257         Create IPv6 packet stream for inside network
258
259         :param in_if: Inside interface
260         :param out_if: Outside interface
261         :param ttl: Hop Limit of generated packets
262         :param pref: NAT64 prefix
263         :param plen: NAT64 prefix length
264         """
265         pkts = []
266         if pref is None:
267             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
268         else:
269             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
270
271         # TCP
272         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274              TCP(sport=self.tcp_port_in, dport=20))
275         pkts.append(p)
276
277         # UDP
278         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280              UDP(sport=self.udp_port_in, dport=20))
281         pkts.append(p)
282
283         # ICMP
284         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286              ICMPv6EchoRequest(id=self.icmp_id_in))
287         pkts.append(p)
288
289         return pkts
290
291     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292                           use_inside_ports=False):
293         """
294         Create packet stream for outside network
295
296         :param out_if: Outside interface
297         :param dst_ip: Destination IP address (Default use global NAT address)
298         :param ttl: TTL of generated packets
299         :param use_inside_ports: Use inside NAT ports as destination ports
300                instead of outside ports
301         """
302         if dst_ip is None:
303             dst_ip = self.nat_addr
304         if not use_inside_ports:
305             tcp_port = self.tcp_port_out
306             udp_port = self.udp_port_out
307             icmp_id = self.icmp_id_out
308         else:
309             tcp_port = self.tcp_port_in
310             udp_port = self.udp_port_in
311             icmp_id = self.icmp_id_in
312         pkts = []
313         # TCP
314         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316              TCP(dport=tcp_port, sport=20))
317         pkts.append(p)
318
319         # UDP
320         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322              UDP(dport=udp_port, sport=20))
323         pkts.append(p)
324
325         # ICMP
326         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328              ICMP(id=icmp_id, type='echo-reply'))
329         pkts.append(p)
330
331         return pkts
332
333     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
334         """
335         Create packet stream for outside network
336
337         :param out_if: Outside interface
338         :param dst_ip: Destination IP address (Default use global NAT address)
339         :param hl: HL of generated packets
340         """
341         pkts = []
342         # TCP
343         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345              TCP(dport=self.tcp_port_out, sport=20))
346         pkts.append(p)
347
348         # UDP
349         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351              UDP(dport=self.udp_port_out, sport=20))
352         pkts.append(p)
353
354         # ICMP
355         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357              ICMPv6EchoReply(id=self.icmp_id_out))
358         pkts.append(p)
359
360         return pkts
361
362     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363                            packet_num=3, dst_ip=None, is_ip6=False):
364         """
365         Verify captured packets on outside network
366
367         :param capture: Captured packets
368         :param nat_ip: Translated IP address (Default use global NAT address)
369         :param same_port: Sorce port number is not translated (Default False)
370         :param packet_num: Expected number of packets (Default 3)
371         :param dst_ip: Destination IP address (Default do not verify)
372         :param is_ip6: If L3 protocol is IPv6 (Default False)
373         """
374         if is_ip6:
375             IP46 = IPv6
376             ICMP46 = ICMPv6EchoRequest
377         else:
378             IP46 = IP
379             ICMP46 = ICMP
380         if nat_ip is None:
381             nat_ip = self.nat_addr
382         self.assertEqual(packet_num, len(capture))
383         for packet in capture:
384             try:
385                 if not is_ip6:
386                     self.check_ip_checksum(packet)
387                 self.assertEqual(packet[IP46].src, nat_ip)
388                 if dst_ip is not None:
389                     self.assertEqual(packet[IP46].dst, dst_ip)
390                 if packet.haslayer(TCP):
391                     if same_port:
392                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
393                     else:
394                         self.assertNotEqual(
395                             packet[TCP].sport, self.tcp_port_in)
396                     self.tcp_port_out = packet[TCP].sport
397                     self.check_tcp_checksum(packet)
398                 elif packet.haslayer(UDP):
399                     if same_port:
400                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
401                     else:
402                         self.assertNotEqual(
403                             packet[UDP].sport, self.udp_port_in)
404                     self.udp_port_out = packet[UDP].sport
405                 else:
406                     if same_port:
407                         self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
408                     else:
409                         self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410                     self.icmp_id_out = packet[ICMP46].id
411                     if is_ip6:
412                         self.check_icmpv6_checksum(packet)
413                     else:
414                         self.check_icmp_checksum(packet)
415             except:
416                 self.logger.error(ppp("Unexpected or invalid packet "
417                                       "(outside network):", packet))
418                 raise
419
420     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421                                packet_num=3, dst_ip=None):
422         """
423         Verify captured packets on outside network
424
425         :param capture: Captured packets
426         :param nat_ip: Translated IP address
427         :param same_port: Sorce port number is not translated (Default False)
428         :param packet_num: Expected number of packets (Default 3)
429         :param dst_ip: Destination IP address (Default do not verify)
430         """
431         return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
432                                        dst_ip, True)
433
434     def verify_capture_in(self, capture, in_if, packet_num=3):
435         """
436         Verify captured packets on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.check_ip_checksum(packet)
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 if packet.haslayer(TCP):
448                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449                     self.check_tcp_checksum(packet)
450                 elif packet.haslayer(UDP):
451                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
452                 else:
453                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454                     self.check_icmp_checksum(packet)
455             except:
456                 self.logger.error(ppp("Unexpected or invalid packet "
457                                       "(inside network):", packet))
458                 raise
459
460     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
461         """
462         Verify captured IPv6 packets on inside network
463
464         :param capture: Captured packets
465         :param src_ip: Source IP
466         :param dst_ip: Destination IP address
467         :param packet_num: Expected number of packets (Default 3)
468         """
469         self.assertEqual(packet_num, len(capture))
470         for packet in capture:
471             try:
472                 self.assertEqual(packet[IPv6].src, src_ip)
473                 self.assertEqual(packet[IPv6].dst, dst_ip)
474                 if packet.haslayer(TCP):
475                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476                     self.check_tcp_checksum(packet)
477                 elif packet.haslayer(UDP):
478                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
479                     self.check_udp_checksum(packet)
480                 else:
481                     self.assertEqual(packet[ICMPv6EchoReply].id,
482                                      self.icmp_id_in)
483                     self.check_icmpv6_checksum(packet)
484             except:
485                 self.logger.error(ppp("Unexpected or invalid packet "
486                                       "(inside network):", packet))
487                 raise
488
489     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
490         """
491         Verify captured packet that don't have to be translated
492
493         :param capture: Captured packets
494         :param ingress_if: Ingress interface
495         :param egress_if: Egress interface
496         """
497         for packet in capture:
498             try:
499                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501                 if packet.haslayer(TCP):
502                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503                 elif packet.haslayer(UDP):
504                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
505                 else:
506                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
507             except:
508                 self.logger.error(ppp("Unexpected or invalid packet "
509                                       "(inside network):", packet))
510                 raise
511
512     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513                                             packet_num=3, icmp_type=11):
514         """
515         Verify captured packets with ICMP errors on outside network
516
517         :param capture: Captured packets
518         :param src_ip: Translated IP address or IP address of VPP
519                        (Default use global NAT address)
520         :param packet_num: Expected number of packets (Default 3)
521         :param icmp_type: Type of error ICMP packet
522                           we are expecting (Default 11)
523         """
524         if src_ip is None:
525             src_ip = self.nat_addr
526         self.assertEqual(packet_num, len(capture))
527         for packet in capture:
528             try:
529                 self.assertEqual(packet[IP].src, src_ip)
530                 self.assertTrue(packet.haslayer(ICMP))
531                 icmp = packet[ICMP]
532                 self.assertEqual(icmp.type, icmp_type)
533                 self.assertTrue(icmp.haslayer(IPerror))
534                 inner_ip = icmp[IPerror]
535                 if inner_ip.haslayer(TCPerror):
536                     self.assertEqual(inner_ip[TCPerror].dport,
537                                      self.tcp_port_out)
538                 elif inner_ip.haslayer(UDPerror):
539                     self.assertEqual(inner_ip[UDPerror].dport,
540                                      self.udp_port_out)
541                 else:
542                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
543             except:
544                 self.logger.error(ppp("Unexpected or invalid packet "
545                                       "(outside network):", packet))
546                 raise
547
548     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
549                                            icmp_type=11):
550         """
551         Verify captured packets with ICMP errors on inside network
552
553         :param capture: Captured packets
554         :param in_if: Inside interface
555         :param packet_num: Expected number of packets (Default 3)
556         :param icmp_type: Type of error ICMP packet
557                           we are expecting (Default 11)
558         """
559         self.assertEqual(packet_num, len(capture))
560         for packet in capture:
561             try:
562                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563                 self.assertTrue(packet.haslayer(ICMP))
564                 icmp = packet[ICMP]
565                 self.assertEqual(icmp.type, icmp_type)
566                 self.assertTrue(icmp.haslayer(IPerror))
567                 inner_ip = icmp[IPerror]
568                 if inner_ip.haslayer(TCPerror):
569                     self.assertEqual(inner_ip[TCPerror].sport,
570                                      self.tcp_port_in)
571                 elif inner_ip.haslayer(UDPerror):
572                     self.assertEqual(inner_ip[UDPerror].sport,
573                                      self.udp_port_in)
574                 else:
575                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
576             except:
577                 self.logger.error(ppp("Unexpected or invalid packet "
578                                       "(inside network):", packet))
579                 raise
580
581     def create_stream_frag(self, src_if, dst, sport, dport, data):
582         """
583         Create fragmented packet stream
584
585         :param src_if: Source interface
586         :param dst: Destination IPv4 address
587         :param sport: Source TCP port
588         :param dport: Destination TCP port
589         :param data: Payload data
590         :returns: Fragmets
591         """
592         id = random.randint(0, 65535)
593         p = (IP(src=src_if.remote_ip4, dst=dst) /
594              TCP(sport=sport, dport=dport) /
595              Raw(data))
596         p = p.__class__(str(p))
597         chksum = p['TCP'].chksum
598         pkts = []
599         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601              TCP(sport=sport, dport=dport, chksum=chksum) /
602              Raw(data[0:4]))
603         pkts.append(p)
604         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606                 proto=IP_PROTOS.tcp) /
607              Raw(data[4:20]))
608         pkts.append(p)
609         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
611                 id=id) /
612              Raw(data[20:]))
613         pkts.append(p)
614         return pkts
615
616     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617                                pref=None, plen=0, frag_size=128):
618         """
619         Create fragmented packet stream
620
621         :param src_if: Source interface
622         :param dst: Destination IPv4 address
623         :param sport: Source TCP port
624         :param dport: Destination TCP port
625         :param data: Payload data
626         :param pref: NAT64 prefix
627         :param plen: NAT64 prefix length
628         :param fragsize: size of fragments
629         :returns: Fragmets
630         """
631         if pref is None:
632             dst_ip6 = ''.join(['64:ff9b::', dst])
633         else:
634             dst_ip6 = self.compose_ip6(dst, pref, plen)
635
636         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639              TCP(sport=sport, dport=dport) /
640              Raw(data))
641
642         return fragment6(p, frag_size)
643
644     def reass_frags_and_verify(self, frags, src, dst):
645         """
646         Reassemble and verify fragmented packet
647
648         :param frags: Captured fragments
649         :param src: Source IPv4 address to verify
650         :param dst: Destination IPv4 address to verify
651
652         :returns: Reassembled IPv4 packet
653         """
654         buffer = StringIO.StringIO()
655         for p in frags:
656             self.assertEqual(p[IP].src, src)
657             self.assertEqual(p[IP].dst, dst)
658             self.check_ip_checksum(p)
659             buffer.seek(p[IP].frag * 8)
660             buffer.write(p[IP].payload)
661         ip = frags[0].getlayer(IP)
662         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663                 proto=frags[0][IP].proto)
664         if ip.proto == IP_PROTOS.tcp:
665             p = (ip / TCP(buffer.getvalue()))
666             self.check_tcp_checksum(p)
667         elif ip.proto == IP_PROTOS.udp:
668             p = (ip / UDP(buffer.getvalue()))
669         return p
670
671     def reass_frags_and_verify_ip6(self, frags, src, dst):
672         """
673         Reassemble and verify fragmented packet
674
675         :param frags: Captured fragments
676         :param src: Source IPv6 address to verify
677         :param dst: Destination IPv6 address to verify
678
679         :returns: Reassembled IPv6 packet
680         """
681         buffer = StringIO.StringIO()
682         for p in frags:
683             self.assertEqual(p[IPv6].src, src)
684             self.assertEqual(p[IPv6].dst, dst)
685             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686             buffer.write(p[IPv6ExtHdrFragment].payload)
687         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688                   nh=frags[0][IPv6ExtHdrFragment].nh)
689         if ip.nh == IP_PROTOS.tcp:
690             p = (ip / TCP(buffer.getvalue()))
691             self.check_tcp_checksum(p)
692         elif ip.nh == IP_PROTOS.udp:
693             p = (ip / UDP(buffer.getvalue()))
694         return p
695
696     def verify_ipfix_nat44_ses(self, data):
697         """
698         Verify IPFIX NAT44 session create/delete event
699
700         :param data: Decoded IPFIX data records
701         """
702         nat44_ses_create_num = 0
703         nat44_ses_delete_num = 0
704         self.assertEqual(6, len(data))
705         for record in data:
706             # natEvent
707             self.assertIn(ord(record[230]), [4, 5])
708             if ord(record[230]) == 4:
709                 nat44_ses_create_num += 1
710             else:
711                 nat44_ses_delete_num += 1
712             # sourceIPv4Address
713             self.assertEqual(self.pg0.remote_ip4n, record[8])
714             # postNATSourceIPv4Address
715             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
716                              record[225])
717             # ingressVRFID
718             self.assertEqual(struct.pack("!I", 0), record[234])
719             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720             if IP_PROTOS.icmp == ord(record[4]):
721                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
723                                  record[227])
724             elif IP_PROTOS.tcp == ord(record[4]):
725                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
726                                  record[7])
727                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
728                                  record[227])
729             elif IP_PROTOS.udp == ord(record[4]):
730                 self.assertEqual(struct.pack("!H", self.udp_port_in),
731                                  record[7])
732                 self.assertEqual(struct.pack("!H", self.udp_port_out),
733                                  record[227])
734             else:
735                 self.fail("Invalid protocol")
736         self.assertEqual(3, nat44_ses_create_num)
737         self.assertEqual(3, nat44_ses_delete_num)
738
739     def verify_ipfix_addr_exhausted(self, data):
740         """
741         Verify IPFIX NAT addresses event
742
743         :param data: Decoded IPFIX data records
744         """
745         self.assertEqual(1, len(data))
746         record = data[0]
747         # natEvent
748         self.assertEqual(ord(record[230]), 3)
749         # natPoolID
750         self.assertEqual(struct.pack("!I", 0), record[283])
751
752     def verify_ipfix_max_sessions(self, data, limit):
753         """
754         Verify IPFIX maximum session entries exceeded event
755
756         :param data: Decoded IPFIX data records
757         :param limit: Number of maximum session entries that can be created.
758         """
759         self.assertEqual(1, len(data))
760         record = data[0]
761         # natEvent
762         self.assertEqual(ord(record[230]), 13)
763         # natQuotaExceededEvent
764         self.assertEqual(struct.pack("I", 1), record[466])
765         # maxSessionEntries
766         self.assertEqual(struct.pack("I", limit), record[471])
767
768     def verify_ipfix_max_bibs(self, data, limit):
769         """
770         Verify IPFIX maximum BIB entries exceeded event
771
772         :param data: Decoded IPFIX data records
773         :param limit: Number of maximum BIB entries that can be created.
774         """
775         self.assertEqual(1, len(data))
776         record = data[0]
777         # natEvent
778         self.assertEqual(ord(record[230]), 13)
779         # natQuotaExceededEvent
780         self.assertEqual(struct.pack("I", 2), record[466])
781         # maxBIBEntries
782         self.assertEqual(struct.pack("I", limit), record[472])
783
784     def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
785         """
786         Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
787
788         :param data: Decoded IPFIX data records
789         :param limit: Number of maximum fragments pending reassembly
790         :param src_addr: IPv6 source address
791         """
792         self.assertEqual(1, len(data))
793         record = data[0]
794         # natEvent
795         self.assertEqual(ord(record[230]), 13)
796         # natQuotaExceededEvent
797         self.assertEqual(struct.pack("I", 5), record[466])
798         # maxFragmentsPendingReassembly
799         self.assertEqual(struct.pack("I", limit), record[475])
800         # sourceIPv6Address
801         self.assertEqual(src_addr, record[27])
802
803     def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
804         """
805         Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
806
807         :param data: Decoded IPFIX data records
808         :param limit: Number of maximum fragments pending reassembly
809         :param src_addr: IPv4 source address
810         """
811         self.assertEqual(1, len(data))
812         record = data[0]
813         # natEvent
814         self.assertEqual(ord(record[230]), 13)
815         # natQuotaExceededEvent
816         self.assertEqual(struct.pack("I", 5), record[466])
817         # maxFragmentsPendingReassembly
818         self.assertEqual(struct.pack("I", limit), record[475])
819         # sourceIPv4Address
820         self.assertEqual(src_addr, record[8])
821
822     def verify_ipfix_bib(self, data, is_create, src_addr):
823         """
824         Verify IPFIX NAT64 BIB create and delete events
825
826         :param data: Decoded IPFIX data records
827         :param is_create: Create event if nonzero value otherwise delete event
828         :param src_addr: IPv6 source address
829         """
830         self.assertEqual(1, len(data))
831         record = data[0]
832         # natEvent
833         if is_create:
834             self.assertEqual(ord(record[230]), 10)
835         else:
836             self.assertEqual(ord(record[230]), 11)
837         # sourceIPv6Address
838         self.assertEqual(src_addr, record[27])
839         # postNATSourceIPv4Address
840         self.assertEqual(self.nat_addr_n, record[225])
841         # protocolIdentifier
842         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
843         # ingressVRFID
844         self.assertEqual(struct.pack("!I", 0), record[234])
845         # sourceTransportPort
846         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847         # postNAPTSourceTransportPort
848         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
849
850     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
851                                dst_port):
852         """
853         Verify IPFIX NAT64 session create and delete events
854
855         :param data: Decoded IPFIX data records
856         :param is_create: Create event if nonzero value otherwise delete event
857         :param src_addr: IPv6 source address
858         :param dst_addr: IPv4 destination address
859         :param dst_port: destination TCP port
860         """
861         self.assertEqual(1, len(data))
862         record = data[0]
863         # natEvent
864         if is_create:
865             self.assertEqual(ord(record[230]), 6)
866         else:
867             self.assertEqual(ord(record[230]), 7)
868         # sourceIPv6Address
869         self.assertEqual(src_addr, record[27])
870         # destinationIPv6Address
871         self.assertEqual(socket.inet_pton(socket.AF_INET6,
872                                           self.compose_ip6(dst_addr,
873                                                            '64:ff9b::',
874                                                            96)),
875                          record[28])
876         # postNATSourceIPv4Address
877         self.assertEqual(self.nat_addr_n, record[225])
878         # postNATDestinationIPv4Address
879         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
880                          record[226])
881         # protocolIdentifier
882         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
883         # ingressVRFID
884         self.assertEqual(struct.pack("!I", 0), record[234])
885         # sourceTransportPort
886         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887         # postNAPTSourceTransportPort
888         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889         # destinationTransportPort
890         self.assertEqual(struct.pack("!H", dst_port), record[11])
891         # postNAPTDestinationTransportPort
892         self.assertEqual(struct.pack("!H", dst_port), record[228])
893
894
895 class TestNAT44(MethodHolder):
896     """ NAT44 Test Cases """
897
898     @classmethod
899     def setUpClass(cls):
900         super(TestNAT44, cls).setUpClass()
901
902         try:
903             cls.tcp_port_in = 6303
904             cls.tcp_port_out = 6303
905             cls.udp_port_in = 6304
906             cls.udp_port_out = 6304
907             cls.icmp_id_in = 6305
908             cls.icmp_id_out = 6305
909             cls.nat_addr = '10.0.0.3'
910             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911             cls.ipfix_src_port = 4739
912             cls.ipfix_domain_id = 1
913
914             cls.create_pg_interfaces(range(10))
915             cls.interfaces = list(cls.pg_interfaces[0:4])
916
917             for i in cls.interfaces:
918                 i.admin_up()
919                 i.config_ip4()
920                 i.resolve_arp()
921
922             cls.pg0.generate_remote_hosts(3)
923             cls.pg0.configure_ipv4_neighbors()
924
925             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926             cls.vapi.ip_table_add_del(10, is_add=1)
927             cls.vapi.ip_table_add_del(20, is_add=1)
928
929             cls.pg4._local_ip4 = "172.16.255.1"
930             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932             cls.pg4.set_table_ip4(10)
933             cls.pg5._local_ip4 = "172.17.255.3"
934             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936             cls.pg5.set_table_ip4(10)
937             cls.pg6._local_ip4 = "172.16.255.1"
938             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940             cls.pg6.set_table_ip4(20)
941             for i in cls.overlapping_interfaces:
942                 i.config_ip4()
943                 i.admin_up()
944                 i.resolve_arp()
945
946             cls.pg7.admin_up()
947             cls.pg8.admin_up()
948
949             cls.pg9.generate_remote_hosts(2)
950             cls.pg9.config_ip4()
951             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
953                                                   ip_addr_n,
954                                                   24)
955             cls.pg9.admin_up()
956             cls.pg9.resolve_arp()
957             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959             cls.pg9.resolve_arp()
960
961         except Exception:
962             super(TestNAT44, cls).tearDownClass()
963             raise
964
965     def clear_nat44(self):
966         """
967         Clear NAT44 configuration.
968         """
969         # I found no elegant way to do this
970         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971                                    dst_address_length=32,
972                                    next_hop_address=self.pg7.remote_ip4n,
973                                    next_hop_sw_if_index=self.pg7.sw_if_index,
974                                    is_add=0)
975         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976                                    dst_address_length=32,
977                                    next_hop_address=self.pg8.remote_ip4n,
978                                    next_hop_sw_if_index=self.pg8.sw_if_index,
979                                    is_add=0)
980
981         for intf in [self.pg7, self.pg8]:
982             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
983             for n in neighbors:
984                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
985                                               n.mac_address,
986                                               n.ip_address,
987                                               is_add=0)
988
989         if self.pg7.has_ip4_config:
990             self.pg7.unconfig_ip4()
991
992         self.vapi.nat44_forwarding_enable_disable(0)
993
994         interfaces = self.vapi.nat44_interface_addr_dump()
995         for intf in interfaces:
996             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997                                                twice_nat=intf.twice_nat,
998                                                is_add=0)
999
1000         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001                             domain_id=self.ipfix_domain_id)
1002         self.ipfix_src_port = 4739
1003         self.ipfix_domain_id = 1
1004
1005         interfaces = self.vapi.nat44_interface_dump()
1006         for intf in interfaces:
1007             if intf.is_inside > 1:
1008                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1009                                                           0,
1010                                                           is_add=0)
1011             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1012                                                       intf.is_inside,
1013                                                       is_add=0)
1014
1015         interfaces = self.vapi.nat44_interface_output_feature_dump()
1016         for intf in interfaces:
1017             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1018                                                              intf.is_inside,
1019                                                              is_add=0)
1020
1021         static_mappings = self.vapi.nat44_static_mapping_dump()
1022         for sm in static_mappings:
1023             self.vapi.nat44_add_del_static_mapping(
1024                 sm.local_ip_address,
1025                 sm.external_ip_address,
1026                 local_port=sm.local_port,
1027                 external_port=sm.external_port,
1028                 addr_only=sm.addr_only,
1029                 vrf_id=sm.vrf_id,
1030                 protocol=sm.protocol,
1031                 twice_nat=sm.twice_nat,
1032                 out2in_only=sm.out2in_only,
1033                 is_add=0)
1034
1035         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1036         for lb_sm in lb_static_mappings:
1037             self.vapi.nat44_add_del_lb_static_mapping(
1038                 lb_sm.external_addr,
1039                 lb_sm.external_port,
1040                 lb_sm.protocol,
1041                 vrf_id=lb_sm.vrf_id,
1042                 twice_nat=lb_sm.twice_nat,
1043                 out2in_only=lb_sm.out2in_only,
1044                 is_add=0,
1045                 local_num=0,
1046                 locals=[])
1047
1048         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1049         for id_m in identity_mappings:
1050             self.vapi.nat44_add_del_identity_mapping(
1051                 addr_only=id_m.addr_only,
1052                 ip=id_m.ip_address,
1053                 port=id_m.port,
1054                 sw_if_index=id_m.sw_if_index,
1055                 vrf_id=id_m.vrf_id,
1056                 protocol=id_m.protocol,
1057                 is_add=0)
1058
1059         adresses = self.vapi.nat44_address_dump()
1060         for addr in adresses:
1061             self.vapi.nat44_add_del_address_range(addr.ip_address,
1062                                                   addr.ip_address,
1063                                                   twice_nat=addr.twice_nat,
1064                                                   is_add=0)
1065
1066         self.vapi.nat_set_reass()
1067         self.vapi.nat_set_reass(is_ip6=1)
1068
1069     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1070                                  local_port=0, external_port=0, vrf_id=0,
1071                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
1072                                  proto=0, twice_nat=0, out2in_only=0):
1073         """
1074         Add/delete NAT44 static mapping
1075
1076         :param local_ip: Local IP address
1077         :param external_ip: External IP address
1078         :param local_port: Local port number (Optional)
1079         :param external_port: External port number (Optional)
1080         :param vrf_id: VRF ID (Default 0)
1081         :param is_add: 1 if add, 0 if delete (Default add)
1082         :param external_sw_if_index: External interface instead of IP address
1083         :param proto: IP protocol (Mandatory if port specified)
1084         :param twice_nat: 1 if translate external host address and port
1085         :param out2in_only: if 1 rule is matching only out2in direction
1086         """
1087         addr_only = 1
1088         if local_port and external_port:
1089             addr_only = 0
1090         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1091         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1092         self.vapi.nat44_add_del_static_mapping(
1093             l_ip,
1094             e_ip,
1095             external_sw_if_index,
1096             local_port,
1097             external_port,
1098             addr_only,
1099             vrf_id,
1100             proto,
1101             twice_nat,
1102             out2in_only,
1103             is_add)
1104
1105     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1106         """
1107         Add/delete NAT44 address
1108
1109         :param ip: IP address
1110         :param is_add: 1 if add, 0 if delete (Default add)
1111         :param twice_nat: twice NAT address for extenal hosts
1112         """
1113         nat_addr = socket.inet_pton(socket.AF_INET, ip)
1114         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1115                                               vrf_id=vrf_id,
1116                                               twice_nat=twice_nat)
1117
1118     def test_dynamic(self):
1119         """ NAT44 dynamic translation test """
1120
1121         self.nat44_add_address(self.nat_addr)
1122         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1123         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1124                                                   is_inside=0)
1125
1126         # in2out
1127         pkts = self.create_stream_in(self.pg0, self.pg1)
1128         self.pg0.add_stream(pkts)
1129         self.pg_enable_capture(self.pg_interfaces)
1130         self.pg_start()
1131         capture = self.pg1.get_capture(len(pkts))
1132         self.verify_capture_out(capture)
1133
1134         # out2in
1135         pkts = self.create_stream_out(self.pg1)
1136         self.pg1.add_stream(pkts)
1137         self.pg_enable_capture(self.pg_interfaces)
1138         self.pg_start()
1139         capture = self.pg0.get_capture(len(pkts))
1140         self.verify_capture_in(capture, self.pg0)
1141
1142     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1143         """ NAT44 handling of client packets with TTL=1 """
1144
1145         self.nat44_add_address(self.nat_addr)
1146         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1147         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1148                                                   is_inside=0)
1149
1150         # Client side - generate traffic
1151         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1152         self.pg0.add_stream(pkts)
1153         self.pg_enable_capture(self.pg_interfaces)
1154         self.pg_start()
1155
1156         # Client side - verify ICMP type 11 packets
1157         capture = self.pg0.get_capture(len(pkts))
1158         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1159
1160     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1161         """ NAT44 handling of server packets with TTL=1 """
1162
1163         self.nat44_add_address(self.nat_addr)
1164         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1165         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1166                                                   is_inside=0)
1167
1168         # Client side - create sessions
1169         pkts = self.create_stream_in(self.pg0, self.pg1)
1170         self.pg0.add_stream(pkts)
1171         self.pg_enable_capture(self.pg_interfaces)
1172         self.pg_start()
1173
1174         # Server side - generate traffic
1175         capture = self.pg1.get_capture(len(pkts))
1176         self.verify_capture_out(capture)
1177         pkts = self.create_stream_out(self.pg1, ttl=1)
1178         self.pg1.add_stream(pkts)
1179         self.pg_enable_capture(self.pg_interfaces)
1180         self.pg_start()
1181
1182         # Server side - verify ICMP type 11 packets
1183         capture = self.pg1.get_capture(len(pkts))
1184         self.verify_capture_out_with_icmp_errors(capture,
1185                                                  src_ip=self.pg1.local_ip4)
1186
1187     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1188         """ NAT44 handling of error responses to client packets with TTL=2 """
1189
1190         self.nat44_add_address(self.nat_addr)
1191         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1192         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1193                                                   is_inside=0)
1194
1195         # Client side - generate traffic
1196         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1197         self.pg0.add_stream(pkts)
1198         self.pg_enable_capture(self.pg_interfaces)
1199         self.pg_start()
1200
1201         # Server side - simulate ICMP type 11 response
1202         capture = self.pg1.get_capture(len(pkts))
1203         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1204                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1205                 ICMP(type=11) / packet[IP] for packet in capture]
1206         self.pg1.add_stream(pkts)
1207         self.pg_enable_capture(self.pg_interfaces)
1208         self.pg_start()
1209
1210         # Client side - verify ICMP type 11 packets
1211         capture = self.pg0.get_capture(len(pkts))
1212         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1213
1214     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1215         """ NAT44 handling of error responses to server packets with TTL=2 """
1216
1217         self.nat44_add_address(self.nat_addr)
1218         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1219         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1220                                                   is_inside=0)
1221
1222         # Client side - create sessions
1223         pkts = self.create_stream_in(self.pg0, self.pg1)
1224         self.pg0.add_stream(pkts)
1225         self.pg_enable_capture(self.pg_interfaces)
1226         self.pg_start()
1227
1228         # Server side - generate traffic
1229         capture = self.pg1.get_capture(len(pkts))
1230         self.verify_capture_out(capture)
1231         pkts = self.create_stream_out(self.pg1, ttl=2)
1232         self.pg1.add_stream(pkts)
1233         self.pg_enable_capture(self.pg_interfaces)
1234         self.pg_start()
1235
1236         # Client side - simulate ICMP type 11 response
1237         capture = self.pg0.get_capture(len(pkts))
1238         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1239                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1240                 ICMP(type=11) / packet[IP] for packet in capture]
1241         self.pg0.add_stream(pkts)
1242         self.pg_enable_capture(self.pg_interfaces)
1243         self.pg_start()
1244
1245         # Server side - verify ICMP type 11 packets
1246         capture = self.pg1.get_capture(len(pkts))
1247         self.verify_capture_out_with_icmp_errors(capture)
1248
1249     def test_ping_out_interface_from_outside(self):
1250         """ Ping NAT44 out interface from outside network """
1251
1252         self.nat44_add_address(self.nat_addr)
1253         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1254         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1255                                                   is_inside=0)
1256
1257         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1258              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1259              ICMP(id=self.icmp_id_out, type='echo-request'))
1260         pkts = [p]
1261         self.pg1.add_stream(pkts)
1262         self.pg_enable_capture(self.pg_interfaces)
1263         self.pg_start()
1264         capture = self.pg1.get_capture(len(pkts))
1265         self.assertEqual(1, len(capture))
1266         packet = capture[0]
1267         try:
1268             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1269             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1270             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1271             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1272         except:
1273             self.logger.error(ppp("Unexpected or invalid packet "
1274                                   "(outside network):", packet))
1275             raise
1276
1277     def test_ping_internal_host_from_outside(self):
1278         """ Ping internal host from outside network """
1279
1280         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1281         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1282         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1283                                                   is_inside=0)
1284
1285         # out2in
1286         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1287                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1288                ICMP(id=self.icmp_id_out, type='echo-request'))
1289         self.pg1.add_stream(pkt)
1290         self.pg_enable_capture(self.pg_interfaces)
1291         self.pg_start()
1292         capture = self.pg0.get_capture(1)
1293         self.verify_capture_in(capture, self.pg0, packet_num=1)
1294         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1295
1296         # in2out
1297         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1298                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1299                ICMP(id=self.icmp_id_in, type='echo-reply'))
1300         self.pg0.add_stream(pkt)
1301         self.pg_enable_capture(self.pg_interfaces)
1302         self.pg_start()
1303         capture = self.pg1.get_capture(1)
1304         self.verify_capture_out(capture, same_port=True, packet_num=1)
1305         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1306
1307     def test_forwarding(self):
1308         """ NAT44 forwarding test """
1309
1310         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1311         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1312                                                   is_inside=0)
1313         self.vapi.nat44_forwarding_enable_disable(1)
1314
1315         real_ip = self.pg0.remote_ip4n
1316         alias_ip = self.nat_addr_n
1317         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1318                                                external_ip=alias_ip)
1319
1320         try:
1321             # in2out - static mapping match
1322
1323             pkts = self.create_stream_out(self.pg1)
1324             self.pg1.add_stream(pkts)
1325             self.pg_enable_capture(self.pg_interfaces)
1326             self.pg_start()
1327             capture = self.pg0.get_capture(len(pkts))
1328             self.verify_capture_in(capture, self.pg0)
1329
1330             pkts = self.create_stream_in(self.pg0, self.pg1)
1331             self.pg0.add_stream(pkts)
1332             self.pg_enable_capture(self.pg_interfaces)
1333             self.pg_start()
1334             capture = self.pg1.get_capture(len(pkts))
1335             self.verify_capture_out(capture, same_port=True)
1336
1337             # in2out - no static mapping match
1338
1339             host0 = self.pg0.remote_hosts[0]
1340             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1341             try:
1342                 pkts = self.create_stream_out(self.pg1,
1343                                               dst_ip=self.pg0.remote_ip4,
1344                                               use_inside_ports=True)
1345                 self.pg1.add_stream(pkts)
1346                 self.pg_enable_capture(self.pg_interfaces)
1347                 self.pg_start()
1348                 capture = self.pg0.get_capture(len(pkts))
1349                 self.verify_capture_in(capture, self.pg0)
1350
1351                 pkts = self.create_stream_in(self.pg0, self.pg1)
1352                 self.pg0.add_stream(pkts)
1353                 self.pg_enable_capture(self.pg_interfaces)
1354                 self.pg_start()
1355                 capture = self.pg1.get_capture(len(pkts))
1356                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1357                                         same_port=True)
1358             finally:
1359                 self.pg0.remote_hosts[0] = host0
1360
1361         finally:
1362             self.vapi.nat44_forwarding_enable_disable(0)
1363             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1364                                                    external_ip=alias_ip,
1365                                                    is_add=0)
1366
1367     def test_static_in(self):
1368         """ 1:1 NAT initialized from inside network """
1369
1370         nat_ip = "10.0.0.10"
1371         self.tcp_port_out = 6303
1372         self.udp_port_out = 6304
1373         self.icmp_id_out = 6305
1374
1375         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1376         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1377         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1378                                                   is_inside=0)
1379
1380         # in2out
1381         pkts = self.create_stream_in(self.pg0, self.pg1)
1382         self.pg0.add_stream(pkts)
1383         self.pg_enable_capture(self.pg_interfaces)
1384         self.pg_start()
1385         capture = self.pg1.get_capture(len(pkts))
1386         self.verify_capture_out(capture, nat_ip, True)
1387
1388         # out2in
1389         pkts = self.create_stream_out(self.pg1, nat_ip)
1390         self.pg1.add_stream(pkts)
1391         self.pg_enable_capture(self.pg_interfaces)
1392         self.pg_start()
1393         capture = self.pg0.get_capture(len(pkts))
1394         self.verify_capture_in(capture, self.pg0)
1395
1396     def test_static_out(self):
1397         """ 1:1 NAT initialized from outside network """
1398
1399         nat_ip = "10.0.0.20"
1400         self.tcp_port_out = 6303
1401         self.udp_port_out = 6304
1402         self.icmp_id_out = 6305
1403
1404         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1405         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1406         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1407                                                   is_inside=0)
1408
1409         # out2in
1410         pkts = self.create_stream_out(self.pg1, nat_ip)
1411         self.pg1.add_stream(pkts)
1412         self.pg_enable_capture(self.pg_interfaces)
1413         self.pg_start()
1414         capture = self.pg0.get_capture(len(pkts))
1415         self.verify_capture_in(capture, self.pg0)
1416
1417         # in2out
1418         pkts = self.create_stream_in(self.pg0, self.pg1)
1419         self.pg0.add_stream(pkts)
1420         self.pg_enable_capture(self.pg_interfaces)
1421         self.pg_start()
1422         capture = self.pg1.get_capture(len(pkts))
1423         self.verify_capture_out(capture, nat_ip, True)
1424
1425     def test_static_with_port_in(self):
1426         """ 1:1 NAPT initialized from inside network """
1427
1428         self.tcp_port_out = 3606
1429         self.udp_port_out = 3607
1430         self.icmp_id_out = 3608
1431
1432         self.nat44_add_address(self.nat_addr)
1433         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434                                       self.tcp_port_in, self.tcp_port_out,
1435                                       proto=IP_PROTOS.tcp)
1436         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1437                                       self.udp_port_in, self.udp_port_out,
1438                                       proto=IP_PROTOS.udp)
1439         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1440                                       self.icmp_id_in, self.icmp_id_out,
1441                                       proto=IP_PROTOS.icmp)
1442         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1443         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1444                                                   is_inside=0)
1445
1446         # in2out
1447         pkts = self.create_stream_in(self.pg0, self.pg1)
1448         self.pg0.add_stream(pkts)
1449         self.pg_enable_capture(self.pg_interfaces)
1450         self.pg_start()
1451         capture = self.pg1.get_capture(len(pkts))
1452         self.verify_capture_out(capture)
1453
1454         # out2in
1455         pkts = self.create_stream_out(self.pg1)
1456         self.pg1.add_stream(pkts)
1457         self.pg_enable_capture(self.pg_interfaces)
1458         self.pg_start()
1459         capture = self.pg0.get_capture(len(pkts))
1460         self.verify_capture_in(capture, self.pg0)
1461
1462     def test_static_with_port_out(self):
1463         """ 1:1 NAPT initialized from outside network """
1464
1465         self.tcp_port_out = 30606
1466         self.udp_port_out = 30607
1467         self.icmp_id_out = 30608
1468
1469         self.nat44_add_address(self.nat_addr)
1470         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1471                                       self.tcp_port_in, self.tcp_port_out,
1472                                       proto=IP_PROTOS.tcp)
1473         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1474                                       self.udp_port_in, self.udp_port_out,
1475                                       proto=IP_PROTOS.udp)
1476         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1477                                       self.icmp_id_in, self.icmp_id_out,
1478                                       proto=IP_PROTOS.icmp)
1479         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1480         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1481                                                   is_inside=0)
1482
1483         # out2in
1484         pkts = self.create_stream_out(self.pg1)
1485         self.pg1.add_stream(pkts)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488         capture = self.pg0.get_capture(len(pkts))
1489         self.verify_capture_in(capture, self.pg0)
1490
1491         # in2out
1492         pkts = self.create_stream_in(self.pg0, self.pg1)
1493         self.pg0.add_stream(pkts)
1494         self.pg_enable_capture(self.pg_interfaces)
1495         self.pg_start()
1496         capture = self.pg1.get_capture(len(pkts))
1497         self.verify_capture_out(capture)
1498
1499     def test_static_with_port_out2(self):
1500         """ 1:1 NAPT symmetrical rule """
1501
1502         external_port = 80
1503         local_port = 8080
1504
1505         self.vapi.nat44_forwarding_enable_disable(1)
1506         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1507                                       local_port, external_port,
1508                                       proto=IP_PROTOS.tcp, out2in_only=1)
1509         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1510         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1511                                                   is_inside=0)
1512
1513         # from client to service
1514         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1515              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1516              TCP(sport=12345, dport=external_port))
1517         self.pg1.add_stream(p)
1518         self.pg_enable_capture(self.pg_interfaces)
1519         self.pg_start()
1520         capture = self.pg0.get_capture(1)
1521         p = capture[0]
1522         server = None
1523         try:
1524             ip = p[IP]
1525             tcp = p[TCP]
1526             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1527             self.assertEqual(tcp.dport, local_port)
1528             self.check_tcp_checksum(p)
1529             self.check_ip_checksum(p)
1530         except:
1531             self.logger.error(ppp("Unexpected or invalid packet:", p))
1532             raise
1533
1534         # from service back to client
1535         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1536              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1537              TCP(sport=local_port, dport=12345))
1538         self.pg0.add_stream(p)
1539         self.pg_enable_capture(self.pg_interfaces)
1540         self.pg_start()
1541         capture = self.pg1.get_capture(1)
1542         p = capture[0]
1543         try:
1544             ip = p[IP]
1545             tcp = p[TCP]
1546             self.assertEqual(ip.src, self.nat_addr)
1547             self.assertEqual(tcp.sport, external_port)
1548             self.check_tcp_checksum(p)
1549             self.check_ip_checksum(p)
1550         except:
1551             self.logger.error(ppp("Unexpected or invalid packet:", p))
1552             raise
1553
1554         # from client to server (no translation)
1555         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1556              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1557              TCP(sport=12346, dport=local_port))
1558         self.pg1.add_stream(p)
1559         self.pg_enable_capture(self.pg_interfaces)
1560         self.pg_start()
1561         capture = self.pg0.get_capture(1)
1562         p = capture[0]
1563         server = None
1564         try:
1565             ip = p[IP]
1566             tcp = p[TCP]
1567             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1568             self.assertEqual(tcp.dport, local_port)
1569             self.check_tcp_checksum(p)
1570             self.check_ip_checksum(p)
1571         except:
1572             self.logger.error(ppp("Unexpected or invalid packet:", p))
1573             raise
1574
1575         # from service back to client (no translation)
1576         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1577              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1578              TCP(sport=local_port, dport=12346))
1579         self.pg0.add_stream(p)
1580         self.pg_enable_capture(self.pg_interfaces)
1581         self.pg_start()
1582         capture = self.pg1.get_capture(1)
1583         p = capture[0]
1584         try:
1585             ip = p[IP]
1586             tcp = p[TCP]
1587             self.assertEqual(ip.src, self.pg0.remote_ip4)
1588             self.assertEqual(tcp.sport, local_port)
1589             self.check_tcp_checksum(p)
1590             self.check_ip_checksum(p)
1591         except:
1592             self.logger.error(ppp("Unexpected or invalid packet:", p))
1593             raise
1594
1595     def test_static_vrf_aware(self):
1596         """ 1:1 NAT VRF awareness """
1597
1598         nat_ip1 = "10.0.0.30"
1599         nat_ip2 = "10.0.0.40"
1600         self.tcp_port_out = 6303
1601         self.udp_port_out = 6304
1602         self.icmp_id_out = 6305
1603
1604         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1605                                       vrf_id=10)
1606         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1607                                       vrf_id=10)
1608         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1609                                                   is_inside=0)
1610         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1611         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1612
1613         # inside interface VRF match NAT44 static mapping VRF
1614         pkts = self.create_stream_in(self.pg4, self.pg3)
1615         self.pg4.add_stream(pkts)
1616         self.pg_enable_capture(self.pg_interfaces)
1617         self.pg_start()
1618         capture = self.pg3.get_capture(len(pkts))
1619         self.verify_capture_out(capture, nat_ip1, True)
1620
1621         # inside interface VRF don't match NAT44 static mapping VRF (packets
1622         # are dropped)
1623         pkts = self.create_stream_in(self.pg0, self.pg3)
1624         self.pg0.add_stream(pkts)
1625         self.pg_enable_capture(self.pg_interfaces)
1626         self.pg_start()
1627         self.pg3.assert_nothing_captured()
1628
1629     def test_identity_nat(self):
1630         """ Identity NAT """
1631
1632         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1633         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1634         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1635                                                   is_inside=0)
1636
1637         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1638              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1639              TCP(sport=12345, dport=56789))
1640         self.pg1.add_stream(p)
1641         self.pg_enable_capture(self.pg_interfaces)
1642         self.pg_start()
1643         capture = self.pg0.get_capture(1)
1644         p = capture[0]
1645         try:
1646             ip = p[IP]
1647             tcp = p[TCP]
1648             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1649             self.assertEqual(ip.src, self.pg1.remote_ip4)
1650             self.assertEqual(tcp.dport, 56789)
1651             self.assertEqual(tcp.sport, 12345)
1652             self.check_tcp_checksum(p)
1653             self.check_ip_checksum(p)
1654         except:
1655             self.logger.error(ppp("Unexpected or invalid packet:", p))
1656             raise
1657
1658     def test_static_lb(self):
1659         """ NAT44 local service load balancing """
1660         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1661         external_port = 80
1662         local_port = 8080
1663         server1 = self.pg0.remote_hosts[0]
1664         server2 = self.pg0.remote_hosts[1]
1665
1666         locals = [{'addr': server1.ip4n,
1667                    'port': local_port,
1668                    'probability': 70},
1669                   {'addr': server2.ip4n,
1670                    'port': local_port,
1671                    'probability': 30}]
1672
1673         self.nat44_add_address(self.nat_addr)
1674         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1675                                                   external_port,
1676                                                   IP_PROTOS.tcp,
1677                                                   local_num=len(locals),
1678                                                   locals=locals)
1679         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1680         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1681                                                   is_inside=0)
1682
1683         # from client to service
1684         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1685              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1686              TCP(sport=12345, dport=external_port))
1687         self.pg1.add_stream(p)
1688         self.pg_enable_capture(self.pg_interfaces)
1689         self.pg_start()
1690         capture = self.pg0.get_capture(1)
1691         p = capture[0]
1692         server = None
1693         try:
1694             ip = p[IP]
1695             tcp = p[TCP]
1696             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1697             if ip.dst == server1.ip4:
1698                 server = server1
1699             else:
1700                 server = server2
1701             self.assertEqual(tcp.dport, local_port)
1702             self.check_tcp_checksum(p)
1703             self.check_ip_checksum(p)
1704         except:
1705             self.logger.error(ppp("Unexpected or invalid packet:", p))
1706             raise
1707
1708         # from service back to client
1709         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1710              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1711              TCP(sport=local_port, dport=12345))
1712         self.pg0.add_stream(p)
1713         self.pg_enable_capture(self.pg_interfaces)
1714         self.pg_start()
1715         capture = self.pg1.get_capture(1)
1716         p = capture[0]
1717         try:
1718             ip = p[IP]
1719             tcp = p[TCP]
1720             self.assertEqual(ip.src, self.nat_addr)
1721             self.assertEqual(tcp.sport, external_port)
1722             self.check_tcp_checksum(p)
1723             self.check_ip_checksum(p)
1724         except:
1725             self.logger.error(ppp("Unexpected or invalid packet:", p))
1726             raise
1727
1728         # multiple clients
1729         server1_n = 0
1730         server2_n = 0
1731         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1732         pkts = []
1733         for client in clients:
1734             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1735                  IP(src=client, dst=self.nat_addr) /
1736                  TCP(sport=12345, dport=external_port))
1737             pkts.append(p)
1738         self.pg1.add_stream(pkts)
1739         self.pg_enable_capture(self.pg_interfaces)
1740         self.pg_start()
1741         capture = self.pg0.get_capture(len(pkts))
1742         for p in capture:
1743             if p[IP].dst == server1.ip4:
1744                 server1_n += 1
1745             else:
1746                 server2_n += 1
1747         self.assertTrue(server1_n > server2_n)
1748
1749     def test_static_lb_2(self):
1750         """ NAT44 local service load balancing (asymmetrical rule) """
1751         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1752         external_port = 80
1753         local_port = 8080
1754         server1 = self.pg0.remote_hosts[0]
1755         server2 = self.pg0.remote_hosts[1]
1756
1757         locals = [{'addr': server1.ip4n,
1758                    'port': local_port,
1759                    'probability': 70},
1760                   {'addr': server2.ip4n,
1761                    'port': local_port,
1762                    'probability': 30}]
1763
1764         self.vapi.nat44_forwarding_enable_disable(1)
1765         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1766                                                   external_port,
1767                                                   IP_PROTOS.tcp,
1768                                                   out2in_only=1,
1769                                                   local_num=len(locals),
1770                                                   locals=locals)
1771         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1772         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1773                                                   is_inside=0)
1774
1775         # from client to service
1776         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1777              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1778              TCP(sport=12345, dport=external_port))
1779         self.pg1.add_stream(p)
1780         self.pg_enable_capture(self.pg_interfaces)
1781         self.pg_start()
1782         capture = self.pg0.get_capture(1)
1783         p = capture[0]
1784         server = None
1785         try:
1786             ip = p[IP]
1787             tcp = p[TCP]
1788             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1789             if ip.dst == server1.ip4:
1790                 server = server1
1791             else:
1792                 server = server2
1793             self.assertEqual(tcp.dport, local_port)
1794             self.check_tcp_checksum(p)
1795             self.check_ip_checksum(p)
1796         except:
1797             self.logger.error(ppp("Unexpected or invalid packet:", p))
1798             raise
1799
1800         # from service back to client
1801         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1802              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1803              TCP(sport=local_port, dport=12345))
1804         self.pg0.add_stream(p)
1805         self.pg_enable_capture(self.pg_interfaces)
1806         self.pg_start()
1807         capture = self.pg1.get_capture(1)
1808         p = capture[0]
1809         try:
1810             ip = p[IP]
1811             tcp = p[TCP]
1812             self.assertEqual(ip.src, self.nat_addr)
1813             self.assertEqual(tcp.sport, external_port)
1814             self.check_tcp_checksum(p)
1815             self.check_ip_checksum(p)
1816         except:
1817             self.logger.error(ppp("Unexpected or invalid packet:", p))
1818             raise
1819
1820         # from client to server (no translation)
1821         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1822              IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1823              TCP(sport=12346, dport=local_port))
1824         self.pg1.add_stream(p)
1825         self.pg_enable_capture(self.pg_interfaces)
1826         self.pg_start()
1827         capture = self.pg0.get_capture(1)
1828         p = capture[0]
1829         server = None
1830         try:
1831             ip = p[IP]
1832             tcp = p[TCP]
1833             self.assertEqual(ip.dst, server1.ip4)
1834             self.assertEqual(tcp.dport, local_port)
1835             self.check_tcp_checksum(p)
1836             self.check_ip_checksum(p)
1837         except:
1838             self.logger.error(ppp("Unexpected or invalid packet:", p))
1839             raise
1840
1841         # from service back to client (no translation)
1842         p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1843              IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1844              TCP(sport=local_port, dport=12346))
1845         self.pg0.add_stream(p)
1846         self.pg_enable_capture(self.pg_interfaces)
1847         self.pg_start()
1848         capture = self.pg1.get_capture(1)
1849         p = capture[0]
1850         try:
1851             ip = p[IP]
1852             tcp = p[TCP]
1853             self.assertEqual(ip.src, server1.ip4)
1854             self.assertEqual(tcp.sport, local_port)
1855             self.check_tcp_checksum(p)
1856             self.check_ip_checksum(p)
1857         except:
1858             self.logger.error(ppp("Unexpected or invalid packet:", p))
1859             raise
1860
1861     def test_multiple_inside_interfaces(self):
1862         """ NAT44 multiple non-overlapping address space inside interfaces """
1863
1864         self.nat44_add_address(self.nat_addr)
1865         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1866         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1867         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1868                                                   is_inside=0)
1869
1870         # between two NAT44 inside interfaces (no translation)
1871         pkts = self.create_stream_in(self.pg0, self.pg1)
1872         self.pg0.add_stream(pkts)
1873         self.pg_enable_capture(self.pg_interfaces)
1874         self.pg_start()
1875         capture = self.pg1.get_capture(len(pkts))
1876         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1877
1878         # from NAT44 inside to interface without NAT44 feature (no translation)
1879         pkts = self.create_stream_in(self.pg0, self.pg2)
1880         self.pg0.add_stream(pkts)
1881         self.pg_enable_capture(self.pg_interfaces)
1882         self.pg_start()
1883         capture = self.pg2.get_capture(len(pkts))
1884         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1885
1886         # in2out 1st interface
1887         pkts = self.create_stream_in(self.pg0, self.pg3)
1888         self.pg0.add_stream(pkts)
1889         self.pg_enable_capture(self.pg_interfaces)
1890         self.pg_start()
1891         capture = self.pg3.get_capture(len(pkts))
1892         self.verify_capture_out(capture)
1893
1894         # out2in 1st interface
1895         pkts = self.create_stream_out(self.pg3)
1896         self.pg3.add_stream(pkts)
1897         self.pg_enable_capture(self.pg_interfaces)
1898         self.pg_start()
1899         capture = self.pg0.get_capture(len(pkts))
1900         self.verify_capture_in(capture, self.pg0)
1901
1902         # in2out 2nd interface
1903         pkts = self.create_stream_in(self.pg1, self.pg3)
1904         self.pg1.add_stream(pkts)
1905         self.pg_enable_capture(self.pg_interfaces)
1906         self.pg_start()
1907         capture = self.pg3.get_capture(len(pkts))
1908         self.verify_capture_out(capture)
1909
1910         # out2in 2nd interface
1911         pkts = self.create_stream_out(self.pg3)
1912         self.pg3.add_stream(pkts)
1913         self.pg_enable_capture(self.pg_interfaces)
1914         self.pg_start()
1915         capture = self.pg1.get_capture(len(pkts))
1916         self.verify_capture_in(capture, self.pg1)
1917
1918     def test_inside_overlapping_interfaces(self):
1919         """ NAT44 multiple inside interfaces with overlapping address space """
1920
1921         static_nat_ip = "10.0.0.10"
1922         self.nat44_add_address(self.nat_addr)
1923         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1924                                                   is_inside=0)
1925         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1926         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1927         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1928         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1929                                       vrf_id=20)
1930
1931         # between NAT44 inside interfaces with same VRF (no translation)
1932         pkts = self.create_stream_in(self.pg4, self.pg5)
1933         self.pg4.add_stream(pkts)
1934         self.pg_enable_capture(self.pg_interfaces)
1935         self.pg_start()
1936         capture = self.pg5.get_capture(len(pkts))
1937         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1938
1939         # between NAT44 inside interfaces with different VRF (hairpinning)
1940         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1941              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1942              TCP(sport=1234, dport=5678))
1943         self.pg4.add_stream(p)
1944         self.pg_enable_capture(self.pg_interfaces)
1945         self.pg_start()
1946         capture = self.pg6.get_capture(1)
1947         p = capture[0]
1948         try:
1949             ip = p[IP]
1950             tcp = p[TCP]
1951             self.assertEqual(ip.src, self.nat_addr)
1952             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1953             self.assertNotEqual(tcp.sport, 1234)
1954             self.assertEqual(tcp.dport, 5678)
1955         except:
1956             self.logger.error(ppp("Unexpected or invalid packet:", p))
1957             raise
1958
1959         # in2out 1st interface
1960         pkts = self.create_stream_in(self.pg4, self.pg3)
1961         self.pg4.add_stream(pkts)
1962         self.pg_enable_capture(self.pg_interfaces)
1963         self.pg_start()
1964         capture = self.pg3.get_capture(len(pkts))
1965         self.verify_capture_out(capture)
1966
1967         # out2in 1st interface
1968         pkts = self.create_stream_out(self.pg3)
1969         self.pg3.add_stream(pkts)
1970         self.pg_enable_capture(self.pg_interfaces)
1971         self.pg_start()
1972         capture = self.pg4.get_capture(len(pkts))
1973         self.verify_capture_in(capture, self.pg4)
1974
1975         # in2out 2nd interface
1976         pkts = self.create_stream_in(self.pg5, self.pg3)
1977         self.pg5.add_stream(pkts)
1978         self.pg_enable_capture(self.pg_interfaces)
1979         self.pg_start()
1980         capture = self.pg3.get_capture(len(pkts))
1981         self.verify_capture_out(capture)
1982
1983         # out2in 2nd interface
1984         pkts = self.create_stream_out(self.pg3)
1985         self.pg3.add_stream(pkts)
1986         self.pg_enable_capture(self.pg_interfaces)
1987         self.pg_start()
1988         capture = self.pg5.get_capture(len(pkts))
1989         self.verify_capture_in(capture, self.pg5)
1990
1991         # pg5 session dump
1992         addresses = self.vapi.nat44_address_dump()
1993         self.assertEqual(len(addresses), 1)
1994         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1995         self.assertEqual(len(sessions), 3)
1996         for session in sessions:
1997             self.assertFalse(session.is_static)
1998             self.assertEqual(session.inside_ip_address[0:4],
1999                              self.pg5.remote_ip4n)
2000             self.assertEqual(session.outside_ip_address,
2001                              addresses[0].ip_address)
2002         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2003         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2004         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2005         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2006         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2007         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2008         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2009         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2010         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2011
2012         # in2out 3rd interface
2013         pkts = self.create_stream_in(self.pg6, self.pg3)
2014         self.pg6.add_stream(pkts)
2015         self.pg_enable_capture(self.pg_interfaces)
2016         self.pg_start()
2017         capture = self.pg3.get_capture(len(pkts))
2018         self.verify_capture_out(capture, static_nat_ip, True)
2019
2020         # out2in 3rd interface
2021         pkts = self.create_stream_out(self.pg3, static_nat_ip)
2022         self.pg3.add_stream(pkts)
2023         self.pg_enable_capture(self.pg_interfaces)
2024         self.pg_start()
2025         capture = self.pg6.get_capture(len(pkts))
2026         self.verify_capture_in(capture, self.pg6)
2027
2028         # general user and session dump verifications
2029         users = self.vapi.nat44_user_dump()
2030         self.assertTrue(len(users) >= 3)
2031         addresses = self.vapi.nat44_address_dump()
2032         self.assertEqual(len(addresses), 1)
2033         for user in users:
2034             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2035                                                          user.vrf_id)
2036             for session in sessions:
2037                 self.assertEqual(user.ip_address, session.inside_ip_address)
2038                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2039                 self.assertTrue(session.protocol in
2040                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
2041                                  IP_PROTOS.icmp])
2042
2043         # pg4 session dump
2044         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2045         self.assertTrue(len(sessions) >= 4)
2046         for session in sessions:
2047             self.assertFalse(session.is_static)
2048             self.assertEqual(session.inside_ip_address[0:4],
2049                              self.pg4.remote_ip4n)
2050             self.assertEqual(session.outside_ip_address,
2051                              addresses[0].ip_address)
2052
2053         # pg6 session dump
2054         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2055         self.assertTrue(len(sessions) >= 3)
2056         for session in sessions:
2057             self.assertTrue(session.is_static)
2058             self.assertEqual(session.inside_ip_address[0:4],
2059                              self.pg6.remote_ip4n)
2060             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2061                              map(int, static_nat_ip.split('.')))
2062             self.assertTrue(session.inside_port in
2063                             [self.tcp_port_in, self.udp_port_in,
2064                              self.icmp_id_in])
2065
2066     def test_hairpinning(self):
2067         """ NAT44 hairpinning - 1:1 NAPT """
2068
2069         host = self.pg0.remote_hosts[0]
2070         server = self.pg0.remote_hosts[1]
2071         host_in_port = 1234
2072         host_out_port = 0
2073         server_in_port = 5678
2074         server_out_port = 8765
2075
2076         self.nat44_add_address(self.nat_addr)
2077         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2078         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2079                                                   is_inside=0)
2080         # add static mapping for server
2081         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2082                                       server_in_port, server_out_port,
2083                                       proto=IP_PROTOS.tcp)
2084
2085         # send packet from host to server
2086         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2087              IP(src=host.ip4, dst=self.nat_addr) /
2088              TCP(sport=host_in_port, dport=server_out_port))
2089         self.pg0.add_stream(p)
2090         self.pg_enable_capture(self.pg_interfaces)
2091         self.pg_start()
2092         capture = self.pg0.get_capture(1)
2093         p = capture[0]
2094         try:
2095             ip = p[IP]
2096             tcp = p[TCP]
2097             self.assertEqual(ip.src, self.nat_addr)
2098             self.assertEqual(ip.dst, server.ip4)
2099             self.assertNotEqual(tcp.sport, host_in_port)
2100             self.assertEqual(tcp.dport, server_in_port)
2101             self.check_tcp_checksum(p)
2102             host_out_port = tcp.sport
2103         except:
2104             self.logger.error(ppp("Unexpected or invalid packet:", p))
2105             raise
2106
2107         # send reply from server to host
2108         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2109              IP(src=server.ip4, dst=self.nat_addr) /
2110              TCP(sport=server_in_port, dport=host_out_port))
2111         self.pg0.add_stream(p)
2112         self.pg_enable_capture(self.pg_interfaces)
2113         self.pg_start()
2114         capture = self.pg0.get_capture(1)
2115         p = capture[0]
2116         try:
2117             ip = p[IP]
2118             tcp = p[TCP]
2119             self.assertEqual(ip.src, self.nat_addr)
2120             self.assertEqual(ip.dst, host.ip4)
2121             self.assertEqual(tcp.sport, server_out_port)
2122             self.assertEqual(tcp.dport, host_in_port)
2123             self.check_tcp_checksum(p)
2124         except:
2125             self.logger.error(ppp("Unexpected or invalid packet:", p))
2126             raise
2127
2128     def test_hairpinning2(self):
2129         """ NAT44 hairpinning - 1:1 NAT"""
2130
2131         server1_nat_ip = "10.0.0.10"
2132         server2_nat_ip = "10.0.0.11"
2133         host = self.pg0.remote_hosts[0]
2134         server1 = self.pg0.remote_hosts[1]
2135         server2 = self.pg0.remote_hosts[2]
2136         server_tcp_port = 22
2137         server_udp_port = 20
2138
2139         self.nat44_add_address(self.nat_addr)
2140         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2141         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2142                                                   is_inside=0)
2143
2144         # add static mapping for servers
2145         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2146         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2147
2148         # host to server1
2149         pkts = []
2150         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2151              IP(src=host.ip4, dst=server1_nat_ip) /
2152              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2153         pkts.append(p)
2154         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2155              IP(src=host.ip4, dst=server1_nat_ip) /
2156              UDP(sport=self.udp_port_in, dport=server_udp_port))
2157         pkts.append(p)
2158         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2159              IP(src=host.ip4, dst=server1_nat_ip) /
2160              ICMP(id=self.icmp_id_in, type='echo-request'))
2161         pkts.append(p)
2162         self.pg0.add_stream(pkts)
2163         self.pg_enable_capture(self.pg_interfaces)
2164         self.pg_start()
2165         capture = self.pg0.get_capture(len(pkts))
2166         for packet in capture:
2167             try:
2168                 self.assertEqual(packet[IP].src, self.nat_addr)
2169                 self.assertEqual(packet[IP].dst, server1.ip4)
2170                 if packet.haslayer(TCP):
2171                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2172                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2173                     self.tcp_port_out = packet[TCP].sport
2174                     self.check_tcp_checksum(packet)
2175                 elif packet.haslayer(UDP):
2176                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2177                     self.assertEqual(packet[UDP].dport, server_udp_port)
2178                     self.udp_port_out = packet[UDP].sport
2179                 else:
2180                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2181                     self.icmp_id_out = packet[ICMP].id
2182             except:
2183                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2184                 raise
2185
2186         # server1 to host
2187         pkts = []
2188         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2189              IP(src=server1.ip4, dst=self.nat_addr) /
2190              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2191         pkts.append(p)
2192         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2193              IP(src=server1.ip4, dst=self.nat_addr) /
2194              UDP(sport=server_udp_port, dport=self.udp_port_out))
2195         pkts.append(p)
2196         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2197              IP(src=server1.ip4, dst=self.nat_addr) /
2198              ICMP(id=self.icmp_id_out, type='echo-reply'))
2199         pkts.append(p)
2200         self.pg0.add_stream(pkts)
2201         self.pg_enable_capture(self.pg_interfaces)
2202         self.pg_start()
2203         capture = self.pg0.get_capture(len(pkts))
2204         for packet in capture:
2205             try:
2206                 self.assertEqual(packet[IP].src, server1_nat_ip)
2207                 self.assertEqual(packet[IP].dst, host.ip4)
2208                 if packet.haslayer(TCP):
2209                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2210                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2211                     self.check_tcp_checksum(packet)
2212                 elif packet.haslayer(UDP):
2213                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2214                     self.assertEqual(packet[UDP].sport, server_udp_port)
2215                 else:
2216                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2217             except:
2218                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2219                 raise
2220
2221         # server2 to server1
2222         pkts = []
2223         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2224              IP(src=server2.ip4, dst=server1_nat_ip) /
2225              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2226         pkts.append(p)
2227         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2228              IP(src=server2.ip4, dst=server1_nat_ip) /
2229              UDP(sport=self.udp_port_in, dport=server_udp_port))
2230         pkts.append(p)
2231         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2232              IP(src=server2.ip4, dst=server1_nat_ip) /
2233              ICMP(id=self.icmp_id_in, type='echo-request'))
2234         pkts.append(p)
2235         self.pg0.add_stream(pkts)
2236         self.pg_enable_capture(self.pg_interfaces)
2237         self.pg_start()
2238         capture = self.pg0.get_capture(len(pkts))
2239         for packet in capture:
2240             try:
2241                 self.assertEqual(packet[IP].src, server2_nat_ip)
2242                 self.assertEqual(packet[IP].dst, server1.ip4)
2243                 if packet.haslayer(TCP):
2244                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2245                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2246                     self.tcp_port_out = packet[TCP].sport
2247                     self.check_tcp_checksum(packet)
2248                 elif packet.haslayer(UDP):
2249                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2250                     self.assertEqual(packet[UDP].dport, server_udp_port)
2251                     self.udp_port_out = packet[UDP].sport
2252                 else:
2253                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2254                     self.icmp_id_out = packet[ICMP].id
2255             except:
2256                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2257                 raise
2258
2259         # server1 to server2
2260         pkts = []
2261         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262              IP(src=server1.ip4, dst=server2_nat_ip) /
2263              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2264         pkts.append(p)
2265         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266              IP(src=server1.ip4, dst=server2_nat_ip) /
2267              UDP(sport=server_udp_port, dport=self.udp_port_out))
2268         pkts.append(p)
2269         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270              IP(src=server1.ip4, dst=server2_nat_ip) /
2271              ICMP(id=self.icmp_id_out, type='echo-reply'))
2272         pkts.append(p)
2273         self.pg0.add_stream(pkts)
2274         self.pg_enable_capture(self.pg_interfaces)
2275         self.pg_start()
2276         capture = self.pg0.get_capture(len(pkts))
2277         for packet in capture:
2278             try:
2279                 self.assertEqual(packet[IP].src, server1_nat_ip)
2280                 self.assertEqual(packet[IP].dst, server2.ip4)
2281                 if packet.haslayer(TCP):
2282                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2283                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2284                     self.check_tcp_checksum(packet)
2285                 elif packet.haslayer(UDP):
2286                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2287                     self.assertEqual(packet[UDP].sport, server_udp_port)
2288                 else:
2289                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2290             except:
2291                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2292                 raise
2293
2294     def test_max_translations_per_user(self):
2295         """ MAX translations per user - recycle the least recently used """
2296
2297         self.nat44_add_address(self.nat_addr)
2298         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2299         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2300                                                   is_inside=0)
2301
2302         # get maximum number of translations per user
2303         nat44_config = self.vapi.nat_show_config()
2304
2305         # send more than maximum number of translations per user packets
2306         pkts_num = nat44_config.max_translations_per_user + 5
2307         pkts = []
2308         for port in range(0, pkts_num):
2309             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2310                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2311                  TCP(sport=1025 + port))
2312             pkts.append(p)
2313         self.pg0.add_stream(pkts)
2314         self.pg_enable_capture(self.pg_interfaces)
2315         self.pg_start()
2316
2317         # verify number of translated packet
2318         self.pg1.get_capture(pkts_num)
2319
2320     def test_interface_addr(self):
2321         """ Acquire NAT44 addresses from interface """
2322         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2323
2324         # no address in NAT pool
2325         adresses = self.vapi.nat44_address_dump()
2326         self.assertEqual(0, len(adresses))
2327
2328         # configure interface address and check NAT address pool
2329         self.pg7.config_ip4()
2330         adresses = self.vapi.nat44_address_dump()
2331         self.assertEqual(1, len(adresses))
2332         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2333
2334         # remove interface address and check NAT address pool
2335         self.pg7.unconfig_ip4()
2336         adresses = self.vapi.nat44_address_dump()
2337         self.assertEqual(0, len(adresses))
2338
2339     def test_interface_addr_static_mapping(self):
2340         """ Static mapping with addresses from interface """
2341         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2342         self.nat44_add_static_mapping(
2343             '1.2.3.4',
2344             external_sw_if_index=self.pg7.sw_if_index)
2345
2346         # static mappings with external interface
2347         static_mappings = self.vapi.nat44_static_mapping_dump()
2348         self.assertEqual(1, len(static_mappings))
2349         self.assertEqual(self.pg7.sw_if_index,
2350                          static_mappings[0].external_sw_if_index)
2351
2352         # configure interface address and check static mappings
2353         self.pg7.config_ip4()
2354         static_mappings = self.vapi.nat44_static_mapping_dump()
2355         self.assertEqual(1, len(static_mappings))
2356         self.assertEqual(static_mappings[0].external_ip_address[0:4],
2357                          self.pg7.local_ip4n)
2358         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2359
2360         # remove interface address and check static mappings
2361         self.pg7.unconfig_ip4()
2362         static_mappings = self.vapi.nat44_static_mapping_dump()
2363         self.assertEqual(0, len(static_mappings))
2364
2365     def test_interface_addr_identity_nat(self):
2366         """ Identity NAT with addresses from interface """
2367
2368         port = 53053
2369         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2370         self.vapi.nat44_add_del_identity_mapping(
2371             sw_if_index=self.pg7.sw_if_index,
2372             port=port,
2373             protocol=IP_PROTOS.tcp,
2374             addr_only=0)
2375
2376         # identity mappings with external interface
2377         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2378         self.assertEqual(1, len(identity_mappings))
2379         self.assertEqual(self.pg7.sw_if_index,
2380                          identity_mappings[0].sw_if_index)
2381
2382         # configure interface address and check identity mappings
2383         self.pg7.config_ip4()
2384         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2385         self.assertEqual(1, len(identity_mappings))
2386         self.assertEqual(identity_mappings[0].ip_address,
2387                          self.pg7.local_ip4n)
2388         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2389         self.assertEqual(port, identity_mappings[0].port)
2390         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2391
2392         # remove interface address and check identity mappings
2393         self.pg7.unconfig_ip4()
2394         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2395         self.assertEqual(0, len(identity_mappings))
2396
2397     def test_ipfix_nat44_sess(self):
2398         """ IPFIX logging NAT44 session created/delted """
2399         self.ipfix_domain_id = 10
2400         self.ipfix_src_port = 20202
2401         colector_port = 30303
2402         bind_layers(UDP, IPFIX, dport=30303)
2403         self.nat44_add_address(self.nat_addr)
2404         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2405         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2406                                                   is_inside=0)
2407         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2408                                      src_address=self.pg3.local_ip4n,
2409                                      path_mtu=512,
2410                                      template_interval=10,
2411                                      collector_port=colector_port)
2412         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2413                             src_port=self.ipfix_src_port)
2414
2415         pkts = self.create_stream_in(self.pg0, self.pg1)
2416         self.pg0.add_stream(pkts)
2417         self.pg_enable_capture(self.pg_interfaces)
2418         self.pg_start()
2419         capture = self.pg1.get_capture(len(pkts))
2420         self.verify_capture_out(capture)
2421         self.nat44_add_address(self.nat_addr, is_add=0)
2422         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2423         capture = self.pg3.get_capture(9)
2424         ipfix = IPFIXDecoder()
2425         # first load template
2426         for p in capture:
2427             self.assertTrue(p.haslayer(IPFIX))
2428             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2429             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2430             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2431             self.assertEqual(p[UDP].dport, colector_port)
2432             self.assertEqual(p[IPFIX].observationDomainID,
2433                              self.ipfix_domain_id)
2434             if p.haslayer(Template):
2435                 ipfix.add_template(p.getlayer(Template))
2436         # verify events in data set
2437         for p in capture:
2438             if p.haslayer(Data):
2439                 data = ipfix.decode_data_set(p.getlayer(Set))
2440                 self.verify_ipfix_nat44_ses(data)
2441
2442     def test_ipfix_addr_exhausted(self):
2443         """ IPFIX logging NAT addresses exhausted """
2444         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2445         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2446                                                   is_inside=0)
2447         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2448                                      src_address=self.pg3.local_ip4n,
2449                                      path_mtu=512,
2450                                      template_interval=10)
2451         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2452                             src_port=self.ipfix_src_port)
2453
2454         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2455              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2456              TCP(sport=3025))
2457         self.pg0.add_stream(p)
2458         self.pg_enable_capture(self.pg_interfaces)
2459         self.pg_start()
2460         capture = self.pg1.get_capture(0)
2461         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2462         capture = self.pg3.get_capture(9)
2463         ipfix = IPFIXDecoder()
2464         # first load template
2465         for p in capture:
2466             self.assertTrue(p.haslayer(IPFIX))
2467             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2468             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2469             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2470             self.assertEqual(p[UDP].dport, 4739)
2471             self.assertEqual(p[IPFIX].observationDomainID,
2472                              self.ipfix_domain_id)
2473             if p.haslayer(Template):
2474                 ipfix.add_template(p.getlayer(Template))
2475         # verify events in data set
2476         for p in capture:
2477             if p.haslayer(Data):
2478                 data = ipfix.decode_data_set(p.getlayer(Set))
2479                 self.verify_ipfix_addr_exhausted(data)
2480
2481     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2482     def test_ipfix_max_sessions(self):
2483         """ IPFIX logging maximum session entries exceeded """
2484         self.nat44_add_address(self.nat_addr)
2485         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2486         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2487                                                   is_inside=0)
2488
2489         nat44_config = self.vapi.nat_show_config()
2490         max_sessions = 10 * nat44_config.translation_buckets
2491
2492         pkts = []
2493         for i in range(0, max_sessions):
2494             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2495             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2496                  IP(src=src, dst=self.pg1.remote_ip4) /
2497                  TCP(sport=1025))
2498             pkts.append(p)
2499         self.pg0.add_stream(pkts)
2500         self.pg_enable_capture(self.pg_interfaces)
2501         self.pg_start()
2502
2503         self.pg1.get_capture(max_sessions)
2504         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2505                                      src_address=self.pg3.local_ip4n,
2506                                      path_mtu=512,
2507                                      template_interval=10)
2508         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2509                             src_port=self.ipfix_src_port)
2510
2511         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2512              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2513              TCP(sport=1025))
2514         self.pg0.add_stream(p)
2515         self.pg_enable_capture(self.pg_interfaces)
2516         self.pg_start()
2517         self.pg1.get_capture(0)
2518         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2519         capture = self.pg3.get_capture(9)
2520         ipfix = IPFIXDecoder()
2521         # first load template
2522         for p in capture:
2523             self.assertTrue(p.haslayer(IPFIX))
2524             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2525             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2526             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2527             self.assertEqual(p[UDP].dport, 4739)
2528             self.assertEqual(p[IPFIX].observationDomainID,
2529                              self.ipfix_domain_id)
2530             if p.haslayer(Template):
2531                 ipfix.add_template(p.getlayer(Template))
2532         # verify events in data set
2533         for p in capture:
2534             if p.haslayer(Data):
2535                 data = ipfix.decode_data_set(p.getlayer(Set))
2536                 self.verify_ipfix_max_sessions(data, max_sessions)
2537
2538     def test_pool_addr_fib(self):
2539         """ NAT44 add pool addresses to FIB """
2540         static_addr = '10.0.0.10'
2541         self.nat44_add_address(self.nat_addr)
2542         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2543         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2544                                                   is_inside=0)
2545         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2546
2547         # NAT44 address
2548         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2549              ARP(op=ARP.who_has, pdst=self.nat_addr,
2550                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2551         self.pg1.add_stream(p)
2552         self.pg_enable_capture(self.pg_interfaces)
2553         self.pg_start()
2554         capture = self.pg1.get_capture(1)
2555         self.assertTrue(capture[0].haslayer(ARP))
2556         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2557
2558         # 1:1 NAT address
2559         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2560              ARP(op=ARP.who_has, pdst=static_addr,
2561                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2562         self.pg1.add_stream(p)
2563         self.pg_enable_capture(self.pg_interfaces)
2564         self.pg_start()
2565         capture = self.pg1.get_capture(1)
2566         self.assertTrue(capture[0].haslayer(ARP))
2567         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2568
2569         # send ARP to non-NAT44 interface
2570         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2571              ARP(op=ARP.who_has, pdst=self.nat_addr,
2572                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2573         self.pg2.add_stream(p)
2574         self.pg_enable_capture(self.pg_interfaces)
2575         self.pg_start()
2576         capture = self.pg1.get_capture(0)
2577
2578         # remove addresses and verify
2579         self.nat44_add_address(self.nat_addr, is_add=0)
2580         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2581                                       is_add=0)
2582
2583         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2584              ARP(op=ARP.who_has, pdst=self.nat_addr,
2585                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2586         self.pg1.add_stream(p)
2587         self.pg_enable_capture(self.pg_interfaces)
2588         self.pg_start()
2589         capture = self.pg1.get_capture(0)
2590
2591         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2592              ARP(op=ARP.who_has, pdst=static_addr,
2593                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2594         self.pg1.add_stream(p)
2595         self.pg_enable_capture(self.pg_interfaces)
2596         self.pg_start()
2597         capture = self.pg1.get_capture(0)
2598
2599     def test_vrf_mode(self):
2600         """ NAT44 tenant VRF aware address pool mode """
2601
2602         vrf_id1 = 1
2603         vrf_id2 = 2
2604         nat_ip1 = "10.0.0.10"
2605         nat_ip2 = "10.0.0.11"
2606
2607         self.pg0.unconfig_ip4()
2608         self.pg1.unconfig_ip4()
2609         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2610         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2611         self.pg0.set_table_ip4(vrf_id1)
2612         self.pg1.set_table_ip4(vrf_id2)
2613         self.pg0.config_ip4()
2614         self.pg1.config_ip4()
2615
2616         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2617         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2618         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2619         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2620         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2621                                                   is_inside=0)
2622
2623         # first VRF
2624         pkts = self.create_stream_in(self.pg0, self.pg2)
2625         self.pg0.add_stream(pkts)
2626         self.pg_enable_capture(self.pg_interfaces)
2627         self.pg_start()
2628         capture = self.pg2.get_capture(len(pkts))
2629         self.verify_capture_out(capture, nat_ip1)
2630
2631         # second VRF
2632         pkts = self.create_stream_in(self.pg1, self.pg2)
2633         self.pg1.add_stream(pkts)
2634         self.pg_enable_capture(self.pg_interfaces)
2635         self.pg_start()
2636         capture = self.pg2.get_capture(len(pkts))
2637         self.verify_capture_out(capture, nat_ip2)
2638
2639         self.pg0.unconfig_ip4()
2640         self.pg1.unconfig_ip4()
2641         self.pg0.set_table_ip4(0)
2642         self.pg1.set_table_ip4(0)
2643         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2644         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2645
2646     def test_vrf_feature_independent(self):
2647         """ NAT44 tenant VRF independent address pool mode """
2648
2649         nat_ip1 = "10.0.0.10"
2650         nat_ip2 = "10.0.0.11"
2651
2652         self.nat44_add_address(nat_ip1)
2653         self.nat44_add_address(nat_ip2, vrf_id=99)
2654         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2655         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2656         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2657                                                   is_inside=0)
2658
2659         # first VRF
2660         pkts = self.create_stream_in(self.pg0, self.pg2)
2661         self.pg0.add_stream(pkts)
2662         self.pg_enable_capture(self.pg_interfaces)
2663         self.pg_start()
2664         capture = self.pg2.get_capture(len(pkts))
2665         self.verify_capture_out(capture, nat_ip1)
2666
2667         # second VRF
2668         pkts = self.create_stream_in(self.pg1, self.pg2)
2669         self.pg1.add_stream(pkts)
2670         self.pg_enable_capture(self.pg_interfaces)
2671         self.pg_start()
2672         capture = self.pg2.get_capture(len(pkts))
2673         self.verify_capture_out(capture, nat_ip1)
2674
2675     def test_dynamic_ipless_interfaces(self):
2676         """ NAT44 interfaces without configured IP address """
2677
2678         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2679                                       mactobinary(self.pg7.remote_mac),
2680                                       self.pg7.remote_ip4n,
2681                                       is_static=1)
2682         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2683                                       mactobinary(self.pg8.remote_mac),
2684                                       self.pg8.remote_ip4n,
2685                                       is_static=1)
2686
2687         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2688                                    dst_address_length=32,
2689                                    next_hop_address=self.pg7.remote_ip4n,
2690                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2691         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2692                                    dst_address_length=32,
2693                                    next_hop_address=self.pg8.remote_ip4n,
2694                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2695
2696         self.nat44_add_address(self.nat_addr)
2697         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2698         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2699                                                   is_inside=0)
2700
2701         # in2out
2702         pkts = self.create_stream_in(self.pg7, self.pg8)
2703         self.pg7.add_stream(pkts)
2704         self.pg_enable_capture(self.pg_interfaces)
2705         self.pg_start()
2706         capture = self.pg8.get_capture(len(pkts))
2707         self.verify_capture_out(capture)
2708
2709         # out2in
2710         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2711         self.pg8.add_stream(pkts)
2712         self.pg_enable_capture(self.pg_interfaces)
2713         self.pg_start()
2714         capture = self.pg7.get_capture(len(pkts))
2715         self.verify_capture_in(capture, self.pg7)
2716
2717     def test_static_ipless_interfaces(self):
2718         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2719
2720         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2721                                       mactobinary(self.pg7.remote_mac),
2722                                       self.pg7.remote_ip4n,
2723                                       is_static=1)
2724         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2725                                       mactobinary(self.pg8.remote_mac),
2726                                       self.pg8.remote_ip4n,
2727                                       is_static=1)
2728
2729         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2730                                    dst_address_length=32,
2731                                    next_hop_address=self.pg7.remote_ip4n,
2732                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2733         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2734                                    dst_address_length=32,
2735                                    next_hop_address=self.pg8.remote_ip4n,
2736                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2737
2738         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2739         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2740         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2741                                                   is_inside=0)
2742
2743         # out2in
2744         pkts = self.create_stream_out(self.pg8)
2745         self.pg8.add_stream(pkts)
2746         self.pg_enable_capture(self.pg_interfaces)
2747         self.pg_start()
2748         capture = self.pg7.get_capture(len(pkts))
2749         self.verify_capture_in(capture, self.pg7)
2750
2751         # in2out
2752         pkts = self.create_stream_in(self.pg7, self.pg8)
2753         self.pg7.add_stream(pkts)
2754         self.pg_enable_capture(self.pg_interfaces)
2755         self.pg_start()
2756         capture = self.pg8.get_capture(len(pkts))
2757         self.verify_capture_out(capture, self.nat_addr, True)
2758
2759     def test_static_with_port_ipless_interfaces(self):
2760         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2761
2762         self.tcp_port_out = 30606
2763         self.udp_port_out = 30607
2764         self.icmp_id_out = 30608
2765
2766         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2767                                       mactobinary(self.pg7.remote_mac),
2768                                       self.pg7.remote_ip4n,
2769                                       is_static=1)
2770         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2771                                       mactobinary(self.pg8.remote_mac),
2772                                       self.pg8.remote_ip4n,
2773                                       is_static=1)
2774
2775         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2776                                    dst_address_length=32,
2777                                    next_hop_address=self.pg7.remote_ip4n,
2778                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2779         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2780                                    dst_address_length=32,
2781                                    next_hop_address=self.pg8.remote_ip4n,
2782                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2783
2784         self.nat44_add_address(self.nat_addr)
2785         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2786                                       self.tcp_port_in, self.tcp_port_out,
2787                                       proto=IP_PROTOS.tcp)
2788         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2789                                       self.udp_port_in, self.udp_port_out,
2790                                       proto=IP_PROTOS.udp)
2791         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2792                                       self.icmp_id_in, self.icmp_id_out,
2793                                       proto=IP_PROTOS.icmp)
2794         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2795         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2796                                                   is_inside=0)
2797
2798         # out2in
2799         pkts = self.create_stream_out(self.pg8)
2800         self.pg8.add_stream(pkts)
2801         self.pg_enable_capture(self.pg_interfaces)
2802         self.pg_start()
2803         capture = self.pg7.get_capture(len(pkts))
2804         self.verify_capture_in(capture, self.pg7)
2805
2806         # in2out
2807         pkts = self.create_stream_in(self.pg7, self.pg8)
2808         self.pg7.add_stream(pkts)
2809         self.pg_enable_capture(self.pg_interfaces)
2810         self.pg_start()
2811         capture = self.pg8.get_capture(len(pkts))
2812         self.verify_capture_out(capture)
2813
2814     def test_static_unknown_proto(self):
2815         """ 1:1 NAT translate packet with unknown protocol """
2816         nat_ip = "10.0.0.10"
2817         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2818         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2819         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2820                                                   is_inside=0)
2821
2822         # in2out
2823         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2824              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2825              GRE() /
2826              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2827              TCP(sport=1234, dport=1234))
2828         self.pg0.add_stream(p)
2829         self.pg_enable_capture(self.pg_interfaces)
2830         self.pg_start()
2831         p = self.pg1.get_capture(1)
2832         packet = p[0]
2833         try:
2834             self.assertEqual(packet[IP].src, nat_ip)
2835             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2836             self.assertTrue(packet.haslayer(GRE))
2837             self.check_ip_checksum(packet)
2838         except:
2839             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2840             raise
2841
2842         # out2in
2843         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2844              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2845              GRE() /
2846              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2847              TCP(sport=1234, dport=1234))
2848         self.pg1.add_stream(p)
2849         self.pg_enable_capture(self.pg_interfaces)
2850         self.pg_start()
2851         p = self.pg0.get_capture(1)
2852         packet = p[0]
2853         try:
2854             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2855             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2856             self.assertTrue(packet.haslayer(GRE))
2857             self.check_ip_checksum(packet)
2858         except:
2859             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2860             raise
2861
2862     def test_hairpinning_static_unknown_proto(self):
2863         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2864
2865         host = self.pg0.remote_hosts[0]
2866         server = self.pg0.remote_hosts[1]
2867
2868         host_nat_ip = "10.0.0.10"
2869         server_nat_ip = "10.0.0.11"
2870
2871         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2872         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2873         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2874         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2875                                                   is_inside=0)
2876
2877         # host to server
2878         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2879              IP(src=host.ip4, dst=server_nat_ip) /
2880              GRE() /
2881              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2882              TCP(sport=1234, dport=1234))
2883         self.pg0.add_stream(p)
2884         self.pg_enable_capture(self.pg_interfaces)
2885         self.pg_start()
2886         p = self.pg0.get_capture(1)
2887         packet = p[0]
2888         try:
2889             self.assertEqual(packet[IP].src, host_nat_ip)
2890             self.assertEqual(packet[IP].dst, server.ip4)
2891             self.assertTrue(packet.haslayer(GRE))
2892             self.check_ip_checksum(packet)
2893         except:
2894             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2895             raise
2896
2897         # server to host
2898         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2899              IP(src=server.ip4, dst=host_nat_ip) /
2900              GRE() /
2901              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2902              TCP(sport=1234, dport=1234))
2903         self.pg0.add_stream(p)
2904         self.pg_enable_capture(self.pg_interfaces)
2905         self.pg_start()
2906         p = self.pg0.get_capture(1)
2907         packet = p[0]
2908         try:
2909             self.assertEqual(packet[IP].src, server_nat_ip)
2910             self.assertEqual(packet[IP].dst, host.ip4)
2911             self.assertTrue(packet.haslayer(GRE))
2912             self.check_ip_checksum(packet)
2913         except:
2914             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2915             raise
2916
2917     def test_unknown_proto(self):
2918         """ NAT44 translate packet with unknown protocol """
2919         self.nat44_add_address(self.nat_addr)
2920         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2921         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2922                                                   is_inside=0)
2923
2924         # in2out
2925         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2926              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2927              TCP(sport=self.tcp_port_in, dport=20))
2928         self.pg0.add_stream(p)
2929         self.pg_enable_capture(self.pg_interfaces)
2930         self.pg_start()
2931         p = self.pg1.get_capture(1)
2932
2933         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2934              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2935              GRE() /
2936              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2937              TCP(sport=1234, dport=1234))
2938         self.pg0.add_stream(p)
2939         self.pg_enable_capture(self.pg_interfaces)
2940         self.pg_start()
2941         p = self.pg1.get_capture(1)
2942         packet = p[0]
2943         try:
2944             self.assertEqual(packet[IP].src, self.nat_addr)
2945             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2946             self.assertTrue(packet.haslayer(GRE))
2947             self.check_ip_checksum(packet)
2948         except:
2949             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2950             raise
2951
2952         # out2in
2953         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2954              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2955              GRE() /
2956              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2957              TCP(sport=1234, dport=1234))
2958         self.pg1.add_stream(p)
2959         self.pg_enable_capture(self.pg_interfaces)
2960         self.pg_start()
2961         p = self.pg0.get_capture(1)
2962         packet = p[0]
2963         try:
2964             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2965             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2966             self.assertTrue(packet.haslayer(GRE))
2967             self.check_ip_checksum(packet)
2968         except:
2969             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2970             raise
2971
2972     def test_hairpinning_unknown_proto(self):
2973         """ NAT44 translate packet with unknown protocol - hairpinning """
2974         host = self.pg0.remote_hosts[0]
2975         server = self.pg0.remote_hosts[1]
2976         host_in_port = 1234
2977         host_out_port = 0
2978         server_in_port = 5678
2979         server_out_port = 8765
2980         server_nat_ip = "10.0.0.11"
2981
2982         self.nat44_add_address(self.nat_addr)
2983         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2984         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2985                                                   is_inside=0)
2986
2987         # add static mapping for server
2988         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2989
2990         # host to server
2991         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2992              IP(src=host.ip4, dst=server_nat_ip) /
2993              TCP(sport=host_in_port, dport=server_out_port))
2994         self.pg0.add_stream(p)
2995         self.pg_enable_capture(self.pg_interfaces)
2996         self.pg_start()
2997         capture = self.pg0.get_capture(1)
2998
2999         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3000              IP(src=host.ip4, dst=server_nat_ip) /
3001              GRE() /
3002              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3003              TCP(sport=1234, dport=1234))
3004         self.pg0.add_stream(p)
3005         self.pg_enable_capture(self.pg_interfaces)
3006         self.pg_start()
3007         p = self.pg0.get_capture(1)
3008         packet = p[0]
3009         try:
3010             self.assertEqual(packet[IP].src, self.nat_addr)
3011             self.assertEqual(packet[IP].dst, server.ip4)
3012             self.assertTrue(packet.haslayer(GRE))
3013             self.check_ip_checksum(packet)
3014         except:
3015             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3016             raise
3017
3018         # server to host
3019         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3020              IP(src=server.ip4, dst=self.nat_addr) /
3021              GRE() /
3022              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3023              TCP(sport=1234, dport=1234))
3024         self.pg0.add_stream(p)
3025         self.pg_enable_capture(self.pg_interfaces)
3026         self.pg_start()
3027         p = self.pg0.get_capture(1)
3028         packet = p[0]
3029         try:
3030             self.assertEqual(packet[IP].src, server_nat_ip)
3031             self.assertEqual(packet[IP].dst, host.ip4)
3032             self.assertTrue(packet.haslayer(GRE))
3033             self.check_ip_checksum(packet)
3034         except:
3035             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3036             raise
3037
3038     def test_output_feature(self):
3039         """ NAT44 interface output feature (in2out postrouting) """
3040         self.nat44_add_address(self.nat_addr)
3041         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3042         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3043         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3044                                                          is_inside=0)
3045
3046         # in2out
3047         pkts = self.create_stream_in(self.pg0, self.pg3)
3048         self.pg0.add_stream(pkts)
3049         self.pg_enable_capture(self.pg_interfaces)
3050         self.pg_start()
3051         capture = self.pg3.get_capture(len(pkts))
3052         self.verify_capture_out(capture)
3053
3054         # out2in
3055         pkts = self.create_stream_out(self.pg3)
3056         self.pg3.add_stream(pkts)
3057         self.pg_enable_capture(self.pg_interfaces)
3058         self.pg_start()
3059         capture = self.pg0.get_capture(len(pkts))
3060         self.verify_capture_in(capture, self.pg0)
3061
3062         # from non-NAT interface to NAT inside interface
3063         pkts = self.create_stream_in(self.pg2, self.pg0)
3064         self.pg2.add_stream(pkts)
3065         self.pg_enable_capture(self.pg_interfaces)
3066         self.pg_start()
3067         capture = self.pg0.get_capture(len(pkts))
3068         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3069
3070     def test_output_feature_vrf_aware(self):
3071         """ NAT44 interface output feature VRF aware (in2out postrouting) """
3072         nat_ip_vrf10 = "10.0.0.10"
3073         nat_ip_vrf20 = "10.0.0.20"
3074
3075         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3076                                    dst_address_length=32,
3077                                    next_hop_address=self.pg3.remote_ip4n,
3078                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3079                                    table_id=10)
3080         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3081                                    dst_address_length=32,
3082                                    next_hop_address=self.pg3.remote_ip4n,
3083                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3084                                    table_id=20)
3085
3086         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3087         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3088         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3089         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3090         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3091                                                          is_inside=0)
3092
3093         # in2out VRF 10
3094         pkts = self.create_stream_in(self.pg4, self.pg3)
3095         self.pg4.add_stream(pkts)
3096         self.pg_enable_capture(self.pg_interfaces)
3097         self.pg_start()
3098         capture = self.pg3.get_capture(len(pkts))
3099         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3100
3101         # out2in VRF 10
3102         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3103         self.pg3.add_stream(pkts)
3104         self.pg_enable_capture(self.pg_interfaces)
3105         self.pg_start()
3106         capture = self.pg4.get_capture(len(pkts))
3107         self.verify_capture_in(capture, self.pg4)
3108
3109         # in2out VRF 20
3110         pkts = self.create_stream_in(self.pg6, self.pg3)
3111         self.pg6.add_stream(pkts)
3112         self.pg_enable_capture(self.pg_interfaces)
3113         self.pg_start()
3114         capture = self.pg3.get_capture(len(pkts))
3115         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3116
3117         # out2in VRF 20
3118         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3119         self.pg3.add_stream(pkts)
3120         self.pg_enable_capture(self.pg_interfaces)
3121         self.pg_start()
3122         capture = self.pg6.get_capture(len(pkts))
3123         self.verify_capture_in(capture, self.pg6)
3124
3125     def test_output_feature_hairpinning(self):
3126         """ NAT44 interface output feature hairpinning (in2out postrouting) """
3127         host = self.pg0.remote_hosts[0]
3128         server = self.pg0.remote_hosts[1]
3129         host_in_port = 1234
3130         host_out_port = 0
3131         server_in_port = 5678
3132         server_out_port = 8765
3133
3134         self.nat44_add_address(self.nat_addr)
3135         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3136         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3137                                                          is_inside=0)
3138
3139         # add static mapping for server
3140         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3141                                       server_in_port, server_out_port,
3142                                       proto=IP_PROTOS.tcp)
3143
3144         # send packet from host to server
3145         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3146              IP(src=host.ip4, dst=self.nat_addr) /
3147              TCP(sport=host_in_port, dport=server_out_port))
3148         self.pg0.add_stream(p)
3149         self.pg_enable_capture(self.pg_interfaces)
3150         self.pg_start()
3151         capture = self.pg0.get_capture(1)
3152         p = capture[0]
3153         try:
3154             ip = p[IP]
3155             tcp = p[TCP]
3156             self.assertEqual(ip.src, self.nat_addr)
3157             self.assertEqual(ip.dst, server.ip4)
3158             self.assertNotEqual(tcp.sport, host_in_port)
3159             self.assertEqual(tcp.dport, server_in_port)
3160             self.check_tcp_checksum(p)
3161             host_out_port = tcp.sport
3162         except:
3163             self.logger.error(ppp("Unexpected or invalid packet:", p))
3164             raise
3165
3166         # send reply from server to host
3167         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3168              IP(src=server.ip4, dst=self.nat_addr) /
3169              TCP(sport=server_in_port, dport=host_out_port))
3170         self.pg0.add_stream(p)
3171         self.pg_enable_capture(self.pg_interfaces)
3172         self.pg_start()
3173         capture = self.pg0.get_capture(1)
3174         p = capture[0]
3175         try:
3176             ip = p[IP]
3177             tcp = p[TCP]
3178             self.assertEqual(ip.src, self.nat_addr)
3179             self.assertEqual(ip.dst, host.ip4)
3180             self.assertEqual(tcp.sport, server_out_port)
3181             self.assertEqual(tcp.dport, host_in_port)
3182             self.check_tcp_checksum(p)
3183         except:
3184             self.logger.error(ppp("Unexpected or invalid packet:", p))
3185             raise
3186
3187     def test_one_armed_nat44(self):
3188         """ One armed NAT44 """
3189         remote_host = self.pg9.remote_hosts[0]
3190         local_host = self.pg9.remote_hosts[1]
3191         external_port = 0
3192
3193         self.nat44_add_address(self.nat_addr)
3194         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3195         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3196                                                   is_inside=0)
3197
3198         # in2out
3199         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3200              IP(src=local_host.ip4, dst=remote_host.ip4) /
3201              TCP(sport=12345, dport=80))
3202         self.pg9.add_stream(p)
3203         self.pg_enable_capture(self.pg_interfaces)
3204         self.pg_start()
3205         capture = self.pg9.get_capture(1)
3206         p = capture[0]
3207         try:
3208             ip = p[IP]
3209             tcp = p[TCP]
3210             self.assertEqual(ip.src, self.nat_addr)
3211             self.assertEqual(ip.dst, remote_host.ip4)
3212             self.assertNotEqual(tcp.sport, 12345)
3213             external_port = tcp.sport
3214             self.assertEqual(tcp.dport, 80)
3215             self.check_tcp_checksum(p)
3216             self.check_ip_checksum(p)
3217         except:
3218             self.logger.error(ppp("Unexpected or invalid packet:", p))
3219             raise
3220
3221         # out2in
3222         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3223              IP(src=remote_host.ip4, dst=self.nat_addr) /
3224              TCP(sport=80, dport=external_port))
3225         self.pg9.add_stream(p)
3226         self.pg_enable_capture(self.pg_interfaces)
3227         self.pg_start()
3228         capture = self.pg9.get_capture(1)
3229         p = capture[0]
3230         try:
3231             ip = p[IP]
3232             tcp = p[TCP]
3233             self.assertEqual(ip.src, remote_host.ip4)
3234             self.assertEqual(ip.dst, local_host.ip4)
3235             self.assertEqual(tcp.sport, 80)
3236             self.assertEqual(tcp.dport, 12345)
3237             self.check_tcp_checksum(p)
3238             self.check_ip_checksum(p)
3239         except:
3240             self.logger.error(ppp("Unexpected or invalid packet:", p))
3241             raise
3242
3243     def test_del_session(self):
3244         """ Delete NAT44 session """
3245         self.nat44_add_address(self.nat_addr)
3246         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3247         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3248                                                   is_inside=0)
3249
3250         pkts = self.create_stream_in(self.pg0, self.pg1)
3251         self.pg0.add_stream(pkts)
3252         self.pg_enable_capture(self.pg_interfaces)
3253         self.pg_start()
3254         capture = self.pg1.get_capture(len(pkts))
3255
3256         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3257         nsessions = len(sessions)
3258
3259         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3260                                     sessions[0].inside_port,
3261                                     sessions[0].protocol)
3262         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3263                                     sessions[1].outside_port,
3264                                     sessions[1].protocol,
3265                                     is_in=0)
3266
3267         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3268         self.assertEqual(nsessions - len(sessions), 2)
3269
3270     def test_set_get_reass(self):
3271         """ NAT44 set/get virtual fragmentation reassembly """
3272         reas_cfg1 = self.vapi.nat_get_reass()
3273
3274         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3275                                 max_reass=reas_cfg1.ip4_max_reass * 2,
3276                                 max_frag=reas_cfg1.ip4_max_frag * 2)
3277
3278         reas_cfg2 = self.vapi.nat_get_reass()
3279
3280         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3281         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3282         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3283
3284         self.vapi.nat_set_reass(drop_frag=1)
3285         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3286
3287     def test_frag_in_order(self):
3288         """ NAT44 translate fragments arriving in order """
3289         self.nat44_add_address(self.nat_addr)
3290         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3291         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3292                                                   is_inside=0)
3293
3294         data = "A" * 4 + "B" * 16 + "C" * 3
3295         self.tcp_port_in = random.randint(1025, 65535)
3296
3297         reass = self.vapi.nat_reass_dump()
3298         reass_n_start = len(reass)
3299
3300         # in2out
3301         pkts = self.create_stream_frag(self.pg0,
3302                                        self.pg1.remote_ip4,
3303                                        self.tcp_port_in,
3304                                        20,
3305                                        data)
3306         self.pg0.add_stream(pkts)
3307         self.pg_enable_capture(self.pg_interfaces)
3308         self.pg_start()
3309         frags = self.pg1.get_capture(len(pkts))
3310         p = self.reass_frags_and_verify(frags,
3311                                         self.nat_addr,
3312                                         self.pg1.remote_ip4)
3313         self.assertEqual(p[TCP].dport, 20)
3314         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3315         self.tcp_port_out = p[TCP].sport
3316         self.assertEqual(data, p[Raw].load)
3317
3318         # out2in
3319         pkts = self.create_stream_frag(self.pg1,
3320                                        self.nat_addr,
3321                                        20,
3322                                        self.tcp_port_out,
3323                                        data)
3324         self.pg1.add_stream(pkts)
3325         self.pg_enable_capture(self.pg_interfaces)
3326         self.pg_start()
3327         frags = self.pg0.get_capture(len(pkts))
3328         p = self.reass_frags_and_verify(frags,
3329                                         self.pg1.remote_ip4,
3330                                         self.pg0.remote_ip4)
3331         self.assertEqual(p[TCP].sport, 20)
3332         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3333         self.assertEqual(data, p[Raw].load)
3334
3335         reass = self.vapi.nat_reass_dump()
3336         reass_n_end = len(reass)
3337
3338         self.assertEqual(reass_n_end - reass_n_start, 2)
3339
3340     def test_reass_hairpinning(self):
3341         """ NAT44 fragments hairpinning """
3342         host = self.pg0.remote_hosts[0]
3343         server = self.pg0.remote_hosts[1]
3344         host_in_port = random.randint(1025, 65535)
3345         host_out_port = 0
3346         server_in_port = random.randint(1025, 65535)
3347         server_out_port = random.randint(1025, 65535)
3348         data = "A" * 4 + "B" * 16 + "C" * 3
3349
3350         self.nat44_add_address(self.nat_addr)
3351         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3352         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3353                                                   is_inside=0)
3354         # add static mapping for server
3355         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3356                                       server_in_port, server_out_port,
3357                                       proto=IP_PROTOS.tcp)
3358
3359         # send packet from host to server
3360         pkts = self.create_stream_frag(self.pg0,
3361                                        self.nat_addr,
3362                                        host_in_port,
3363                                        server_out_port,
3364                                        data)
3365         self.pg0.add_stream(pkts)
3366         self.pg_enable_capture(self.pg_interfaces)
3367         self.pg_start()
3368         frags = self.pg0.get_capture(len(pkts))
3369         p = self.reass_frags_and_verify(frags,
3370                                         self.nat_addr,
3371                                         server.ip4)
3372         self.assertNotEqual(p[TCP].sport, host_in_port)
3373         self.assertEqual(p[TCP].dport, server_in_port)
3374         self.assertEqual(data, p[Raw].load)
3375
3376     def test_frag_out_of_order(self):
3377         """ NAT44 translate fragments arriving out of order """
3378         self.nat44_add_address(self.nat_addr)
3379         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3380         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3381                                                   is_inside=0)
3382
3383         data = "A" * 4 + "B" * 16 + "C" * 3
3384         random.randint(1025, 65535)
3385
3386         # in2out
3387         pkts = self.create_stream_frag(self.pg0,
3388                                        self.pg1.remote_ip4,
3389                                        self.tcp_port_in,
3390                                        20,
3391                                        data)
3392         pkts.reverse()
3393         self.pg0.add_stream(pkts)
3394         self.pg_enable_capture(self.pg_interfaces)
3395         self.pg_start()
3396         frags = self.pg1.get_capture(len(pkts))
3397         p = self.reass_frags_and_verify(frags,
3398                                         self.nat_addr,
3399                                         self.pg1.remote_ip4)
3400         self.assertEqual(p[TCP].dport, 20)
3401         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3402         self.tcp_port_out = p[TCP].sport
3403         self.assertEqual(data, p[Raw].load)
3404
3405         # out2in
3406         pkts = self.create_stream_frag(self.pg1,
3407                                        self.nat_addr,
3408                                        20,
3409                                        self.tcp_port_out,
3410                                        data)
3411         pkts.reverse()
3412         self.pg1.add_stream(pkts)
3413         self.pg_enable_capture(self.pg_interfaces)
3414         self.pg_start()
3415         frags = self.pg0.get_capture(len(pkts))
3416         p = self.reass_frags_and_verify(frags,
3417                                         self.pg1.remote_ip4,
3418                                         self.pg0.remote_ip4)
3419         self.assertEqual(p[TCP].sport, 20)
3420         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3421         self.assertEqual(data, p[Raw].load)
3422
3423     def test_port_restricted(self):
3424         """ Port restricted NAT44 (MAP-E CE) """
3425         self.nat44_add_address(self.nat_addr)
3426         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3427         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3428                                                   is_inside=0)
3429         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3430                       "psid-offset 6 psid-len 6")
3431
3432         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3433              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3434              TCP(sport=4567, dport=22))
3435         self.pg0.add_stream(p)
3436         self.pg_enable_capture(self.pg_interfaces)
3437         self.pg_start()
3438         capture = self.pg1.get_capture(1)
3439         p = capture[0]
3440         try:
3441             ip = p[IP]
3442             tcp = p[TCP]
3443             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3444             self.assertEqual(ip.src, self.nat_addr)
3445             self.assertEqual(tcp.dport, 22)
3446             self.assertNotEqual(tcp.sport, 4567)
3447             self.assertEqual((tcp.sport >> 6) & 63, 10)
3448             self.check_tcp_checksum(p)
3449             self.check_ip_checksum(p)
3450         except:
3451             self.logger.error(ppp("Unexpected or invalid packet:", p))
3452             raise
3453
3454     def test_twice_nat(self):
3455         """ Twice NAT44 """
3456         twice_nat_addr = '10.0.1.3'
3457         port_in = 8080
3458         port_out = 80
3459         eh_port_out = 4567
3460         eh_port_in = 0
3461         self.nat44_add_address(self.nat_addr)
3462         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3463         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3464                                       port_in, port_out, proto=IP_PROTOS.tcp,
3465                                       twice_nat=1)
3466         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3467         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3468                                                   is_inside=0)
3469
3470         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3471              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3472              TCP(sport=eh_port_out, dport=port_out))
3473         self.pg1.add_stream(p)
3474         self.pg_enable_capture(self.pg_interfaces)
3475         self.pg_start()
3476         capture = self.pg0.get_capture(1)
3477         p = capture[0]
3478         try:
3479             ip = p[IP]
3480             tcp = p[TCP]
3481             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3482             self.assertEqual(ip.src, twice_nat_addr)
3483             self.assertEqual(tcp.dport, port_in)
3484             self.assertNotEqual(tcp.sport, eh_port_out)
3485             eh_port_in = tcp.sport
3486             self.check_tcp_checksum(p)
3487             self.check_ip_checksum(p)
3488         except:
3489             self.logger.error(ppp("Unexpected or invalid packet:", p))
3490             raise
3491
3492         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3493              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3494              TCP(sport=port_in, dport=eh_port_in))
3495         self.pg0.add_stream(p)
3496         self.pg_enable_capture(self.pg_interfaces)
3497         self.pg_start()
3498         capture = self.pg1.get_capture(1)
3499         p = capture[0]
3500         try:
3501             ip = p[IP]
3502             tcp = p[TCP]
3503             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3504             self.assertEqual(ip.src, self.nat_addr)
3505             self.assertEqual(tcp.dport, eh_port_out)
3506             self.assertEqual(tcp.sport, port_out)
3507             self.check_tcp_checksum(p)
3508             self.check_ip_checksum(p)
3509         except:
3510             self.logger.error(ppp("Unexpected or invalid packet:", p))
3511             raise
3512
3513     def test_twice_nat_lb(self):
3514         """ Twice NAT44 local service load balancing """
3515         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3516         twice_nat_addr = '10.0.1.3'
3517         local_port = 8080
3518         external_port = 80
3519         eh_port_out = 4567
3520         eh_port_in = 0
3521         server1 = self.pg0.remote_hosts[0]
3522         server2 = self.pg0.remote_hosts[1]
3523
3524         locals = [{'addr': server1.ip4n,
3525                    'port': local_port,
3526                    'probability': 50},
3527                   {'addr': server2.ip4n,
3528                    'port': local_port,
3529                    'probability': 50}]
3530
3531         self.nat44_add_address(self.nat_addr)
3532         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3533
3534         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3535                                                   external_port,
3536                                                   IP_PROTOS.tcp,
3537                                                   twice_nat=1,
3538                                                   local_num=len(locals),
3539                                                   locals=locals)
3540         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3541         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3542                                                   is_inside=0)
3543
3544         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3545              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3546              TCP(sport=eh_port_out, dport=external_port))
3547         self.pg1.add_stream(p)
3548         self.pg_enable_capture(self.pg_interfaces)
3549         self.pg_start()
3550         capture = self.pg0.get_capture(1)
3551         p = capture[0]
3552         server = None
3553         try:
3554             ip = p[IP]
3555             tcp = p[TCP]
3556             self.assertEqual(ip.src, twice_nat_addr)
3557             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3558             if ip.dst == server1.ip4:
3559                 server = server1
3560             else:
3561                 server = server2
3562             self.assertNotEqual(tcp.sport, eh_port_out)
3563             eh_port_in = tcp.sport
3564             self.assertEqual(tcp.dport, local_port)
3565             self.check_tcp_checksum(p)
3566             self.check_ip_checksum(p)
3567         except:
3568             self.logger.error(ppp("Unexpected or invalid packet:", p))
3569             raise
3570
3571         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3572              IP(src=server.ip4, dst=twice_nat_addr) /
3573              TCP(sport=local_port, dport=eh_port_in))
3574         self.pg0.add_stream(p)
3575         self.pg_enable_capture(self.pg_interfaces)
3576         self.pg_start()
3577         capture = self.pg1.get_capture(1)
3578         p = capture[0]
3579         try:
3580             ip = p[IP]
3581             tcp = p[TCP]
3582             self.assertEqual(ip.src, self.nat_addr)
3583             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3584             self.assertEqual(tcp.sport, external_port)
3585             self.assertEqual(tcp.dport, eh_port_out)
3586             self.check_tcp_checksum(p)
3587             self.check_ip_checksum(p)
3588         except:
3589             self.logger.error(ppp("Unexpected or invalid packet:", p))
3590             raise
3591
3592     def test_twice_nat_interface_addr(self):
3593         """ Acquire twice NAT44 addresses from interface """
3594         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3595
3596         # no address in NAT pool
3597         adresses = self.vapi.nat44_address_dump()
3598         self.assertEqual(0, len(adresses))
3599
3600         # configure interface address and check NAT address pool
3601         self.pg7.config_ip4()
3602         adresses = self.vapi.nat44_address_dump()
3603         self.assertEqual(1, len(adresses))
3604         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3605         self.assertEqual(adresses[0].twice_nat, 1)
3606
3607         # remove interface address and check NAT address pool
3608         self.pg7.unconfig_ip4()
3609         adresses = self.vapi.nat44_address_dump()
3610         self.assertEqual(0, len(adresses))
3611
3612     def test_ipfix_max_frags(self):
3613         """ IPFIX logging maximum fragments pending reassembly exceeded """
3614         self.nat44_add_address(self.nat_addr)
3615         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3616         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3617                                                   is_inside=0)
3618         self.vapi.nat_set_reass(max_frag=0)
3619         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3620                                      src_address=self.pg3.local_ip4n,
3621                                      path_mtu=512,
3622                                      template_interval=10)
3623         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3624                             src_port=self.ipfix_src_port)
3625
3626         data = "A" * 4 + "B" * 16 + "C" * 3
3627         self.tcp_port_in = random.randint(1025, 65535)
3628         pkts = self.create_stream_frag(self.pg0,
3629                                        self.pg1.remote_ip4,
3630                                        self.tcp_port_in,
3631                                        20,
3632                                        data)
3633         self.pg0.add_stream(pkts[-1])
3634         self.pg_enable_capture(self.pg_interfaces)
3635         self.pg_start()
3636         frags = self.pg1.get_capture(0)
3637         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3638         capture = self.pg3.get_capture(9)
3639         ipfix = IPFIXDecoder()
3640         # first load template
3641         for p in capture:
3642             self.assertTrue(p.haslayer(IPFIX))
3643             self.assertEqual(p[IP].src, self.pg3.local_ip4)
3644             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3645             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3646             self.assertEqual(p[UDP].dport, 4739)
3647             self.assertEqual(p[IPFIX].observationDomainID,
3648                              self.ipfix_domain_id)
3649             if p.haslayer(Template):
3650                 ipfix.add_template(p.getlayer(Template))
3651         # verify events in data set
3652         for p in capture:
3653             if p.haslayer(Data):
3654                 data = ipfix.decode_data_set(p.getlayer(Set))
3655                 self.verify_ipfix_max_fragments_ip4(data, 0,
3656                                                     self.pg0.remote_ip4n)
3657
3658     def tearDown(self):
3659         super(TestNAT44, self).tearDown()
3660         if not self.vpp_dead:
3661             self.logger.info(self.vapi.cli("show nat44 verbose"))
3662             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3663             self.vapi.cli("nat addr-port-assignment-alg default")
3664             self.clear_nat44()
3665
3666
3667 class TestNAT44Out2InDPO(MethodHolder):
3668     """ NAT44 Test Cases using out2in DPO """
3669
3670     @classmethod
3671     def setUpConstants(cls):
3672         super(TestNAT44Out2InDPO, cls).setUpConstants()
3673         cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
3674
3675     @classmethod
3676     def setUpClass(cls):
3677         super(TestNAT44Out2InDPO, cls).setUpClass()
3678
3679         try:
3680             cls.tcp_port_in = 6303
3681             cls.tcp_port_out = 6303
3682             cls.udp_port_in = 6304
3683             cls.udp_port_out = 6304
3684             cls.icmp_id_in = 6305
3685             cls.icmp_id_out = 6305
3686             cls.nat_addr = '10.0.0.3'
3687             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3688             cls.dst_ip4 = '192.168.70.1'
3689
3690             cls.create_pg_interfaces(range(2))
3691
3692             cls.pg0.admin_up()
3693             cls.pg0.config_ip4()
3694             cls.pg0.resolve_arp()
3695
3696             cls.pg1.admin_up()
3697             cls.pg1.config_ip6()
3698             cls.pg1.resolve_ndp()
3699
3700             cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3701                                       dst_address_length=0,
3702                                       next_hop_address=cls.pg1.remote_ip6n,
3703                                       next_hop_sw_if_index=cls.pg1.sw_if_index)
3704
3705         except Exception:
3706             super(TestNAT44Out2InDPO, cls).tearDownClass()
3707             raise
3708
3709     def configure_xlat(self):
3710         self.dst_ip6_pfx = '1:2:3::'
3711         self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3712                                               self.dst_ip6_pfx)
3713         self.dst_ip6_pfx_len = 96
3714         self.src_ip6_pfx = '4:5:6::'
3715         self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3716                                               self.src_ip6_pfx)
3717         self.src_ip6_pfx_len = 96
3718         self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3719                                  self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3720                                  '\x00\x00\x00\x00', 0, is_translation=1,
3721                                  is_rfc6052=1)
3722
3723     def test_464xlat_ce(self):
3724         """ Test 464XLAT CE with NAT44 """
3725
3726         self.configure_xlat()
3727
3728         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3729         self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
3730
3731         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3732                                        self.dst_ip6_pfx_len)
3733         out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3734                                        self.src_ip6_pfx_len)
3735
3736         try:
3737             pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3738             self.pg0.add_stream(pkts)
3739             self.pg_enable_capture(self.pg_interfaces)
3740             self.pg_start()
3741             capture = self.pg1.get_capture(len(pkts))
3742             self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3743                                         dst_ip=out_src_ip6)
3744
3745             pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3746                                               out_dst_ip6)
3747             self.pg1.add_stream(pkts)
3748             self.pg_enable_capture(self.pg_interfaces)
3749             self.pg_start()
3750             capture = self.pg0.get_capture(len(pkts))
3751             self.verify_capture_in(capture, self.pg0)
3752         finally:
3753             self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3754                                                       is_add=0)
3755             self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3756                                                   self.nat_addr_n, is_add=0)
3757
3758     def test_464xlat_ce_no_nat(self):
3759         """ Test 464XLAT CE without NAT44 """
3760
3761         self.configure_xlat()
3762
3763         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3764                                        self.dst_ip6_pfx_len)
3765         out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3766                                        self.src_ip6_pfx_len)
3767
3768         pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3769         self.pg0.add_stream(pkts)
3770         self.pg_enable_capture(self.pg_interfaces)
3771         self.pg_start()
3772         capture = self.pg1.get_capture(len(pkts))
3773         self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3774                                     nat_ip=out_dst_ip6, same_port=True)
3775
3776         pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3777         self.pg1.add_stream(pkts)
3778         self.pg_enable_capture(self.pg_interfaces)
3779         self.pg_start()
3780         capture = self.pg0.get_capture(len(pkts))
3781         self.verify_capture_in(capture, self.pg0)
3782
3783
3784 class TestDeterministicNAT(MethodHolder):
3785     """ Deterministic NAT Test Cases """
3786
3787     @classmethod
3788     def setUpConstants(cls):
3789         super(TestDeterministicNAT, cls).setUpConstants()
3790         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3791
3792     @classmethod
3793     def setUpClass(cls):
3794         super(TestDeterministicNAT, cls).setUpClass()
3795
3796         try:
3797             cls.tcp_port_in = 6303
3798             cls.tcp_external_port = 6303
3799             cls.udp_port_in = 6304
3800             cls.udp_external_port = 6304
3801             cls.icmp_id_in = 6305
3802             cls.nat_addr = '10.0.0.3'
3803
3804             cls.create_pg_interfaces(range(3))
3805             cls.interfaces = list(cls.pg_interfaces)
3806
3807             for i in cls.interfaces:
3808                 i.admin_up()
3809                 i.config_ip4()
3810                 i.resolve_arp()
3811
3812             cls.pg0.generate_remote_hosts(2)
3813             cls.pg0.configure_ipv4_neighbors()
3814
3815         except Exception:
3816             super(TestDeterministicNAT, cls).tearDownClass()
3817             raise
3818
3819     def create_stream_in(self, in_if, out_if, ttl=64):
3820         """
3821         Create packet stream for inside network
3822
3823         :param in_if: Inside interface
3824         :param out_if: Outside interface
3825         :param ttl: TTL of generated packets
3826         """
3827         pkts = []
3828         # TCP
3829         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3830              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3831              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3832         pkts.append(p)
3833
3834         # UDP
3835         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3836              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3837              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3838         pkts.append(p)
3839
3840         # ICMP
3841         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3842              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3843              ICMP(id=self.icmp_id_in, type='echo-request'))
3844         pkts.append(p)
3845
3846         return pkts
3847
3848     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3849         """
3850         Create packet stream for outside network
3851
3852         :param out_if: Outside interface
3853         :param dst_ip: Destination IP address (Default use global NAT address)
3854         :param ttl: TTL of generated packets
3855         """
3856         if dst_ip is None:
3857             dst_ip = self.nat_addr
3858         pkts = []
3859         # TCP
3860         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3861              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3862              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3863         pkts.append(p)
3864
3865         # UDP
3866         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3867              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3868              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3869         pkts.append(p)
3870
3871         # ICMP
3872         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3873              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3874              ICMP(id=self.icmp_external_id, type='echo-reply'))
3875         pkts.append(p)
3876
3877         return pkts
3878
3879     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3880         """
3881         Verify captured packets on outside network
3882
3883         :param capture: Captured packets
3884         :param nat_ip: Translated IP address (Default use global NAT address)
3885         :param same_port: Sorce port number is not translated (Default False)
3886         :param packet_num: Expected number of packets (Default 3)
3887         """
3888         if nat_ip is None:
3889             nat_ip = self.nat_addr
3890         self.assertEqual(packet_num, len(capture))
3891         for packet in capture:
3892             try:
3893                 self.assertEqual(packet[IP].src, nat_ip)
3894                 if packet.haslayer(TCP):
3895                     self.tcp_port_out = packet[TCP].sport
3896                 elif packet.haslayer(UDP):
3897                     self.udp_port_out = packet[UDP].sport
3898                 else:
3899                     self.icmp_external_id = packet[ICMP].id
3900             except:
3901                 self.logger.error(ppp("Unexpected or invalid packet "
3902                                       "(outside network):", packet))
3903                 raise
3904
3905     def initiate_tcp_session(self, in_if, out_if):
3906         """
3907         Initiates TCP session
3908
3909         :param in_if: Inside interface
3910         :param out_if: Outside interface
3911         """
3912         try:
3913             # SYN packet in->out
3914             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3915                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3916                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3917                      flags="S"))
3918             in_if.add_stream(p)
3919             self.pg_enable_capture(self.pg_interfaces)
3920             self.pg_start()
3921             capture = out_if.get_capture(1)
3922             p = capture[0]
3923             self.tcp_port_out = p[TCP].sport
3924
3925             # SYN + ACK packet out->in
3926             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3927                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3928                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3929                      flags="SA"))
3930             out_if.add_stream(p)
3931             self.pg_enable_capture(self.pg_interfaces)
3932             self.pg_start()
3933             in_if.get_capture(1)
3934
3935             # ACK packet in->out
3936             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3937                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3938                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3939                      flags="A"))
3940             in_if.add_stream(p)
3941             self.pg_enable_capture(self.pg_interfaces)
3942             self.pg_start()
3943             out_if.get_capture(1)
3944
3945         except:
3946             self.logger.error("TCP 3 way handshake failed")
3947             raise
3948
3949     def verify_ipfix_max_entries_per_user(self, data):
3950         """
3951         Verify IPFIX maximum entries per user exceeded event
3952
3953         :param data: Decoded IPFIX data records
3954         """
3955         self.assertEqual(1, len(data))
3956         record = data[0]
3957         # natEvent
3958         self.assertEqual(ord(record[230]), 13)
3959         # natQuotaExceededEvent
3960         self.assertEqual('\x03\x00\x00\x00', record[466])
3961         # maxEntriesPerUser
3962         self.assertEqual('\xe8\x03\x00\x00', record[473])
3963         # sourceIPv4Address
3964         self.assertEqual(self.pg0.remote_ip4n, record[8])
3965
3966     def test_deterministic_mode(self):
3967         """ NAT plugin run deterministic mode """
3968         in_addr = '172.16.255.0'
3969         out_addr = '172.17.255.50'
3970         in_addr_t = '172.16.255.20'
3971         in_addr_n = socket.inet_aton(in_addr)
3972         out_addr_n = socket.inet_aton(out_addr)
3973         in_addr_t_n = socket.inet_aton(in_addr_t)
3974         in_plen = 24
3975         out_plen = 32
3976
3977         nat_config = self.vapi.nat_show_config()
3978         self.assertEqual(1, nat_config.deterministic)
3979
3980         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3981
3982         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3983         self.assertEqual(rep1.out_addr[:4], out_addr_n)
3984         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3985         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3986
3987         deterministic_mappings = self.vapi.nat_det_map_dump()
3988         self.assertEqual(len(deterministic_mappings), 1)
3989         dsm = deterministic_mappings[0]
3990         self.assertEqual(in_addr_n, dsm.in_addr[:4])
3991         self.assertEqual(in_plen, dsm.in_plen)
3992         self.assertEqual(out_addr_n, dsm.out_addr[:4])
3993         self.assertEqual(out_plen, dsm.out_plen)
3994
3995         self.clear_nat_det()
3996         deterministic_mappings = self.vapi.nat_det_map_dump()
3997         self.assertEqual(len(deterministic_mappings), 0)
3998
3999     def test_set_timeouts(self):
4000         """ Set deterministic NAT timeouts """
4001         timeouts_before = self.vapi.nat_det_get_timeouts()
4002
4003         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4004                                        timeouts_before.tcp_established + 10,
4005                                        timeouts_before.tcp_transitory + 10,
4006                                        timeouts_before.icmp + 10)
4007
4008         timeouts_after = self.vapi.nat_det_get_timeouts()
4009
4010         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4011         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4012         self.assertNotEqual(timeouts_before.tcp_established,
4013                             timeouts_after.tcp_established)
4014         self.assertNotEqual(timeouts_before.tcp_transitory,
4015                             timeouts_after.tcp_transitory)
4016
4017     def test_det_in(self):
4018         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4019
4020         nat_ip = "10.0.0.10"
4021
4022         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4023                                       32,
4024                                       socket.inet_aton(nat_ip),
4025                                       32)
4026         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4027         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4028                                                   is_inside=0)
4029
4030         # in2out
4031         pkts = self.create_stream_in(self.pg0, self.pg1)
4032         self.pg0.add_stream(pkts)
4033         self.pg_enable_capture(self.pg_interfaces)
4034         self.pg_start()
4035         capture = self.pg1.get_capture(len(pkts))
4036         self.verify_capture_out(capture, nat_ip)
4037
4038         # out2in
4039         pkts = self.create_stream_out(self.pg1, nat_ip)
4040         self.pg1.add_stream(pkts)
4041         self.pg_enable_capture(self.pg_interfaces)
4042         self.pg_start()
4043         capture = self.pg0.get_capture(len(pkts))
4044         self.verify_capture_in(capture, self.pg0)
4045
4046         # session dump test
4047         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4048         self.assertEqual(len(sessions), 3)
4049
4050         # TCP session
4051         s = sessions[0]
4052         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4053         self.assertEqual(s.in_port, self.tcp_port_in)
4054         self.assertEqual(s.out_port, self.tcp_port_out)
4055         self.assertEqual(s.ext_port, self.tcp_external_port)
4056
4057         # UDP session
4058         s = sessions[1]
4059         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4060         self.assertEqual(s.in_port, self.udp_port_in)
4061         self.assertEqual(s.out_port, self.udp_port_out)
4062         self.assertEqual(s.ext_port, self.udp_external_port)
4063
4064         # ICMP session
4065         s = sessions[2]
4066         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4067         self.assertEqual(s.in_port, self.icmp_id_in)
4068         self.assertEqual(s.out_port, self.icmp_external_id)
4069
4070     def test_multiple_users(self):
4071         """ Deterministic NAT multiple users """
4072
4073         nat_ip = "10.0.0.10"
4074         port_in = 80
4075         external_port = 6303
4076
4077         host0 = self.pg0.remote_hosts[0]
4078         host1 = self.pg0.remote_hosts[1]
4079
4080         self.vapi.nat_det_add_del_map(host0.ip4n,
4081                                       24,
4082                                       socket.inet_aton(nat_ip),
4083                                       32)
4084         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4085         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4086                                                   is_inside=0)
4087
4088         # host0 to out
4089         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4090              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4091              TCP(sport=port_in, dport=external_port))
4092         self.pg0.add_stream(p)
4093         self.pg_enable_capture(self.pg_interfaces)
4094         self.pg_start()
4095         capture = self.pg1.get_capture(1)
4096         p = capture[0]
4097         try:
4098             ip = p[IP]
4099             tcp = p[TCP]
4100             self.assertEqual(ip.src, nat_ip)
4101             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4102             self.assertEqual(tcp.dport, external_port)
4103             port_out0 = tcp.sport
4104         except:
4105             self.logger.error(ppp("Unexpected or invalid packet:", p))
4106             raise
4107
4108         # host1 to out
4109         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4110              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4111              TCP(sport=port_in, dport=external_port))
4112         self.pg0.add_stream(p)
4113         self.pg_enable_capture(self.pg_interfaces)
4114         self.pg_start()
4115         capture = self.pg1.get_capture(1)
4116         p = capture[0]
4117         try:
4118             ip = p[IP]
4119             tcp = p[TCP]
4120             self.assertEqual(ip.src, nat_ip)
4121             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4122             self.assertEqual(tcp.dport, external_port)
4123             port_out1 = tcp.sport
4124         except:
4125             self.logger.error(ppp("Unexpected or invalid packet:", p))
4126             raise
4127
4128         dms = self.vapi.nat_det_map_dump()
4129         self.assertEqual(1, len(dms))
4130         self.assertEqual(2, dms[0].ses_num)
4131
4132         # out to host0
4133         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4134              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4135              TCP(sport=external_port, dport=port_out0))
4136         self.pg1.add_stream(p)
4137         self.pg_enable_capture(self.pg_interfaces)
4138         self.pg_start()
4139         capture = self.pg0.get_capture(1)
4140         p = capture[0]
4141         try:
4142             ip = p[IP]
4143             tcp = p[TCP]
4144             self.assertEqual(ip.src, self.pg1.remote_ip4)
4145             self.assertEqual(ip.dst, host0.ip4)
4146             self.assertEqual(tcp.dport, port_in)
4147             self.assertEqual(tcp.sport, external_port)
4148         except:
4149             self.logger.error(ppp("Unexpected or invalid packet:", p))
4150             raise
4151
4152         # out to host1
4153         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4154              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4155              TCP(sport=external_port, dport=port_out1))
4156         self.pg1.add_stream(p)
4157         self.pg_enable_capture(self.pg_interfaces)
4158         self.pg_start()
4159         capture = self.pg0.get_capture(1)
4160         p = capture[0]
4161         try:
4162             ip = p[IP]
4163             tcp = p[TCP]
4164             self.assertEqual(ip.src, self.pg1.remote_ip4)
4165             self.assertEqual(ip.dst, host1.ip4)
4166             self.assertEqual(tcp.dport, port_in)
4167             self.assertEqual(tcp.sport, external_port)
4168         except:
4169             self.logger.error(ppp("Unexpected or invalid packet", p))
4170             raise
4171
4172         # session close api test
4173         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4174                                             port_out1,
4175                                             self.pg1.remote_ip4n,
4176                                             external_port)
4177         dms = self.vapi.nat_det_map_dump()
4178         self.assertEqual(dms[0].ses_num, 1)
4179
4180         self.vapi.nat_det_close_session_in(host0.ip4n,
4181                                            port_in,
4182                                            self.pg1.remote_ip4n,
4183                                            external_port)
4184         dms = self.vapi.nat_det_map_dump()
4185         self.assertEqual(dms[0].ses_num, 0)
4186
4187     def test_tcp_session_close_detection_in(self):
4188         """ Deterministic NAT TCP session close from inside network """
4189         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4190                                       32,
4191                                       socket.inet_aton(self.nat_addr),
4192                                       32)
4193         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4194         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4195                                                   is_inside=0)
4196
4197         self.initiate_tcp_session(self.pg0, self.pg1)
4198
4199         # close the session from inside
4200         try:
4201             # FIN packet in -> out
4202             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4203                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4204                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4205                      flags="F"))
4206             self.pg0.add_stream(p)
4207             self.pg_enable_capture(self.pg_interfaces)
4208             self.pg_start()
4209             self.pg1.get_capture(1)
4210
4211             pkts = []
4212
4213             # ACK packet out -> in
4214             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4215                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4216                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4217                      flags="A"))
4218             pkts.append(p)
4219
4220             # FIN packet out -> in
4221             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4222                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4223                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4224                      flags="F"))
4225             pkts.append(p)
4226
4227             self.pg1.add_stream(pkts)
4228             self.pg_enable_capture(self.pg_interfaces)
4229             self.pg_start()
4230             self.pg0.get_capture(2)
4231
4232             # ACK packet in -> out
4233             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4234                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4235                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4236                      flags="A"))
4237             self.pg0.add_stream(p)
4238             self.pg_enable_capture(self.pg_interfaces)
4239             self.pg_start()
4240             self.pg1.get_capture(1)
4241
4242             # Check if deterministic NAT44 closed the session
4243             dms = self.vapi.nat_det_map_dump()
4244             self.assertEqual(0, dms[0].ses_num)
4245         except:
4246             self.logger.error("TCP session termination failed")
4247             raise
4248
4249     def test_tcp_session_close_detection_out(self):
4250         """ Deterministic NAT TCP session close from outside network """
4251         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4252                                       32,
4253                                       socket.inet_aton(self.nat_addr),
4254                                       32)
4255         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4256         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4257                                                   is_inside=0)
4258
4259         self.initiate_tcp_session(self.pg0, self.pg1)
4260
4261         # close the session from outside
4262         try:
4263             # FIN packet out -> in
4264             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4265                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4266                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4267                      flags="F"))
4268             self.pg1.add_stream(p)
4269             self.pg_enable_capture(self.pg_interfaces)
4270             self.pg_start()
4271             self.pg0.get_capture(1)
4272
4273             pkts = []
4274
4275             # ACK packet in -> out
4276             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4277                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4278                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4279                      flags="A"))
4280             pkts.append(p)
4281
4282             # ACK packet in -> out
4283             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4284                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4285                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4286                      flags="F"))
4287             pkts.append(p)
4288
4289             self.pg0.add_stream(pkts)
4290             self.pg_enable_capture(self.pg_interfaces)
4291             self.pg_start()
4292             self.pg1.get_capture(2)
4293
4294             # ACK packet out -> in
4295             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4296                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4297                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4298                      flags="A"))
4299             self.pg1.add_stream(p)
4300             self.pg_enable_capture(self.pg_interfaces)
4301             self.pg_start()
4302             self.pg0.get_capture(1)
4303
4304             # Check if deterministic NAT44 closed the session
4305             dms = self.vapi.nat_det_map_dump()
4306             self.assertEqual(0, dms[0].ses_num)
4307         except:
4308             self.logger.error("TCP session termination failed")
4309             raise
4310
4311     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4312     def test_session_timeout(self):
4313         """ Deterministic NAT session timeouts """
4314         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4315                                       32,
4316                                       socket.inet_aton(self.nat_addr),
4317                                       32)
4318         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4319         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4320                                                   is_inside=0)
4321
4322         self.initiate_tcp_session(self.pg0, self.pg1)
4323         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4324         pkts = self.create_stream_in(self.pg0, self.pg1)
4325         self.pg0.add_stream(pkts)
4326         self.pg_enable_capture(self.pg_interfaces)
4327         self.pg_start()
4328         capture = self.pg1.get_capture(len(pkts))
4329         sleep(15)
4330
4331         dms = self.vapi.nat_det_map_dump()
4332         self.assertEqual(0, dms[0].ses_num)
4333
4334     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4335     def test_session_limit_per_user(self):
4336         """ Deterministic NAT maximum sessions per user limit """
4337         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4338                                       32,
4339                                       socket.inet_aton(self.nat_addr),
4340                                       32)
4341         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4342         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4343                                                   is_inside=0)
4344         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4345                                      src_address=self.pg2.local_ip4n,
4346                                      path_mtu=512,
4347                                      template_interval=10)
4348         self.vapi.nat_ipfix()
4349
4350         pkts = []
4351         for port in range(1025, 2025):
4352             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4353                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4354                  UDP(sport=port, dport=port))
4355             pkts.append(p)
4356
4357         self.pg0.add_stream(pkts)
4358         self.pg_enable_capture(self.pg_interfaces)
4359         self.pg_start()
4360         capture = self.pg1.get_capture(len(pkts))
4361
4362         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4363              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4364              UDP(sport=3001, dport=3002))
4365         self.pg0.add_stream(p)
4366         self.pg_enable_capture(self.pg_interfaces)
4367         self.pg_start()
4368         capture = self.pg1.assert_nothing_captured()
4369
4370         # verify ICMP error packet
4371         capture = self.pg0.get_capture(1)
4372         p = capture[0]
4373         self.assertTrue(p.haslayer(ICMP))
4374         icmp = p[ICMP]
4375         self.assertEqual(icmp.type, 3)
4376         self.assertEqual(icmp.code, 1)
4377         self.assertTrue(icmp.haslayer(IPerror))
4378         inner_ip = icmp[IPerror]
4379         self.assertEqual(inner_ip[UDPerror].sport, 3001)
4380         self.assertEqual(inner_ip[UDPerror].dport, 3002)
4381
4382         dms = self.vapi.nat_det_map_dump()
4383
4384         self.assertEqual(1000, dms[0].ses_num)
4385
4386         # verify IPFIX logging
4387         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
4388         sleep(1)
4389         capture = self.pg2.get_capture(2)
4390         ipfix = IPFIXDecoder()
4391         # first load template
4392         for p in capture:
4393             self.assertTrue(p.haslayer(IPFIX))
4394             if p.haslayer(Template):
4395                 ipfix.add_template(p.getlayer(Template))
4396         # verify events in data set
4397         for p in capture:
4398             if p.haslayer(Data):
4399                 data = ipfix.decode_data_set(p.getlayer(Set))
4400                 self.verify_ipfix_max_entries_per_user(data)
4401
4402     def clear_nat_det(self):
4403         """
4404         Clear deterministic NAT configuration.
4405         """
4406         self.vapi.nat_ipfix(enable=0)
4407         self.vapi.nat_det_set_timeouts()
4408         deterministic_mappings = self.vapi.nat_det_map_dump()
4409         for dsm in deterministic_mappings:
4410             self.vapi.nat_det_add_del_map(dsm.in_addr,
4411                                           dsm.in_plen,
4412                                           dsm.out_addr,
4413                                           dsm.out_plen,
4414                                           is_add=0)
4415
4416         interfaces = self.vapi.nat44_interface_dump()
4417         for intf in interfaces:
4418             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4419                                                       intf.is_inside,
4420                                                       is_add=0)
4421
4422     def tearDown(self):
4423         super(TestDeterministicNAT, self).tearDown()
4424         if not self.vpp_dead:
4425             self.logger.info(self.vapi.cli("show nat44 detail"))
4426             self.clear_nat_det()
4427
4428
4429 class TestNAT64(MethodHolder):
4430     """ NAT64 Test Cases """
4431
4432     @classmethod
4433     def setUpConstants(cls):
4434         super(TestNAT64, cls).setUpConstants()
4435         cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4436                                 "nat64 st hash buckets 256", "}"])
4437
4438     @classmethod
4439     def setUpClass(cls):
4440         super(TestNAT64, cls).setUpClass()
4441
4442         try:
4443             cls.tcp_port_in = 6303
4444             cls.tcp_port_out = 6303
4445             cls.udp_port_in = 6304
4446             cls.udp_port_out = 6304
4447             cls.icmp_id_in = 6305
4448             cls.icmp_id_out = 6305
4449             cls.nat_addr = '10.0.0.3'
4450             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4451             cls.vrf1_id = 10
4452             cls.vrf1_nat_addr = '10.0.10.3'
4453             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4454                                                    cls.vrf1_nat_addr)
4455             cls.ipfix_src_port = 4739
4456             cls.ipfix_domain_id = 1
4457
4458             cls.create_pg_interfaces(range(5))
4459             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4460             cls.ip6_interfaces.append(cls.pg_interfaces[2])
4461             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4462
4463             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4464
4465             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4466
4467             cls.pg0.generate_remote_hosts(2)
4468
4469             for i in cls.ip6_interfaces:
4470                 i.admin_up()
4471                 i.config_ip6()
4472                 i.configure_ipv6_neighbors()
4473
4474             for i in cls.ip4_interfaces:
4475                 i.admin_up()
4476                 i.config_ip4()
4477                 i.resolve_arp()
4478
4479             cls.pg3.admin_up()
4480             cls.pg3.config_ip4()
4481             cls.pg3.resolve_arp()
4482             cls.pg3.config_ip6()
4483             cls.pg3.configure_ipv6_neighbors()
4484
4485         except Exception:
4486             super(TestNAT64, cls).tearDownClass()
4487             raise
4488
4489     def test_pool(self):
4490         """ Add/delete address to NAT64 pool """
4491         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4492
4493         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4494
4495         addresses = self.vapi.nat64_pool_addr_dump()
4496         self.assertEqual(len(addresses), 1)
4497         self.assertEqual(addresses[0].address, nat_addr)
4498
4499         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4500
4501         addresses = self.vapi.nat64_pool_addr_dump()
4502         self.assertEqual(len(addresses), 0)
4503
4504     def test_interface(self):
4505         """ Enable/disable NAT64 feature on the interface """
4506         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4507         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4508
4509         interfaces = self.vapi.nat64_interface_dump()
4510         self.assertEqual(len(interfaces), 2)
4511         pg0_found = False
4512         pg1_found = False
4513         for intf in interfaces:
4514             if intf.sw_if_index == self.pg0.sw_if_index:
4515                 self.assertEqual(intf.is_inside, 1)
4516                 pg0_found = True
4517             elif intf.sw_if_index == self.pg1.sw_if_index:
4518                 self.assertEqual(intf.is_inside, 0)
4519                 pg1_found = True
4520         self.assertTrue(pg0_found)
4521         self.assertTrue(pg1_found)
4522
4523         features = self.vapi.cli("show interface features pg0")
4524         self.assertNotEqual(features.find('nat64-in2out'), -1)
4525         features = self.vapi.cli("show interface features pg1")
4526         self.assertNotEqual(features.find('nat64-out2in'), -1)
4527
4528         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4529         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4530
4531         interfaces = self.vapi.nat64_interface_dump()
4532         self.assertEqual(len(interfaces), 0)
4533
4534     def test_static_bib(self):
4535         """ Add/delete static BIB entry """
4536         in_addr = socket.inet_pton(socket.AF_INET6,
4537                                    '2001:db8:85a3::8a2e:370:7334')
4538         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4539         in_port = 1234
4540         out_port = 5678
4541         proto = IP_PROTOS.tcp
4542
4543         self.vapi.nat64_add_del_static_bib(in_addr,
4544                                            out_addr,
4545                                            in_port,
4546                                            out_port,
4547                                            proto)
4548         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4549         static_bib_num = 0
4550         for bibe in bib:
4551             if bibe.is_static:
4552                 static_bib_num += 1
4553                 self.assertEqual(bibe.i_addr, in_addr)
4554                 self.assertEqual(bibe.o_addr, out_addr)
4555                 self.assertEqual(bibe.i_port, in_port)
4556                 self.assertEqual(bibe.o_port, out_port)
4557         self.assertEqual(static_bib_num, 1)
4558
4559         self.vapi.nat64_add_del_static_bib(in_addr,
4560                                            out_addr,
4561                                            in_port,
4562                                            out_port,
4563                                            proto,
4564                                            is_add=0)
4565         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4566         static_bib_num = 0
4567         for bibe in bib:
4568             if bibe.is_static:
4569                 static_bib_num += 1
4570         self.assertEqual(static_bib_num, 0)
4571
4572     def test_set_timeouts(self):
4573         """ Set NAT64 timeouts """
4574         # verify default values
4575         timeouts = self.vapi.nat64_get_timeouts()
4576         self.assertEqual(timeouts.udp, 300)
4577         self.assertEqual(timeouts.icmp, 60)
4578         self.assertEqual(timeouts.tcp_trans, 240)
4579         self.assertEqual(timeouts.tcp_est, 7440)
4580         self.assertEqual(timeouts.tcp_incoming_syn, 6)
4581
4582         # set and verify custom values
4583         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4584                                      tcp_est=7450, tcp_incoming_syn=10)
4585         timeouts = self.vapi.nat64_get_timeouts()
4586         self.assertEqual(timeouts.udp, 200)
4587         self.assertEqual(timeouts.icmp, 30)
4588         self.assertEqual(timeouts.tcp_trans, 250)
4589         self.assertEqual(timeouts.tcp_est, 7450)
4590         self.assertEqual(timeouts.tcp_incoming_syn, 10)
4591
4592     def test_dynamic(self):
4593         """ NAT64 dynamic translation test """
4594         self.tcp_port_in = 6303
4595         self.udp_port_in = 6304
4596         self.icmp_id_in = 6305
4597
4598         ses_num_start = self.nat64_get_ses_num()
4599
4600         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4601                                                 self.nat_addr_n)
4602         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4603         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4604
4605         # in2out
4606         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4607         self.pg0.add_stream(pkts)
4608         self.pg_enable_capture(self.pg_interfaces)
4609         self.pg_start()
4610         capture = self.pg1.get_capture(len(pkts))
4611         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4612                                 dst_ip=self.pg1.remote_ip4)
4613
4614         # out2in
4615         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4616         self.pg1.add_stream(pkts)
4617         self.pg_enable_capture(self.pg_interfaces)
4618         self.pg_start()
4619         capture = self.pg0.get_capture(len(pkts))
4620         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4621         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4622
4623         # in2out
4624         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4625         self.pg0.add_stream(pkts)
4626         self.pg_enable_capture(self.pg_interfaces)
4627         self.pg_start()
4628         capture = self.pg1.get_capture(len(pkts))
4629         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4630                                 dst_ip=self.pg1.remote_ip4)
4631
4632         # out2in
4633         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4634         self.pg1.add_stream(pkts)
4635         self.pg_enable_capture(self.pg_interfaces)
4636         self.pg_start()
4637         capture = self.pg0.get_capture(len(pkts))
4638         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4639
4640         ses_num_end = self.nat64_get_ses_num()
4641
4642         self.assertEqual(ses_num_end - ses_num_start, 3)
4643
4644         # tenant with specific VRF
4645         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4646                                                 self.vrf1_nat_addr_n,
4647                                                 vrf_id=self.vrf1_id)
4648         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4649
4650         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4651         self.pg2.add_stream(pkts)
4652         self.pg_enable_capture(self.pg_interfaces)
4653         self.pg_start()
4654         capture = self.pg1.get_capture(len(pkts))
4655         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4656                                 dst_ip=self.pg1.remote_ip4)
4657
4658         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4659         self.pg1.add_stream(pkts)
4660         self.pg_enable_capture(self.pg_interfaces)
4661         self.pg_start()
4662         capture = self.pg2.get_capture(len(pkts))
4663         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4664
4665     def test_static(self):
4666         """ NAT64 static translation test """
4667         self.tcp_port_in = 60303
4668         self.udp_port_in = 60304
4669         self.icmp_id_in = 60305
4670         self.tcp_port_out = 60303
4671         self.udp_port_out = 60304
4672         self.icmp_id_out = 60305
4673
4674         ses_num_start = self.nat64_get_ses_num()
4675
4676         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4677                                                 self.nat_addr_n)
4678         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4679         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4680
4681         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4682                                            self.nat_addr_n,
4683                                            self.tcp_port_in,
4684                                            self.tcp_port_out,
4685                                            IP_PROTOS.tcp)
4686         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4687                                            self.nat_addr_n,
4688                                            self.udp_port_in,
4689                                            self.udp_port_out,
4690                                            IP_PROTOS.udp)
4691         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4692                                            self.nat_addr_n,
4693                                            self.icmp_id_in,
4694                                            self.icmp_id_out,
4695                                            IP_PROTOS.icmp)
4696
4697         # in2out
4698         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4699         self.pg0.add_stream(pkts)
4700         self.pg_enable_capture(self.pg_interfaces)
4701         self.pg_start()
4702         capture = self.pg1.get_capture(len(pkts))
4703         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4704                                 dst_ip=self.pg1.remote_ip4, same_port=True)
4705
4706         # out2in
4707         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4708         self.pg1.add_stream(pkts)
4709         self.pg_enable_capture(self.pg_interfaces)
4710         self.pg_start()
4711         capture = self.pg0.get_capture(len(pkts))
4712         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4713         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4714
4715         ses_num_end = self.nat64_get_ses_num()
4716
4717         self.assertEqual(ses_num_end - ses_num_start, 3)
4718
4719     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4720     def test_session_timeout(self):
4721         """ NAT64 session timeout """
4722         self.icmp_id_in = 1234
4723         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4724                                                 self.nat_addr_n)
4725         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4726         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4727         self.vapi.nat64_set_timeouts(icmp=5)
4728
4729         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4730         self.pg0.add_stream(pkts)
4731         self.pg_enable_capture(self.pg_interfaces)
4732         self.pg_start()
4733         capture = self.pg1.get_capture(len(pkts))
4734
4735         ses_num_before_timeout = self.nat64_get_ses_num()
4736
4737         sleep(15)
4738
4739         # ICMP session after timeout
4740         ses_num_after_timeout = self.nat64_get_ses_num()
4741         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4742
4743     def test_icmp_error(self):
4744         """ NAT64 ICMP Error message translation """
4745         self.tcp_port_in = 6303
4746         self.udp_port_in = 6304
4747         self.icmp_id_in = 6305
4748
4749         ses_num_start = self.nat64_get_ses_num()
4750
4751         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4752                                                 self.nat_addr_n)
4753         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4754         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4755
4756         # send some packets to create sessions
4757         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4758         self.pg0.add_stream(pkts)
4759         self.pg_enable_capture(self.pg_interfaces)
4760         self.pg_start()
4761         capture_ip4 = self.pg1.get_capture(len(pkts))
4762         self.verify_capture_out(capture_ip4,
4763                                 nat_ip=self.nat_addr,
4764                                 dst_ip=self.pg1.remote_ip4)
4765
4766         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4767         self.pg1.add_stream(pkts)
4768         self.pg_enable_capture(self.pg_interfaces)
4769         self.pg_start()
4770         capture_ip6 = self.pg0.get_capture(len(pkts))
4771         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4772         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4773                                    self.pg0.remote_ip6)
4774
4775         # in2out
4776         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4777                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4778                 ICMPv6DestUnreach(code=1) /
4779                 packet[IPv6] for packet in capture_ip6]
4780         self.pg0.add_stream(pkts)
4781         self.pg_enable_capture(self.pg_interfaces)
4782         self.pg_start()
4783         capture = self.pg1.get_capture(len(pkts))
4784         for packet in capture:
4785             try:
4786                 self.assertEqual(packet[IP].src, self.nat_addr)
4787                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4788                 self.assertEqual(packet[ICMP].type, 3)
4789                 self.assertEqual(packet[ICMP].code, 13)
4790                 inner = packet[IPerror]
4791                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4792                 self.assertEqual(inner.dst, self.nat_addr)
4793                 self.check_icmp_checksum(packet)
4794                 if inner.haslayer(TCPerror):
4795                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4796                 elif inner.haslayer(UDPerror):
4797                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4798                 else:
4799                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4800             except:
4801                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4802                 raise
4803
4804         # out2in
4805         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4806                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4807                 ICMP(type=3, code=13) /
4808                 packet[IP] for packet in capture_ip4]
4809         self.pg1.add_stream(pkts)
4810         self.pg_enable_capture(self.pg_interfaces)
4811         self.pg_start()
4812         capture = self.pg0.get_capture(len(pkts))
4813         for packet in capture:
4814             try:
4815                 self.assertEqual(packet[IPv6].src, ip.src)
4816                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4817                 icmp = packet[ICMPv6DestUnreach]
4818                 self.assertEqual(icmp.code, 1)
4819                 inner = icmp[IPerror6]
4820                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4821                 self.assertEqual(inner.dst, ip.src)
4822                 self.check_icmpv6_checksum(packet)
4823                 if inner.haslayer(TCPerror):
4824                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4825                 elif inner.haslayer(UDPerror):
4826                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4827                 else:
4828                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4829                                      self.icmp_id_in)
4830             except:
4831                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4832                 raise
4833
4834     def test_hairpinning(self):
4835         """ NAT64 hairpinning """
4836
4837         client = self.pg0.remote_hosts[0]
4838         server = self.pg0.remote_hosts[1]
4839         server_tcp_in_port = 22
4840         server_tcp_out_port = 4022
4841         server_udp_in_port = 23
4842         server_udp_out_port = 4023
4843         client_tcp_in_port = 1234
4844         client_udp_in_port = 1235
4845         client_tcp_out_port = 0
4846         client_udp_out_port = 0
4847         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4848         nat_addr_ip6 = ip.src
4849
4850         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4851                                                 self.nat_addr_n)
4852         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4853         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4854
4855         self.vapi.nat64_add_del_static_bib(server.ip6n,
4856                                            self.nat_addr_n,
4857                                            server_tcp_in_port,
4858                                            server_tcp_out_port,
4859                                            IP_PROTOS.tcp)
4860         self.vapi.nat64_add_del_static_bib(server.ip6n,
4861                                            self.nat_addr_n,
4862                                            server_udp_in_port,
4863                                            server_udp_out_port,
4864                                            IP_PROTOS.udp)
4865
4866         # client to server
4867         pkts = []
4868         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4869              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4870              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4871         pkts.append(p)
4872         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4873              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4874              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4875         pkts.append(p)
4876         self.pg0.add_stream(pkts)
4877         self.pg_enable_capture(self.pg_interfaces)
4878         self.pg_start()
4879         capture = self.pg0.get_capture(len(pkts))
4880         for packet in capture:
4881             try:
4882                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4883                 self.assertEqual(packet[IPv6].dst, server.ip6)
4884                 if packet.haslayer(TCP):
4885                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4886                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4887                     self.check_tcp_checksum(packet)
4888                     client_tcp_out_port = packet[TCP].sport
4889                 else:
4890                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4891                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
4892                     self.check_udp_checksum(packet)
4893                     client_udp_out_port = packet[UDP].sport
4894             except:
4895                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4896                 raise
4897
4898         # server to client
4899         pkts = []
4900         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4901              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4902              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4903         pkts.append(p)
4904         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4905              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4906              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4907         pkts.append(p)
4908         self.pg0.add_stream(pkts)
4909         self.pg_enable_capture(self.pg_interfaces)
4910         self.pg_start()
4911         capture = self.pg0.get_capture(len(pkts))
4912         for packet in capture:
4913             try:
4914                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4915                 self.assertEqual(packet[IPv6].dst, client.ip6)
4916                 if packet.haslayer(TCP):
4917                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4918                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4919                     self.check_tcp_checksum(packet)
4920                 else:
4921                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
4922                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
4923                     self.check_udp_checksum(packet)
4924             except:
4925                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4926                 raise
4927
4928         # ICMP error
4929         pkts = []
4930         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4931                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4932                 ICMPv6DestUnreach(code=1) /
4933                 packet[IPv6] for packet in capture]
4934         self.pg0.add_stream(pkts)
4935         self.pg_enable_capture(self.pg_interfaces)
4936         self.pg_start()
4937         capture = self.pg0.get_capture(len(pkts))
4938         for packet in capture:
4939             try:
4940                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4941                 self.assertEqual(packet[IPv6].dst, server.ip6)
4942                 icmp = packet[ICMPv6DestUnreach]
4943                 self.assertEqual(icmp.code, 1)
4944                 inner = icmp[IPerror6]
4945                 self.assertEqual(inner.src, server.ip6)
4946                 self.assertEqual(inner.dst, nat_addr_ip6)
4947                 self.check_icmpv6_checksum(packet)
4948                 if inner.haslayer(TCPerror):
4949                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4950                     self.assertEqual(inner[TCPerror].dport,
4951                                      client_tcp_out_port)
4952                 else:
4953                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4954                     self.assertEqual(inner[UDPerror].dport,
4955                                      client_udp_out_port)
4956             except:
4957                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4958                 raise
4959
4960     def test_prefix(self):
4961         """ NAT64 Network-Specific Prefix """
4962
4963         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4964                                                 self.nat_addr_n)
4965         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4966         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4967         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4968                                                 self.vrf1_nat_addr_n,
4969                                                 vrf_id=self.vrf1_id)
4970         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4971
4972         # Add global prefix
4973         global_pref64 = "2001:db8::"
4974         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4975         global_pref64_len = 32
4976         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4977
4978         prefix = self.vapi.nat64_prefix_dump()
4979         self.assertEqual(len(prefix), 1)
4980         self.assertEqual(prefix[0].prefix, global_pref64_n)
4981         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4982         self.assertEqual(prefix[0].vrf_id, 0)
4983
4984         # Add tenant specific prefix
4985         vrf1_pref64 = "2001:db8:122:300::"
4986         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4987         vrf1_pref64_len = 56
4988         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4989                                        vrf1_pref64_len,
4990                                        vrf_id=self.vrf1_id)
4991         prefix = self.vapi.nat64_prefix_dump()
4992         self.assertEqual(len(prefix), 2)
4993
4994         # Global prefix
4995         pkts = self.create_stream_in_ip6(self.pg0,
4996                                          self.pg1,
4997                                          pref=global_pref64,
4998                                          plen=global_pref64_len)
4999         self.pg0.add_stream(pkts)
5000         self.pg_enable_capture(self.pg_interfaces)
5001         self.pg_start()
5002         capture = self.pg1.get_capture(len(pkts))
5003         self.verify_capture_out(capture, nat_ip=self.nat_addr,
5004                                 dst_ip=self.pg1.remote_ip4)
5005
5006         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5007         self.pg1.add_stream(pkts)
5008         self.pg_enable_capture(self.pg_interfaces)
5009         self.pg_start()
5010         capture = self.pg0.get_capture(len(pkts))
5011         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5012                                   global_pref64,
5013                                   global_pref64_len)
5014         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5015
5016         # Tenant specific prefix
5017         pkts = self.create_stream_in_ip6(self.pg2,
5018                                          self.pg1,
5019                                          pref=vrf1_pref64,
5020                                          plen=vrf1_pref64_len)
5021         self.pg2.add_stream(pkts)
5022         self.pg_enable_capture(self.pg_interfaces)
5023         self.pg_start()
5024         capture = self.pg1.get_capture(len(pkts))
5025         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5026                                 dst_ip=self.pg1.remote_ip4)
5027
5028         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5029         self.pg1.add_stream(pkts)
5030         self.pg_enable_capture(self.pg_interfaces)
5031         self.pg_start()
5032         capture = self.pg2.get_capture(len(pkts))
5033         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5034                                   vrf1_pref64,
5035                                   vrf1_pref64_len)
5036         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5037
5038     def test_unknown_proto(self):
5039         """ NAT64 translate packet with unknown protocol """
5040
5041         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5042                                                 self.nat_addr_n)
5043         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5044         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5045         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5046
5047         # in2out
5048         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5049              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5050              TCP(sport=self.tcp_port_in, dport=20))
5051         self.pg0.add_stream(p)
5052         self.pg_enable_capture(self.pg_interfaces)
5053         self.pg_start()
5054         p = self.pg1.get_capture(1)
5055
5056         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5057              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5058              GRE() /
5059              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5060              TCP(sport=1234, dport=1234))
5061         self.pg0.add_stream(p)
5062         self.pg_enable_capture(self.pg_interfaces)
5063         self.pg_start()
5064         p = self.pg1.get_capture(1)
5065         packet = p[0]
5066         try:
5067             self.assertEqual(packet[IP].src, self.nat_addr)
5068             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5069             self.assertTrue(packet.haslayer(GRE))
5070             self.check_ip_checksum(packet)
5071         except:
5072             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5073             raise
5074
5075         # out2in
5076         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5077              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5078              GRE() /
5079              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5080              TCP(sport=1234, dport=1234))
5081         self.pg1.add_stream(p)
5082         self.pg_enable_capture(self.pg_interfaces)
5083         self.pg_start()
5084         p = self.pg0.get_capture(1)
5085         packet = p[0]
5086         try:
5087             self.assertEqual(packet[IPv6].src, remote_ip6)
5088             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5089             self.assertEqual(packet[IPv6].nh, 47)
5090         except:
5091             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5092             raise
5093
5094     def test_hairpinning_unknown_proto(self):
5095         """ NAT64 translate packet with unknown protocol - hairpinning """
5096
5097         client = self.pg0.remote_hosts[0]
5098         server = self.pg0.remote_hosts[1]
5099         server_tcp_in_port = 22
5100         server_tcp_out_port = 4022
5101         client_tcp_in_port = 1234
5102         client_tcp_out_port = 1235
5103         server_nat_ip = "10.0.0.100"
5104         client_nat_ip = "10.0.0.110"
5105         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5106         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5107         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5108         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5109
5110         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5111                                                 client_nat_ip_n)
5112         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5113         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5114
5115         self.vapi.nat64_add_del_static_bib(server.ip6n,
5116                                            server_nat_ip_n,
5117                                            server_tcp_in_port,
5118                                            server_tcp_out_port,
5119                                            IP_PROTOS.tcp)
5120
5121         self.vapi.nat64_add_del_static_bib(server.ip6n,
5122                                            server_nat_ip_n,
5123                                            0,
5124                                            0,
5125                                            IP_PROTOS.gre)
5126
5127         self.vapi.nat64_add_del_static_bib(client.ip6n,
5128                                            client_nat_ip_n,
5129                                            client_tcp_in_port,
5130                                            client_tcp_out_port,
5131                                            IP_PROTOS.tcp)
5132
5133         # client to server
5134         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5135              IPv6(src=client.ip6, dst=server_nat_ip6) /
5136              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5137         self.pg0.add_stream(p)
5138         self.pg_enable_capture(self.pg_interfaces)
5139         self.pg_start()
5140         p = self.pg0.get_capture(1)
5141
5142         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5143              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5144              GRE() /
5145              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5146              TCP(sport=1234, dport=1234))
5147         self.pg0.add_stream(p)
5148         self.pg_enable_capture(self.pg_interfaces)
5149         self.pg_start()
5150         p = self.pg0.get_capture(1)
5151         packet = p[0]
5152         try:
5153             self.assertEqual(packet[IPv6].src, client_nat_ip6)
5154             self.assertEqual(packet[IPv6].dst, server.ip6)
5155             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5156         except:
5157             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5158             raise
5159
5160         # server to client
5161         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5162              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5163              GRE() /
5164              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5165              TCP(sport=1234, dport=1234))
5166         self.pg0.add_stream(p)
5167         self.pg_enable_capture(self.pg_interfaces)
5168         self.pg_start()
5169         p = self.pg0.get_capture(1)
5170         packet = p[0]
5171         try:
5172             self.assertEqual(packet[IPv6].src, server_nat_ip6)
5173             self.assertEqual(packet[IPv6].dst, client.ip6)
5174             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5175         except:
5176             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5177             raise
5178
5179     def test_one_armed_nat64(self):
5180         """ One armed NAT64 """
5181         external_port = 0
5182         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5183                                            '64:ff9b::',
5184                                            96)
5185
5186         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5187                                                 self.nat_addr_n)
5188         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5189         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5190
5191         # in2out
5192         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5193              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5194              TCP(sport=12345, dport=80))
5195         self.pg3.add_stream(p)
5196         self.pg_enable_capture(self.pg_interfaces)
5197         self.pg_start()
5198         capture = self.pg3.get_capture(1)
5199         p = capture[0]
5200         try:
5201             ip = p[IP]
5202             tcp = p[TCP]
5203             self.assertEqual(ip.src, self.nat_addr)
5204             self.assertEqual(ip.dst, self.pg3.remote_ip4)
5205             self.assertNotEqual(tcp.sport, 12345)
5206             external_port = tcp.sport
5207             self.assertEqual(tcp.dport, 80)
5208             self.check_tcp_checksum(p)
5209             self.check_ip_checksum(p)
5210         except:
5211             self.logger.error(ppp("Unexpected or invalid packet:", p))
5212             raise
5213
5214         # out2in
5215         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5216              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5217              TCP(sport=80, dport=external_port))
5218         self.pg3.add_stream(p)
5219         self.pg_enable_capture(self.pg_interfaces)
5220         self.pg_start()
5221         capture = self.pg3.get_capture(1)
5222         p = capture[0]
5223         try:
5224             ip = p[IPv6]
5225             tcp = p[TCP]
5226             self.assertEqual(ip.src, remote_host_ip6)
5227             self.assertEqual(ip.dst, self.pg3.remote_ip6)
5228             self.assertEqual(tcp.sport, 80)
5229             self.assertEqual(tcp.dport, 12345)
5230             self.check_tcp_checksum(p)
5231         except:
5232             self.logger.error(ppp("Unexpected or invalid packet:", p))
5233             raise
5234
5235     def test_frag_in_order(self):
5236         """ NAT64 translate fragments arriving in order """
5237         self.tcp_port_in = random.randint(1025, 65535)
5238
5239         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5240                                                 self.nat_addr_n)
5241         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5242         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5243
5244         reass = self.vapi.nat_reass_dump()
5245         reass_n_start = len(reass)
5246
5247         # in2out
5248         data = 'a' * 200
5249         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5250                                            self.tcp_port_in, 20, data)
5251         self.pg0.add_stream(pkts)
5252         self.pg_enable_capture(self.pg_interfaces)
5253         self.pg_start()
5254         frags = self.pg1.get_capture(len(pkts))
5255         p = self.reass_frags_and_verify(frags,
5256                                         self.nat_addr,
5257                                         self.pg1.remote_ip4)
5258         self.assertEqual(p[TCP].dport, 20)
5259         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5260         self.tcp_port_out = p[TCP].sport
5261         self.assertEqual(data, p[Raw].load)
5262
5263         # out2in
5264         data = "A" * 4 + "b" * 16 + "C" * 3
5265         pkts = self.create_stream_frag(self.pg1,
5266                                        self.nat_addr,
5267                                        20,
5268                                        self.tcp_port_out,
5269                                        data)
5270         self.pg1.add_stream(pkts)
5271         self.pg_enable_capture(self.pg_interfaces)
5272         self.pg_start()
5273         frags = self.pg0.get_capture(len(pkts))
5274         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5275         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5276         self.assertEqual(p[TCP].sport, 20)
5277         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5278         self.assertEqual(data, p[Raw].load)
5279
5280         reass = self.vapi.nat_reass_dump()
5281         reass_n_end = len(reass)
5282
5283         self.assertEqual(reass_n_end - reass_n_start, 2)
5284
5285     def test_reass_hairpinning(self):
5286         """ NAT64 fragments hairpinning """
5287         data = 'a' * 200
5288         client = self.pg0.remote_hosts[0]
5289         server = self.pg0.remote_hosts[1]
5290         server_in_port = random.randint(1025, 65535)
5291         server_out_port = random.randint(1025, 65535)
5292         client_in_port = random.randint(1025, 65535)
5293         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5294         nat_addr_ip6 = ip.src
5295
5296         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5297                                                 self.nat_addr_n)
5298         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5299         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5300
5301         # add static BIB entry for server
5302         self.vapi.nat64_add_del_static_bib(server.ip6n,
5303                                            self.nat_addr_n,
5304                                            server_in_port,
5305                                            server_out_port,
5306                                            IP_PROTOS.tcp)
5307
5308         # send packet from host to server
5309         pkts = self.create_stream_frag_ip6(self.pg0,
5310                                            self.nat_addr,
5311                                            client_in_port,
5312                                            server_out_port,
5313                                            data)
5314         self.pg0.add_stream(pkts)
5315         self.pg_enable_capture(self.pg_interfaces)
5316         self.pg_start()
5317         frags = self.pg0.get_capture(len(pkts))
5318         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5319         self.assertNotEqual(p[TCP].sport, client_in_port)
5320         self.assertEqual(p[TCP].dport, server_in_port)
5321         self.assertEqual(data, p[Raw].load)
5322
5323     def test_frag_out_of_order(self):
5324         """ NAT64 translate fragments arriving out of order """
5325         self.tcp_port_in = random.randint(1025, 65535)
5326
5327         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5328                                                 self.nat_addr_n)
5329         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5330         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5331
5332         # in2out
5333         data = 'a' * 200
5334         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5335                                            self.tcp_port_in, 20, data)
5336         pkts.reverse()
5337         self.pg0.add_stream(pkts)
5338         self.pg_enable_capture(self.pg_interfaces)
5339         self.pg_start()
5340         frags = self.pg1.get_capture(len(pkts))
5341         p = self.reass_frags_and_verify(frags,
5342                                         self.nat_addr,
5343                                         self.pg1.remote_ip4)
5344         self.assertEqual(p[TCP].dport, 20)
5345         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5346         self.tcp_port_out = p[TCP].sport
5347         self.assertEqual(data, p[Raw].load)
5348
5349         # out2in
5350         data = "A" * 4 + "B" * 16 + "C" * 3
5351         pkts = self.create_stream_frag(self.pg1,
5352                                        self.nat_addr,
5353                                        20,
5354                                        self.tcp_port_out,
5355                                        data)
5356         pkts.reverse()
5357         self.pg1.add_stream(pkts)
5358         self.pg_enable_capture(self.pg_interfaces)
5359         self.pg_start()
5360         frags = self.pg0.get_capture(len(pkts))
5361         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5362         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5363         self.assertEqual(p[TCP].sport, 20)
5364         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5365         self.assertEqual(data, p[Raw].load)
5366
5367     def test_interface_addr(self):
5368         """ Acquire NAT64 pool addresses from interface """
5369         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5370
5371         # no address in NAT64 pool
5372         adresses = self.vapi.nat44_address_dump()
5373         self.assertEqual(0, len(adresses))
5374
5375         # configure interface address and check NAT64 address pool
5376         self.pg4.config_ip4()
5377         addresses = self.vapi.nat64_pool_addr_dump()
5378         self.assertEqual(len(addresses), 1)
5379         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5380
5381         # remove interface address and check NAT64 address pool
5382         self.pg4.unconfig_ip4()
5383         addresses = self.vapi.nat64_pool_addr_dump()
5384         self.assertEqual(0, len(adresses))
5385
5386     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5387     def test_ipfix_max_bibs_sessions(self):
5388         """ IPFIX logging maximum session and BIB entries exceeded """
5389         max_bibs = 1280
5390         max_sessions = 2560
5391         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5392                                            '64:ff9b::',
5393                                            96)
5394
5395         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5396                                                 self.nat_addr_n)
5397         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5398         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5399
5400         pkts = []
5401         src = ""
5402         for i in range(0, max_bibs):
5403             src = "fd01:aa::%x" % (i)
5404             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5405                  IPv6(src=src, dst=remote_host_ip6) /
5406                  TCP(sport=12345, dport=80))
5407             pkts.append(p)
5408             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5409                  IPv6(src=src, dst=remote_host_ip6) /
5410                  TCP(sport=12345, dport=22))
5411             pkts.append(p)
5412         self.pg0.add_stream(pkts)
5413         self.pg_enable_capture(self.pg_interfaces)
5414         self.pg_start()
5415         self.pg1.get_capture(max_sessions)
5416
5417         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5418                                      src_address=self.pg3.local_ip4n,
5419                                      path_mtu=512,
5420                                      template_interval=10)
5421         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5422                             src_port=self.ipfix_src_port)
5423
5424         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5425              IPv6(src=src, dst=remote_host_ip6) /
5426              TCP(sport=12345, dport=25))
5427         self.pg0.add_stream(p)
5428         self.pg_enable_capture(self.pg_interfaces)
5429         self.pg_start()
5430         self.pg1.get_capture(0)
5431         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5432         capture = self.pg3.get_capture(9)
5433         ipfix = IPFIXDecoder()
5434         # first load template
5435         for p in capture:
5436             self.assertTrue(p.haslayer(IPFIX))
5437             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5438             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5439             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5440             self.assertEqual(p[UDP].dport, 4739)
5441             self.assertEqual(p[IPFIX].observationDomainID,
5442                              self.ipfix_domain_id)
5443             if p.haslayer(Template):
5444                 ipfix.add_template(p.getlayer(Template))
5445         # verify events in data set
5446         for p in capture:
5447             if p.haslayer(Data):
5448                 data = ipfix.decode_data_set(p.getlayer(Set))
5449                 self.verify_ipfix_max_sessions(data, max_sessions)
5450
5451         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5452              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5453              TCP(sport=12345, dport=80))
5454         self.pg0.add_stream(p)
5455         self.pg_enable_capture(self.pg_interfaces)
5456         self.pg_start()
5457         self.pg1.get_capture(0)
5458         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5459         capture = self.pg3.get_capture(1)
5460         # verify events in data set
5461         for p in capture:
5462             self.assertTrue(p.haslayer(IPFIX))
5463             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5464             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5465             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5466             self.assertEqual(p[UDP].dport, 4739)
5467             self.assertEqual(p[IPFIX].observationDomainID,
5468                              self.ipfix_domain_id)
5469             if p.haslayer(Data):
5470                 data = ipfix.decode_data_set(p.getlayer(Set))
5471                 self.verify_ipfix_max_bibs(data, max_bibs)
5472
5473     def test_ipfix_max_frags(self):
5474         """ IPFIX logging maximum fragments pending reassembly exceeded """
5475         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5476                                                 self.nat_addr_n)
5477         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5478         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5479         self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5480         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5481                                      src_address=self.pg3.local_ip4n,
5482                                      path_mtu=512,
5483                                      template_interval=10)
5484         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5485                             src_port=self.ipfix_src_port)
5486
5487         data = 'a' * 200
5488         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5489                                            self.tcp_port_in, 20, data)
5490         self.pg0.add_stream(pkts[-1])
5491         self.pg_enable_capture(self.pg_interfaces)
5492         self.pg_start()
5493         self.pg1.get_capture(0)
5494         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5495         capture = self.pg3.get_capture(9)
5496         ipfix = IPFIXDecoder()
5497         # first load template
5498         for p in capture:
5499             self.assertTrue(p.haslayer(IPFIX))
5500             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5501             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5502             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5503             self.assertEqual(p[UDP].dport, 4739)
5504             self.assertEqual(p[IPFIX].observationDomainID,
5505                              self.ipfix_domain_id)
5506             if p.haslayer(Template):
5507                 ipfix.add_template(p.getlayer(Template))
5508         # verify events in data set
5509         for p in capture:
5510             if p.haslayer(Data):
5511                 data = ipfix.decode_data_set(p.getlayer(Set))
5512                 self.verify_ipfix_max_fragments_ip6(data, 0,
5513                                                     self.pg0.remote_ip6n)
5514
5515     def test_ipfix_bib_ses(self):
5516         """ IPFIX logging NAT64 BIB/session create and delete events """
5517         self.tcp_port_in = random.randint(1025, 65535)
5518         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5519                                            '64:ff9b::',
5520                                            96)
5521
5522         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5523                                                 self.nat_addr_n)
5524         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5525         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5526         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5527                                      src_address=self.pg3.local_ip4n,
5528                                      path_mtu=512,
5529                                      template_interval=10)
5530         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5531                             src_port=self.ipfix_src_port)
5532
5533         # Create
5534         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5535              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5536              TCP(sport=self.tcp_port_in, dport=25))
5537         self.pg0.add_stream(p)
5538         self.pg_enable_capture(self.pg_interfaces)
5539         self.pg_start()
5540         p = self.pg1.get_capture(1)
5541         self.tcp_port_out = p[0][TCP].sport
5542         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5543         capture = self.pg3.get_capture(10)
5544         ipfix = IPFIXDecoder()
5545         # first load template
5546         for p in capture:
5547             self.assertTrue(p.haslayer(IPFIX))
5548             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5549             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5550             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5551             self.assertEqual(p[UDP].dport, 4739)
5552             self.assertEqual(p[IPFIX].observationDomainID,
5553                              self.ipfix_domain_id)
5554             if p.haslayer(Template):
5555                 ipfix.add_template(p.getlayer(Template))
5556         # verify events in data set
5557         for p in capture:
5558             if p.haslayer(Data):
5559                 data = ipfix.decode_data_set(p.getlayer(Set))
5560                 if ord(data[0][230]) == 10:
5561                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5562                 elif ord(data[0][230]) == 6:
5563                     self.verify_ipfix_nat64_ses(data,
5564                                                 1,
5565                                                 self.pg0.remote_ip6n,
5566                                                 self.pg1.remote_ip4,
5567                                                 25)
5568                 else:
5569                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5570
5571         # Delete
5572         self.pg_enable_capture(self.pg_interfaces)
5573         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5574                                                 self.nat_addr_n,
5575                                                 is_add=0)
5576         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5577         capture = self.pg3.get_capture(2)
5578         # verify events in data set
5579         for p in capture:
5580             self.assertTrue(p.haslayer(IPFIX))
5581             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5582             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5583             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5584             self.assertEqual(p[UDP].dport, 4739)
5585             self.assertEqual(p[IPFIX].observationDomainID,
5586                              self.ipfix_domain_id)
5587             if p.haslayer(Data):
5588                 data = ipfix.decode_data_set(p.getlayer(Set))
5589                 if ord(data[0][230]) == 11:
5590                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5591                 elif ord(data[0][230]) == 7:
5592                     self.verify_ipfix_nat64_ses(data,
5593                                                 0,
5594                                                 self.pg0.remote_ip6n,
5595                                                 self.pg1.remote_ip4,
5596                                                 25)
5597                 else:
5598                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5599
5600     def nat64_get_ses_num(self):
5601         """
5602         Return number of active NAT64 sessions.
5603         """
5604         st = self.vapi.nat64_st_dump()
5605         return len(st)
5606
5607     def clear_nat64(self):
5608         """
5609         Clear NAT64 configuration.
5610         """
5611         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5612                             domain_id=self.ipfix_domain_id)
5613         self.ipfix_src_port = 4739
5614         self.ipfix_domain_id = 1
5615
5616         self.vapi.nat64_set_timeouts()
5617
5618         interfaces = self.vapi.nat64_interface_dump()
5619         for intf in interfaces:
5620             if intf.is_inside > 1:
5621                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5622                                                   0,
5623                                                   is_add=0)
5624             self.vapi.nat64_add_del_interface(intf.sw_if_index,
5625                                               intf.is_inside,
5626                                               is_add=0)
5627
5628         bib = self.vapi.nat64_bib_dump(255)
5629         for bibe in bib:
5630             if bibe.is_static:
5631                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5632                                                    bibe.o_addr,
5633                                                    bibe.i_port,
5634                                                    bibe.o_port,
5635                                                    bibe.proto,
5636                                                    bibe.vrf_id,
5637                                                    is_add=0)
5638
5639         adresses = self.vapi.nat64_pool_addr_dump()
5640         for addr in adresses:
5641             self.vapi.nat64_add_del_pool_addr_range(addr.address,
5642                                                     addr.address,
5643                                                     vrf_id=addr.vrf_id,
5644                                                     is_add=0)
5645
5646         prefixes = self.vapi.nat64_prefix_dump()
5647         for prefix in prefixes:
5648             self.vapi.nat64_add_del_prefix(prefix.prefix,
5649                                            prefix.prefix_len,
5650                                            vrf_id=prefix.vrf_id,
5651                                            is_add=0)
5652
5653     def tearDown(self):
5654         super(TestNAT64, self).tearDown()
5655         if not self.vpp_dead:
5656             self.logger.info(self.vapi.cli("show nat64 pool"))
5657             self.logger.info(self.vapi.cli("show nat64 interfaces"))
5658             self.logger.info(self.vapi.cli("show nat64 prefix"))
5659             self.logger.info(self.vapi.cli("show nat64 bib all"))
5660             self.logger.info(self.vapi.cli("show nat64 session table all"))
5661             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5662             self.clear_nat64()
5663
5664
5665 class TestDSlite(MethodHolder):
5666     """ DS-Lite Test Cases """
5667
5668     @classmethod
5669     def setUpClass(cls):
5670         super(TestDSlite, cls).setUpClass()
5671
5672         try:
5673             cls.nat_addr = '10.0.0.3'
5674             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5675
5676             cls.create_pg_interfaces(range(2))
5677             cls.pg0.admin_up()
5678             cls.pg0.config_ip4()
5679             cls.pg0.resolve_arp()
5680             cls.pg1.admin_up()
5681             cls.pg1.config_ip6()
5682             cls.pg1.generate_remote_hosts(2)
5683             cls.pg1.configure_ipv6_neighbors()
5684
5685         except Exception:
5686             super(TestDSlite, cls).tearDownClass()
5687             raise
5688
5689     def test_dslite(self):
5690         """ Test DS-Lite """
5691         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5692                                                  self.nat_addr_n)
5693         aftr_ip4 = '192.0.0.1'
5694         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5695         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5696         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5697         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5698
5699         # UDP
5700         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5701              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5702              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5703              UDP(sport=20000, dport=10000))
5704         self.pg1.add_stream(p)
5705         self.pg_enable_capture(self.pg_interfaces)
5706         self.pg_start()
5707         capture = self.pg0.get_capture(1)
5708         capture = capture[0]
5709         self.assertFalse(capture.haslayer(IPv6))
5710         self.assertEqual(capture[IP].src, self.nat_addr)
5711         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5712         self.assertNotEqual(capture[UDP].sport, 20000)
5713         self.assertEqual(capture[UDP].dport, 10000)
5714         self.check_ip_checksum(capture)
5715         out_port = capture[UDP].sport
5716
5717         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5718              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5719              UDP(sport=10000, dport=out_port))
5720         self.pg0.add_stream(p)
5721         self.pg_enable_capture(self.pg_interfaces)
5722         self.pg_start()
5723         capture = self.pg1.get_capture(1)
5724         capture = capture[0]
5725         self.assertEqual(capture[IPv6].src, aftr_ip6)
5726         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5727         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5728         self.assertEqual(capture[IP].dst, '192.168.1.1')
5729         self.assertEqual(capture[UDP].sport, 10000)
5730         self.assertEqual(capture[UDP].dport, 20000)
5731         self.check_ip_checksum(capture)
5732
5733         # TCP
5734         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5735              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5736              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5737              TCP(sport=20001, dport=10001))
5738         self.pg1.add_stream(p)
5739         self.pg_enable_capture(self.pg_interfaces)
5740         self.pg_start()
5741         capture = self.pg0.get_capture(1)
5742         capture = capture[0]
5743         self.assertFalse(capture.haslayer(IPv6))
5744         self.assertEqual(capture[IP].src, self.nat_addr)
5745         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5746         self.assertNotEqual(capture[TCP].sport, 20001)
5747         self.assertEqual(capture[TCP].dport, 10001)
5748         self.check_ip_checksum(capture)
5749         self.check_tcp_checksum(capture)
5750         out_port = capture[TCP].sport
5751
5752         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5753              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5754              TCP(sport=10001, dport=out_port))
5755         self.pg0.add_stream(p)
5756         self.pg_enable_capture(self.pg_interfaces)
5757         self.pg_start()
5758         capture = self.pg1.get_capture(1)
5759         capture = capture[0]
5760         self.assertEqual(capture[IPv6].src, aftr_ip6)
5761         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5762         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5763         self.assertEqual(capture[IP].dst, '192.168.1.1')
5764         self.assertEqual(capture[TCP].sport, 10001)
5765         self.assertEqual(capture[TCP].dport, 20001)
5766         self.check_ip_checksum(capture)
5767         self.check_tcp_checksum(capture)
5768
5769         # ICMP
5770         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5771              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5772              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5773              ICMP(id=4000, type='echo-request'))
5774         self.pg1.add_stream(p)
5775         self.pg_enable_capture(self.pg_interfaces)
5776         self.pg_start()
5777         capture = self.pg0.get_capture(1)
5778         capture = capture[0]
5779         self.assertFalse(capture.haslayer(IPv6))
5780         self.assertEqual(capture[IP].src, self.nat_addr)
5781         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5782         self.assertNotEqual(capture[ICMP].id, 4000)
5783         self.check_ip_checksum(capture)
5784         self.check_icmp_checksum(capture)
5785         out_id = capture[ICMP].id
5786
5787         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5788              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5789              ICMP(id=out_id, type='echo-reply'))
5790         self.pg0.add_stream(p)
5791         self.pg_enable_capture(self.pg_interfaces)
5792         self.pg_start()
5793         capture = self.pg1.get_capture(1)
5794         capture = capture[0]
5795         self.assertEqual(capture[IPv6].src, aftr_ip6)
5796         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5797         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5798         self.assertEqual(capture[IP].dst, '192.168.1.1')
5799         self.assertEqual(capture[ICMP].id, 4000)
5800         self.check_ip_checksum(capture)
5801         self.check_icmp_checksum(capture)
5802
5803         # ping DS-Lite AFTR tunnel endpoint address
5804         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5805              IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5806              ICMPv6EchoRequest())
5807         self.pg1.add_stream(p)
5808         self.pg_enable_capture(self.pg_interfaces)
5809         self.pg_start()
5810         capture = self.pg1.get_capture(1)
5811         self.assertEqual(1, len(capture))
5812         capture = capture[0]
5813         self.assertEqual(capture[IPv6].src, aftr_ip6)
5814         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5815         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5816
5817     def tearDown(self):
5818         super(TestDSlite, self).tearDown()
5819         if not self.vpp_dead:
5820             self.logger.info(self.vapi.cli("show dslite pool"))
5821             self.logger.info(
5822                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5823             self.logger.info(self.vapi.cli("show dslite sessions"))
5824
5825
5826 class TestDSliteCE(MethodHolder):
5827     """ DS-Lite CE Test Cases """
5828
5829     @classmethod
5830     def setUpConstants(cls):
5831         super(TestDSliteCE, cls).setUpConstants()
5832         cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
5833
5834     @classmethod
5835     def setUpClass(cls):
5836         super(TestDSliteCE, cls).setUpClass()
5837
5838         try:
5839             cls.create_pg_interfaces(range(2))
5840             cls.pg0.admin_up()
5841             cls.pg0.config_ip4()
5842             cls.pg0.resolve_arp()
5843             cls.pg1.admin_up()
5844             cls.pg1.config_ip6()
5845             cls.pg1.generate_remote_hosts(1)
5846             cls.pg1.configure_ipv6_neighbors()
5847
5848         except Exception:
5849             super(TestDSliteCE, cls).tearDownClass()
5850             raise
5851
5852     def test_dslite_ce(self):
5853         """ Test DS-Lite CE """
5854
5855         b4_ip4 = '192.0.0.2'
5856         b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
5857         b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
5858         b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
5859         self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
5860
5861         aftr_ip4 = '192.0.0.1'
5862         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5863         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5864         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5865         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5866
5867         self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
5868                                    dst_address_length=128,
5869                                    next_hop_address=self.pg1.remote_ip6n,
5870                                    next_hop_sw_if_index=self.pg1.sw_if_index,
5871                                    is_ipv6=1)
5872
5873         # UDP encapsulation
5874         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5875              IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
5876              UDP(sport=10000, dport=20000))
5877         self.pg0.add_stream(p)
5878         self.pg_enable_capture(self.pg_interfaces)
5879         self.pg_start()
5880         capture = self.pg1.get_capture(1)
5881         capture = capture[0]
5882         self.assertEqual(capture[IPv6].src, b4_ip6)
5883         self.assertEqual(capture[IPv6].dst, aftr_ip6)
5884         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5885         self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
5886         self.assertEqual(capture[UDP].sport, 10000)
5887         self.assertEqual(capture[UDP].dport, 20000)
5888         self.check_ip_checksum(capture)
5889
5890         # UDP decapsulation
5891         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5892              IPv6(dst=b4_ip6, src=aftr_ip6) /
5893              IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
5894              UDP(sport=20000, dport=10000))
5895         self.pg1.add_stream(p)
5896         self.pg_enable_capture(self.pg_interfaces)
5897         self.pg_start()
5898         capture = self.pg0.get_capture(1)
5899         capture = capture[0]
5900         self.assertFalse(capture.haslayer(IPv6))
5901         self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
5902         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5903         self.assertEqual(capture[UDP].sport, 20000)
5904         self.assertEqual(capture[UDP].dport, 10000)
5905         self.check_ip_checksum(capture)
5906
5907         # ping DS-Lite B4 tunnel endpoint address
5908         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5909              IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
5910              ICMPv6EchoRequest())
5911         self.pg1.add_stream(p)
5912         self.pg_enable_capture(self.pg_interfaces)
5913         self.pg_start()
5914         capture = self.pg1.get_capture(1)
5915         self.assertEqual(1, len(capture))
5916         capture = capture[0]
5917         self.assertEqual(capture[IPv6].src, b4_ip6)
5918         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5919         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5920
5921     def tearDown(self):
5922         super(TestDSliteCE, self).tearDown()
5923         if not self.vpp_dead:
5924             self.logger.info(
5925                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5926             self.logger.info(
5927                 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
5928
5929 if __name__ == '__main__':
5930     unittest.main(testRunner=VppTestRunner)