NAT44: fix ICMP error translation for endpoint dependent sessions (VPP-1150)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
    @classmethod
    def setUpClass(cls):
        """ Class set up; only delegates to the parent class """
        super(MethodHolder, cls).setUpClass()
32
    def tearDown(self):
        """ Per-test tear down; only delegates to the parent class """
        super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param dst_ip: Destination address
144         :param ttl: TTL of generated packets
145         """
146         if dst_ip is None:
147             dst_ip = out_if.remote_ip4
148
149         pkts = []
150         # TCP
151         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153              TCP(sport=self.tcp_port_in, dport=20))
154         pkts.append(p)
155
156         # UDP
157         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159              UDP(sport=self.udp_port_in, dport=20))
160         pkts.append(p)
161
162         # ICMP
163         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165              ICMP(id=self.icmp_id_in, type='echo-request'))
166         pkts.append(p)
167
168         return pkts
169
170     def compose_ip6(self, ip4, pref, plen):
171         """
172         Compose IPv4-embedded IPv6 addresses
173
174         :param ip4: IPv4 address
175         :param pref: IPv6 prefix
176         :param plen: IPv6 prefix length
177         :returns: IPv4-embedded IPv6 addresses
178         """
179         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
181         if plen == 32:
182             pref_n[4] = ip4_n[0]
183             pref_n[5] = ip4_n[1]
184             pref_n[6] = ip4_n[2]
185             pref_n[7] = ip4_n[3]
186         elif plen == 40:
187             pref_n[5] = ip4_n[0]
188             pref_n[6] = ip4_n[1]
189             pref_n[7] = ip4_n[2]
190             pref_n[9] = ip4_n[3]
191         elif plen == 48:
192             pref_n[6] = ip4_n[0]
193             pref_n[7] = ip4_n[1]
194             pref_n[9] = ip4_n[2]
195             pref_n[10] = ip4_n[3]
196         elif plen == 56:
197             pref_n[7] = ip4_n[0]
198             pref_n[9] = ip4_n[1]
199             pref_n[10] = ip4_n[2]
200             pref_n[11] = ip4_n[3]
201         elif plen == 64:
202             pref_n[9] = ip4_n[0]
203             pref_n[10] = ip4_n[1]
204             pref_n[11] = ip4_n[2]
205             pref_n[12] = ip4_n[3]
206         elif plen == 96:
207             pref_n[12] = ip4_n[0]
208             pref_n[13] = ip4_n[1]
209             pref_n[14] = ip4_n[2]
210             pref_n[15] = ip4_n[3]
211         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
212
213     def extract_ip4(self, ip6, plen):
214         """
215         Extract IPv4 address embedded in IPv6 addresses
216
217         :param ip6: IPv6 address
218         :param plen: IPv6 prefix length
219         :returns: extracted IPv4 address
220         """
221         ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
222         ip4_n = [None] * 4
223         if plen == 32:
224             ip4_n[0] = ip6_n[4]
225             ip4_n[1] = ip6_n[5]
226             ip4_n[2] = ip6_n[6]
227             ip4_n[3] = ip6_n[7]
228         elif plen == 40:
229             ip4_n[0] = ip6_n[5]
230             ip4_n[1] = ip6_n[6]
231             ip4_n[2] = ip6_n[7]
232             ip4_n[3] = ip6_n[9]
233         elif plen == 48:
234             ip4_n[0] = ip6_n[6]
235             ip4_n[1] = ip6_n[7]
236             ip4_n[2] = ip6_n[9]
237             ip4_n[3] = ip6_n[10]
238         elif plen == 56:
239             ip4_n[0] = ip6_n[7]
240             ip4_n[1] = ip6_n[9]
241             ip4_n[2] = ip6_n[10]
242             ip4_n[3] = ip6_n[11]
243         elif plen == 64:
244             ip4_n[0] = ip6_n[9]
245             ip4_n[1] = ip6_n[10]
246             ip4_n[2] = ip6_n[11]
247             ip4_n[3] = ip6_n[12]
248         elif plen == 96:
249             ip4_n[0] = ip6_n[12]
250             ip4_n[1] = ip6_n[13]
251             ip4_n[2] = ip6_n[14]
252             ip4_n[3] = ip6_n[15]
253         return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
254
255     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
256         """
257         Create IPv6 packet stream for inside network
258
259         :param in_if: Inside interface
260         :param out_if: Outside interface
261         :param ttl: Hop Limit of generated packets
262         :param pref: NAT64 prefix
263         :param plen: NAT64 prefix length
264         """
265         pkts = []
266         if pref is None:
267             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
268         else:
269             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
270
271         # TCP
272         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274              TCP(sport=self.tcp_port_in, dport=20))
275         pkts.append(p)
276
277         # UDP
278         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280              UDP(sport=self.udp_port_in, dport=20))
281         pkts.append(p)
282
283         # ICMP
284         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286              ICMPv6EchoRequest(id=self.icmp_id_in))
287         pkts.append(p)
288
289         return pkts
290
291     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292                           use_inside_ports=False):
293         """
294         Create packet stream for outside network
295
296         :param out_if: Outside interface
297         :param dst_ip: Destination IP address (Default use global NAT address)
298         :param ttl: TTL of generated packets
299         :param use_inside_ports: Use inside NAT ports as destination ports
300                instead of outside ports
301         """
302         if dst_ip is None:
303             dst_ip = self.nat_addr
304         if not use_inside_ports:
305             tcp_port = self.tcp_port_out
306             udp_port = self.udp_port_out
307             icmp_id = self.icmp_id_out
308         else:
309             tcp_port = self.tcp_port_in
310             udp_port = self.udp_port_in
311             icmp_id = self.icmp_id_in
312         pkts = []
313         # TCP
314         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316              TCP(dport=tcp_port, sport=20))
317         pkts.append(p)
318
319         # UDP
320         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322              UDP(dport=udp_port, sport=20))
323         pkts.append(p)
324
325         # ICMP
326         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328              ICMP(id=icmp_id, type='echo-reply'))
329         pkts.append(p)
330
331         return pkts
332
333     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
334         """
335         Create packet stream for outside network
336
337         :param out_if: Outside interface
338         :param dst_ip: Destination IP address (Default use global NAT address)
339         :param hl: HL of generated packets
340         """
341         pkts = []
342         # TCP
343         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345              TCP(dport=self.tcp_port_out, sport=20))
346         pkts.append(p)
347
348         # UDP
349         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351              UDP(dport=self.udp_port_out, sport=20))
352         pkts.append(p)
353
354         # ICMP
355         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357              ICMPv6EchoReply(id=self.icmp_id_out))
358         pkts.append(p)
359
360         return pkts
361
362     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363                            packet_num=3, dst_ip=None, is_ip6=False):
364         """
365         Verify captured packets on outside network
366
367         :param capture: Captured packets
368         :param nat_ip: Translated IP address (Default use global NAT address)
369         :param same_port: Sorce port number is not translated (Default False)
370         :param packet_num: Expected number of packets (Default 3)
371         :param dst_ip: Destination IP address (Default do not verify)
372         :param is_ip6: If L3 protocol is IPv6 (Default False)
373         """
374         if is_ip6:
375             IP46 = IPv6
376             ICMP46 = ICMPv6EchoRequest
377         else:
378             IP46 = IP
379             ICMP46 = ICMP
380         if nat_ip is None:
381             nat_ip = self.nat_addr
382         self.assertEqual(packet_num, len(capture))
383         for packet in capture:
384             try:
385                 if not is_ip6:
386                     self.check_ip_checksum(packet)
387                 self.assertEqual(packet[IP46].src, nat_ip)
388                 if dst_ip is not None:
389                     self.assertEqual(packet[IP46].dst, dst_ip)
390                 if packet.haslayer(TCP):
391                     if same_port:
392                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
393                     else:
394                         self.assertNotEqual(
395                             packet[TCP].sport, self.tcp_port_in)
396                     self.tcp_port_out = packet[TCP].sport
397                     self.check_tcp_checksum(packet)
398                 elif packet.haslayer(UDP):
399                     if same_port:
400                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
401                     else:
402                         self.assertNotEqual(
403                             packet[UDP].sport, self.udp_port_in)
404                     self.udp_port_out = packet[UDP].sport
405                 else:
406                     if same_port:
407                         self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
408                     else:
409                         self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410                     self.icmp_id_out = packet[ICMP46].id
411                     if is_ip6:
412                         self.check_icmpv6_checksum(packet)
413                     else:
414                         self.check_icmp_checksum(packet)
415             except:
416                 self.logger.error(ppp("Unexpected or invalid packet "
417                                       "(outside network):", packet))
418                 raise
419
420     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421                                packet_num=3, dst_ip=None):
422         """
423         Verify captured packets on outside network
424
425         :param capture: Captured packets
426         :param nat_ip: Translated IP address
427         :param same_port: Sorce port number is not translated (Default False)
428         :param packet_num: Expected number of packets (Default 3)
429         :param dst_ip: Destination IP address (Default do not verify)
430         """
431         return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
432                                        dst_ip, True)
433
434     def verify_capture_in(self, capture, in_if, packet_num=3):
435         """
436         Verify captured packets on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.check_ip_checksum(packet)
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 if packet.haslayer(TCP):
448                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449                     self.check_tcp_checksum(packet)
450                 elif packet.haslayer(UDP):
451                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
452                 else:
453                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454                     self.check_icmp_checksum(packet)
455             except:
456                 self.logger.error(ppp("Unexpected or invalid packet "
457                                       "(inside network):", packet))
458                 raise
459
460     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
461         """
462         Verify captured IPv6 packets on inside network
463
464         :param capture: Captured packets
465         :param src_ip: Source IP
466         :param dst_ip: Destination IP address
467         :param packet_num: Expected number of packets (Default 3)
468         """
469         self.assertEqual(packet_num, len(capture))
470         for packet in capture:
471             try:
472                 self.assertEqual(packet[IPv6].src, src_ip)
473                 self.assertEqual(packet[IPv6].dst, dst_ip)
474                 if packet.haslayer(TCP):
475                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476                     self.check_tcp_checksum(packet)
477                 elif packet.haslayer(UDP):
478                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
479                     self.check_udp_checksum(packet)
480                 else:
481                     self.assertEqual(packet[ICMPv6EchoReply].id,
482                                      self.icmp_id_in)
483                     self.check_icmpv6_checksum(packet)
484             except:
485                 self.logger.error(ppp("Unexpected or invalid packet "
486                                       "(inside network):", packet))
487                 raise
488
489     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
490         """
491         Verify captured packet that don't have to be translated
492
493         :param capture: Captured packets
494         :param ingress_if: Ingress interface
495         :param egress_if: Egress interface
496         """
497         for packet in capture:
498             try:
499                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501                 if packet.haslayer(TCP):
502                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503                 elif packet.haslayer(UDP):
504                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
505                 else:
506                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
507             except:
508                 self.logger.error(ppp("Unexpected or invalid packet "
509                                       "(inside network):", packet))
510                 raise
511
512     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513                                             packet_num=3, icmp_type=11):
514         """
515         Verify captured packets with ICMP errors on outside network
516
517         :param capture: Captured packets
518         :param src_ip: Translated IP address or IP address of VPP
519                        (Default use global NAT address)
520         :param packet_num: Expected number of packets (Default 3)
521         :param icmp_type: Type of error ICMP packet
522                           we are expecting (Default 11)
523         """
524         if src_ip is None:
525             src_ip = self.nat_addr
526         self.assertEqual(packet_num, len(capture))
527         for packet in capture:
528             try:
529                 self.assertEqual(packet[IP].src, src_ip)
530                 self.assertTrue(packet.haslayer(ICMP))
531                 icmp = packet[ICMP]
532                 self.assertEqual(icmp.type, icmp_type)
533                 self.assertTrue(icmp.haslayer(IPerror))
534                 inner_ip = icmp[IPerror]
535                 if inner_ip.haslayer(TCPerror):
536                     self.assertEqual(inner_ip[TCPerror].dport,
537                                      self.tcp_port_out)
538                 elif inner_ip.haslayer(UDPerror):
539                     self.assertEqual(inner_ip[UDPerror].dport,
540                                      self.udp_port_out)
541                 else:
542                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
543             except:
544                 self.logger.error(ppp("Unexpected or invalid packet "
545                                       "(outside network):", packet))
546                 raise
547
548     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
549                                            icmp_type=11):
550         """
551         Verify captured packets with ICMP errors on inside network
552
553         :param capture: Captured packets
554         :param in_if: Inside interface
555         :param packet_num: Expected number of packets (Default 3)
556         :param icmp_type: Type of error ICMP packet
557                           we are expecting (Default 11)
558         """
559         self.assertEqual(packet_num, len(capture))
560         for packet in capture:
561             try:
562                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563                 self.assertTrue(packet.haslayer(ICMP))
564                 icmp = packet[ICMP]
565                 self.assertEqual(icmp.type, icmp_type)
566                 self.assertTrue(icmp.haslayer(IPerror))
567                 inner_ip = icmp[IPerror]
568                 if inner_ip.haslayer(TCPerror):
569                     self.assertEqual(inner_ip[TCPerror].sport,
570                                      self.tcp_port_in)
571                 elif inner_ip.haslayer(UDPerror):
572                     self.assertEqual(inner_ip[UDPerror].sport,
573                                      self.udp_port_in)
574                 else:
575                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
576             except:
577                 self.logger.error(ppp("Unexpected or invalid packet "
578                                       "(inside network):", packet))
579                 raise
580
581     def create_stream_frag(self, src_if, dst, sport, dport, data):
582         """
583         Create fragmented packet stream
584
585         :param src_if: Source interface
586         :param dst: Destination IPv4 address
587         :param sport: Source TCP port
588         :param dport: Destination TCP port
589         :param data: Payload data
590         :returns: Fragmets
591         """
592         id = random.randint(0, 65535)
593         p = (IP(src=src_if.remote_ip4, dst=dst) /
594              TCP(sport=sport, dport=dport) /
595              Raw(data))
596         p = p.__class__(str(p))
597         chksum = p['TCP'].chksum
598         pkts = []
599         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601              TCP(sport=sport, dport=dport, chksum=chksum) /
602              Raw(data[0:4]))
603         pkts.append(p)
604         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606                 proto=IP_PROTOS.tcp) /
607              Raw(data[4:20]))
608         pkts.append(p)
609         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
611                 id=id) /
612              Raw(data[20:]))
613         pkts.append(p)
614         return pkts
615
616     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617                                pref=None, plen=0, frag_size=128):
618         """
619         Create fragmented packet stream
620
621         :param src_if: Source interface
622         :param dst: Destination IPv4 address
623         :param sport: Source TCP port
624         :param dport: Destination TCP port
625         :param data: Payload data
626         :param pref: NAT64 prefix
627         :param plen: NAT64 prefix length
628         :param fragsize: size of fragments
629         :returns: Fragmets
630         """
631         if pref is None:
632             dst_ip6 = ''.join(['64:ff9b::', dst])
633         else:
634             dst_ip6 = self.compose_ip6(dst, pref, plen)
635
636         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639              TCP(sport=sport, dport=dport) /
640              Raw(data))
641
642         return fragment6(p, frag_size)
643
644     def reass_frags_and_verify(self, frags, src, dst):
645         """
646         Reassemble and verify fragmented packet
647
648         :param frags: Captured fragments
649         :param src: Source IPv4 address to verify
650         :param dst: Destination IPv4 address to verify
651
652         :returns: Reassembled IPv4 packet
653         """
654         buffer = StringIO.StringIO()
655         for p in frags:
656             self.assertEqual(p[IP].src, src)
657             self.assertEqual(p[IP].dst, dst)
658             self.check_ip_checksum(p)
659             buffer.seek(p[IP].frag * 8)
660             buffer.write(p[IP].payload)
661         ip = frags[0].getlayer(IP)
662         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663                 proto=frags[0][IP].proto)
664         if ip.proto == IP_PROTOS.tcp:
665             p = (ip / TCP(buffer.getvalue()))
666             self.check_tcp_checksum(p)
667         elif ip.proto == IP_PROTOS.udp:
668             p = (ip / UDP(buffer.getvalue()))
669         return p
670
671     def reass_frags_and_verify_ip6(self, frags, src, dst):
672         """
673         Reassemble and verify fragmented packet
674
675         :param frags: Captured fragments
676         :param src: Source IPv6 address to verify
677         :param dst: Destination IPv6 address to verify
678
679         :returns: Reassembled IPv6 packet
680         """
681         buffer = StringIO.StringIO()
682         for p in frags:
683             self.assertEqual(p[IPv6].src, src)
684             self.assertEqual(p[IPv6].dst, dst)
685             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686             buffer.write(p[IPv6ExtHdrFragment].payload)
687         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688                   nh=frags[0][IPv6ExtHdrFragment].nh)
689         if ip.nh == IP_PROTOS.tcp:
690             p = (ip / TCP(buffer.getvalue()))
691             self.check_tcp_checksum(p)
692         elif ip.nh == IP_PROTOS.udp:
693             p = (ip / UDP(buffer.getvalue()))
694         return p
695
696     def verify_ipfix_nat44_ses(self, data):
697         """
698         Verify IPFIX NAT44 session create/delete event
699
700         :param data: Decoded IPFIX data records
701         """
702         nat44_ses_create_num = 0
703         nat44_ses_delete_num = 0
704         self.assertEqual(6, len(data))
705         for record in data:
706             # natEvent
707             self.assertIn(ord(record[230]), [4, 5])
708             if ord(record[230]) == 4:
709                 nat44_ses_create_num += 1
710             else:
711                 nat44_ses_delete_num += 1
712             # sourceIPv4Address
713             self.assertEqual(self.pg0.remote_ip4n, record[8])
714             # postNATSourceIPv4Address
715             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
716                              record[225])
717             # ingressVRFID
718             self.assertEqual(struct.pack("!I", 0), record[234])
719             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720             if IP_PROTOS.icmp == ord(record[4]):
721                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
723                                  record[227])
724             elif IP_PROTOS.tcp == ord(record[4]):
725                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
726                                  record[7])
727                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
728                                  record[227])
729             elif IP_PROTOS.udp == ord(record[4]):
730                 self.assertEqual(struct.pack("!H", self.udp_port_in),
731                                  record[7])
732                 self.assertEqual(struct.pack("!H", self.udp_port_out),
733                                  record[227])
734             else:
735                 self.fail("Invalid protocol")
736         self.assertEqual(3, nat44_ses_create_num)
737         self.assertEqual(3, nat44_ses_delete_num)
738
739     def verify_ipfix_addr_exhausted(self, data):
740         """
741         Verify IPFIX NAT addresses event
742
743         :param data: Decoded IPFIX data records
744         """
745         self.assertEqual(1, len(data))
746         record = data[0]
747         # natEvent
748         self.assertEqual(ord(record[230]), 3)
749         # natPoolID
750         self.assertEqual(struct.pack("!I", 0), record[283])
751
752     def verify_ipfix_max_sessions(self, data, limit):
753         """
754         Verify IPFIX maximum session entries exceeded event
755
756         :param data: Decoded IPFIX data records
757         :param limit: Number of maximum session entries that can be created.
758         """
759         self.assertEqual(1, len(data))
760         record = data[0]
761         # natEvent
762         self.assertEqual(ord(record[230]), 13)
763         # natQuotaExceededEvent
764         self.assertEqual(struct.pack("I", 1), record[466])
765         # maxSessionEntries
766         self.assertEqual(struct.pack("I", limit), record[471])
767
768     def verify_ipfix_max_bibs(self, data, limit):
769         """
770         Verify IPFIX maximum BIB entries exceeded event
771
772         :param data: Decoded IPFIX data records
773         :param limit: Number of maximum BIB entries that can be created.
774         """
775         self.assertEqual(1, len(data))
776         record = data[0]
777         # natEvent
778         self.assertEqual(ord(record[230]), 13)
779         # natQuotaExceededEvent
780         self.assertEqual(struct.pack("I", 2), record[466])
781         # maxBIBEntries
782         self.assertEqual(struct.pack("I", limit), record[472])
783
784     def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
785         """
786         Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
787
788         :param data: Decoded IPFIX data records
789         :param limit: Number of maximum fragments pending reassembly
790         :param src_addr: IPv6 source address
791         """
792         self.assertEqual(1, len(data))
793         record = data[0]
794         # natEvent
795         self.assertEqual(ord(record[230]), 13)
796         # natQuotaExceededEvent
797         self.assertEqual(struct.pack("I", 5), record[466])
798         # maxFragmentsPendingReassembly
799         self.assertEqual(struct.pack("I", limit), record[475])
800         # sourceIPv6Address
801         self.assertEqual(src_addr, record[27])
802
803     def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
804         """
805         Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
806
807         :param data: Decoded IPFIX data records
808         :param limit: Number of maximum fragments pending reassembly
809         :param src_addr: IPv4 source address
810         """
811         self.assertEqual(1, len(data))
812         record = data[0]
813         # natEvent
814         self.assertEqual(ord(record[230]), 13)
815         # natQuotaExceededEvent
816         self.assertEqual(struct.pack("I", 5), record[466])
817         # maxFragmentsPendingReassembly
818         self.assertEqual(struct.pack("I", limit), record[475])
819         # sourceIPv4Address
820         self.assertEqual(src_addr, record[8])
821
822     def verify_ipfix_bib(self, data, is_create, src_addr):
823         """
824         Verify IPFIX NAT64 BIB create and delete events
825
826         :param data: Decoded IPFIX data records
827         :param is_create: Create event if nonzero value otherwise delete event
828         :param src_addr: IPv6 source address
829         """
830         self.assertEqual(1, len(data))
831         record = data[0]
832         # natEvent
833         if is_create:
834             self.assertEqual(ord(record[230]), 10)
835         else:
836             self.assertEqual(ord(record[230]), 11)
837         # sourceIPv6Address
838         self.assertEqual(src_addr, record[27])
839         # postNATSourceIPv4Address
840         self.assertEqual(self.nat_addr_n, record[225])
841         # protocolIdentifier
842         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
843         # ingressVRFID
844         self.assertEqual(struct.pack("!I", 0), record[234])
845         # sourceTransportPort
846         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847         # postNAPTSourceTransportPort
848         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
849
850     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
851                                dst_port):
852         """
853         Verify IPFIX NAT64 session create and delete events
854
855         :param data: Decoded IPFIX data records
856         :param is_create: Create event if nonzero value otherwise delete event
857         :param src_addr: IPv6 source address
858         :param dst_addr: IPv4 destination address
859         :param dst_port: destination TCP port
860         """
861         self.assertEqual(1, len(data))
862         record = data[0]
863         # natEvent
864         if is_create:
865             self.assertEqual(ord(record[230]), 6)
866         else:
867             self.assertEqual(ord(record[230]), 7)
868         # sourceIPv6Address
869         self.assertEqual(src_addr, record[27])
870         # destinationIPv6Address
871         self.assertEqual(socket.inet_pton(socket.AF_INET6,
872                                           self.compose_ip6(dst_addr,
873                                                            '64:ff9b::',
874                                                            96)),
875                          record[28])
876         # postNATSourceIPv4Address
877         self.assertEqual(self.nat_addr_n, record[225])
878         # postNATDestinationIPv4Address
879         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
880                          record[226])
881         # protocolIdentifier
882         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
883         # ingressVRFID
884         self.assertEqual(struct.pack("!I", 0), record[234])
885         # sourceTransportPort
886         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887         # postNAPTSourceTransportPort
888         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889         # destinationTransportPort
890         self.assertEqual(struct.pack("!H", dst_port), record[11])
891         # postNAPTDestinationTransportPort
892         self.assertEqual(struct.pack("!H", dst_port), record[228])
893
894
895 class TestNAT44(MethodHolder):
896     """ NAT44 Test Cases """
897
898     @classmethod
899     def setUpClass(cls):
900         super(TestNAT44, cls).setUpClass()
901
902         try:
903             cls.tcp_port_in = 6303
904             cls.tcp_port_out = 6303
905             cls.udp_port_in = 6304
906             cls.udp_port_out = 6304
907             cls.icmp_id_in = 6305
908             cls.icmp_id_out = 6305
909             cls.nat_addr = '10.0.0.3'
910             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911             cls.ipfix_src_port = 4739
912             cls.ipfix_domain_id = 1
913
914             cls.create_pg_interfaces(range(10))
915             cls.interfaces = list(cls.pg_interfaces[0:4])
916
917             for i in cls.interfaces:
918                 i.admin_up()
919                 i.config_ip4()
920                 i.resolve_arp()
921
922             cls.pg0.generate_remote_hosts(3)
923             cls.pg0.configure_ipv4_neighbors()
924
925             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926             cls.vapi.ip_table_add_del(10, is_add=1)
927             cls.vapi.ip_table_add_del(20, is_add=1)
928
929             cls.pg4._local_ip4 = "172.16.255.1"
930             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932             cls.pg4.set_table_ip4(10)
933             cls.pg5._local_ip4 = "172.17.255.3"
934             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936             cls.pg5.set_table_ip4(10)
937             cls.pg6._local_ip4 = "172.16.255.1"
938             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940             cls.pg6.set_table_ip4(20)
941             for i in cls.overlapping_interfaces:
942                 i.config_ip4()
943                 i.admin_up()
944                 i.resolve_arp()
945
946             cls.pg7.admin_up()
947             cls.pg8.admin_up()
948
949             cls.pg9.generate_remote_hosts(2)
950             cls.pg9.config_ip4()
951             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
953                                                   ip_addr_n,
954                                                   24)
955             cls.pg9.admin_up()
956             cls.pg9.resolve_arp()
957             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959             cls.pg9.resolve_arp()
960
961         except Exception:
962             super(TestNAT44, cls).tearDownClass()
963             raise
964
965     def clear_nat44(self):
966         """
967         Clear NAT44 configuration.
968         """
969         # I found no elegant way to do this
970         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971                                    dst_address_length=32,
972                                    next_hop_address=self.pg7.remote_ip4n,
973                                    next_hop_sw_if_index=self.pg7.sw_if_index,
974                                    is_add=0)
975         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976                                    dst_address_length=32,
977                                    next_hop_address=self.pg8.remote_ip4n,
978                                    next_hop_sw_if_index=self.pg8.sw_if_index,
979                                    is_add=0)
980
981         for intf in [self.pg7, self.pg8]:
982             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
983             for n in neighbors:
984                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
985                                               n.mac_address,
986                                               n.ip_address,
987                                               is_add=0)
988
989         if self.pg7.has_ip4_config:
990             self.pg7.unconfig_ip4()
991
992         self.vapi.nat44_forwarding_enable_disable(0)
993
994         interfaces = self.vapi.nat44_interface_addr_dump()
995         for intf in interfaces:
996             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997                                                twice_nat=intf.twice_nat,
998                                                is_add=0)
999
1000         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001                             domain_id=self.ipfix_domain_id)
1002         self.ipfix_src_port = 4739
1003         self.ipfix_domain_id = 1
1004
1005         interfaces = self.vapi.nat44_interface_dump()
1006         for intf in interfaces:
1007             if intf.is_inside > 1:
1008                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1009                                                           0,
1010                                                           is_add=0)
1011             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1012                                                       intf.is_inside,
1013                                                       is_add=0)
1014
1015         interfaces = self.vapi.nat44_interface_output_feature_dump()
1016         for intf in interfaces:
1017             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1018                                                              intf.is_inside,
1019                                                              is_add=0)
1020
1021         static_mappings = self.vapi.nat44_static_mapping_dump()
1022         for sm in static_mappings:
1023             self.vapi.nat44_add_del_static_mapping(
1024                 sm.local_ip_address,
1025                 sm.external_ip_address,
1026                 local_port=sm.local_port,
1027                 external_port=sm.external_port,
1028                 addr_only=sm.addr_only,
1029                 vrf_id=sm.vrf_id,
1030                 protocol=sm.protocol,
1031                 twice_nat=sm.twice_nat,
1032                 out2in_only=sm.out2in_only,
1033                 tag=sm.tag,
1034                 is_add=0)
1035
1036         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037         for lb_sm in lb_static_mappings:
1038             self.vapi.nat44_add_del_lb_static_mapping(
1039                 lb_sm.external_addr,
1040                 lb_sm.external_port,
1041                 lb_sm.protocol,
1042                 vrf_id=lb_sm.vrf_id,
1043                 twice_nat=lb_sm.twice_nat,
1044                 out2in_only=lb_sm.out2in_only,
1045                 tag=lb_sm.tag,
1046                 is_add=0,
1047                 local_num=0,
1048                 locals=[])
1049
1050         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051         for id_m in identity_mappings:
1052             self.vapi.nat44_add_del_identity_mapping(
1053                 addr_only=id_m.addr_only,
1054                 ip=id_m.ip_address,
1055                 port=id_m.port,
1056                 sw_if_index=id_m.sw_if_index,
1057                 vrf_id=id_m.vrf_id,
1058                 protocol=id_m.protocol,
1059                 is_add=0)
1060
1061         adresses = self.vapi.nat44_address_dump()
1062         for addr in adresses:
1063             self.vapi.nat44_add_del_address_range(addr.ip_address,
1064                                                   addr.ip_address,
1065                                                   twice_nat=addr.twice_nat,
1066                                                   is_add=0)
1067
1068         self.vapi.nat_set_reass()
1069         self.vapi.nat_set_reass(is_ip6=1)
1070
1071     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072                                  local_port=0, external_port=0, vrf_id=0,
1073                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
1074                                  proto=0, twice_nat=0, out2in_only=0, tag=""):
1075         """
1076         Add/delete NAT44 static mapping
1077
1078         :param local_ip: Local IP address
1079         :param external_ip: External IP address
1080         :param local_port: Local port number (Optional)
1081         :param external_port: External port number (Optional)
1082         :param vrf_id: VRF ID (Default 0)
1083         :param is_add: 1 if add, 0 if delete (Default add)
1084         :param external_sw_if_index: External interface instead of IP address
1085         :param proto: IP protocol (Mandatory if port specified)
1086         :param twice_nat: 1 if translate external host address and port
1087         :param out2in_only: if 1 rule is matching only out2in direction
1088         :param tag: Opaque string tag
1089         """
1090         addr_only = 1
1091         if local_port and external_port:
1092             addr_only = 0
1093         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095         self.vapi.nat44_add_del_static_mapping(
1096             l_ip,
1097             e_ip,
1098             external_sw_if_index,
1099             local_port,
1100             external_port,
1101             addr_only,
1102             vrf_id,
1103             proto,
1104             twice_nat,
1105             out2in_only,
1106             tag,
1107             is_add)
1108
1109     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1110         """
1111         Add/delete NAT44 address
1112
1113         :param ip: IP address
1114         :param is_add: 1 if add, 0 if delete (Default add)
1115         :param twice_nat: twice NAT address for extenal hosts
1116         """
1117         nat_addr = socket.inet_pton(socket.AF_INET, ip)
1118         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1119                                               vrf_id=vrf_id,
1120                                               twice_nat=twice_nat)
1121
1122     def test_dynamic(self):
1123         """ NAT44 dynamic translation test """
1124
1125         self.nat44_add_address(self.nat_addr)
1126         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1128                                                   is_inside=0)
1129
1130         # in2out
1131         pkts = self.create_stream_in(self.pg0, self.pg1)
1132         self.pg0.add_stream(pkts)
1133         self.pg_enable_capture(self.pg_interfaces)
1134         self.pg_start()
1135         capture = self.pg1.get_capture(len(pkts))
1136         self.verify_capture_out(capture)
1137
1138         # out2in
1139         pkts = self.create_stream_out(self.pg1)
1140         self.pg1.add_stream(pkts)
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pg_start()
1143         capture = self.pg0.get_capture(len(pkts))
1144         self.verify_capture_in(capture, self.pg0)
1145
1146     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147         """ NAT44 handling of client packets with TTL=1 """
1148
1149         self.nat44_add_address(self.nat_addr)
1150         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1152                                                   is_inside=0)
1153
1154         # Client side - generate traffic
1155         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156         self.pg0.add_stream(pkts)
1157         self.pg_enable_capture(self.pg_interfaces)
1158         self.pg_start()
1159
1160         # Client side - verify ICMP type 11 packets
1161         capture = self.pg0.get_capture(len(pkts))
1162         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1163
1164     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165         """ NAT44 handling of server packets with TTL=1 """
1166
1167         self.nat44_add_address(self.nat_addr)
1168         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1170                                                   is_inside=0)
1171
1172         # Client side - create sessions
1173         pkts = self.create_stream_in(self.pg0, self.pg1)
1174         self.pg0.add_stream(pkts)
1175         self.pg_enable_capture(self.pg_interfaces)
1176         self.pg_start()
1177
1178         # Server side - generate traffic
1179         capture = self.pg1.get_capture(len(pkts))
1180         self.verify_capture_out(capture)
1181         pkts = self.create_stream_out(self.pg1, ttl=1)
1182         self.pg1.add_stream(pkts)
1183         self.pg_enable_capture(self.pg_interfaces)
1184         self.pg_start()
1185
1186         # Server side - verify ICMP type 11 packets
1187         capture = self.pg1.get_capture(len(pkts))
1188         self.verify_capture_out_with_icmp_errors(capture,
1189                                                  src_ip=self.pg1.local_ip4)
1190
1191     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1192         """ NAT44 handling of error responses to client packets with TTL=2 """
1193
1194         self.nat44_add_address(self.nat_addr)
1195         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1196         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1197                                                   is_inside=0)
1198
1199         # Client side - generate traffic
1200         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1201         self.pg0.add_stream(pkts)
1202         self.pg_enable_capture(self.pg_interfaces)
1203         self.pg_start()
1204
1205         # Server side - simulate ICMP type 11 response
1206         capture = self.pg1.get_capture(len(pkts))
1207         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1208                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1209                 ICMP(type=11) / packet[IP] for packet in capture]
1210         self.pg1.add_stream(pkts)
1211         self.pg_enable_capture(self.pg_interfaces)
1212         self.pg_start()
1213
1214         # Client side - verify ICMP type 11 packets
1215         capture = self.pg0.get_capture(len(pkts))
1216         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1217
1218     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1219         """ NAT44 handling of error responses to server packets with TTL=2 """
1220
1221         self.nat44_add_address(self.nat_addr)
1222         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1223         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1224                                                   is_inside=0)
1225
1226         # Client side - create sessions
1227         pkts = self.create_stream_in(self.pg0, self.pg1)
1228         self.pg0.add_stream(pkts)
1229         self.pg_enable_capture(self.pg_interfaces)
1230         self.pg_start()
1231
1232         # Server side - generate traffic
1233         capture = self.pg1.get_capture(len(pkts))
1234         self.verify_capture_out(capture)
1235         pkts = self.create_stream_out(self.pg1, ttl=2)
1236         self.pg1.add_stream(pkts)
1237         self.pg_enable_capture(self.pg_interfaces)
1238         self.pg_start()
1239
1240         # Client side - simulate ICMP type 11 response
1241         capture = self.pg0.get_capture(len(pkts))
1242         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1244                 ICMP(type=11) / packet[IP] for packet in capture]
1245         self.pg0.add_stream(pkts)
1246         self.pg_enable_capture(self.pg_interfaces)
1247         self.pg_start()
1248
1249         # Server side - verify ICMP type 11 packets
1250         capture = self.pg1.get_capture(len(pkts))
1251         self.verify_capture_out_with_icmp_errors(capture)
1252
1253     def test_ping_out_interface_from_outside(self):
1254         """ Ping NAT44 out interface from outside network """
1255
1256         self.nat44_add_address(self.nat_addr)
1257         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1259                                                   is_inside=0)
1260
1261         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1263              ICMP(id=self.icmp_id_out, type='echo-request'))
1264         pkts = [p]
1265         self.pg1.add_stream(pkts)
1266         self.pg_enable_capture(self.pg_interfaces)
1267         self.pg_start()
1268         capture = self.pg1.get_capture(len(pkts))
1269         self.assertEqual(1, len(capture))
1270         packet = capture[0]
1271         try:
1272             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1273             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1274             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1275             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1276         except:
1277             self.logger.error(ppp("Unexpected or invalid packet "
1278                                   "(outside network):", packet))
1279             raise
1280
1281     def test_ping_internal_host_from_outside(self):
1282         """ Ping internal host from outside network """
1283
1284         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1285         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1287                                                   is_inside=0)
1288
1289         # out2in
1290         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1291                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1292                ICMP(id=self.icmp_id_out, type='echo-request'))
1293         self.pg1.add_stream(pkt)
1294         self.pg_enable_capture(self.pg_interfaces)
1295         self.pg_start()
1296         capture = self.pg0.get_capture(1)
1297         self.verify_capture_in(capture, self.pg0, packet_num=1)
1298         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1299
1300         # in2out
1301         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1302                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1303                ICMP(id=self.icmp_id_in, type='echo-reply'))
1304         self.pg0.add_stream(pkt)
1305         self.pg_enable_capture(self.pg_interfaces)
1306         self.pg_start()
1307         capture = self.pg1.get_capture(1)
1308         self.verify_capture_out(capture, same_port=True, packet_num=1)
1309         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1310
1311     def test_forwarding(self):
1312         """ NAT44 forwarding test """
1313
1314         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1315         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1316                                                   is_inside=0)
1317         self.vapi.nat44_forwarding_enable_disable(1)
1318
1319         real_ip = self.pg0.remote_ip4n
1320         alias_ip = self.nat_addr_n
1321         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322                                                external_ip=alias_ip)
1323
1324         try:
1325             # in2out - static mapping match
1326
1327             pkts = self.create_stream_out(self.pg1)
1328             self.pg1.add_stream(pkts)
1329             self.pg_enable_capture(self.pg_interfaces)
1330             self.pg_start()
1331             capture = self.pg0.get_capture(len(pkts))
1332             self.verify_capture_in(capture, self.pg0)
1333
1334             pkts = self.create_stream_in(self.pg0, self.pg1)
1335             self.pg0.add_stream(pkts)
1336             self.pg_enable_capture(self.pg_interfaces)
1337             self.pg_start()
1338             capture = self.pg1.get_capture(len(pkts))
1339             self.verify_capture_out(capture, same_port=True)
1340
1341             # in2out - no static mapping match
1342
1343             host0 = self.pg0.remote_hosts[0]
1344             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1345             try:
1346                 pkts = self.create_stream_out(self.pg1,
1347                                               dst_ip=self.pg0.remote_ip4,
1348                                               use_inside_ports=True)
1349                 self.pg1.add_stream(pkts)
1350                 self.pg_enable_capture(self.pg_interfaces)
1351                 self.pg_start()
1352                 capture = self.pg0.get_capture(len(pkts))
1353                 self.verify_capture_in(capture, self.pg0)
1354
1355                 pkts = self.create_stream_in(self.pg0, self.pg1)
1356                 self.pg0.add_stream(pkts)
1357                 self.pg_enable_capture(self.pg_interfaces)
1358                 self.pg_start()
1359                 capture = self.pg1.get_capture(len(pkts))
1360                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1361                                         same_port=True)
1362             finally:
1363                 self.pg0.remote_hosts[0] = host0
1364
1365         finally:
1366             self.vapi.nat44_forwarding_enable_disable(0)
1367             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1368                                                    external_ip=alias_ip,
1369                                                    is_add=0)
1370
1371     def test_static_in(self):
1372         """ 1:1 NAT initialized from inside network """
1373
1374         nat_ip = "10.0.0.10"
1375         self.tcp_port_out = 6303
1376         self.udp_port_out = 6304
1377         self.icmp_id_out = 6305
1378
1379         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1381         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1382                                                   is_inside=0)
1383         sm = self.vapi.nat44_static_mapping_dump()
1384         self.assertEqual(len(sm), 1)
1385         self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1386
1387         # in2out
1388         pkts = self.create_stream_in(self.pg0, self.pg1)
1389         self.pg0.add_stream(pkts)
1390         self.pg_enable_capture(self.pg_interfaces)
1391         self.pg_start()
1392         capture = self.pg1.get_capture(len(pkts))
1393         self.verify_capture_out(capture, nat_ip, True)
1394
1395         # out2in
1396         pkts = self.create_stream_out(self.pg1, nat_ip)
1397         self.pg1.add_stream(pkts)
1398         self.pg_enable_capture(self.pg_interfaces)
1399         self.pg_start()
1400         capture = self.pg0.get_capture(len(pkts))
1401         self.verify_capture_in(capture, self.pg0)
1402
1403     def test_static_out(self):
1404         """ 1:1 NAT initialized from outside network """
1405
1406         nat_ip = "10.0.0.20"
1407         self.tcp_port_out = 6303
1408         self.udp_port_out = 6304
1409         self.icmp_id_out = 6305
1410         tag = "testTAG"
1411
1412         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1413         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1414         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1415                                                   is_inside=0)
1416         sm = self.vapi.nat44_static_mapping_dump()
1417         self.assertEqual(len(sm), 1)
1418         self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1419
1420         # out2in
1421         pkts = self.create_stream_out(self.pg1, nat_ip)
1422         self.pg1.add_stream(pkts)
1423         self.pg_enable_capture(self.pg_interfaces)
1424         self.pg_start()
1425         capture = self.pg0.get_capture(len(pkts))
1426         self.verify_capture_in(capture, self.pg0)
1427
1428         # in2out
1429         pkts = self.create_stream_in(self.pg0, self.pg1)
1430         self.pg0.add_stream(pkts)
1431         self.pg_enable_capture(self.pg_interfaces)
1432         self.pg_start()
1433         capture = self.pg1.get_capture(len(pkts))
1434         self.verify_capture_out(capture, nat_ip, True)
1435
1436     def test_static_with_port_in(self):
1437         """ 1:1 NAPT initialized from inside network """
1438
1439         self.tcp_port_out = 3606
1440         self.udp_port_out = 3607
1441         self.icmp_id_out = 3608
1442
1443         self.nat44_add_address(self.nat_addr)
1444         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1445                                       self.tcp_port_in, self.tcp_port_out,
1446                                       proto=IP_PROTOS.tcp)
1447         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448                                       self.udp_port_in, self.udp_port_out,
1449                                       proto=IP_PROTOS.udp)
1450         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1451                                       self.icmp_id_in, self.icmp_id_out,
1452                                       proto=IP_PROTOS.icmp)
1453         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1454         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1455                                                   is_inside=0)
1456
1457         # in2out
1458         pkts = self.create_stream_in(self.pg0, self.pg1)
1459         self.pg0.add_stream(pkts)
1460         self.pg_enable_capture(self.pg_interfaces)
1461         self.pg_start()
1462         capture = self.pg1.get_capture(len(pkts))
1463         self.verify_capture_out(capture)
1464
1465         # out2in
1466         pkts = self.create_stream_out(self.pg1)
1467         self.pg1.add_stream(pkts)
1468         self.pg_enable_capture(self.pg_interfaces)
1469         self.pg_start()
1470         capture = self.pg0.get_capture(len(pkts))
1471         self.verify_capture_in(capture, self.pg0)
1472
1473     def test_static_with_port_out(self):
1474         """ 1:1 NAPT initialized from outside network """
1475
1476         self.tcp_port_out = 30606
1477         self.udp_port_out = 30607
1478         self.icmp_id_out = 30608
1479
1480         self.nat44_add_address(self.nat_addr)
1481         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1482                                       self.tcp_port_in, self.tcp_port_out,
1483                                       proto=IP_PROTOS.tcp)
1484         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1485                                       self.udp_port_in, self.udp_port_out,
1486                                       proto=IP_PROTOS.udp)
1487         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1488                                       self.icmp_id_in, self.icmp_id_out,
1489                                       proto=IP_PROTOS.icmp)
1490         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1491         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1492                                                   is_inside=0)
1493
1494         # out2in
1495         pkts = self.create_stream_out(self.pg1)
1496         self.pg1.add_stream(pkts)
1497         self.pg_enable_capture(self.pg_interfaces)
1498         self.pg_start()
1499         capture = self.pg0.get_capture(len(pkts))
1500         self.verify_capture_in(capture, self.pg0)
1501
1502         # in2out
1503         pkts = self.create_stream_in(self.pg0, self.pg1)
1504         self.pg0.add_stream(pkts)
1505         self.pg_enable_capture(self.pg_interfaces)
1506         self.pg_start()
1507         capture = self.pg1.get_capture(len(pkts))
1508         self.verify_capture_out(capture)
1509
1510     def test_static_with_port_out2(self):
1511         """ 1:1 NAPT symmetrical rule """
1512
1513         external_port = 80
1514         local_port = 8080
1515
1516         self.vapi.nat44_forwarding_enable_disable(1)
1517         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1518                                       local_port, external_port,
1519                                       proto=IP_PROTOS.tcp, out2in_only=1)
1520         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1521         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1522                                                   is_inside=0)
1523
1524         # from client to service
1525         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1526              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1527              TCP(sport=12345, dport=external_port))
1528         self.pg1.add_stream(p)
1529         self.pg_enable_capture(self.pg_interfaces)
1530         self.pg_start()
1531         capture = self.pg0.get_capture(1)
1532         p = capture[0]
1533         server = None
1534         try:
1535             ip = p[IP]
1536             tcp = p[TCP]
1537             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1538             self.assertEqual(tcp.dport, local_port)
1539             self.check_tcp_checksum(p)
1540             self.check_ip_checksum(p)
1541         except:
1542             self.logger.error(ppp("Unexpected or invalid packet:", p))
1543             raise
1544
1545         # ICMP error
1546         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1547              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1548              ICMP(type=11) / capture[0][IP])
1549         self.pg0.add_stream(p)
1550         self.pg_enable_capture(self.pg_interfaces)
1551         self.pg_start()
1552         capture = self.pg1.get_capture(1)
1553         p = capture[0]
1554         try:
1555             self.assertEqual(p[IP].src, self.nat_addr)
1556             inner = p[IPerror]
1557             self.assertEqual(inner.dst, self.nat_addr)
1558             self.assertEqual(inner[TCPerror].dport, external_port)
1559         except:
1560             self.logger.error(ppp("Unexpected or invalid packet:", p))
1561             raise
1562
1563         # from service back to client
1564         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1565              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1566              TCP(sport=local_port, dport=12345))
1567         self.pg0.add_stream(p)
1568         self.pg_enable_capture(self.pg_interfaces)
1569         self.pg_start()
1570         capture = self.pg1.get_capture(1)
1571         p = capture[0]
1572         try:
1573             ip = p[IP]
1574             tcp = p[TCP]
1575             self.assertEqual(ip.src, self.nat_addr)
1576             self.assertEqual(tcp.sport, external_port)
1577             self.check_tcp_checksum(p)
1578             self.check_ip_checksum(p)
1579         except:
1580             self.logger.error(ppp("Unexpected or invalid packet:", p))
1581             raise
1582
1583         # ICMP error
1584         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1585              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1586              ICMP(type=11) / capture[0][IP])
1587         self.pg1.add_stream(p)
1588         self.pg_enable_capture(self.pg_interfaces)
1589         self.pg_start()
1590         capture = self.pg0.get_capture(1)
1591         p = capture[0]
1592         try:
1593             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1594             inner = p[IPerror]
1595             self.assertEqual(inner.src, self.pg0.remote_ip4)
1596             self.assertEqual(inner[TCPerror].sport, local_port)
1597         except:
1598             self.logger.error(ppp("Unexpected or invalid packet:", p))
1599             raise
1600
1601         # from client to server (no translation)
1602         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1603              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1604              TCP(sport=12346, dport=local_port))
1605         self.pg1.add_stream(p)
1606         self.pg_enable_capture(self.pg_interfaces)
1607         self.pg_start()
1608         capture = self.pg0.get_capture(1)
1609         p = capture[0]
1610         server = None
1611         try:
1612             ip = p[IP]
1613             tcp = p[TCP]
1614             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1615             self.assertEqual(tcp.dport, local_port)
1616             self.check_tcp_checksum(p)
1617             self.check_ip_checksum(p)
1618         except:
1619             self.logger.error(ppp("Unexpected or invalid packet:", p))
1620             raise
1621
1622         # from service back to client (no translation)
1623         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1624              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1625              TCP(sport=local_port, dport=12346))
1626         self.pg0.add_stream(p)
1627         self.pg_enable_capture(self.pg_interfaces)
1628         self.pg_start()
1629         capture = self.pg1.get_capture(1)
1630         p = capture[0]
1631         try:
1632             ip = p[IP]
1633             tcp = p[TCP]
1634             self.assertEqual(ip.src, self.pg0.remote_ip4)
1635             self.assertEqual(tcp.sport, local_port)
1636             self.check_tcp_checksum(p)
1637             self.check_ip_checksum(p)
1638         except:
1639             self.logger.error(ppp("Unexpected or invalid packet:", p))
1640             raise
1641
1642     def test_static_vrf_aware(self):
1643         """ 1:1 NAT VRF awareness """
1644
1645         nat_ip1 = "10.0.0.30"
1646         nat_ip2 = "10.0.0.40"
1647         self.tcp_port_out = 6303
1648         self.udp_port_out = 6304
1649         self.icmp_id_out = 6305
1650
1651         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1652                                       vrf_id=10)
1653         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1654                                       vrf_id=10)
1655         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1656                                                   is_inside=0)
1657         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1658         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1659
1660         # inside interface VRF match NAT44 static mapping VRF
1661         pkts = self.create_stream_in(self.pg4, self.pg3)
1662         self.pg4.add_stream(pkts)
1663         self.pg_enable_capture(self.pg_interfaces)
1664         self.pg_start()
1665         capture = self.pg3.get_capture(len(pkts))
1666         self.verify_capture_out(capture, nat_ip1, True)
1667
1668         # inside interface VRF don't match NAT44 static mapping VRF (packets
1669         # are dropped)
1670         pkts = self.create_stream_in(self.pg0, self.pg3)
1671         self.pg0.add_stream(pkts)
1672         self.pg_enable_capture(self.pg_interfaces)
1673         self.pg_start()
1674         self.pg3.assert_nothing_captured()
1675
1676     def test_identity_nat(self):
1677         """ Identity NAT """
1678
1679         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1680         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1681         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1682                                                   is_inside=0)
1683
1684         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1685              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1686              TCP(sport=12345, dport=56789))
1687         self.pg1.add_stream(p)
1688         self.pg_enable_capture(self.pg_interfaces)
1689         self.pg_start()
1690         capture = self.pg0.get_capture(1)
1691         p = capture[0]
1692         try:
1693             ip = p[IP]
1694             tcp = p[TCP]
1695             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1696             self.assertEqual(ip.src, self.pg1.remote_ip4)
1697             self.assertEqual(tcp.dport, 56789)
1698             self.assertEqual(tcp.sport, 12345)
1699             self.check_tcp_checksum(p)
1700             self.check_ip_checksum(p)
1701         except:
1702             self.logger.error(ppp("Unexpected or invalid packet:", p))
1703             raise
1704
1705     def test_static_lb(self):
1706         """ NAT44 local service load balancing """
1707         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1708         external_port = 80
1709         local_port = 8080
1710         server1 = self.pg0.remote_hosts[0]
1711         server2 = self.pg0.remote_hosts[1]
1712
1713         locals = [{'addr': server1.ip4n,
1714                    'port': local_port,
1715                    'probability': 70},
1716                   {'addr': server2.ip4n,
1717                    'port': local_port,
1718                    'probability': 30}]
1719
1720         self.nat44_add_address(self.nat_addr)
1721         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1722                                                   external_port,
1723                                                   IP_PROTOS.tcp,
1724                                                   local_num=len(locals),
1725                                                   locals=locals)
1726         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1727         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1728                                                   is_inside=0)
1729
1730         # from client to service
1731         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1732              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1733              TCP(sport=12345, dport=external_port))
1734         self.pg1.add_stream(p)
1735         self.pg_enable_capture(self.pg_interfaces)
1736         self.pg_start()
1737         capture = self.pg0.get_capture(1)
1738         p = capture[0]
1739         server = None
1740         try:
1741             ip = p[IP]
1742             tcp = p[TCP]
1743             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1744             if ip.dst == server1.ip4:
1745                 server = server1
1746             else:
1747                 server = server2
1748             self.assertEqual(tcp.dport, local_port)
1749             self.check_tcp_checksum(p)
1750             self.check_ip_checksum(p)
1751         except:
1752             self.logger.error(ppp("Unexpected or invalid packet:", p))
1753             raise
1754
1755         # from service back to client
1756         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1757              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1758              TCP(sport=local_port, dport=12345))
1759         self.pg0.add_stream(p)
1760         self.pg_enable_capture(self.pg_interfaces)
1761         self.pg_start()
1762         capture = self.pg1.get_capture(1)
1763         p = capture[0]
1764         try:
1765             ip = p[IP]
1766             tcp = p[TCP]
1767             self.assertEqual(ip.src, self.nat_addr)
1768             self.assertEqual(tcp.sport, external_port)
1769             self.check_tcp_checksum(p)
1770             self.check_ip_checksum(p)
1771         except:
1772             self.logger.error(ppp("Unexpected or invalid packet:", p))
1773             raise
1774
1775         # multiple clients
1776         server1_n = 0
1777         server2_n = 0
1778         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1779         pkts = []
1780         for client in clients:
1781             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1782                  IP(src=client, dst=self.nat_addr) /
1783                  TCP(sport=12345, dport=external_port))
1784             pkts.append(p)
1785         self.pg1.add_stream(pkts)
1786         self.pg_enable_capture(self.pg_interfaces)
1787         self.pg_start()
1788         capture = self.pg0.get_capture(len(pkts))
1789         for p in capture:
1790             if p[IP].dst == server1.ip4:
1791                 server1_n += 1
1792             else:
1793                 server2_n += 1
1794         self.assertTrue(server1_n > server2_n)
1795
1796     def test_static_lb_2(self):
1797         """ NAT44 local service load balancing (asymmetrical rule) """
1798         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1799         external_port = 80
1800         local_port = 8080
1801         server1 = self.pg0.remote_hosts[0]
1802         server2 = self.pg0.remote_hosts[1]
1803
1804         locals = [{'addr': server1.ip4n,
1805                    'port': local_port,
1806                    'probability': 70},
1807                   {'addr': server2.ip4n,
1808                    'port': local_port,
1809                    'probability': 30}]
1810
1811         self.vapi.nat44_forwarding_enable_disable(1)
1812         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1813                                                   external_port,
1814                                                   IP_PROTOS.tcp,
1815                                                   out2in_only=1,
1816                                                   local_num=len(locals),
1817                                                   locals=locals)
1818         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1819         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1820                                                   is_inside=0)
1821
1822         # from client to service
1823         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1824              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1825              TCP(sport=12345, dport=external_port))
1826         self.pg1.add_stream(p)
1827         self.pg_enable_capture(self.pg_interfaces)
1828         self.pg_start()
1829         capture = self.pg0.get_capture(1)
1830         p = capture[0]
1831         server = None
1832         try:
1833             ip = p[IP]
1834             tcp = p[TCP]
1835             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1836             if ip.dst == server1.ip4:
1837                 server = server1
1838             else:
1839                 server = server2
1840             self.assertEqual(tcp.dport, local_port)
1841             self.check_tcp_checksum(p)
1842             self.check_ip_checksum(p)
1843         except:
1844             self.logger.error(ppp("Unexpected or invalid packet:", p))
1845             raise
1846
1847         # from service back to client
1848         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1849              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1850              TCP(sport=local_port, dport=12345))
1851         self.pg0.add_stream(p)
1852         self.pg_enable_capture(self.pg_interfaces)
1853         self.pg_start()
1854         capture = self.pg1.get_capture(1)
1855         p = capture[0]
1856         try:
1857             ip = p[IP]
1858             tcp = p[TCP]
1859             self.assertEqual(ip.src, self.nat_addr)
1860             self.assertEqual(tcp.sport, external_port)
1861             self.check_tcp_checksum(p)
1862             self.check_ip_checksum(p)
1863         except:
1864             self.logger.error(ppp("Unexpected or invalid packet:", p))
1865             raise
1866
1867         # from client to server (no translation)
1868         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1869              IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1870              TCP(sport=12346, dport=local_port))
1871         self.pg1.add_stream(p)
1872         self.pg_enable_capture(self.pg_interfaces)
1873         self.pg_start()
1874         capture = self.pg0.get_capture(1)
1875         p = capture[0]
1876         server = None
1877         try:
1878             ip = p[IP]
1879             tcp = p[TCP]
1880             self.assertEqual(ip.dst, server1.ip4)
1881             self.assertEqual(tcp.dport, local_port)
1882             self.check_tcp_checksum(p)
1883             self.check_ip_checksum(p)
1884         except:
1885             self.logger.error(ppp("Unexpected or invalid packet:", p))
1886             raise
1887
1888         # from service back to client (no translation)
1889         p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1890              IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1891              TCP(sport=local_port, dport=12346))
1892         self.pg0.add_stream(p)
1893         self.pg_enable_capture(self.pg_interfaces)
1894         self.pg_start()
1895         capture = self.pg1.get_capture(1)
1896         p = capture[0]
1897         try:
1898             ip = p[IP]
1899             tcp = p[TCP]
1900             self.assertEqual(ip.src, server1.ip4)
1901             self.assertEqual(tcp.sport, local_port)
1902             self.check_tcp_checksum(p)
1903             self.check_ip_checksum(p)
1904         except:
1905             self.logger.error(ppp("Unexpected or invalid packet:", p))
1906             raise
1907
1908     def test_multiple_inside_interfaces(self):
1909         """ NAT44 multiple non-overlapping address space inside interfaces """
1910
1911         self.nat44_add_address(self.nat_addr)
1912         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1913         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1914         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1915                                                   is_inside=0)
1916
1917         # between two NAT44 inside interfaces (no translation)
1918         pkts = self.create_stream_in(self.pg0, self.pg1)
1919         self.pg0.add_stream(pkts)
1920         self.pg_enable_capture(self.pg_interfaces)
1921         self.pg_start()
1922         capture = self.pg1.get_capture(len(pkts))
1923         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1924
1925         # from NAT44 inside to interface without NAT44 feature (no translation)
1926         pkts = self.create_stream_in(self.pg0, self.pg2)
1927         self.pg0.add_stream(pkts)
1928         self.pg_enable_capture(self.pg_interfaces)
1929         self.pg_start()
1930         capture = self.pg2.get_capture(len(pkts))
1931         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1932
1933         # in2out 1st interface
1934         pkts = self.create_stream_in(self.pg0, self.pg3)
1935         self.pg0.add_stream(pkts)
1936         self.pg_enable_capture(self.pg_interfaces)
1937         self.pg_start()
1938         capture = self.pg3.get_capture(len(pkts))
1939         self.verify_capture_out(capture)
1940
1941         # out2in 1st interface
1942         pkts = self.create_stream_out(self.pg3)
1943         self.pg3.add_stream(pkts)
1944         self.pg_enable_capture(self.pg_interfaces)
1945         self.pg_start()
1946         capture = self.pg0.get_capture(len(pkts))
1947         self.verify_capture_in(capture, self.pg0)
1948
1949         # in2out 2nd interface
1950         pkts = self.create_stream_in(self.pg1, self.pg3)
1951         self.pg1.add_stream(pkts)
1952         self.pg_enable_capture(self.pg_interfaces)
1953         self.pg_start()
1954         capture = self.pg3.get_capture(len(pkts))
1955         self.verify_capture_out(capture)
1956
1957         # out2in 2nd interface
1958         pkts = self.create_stream_out(self.pg3)
1959         self.pg3.add_stream(pkts)
1960         self.pg_enable_capture(self.pg_interfaces)
1961         self.pg_start()
1962         capture = self.pg1.get_capture(len(pkts))
1963         self.verify_capture_in(capture, self.pg1)
1964
1965     def test_inside_overlapping_interfaces(self):
1966         """ NAT44 multiple inside interfaces with overlapping address space """
1967
1968         static_nat_ip = "10.0.0.10"
1969         self.nat44_add_address(self.nat_addr)
1970         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1971                                                   is_inside=0)
1972         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1973         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1974         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1975         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1976                                       vrf_id=20)
1977
1978         # between NAT44 inside interfaces with same VRF (no translation)
1979         pkts = self.create_stream_in(self.pg4, self.pg5)
1980         self.pg4.add_stream(pkts)
1981         self.pg_enable_capture(self.pg_interfaces)
1982         self.pg_start()
1983         capture = self.pg5.get_capture(len(pkts))
1984         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1985
1986         # between NAT44 inside interfaces with different VRF (hairpinning)
1987         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1988              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1989              TCP(sport=1234, dport=5678))
1990         self.pg4.add_stream(p)
1991         self.pg_enable_capture(self.pg_interfaces)
1992         self.pg_start()
1993         capture = self.pg6.get_capture(1)
1994         p = capture[0]
1995         try:
1996             ip = p[IP]
1997             tcp = p[TCP]
1998             self.assertEqual(ip.src, self.nat_addr)
1999             self.assertEqual(ip.dst, self.pg6.remote_ip4)
2000             self.assertNotEqual(tcp.sport, 1234)
2001             self.assertEqual(tcp.dport, 5678)
2002         except:
2003             self.logger.error(ppp("Unexpected or invalid packet:", p))
2004             raise
2005
2006         # in2out 1st interface
2007         pkts = self.create_stream_in(self.pg4, self.pg3)
2008         self.pg4.add_stream(pkts)
2009         self.pg_enable_capture(self.pg_interfaces)
2010         self.pg_start()
2011         capture = self.pg3.get_capture(len(pkts))
2012         self.verify_capture_out(capture)
2013
2014         # out2in 1st interface
2015         pkts = self.create_stream_out(self.pg3)
2016         self.pg3.add_stream(pkts)
2017         self.pg_enable_capture(self.pg_interfaces)
2018         self.pg_start()
2019         capture = self.pg4.get_capture(len(pkts))
2020         self.verify_capture_in(capture, self.pg4)
2021
2022         # in2out 2nd interface
2023         pkts = self.create_stream_in(self.pg5, self.pg3)
2024         self.pg5.add_stream(pkts)
2025         self.pg_enable_capture(self.pg_interfaces)
2026         self.pg_start()
2027         capture = self.pg3.get_capture(len(pkts))
2028         self.verify_capture_out(capture)
2029
2030         # out2in 2nd interface
2031         pkts = self.create_stream_out(self.pg3)
2032         self.pg3.add_stream(pkts)
2033         self.pg_enable_capture(self.pg_interfaces)
2034         self.pg_start()
2035         capture = self.pg5.get_capture(len(pkts))
2036         self.verify_capture_in(capture, self.pg5)
2037
2038         # pg5 session dump
2039         addresses = self.vapi.nat44_address_dump()
2040         self.assertEqual(len(addresses), 1)
2041         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2042         self.assertEqual(len(sessions), 3)
2043         for session in sessions:
2044             self.assertFalse(session.is_static)
2045             self.assertEqual(session.inside_ip_address[0:4],
2046                              self.pg5.remote_ip4n)
2047             self.assertEqual(session.outside_ip_address,
2048                              addresses[0].ip_address)
2049         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2050         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2051         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2052         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2053         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2054         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2055         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2056         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2057         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2058
2059         # in2out 3rd interface
2060         pkts = self.create_stream_in(self.pg6, self.pg3)
2061         self.pg6.add_stream(pkts)
2062         self.pg_enable_capture(self.pg_interfaces)
2063         self.pg_start()
2064         capture = self.pg3.get_capture(len(pkts))
2065         self.verify_capture_out(capture, static_nat_ip, True)
2066
2067         # out2in 3rd interface
2068         pkts = self.create_stream_out(self.pg3, static_nat_ip)
2069         self.pg3.add_stream(pkts)
2070         self.pg_enable_capture(self.pg_interfaces)
2071         self.pg_start()
2072         capture = self.pg6.get_capture(len(pkts))
2073         self.verify_capture_in(capture, self.pg6)
2074
2075         # general user and session dump verifications
2076         users = self.vapi.nat44_user_dump()
2077         self.assertTrue(len(users) >= 3)
2078         addresses = self.vapi.nat44_address_dump()
2079         self.assertEqual(len(addresses), 1)
2080         for user in users:
2081             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2082                                                          user.vrf_id)
2083             for session in sessions:
2084                 self.assertEqual(user.ip_address, session.inside_ip_address)
2085                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2086                 self.assertTrue(session.protocol in
2087                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
2088                                  IP_PROTOS.icmp])
2089
2090         # pg4 session dump
2091         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2092         self.assertTrue(len(sessions) >= 4)
2093         for session in sessions:
2094             self.assertFalse(session.is_static)
2095             self.assertEqual(session.inside_ip_address[0:4],
2096                              self.pg4.remote_ip4n)
2097             self.assertEqual(session.outside_ip_address,
2098                              addresses[0].ip_address)
2099
2100         # pg6 session dump
2101         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2102         self.assertTrue(len(sessions) >= 3)
2103         for session in sessions:
2104             self.assertTrue(session.is_static)
2105             self.assertEqual(session.inside_ip_address[0:4],
2106                              self.pg6.remote_ip4n)
2107             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2108                              map(int, static_nat_ip.split('.')))
2109             self.assertTrue(session.inside_port in
2110                             [self.tcp_port_in, self.udp_port_in,
2111                              self.icmp_id_in])
2112
2113     def test_hairpinning(self):
2114         """ NAT44 hairpinning - 1:1 NAPT """
2115
2116         host = self.pg0.remote_hosts[0]
2117         server = self.pg0.remote_hosts[1]
2118         host_in_port = 1234
2119         host_out_port = 0
2120         server_in_port = 5678
2121         server_out_port = 8765
2122
2123         self.nat44_add_address(self.nat_addr)
2124         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2125         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2126                                                   is_inside=0)
2127         # add static mapping for server
2128         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2129                                       server_in_port, server_out_port,
2130                                       proto=IP_PROTOS.tcp)
2131
2132         # send packet from host to server
2133         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2134              IP(src=host.ip4, dst=self.nat_addr) /
2135              TCP(sport=host_in_port, dport=server_out_port))
2136         self.pg0.add_stream(p)
2137         self.pg_enable_capture(self.pg_interfaces)
2138         self.pg_start()
2139         capture = self.pg0.get_capture(1)
2140         p = capture[0]
2141         try:
2142             ip = p[IP]
2143             tcp = p[TCP]
2144             self.assertEqual(ip.src, self.nat_addr)
2145             self.assertEqual(ip.dst, server.ip4)
2146             self.assertNotEqual(tcp.sport, host_in_port)
2147             self.assertEqual(tcp.dport, server_in_port)
2148             self.check_tcp_checksum(p)
2149             host_out_port = tcp.sport
2150         except:
2151             self.logger.error(ppp("Unexpected or invalid packet:", p))
2152             raise
2153
2154         # send reply from server to host
2155         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2156              IP(src=server.ip4, dst=self.nat_addr) /
2157              TCP(sport=server_in_port, dport=host_out_port))
2158         self.pg0.add_stream(p)
2159         self.pg_enable_capture(self.pg_interfaces)
2160         self.pg_start()
2161         capture = self.pg0.get_capture(1)
2162         p = capture[0]
2163         try:
2164             ip = p[IP]
2165             tcp = p[TCP]
2166             self.assertEqual(ip.src, self.nat_addr)
2167             self.assertEqual(ip.dst, host.ip4)
2168             self.assertEqual(tcp.sport, server_out_port)
2169             self.assertEqual(tcp.dport, host_in_port)
2170             self.check_tcp_checksum(p)
2171         except:
2172             self.logger.error(ppp("Unexpected or invalid packet:", p))
2173             raise
2174
2175     def test_hairpinning2(self):
2176         """ NAT44 hairpinning - 1:1 NAT"""
2177
2178         server1_nat_ip = "10.0.0.10"
2179         server2_nat_ip = "10.0.0.11"
2180         host = self.pg0.remote_hosts[0]
2181         server1 = self.pg0.remote_hosts[1]
2182         server2 = self.pg0.remote_hosts[2]
2183         server_tcp_port = 22
2184         server_udp_port = 20
2185
2186         self.nat44_add_address(self.nat_addr)
2187         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2188         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2189                                                   is_inside=0)
2190
2191         # add static mapping for servers
2192         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2193         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2194
2195         # host to server1
2196         pkts = []
2197         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2198              IP(src=host.ip4, dst=server1_nat_ip) /
2199              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2200         pkts.append(p)
2201         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2202              IP(src=host.ip4, dst=server1_nat_ip) /
2203              UDP(sport=self.udp_port_in, dport=server_udp_port))
2204         pkts.append(p)
2205         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2206              IP(src=host.ip4, dst=server1_nat_ip) /
2207              ICMP(id=self.icmp_id_in, type='echo-request'))
2208         pkts.append(p)
2209         self.pg0.add_stream(pkts)
2210         self.pg_enable_capture(self.pg_interfaces)
2211         self.pg_start()
2212         capture = self.pg0.get_capture(len(pkts))
2213         for packet in capture:
2214             try:
2215                 self.assertEqual(packet[IP].src, self.nat_addr)
2216                 self.assertEqual(packet[IP].dst, server1.ip4)
2217                 if packet.haslayer(TCP):
2218                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2219                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2220                     self.tcp_port_out = packet[TCP].sport
2221                     self.check_tcp_checksum(packet)
2222                 elif packet.haslayer(UDP):
2223                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2224                     self.assertEqual(packet[UDP].dport, server_udp_port)
2225                     self.udp_port_out = packet[UDP].sport
2226                 else:
2227                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2228                     self.icmp_id_out = packet[ICMP].id
2229             except:
2230                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2231                 raise
2232
2233         # server1 to host
2234         pkts = []
2235         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2236              IP(src=server1.ip4, dst=self.nat_addr) /
2237              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2238         pkts.append(p)
2239         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2240              IP(src=server1.ip4, dst=self.nat_addr) /
2241              UDP(sport=server_udp_port, dport=self.udp_port_out))
2242         pkts.append(p)
2243         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2244              IP(src=server1.ip4, dst=self.nat_addr) /
2245              ICMP(id=self.icmp_id_out, type='echo-reply'))
2246         pkts.append(p)
2247         self.pg0.add_stream(pkts)
2248         self.pg_enable_capture(self.pg_interfaces)
2249         self.pg_start()
2250         capture = self.pg0.get_capture(len(pkts))
2251         for packet in capture:
2252             try:
2253                 self.assertEqual(packet[IP].src, server1_nat_ip)
2254                 self.assertEqual(packet[IP].dst, host.ip4)
2255                 if packet.haslayer(TCP):
2256                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2257                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2258                     self.check_tcp_checksum(packet)
2259                 elif packet.haslayer(UDP):
2260                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2261                     self.assertEqual(packet[UDP].sport, server_udp_port)
2262                 else:
2263                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2264             except:
2265                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2266                 raise
2267
2268         # server2 to server1
2269         pkts = []
2270         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2271              IP(src=server2.ip4, dst=server1_nat_ip) /
2272              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2273         pkts.append(p)
2274         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2275              IP(src=server2.ip4, dst=server1_nat_ip) /
2276              UDP(sport=self.udp_port_in, dport=server_udp_port))
2277         pkts.append(p)
2278         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2279              IP(src=server2.ip4, dst=server1_nat_ip) /
2280              ICMP(id=self.icmp_id_in, type='echo-request'))
2281         pkts.append(p)
2282         self.pg0.add_stream(pkts)
2283         self.pg_enable_capture(self.pg_interfaces)
2284         self.pg_start()
2285         capture = self.pg0.get_capture(len(pkts))
2286         for packet in capture:
2287             try:
2288                 self.assertEqual(packet[IP].src, server2_nat_ip)
2289                 self.assertEqual(packet[IP].dst, server1.ip4)
2290                 if packet.haslayer(TCP):
2291                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2292                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2293                     self.tcp_port_out = packet[TCP].sport
2294                     self.check_tcp_checksum(packet)
2295                 elif packet.haslayer(UDP):
2296                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2297                     self.assertEqual(packet[UDP].dport, server_udp_port)
2298                     self.udp_port_out = packet[UDP].sport
2299                 else:
2300                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2301                     self.icmp_id_out = packet[ICMP].id
2302             except:
2303                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2304                 raise
2305
2306         # server1 to server2
2307         pkts = []
2308         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2309              IP(src=server1.ip4, dst=server2_nat_ip) /
2310              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2311         pkts.append(p)
2312         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2313              IP(src=server1.ip4, dst=server2_nat_ip) /
2314              UDP(sport=server_udp_port, dport=self.udp_port_out))
2315         pkts.append(p)
2316         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2317              IP(src=server1.ip4, dst=server2_nat_ip) /
2318              ICMP(id=self.icmp_id_out, type='echo-reply'))
2319         pkts.append(p)
2320         self.pg0.add_stream(pkts)
2321         self.pg_enable_capture(self.pg_interfaces)
2322         self.pg_start()
2323         capture = self.pg0.get_capture(len(pkts))
2324         for packet in capture:
2325             try:
2326                 self.assertEqual(packet[IP].src, server1_nat_ip)
2327                 self.assertEqual(packet[IP].dst, server2.ip4)
2328                 if packet.haslayer(TCP):
2329                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2330                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2331                     self.check_tcp_checksum(packet)
2332                 elif packet.haslayer(UDP):
2333                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2334                     self.assertEqual(packet[UDP].sport, server_udp_port)
2335                 else:
2336                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2337             except:
2338                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2339                 raise
2340
2341     def test_max_translations_per_user(self):
2342         """ MAX translations per user - recycle the least recently used """
2343
2344         self.nat44_add_address(self.nat_addr)
2345         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2346         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2347                                                   is_inside=0)
2348
2349         # get maximum number of translations per user
2350         nat44_config = self.vapi.nat_show_config()
2351
2352         # send more than maximum number of translations per user packets
2353         pkts_num = nat44_config.max_translations_per_user + 5
2354         pkts = []
2355         for port in range(0, pkts_num):
2356             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2357                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2358                  TCP(sport=1025 + port))
2359             pkts.append(p)
2360         self.pg0.add_stream(pkts)
2361         self.pg_enable_capture(self.pg_interfaces)
2362         self.pg_start()
2363
2364         # verify number of translated packet
2365         self.pg1.get_capture(pkts_num)
2366
2367     def test_interface_addr(self):
2368         """ Acquire NAT44 addresses from interface """
2369         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2370
2371         # no address in NAT pool
2372         adresses = self.vapi.nat44_address_dump()
2373         self.assertEqual(0, len(adresses))
2374
2375         # configure interface address and check NAT address pool
2376         self.pg7.config_ip4()
2377         adresses = self.vapi.nat44_address_dump()
2378         self.assertEqual(1, len(adresses))
2379         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2380
2381         # remove interface address and check NAT address pool
2382         self.pg7.unconfig_ip4()
2383         adresses = self.vapi.nat44_address_dump()
2384         self.assertEqual(0, len(adresses))
2385
2386     def test_interface_addr_static_mapping(self):
2387         """ Static mapping with addresses from interface """
2388         tag = "testTAG"
2389
2390         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2391         self.nat44_add_static_mapping(
2392             '1.2.3.4',
2393             external_sw_if_index=self.pg7.sw_if_index,
2394             tag=tag)
2395
2396         # static mappings with external interface
2397         static_mappings = self.vapi.nat44_static_mapping_dump()
2398         self.assertEqual(1, len(static_mappings))
2399         self.assertEqual(self.pg7.sw_if_index,
2400                          static_mappings[0].external_sw_if_index)
2401         self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2402
2403         # configure interface address and check static mappings
2404         self.pg7.config_ip4()
2405         static_mappings = self.vapi.nat44_static_mapping_dump()
2406         self.assertEqual(1, len(static_mappings))
2407         self.assertEqual(static_mappings[0].external_ip_address[0:4],
2408                          self.pg7.local_ip4n)
2409         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2410         self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2411
2412         # remove interface address and check static mappings
2413         self.pg7.unconfig_ip4()
2414         static_mappings = self.vapi.nat44_static_mapping_dump()
2415         self.assertEqual(0, len(static_mappings))
2416
2417     def test_interface_addr_identity_nat(self):
2418         """ Identity NAT with addresses from interface """
2419
2420         port = 53053
2421         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2422         self.vapi.nat44_add_del_identity_mapping(
2423             sw_if_index=self.pg7.sw_if_index,
2424             port=port,
2425             protocol=IP_PROTOS.tcp,
2426             addr_only=0)
2427
2428         # identity mappings with external interface
2429         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2430         self.assertEqual(1, len(identity_mappings))
2431         self.assertEqual(self.pg7.sw_if_index,
2432                          identity_mappings[0].sw_if_index)
2433
2434         # configure interface address and check identity mappings
2435         self.pg7.config_ip4()
2436         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2437         self.assertEqual(1, len(identity_mappings))
2438         self.assertEqual(identity_mappings[0].ip_address,
2439                          self.pg7.local_ip4n)
2440         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2441         self.assertEqual(port, identity_mappings[0].port)
2442         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2443
2444         # remove interface address and check identity mappings
2445         self.pg7.unconfig_ip4()
2446         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2447         self.assertEqual(0, len(identity_mappings))
2448
2449     def test_ipfix_nat44_sess(self):
2450         """ IPFIX logging NAT44 session created/delted """
2451         self.ipfix_domain_id = 10
2452         self.ipfix_src_port = 20202
2453         colector_port = 30303
2454         bind_layers(UDP, IPFIX, dport=30303)
2455         self.nat44_add_address(self.nat_addr)
2456         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2457         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2458                                                   is_inside=0)
2459         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2460                                      src_address=self.pg3.local_ip4n,
2461                                      path_mtu=512,
2462                                      template_interval=10,
2463                                      collector_port=colector_port)
2464         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2465                             src_port=self.ipfix_src_port)
2466
2467         pkts = self.create_stream_in(self.pg0, self.pg1)
2468         self.pg0.add_stream(pkts)
2469         self.pg_enable_capture(self.pg_interfaces)
2470         self.pg_start()
2471         capture = self.pg1.get_capture(len(pkts))
2472         self.verify_capture_out(capture)
2473         self.nat44_add_address(self.nat_addr, is_add=0)
2474         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2475         capture = self.pg3.get_capture(9)
2476         ipfix = IPFIXDecoder()
2477         # first load template
2478         for p in capture:
2479             self.assertTrue(p.haslayer(IPFIX))
2480             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2481             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2482             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2483             self.assertEqual(p[UDP].dport, colector_port)
2484             self.assertEqual(p[IPFIX].observationDomainID,
2485                              self.ipfix_domain_id)
2486             if p.haslayer(Template):
2487                 ipfix.add_template(p.getlayer(Template))
2488         # verify events in data set
2489         for p in capture:
2490             if p.haslayer(Data):
2491                 data = ipfix.decode_data_set(p.getlayer(Set))
2492                 self.verify_ipfix_nat44_ses(data)
2493
2494     def test_ipfix_addr_exhausted(self):
2495         """ IPFIX logging NAT addresses exhausted """
2496         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2497         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2498                                                   is_inside=0)
2499         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2500                                      src_address=self.pg3.local_ip4n,
2501                                      path_mtu=512,
2502                                      template_interval=10)
2503         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2504                             src_port=self.ipfix_src_port)
2505
2506         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2507              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2508              TCP(sport=3025))
2509         self.pg0.add_stream(p)
2510         self.pg_enable_capture(self.pg_interfaces)
2511         self.pg_start()
2512         capture = self.pg1.get_capture(0)
2513         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2514         capture = self.pg3.get_capture(9)
2515         ipfix = IPFIXDecoder()
2516         # first load template
2517         for p in capture:
2518             self.assertTrue(p.haslayer(IPFIX))
2519             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2520             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2521             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2522             self.assertEqual(p[UDP].dport, 4739)
2523             self.assertEqual(p[IPFIX].observationDomainID,
2524                              self.ipfix_domain_id)
2525             if p.haslayer(Template):
2526                 ipfix.add_template(p.getlayer(Template))
2527         # verify events in data set
2528         for p in capture:
2529             if p.haslayer(Data):
2530                 data = ipfix.decode_data_set(p.getlayer(Set))
2531                 self.verify_ipfix_addr_exhausted(data)
2532
2533     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2534     def test_ipfix_max_sessions(self):
2535         """ IPFIX logging maximum session entries exceeded """
2536         self.nat44_add_address(self.nat_addr)
2537         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2538         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2539                                                   is_inside=0)
2540
2541         nat44_config = self.vapi.nat_show_config()
2542         max_sessions = 10 * nat44_config.translation_buckets
2543
2544         pkts = []
2545         for i in range(0, max_sessions):
2546             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2547             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2548                  IP(src=src, dst=self.pg1.remote_ip4) /
2549                  TCP(sport=1025))
2550             pkts.append(p)
2551         self.pg0.add_stream(pkts)
2552         self.pg_enable_capture(self.pg_interfaces)
2553         self.pg_start()
2554
2555         self.pg1.get_capture(max_sessions)
2556         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2557                                      src_address=self.pg3.local_ip4n,
2558                                      path_mtu=512,
2559                                      template_interval=10)
2560         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2561                             src_port=self.ipfix_src_port)
2562
2563         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2564              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2565              TCP(sport=1025))
2566         self.pg0.add_stream(p)
2567         self.pg_enable_capture(self.pg_interfaces)
2568         self.pg_start()
2569         self.pg1.get_capture(0)
2570         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2571         capture = self.pg3.get_capture(9)
2572         ipfix = IPFIXDecoder()
2573         # first load template
2574         for p in capture:
2575             self.assertTrue(p.haslayer(IPFIX))
2576             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2577             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2578             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2579             self.assertEqual(p[UDP].dport, 4739)
2580             self.assertEqual(p[IPFIX].observationDomainID,
2581                              self.ipfix_domain_id)
2582             if p.haslayer(Template):
2583                 ipfix.add_template(p.getlayer(Template))
2584         # verify events in data set
2585         for p in capture:
2586             if p.haslayer(Data):
2587                 data = ipfix.decode_data_set(p.getlayer(Set))
2588                 self.verify_ipfix_max_sessions(data, max_sessions)
2589
2590     def test_pool_addr_fib(self):
2591         """ NAT44 add pool addresses to FIB """
2592         static_addr = '10.0.0.10'
2593         self.nat44_add_address(self.nat_addr)
2594         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2595         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2596                                                   is_inside=0)
2597         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2598
2599         # NAT44 address
2600         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2601              ARP(op=ARP.who_has, pdst=self.nat_addr,
2602                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2603         self.pg1.add_stream(p)
2604         self.pg_enable_capture(self.pg_interfaces)
2605         self.pg_start()
2606         capture = self.pg1.get_capture(1)
2607         self.assertTrue(capture[0].haslayer(ARP))
2608         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2609
2610         # 1:1 NAT address
2611         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2612              ARP(op=ARP.who_has, pdst=static_addr,
2613                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2614         self.pg1.add_stream(p)
2615         self.pg_enable_capture(self.pg_interfaces)
2616         self.pg_start()
2617         capture = self.pg1.get_capture(1)
2618         self.assertTrue(capture[0].haslayer(ARP))
2619         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2620
2621         # send ARP to non-NAT44 interface
2622         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2623              ARP(op=ARP.who_has, pdst=self.nat_addr,
2624                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2625         self.pg2.add_stream(p)
2626         self.pg_enable_capture(self.pg_interfaces)
2627         self.pg_start()
2628         capture = self.pg1.get_capture(0)
2629
2630         # remove addresses and verify
2631         self.nat44_add_address(self.nat_addr, is_add=0)
2632         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2633                                       is_add=0)
2634
2635         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2636              ARP(op=ARP.who_has, pdst=self.nat_addr,
2637                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2638         self.pg1.add_stream(p)
2639         self.pg_enable_capture(self.pg_interfaces)
2640         self.pg_start()
2641         capture = self.pg1.get_capture(0)
2642
2643         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2644              ARP(op=ARP.who_has, pdst=static_addr,
2645                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2646         self.pg1.add_stream(p)
2647         self.pg_enable_capture(self.pg_interfaces)
2648         self.pg_start()
2649         capture = self.pg1.get_capture(0)
2650
2651     def test_vrf_mode(self):
2652         """ NAT44 tenant VRF aware address pool mode """
2653
2654         vrf_id1 = 1
2655         vrf_id2 = 2
2656         nat_ip1 = "10.0.0.10"
2657         nat_ip2 = "10.0.0.11"
2658
2659         self.pg0.unconfig_ip4()
2660         self.pg1.unconfig_ip4()
2661         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2662         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2663         self.pg0.set_table_ip4(vrf_id1)
2664         self.pg1.set_table_ip4(vrf_id2)
2665         self.pg0.config_ip4()
2666         self.pg1.config_ip4()
2667
2668         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2669         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2670         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2671         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2672         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2673                                                   is_inside=0)
2674
2675         # first VRF
2676         pkts = self.create_stream_in(self.pg0, self.pg2)
2677         self.pg0.add_stream(pkts)
2678         self.pg_enable_capture(self.pg_interfaces)
2679         self.pg_start()
2680         capture = self.pg2.get_capture(len(pkts))
2681         self.verify_capture_out(capture, nat_ip1)
2682
2683         # second VRF
2684         pkts = self.create_stream_in(self.pg1, self.pg2)
2685         self.pg1.add_stream(pkts)
2686         self.pg_enable_capture(self.pg_interfaces)
2687         self.pg_start()
2688         capture = self.pg2.get_capture(len(pkts))
2689         self.verify_capture_out(capture, nat_ip2)
2690
2691         self.pg0.unconfig_ip4()
2692         self.pg1.unconfig_ip4()
2693         self.pg0.set_table_ip4(0)
2694         self.pg1.set_table_ip4(0)
2695         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2696         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2697
2698     def test_vrf_feature_independent(self):
2699         """ NAT44 tenant VRF independent address pool mode """
2700
2701         nat_ip1 = "10.0.0.10"
2702         nat_ip2 = "10.0.0.11"
2703
2704         self.nat44_add_address(nat_ip1)
2705         self.nat44_add_address(nat_ip2, vrf_id=99)
2706         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2707         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2708         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2709                                                   is_inside=0)
2710
2711         # first VRF
2712         pkts = self.create_stream_in(self.pg0, self.pg2)
2713         self.pg0.add_stream(pkts)
2714         self.pg_enable_capture(self.pg_interfaces)
2715         self.pg_start()
2716         capture = self.pg2.get_capture(len(pkts))
2717         self.verify_capture_out(capture, nat_ip1)
2718
2719         # second VRF
2720         pkts = self.create_stream_in(self.pg1, self.pg2)
2721         self.pg1.add_stream(pkts)
2722         self.pg_enable_capture(self.pg_interfaces)
2723         self.pg_start()
2724         capture = self.pg2.get_capture(len(pkts))
2725         self.verify_capture_out(capture, nat_ip1)
2726
2727     def test_dynamic_ipless_interfaces(self):
2728         """ NAT44 interfaces without configured IP address """
2729
2730         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2731                                       mactobinary(self.pg7.remote_mac),
2732                                       self.pg7.remote_ip4n,
2733                                       is_static=1)
2734         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2735                                       mactobinary(self.pg8.remote_mac),
2736                                       self.pg8.remote_ip4n,
2737                                       is_static=1)
2738
2739         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2740                                    dst_address_length=32,
2741                                    next_hop_address=self.pg7.remote_ip4n,
2742                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2743         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2744                                    dst_address_length=32,
2745                                    next_hop_address=self.pg8.remote_ip4n,
2746                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2747
2748         self.nat44_add_address(self.nat_addr)
2749         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2750         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2751                                                   is_inside=0)
2752
2753         # in2out
2754         pkts = self.create_stream_in(self.pg7, self.pg8)
2755         self.pg7.add_stream(pkts)
2756         self.pg_enable_capture(self.pg_interfaces)
2757         self.pg_start()
2758         capture = self.pg8.get_capture(len(pkts))
2759         self.verify_capture_out(capture)
2760
2761         # out2in
2762         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2763         self.pg8.add_stream(pkts)
2764         self.pg_enable_capture(self.pg_interfaces)
2765         self.pg_start()
2766         capture = self.pg7.get_capture(len(pkts))
2767         self.verify_capture_in(capture, self.pg7)
2768
2769     def test_static_ipless_interfaces(self):
2770         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2771
2772         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2773                                       mactobinary(self.pg7.remote_mac),
2774                                       self.pg7.remote_ip4n,
2775                                       is_static=1)
2776         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2777                                       mactobinary(self.pg8.remote_mac),
2778                                       self.pg8.remote_ip4n,
2779                                       is_static=1)
2780
2781         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2782                                    dst_address_length=32,
2783                                    next_hop_address=self.pg7.remote_ip4n,
2784                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2785         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2786                                    dst_address_length=32,
2787                                    next_hop_address=self.pg8.remote_ip4n,
2788                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2789
2790         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2791         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2792         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2793                                                   is_inside=0)
2794
2795         # out2in
2796         pkts = self.create_stream_out(self.pg8)
2797         self.pg8.add_stream(pkts)
2798         self.pg_enable_capture(self.pg_interfaces)
2799         self.pg_start()
2800         capture = self.pg7.get_capture(len(pkts))
2801         self.verify_capture_in(capture, self.pg7)
2802
2803         # in2out
2804         pkts = self.create_stream_in(self.pg7, self.pg8)
2805         self.pg7.add_stream(pkts)
2806         self.pg_enable_capture(self.pg_interfaces)
2807         self.pg_start()
2808         capture = self.pg8.get_capture(len(pkts))
2809         self.verify_capture_out(capture, self.nat_addr, True)
2810
2811     def test_static_with_port_ipless_interfaces(self):
2812         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2813
2814         self.tcp_port_out = 30606
2815         self.udp_port_out = 30607
2816         self.icmp_id_out = 30608
2817
2818         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2819                                       mactobinary(self.pg7.remote_mac),
2820                                       self.pg7.remote_ip4n,
2821                                       is_static=1)
2822         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2823                                       mactobinary(self.pg8.remote_mac),
2824                                       self.pg8.remote_ip4n,
2825                                       is_static=1)
2826
2827         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2828                                    dst_address_length=32,
2829                                    next_hop_address=self.pg7.remote_ip4n,
2830                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2831         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2832                                    dst_address_length=32,
2833                                    next_hop_address=self.pg8.remote_ip4n,
2834                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2835
2836         self.nat44_add_address(self.nat_addr)
2837         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2838                                       self.tcp_port_in, self.tcp_port_out,
2839                                       proto=IP_PROTOS.tcp)
2840         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2841                                       self.udp_port_in, self.udp_port_out,
2842                                       proto=IP_PROTOS.udp)
2843         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2844                                       self.icmp_id_in, self.icmp_id_out,
2845                                       proto=IP_PROTOS.icmp)
2846         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2847         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2848                                                   is_inside=0)
2849
2850         # out2in
2851         pkts = self.create_stream_out(self.pg8)
2852         self.pg8.add_stream(pkts)
2853         self.pg_enable_capture(self.pg_interfaces)
2854         self.pg_start()
2855         capture = self.pg7.get_capture(len(pkts))
2856         self.verify_capture_in(capture, self.pg7)
2857
2858         # in2out
2859         pkts = self.create_stream_in(self.pg7, self.pg8)
2860         self.pg7.add_stream(pkts)
2861         self.pg_enable_capture(self.pg_interfaces)
2862         self.pg_start()
2863         capture = self.pg8.get_capture(len(pkts))
2864         self.verify_capture_out(capture)
2865
2866     def test_static_unknown_proto(self):
2867         """ 1:1 NAT translate packet with unknown protocol """
2868         nat_ip = "10.0.0.10"
2869         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2870         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2871         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2872                                                   is_inside=0)
2873
2874         # in2out
2875         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2876              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2877              GRE() /
2878              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2879              TCP(sport=1234, dport=1234))
2880         self.pg0.add_stream(p)
2881         self.pg_enable_capture(self.pg_interfaces)
2882         self.pg_start()
2883         p = self.pg1.get_capture(1)
2884         packet = p[0]
2885         try:
2886             self.assertEqual(packet[IP].src, nat_ip)
2887             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2888             self.assertTrue(packet.haslayer(GRE))
2889             self.check_ip_checksum(packet)
2890         except:
2891             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2892             raise
2893
2894         # out2in
2895         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2896              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2897              GRE() /
2898              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2899              TCP(sport=1234, dport=1234))
2900         self.pg1.add_stream(p)
2901         self.pg_enable_capture(self.pg_interfaces)
2902         self.pg_start()
2903         p = self.pg0.get_capture(1)
2904         packet = p[0]
2905         try:
2906             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2907             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2908             self.assertTrue(packet.haslayer(GRE))
2909             self.check_ip_checksum(packet)
2910         except:
2911             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2912             raise
2913
2914     def test_hairpinning_static_unknown_proto(self):
2915         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2916
2917         host = self.pg0.remote_hosts[0]
2918         server = self.pg0.remote_hosts[1]
2919
2920         host_nat_ip = "10.0.0.10"
2921         server_nat_ip = "10.0.0.11"
2922
2923         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2924         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2925         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2926         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2927                                                   is_inside=0)
2928
2929         # host to server
2930         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2931              IP(src=host.ip4, dst=server_nat_ip) /
2932              GRE() /
2933              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2934              TCP(sport=1234, dport=1234))
2935         self.pg0.add_stream(p)
2936         self.pg_enable_capture(self.pg_interfaces)
2937         self.pg_start()
2938         p = self.pg0.get_capture(1)
2939         packet = p[0]
2940         try:
2941             self.assertEqual(packet[IP].src, host_nat_ip)
2942             self.assertEqual(packet[IP].dst, server.ip4)
2943             self.assertTrue(packet.haslayer(GRE))
2944             self.check_ip_checksum(packet)
2945         except:
2946             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2947             raise
2948
2949         # server to host
2950         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2951              IP(src=server.ip4, dst=host_nat_ip) /
2952              GRE() /
2953              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2954              TCP(sport=1234, dport=1234))
2955         self.pg0.add_stream(p)
2956         self.pg_enable_capture(self.pg_interfaces)
2957         self.pg_start()
2958         p = self.pg0.get_capture(1)
2959         packet = p[0]
2960         try:
2961             self.assertEqual(packet[IP].src, server_nat_ip)
2962             self.assertEqual(packet[IP].dst, host.ip4)
2963             self.assertTrue(packet.haslayer(GRE))
2964             self.check_ip_checksum(packet)
2965         except:
2966             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2967             raise
2968
2969     def test_unknown_proto(self):
2970         """ NAT44 translate packet with unknown protocol """
2971         self.nat44_add_address(self.nat_addr)
2972         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2973         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2974                                                   is_inside=0)
2975
2976         # in2out
2977         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2978              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2979              TCP(sport=self.tcp_port_in, dport=20))
2980         self.pg0.add_stream(p)
2981         self.pg_enable_capture(self.pg_interfaces)
2982         self.pg_start()
2983         p = self.pg1.get_capture(1)
2984
2985         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2986              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2987              GRE() /
2988              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2989              TCP(sport=1234, dport=1234))
2990         self.pg0.add_stream(p)
2991         self.pg_enable_capture(self.pg_interfaces)
2992         self.pg_start()
2993         p = self.pg1.get_capture(1)
2994         packet = p[0]
2995         try:
2996             self.assertEqual(packet[IP].src, self.nat_addr)
2997             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2998             self.assertTrue(packet.haslayer(GRE))
2999             self.check_ip_checksum(packet)
3000         except:
3001             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3002             raise
3003
3004         # out2in
3005         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3006              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3007              GRE() /
3008              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3009              TCP(sport=1234, dport=1234))
3010         self.pg1.add_stream(p)
3011         self.pg_enable_capture(self.pg_interfaces)
3012         self.pg_start()
3013         p = self.pg0.get_capture(1)
3014         packet = p[0]
3015         try:
3016             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3017             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3018             self.assertTrue(packet.haslayer(GRE))
3019             self.check_ip_checksum(packet)
3020         except:
3021             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3022             raise
3023
3024     def test_hairpinning_unknown_proto(self):
3025         """ NAT44 translate packet with unknown protocol - hairpinning """
3026         host = self.pg0.remote_hosts[0]
3027         server = self.pg0.remote_hosts[1]
3028         host_in_port = 1234
3029         host_out_port = 0
3030         server_in_port = 5678
3031         server_out_port = 8765
3032         server_nat_ip = "10.0.0.11"
3033
3034         self.nat44_add_address(self.nat_addr)
3035         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3036         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3037                                                   is_inside=0)
3038
3039         # add static mapping for server
3040         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3041
3042         # host to server
3043         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3044              IP(src=host.ip4, dst=server_nat_ip) /
3045              TCP(sport=host_in_port, dport=server_out_port))
3046         self.pg0.add_stream(p)
3047         self.pg_enable_capture(self.pg_interfaces)
3048         self.pg_start()
3049         capture = self.pg0.get_capture(1)
3050
3051         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3052              IP(src=host.ip4, dst=server_nat_ip) /
3053              GRE() /
3054              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3055              TCP(sport=1234, dport=1234))
3056         self.pg0.add_stream(p)
3057         self.pg_enable_capture(self.pg_interfaces)
3058         self.pg_start()
3059         p = self.pg0.get_capture(1)
3060         packet = p[0]
3061         try:
3062             self.assertEqual(packet[IP].src, self.nat_addr)
3063             self.assertEqual(packet[IP].dst, server.ip4)
3064             self.assertTrue(packet.haslayer(GRE))
3065             self.check_ip_checksum(packet)
3066         except:
3067             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3068             raise
3069
3070         # server to host
3071         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3072              IP(src=server.ip4, dst=self.nat_addr) /
3073              GRE() /
3074              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3075              TCP(sport=1234, dport=1234))
3076         self.pg0.add_stream(p)
3077         self.pg_enable_capture(self.pg_interfaces)
3078         self.pg_start()
3079         p = self.pg0.get_capture(1)
3080         packet = p[0]
3081         try:
3082             self.assertEqual(packet[IP].src, server_nat_ip)
3083             self.assertEqual(packet[IP].dst, host.ip4)
3084             self.assertTrue(packet.haslayer(GRE))
3085             self.check_ip_checksum(packet)
3086         except:
3087             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3088             raise
3089
3090     def test_output_feature(self):
3091         """ NAT44 interface output feature (in2out postrouting) """
3092         self.nat44_add_address(self.nat_addr)
3093         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3094         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3095         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3096                                                          is_inside=0)
3097
3098         # in2out
3099         pkts = self.create_stream_in(self.pg0, self.pg3)
3100         self.pg0.add_stream(pkts)
3101         self.pg_enable_capture(self.pg_interfaces)
3102         self.pg_start()
3103         capture = self.pg3.get_capture(len(pkts))
3104         self.verify_capture_out(capture)
3105
3106         # out2in
3107         pkts = self.create_stream_out(self.pg3)
3108         self.pg3.add_stream(pkts)
3109         self.pg_enable_capture(self.pg_interfaces)
3110         self.pg_start()
3111         capture = self.pg0.get_capture(len(pkts))
3112         self.verify_capture_in(capture, self.pg0)
3113
3114         # from non-NAT interface to NAT inside interface
3115         pkts = self.create_stream_in(self.pg2, self.pg0)
3116         self.pg2.add_stream(pkts)
3117         self.pg_enable_capture(self.pg_interfaces)
3118         self.pg_start()
3119         capture = self.pg0.get_capture(len(pkts))
3120         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3121
3122     def test_output_feature_vrf_aware(self):
3123         """ NAT44 interface output feature VRF aware (in2out postrouting) """
3124         nat_ip_vrf10 = "10.0.0.10"
3125         nat_ip_vrf20 = "10.0.0.20"
3126
3127         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3128                                    dst_address_length=32,
3129                                    next_hop_address=self.pg3.remote_ip4n,
3130                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3131                                    table_id=10)
3132         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3133                                    dst_address_length=32,
3134                                    next_hop_address=self.pg3.remote_ip4n,
3135                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3136                                    table_id=20)
3137
3138         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3139         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3140         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3141         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3142         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3143                                                          is_inside=0)
3144
3145         # in2out VRF 10
3146         pkts = self.create_stream_in(self.pg4, self.pg3)
3147         self.pg4.add_stream(pkts)
3148         self.pg_enable_capture(self.pg_interfaces)
3149         self.pg_start()
3150         capture = self.pg3.get_capture(len(pkts))
3151         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3152
3153         # out2in VRF 10
3154         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3155         self.pg3.add_stream(pkts)
3156         self.pg_enable_capture(self.pg_interfaces)
3157         self.pg_start()
3158         capture = self.pg4.get_capture(len(pkts))
3159         self.verify_capture_in(capture, self.pg4)
3160
3161         # in2out VRF 20
3162         pkts = self.create_stream_in(self.pg6, self.pg3)
3163         self.pg6.add_stream(pkts)
3164         self.pg_enable_capture(self.pg_interfaces)
3165         self.pg_start()
3166         capture = self.pg3.get_capture(len(pkts))
3167         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3168
3169         # out2in VRF 20
3170         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3171         self.pg3.add_stream(pkts)
3172         self.pg_enable_capture(self.pg_interfaces)
3173         self.pg_start()
3174         capture = self.pg6.get_capture(len(pkts))
3175         self.verify_capture_in(capture, self.pg6)
3176
3177     def test_output_feature_hairpinning(self):
3178         """ NAT44 interface output feature hairpinning (in2out postrouting) """
3179         host = self.pg0.remote_hosts[0]
3180         server = self.pg0.remote_hosts[1]
3181         host_in_port = 1234
3182         host_out_port = 0
3183         server_in_port = 5678
3184         server_out_port = 8765
3185
3186         self.nat44_add_address(self.nat_addr)
3187         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3188         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3189                                                          is_inside=0)
3190
3191         # add static mapping for server
3192         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3193                                       server_in_port, server_out_port,
3194                                       proto=IP_PROTOS.tcp)
3195
3196         # send packet from host to server
3197         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3198              IP(src=host.ip4, dst=self.nat_addr) /
3199              TCP(sport=host_in_port, dport=server_out_port))
3200         self.pg0.add_stream(p)
3201         self.pg_enable_capture(self.pg_interfaces)
3202         self.pg_start()
3203         capture = self.pg0.get_capture(1)
3204         p = capture[0]
3205         try:
3206             ip = p[IP]
3207             tcp = p[TCP]
3208             self.assertEqual(ip.src, self.nat_addr)
3209             self.assertEqual(ip.dst, server.ip4)
3210             self.assertNotEqual(tcp.sport, host_in_port)
3211             self.assertEqual(tcp.dport, server_in_port)
3212             self.check_tcp_checksum(p)
3213             host_out_port = tcp.sport
3214         except:
3215             self.logger.error(ppp("Unexpected or invalid packet:", p))
3216             raise
3217
3218         # send reply from server to host
3219         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3220              IP(src=server.ip4, dst=self.nat_addr) /
3221              TCP(sport=server_in_port, dport=host_out_port))
3222         self.pg0.add_stream(p)
3223         self.pg_enable_capture(self.pg_interfaces)
3224         self.pg_start()
3225         capture = self.pg0.get_capture(1)
3226         p = capture[0]
3227         try:
3228             ip = p[IP]
3229             tcp = p[TCP]
3230             self.assertEqual(ip.src, self.nat_addr)
3231             self.assertEqual(ip.dst, host.ip4)
3232             self.assertEqual(tcp.sport, server_out_port)
3233             self.assertEqual(tcp.dport, host_in_port)
3234             self.check_tcp_checksum(p)
3235         except:
3236             self.logger.error(ppp("Unexpected or invalid packet:", p))
3237             raise
3238
3239     def test_one_armed_nat44(self):
3240         """ One armed NAT44 """
3241         remote_host = self.pg9.remote_hosts[0]
3242         local_host = self.pg9.remote_hosts[1]
3243         external_port = 0
3244
3245         self.nat44_add_address(self.nat_addr)
3246         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3247         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3248                                                   is_inside=0)
3249
3250         # in2out
3251         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3252              IP(src=local_host.ip4, dst=remote_host.ip4) /
3253              TCP(sport=12345, dport=80))
3254         self.pg9.add_stream(p)
3255         self.pg_enable_capture(self.pg_interfaces)
3256         self.pg_start()
3257         capture = self.pg9.get_capture(1)
3258         p = capture[0]
3259         try:
3260             ip = p[IP]
3261             tcp = p[TCP]
3262             self.assertEqual(ip.src, self.nat_addr)
3263             self.assertEqual(ip.dst, remote_host.ip4)
3264             self.assertNotEqual(tcp.sport, 12345)
3265             external_port = tcp.sport
3266             self.assertEqual(tcp.dport, 80)
3267             self.check_tcp_checksum(p)
3268             self.check_ip_checksum(p)
3269         except:
3270             self.logger.error(ppp("Unexpected or invalid packet:", p))
3271             raise
3272
3273         # out2in
3274         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3275              IP(src=remote_host.ip4, dst=self.nat_addr) /
3276              TCP(sport=80, dport=external_port))
3277         self.pg9.add_stream(p)
3278         self.pg_enable_capture(self.pg_interfaces)
3279         self.pg_start()
3280         capture = self.pg9.get_capture(1)
3281         p = capture[0]
3282         try:
3283             ip = p[IP]
3284             tcp = p[TCP]
3285             self.assertEqual(ip.src, remote_host.ip4)
3286             self.assertEqual(ip.dst, local_host.ip4)
3287             self.assertEqual(tcp.sport, 80)
3288             self.assertEqual(tcp.dport, 12345)
3289             self.check_tcp_checksum(p)
3290             self.check_ip_checksum(p)
3291         except:
3292             self.logger.error(ppp("Unexpected or invalid packet:", p))
3293             raise
3294
3295     def test_one_armed_nat44_static(self):
3296         """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3297         remote_host = self.pg9.remote_hosts[0]
3298         local_host = self.pg9.remote_hosts[1]
3299         external_port = 80
3300         local_port = 8080
3301         eh_port_in = 0
3302
3303         self.vapi.nat44_forwarding_enable_disable(1)
3304         self.nat44_add_address(self.nat_addr, twice_nat=1)
3305         self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3306                                       local_port, external_port,
3307                                       proto=IP_PROTOS.tcp, out2in_only=1,
3308                                       twice_nat=1)
3309         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3310         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3311                                                   is_inside=0)
3312
3313         # from client to service
3314         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3315              IP(src=remote_host.ip4, dst=self.nat_addr) /
3316              TCP(sport=12345, dport=external_port))
3317         self.pg9.add_stream(p)
3318         self.pg_enable_capture(self.pg_interfaces)
3319         self.pg_start()
3320         capture = self.pg9.get_capture(1)
3321         p = capture[0]
3322         server = None
3323         try:
3324             ip = p[IP]
3325             tcp = p[TCP]
3326             self.assertEqual(ip.dst, local_host.ip4)
3327             self.assertEqual(ip.src, self.nat_addr)
3328             self.assertEqual(tcp.dport, local_port)
3329             self.assertNotEqual(tcp.sport, 12345)
3330             eh_port_in = tcp.sport
3331             self.check_tcp_checksum(p)
3332             self.check_ip_checksum(p)
3333         except:
3334             self.logger.error(ppp("Unexpected or invalid packet:", p))
3335             raise
3336
3337         # from service back to client
3338         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3339              IP(src=local_host.ip4, dst=self.nat_addr) /
3340              TCP(sport=local_port, dport=eh_port_in))
3341         self.pg9.add_stream(p)
3342         self.pg_enable_capture(self.pg_interfaces)
3343         self.pg_start()
3344         capture = self.pg9.get_capture(1)
3345         p = capture[0]
3346         try:
3347             ip = p[IP]
3348             tcp = p[TCP]
3349             self.assertEqual(ip.src, self.nat_addr)
3350             self.assertEqual(ip.dst, remote_host.ip4)
3351             self.assertEqual(tcp.sport, external_port)
3352             self.assertEqual(tcp.dport, 12345)
3353             self.check_tcp_checksum(p)
3354             self.check_ip_checksum(p)
3355         except:
3356             self.logger.error(ppp("Unexpected or invalid packet:", p))
3357             raise
3358
3359     def test_del_session(self):
3360         """ Delete NAT44 session """
3361         self.nat44_add_address(self.nat_addr)
3362         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3363         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3364                                                   is_inside=0)
3365
3366         pkts = self.create_stream_in(self.pg0, self.pg1)
3367         self.pg0.add_stream(pkts)
3368         self.pg_enable_capture(self.pg_interfaces)
3369         self.pg_start()
3370         capture = self.pg1.get_capture(len(pkts))
3371
3372         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3373         nsessions = len(sessions)
3374
3375         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3376                                     sessions[0].inside_port,
3377                                     sessions[0].protocol)
3378         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3379                                     sessions[1].outside_port,
3380                                     sessions[1].protocol,
3381                                     is_in=0)
3382
3383         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3384         self.assertEqual(nsessions - len(sessions), 2)
3385
3386     def test_set_get_reass(self):
3387         """ NAT44 set/get virtual fragmentation reassembly """
3388         reas_cfg1 = self.vapi.nat_get_reass()
3389
3390         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3391                                 max_reass=reas_cfg1.ip4_max_reass * 2,
3392                                 max_frag=reas_cfg1.ip4_max_frag * 2)
3393
3394         reas_cfg2 = self.vapi.nat_get_reass()
3395
3396         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3397         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3398         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3399
3400         self.vapi.nat_set_reass(drop_frag=1)
3401         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3402
3403     def test_frag_in_order(self):
3404         """ NAT44 translate fragments arriving in order """
3405         self.nat44_add_address(self.nat_addr)
3406         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3407         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3408                                                   is_inside=0)
3409
3410         data = "A" * 4 + "B" * 16 + "C" * 3
3411         self.tcp_port_in = random.randint(1025, 65535)
3412
3413         reass = self.vapi.nat_reass_dump()
3414         reass_n_start = len(reass)
3415
3416         # in2out
3417         pkts = self.create_stream_frag(self.pg0,
3418                                        self.pg1.remote_ip4,
3419                                        self.tcp_port_in,
3420                                        20,
3421                                        data)
3422         self.pg0.add_stream(pkts)
3423         self.pg_enable_capture(self.pg_interfaces)
3424         self.pg_start()
3425         frags = self.pg1.get_capture(len(pkts))
3426         p = self.reass_frags_and_verify(frags,
3427                                         self.nat_addr,
3428                                         self.pg1.remote_ip4)
3429         self.assertEqual(p[TCP].dport, 20)
3430         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3431         self.tcp_port_out = p[TCP].sport
3432         self.assertEqual(data, p[Raw].load)
3433
3434         # out2in
3435         pkts = self.create_stream_frag(self.pg1,
3436                                        self.nat_addr,
3437                                        20,
3438                                        self.tcp_port_out,
3439                                        data)
3440         self.pg1.add_stream(pkts)
3441         self.pg_enable_capture(self.pg_interfaces)
3442         self.pg_start()
3443         frags = self.pg0.get_capture(len(pkts))
3444         p = self.reass_frags_and_verify(frags,
3445                                         self.pg1.remote_ip4,
3446                                         self.pg0.remote_ip4)
3447         self.assertEqual(p[TCP].sport, 20)
3448         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3449         self.assertEqual(data, p[Raw].load)
3450
3451         reass = self.vapi.nat_reass_dump()
3452         reass_n_end = len(reass)
3453
3454         self.assertEqual(reass_n_end - reass_n_start, 2)
3455
3456     def test_reass_hairpinning(self):
3457         """ NAT44 fragments hairpinning """
3458         host = self.pg0.remote_hosts[0]
3459         server = self.pg0.remote_hosts[1]
3460         host_in_port = random.randint(1025, 65535)
3461         host_out_port = 0
3462         server_in_port = random.randint(1025, 65535)
3463         server_out_port = random.randint(1025, 65535)
3464         data = "A" * 4 + "B" * 16 + "C" * 3
3465
3466         self.nat44_add_address(self.nat_addr)
3467         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3468         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3469                                                   is_inside=0)
3470         # add static mapping for server
3471         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3472                                       server_in_port, server_out_port,
3473                                       proto=IP_PROTOS.tcp)
3474
3475         # send packet from host to server
3476         pkts = self.create_stream_frag(self.pg0,
3477                                        self.nat_addr,
3478                                        host_in_port,
3479                                        server_out_port,
3480                                        data)
3481         self.pg0.add_stream(pkts)
3482         self.pg_enable_capture(self.pg_interfaces)
3483         self.pg_start()
3484         frags = self.pg0.get_capture(len(pkts))
3485         p = self.reass_frags_and_verify(frags,
3486                                         self.nat_addr,
3487                                         server.ip4)
3488         self.assertNotEqual(p[TCP].sport, host_in_port)
3489         self.assertEqual(p[TCP].dport, server_in_port)
3490         self.assertEqual(data, p[Raw].load)
3491
3492     def test_frag_out_of_order(self):
3493         """ NAT44 translate fragments arriving out of order """
3494         self.nat44_add_address(self.nat_addr)
3495         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3496         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3497                                                   is_inside=0)
3498
3499         data = "A" * 4 + "B" * 16 + "C" * 3
3500         random.randint(1025, 65535)
3501
3502         # in2out
3503         pkts = self.create_stream_frag(self.pg0,
3504                                        self.pg1.remote_ip4,
3505                                        self.tcp_port_in,
3506                                        20,
3507                                        data)
3508         pkts.reverse()
3509         self.pg0.add_stream(pkts)
3510         self.pg_enable_capture(self.pg_interfaces)
3511         self.pg_start()
3512         frags = self.pg1.get_capture(len(pkts))
3513         p = self.reass_frags_and_verify(frags,
3514                                         self.nat_addr,
3515                                         self.pg1.remote_ip4)
3516         self.assertEqual(p[TCP].dport, 20)
3517         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3518         self.tcp_port_out = p[TCP].sport
3519         self.assertEqual(data, p[Raw].load)
3520
3521         # out2in
3522         pkts = self.create_stream_frag(self.pg1,
3523                                        self.nat_addr,
3524                                        20,
3525                                        self.tcp_port_out,
3526                                        data)
3527         pkts.reverse()
3528         self.pg1.add_stream(pkts)
3529         self.pg_enable_capture(self.pg_interfaces)
3530         self.pg_start()
3531         frags = self.pg0.get_capture(len(pkts))
3532         p = self.reass_frags_and_verify(frags,
3533                                         self.pg1.remote_ip4,
3534                                         self.pg0.remote_ip4)
3535         self.assertEqual(p[TCP].sport, 20)
3536         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3537         self.assertEqual(data, p[Raw].load)
3538
3539     def test_port_restricted(self):
3540         """ Port restricted NAT44 (MAP-E CE) """
3541         self.nat44_add_address(self.nat_addr)
3542         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3543         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3544                                                   is_inside=0)
3545         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3546                       "psid-offset 6 psid-len 6")
3547
3548         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3549              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3550              TCP(sport=4567, dport=22))
3551         self.pg0.add_stream(p)
3552         self.pg_enable_capture(self.pg_interfaces)
3553         self.pg_start()
3554         capture = self.pg1.get_capture(1)
3555         p = capture[0]
3556         try:
3557             ip = p[IP]
3558             tcp = p[TCP]
3559             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3560             self.assertEqual(ip.src, self.nat_addr)
3561             self.assertEqual(tcp.dport, 22)
3562             self.assertNotEqual(tcp.sport, 4567)
3563             self.assertEqual((tcp.sport >> 6) & 63, 10)
3564             self.check_tcp_checksum(p)
3565             self.check_ip_checksum(p)
3566         except:
3567             self.logger.error(ppp("Unexpected or invalid packet:", p))
3568             raise
3569
3570     def test_twice_nat(self):
3571         """ Twice NAT44 """
3572         twice_nat_addr = '10.0.1.3'
3573         port_in = 8080
3574         port_out = 80
3575         eh_port_out = 4567
3576         eh_port_in = 0
3577         self.nat44_add_address(self.nat_addr)
3578         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3579         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3580                                       port_in, port_out, proto=IP_PROTOS.tcp,
3581                                       twice_nat=1)
3582         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3583         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3584                                                   is_inside=0)
3585
3586         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3587              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3588              TCP(sport=eh_port_out, dport=port_out))
3589         self.pg1.add_stream(p)
3590         self.pg_enable_capture(self.pg_interfaces)
3591         self.pg_start()
3592         capture = self.pg0.get_capture(1)
3593         p = capture[0]
3594         try:
3595             ip = p[IP]
3596             tcp = p[TCP]
3597             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3598             self.assertEqual(ip.src, twice_nat_addr)
3599             self.assertEqual(tcp.dport, port_in)
3600             self.assertNotEqual(tcp.sport, eh_port_out)
3601             eh_port_in = tcp.sport
3602             self.check_tcp_checksum(p)
3603             self.check_ip_checksum(p)
3604         except:
3605             self.logger.error(ppp("Unexpected or invalid packet:", p))
3606             raise
3607
3608         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3609              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3610              TCP(sport=port_in, dport=eh_port_in))
3611         self.pg0.add_stream(p)
3612         self.pg_enable_capture(self.pg_interfaces)
3613         self.pg_start()
3614         capture = self.pg1.get_capture(1)
3615         p = capture[0]
3616         try:
3617             ip = p[IP]
3618             tcp = p[TCP]
3619             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3620             self.assertEqual(ip.src, self.nat_addr)
3621             self.assertEqual(tcp.dport, eh_port_out)
3622             self.assertEqual(tcp.sport, port_out)
3623             self.check_tcp_checksum(p)
3624             self.check_ip_checksum(p)
3625         except:
3626             self.logger.error(ppp("Unexpected or invalid packet:", p))
3627             raise
3628
3629     def test_twice_nat_lb(self):
3630         """ Twice NAT44 local service load balancing """
3631         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3632         twice_nat_addr = '10.0.1.3'
3633         local_port = 8080
3634         external_port = 80
3635         eh_port_out = 4567
3636         eh_port_in = 0
3637         server1 = self.pg0.remote_hosts[0]
3638         server2 = self.pg0.remote_hosts[1]
3639
3640         locals = [{'addr': server1.ip4n,
3641                    'port': local_port,
3642                    'probability': 50},
3643                   {'addr': server2.ip4n,
3644                    'port': local_port,
3645                    'probability': 50}]
3646
3647         self.nat44_add_address(self.nat_addr)
3648         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3649
3650         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3651                                                   external_port,
3652                                                   IP_PROTOS.tcp,
3653                                                   twice_nat=1,
3654                                                   local_num=len(locals),
3655                                                   locals=locals)
3656         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3657         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3658                                                   is_inside=0)
3659
3660         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3661              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3662              TCP(sport=eh_port_out, dport=external_port))
3663         self.pg1.add_stream(p)
3664         self.pg_enable_capture(self.pg_interfaces)
3665         self.pg_start()
3666         capture = self.pg0.get_capture(1)
3667         p = capture[0]
3668         server = None
3669         try:
3670             ip = p[IP]
3671             tcp = p[TCP]
3672             self.assertEqual(ip.src, twice_nat_addr)
3673             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3674             if ip.dst == server1.ip4:
3675                 server = server1
3676             else:
3677                 server = server2
3678             self.assertNotEqual(tcp.sport, eh_port_out)
3679             eh_port_in = tcp.sport
3680             self.assertEqual(tcp.dport, local_port)
3681             self.check_tcp_checksum(p)
3682             self.check_ip_checksum(p)
3683         except:
3684             self.logger.error(ppp("Unexpected or invalid packet:", p))
3685             raise
3686
3687         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3688              IP(src=server.ip4, dst=twice_nat_addr) /
3689              TCP(sport=local_port, dport=eh_port_in))
3690         self.pg0.add_stream(p)
3691         self.pg_enable_capture(self.pg_interfaces)
3692         self.pg_start()
3693         capture = self.pg1.get_capture(1)
3694         p = capture[0]
3695         try:
3696             ip = p[IP]
3697             tcp = p[TCP]
3698             self.assertEqual(ip.src, self.nat_addr)
3699             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3700             self.assertEqual(tcp.sport, external_port)
3701             self.assertEqual(tcp.dport, eh_port_out)
3702             self.check_tcp_checksum(p)
3703             self.check_ip_checksum(p)
3704         except:
3705             self.logger.error(ppp("Unexpected or invalid packet:", p))
3706             raise
3707
3708     def test_twice_nat_interface_addr(self):
3709         """ Acquire twice NAT44 addresses from interface """
3710         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3711
3712         # no address in NAT pool
3713         adresses = self.vapi.nat44_address_dump()
3714         self.assertEqual(0, len(adresses))
3715
3716         # configure interface address and check NAT address pool
3717         self.pg7.config_ip4()
3718         adresses = self.vapi.nat44_address_dump()
3719         self.assertEqual(1, len(adresses))
3720         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3721         self.assertEqual(adresses[0].twice_nat, 1)
3722
3723         # remove interface address and check NAT address pool
3724         self.pg7.unconfig_ip4()
3725         adresses = self.vapi.nat44_address_dump()
3726         self.assertEqual(0, len(adresses))
3727
3728     def test_ipfix_max_frags(self):
3729         """ IPFIX logging maximum fragments pending reassembly exceeded """
3730         self.nat44_add_address(self.nat_addr)
3731         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3732         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3733                                                   is_inside=0)
3734         self.vapi.nat_set_reass(max_frag=0)
3735         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3736                                      src_address=self.pg3.local_ip4n,
3737                                      path_mtu=512,
3738                                      template_interval=10)
3739         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3740                             src_port=self.ipfix_src_port)
3741
3742         data = "A" * 4 + "B" * 16 + "C" * 3
3743         self.tcp_port_in = random.randint(1025, 65535)
3744         pkts = self.create_stream_frag(self.pg0,
3745                                        self.pg1.remote_ip4,
3746                                        self.tcp_port_in,
3747                                        20,
3748                                        data)
3749         self.pg0.add_stream(pkts[-1])
3750         self.pg_enable_capture(self.pg_interfaces)
3751         self.pg_start()
3752         frags = self.pg1.get_capture(0)
3753         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3754         capture = self.pg3.get_capture(9)
3755         ipfix = IPFIXDecoder()
3756         # first load template
3757         for p in capture:
3758             self.assertTrue(p.haslayer(IPFIX))
3759             self.assertEqual(p[IP].src, self.pg3.local_ip4)
3760             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3761             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3762             self.assertEqual(p[UDP].dport, 4739)
3763             self.assertEqual(p[IPFIX].observationDomainID,
3764                              self.ipfix_domain_id)
3765             if p.haslayer(Template):
3766                 ipfix.add_template(p.getlayer(Template))
3767         # verify events in data set
3768         for p in capture:
3769             if p.haslayer(Data):
3770                 data = ipfix.decode_data_set(p.getlayer(Set))
3771                 self.verify_ipfix_max_fragments_ip4(data, 0,
3772                                                     self.pg0.remote_ip4n)
3773
3774     def tearDown(self):
3775         super(TestNAT44, self).tearDown()
3776         if not self.vpp_dead:
3777             self.logger.info(self.vapi.cli("show nat44 addresses"))
3778             self.logger.info(self.vapi.cli("show nat44 interfaces"))
3779             self.logger.info(self.vapi.cli("show nat44 static mappings"))
3780             self.logger.info(self.vapi.cli("show nat44 interface address"))
3781             self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3782             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3783             self.vapi.cli("nat addr-port-assignment-alg default")
3784             self.clear_nat44()
3785
3786
class TestNAT44Out2InDPO(MethodHolder):
    """ NAT44 Test Cases using out2in DPO """

    @classmethod
    def setUpConstants(cls):
        """ Add the NAT "out2in dpo" startup section to the VPP command line
        """
        super(TestNAT44Out2InDPO, cls).setUpConstants()
        nat_section = ["nat", "{", "out2in dpo", "}"]
        cls.vpp_cmdline.extend(nat_section)

    @classmethod
    def setUpClass(cls):
        """
        Create pg0 (IPv4) and pg1 (IPv6) and install an IPv6 default route

        On any failure the parent tearDownClass is run before the exception
        is re-raised.
        """
        super(TestNAT44Out2InDPO, cls).setUpClass()

        try:
            cls.tcp_port_in = cls.tcp_port_out = 6303
            cls.udp_port_in = cls.udp_port_out = 6304
            cls.icmp_id_in = cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.dst_ip4 = '192.168.70.1'

            cls.create_pg_interfaces(range(2))

            # pg0 is the IPv4 side
            ip4_if = cls.pg0
            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

            # pg1 is the IPv6 side
            ip6_if = cls.pg1
            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.resolve_ndp()

            # zero-length all-zero prefix: IPv6 default route via pg1
            cls.vapi.ip_add_del_route(
                is_ipv6=True,
                dst_address='\x00' * 16,
                dst_address_length=0,
                next_hop_address=ip6_if.remote_ip6n,
                next_hop_sw_if_index=ip6_if.sw_if_index)

        except Exception:
            super(TestNAT44Out2InDPO, cls).tearDownClass()
            raise

    def configure_xlat(self):
        """
        Store the IPv6 prefixes on self and add the MAP translation domain
        """
        self.dst_ip6_pfx = '1:2:3::'
        self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.dst_ip6_pfx)
        self.dst_ip6_pfx_len = 96

        self.src_ip6_pfx = '4:5:6::'
        self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.src_ip6_pfx)
        self.src_ip6_pfx_len = 96

        self.vapi.map_add_domain(self.dst_ip6_pfx_n,
                                 self.dst_ip6_pfx_len,
                                 self.src_ip6_pfx_n,
                                 self.src_ip6_pfx_len,
                                 '\x00\x00\x00\x00',
                                 0,
                                 is_translation=1,
                                 is_rfc6052=1)

    def _xlat_send_and_capture(self, tx_if, rx_if, pkts):
        """
        Send pkts on tx_if and return the capture from rx_if

        :param tx_if: Interface the stream is added to
        :param rx_if: Interface the capture is read from
        :param pkts: Packets to send; as many are expected back
        """
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(pkts))

    def test_464xlat_ce(self):
        """ Test 464XLAT CE with NAT44 """

        self.configure_xlat()

        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)

        # IPv6 addresses as seen on the pg1 side
        out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                       self.dst_ip6_pfx_len)
        out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
                                       self.src_ip6_pfx_len)

        try:
            # IPv4 in on pg0, IPv6 out on pg1
            tx = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
            rx = self._xlat_send_and_capture(self.pg0, self.pg1, tx)
            self.verify_capture_out_ip6(rx, nat_ip=out_dst_ip6,
                                        dst_ip=out_src_ip6)

            # IPv6 in on pg1, IPv4 out on pg0
            tx = self.create_stream_out_ip6(self.pg1, out_src_ip6,
                                            out_dst_ip6)
            rx = self._xlat_send_and_capture(self.pg1, self.pg0, tx)
            self.verify_capture_in(rx, self.pg0)
        finally:
            # always undo the NAT44 configuration made by this test
            self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
                                                      is_add=0)
            self.vapi.nat44_add_del_address_range(self.nat_addr_n,
                                                  self.nat_addr_n, is_add=0)

    def test_464xlat_ce_no_nat(self):
        """ Test 464XLAT CE without NAT44 """

        self.configure_xlat()

        # no NAT44 here: the inside host address itself is embedded
        out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                       self.dst_ip6_pfx_len)
        out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
                                       self.src_ip6_pfx_len)

        # IPv4 in on pg0, IPv6 out on pg1
        tx = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
        rx = self._xlat_send_and_capture(self.pg0, self.pg1, tx)
        self.verify_capture_out_ip6(rx, dst_ip=out_src_ip6,
                                    nat_ip=out_dst_ip6, same_port=True)

        # IPv6 in on pg1, IPv4 out on pg0
        tx = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
        rx = self._xlat_send_and_capture(self.pg1, self.pg0, tx)
        self.verify_capture_in(rx, self.pg0)
3902
3903
3904 class TestDeterministicNAT(MethodHolder):
3905     """ Deterministic NAT Test Cases """
3906
3907     @classmethod
3908     def setUpConstants(cls):
3909         super(TestDeterministicNAT, cls).setUpConstants()
3910         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3911
3912     @classmethod
3913     def setUpClass(cls):
3914         super(TestDeterministicNAT, cls).setUpClass()
3915
3916         try:
3917             cls.tcp_port_in = 6303
3918             cls.tcp_external_port = 6303
3919             cls.udp_port_in = 6304
3920             cls.udp_external_port = 6304
3921             cls.icmp_id_in = 6305
3922             cls.nat_addr = '10.0.0.3'
3923
3924             cls.create_pg_interfaces(range(3))
3925             cls.interfaces = list(cls.pg_interfaces)
3926
3927             for i in cls.interfaces:
3928                 i.admin_up()
3929                 i.config_ip4()
3930                 i.resolve_arp()
3931
3932             cls.pg0.generate_remote_hosts(2)
3933             cls.pg0.configure_ipv4_neighbors()
3934
3935         except Exception:
3936             super(TestDeterministicNAT, cls).tearDownClass()
3937             raise
3938
3939     def create_stream_in(self, in_if, out_if, ttl=64):
3940         """
3941         Create packet stream for inside network
3942
3943         :param in_if: Inside interface
3944         :param out_if: Outside interface
3945         :param ttl: TTL of generated packets
3946         """
3947         pkts = []
3948         # TCP
3949         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3950              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3951              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3952         pkts.append(p)
3953
3954         # UDP
3955         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3956              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3957              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3958         pkts.append(p)
3959
3960         # ICMP
3961         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3962              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3963              ICMP(id=self.icmp_id_in, type='echo-request'))
3964         pkts.append(p)
3965
3966         return pkts
3967
3968     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3969         """
3970         Create packet stream for outside network
3971
3972         :param out_if: Outside interface
3973         :param dst_ip: Destination IP address (Default use global NAT address)
3974         :param ttl: TTL of generated packets
3975         """
3976         if dst_ip is None:
3977             dst_ip = self.nat_addr
3978         pkts = []
3979         # TCP
3980         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3981              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3982              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3983         pkts.append(p)
3984
3985         # UDP
3986         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3987              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3988              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3989         pkts.append(p)
3990
3991         # ICMP
3992         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3993              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3994              ICMP(id=self.icmp_external_id, type='echo-reply'))
3995         pkts.append(p)
3996
3997         return pkts
3998
3999     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4000         """
4001         Verify captured packets on outside network
4002
4003         :param capture: Captured packets
4004         :param nat_ip: Translated IP address (Default use global NAT address)
4005         :param same_port: Sorce port number is not translated (Default False)
4006         :param packet_num: Expected number of packets (Default 3)
4007         """
4008         if nat_ip is None:
4009             nat_ip = self.nat_addr
4010         self.assertEqual(packet_num, len(capture))
4011         for packet in capture:
4012             try:
4013                 self.assertEqual(packet[IP].src, nat_ip)
4014                 if packet.haslayer(TCP):
4015                     self.tcp_port_out = packet[TCP].sport
4016                 elif packet.haslayer(UDP):
4017                     self.udp_port_out = packet[UDP].sport
4018                 else:
4019                     self.icmp_external_id = packet[ICMP].id
4020             except:
4021                 self.logger.error(ppp("Unexpected or invalid packet "
4022                                       "(outside network):", packet))
4023                 raise
4024
4025     def initiate_tcp_session(self, in_if, out_if):
4026         """
4027         Initiates TCP session
4028
4029         :param in_if: Inside interface
4030         :param out_if: Outside interface
4031         """
4032         try:
4033             # SYN packet in->out
4034             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4035                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4036                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4037                      flags="S"))
4038             in_if.add_stream(p)
4039             self.pg_enable_capture(self.pg_interfaces)
4040             self.pg_start()
4041             capture = out_if.get_capture(1)
4042             p = capture[0]
4043             self.tcp_port_out = p[TCP].sport
4044
4045             # SYN + ACK packet out->in
4046             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4047                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4048                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4049                      flags="SA"))
4050             out_if.add_stream(p)
4051             self.pg_enable_capture(self.pg_interfaces)
4052             self.pg_start()
4053             in_if.get_capture(1)
4054
4055             # ACK packet in->out
4056             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4057                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4058                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4059                      flags="A"))
4060             in_if.add_stream(p)
4061             self.pg_enable_capture(self.pg_interfaces)
4062             self.pg_start()
4063             out_if.get_capture(1)
4064
4065         except:
4066             self.logger.error("TCP 3 way handshake failed")
4067             raise
4068
4069     def verify_ipfix_max_entries_per_user(self, data):
4070         """
4071         Verify IPFIX maximum entries per user exceeded event
4072
4073         :param data: Decoded IPFIX data records
4074         """
4075         self.assertEqual(1, len(data))
4076         record = data[0]
4077         # natEvent
4078         self.assertEqual(ord(record[230]), 13)
4079         # natQuotaExceededEvent
4080         self.assertEqual('\x03\x00\x00\x00', record[466])
4081         # maxEntriesPerUser
4082         self.assertEqual('\xe8\x03\x00\x00', record[473])
4083         # sourceIPv4Address
4084         self.assertEqual(self.pg0.remote_ip4n, record[8])
4085
4086     def test_deterministic_mode(self):
4087         """ NAT plugin run deterministic mode """
4088         in_addr = '172.16.255.0'
4089         out_addr = '172.17.255.50'
4090         in_addr_t = '172.16.255.20'
4091         in_addr_n = socket.inet_aton(in_addr)
4092         out_addr_n = socket.inet_aton(out_addr)
4093         in_addr_t_n = socket.inet_aton(in_addr_t)
4094         in_plen = 24
4095         out_plen = 32
4096
4097         nat_config = self.vapi.nat_show_config()
4098         self.assertEqual(1, nat_config.deterministic)
4099
4100         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4101
4102         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4103         self.assertEqual(rep1.out_addr[:4], out_addr_n)
4104         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4105         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4106
4107         deterministic_mappings = self.vapi.nat_det_map_dump()
4108         self.assertEqual(len(deterministic_mappings), 1)
4109         dsm = deterministic_mappings[0]
4110         self.assertEqual(in_addr_n, dsm.in_addr[:4])
4111         self.assertEqual(in_plen, dsm.in_plen)
4112         self.assertEqual(out_addr_n, dsm.out_addr[:4])
4113         self.assertEqual(out_plen, dsm.out_plen)
4114
4115         self.clear_nat_det()
4116         deterministic_mappings = self.vapi.nat_det_map_dump()
4117         self.assertEqual(len(deterministic_mappings), 0)
4118
4119     def test_set_timeouts(self):
4120         """ Set deterministic NAT timeouts """
4121         timeouts_before = self.vapi.nat_det_get_timeouts()
4122
4123         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4124                                        timeouts_before.tcp_established + 10,
4125                                        timeouts_before.tcp_transitory + 10,
4126                                        timeouts_before.icmp + 10)
4127
4128         timeouts_after = self.vapi.nat_det_get_timeouts()
4129
4130         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4131         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4132         self.assertNotEqual(timeouts_before.tcp_established,
4133                             timeouts_after.tcp_established)
4134         self.assertNotEqual(timeouts_before.tcp_transitory,
4135                             timeouts_after.tcp_transitory)
4136
4137     def test_det_in(self):
4138         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4139
4140         nat_ip = "10.0.0.10"
4141
4142         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4143                                       32,
4144                                       socket.inet_aton(nat_ip),
4145                                       32)
4146         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4147         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4148                                                   is_inside=0)
4149
4150         # in2out
4151         pkts = self.create_stream_in(self.pg0, self.pg1)
4152         self.pg0.add_stream(pkts)
4153         self.pg_enable_capture(self.pg_interfaces)
4154         self.pg_start()
4155         capture = self.pg1.get_capture(len(pkts))
4156         self.verify_capture_out(capture, nat_ip)
4157
4158         # out2in
4159         pkts = self.create_stream_out(self.pg1, nat_ip)
4160         self.pg1.add_stream(pkts)
4161         self.pg_enable_capture(self.pg_interfaces)
4162         self.pg_start()
4163         capture = self.pg0.get_capture(len(pkts))
4164         self.verify_capture_in(capture, self.pg0)
4165
4166         # session dump test
4167         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4168         self.assertEqual(len(sessions), 3)
4169
4170         # TCP session
4171         s = sessions[0]
4172         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4173         self.assertEqual(s.in_port, self.tcp_port_in)
4174         self.assertEqual(s.out_port, self.tcp_port_out)
4175         self.assertEqual(s.ext_port, self.tcp_external_port)
4176
4177         # UDP session
4178         s = sessions[1]
4179         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4180         self.assertEqual(s.in_port, self.udp_port_in)
4181         self.assertEqual(s.out_port, self.udp_port_out)
4182         self.assertEqual(s.ext_port, self.udp_external_port)
4183
4184         # ICMP session
4185         s = sessions[2]
4186         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4187         self.assertEqual(s.in_port, self.icmp_id_in)
4188         self.assertEqual(s.out_port, self.icmp_external_id)
4189
4190     def test_multiple_users(self):
4191         """ Deterministic NAT multiple users """
4192
4193         nat_ip = "10.0.0.10"
4194         port_in = 80
4195         external_port = 6303
4196
4197         host0 = self.pg0.remote_hosts[0]
4198         host1 = self.pg0.remote_hosts[1]
4199
4200         self.vapi.nat_det_add_del_map(host0.ip4n,
4201                                       24,
4202                                       socket.inet_aton(nat_ip),
4203                                       32)
4204         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4205         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4206                                                   is_inside=0)
4207
4208         # host0 to out
4209         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4210              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4211              TCP(sport=port_in, dport=external_port))
4212         self.pg0.add_stream(p)
4213         self.pg_enable_capture(self.pg_interfaces)
4214         self.pg_start()
4215         capture = self.pg1.get_capture(1)
4216         p = capture[0]
4217         try:
4218             ip = p[IP]
4219             tcp = p[TCP]
4220             self.assertEqual(ip.src, nat_ip)
4221             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4222             self.assertEqual(tcp.dport, external_port)
4223             port_out0 = tcp.sport
4224         except:
4225             self.logger.error(ppp("Unexpected or invalid packet:", p))
4226             raise
4227
4228         # host1 to out
4229         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4230              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4231              TCP(sport=port_in, dport=external_port))
4232         self.pg0.add_stream(p)
4233         self.pg_enable_capture(self.pg_interfaces)
4234         self.pg_start()
4235         capture = self.pg1.get_capture(1)
4236         p = capture[0]
4237         try:
4238             ip = p[IP]
4239             tcp = p[TCP]
4240             self.assertEqual(ip.src, nat_ip)
4241             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4242             self.assertEqual(tcp.dport, external_port)
4243             port_out1 = tcp.sport
4244         except:
4245             self.logger.error(ppp("Unexpected or invalid packet:", p))
4246             raise
4247
4248         dms = self.vapi.nat_det_map_dump()
4249         self.assertEqual(1, len(dms))
4250         self.assertEqual(2, dms[0].ses_num)
4251
4252         # out to host0
4253         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4254              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4255              TCP(sport=external_port, dport=port_out0))
4256         self.pg1.add_stream(p)
4257         self.pg_enable_capture(self.pg_interfaces)
4258         self.pg_start()
4259         capture = self.pg0.get_capture(1)
4260         p = capture[0]
4261         try:
4262             ip = p[IP]
4263             tcp = p[TCP]
4264             self.assertEqual(ip.src, self.pg1.remote_ip4)
4265             self.assertEqual(ip.dst, host0.ip4)
4266             self.assertEqual(tcp.dport, port_in)
4267             self.assertEqual(tcp.sport, external_port)
4268         except:
4269             self.logger.error(ppp("Unexpected or invalid packet:", p))
4270             raise
4271
4272         # out to host1
4273         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4274              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4275              TCP(sport=external_port, dport=port_out1))
4276         self.pg1.add_stream(p)
4277         self.pg_enable_capture(self.pg_interfaces)
4278         self.pg_start()
4279         capture = self.pg0.get_capture(1)
4280         p = capture[0]
4281         try:
4282             ip = p[IP]
4283             tcp = p[TCP]
4284             self.assertEqual(ip.src, self.pg1.remote_ip4)
4285             self.assertEqual(ip.dst, host1.ip4)
4286             self.assertEqual(tcp.dport, port_in)
4287             self.assertEqual(tcp.sport, external_port)
4288         except:
4289             self.logger.error(ppp("Unexpected or invalid packet", p))
4290             raise
4291
4292         # session close api test
4293         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4294                                             port_out1,
4295                                             self.pg1.remote_ip4n,
4296                                             external_port)
4297         dms = self.vapi.nat_det_map_dump()
4298         self.assertEqual(dms[0].ses_num, 1)
4299
4300         self.vapi.nat_det_close_session_in(host0.ip4n,
4301                                            port_in,
4302                                            self.pg1.remote_ip4n,
4303                                            external_port)
4304         dms = self.vapi.nat_det_map_dump()
4305         self.assertEqual(dms[0].ses_num, 0)
4306
4307     def test_tcp_session_close_detection_in(self):
4308         """ Deterministic NAT TCP session close from inside network """
4309         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4310                                       32,
4311                                       socket.inet_aton(self.nat_addr),
4312                                       32)
4313         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4314         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4315                                                   is_inside=0)
4316
4317         self.initiate_tcp_session(self.pg0, self.pg1)
4318
4319         # close the session from inside
4320         try:
4321             # FIN packet in -> out
4322             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4323                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4324                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4325                      flags="F"))
4326             self.pg0.add_stream(p)
4327             self.pg_enable_capture(self.pg_interfaces)
4328             self.pg_start()
4329             self.pg1.get_capture(1)
4330
4331             pkts = []
4332
4333             # ACK packet out -> in
4334             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4335                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4336                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4337                      flags="A"))
4338             pkts.append(p)
4339
4340             # FIN packet out -> in
4341             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4342                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4343                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4344                      flags="F"))
4345             pkts.append(p)
4346
4347             self.pg1.add_stream(pkts)
4348             self.pg_enable_capture(self.pg_interfaces)
4349             self.pg_start()
4350             self.pg0.get_capture(2)
4351
4352             # ACK packet in -> out
4353             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4354                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4355                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4356                      flags="A"))
4357             self.pg0.add_stream(p)
4358             self.pg_enable_capture(self.pg_interfaces)
4359             self.pg_start()
4360             self.pg1.get_capture(1)
4361
4362             # Check if deterministic NAT44 closed the session
4363             dms = self.vapi.nat_det_map_dump()
4364             self.assertEqual(0, dms[0].ses_num)
4365         except:
4366             self.logger.error("TCP session termination failed")
4367             raise
4368
4369     def test_tcp_session_close_detection_out(self):
4370         """ Deterministic NAT TCP session close from outside network """
4371         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4372                                       32,
4373                                       socket.inet_aton(self.nat_addr),
4374                                       32)
4375         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4376         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4377                                                   is_inside=0)
4378
4379         self.initiate_tcp_session(self.pg0, self.pg1)
4380
4381         # close the session from outside
4382         try:
4383             # FIN packet out -> in
4384             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4385                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4386                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4387                      flags="F"))
4388             self.pg1.add_stream(p)
4389             self.pg_enable_capture(self.pg_interfaces)
4390             self.pg_start()
4391             self.pg0.get_capture(1)
4392
4393             pkts = []
4394
4395             # ACK packet in -> out
4396             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4397                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4398                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4399                      flags="A"))
4400             pkts.append(p)
4401
4402             # ACK packet in -> out
4403             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4404                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4405                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4406                      flags="F"))
4407             pkts.append(p)
4408
4409             self.pg0.add_stream(pkts)
4410             self.pg_enable_capture(self.pg_interfaces)
4411             self.pg_start()
4412             self.pg1.get_capture(2)
4413
4414             # ACK packet out -> in
4415             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4416                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4417                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4418                      flags="A"))
4419             self.pg1.add_stream(p)
4420             self.pg_enable_capture(self.pg_interfaces)
4421             self.pg_start()
4422             self.pg0.get_capture(1)
4423
4424             # Check if deterministic NAT44 closed the session
4425             dms = self.vapi.nat_det_map_dump()
4426             self.assertEqual(0, dms[0].ses_num)
4427         except:
4428             self.logger.error("TCP session termination failed")
4429             raise
4430
4431     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4432     def test_session_timeout(self):
4433         """ Deterministic NAT session timeouts """
4434         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4435                                       32,
4436                                       socket.inet_aton(self.nat_addr),
4437                                       32)
4438         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4439         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4440                                                   is_inside=0)
4441
4442         self.initiate_tcp_session(self.pg0, self.pg1)
4443         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4444         pkts = self.create_stream_in(self.pg0, self.pg1)
4445         self.pg0.add_stream(pkts)
4446         self.pg_enable_capture(self.pg_interfaces)
4447         self.pg_start()
4448         capture = self.pg1.get_capture(len(pkts))
4449         sleep(15)
4450
4451         dms = self.vapi.nat_det_map_dump()
4452         self.assertEqual(0, dms[0].ses_num)
4453
4454     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4455     def test_session_limit_per_user(self):
4456         """ Deterministic NAT maximum sessions per user limit """
4457         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4458                                       32,
4459                                       socket.inet_aton(self.nat_addr),
4460                                       32)
4461         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4462         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4463                                                   is_inside=0)
4464         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4465                                      src_address=self.pg2.local_ip4n,
4466                                      path_mtu=512,
4467                                      template_interval=10)
4468         self.vapi.nat_ipfix()
4469
4470         pkts = []
4471         for port in range(1025, 2025):
4472             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4473                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4474                  UDP(sport=port, dport=port))
4475             pkts.append(p)
4476
4477         self.pg0.add_stream(pkts)
4478         self.pg_enable_capture(self.pg_interfaces)
4479         self.pg_start()
4480         capture = self.pg1.get_capture(len(pkts))
4481
4482         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4483              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4484              UDP(sport=3001, dport=3002))
4485         self.pg0.add_stream(p)
4486         self.pg_enable_capture(self.pg_interfaces)
4487         self.pg_start()
4488         capture = self.pg1.assert_nothing_captured()
4489
4490         # verify ICMP error packet
4491         capture = self.pg0.get_capture(1)
4492         p = capture[0]
4493         self.assertTrue(p.haslayer(ICMP))
4494         icmp = p[ICMP]
4495         self.assertEqual(icmp.type, 3)
4496         self.assertEqual(icmp.code, 1)
4497         self.assertTrue(icmp.haslayer(IPerror))
4498         inner_ip = icmp[IPerror]
4499         self.assertEqual(inner_ip[UDPerror].sport, 3001)
4500         self.assertEqual(inner_ip[UDPerror].dport, 3002)
4501
4502         dms = self.vapi.nat_det_map_dump()
4503
4504         self.assertEqual(1000, dms[0].ses_num)
4505
4506         # verify IPFIX logging
4507         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
4508         sleep(1)
4509         capture = self.pg2.get_capture(2)
4510         ipfix = IPFIXDecoder()
4511         # first load template
4512         for p in capture:
4513             self.assertTrue(p.haslayer(IPFIX))
4514             if p.haslayer(Template):
4515                 ipfix.add_template(p.getlayer(Template))
4516         # verify events in data set
4517         for p in capture:
4518             if p.haslayer(Data):
4519                 data = ipfix.decode_data_set(p.getlayer(Set))
4520                 self.verify_ipfix_max_entries_per_user(data)
4521
4522     def clear_nat_det(self):
4523         """
4524         Clear deterministic NAT configuration.
4525         """
4526         self.vapi.nat_ipfix(enable=0)
4527         self.vapi.nat_det_set_timeouts()
4528         deterministic_mappings = self.vapi.nat_det_map_dump()
4529         for dsm in deterministic_mappings:
4530             self.vapi.nat_det_add_del_map(dsm.in_addr,
4531                                           dsm.in_plen,
4532                                           dsm.out_addr,
4533                                           dsm.out_plen,
4534                                           is_add=0)
4535
4536         interfaces = self.vapi.nat44_interface_dump()
4537         for intf in interfaces:
4538             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4539                                                       intf.is_inside,
4540                                                       is_add=0)
4541
4542     def tearDown(self):
4543         super(TestDeterministicNAT, self).tearDown()
4544         if not self.vpp_dead:
4545             self.logger.info(self.vapi.cli("show nat44 interfaces"))
4546             self.logger.info(
4547                 self.vapi.cli("show nat44 deterministic mappings"))
4548             self.logger.info(
4549                 self.vapi.cli("show nat44 deterministic timeouts"))
4550             self.logger.info(
4551                 self.vapi.cli("show nat44 deterministic sessions"))
4552             self.clear_nat_det()
4553
4554
4555 class TestNAT64(MethodHolder):
4556     """ NAT64 Test Cases """
4557
4558     @classmethod
4559     def setUpConstants(cls):
4560         super(TestNAT64, cls).setUpConstants()
4561         cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4562                                 "nat64 st hash buckets 256", "}"])
4563
4564     @classmethod
4565     def setUpClass(cls):
4566         super(TestNAT64, cls).setUpClass()
4567
4568         try:
4569             cls.tcp_port_in = 6303
4570             cls.tcp_port_out = 6303
4571             cls.udp_port_in = 6304
4572             cls.udp_port_out = 6304
4573             cls.icmp_id_in = 6305
4574             cls.icmp_id_out = 6305
4575             cls.nat_addr = '10.0.0.3'
4576             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4577             cls.vrf1_id = 10
4578             cls.vrf1_nat_addr = '10.0.10.3'
4579             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4580                                                    cls.vrf1_nat_addr)
4581             cls.ipfix_src_port = 4739
4582             cls.ipfix_domain_id = 1
4583
4584             cls.create_pg_interfaces(range(5))
4585             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4586             cls.ip6_interfaces.append(cls.pg_interfaces[2])
4587             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4588
4589             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4590
4591             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4592
4593             cls.pg0.generate_remote_hosts(2)
4594
4595             for i in cls.ip6_interfaces:
4596                 i.admin_up()
4597                 i.config_ip6()
4598                 i.configure_ipv6_neighbors()
4599
4600             for i in cls.ip4_interfaces:
4601                 i.admin_up()
4602                 i.config_ip4()
4603                 i.resolve_arp()
4604
4605             cls.pg3.admin_up()
4606             cls.pg3.config_ip4()
4607             cls.pg3.resolve_arp()
4608             cls.pg3.config_ip6()
4609             cls.pg3.configure_ipv6_neighbors()
4610
4611         except Exception:
4612             super(TestNAT64, cls).tearDownClass()
4613             raise
4614
4615     def test_pool(self):
4616         """ Add/delete address to NAT64 pool """
4617         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4618
4619         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4620
4621         addresses = self.vapi.nat64_pool_addr_dump()
4622         self.assertEqual(len(addresses), 1)
4623         self.assertEqual(addresses[0].address, nat_addr)
4624
4625         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4626
4627         addresses = self.vapi.nat64_pool_addr_dump()
4628         self.assertEqual(len(addresses), 0)
4629
4630     def test_interface(self):
4631         """ Enable/disable NAT64 feature on the interface """
4632         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4633         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4634
4635         interfaces = self.vapi.nat64_interface_dump()
4636         self.assertEqual(len(interfaces), 2)
4637         pg0_found = False
4638         pg1_found = False
4639         for intf in interfaces:
4640             if intf.sw_if_index == self.pg0.sw_if_index:
4641                 self.assertEqual(intf.is_inside, 1)
4642                 pg0_found = True
4643             elif intf.sw_if_index == self.pg1.sw_if_index:
4644                 self.assertEqual(intf.is_inside, 0)
4645                 pg1_found = True
4646         self.assertTrue(pg0_found)
4647         self.assertTrue(pg1_found)
4648
4649         features = self.vapi.cli("show interface features pg0")
4650         self.assertNotEqual(features.find('nat64-in2out'), -1)
4651         features = self.vapi.cli("show interface features pg1")
4652         self.assertNotEqual(features.find('nat64-out2in'), -1)
4653
4654         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4655         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4656
4657         interfaces = self.vapi.nat64_interface_dump()
4658         self.assertEqual(len(interfaces), 0)
4659
4660     def test_static_bib(self):
4661         """ Add/delete static BIB entry """
4662         in_addr = socket.inet_pton(socket.AF_INET6,
4663                                    '2001:db8:85a3::8a2e:370:7334')
4664         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4665         in_port = 1234
4666         out_port = 5678
4667         proto = IP_PROTOS.tcp
4668
4669         self.vapi.nat64_add_del_static_bib(in_addr,
4670                                            out_addr,
4671                                            in_port,
4672                                            out_port,
4673                                            proto)
4674         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4675         static_bib_num = 0
4676         for bibe in bib:
4677             if bibe.is_static:
4678                 static_bib_num += 1
4679                 self.assertEqual(bibe.i_addr, in_addr)
4680                 self.assertEqual(bibe.o_addr, out_addr)
4681                 self.assertEqual(bibe.i_port, in_port)
4682                 self.assertEqual(bibe.o_port, out_port)
4683         self.assertEqual(static_bib_num, 1)
4684
4685         self.vapi.nat64_add_del_static_bib(in_addr,
4686                                            out_addr,
4687                                            in_port,
4688                                            out_port,
4689                                            proto,
4690                                            is_add=0)
4691         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4692         static_bib_num = 0
4693         for bibe in bib:
4694             if bibe.is_static:
4695                 static_bib_num += 1
4696         self.assertEqual(static_bib_num, 0)
4697
4698     def test_set_timeouts(self):
4699         """ Set NAT64 timeouts """
4700         # verify default values
4701         timeouts = self.vapi.nat64_get_timeouts()
4702         self.assertEqual(timeouts.udp, 300)
4703         self.assertEqual(timeouts.icmp, 60)
4704         self.assertEqual(timeouts.tcp_trans, 240)
4705         self.assertEqual(timeouts.tcp_est, 7440)
4706         self.assertEqual(timeouts.tcp_incoming_syn, 6)
4707
4708         # set and verify custom values
4709         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4710                                      tcp_est=7450, tcp_incoming_syn=10)
4711         timeouts = self.vapi.nat64_get_timeouts()
4712         self.assertEqual(timeouts.udp, 200)
4713         self.assertEqual(timeouts.icmp, 30)
4714         self.assertEqual(timeouts.tcp_trans, 250)
4715         self.assertEqual(timeouts.tcp_est, 7450)
4716         self.assertEqual(timeouts.tcp_incoming_syn, 10)
4717
4718     def test_dynamic(self):
4719         """ NAT64 dynamic translation test """
4720         self.tcp_port_in = 6303
4721         self.udp_port_in = 6304
4722         self.icmp_id_in = 6305
4723
4724         ses_num_start = self.nat64_get_ses_num()
4725
4726         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4727                                                 self.nat_addr_n)
4728         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4729         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4730
4731         # in2out
4732         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4733         self.pg0.add_stream(pkts)
4734         self.pg_enable_capture(self.pg_interfaces)
4735         self.pg_start()
4736         capture = self.pg1.get_capture(len(pkts))
4737         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4738                                 dst_ip=self.pg1.remote_ip4)
4739
4740         # out2in
4741         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4742         self.pg1.add_stream(pkts)
4743         self.pg_enable_capture(self.pg_interfaces)
4744         self.pg_start()
4745         capture = self.pg0.get_capture(len(pkts))
4746         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4747         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4748
4749         # in2out
4750         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4751         self.pg0.add_stream(pkts)
4752         self.pg_enable_capture(self.pg_interfaces)
4753         self.pg_start()
4754         capture = self.pg1.get_capture(len(pkts))
4755         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4756                                 dst_ip=self.pg1.remote_ip4)
4757
4758         # out2in
4759         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4760         self.pg1.add_stream(pkts)
4761         self.pg_enable_capture(self.pg_interfaces)
4762         self.pg_start()
4763         capture = self.pg0.get_capture(len(pkts))
4764         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4765
4766         ses_num_end = self.nat64_get_ses_num()
4767
4768         self.assertEqual(ses_num_end - ses_num_start, 3)
4769
4770         # tenant with specific VRF
4771         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4772                                                 self.vrf1_nat_addr_n,
4773                                                 vrf_id=self.vrf1_id)
4774         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4775
4776         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4777         self.pg2.add_stream(pkts)
4778         self.pg_enable_capture(self.pg_interfaces)
4779         self.pg_start()
4780         capture = self.pg1.get_capture(len(pkts))
4781         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4782                                 dst_ip=self.pg1.remote_ip4)
4783
4784         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4785         self.pg1.add_stream(pkts)
4786         self.pg_enable_capture(self.pg_interfaces)
4787         self.pg_start()
4788         capture = self.pg2.get_capture(len(pkts))
4789         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4790
4791     def test_static(self):
4792         """ NAT64 static translation test """
4793         self.tcp_port_in = 60303
4794         self.udp_port_in = 60304
4795         self.icmp_id_in = 60305
4796         self.tcp_port_out = 60303
4797         self.udp_port_out = 60304
4798         self.icmp_id_out = 60305
4799
4800         ses_num_start = self.nat64_get_ses_num()
4801
4802         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4803                                                 self.nat_addr_n)
4804         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4805         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4806
4807         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4808                                            self.nat_addr_n,
4809                                            self.tcp_port_in,
4810                                            self.tcp_port_out,
4811                                            IP_PROTOS.tcp)
4812         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4813                                            self.nat_addr_n,
4814                                            self.udp_port_in,
4815                                            self.udp_port_out,
4816                                            IP_PROTOS.udp)
4817         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4818                                            self.nat_addr_n,
4819                                            self.icmp_id_in,
4820                                            self.icmp_id_out,
4821                                            IP_PROTOS.icmp)
4822
4823         # in2out
4824         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4825         self.pg0.add_stream(pkts)
4826         self.pg_enable_capture(self.pg_interfaces)
4827         self.pg_start()
4828         capture = self.pg1.get_capture(len(pkts))
4829         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4830                                 dst_ip=self.pg1.remote_ip4, same_port=True)
4831
4832         # out2in
4833         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4834         self.pg1.add_stream(pkts)
4835         self.pg_enable_capture(self.pg_interfaces)
4836         self.pg_start()
4837         capture = self.pg0.get_capture(len(pkts))
4838         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4839         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4840
4841         ses_num_end = self.nat64_get_ses_num()
4842
4843         self.assertEqual(ses_num_end - ses_num_start, 3)
4844
4845     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4846     def test_session_timeout(self):
4847         """ NAT64 session timeout """
4848         self.icmp_id_in = 1234
4849         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4850                                                 self.nat_addr_n)
4851         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4852         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4853         self.vapi.nat64_set_timeouts(icmp=5)
4854
4855         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4856         self.pg0.add_stream(pkts)
4857         self.pg_enable_capture(self.pg_interfaces)
4858         self.pg_start()
4859         capture = self.pg1.get_capture(len(pkts))
4860
4861         ses_num_before_timeout = self.nat64_get_ses_num()
4862
4863         sleep(15)
4864
4865         # ICMP session after timeout
4866         ses_num_after_timeout = self.nat64_get_ses_num()
4867         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4868
4869     def test_icmp_error(self):
4870         """ NAT64 ICMP Error message translation """
4871         self.tcp_port_in = 6303
4872         self.udp_port_in = 6304
4873         self.icmp_id_in = 6305
4874
4875         ses_num_start = self.nat64_get_ses_num()
4876
4877         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4878                                                 self.nat_addr_n)
4879         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4880         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4881
4882         # send some packets to create sessions
4883         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4884         self.pg0.add_stream(pkts)
4885         self.pg_enable_capture(self.pg_interfaces)
4886         self.pg_start()
4887         capture_ip4 = self.pg1.get_capture(len(pkts))
4888         self.verify_capture_out(capture_ip4,
4889                                 nat_ip=self.nat_addr,
4890                                 dst_ip=self.pg1.remote_ip4)
4891
4892         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4893         self.pg1.add_stream(pkts)
4894         self.pg_enable_capture(self.pg_interfaces)
4895         self.pg_start()
4896         capture_ip6 = self.pg0.get_capture(len(pkts))
4897         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4898         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4899                                    self.pg0.remote_ip6)
4900
4901         # in2out
4902         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4903                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4904                 ICMPv6DestUnreach(code=1) /
4905                 packet[IPv6] for packet in capture_ip6]
4906         self.pg0.add_stream(pkts)
4907         self.pg_enable_capture(self.pg_interfaces)
4908         self.pg_start()
4909         capture = self.pg1.get_capture(len(pkts))
4910         for packet in capture:
4911             try:
4912                 self.assertEqual(packet[IP].src, self.nat_addr)
4913                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4914                 self.assertEqual(packet[ICMP].type, 3)
4915                 self.assertEqual(packet[ICMP].code, 13)
4916                 inner = packet[IPerror]
4917                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4918                 self.assertEqual(inner.dst, self.nat_addr)
4919                 self.check_icmp_checksum(packet)
4920                 if inner.haslayer(TCPerror):
4921                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4922                 elif inner.haslayer(UDPerror):
4923                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4924                 else:
4925                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4926             except:
4927                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4928                 raise
4929
4930         # out2in
4931         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4932                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4933                 ICMP(type=3, code=13) /
4934                 packet[IP] for packet in capture_ip4]
4935         self.pg1.add_stream(pkts)
4936         self.pg_enable_capture(self.pg_interfaces)
4937         self.pg_start()
4938         capture = self.pg0.get_capture(len(pkts))
4939         for packet in capture:
4940             try:
4941                 self.assertEqual(packet[IPv6].src, ip.src)
4942                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4943                 icmp = packet[ICMPv6DestUnreach]
4944                 self.assertEqual(icmp.code, 1)
4945                 inner = icmp[IPerror6]
4946                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4947                 self.assertEqual(inner.dst, ip.src)
4948                 self.check_icmpv6_checksum(packet)
4949                 if inner.haslayer(TCPerror):
4950                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4951                 elif inner.haslayer(UDPerror):
4952                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4953                 else:
4954                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4955                                      self.icmp_id_in)
4956             except:
4957                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4958                 raise
4959
4960     def test_hairpinning(self):
4961         """ NAT64 hairpinning """
4962
4963         client = self.pg0.remote_hosts[0]
4964         server = self.pg0.remote_hosts[1]
4965         server_tcp_in_port = 22
4966         server_tcp_out_port = 4022
4967         server_udp_in_port = 23
4968         server_udp_out_port = 4023
4969         client_tcp_in_port = 1234
4970         client_udp_in_port = 1235
4971         client_tcp_out_port = 0
4972         client_udp_out_port = 0
4973         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4974         nat_addr_ip6 = ip.src
4975
4976         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4977                                                 self.nat_addr_n)
4978         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4979         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4980
4981         self.vapi.nat64_add_del_static_bib(server.ip6n,
4982                                            self.nat_addr_n,
4983                                            server_tcp_in_port,
4984                                            server_tcp_out_port,
4985                                            IP_PROTOS.tcp)
4986         self.vapi.nat64_add_del_static_bib(server.ip6n,
4987                                            self.nat_addr_n,
4988                                            server_udp_in_port,
4989                                            server_udp_out_port,
4990                                            IP_PROTOS.udp)
4991
4992         # client to server
4993         pkts = []
4994         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4995              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4996              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4997         pkts.append(p)
4998         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4999              IPv6(src=client.ip6, dst=nat_addr_ip6) /
5000              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5001         pkts.append(p)
5002         self.pg0.add_stream(pkts)
5003         self.pg_enable_capture(self.pg_interfaces)
5004         self.pg_start()
5005         capture = self.pg0.get_capture(len(pkts))
5006         for packet in capture:
5007             try:
5008                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5009                 self.assertEqual(packet[IPv6].dst, server.ip6)
5010                 if packet.haslayer(TCP):
5011                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5012                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5013                     self.check_tcp_checksum(packet)
5014                     client_tcp_out_port = packet[TCP].sport
5015                 else:
5016                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5017                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
5018                     self.check_udp_checksum(packet)
5019                     client_udp_out_port = packet[UDP].sport
5020             except:
5021                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5022                 raise
5023
5024         # server to client
5025         pkts = []
5026         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5027              IPv6(src=server.ip6, dst=nat_addr_ip6) /
5028              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5029         pkts.append(p)
5030         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5031              IPv6(src=server.ip6, dst=nat_addr_ip6) /
5032              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5033         pkts.append(p)
5034         self.pg0.add_stream(pkts)
5035         self.pg_enable_capture(self.pg_interfaces)
5036         self.pg_start()
5037         capture = self.pg0.get_capture(len(pkts))
5038         for packet in capture:
5039             try:
5040                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5041                 self.assertEqual(packet[IPv6].dst, client.ip6)
5042                 if packet.haslayer(TCP):
5043                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5044                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5045                     self.check_tcp_checksum(packet)
5046                 else:
5047                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
5048                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
5049                     self.check_udp_checksum(packet)
5050             except:
5051                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5052                 raise
5053
5054         # ICMP error
5055         pkts = []
5056         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5057                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5058                 ICMPv6DestUnreach(code=1) /
5059                 packet[IPv6] for packet in capture]
5060         self.pg0.add_stream(pkts)
5061         self.pg_enable_capture(self.pg_interfaces)
5062         self.pg_start()
5063         capture = self.pg0.get_capture(len(pkts))
5064         for packet in capture:
5065             try:
5066                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5067                 self.assertEqual(packet[IPv6].dst, server.ip6)
5068                 icmp = packet[ICMPv6DestUnreach]
5069                 self.assertEqual(icmp.code, 1)
5070                 inner = icmp[IPerror6]
5071                 self.assertEqual(inner.src, server.ip6)
5072                 self.assertEqual(inner.dst, nat_addr_ip6)
5073                 self.check_icmpv6_checksum(packet)
5074                 if inner.haslayer(TCPerror):
5075                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5076                     self.assertEqual(inner[TCPerror].dport,
5077                                      client_tcp_out_port)
5078                 else:
5079                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5080                     self.assertEqual(inner[UDPerror].dport,
5081                                      client_udp_out_port)
5082             except:
5083                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5084                 raise
5085
5086     def test_prefix(self):
5087         """ NAT64 Network-Specific Prefix """
5088
5089         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5090                                                 self.nat_addr_n)
5091         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5092         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5093         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5094                                                 self.vrf1_nat_addr_n,
5095                                                 vrf_id=self.vrf1_id)
5096         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5097
5098         # Add global prefix
5099         global_pref64 = "2001:db8::"
5100         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5101         global_pref64_len = 32
5102         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5103
5104         prefix = self.vapi.nat64_prefix_dump()
5105         self.assertEqual(len(prefix), 1)
5106         self.assertEqual(prefix[0].prefix, global_pref64_n)
5107         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5108         self.assertEqual(prefix[0].vrf_id, 0)
5109
5110         # Add tenant specific prefix
5111         vrf1_pref64 = "2001:db8:122:300::"
5112         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5113         vrf1_pref64_len = 56
5114         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5115                                        vrf1_pref64_len,
5116                                        vrf_id=self.vrf1_id)
5117         prefix = self.vapi.nat64_prefix_dump()
5118         self.assertEqual(len(prefix), 2)
5119
5120         # Global prefix
5121         pkts = self.create_stream_in_ip6(self.pg0,
5122                                          self.pg1,
5123                                          pref=global_pref64,
5124                                          plen=global_pref64_len)
5125         self.pg0.add_stream(pkts)
5126         self.pg_enable_capture(self.pg_interfaces)
5127         self.pg_start()
5128         capture = self.pg1.get_capture(len(pkts))
5129         self.verify_capture_out(capture, nat_ip=self.nat_addr,
5130                                 dst_ip=self.pg1.remote_ip4)
5131
5132         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5133         self.pg1.add_stream(pkts)
5134         self.pg_enable_capture(self.pg_interfaces)
5135         self.pg_start()
5136         capture = self.pg0.get_capture(len(pkts))
5137         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5138                                   global_pref64,
5139                                   global_pref64_len)
5140         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5141
5142         # Tenant specific prefix
5143         pkts = self.create_stream_in_ip6(self.pg2,
5144                                          self.pg1,
5145                                          pref=vrf1_pref64,
5146                                          plen=vrf1_pref64_len)
5147         self.pg2.add_stream(pkts)
5148         self.pg_enable_capture(self.pg_interfaces)
5149         self.pg_start()
5150         capture = self.pg1.get_capture(len(pkts))
5151         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5152                                 dst_ip=self.pg1.remote_ip4)
5153
5154         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5155         self.pg1.add_stream(pkts)
5156         self.pg_enable_capture(self.pg_interfaces)
5157         self.pg_start()
5158         capture = self.pg2.get_capture(len(pkts))
5159         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5160                                   vrf1_pref64,
5161                                   vrf1_pref64_len)
5162         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5163
5164     def test_unknown_proto(self):
5165         """ NAT64 translate packet with unknown protocol """
5166
5167         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5168                                                 self.nat_addr_n)
5169         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5170         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5171         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5172
5173         # in2out
5174         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5175              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5176              TCP(sport=self.tcp_port_in, dport=20))
5177         self.pg0.add_stream(p)
5178         self.pg_enable_capture(self.pg_interfaces)
5179         self.pg_start()
5180         p = self.pg1.get_capture(1)
5181
5182         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5183              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5184              GRE() /
5185              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5186              TCP(sport=1234, dport=1234))
5187         self.pg0.add_stream(p)
5188         self.pg_enable_capture(self.pg_interfaces)
5189         self.pg_start()
5190         p = self.pg1.get_capture(1)
5191         packet = p[0]
5192         try:
5193             self.assertEqual(packet[IP].src, self.nat_addr)
5194             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5195             self.assertTrue(packet.haslayer(GRE))
5196             self.check_ip_checksum(packet)
5197         except:
5198             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5199             raise
5200
5201         # out2in
5202         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5203              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5204              GRE() /
5205              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5206              TCP(sport=1234, dport=1234))
5207         self.pg1.add_stream(p)
5208         self.pg_enable_capture(self.pg_interfaces)
5209         self.pg_start()
5210         p = self.pg0.get_capture(1)
5211         packet = p[0]
5212         try:
5213             self.assertEqual(packet[IPv6].src, remote_ip6)
5214             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5215             self.assertEqual(packet[IPv6].nh, 47)
5216         except:
5217             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5218             raise
5219
5220     def test_hairpinning_unknown_proto(self):
5221         """ NAT64 translate packet with unknown protocol - hairpinning """
5222
5223         client = self.pg0.remote_hosts[0]
5224         server = self.pg0.remote_hosts[1]
5225         server_tcp_in_port = 22
5226         server_tcp_out_port = 4022
5227         client_tcp_in_port = 1234
5228         client_tcp_out_port = 1235
5229         server_nat_ip = "10.0.0.100"
5230         client_nat_ip = "10.0.0.110"
5231         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5232         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5233         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5234         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5235
5236         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5237                                                 client_nat_ip_n)
5238         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5239         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5240
5241         self.vapi.nat64_add_del_static_bib(server.ip6n,
5242                                            server_nat_ip_n,
5243                                            server_tcp_in_port,
5244                                            server_tcp_out_port,
5245                                            IP_PROTOS.tcp)
5246
5247         self.vapi.nat64_add_del_static_bib(server.ip6n,
5248                                            server_nat_ip_n,
5249                                            0,
5250                                            0,
5251                                            IP_PROTOS.gre)
5252
5253         self.vapi.nat64_add_del_static_bib(client.ip6n,
5254                                            client_nat_ip_n,
5255                                            client_tcp_in_port,
5256                                            client_tcp_out_port,
5257                                            IP_PROTOS.tcp)
5258
5259         # client to server
5260         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5261              IPv6(src=client.ip6, dst=server_nat_ip6) /
5262              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5263         self.pg0.add_stream(p)
5264         self.pg_enable_capture(self.pg_interfaces)
5265         self.pg_start()
5266         p = self.pg0.get_capture(1)
5267
5268         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5269              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5270              GRE() /
5271              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5272              TCP(sport=1234, dport=1234))
5273         self.pg0.add_stream(p)
5274         self.pg_enable_capture(self.pg_interfaces)
5275         self.pg_start()
5276         p = self.pg0.get_capture(1)
5277         packet = p[0]
5278         try:
5279             self.assertEqual(packet[IPv6].src, client_nat_ip6)
5280             self.assertEqual(packet[IPv6].dst, server.ip6)
5281             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5282         except:
5283             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5284             raise
5285
5286         # server to client
5287         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5288              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5289              GRE() /
5290              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5291              TCP(sport=1234, dport=1234))
5292         self.pg0.add_stream(p)
5293         self.pg_enable_capture(self.pg_interfaces)
5294         self.pg_start()
5295         p = self.pg0.get_capture(1)
5296         packet = p[0]
5297         try:
5298             self.assertEqual(packet[IPv6].src, server_nat_ip6)
5299             self.assertEqual(packet[IPv6].dst, client.ip6)
5300             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5301         except:
5302             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5303             raise
5304
5305     def test_one_armed_nat64(self):
5306         """ One armed NAT64 """
5307         external_port = 0
5308         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5309                                            '64:ff9b::',
5310                                            96)
5311
5312         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5313                                                 self.nat_addr_n)
5314         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5315         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5316
5317         # in2out
5318         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5319              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5320              TCP(sport=12345, dport=80))
5321         self.pg3.add_stream(p)
5322         self.pg_enable_capture(self.pg_interfaces)
5323         self.pg_start()
5324         capture = self.pg3.get_capture(1)
5325         p = capture[0]
5326         try:
5327             ip = p[IP]
5328             tcp = p[TCP]
5329             self.assertEqual(ip.src, self.nat_addr)
5330             self.assertEqual(ip.dst, self.pg3.remote_ip4)
5331             self.assertNotEqual(tcp.sport, 12345)
5332             external_port = tcp.sport
5333             self.assertEqual(tcp.dport, 80)
5334             self.check_tcp_checksum(p)
5335             self.check_ip_checksum(p)
5336         except:
5337             self.logger.error(ppp("Unexpected or invalid packet:", p))
5338             raise
5339
5340         # out2in
5341         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5342              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5343              TCP(sport=80, dport=external_port))
5344         self.pg3.add_stream(p)
5345         self.pg_enable_capture(self.pg_interfaces)
5346         self.pg_start()
5347         capture = self.pg3.get_capture(1)
5348         p = capture[0]
5349         try:
5350             ip = p[IPv6]
5351             tcp = p[TCP]
5352             self.assertEqual(ip.src, remote_host_ip6)
5353             self.assertEqual(ip.dst, self.pg3.remote_ip6)
5354             self.assertEqual(tcp.sport, 80)
5355             self.assertEqual(tcp.dport, 12345)
5356             self.check_tcp_checksum(p)
5357         except:
5358             self.logger.error(ppp("Unexpected or invalid packet:", p))
5359             raise
5360
5361     def test_frag_in_order(self):
5362         """ NAT64 translate fragments arriving in order """
5363         self.tcp_port_in = random.randint(1025, 65535)
5364
5365         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5366                                                 self.nat_addr_n)
5367         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5368         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5369
5370         reass = self.vapi.nat_reass_dump()
5371         reass_n_start = len(reass)
5372
5373         # in2out
5374         data = 'a' * 200
5375         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5376                                            self.tcp_port_in, 20, data)
5377         self.pg0.add_stream(pkts)
5378         self.pg_enable_capture(self.pg_interfaces)
5379         self.pg_start()
5380         frags = self.pg1.get_capture(len(pkts))
5381         p = self.reass_frags_and_verify(frags,
5382                                         self.nat_addr,
5383                                         self.pg1.remote_ip4)
5384         self.assertEqual(p[TCP].dport, 20)
5385         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5386         self.tcp_port_out = p[TCP].sport
5387         self.assertEqual(data, p[Raw].load)
5388
5389         # out2in
5390         data = "A" * 4 + "b" * 16 + "C" * 3
5391         pkts = self.create_stream_frag(self.pg1,
5392                                        self.nat_addr,
5393                                        20,
5394                                        self.tcp_port_out,
5395                                        data)
5396         self.pg1.add_stream(pkts)
5397         self.pg_enable_capture(self.pg_interfaces)
5398         self.pg_start()
5399         frags = self.pg0.get_capture(len(pkts))
5400         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5401         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5402         self.assertEqual(p[TCP].sport, 20)
5403         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5404         self.assertEqual(data, p[Raw].load)
5405
5406         reass = self.vapi.nat_reass_dump()
5407         reass_n_end = len(reass)
5408
5409         self.assertEqual(reass_n_end - reass_n_start, 2)
5410
5411     def test_reass_hairpinning(self):
5412         """ NAT64 fragments hairpinning """
5413         data = 'a' * 200
5414         client = self.pg0.remote_hosts[0]
5415         server = self.pg0.remote_hosts[1]
5416         server_in_port = random.randint(1025, 65535)
5417         server_out_port = random.randint(1025, 65535)
5418         client_in_port = random.randint(1025, 65535)
5419         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5420         nat_addr_ip6 = ip.src
5421
5422         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5423                                                 self.nat_addr_n)
5424         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5425         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5426
5427         # add static BIB entry for server
5428         self.vapi.nat64_add_del_static_bib(server.ip6n,
5429                                            self.nat_addr_n,
5430                                            server_in_port,
5431                                            server_out_port,
5432                                            IP_PROTOS.tcp)
5433
5434         # send packet from host to server
5435         pkts = self.create_stream_frag_ip6(self.pg0,
5436                                            self.nat_addr,
5437                                            client_in_port,
5438                                            server_out_port,
5439                                            data)
5440         self.pg0.add_stream(pkts)
5441         self.pg_enable_capture(self.pg_interfaces)
5442         self.pg_start()
5443         frags = self.pg0.get_capture(len(pkts))
5444         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5445         self.assertNotEqual(p[TCP].sport, client_in_port)
5446         self.assertEqual(p[TCP].dport, server_in_port)
5447         self.assertEqual(data, p[Raw].load)
5448
5449     def test_frag_out_of_order(self):
5450         """ NAT64 translate fragments arriving out of order """
5451         self.tcp_port_in = random.randint(1025, 65535)
5452
5453         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5454                                                 self.nat_addr_n)
5455         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5456         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5457
5458         # in2out
5459         data = 'a' * 200
5460         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5461                                            self.tcp_port_in, 20, data)
5462         pkts.reverse()
5463         self.pg0.add_stream(pkts)
5464         self.pg_enable_capture(self.pg_interfaces)
5465         self.pg_start()
5466         frags = self.pg1.get_capture(len(pkts))
5467         p = self.reass_frags_and_verify(frags,
5468                                         self.nat_addr,
5469                                         self.pg1.remote_ip4)
5470         self.assertEqual(p[TCP].dport, 20)
5471         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5472         self.tcp_port_out = p[TCP].sport
5473         self.assertEqual(data, p[Raw].load)
5474
5475         # out2in
5476         data = "A" * 4 + "B" * 16 + "C" * 3
5477         pkts = self.create_stream_frag(self.pg1,
5478                                        self.nat_addr,
5479                                        20,
5480                                        self.tcp_port_out,
5481                                        data)
5482         pkts.reverse()
5483         self.pg1.add_stream(pkts)
5484         self.pg_enable_capture(self.pg_interfaces)
5485         self.pg_start()
5486         frags = self.pg0.get_capture(len(pkts))
5487         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5488         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5489         self.assertEqual(p[TCP].sport, 20)
5490         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5491         self.assertEqual(data, p[Raw].load)
5492
5493     def test_interface_addr(self):
5494         """ Acquire NAT64 pool addresses from interface """
5495         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5496
5497         # no address in NAT64 pool
5498         adresses = self.vapi.nat44_address_dump()
5499         self.assertEqual(0, len(adresses))
5500
5501         # configure interface address and check NAT64 address pool
5502         self.pg4.config_ip4()
5503         addresses = self.vapi.nat64_pool_addr_dump()
5504         self.assertEqual(len(addresses), 1)
5505         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5506
5507         # remove interface address and check NAT64 address pool
5508         self.pg4.unconfig_ip4()
5509         addresses = self.vapi.nat64_pool_addr_dump()
5510         self.assertEqual(0, len(adresses))
5511
5512     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5513     def test_ipfix_max_bibs_sessions(self):
5514         """ IPFIX logging maximum session and BIB entries exceeded """
5515         max_bibs = 1280
5516         max_sessions = 2560
5517         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5518                                            '64:ff9b::',
5519                                            96)
5520
5521         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5522                                                 self.nat_addr_n)
5523         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5524         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5525
5526         pkts = []
5527         src = ""
5528         for i in range(0, max_bibs):
5529             src = "fd01:aa::%x" % (i)
5530             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5531                  IPv6(src=src, dst=remote_host_ip6) /
5532                  TCP(sport=12345, dport=80))
5533             pkts.append(p)
5534             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5535                  IPv6(src=src, dst=remote_host_ip6) /
5536                  TCP(sport=12345, dport=22))
5537             pkts.append(p)
5538         self.pg0.add_stream(pkts)
5539         self.pg_enable_capture(self.pg_interfaces)
5540         self.pg_start()
5541         self.pg1.get_capture(max_sessions)
5542
5543         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5544                                      src_address=self.pg3.local_ip4n,
5545                                      path_mtu=512,
5546                                      template_interval=10)
5547         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5548                             src_port=self.ipfix_src_port)
5549
5550         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5551              IPv6(src=src, dst=remote_host_ip6) /
5552              TCP(sport=12345, dport=25))
5553         self.pg0.add_stream(p)
5554         self.pg_enable_capture(self.pg_interfaces)
5555         self.pg_start()
5556         self.pg1.get_capture(0)
5557         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5558         capture = self.pg3.get_capture(9)
5559         ipfix = IPFIXDecoder()
5560         # first load template
5561         for p in capture:
5562             self.assertTrue(p.haslayer(IPFIX))
5563             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5564             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5565             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5566             self.assertEqual(p[UDP].dport, 4739)
5567             self.assertEqual(p[IPFIX].observationDomainID,
5568                              self.ipfix_domain_id)
5569             if p.haslayer(Template):
5570                 ipfix.add_template(p.getlayer(Template))
5571         # verify events in data set
5572         for p in capture:
5573             if p.haslayer(Data):
5574                 data = ipfix.decode_data_set(p.getlayer(Set))
5575                 self.verify_ipfix_max_sessions(data, max_sessions)
5576
5577         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5578              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5579              TCP(sport=12345, dport=80))
5580         self.pg0.add_stream(p)
5581         self.pg_enable_capture(self.pg_interfaces)
5582         self.pg_start()
5583         self.pg1.get_capture(0)
5584         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5585         capture = self.pg3.get_capture(1)
5586         # verify events in data set
5587         for p in capture:
5588             self.assertTrue(p.haslayer(IPFIX))
5589             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5590             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5591             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5592             self.assertEqual(p[UDP].dport, 4739)
5593             self.assertEqual(p[IPFIX].observationDomainID,
5594                              self.ipfix_domain_id)
5595             if p.haslayer(Data):
5596                 data = ipfix.decode_data_set(p.getlayer(Set))
5597                 self.verify_ipfix_max_bibs(data, max_bibs)
5598
5599     def test_ipfix_max_frags(self):
5600         """ IPFIX logging maximum fragments pending reassembly exceeded """
5601         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5602                                                 self.nat_addr_n)
5603         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5604         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5605         self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5606         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5607                                      src_address=self.pg3.local_ip4n,
5608                                      path_mtu=512,
5609                                      template_interval=10)
5610         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5611                             src_port=self.ipfix_src_port)
5612
5613         data = 'a' * 200
5614         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5615                                            self.tcp_port_in, 20, data)
5616         self.pg0.add_stream(pkts[-1])
5617         self.pg_enable_capture(self.pg_interfaces)
5618         self.pg_start()
5619         self.pg1.get_capture(0)
5620         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5621         capture = self.pg3.get_capture(9)
5622         ipfix = IPFIXDecoder()
5623         # first load template
5624         for p in capture:
5625             self.assertTrue(p.haslayer(IPFIX))
5626             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5627             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5628             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5629             self.assertEqual(p[UDP].dport, 4739)
5630             self.assertEqual(p[IPFIX].observationDomainID,
5631                              self.ipfix_domain_id)
5632             if p.haslayer(Template):
5633                 ipfix.add_template(p.getlayer(Template))
5634         # verify events in data set
5635         for p in capture:
5636             if p.haslayer(Data):
5637                 data = ipfix.decode_data_set(p.getlayer(Set))
5638                 self.verify_ipfix_max_fragments_ip6(data, 0,
5639                                                     self.pg0.remote_ip6n)
5640
5641     def test_ipfix_bib_ses(self):
5642         """ IPFIX logging NAT64 BIB/session create and delete events """
5643         self.tcp_port_in = random.randint(1025, 65535)
5644         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5645                                            '64:ff9b::',
5646                                            96)
5647
5648         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5649                                                 self.nat_addr_n)
5650         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5651         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5652         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5653                                      src_address=self.pg3.local_ip4n,
5654                                      path_mtu=512,
5655                                      template_interval=10)
5656         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5657                             src_port=self.ipfix_src_port)
5658
5659         # Create
5660         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5661              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5662              TCP(sport=self.tcp_port_in, dport=25))
5663         self.pg0.add_stream(p)
5664         self.pg_enable_capture(self.pg_interfaces)
5665         self.pg_start()
5666         p = self.pg1.get_capture(1)
5667         self.tcp_port_out = p[0][TCP].sport
5668         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5669         capture = self.pg3.get_capture(10)
5670         ipfix = IPFIXDecoder()
5671         # first load template
5672         for p in capture:
5673             self.assertTrue(p.haslayer(IPFIX))
5674             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5675             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5676             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5677             self.assertEqual(p[UDP].dport, 4739)
5678             self.assertEqual(p[IPFIX].observationDomainID,
5679                              self.ipfix_domain_id)
5680             if p.haslayer(Template):
5681                 ipfix.add_template(p.getlayer(Template))
5682         # verify events in data set
5683         for p in capture:
5684             if p.haslayer(Data):
5685                 data = ipfix.decode_data_set(p.getlayer(Set))
5686                 if ord(data[0][230]) == 10:
5687                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5688                 elif ord(data[0][230]) == 6:
5689                     self.verify_ipfix_nat64_ses(data,
5690                                                 1,
5691                                                 self.pg0.remote_ip6n,
5692                                                 self.pg1.remote_ip4,
5693                                                 25)
5694                 else:
5695                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5696
5697         # Delete
5698         self.pg_enable_capture(self.pg_interfaces)
5699         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5700                                                 self.nat_addr_n,
5701                                                 is_add=0)
5702         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5703         capture = self.pg3.get_capture(2)
5704         # verify events in data set
5705         for p in capture:
5706             self.assertTrue(p.haslayer(IPFIX))
5707             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5708             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5709             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5710             self.assertEqual(p[UDP].dport, 4739)
5711             self.assertEqual(p[IPFIX].observationDomainID,
5712                              self.ipfix_domain_id)
5713             if p.haslayer(Data):
5714                 data = ipfix.decode_data_set(p.getlayer(Set))
5715                 if ord(data[0][230]) == 11:
5716                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5717                 elif ord(data[0][230]) == 7:
5718                     self.verify_ipfix_nat64_ses(data,
5719                                                 0,
5720                                                 self.pg0.remote_ip6n,
5721                                                 self.pg1.remote_ip4,
5722                                                 25)
5723                 else:
5724                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5725
5726     def nat64_get_ses_num(self):
5727         """
5728         Return number of active NAT64 sessions.
5729         """
5730         st = self.vapi.nat64_st_dump()
5731         return len(st)
5732
5733     def clear_nat64(self):
5734         """
5735         Clear NAT64 configuration.
5736         """
5737         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5738                             domain_id=self.ipfix_domain_id)
5739         self.ipfix_src_port = 4739
5740         self.ipfix_domain_id = 1
5741
5742         self.vapi.nat64_set_timeouts()
5743
5744         interfaces = self.vapi.nat64_interface_dump()
5745         for intf in interfaces:
5746             if intf.is_inside > 1:
5747                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5748                                                   0,
5749                                                   is_add=0)
5750             self.vapi.nat64_add_del_interface(intf.sw_if_index,
5751                                               intf.is_inside,
5752                                               is_add=0)
5753
5754         bib = self.vapi.nat64_bib_dump(255)
5755         for bibe in bib:
5756             if bibe.is_static:
5757                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5758                                                    bibe.o_addr,
5759                                                    bibe.i_port,
5760                                                    bibe.o_port,
5761                                                    bibe.proto,
5762                                                    bibe.vrf_id,
5763                                                    is_add=0)
5764
5765         adresses = self.vapi.nat64_pool_addr_dump()
5766         for addr in adresses:
5767             self.vapi.nat64_add_del_pool_addr_range(addr.address,
5768                                                     addr.address,
5769                                                     vrf_id=addr.vrf_id,
5770                                                     is_add=0)
5771
5772         prefixes = self.vapi.nat64_prefix_dump()
5773         for prefix in prefixes:
5774             self.vapi.nat64_add_del_prefix(prefix.prefix,
5775                                            prefix.prefix_len,
5776                                            vrf_id=prefix.vrf_id,
5777                                            is_add=0)
5778
5779     def tearDown(self):
5780         super(TestNAT64, self).tearDown()
5781         if not self.vpp_dead:
5782             self.logger.info(self.vapi.cli("show nat64 pool"))
5783             self.logger.info(self.vapi.cli("show nat64 interfaces"))
5784             self.logger.info(self.vapi.cli("show nat64 prefix"))
5785             self.logger.info(self.vapi.cli("show nat64 bib all"))
5786             self.logger.info(self.vapi.cli("show nat64 session table all"))
5787             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5788             self.clear_nat64()
5789
5790
class TestDSlite(MethodHolder):
    """ DS-Lite Test Cases """

    # pg0 is the plain IPv4 side; pg1 is the IPv6 side that carries
    # IPv4-in-IPv6 packets addressed to the AFTR tunnel endpoint.

    @classmethod
    def setUpClass(cls):
        """Create pg0 (IPv4) and pg1 (IPv6 with two remote hosts)."""
        super(TestDSlite, cls).setUpClass()

        try:
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)

            cls.create_pg_interfaces(range(2))

            ip4_if = cls.pg0
            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

            ip6_if = cls.pg1
            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.generate_remote_hosts(2)
            ip6_if.configure_ipv6_neighbors()

        except Exception:
            # undo the parent class set-up before propagating the error
            super(TestDSlite, cls).tearDownClass()
            raise

    def _send_and_capture_one(self, tx_if, pkt, rx_if):
        """
        Send a packet on one interface and capture one packet on another.

        :param tx_if: Interface the packet is injected on
        :param pkt: Packet to send
        :param rx_if: Interface exactly one packet is expected on
        :returns: Capture returned by rx_if.get_capture(1)
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    def test_dslite(self):
        """ Test DS-Lite

        For UDP, TCP and ICMP echo: sends an IPv4-in-IPv6 packet to the AFTR
        address on pg1, checks the translated IPv4 packet on pg0, then sends
        the reply on pg0 and checks it comes back encapsulated on pg1.
        Finally pings the AFTR IPv6 address itself.
        """
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        # IPv4 source address carried inside the tunnel
        inner_src = '192.168.1.1'

        self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
                                                 self.nat_addr_n)
        aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
        aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
        self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)

        tunnel_hosts = self.pg1.remote_hosts
        server_ip4 = self.pg0.remote_ip4

        # UDP
        tunneled = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                    IPv6(dst=aftr_ip6, src=tunnel_hosts[0].ip6) /
                    IP(dst=server_ip4, src=inner_src) /
                    UDP(sport=20000, dport=10000))
        rx = self._send_and_capture_one(self.pg1, tunneled, self.pg0)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, server_ip4)
        self.assertNotEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)
        out_port = rx[UDP].sport

        reply = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(dst=self.nat_addr, src=server_ip4) /
                 UDP(sport=10000, dport=out_port))
        rx = self._send_and_capture_one(self.pg0, reply, self.pg1)[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, tunnel_hosts[0].ip6)
        self.assertEqual(rx[IP].src, server_ip4)
        self.assertEqual(rx[IP].dst, inner_src)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # TCP
        tunneled = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                    IPv6(dst=aftr_ip6, src=tunnel_hosts[1].ip6) /
                    IP(dst=server_ip4, src=inner_src) /
                    TCP(sport=20001, dport=10001))
        rx = self._send_and_capture_one(self.pg1, tunneled, self.pg0)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, server_ip4)
        self.assertNotEqual(rx[TCP].sport, 20001)
        self.assertEqual(rx[TCP].dport, 10001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)
        out_port = rx[TCP].sport

        reply = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(dst=self.nat_addr, src=server_ip4) /
                 TCP(sport=10001, dport=out_port))
        rx = self._send_and_capture_one(self.pg0, reply, self.pg1)[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, tunnel_hosts[1].ip6)
        self.assertEqual(rx[IP].src, server_ip4)
        self.assertEqual(rx[IP].dst, inner_src)
        self.assertEqual(rx[TCP].sport, 10001)
        self.assertEqual(rx[TCP].dport, 20001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)

        # ICMP
        tunneled = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                    IPv6(dst=aftr_ip6, src=tunnel_hosts[1].ip6) /
                    IP(dst=server_ip4, src=inner_src) /
                    ICMP(id=4000, type='echo-request'))
        rx = self._send_and_capture_one(self.pg1, tunneled, self.pg0)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, self.nat_addr)
        self.assertEqual(rx[IP].dst, server_ip4)
        self.assertNotEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)
        out_id = rx[ICMP].id

        reply = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(dst=self.nat_addr, src=server_ip4) /
                 ICMP(id=out_id, type='echo-reply'))
        rx = self._send_and_capture_one(self.pg0, reply, self.pg1)[0]
        self.assertEqual(rx[IPv6].src, aftr_ip6)
        self.assertEqual(rx[IPv6].dst, tunnel_hosts[1].ip6)
        self.assertEqual(rx[IP].src, server_ip4)
        self.assertEqual(rx[IP].dst, inner_src)
        self.assertEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)

        # ping DS-Lite AFTR tunnel endpoint address
        ping = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                IPv6(src=tunnel_hosts[1].ip6, dst=aftr_ip6) /
                ICMPv6EchoRequest())
        replies = self._send_and_capture_one(self.pg1, ping, self.pg1)
        self.assertEqual(1, len(replies))
        echo_reply = replies[0]
        self.assertEqual(echo_reply[IPv6].src, aftr_ip6)
        self.assertEqual(echo_reply[IPv6].dst, tunnel_hosts[1].ip6)
        self.assertTrue(echo_reply.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """Log the DS-Lite state unless VPP is marked dead."""
        super(TestDSlite, self).tearDown()
        if self.vpp_dead:
            return
        show_commands = (
            "show dslite pool",
            "show dslite aftr-tunnel-endpoint-address",
            "show dslite sessions",
        )
        for command in show_commands:
            self.logger.info(self.vapi.cli(command))
5950
5951
class TestDSliteCE(MethodHolder):
    """ DS-Lite CE Test Cases """

    # pg0 is the plain IPv4 side; pg1 is the IPv6 side on which packets are
    # exchanged between the B4 and AFTR tunnel endpoint addresses.

    @classmethod
    def setUpConstants(cls):
        """Append the "dslite ce" NAT section to the VPP command line."""
        super(TestDSliteCE, cls).setUpConstants()
        cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])

    @classmethod
    def setUpClass(cls):
        """Create pg0 (IPv4) and pg1 (IPv6 with one remote host)."""
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))

            ip4_if = cls.pg0
            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

            ip6_if = cls.pg1
            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.generate_remote_hosts(1)
            ip6_if.configure_ipv6_neighbors()

        except Exception:
            # undo the parent class set-up before propagating the error
            super(TestDSliteCE, cls).tearDownClass()
            raise

    def _send_and_capture_one(self, tx_if, pkt, rx_if):
        """
        Send a packet on one interface and capture one packet on another.

        :param tx_if: Interface the packet is injected on
        :param pkt: Packet to send
        :param rx_if: Interface exactly one packet is expected on
        :returns: Capture returned by rx_if.get_capture(1)
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    def test_dslite_ce(self):
        """ Test DS-Lite CE

        Checks that an IPv4 UDP packet from pg0 leaves pg1 inside IPv6 from
        the B4 to the AFTR address, that the tunneled reply is delivered to
        pg0 as plain IPv4, and that the B4 IPv6 address answers a ping.
        """
        b4_ip4 = '192.0.0.2'
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        self.vapi.dslite_set_b4_addr(
            socket.inet_pton(socket.AF_INET6, b4_ip6),
            socket.inet_pton(socket.AF_INET, b4_ip4))

        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
        self.vapi.dslite_set_aftr_addr(
            aftr_ip6_n,
            socket.inet_pton(socket.AF_INET, aftr_ip4))

        # host route towards the AFTR through pg1
        self.vapi.ip_add_del_route(
            dst_address=aftr_ip6_n,
            dst_address_length=128,
            next_hop_address=self.pg1.remote_ip6n,
            next_hop_sw_if_index=self.pg1.sw_if_index,
            is_ipv6=1)

        client_ip4 = self.pg0.remote_ip4
        server_ip4 = self.pg1.remote_ip4

        # UDP encapsulation
        plain = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IP(dst=server_ip4, src=client_ip4) /
                 UDP(sport=10000, dport=20000))
        rx = self._send_and_capture_one(self.pg0, plain, self.pg1)[0]
        self.assertEqual(rx[IPv6].src, b4_ip6)
        self.assertEqual(rx[IPv6].dst, aftr_ip6)
        self.assertEqual(rx[IP].src, client_ip4)
        self.assertEqual(rx[IP].dst, server_ip4)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # UDP decapsulation
        tunneled = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                    IPv6(dst=b4_ip6, src=aftr_ip6) /
                    IP(dst=client_ip4, src=server_ip4) /
                    UDP(sport=20000, dport=10000))
        rx = self._send_and_capture_one(self.pg1, tunneled, self.pg0)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, server_ip4)
        self.assertEqual(rx[IP].dst, client_ip4)
        self.assertEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)

        # ping DS-Lite B4 tunnel endpoint address
        pinger_ip6 = self.pg1.remote_hosts[0].ip6
        ping = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                IPv6(src=pinger_ip6, dst=b4_ip6) /
                ICMPv6EchoRequest())
        replies = self._send_and_capture_one(self.pg1, ping, self.pg1)
        self.assertEqual(1, len(replies))
        echo_reply = replies[0]
        self.assertEqual(echo_reply[IPv6].src, b4_ip6)
        self.assertEqual(echo_reply[IPv6].dst, pinger_ip6)
        self.assertTrue(echo_reply.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """Log both DS-Lite tunnel endpoint addresses unless VPP is dead."""
        super(TestDSliteCE, self).tearDown()
        if self.vpp_dead:
            return
        show_commands = (
            "show dslite aftr-tunnel-endpoint-address",
            "show dslite b4-tunnel-endpoint-address",
        )
        for command in show_commands:
            self.logger.info(self.vapi.cli(command))
6054
if __name__ == "__main__":
    # Executed as a script: run this module's tests with the VPP runner.
    unittest.main(testRunner=VppTestRunner)