NAT44: interface output feature and service host direct access (VPP-1176)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
    @classmethod
    def setUpClass(cls):
        """ Delegate class-level set-up to the parent test case class """
        super(MethodHolder, cls).setUpClass()
32
    def tearDown(self):
        """ Delegate per-test clean-up to the parent test case class """
        super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param dst_ip: Destination address
144         :param ttl: TTL of generated packets
145         """
146         if dst_ip is None:
147             dst_ip = out_if.remote_ip4
148
149         pkts = []
150         # TCP
151         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153              TCP(sport=self.tcp_port_in, dport=20))
154         pkts.append(p)
155
156         # UDP
157         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159              UDP(sport=self.udp_port_in, dport=20))
160         pkts.append(p)
161
162         # ICMP
163         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165              ICMP(id=self.icmp_id_in, type='echo-request'))
166         pkts.append(p)
167
168         return pkts
169
170     def compose_ip6(self, ip4, pref, plen):
171         """
172         Compose IPv4-embedded IPv6 addresses
173
174         :param ip4: IPv4 address
175         :param pref: IPv6 prefix
176         :param plen: IPv6 prefix length
177         :returns: IPv4-embedded IPv6 addresses
178         """
179         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
181         if plen == 32:
182             pref_n[4] = ip4_n[0]
183             pref_n[5] = ip4_n[1]
184             pref_n[6] = ip4_n[2]
185             pref_n[7] = ip4_n[3]
186         elif plen == 40:
187             pref_n[5] = ip4_n[0]
188             pref_n[6] = ip4_n[1]
189             pref_n[7] = ip4_n[2]
190             pref_n[9] = ip4_n[3]
191         elif plen == 48:
192             pref_n[6] = ip4_n[0]
193             pref_n[7] = ip4_n[1]
194             pref_n[9] = ip4_n[2]
195             pref_n[10] = ip4_n[3]
196         elif plen == 56:
197             pref_n[7] = ip4_n[0]
198             pref_n[9] = ip4_n[1]
199             pref_n[10] = ip4_n[2]
200             pref_n[11] = ip4_n[3]
201         elif plen == 64:
202             pref_n[9] = ip4_n[0]
203             pref_n[10] = ip4_n[1]
204             pref_n[11] = ip4_n[2]
205             pref_n[12] = ip4_n[3]
206         elif plen == 96:
207             pref_n[12] = ip4_n[0]
208             pref_n[13] = ip4_n[1]
209             pref_n[14] = ip4_n[2]
210             pref_n[15] = ip4_n[3]
211         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
212
213     def extract_ip4(self, ip6, plen):
214         """
215         Extract IPv4 address embedded in IPv6 addresses
216
217         :param ip6: IPv6 address
218         :param plen: IPv6 prefix length
219         :returns: extracted IPv4 address
220         """
221         ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
222         ip4_n = [None] * 4
223         if plen == 32:
224             ip4_n[0] = ip6_n[4]
225             ip4_n[1] = ip6_n[5]
226             ip4_n[2] = ip6_n[6]
227             ip4_n[3] = ip6_n[7]
228         elif plen == 40:
229             ip4_n[0] = ip6_n[5]
230             ip4_n[1] = ip6_n[6]
231             ip4_n[2] = ip6_n[7]
232             ip4_n[3] = ip6_n[9]
233         elif plen == 48:
234             ip4_n[0] = ip6_n[6]
235             ip4_n[1] = ip6_n[7]
236             ip4_n[2] = ip6_n[9]
237             ip4_n[3] = ip6_n[10]
238         elif plen == 56:
239             ip4_n[0] = ip6_n[7]
240             ip4_n[1] = ip6_n[9]
241             ip4_n[2] = ip6_n[10]
242             ip4_n[3] = ip6_n[11]
243         elif plen == 64:
244             ip4_n[0] = ip6_n[9]
245             ip4_n[1] = ip6_n[10]
246             ip4_n[2] = ip6_n[11]
247             ip4_n[3] = ip6_n[12]
248         elif plen == 96:
249             ip4_n[0] = ip6_n[12]
250             ip4_n[1] = ip6_n[13]
251             ip4_n[2] = ip6_n[14]
252             ip4_n[3] = ip6_n[15]
253         return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
254
255     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
256         """
257         Create IPv6 packet stream for inside network
258
259         :param in_if: Inside interface
260         :param out_if: Outside interface
261         :param ttl: Hop Limit of generated packets
262         :param pref: NAT64 prefix
263         :param plen: NAT64 prefix length
264         """
265         pkts = []
266         if pref is None:
267             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
268         else:
269             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
270
271         # TCP
272         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274              TCP(sport=self.tcp_port_in, dport=20))
275         pkts.append(p)
276
277         # UDP
278         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280              UDP(sport=self.udp_port_in, dport=20))
281         pkts.append(p)
282
283         # ICMP
284         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286              ICMPv6EchoRequest(id=self.icmp_id_in))
287         pkts.append(p)
288
289         return pkts
290
291     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292                           use_inside_ports=False):
293         """
294         Create packet stream for outside network
295
296         :param out_if: Outside interface
297         :param dst_ip: Destination IP address (Default use global NAT address)
298         :param ttl: TTL of generated packets
299         :param use_inside_ports: Use inside NAT ports as destination ports
300                instead of outside ports
301         """
302         if dst_ip is None:
303             dst_ip = self.nat_addr
304         if not use_inside_ports:
305             tcp_port = self.tcp_port_out
306             udp_port = self.udp_port_out
307             icmp_id = self.icmp_id_out
308         else:
309             tcp_port = self.tcp_port_in
310             udp_port = self.udp_port_in
311             icmp_id = self.icmp_id_in
312         pkts = []
313         # TCP
314         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316              TCP(dport=tcp_port, sport=20))
317         pkts.append(p)
318
319         # UDP
320         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322              UDP(dport=udp_port, sport=20))
323         pkts.append(p)
324
325         # ICMP
326         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328              ICMP(id=icmp_id, type='echo-reply'))
329         pkts.append(p)
330
331         return pkts
332
333     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
334         """
335         Create packet stream for outside network
336
337         :param out_if: Outside interface
338         :param dst_ip: Destination IP address (Default use global NAT address)
339         :param hl: HL of generated packets
340         """
341         pkts = []
342         # TCP
343         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345              TCP(dport=self.tcp_port_out, sport=20))
346         pkts.append(p)
347
348         # UDP
349         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351              UDP(dport=self.udp_port_out, sport=20))
352         pkts.append(p)
353
354         # ICMP
355         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357              ICMPv6EchoReply(id=self.icmp_id_out))
358         pkts.append(p)
359
360         return pkts
361
362     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363                            packet_num=3, dst_ip=None, is_ip6=False):
364         """
365         Verify captured packets on outside network
366
367         :param capture: Captured packets
368         :param nat_ip: Translated IP address (Default use global NAT address)
369         :param same_port: Sorce port number is not translated (Default False)
370         :param packet_num: Expected number of packets (Default 3)
371         :param dst_ip: Destination IP address (Default do not verify)
372         :param is_ip6: If L3 protocol is IPv6 (Default False)
373         """
374         if is_ip6:
375             IP46 = IPv6
376             ICMP46 = ICMPv6EchoRequest
377         else:
378             IP46 = IP
379             ICMP46 = ICMP
380         if nat_ip is None:
381             nat_ip = self.nat_addr
382         self.assertEqual(packet_num, len(capture))
383         for packet in capture:
384             try:
385                 if not is_ip6:
386                     self.check_ip_checksum(packet)
387                 self.assertEqual(packet[IP46].src, nat_ip)
388                 if dst_ip is not None:
389                     self.assertEqual(packet[IP46].dst, dst_ip)
390                 if packet.haslayer(TCP):
391                     if same_port:
392                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
393                     else:
394                         self.assertNotEqual(
395                             packet[TCP].sport, self.tcp_port_in)
396                     self.tcp_port_out = packet[TCP].sport
397                     self.check_tcp_checksum(packet)
398                 elif packet.haslayer(UDP):
399                     if same_port:
400                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
401                     else:
402                         self.assertNotEqual(
403                             packet[UDP].sport, self.udp_port_in)
404                     self.udp_port_out = packet[UDP].sport
405                 else:
406                     if same_port:
407                         self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
408                     else:
409                         self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410                     self.icmp_id_out = packet[ICMP46].id
411                     if is_ip6:
412                         self.check_icmpv6_checksum(packet)
413                     else:
414                         self.check_icmp_checksum(packet)
415             except:
416                 self.logger.error(ppp("Unexpected or invalid packet "
417                                       "(outside network):", packet))
418                 raise
419
420     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421                                packet_num=3, dst_ip=None):
422         """
423         Verify captured packets on outside network
424
425         :param capture: Captured packets
426         :param nat_ip: Translated IP address
427         :param same_port: Sorce port number is not translated (Default False)
428         :param packet_num: Expected number of packets (Default 3)
429         :param dst_ip: Destination IP address (Default do not verify)
430         """
431         return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
432                                        dst_ip, True)
433
434     def verify_capture_in(self, capture, in_if, packet_num=3):
435         """
436         Verify captured packets on inside network
437
438         :param capture: Captured packets
439         :param in_if: Inside interface
440         :param packet_num: Expected number of packets (Default 3)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.check_ip_checksum(packet)
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 if packet.haslayer(TCP):
448                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449                     self.check_tcp_checksum(packet)
450                 elif packet.haslayer(UDP):
451                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
452                 else:
453                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454                     self.check_icmp_checksum(packet)
455             except:
456                 self.logger.error(ppp("Unexpected or invalid packet "
457                                       "(inside network):", packet))
458                 raise
459
460     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
461         """
462         Verify captured IPv6 packets on inside network
463
464         :param capture: Captured packets
465         :param src_ip: Source IP
466         :param dst_ip: Destination IP address
467         :param packet_num: Expected number of packets (Default 3)
468         """
469         self.assertEqual(packet_num, len(capture))
470         for packet in capture:
471             try:
472                 self.assertEqual(packet[IPv6].src, src_ip)
473                 self.assertEqual(packet[IPv6].dst, dst_ip)
474                 if packet.haslayer(TCP):
475                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476                     self.check_tcp_checksum(packet)
477                 elif packet.haslayer(UDP):
478                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
479                     self.check_udp_checksum(packet)
480                 else:
481                     self.assertEqual(packet[ICMPv6EchoReply].id,
482                                      self.icmp_id_in)
483                     self.check_icmpv6_checksum(packet)
484             except:
485                 self.logger.error(ppp("Unexpected or invalid packet "
486                                       "(inside network):", packet))
487                 raise
488
489     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
490         """
491         Verify captured packet that don't have to be translated
492
493         :param capture: Captured packets
494         :param ingress_if: Ingress interface
495         :param egress_if: Egress interface
496         """
497         for packet in capture:
498             try:
499                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501                 if packet.haslayer(TCP):
502                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503                 elif packet.haslayer(UDP):
504                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
505                 else:
506                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
507             except:
508                 self.logger.error(ppp("Unexpected or invalid packet "
509                                       "(inside network):", packet))
510                 raise
511
512     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513                                             packet_num=3, icmp_type=11):
514         """
515         Verify captured packets with ICMP errors on outside network
516
517         :param capture: Captured packets
518         :param src_ip: Translated IP address or IP address of VPP
519                        (Default use global NAT address)
520         :param packet_num: Expected number of packets (Default 3)
521         :param icmp_type: Type of error ICMP packet
522                           we are expecting (Default 11)
523         """
524         if src_ip is None:
525             src_ip = self.nat_addr
526         self.assertEqual(packet_num, len(capture))
527         for packet in capture:
528             try:
529                 self.assertEqual(packet[IP].src, src_ip)
530                 self.assertTrue(packet.haslayer(ICMP))
531                 icmp = packet[ICMP]
532                 self.assertEqual(icmp.type, icmp_type)
533                 self.assertTrue(icmp.haslayer(IPerror))
534                 inner_ip = icmp[IPerror]
535                 if inner_ip.haslayer(TCPerror):
536                     self.assertEqual(inner_ip[TCPerror].dport,
537                                      self.tcp_port_out)
538                 elif inner_ip.haslayer(UDPerror):
539                     self.assertEqual(inner_ip[UDPerror].dport,
540                                      self.udp_port_out)
541                 else:
542                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
543             except:
544                 self.logger.error(ppp("Unexpected or invalid packet "
545                                       "(outside network):", packet))
546                 raise
547
548     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
549                                            icmp_type=11):
550         """
551         Verify captured packets with ICMP errors on inside network
552
553         :param capture: Captured packets
554         :param in_if: Inside interface
555         :param packet_num: Expected number of packets (Default 3)
556         :param icmp_type: Type of error ICMP packet
557                           we are expecting (Default 11)
558         """
559         self.assertEqual(packet_num, len(capture))
560         for packet in capture:
561             try:
562                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563                 self.assertTrue(packet.haslayer(ICMP))
564                 icmp = packet[ICMP]
565                 self.assertEqual(icmp.type, icmp_type)
566                 self.assertTrue(icmp.haslayer(IPerror))
567                 inner_ip = icmp[IPerror]
568                 if inner_ip.haslayer(TCPerror):
569                     self.assertEqual(inner_ip[TCPerror].sport,
570                                      self.tcp_port_in)
571                 elif inner_ip.haslayer(UDPerror):
572                     self.assertEqual(inner_ip[UDPerror].sport,
573                                      self.udp_port_in)
574                 else:
575                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
576             except:
577                 self.logger.error(ppp("Unexpected or invalid packet "
578                                       "(inside network):", packet))
579                 raise
580
581     def create_stream_frag(self, src_if, dst, sport, dport, data):
582         """
583         Create fragmented packet stream
584
585         :param src_if: Source interface
586         :param dst: Destination IPv4 address
587         :param sport: Source TCP port
588         :param dport: Destination TCP port
589         :param data: Payload data
590         :returns: Fragmets
591         """
592         id = random.randint(0, 65535)
593         p = (IP(src=src_if.remote_ip4, dst=dst) /
594              TCP(sport=sport, dport=dport) /
595              Raw(data))
596         p = p.__class__(str(p))
597         chksum = p['TCP'].chksum
598         pkts = []
599         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601              TCP(sport=sport, dport=dport, chksum=chksum) /
602              Raw(data[0:4]))
603         pkts.append(p)
604         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606                 proto=IP_PROTOS.tcp) /
607              Raw(data[4:20]))
608         pkts.append(p)
609         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
611                 id=id) /
612              Raw(data[20:]))
613         pkts.append(p)
614         return pkts
615
616     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617                                pref=None, plen=0, frag_size=128):
618         """
619         Create fragmented packet stream
620
621         :param src_if: Source interface
622         :param dst: Destination IPv4 address
623         :param sport: Source TCP port
624         :param dport: Destination TCP port
625         :param data: Payload data
626         :param pref: NAT64 prefix
627         :param plen: NAT64 prefix length
628         :param fragsize: size of fragments
629         :returns: Fragmets
630         """
631         if pref is None:
632             dst_ip6 = ''.join(['64:ff9b::', dst])
633         else:
634             dst_ip6 = self.compose_ip6(dst, pref, plen)
635
636         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639              TCP(sport=sport, dport=dport) /
640              Raw(data))
641
642         return fragment6(p, frag_size)
643
644     def reass_frags_and_verify(self, frags, src, dst):
645         """
646         Reassemble and verify fragmented packet
647
648         :param frags: Captured fragments
649         :param src: Source IPv4 address to verify
650         :param dst: Destination IPv4 address to verify
651
652         :returns: Reassembled IPv4 packet
653         """
654         buffer = StringIO.StringIO()
655         for p in frags:
656             self.assertEqual(p[IP].src, src)
657             self.assertEqual(p[IP].dst, dst)
658             self.check_ip_checksum(p)
659             buffer.seek(p[IP].frag * 8)
660             buffer.write(p[IP].payload)
661         ip = frags[0].getlayer(IP)
662         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663                 proto=frags[0][IP].proto)
664         if ip.proto == IP_PROTOS.tcp:
665             p = (ip / TCP(buffer.getvalue()))
666             self.check_tcp_checksum(p)
667         elif ip.proto == IP_PROTOS.udp:
668             p = (ip / UDP(buffer.getvalue()))
669         return p
670
671     def reass_frags_and_verify_ip6(self, frags, src, dst):
672         """
673         Reassemble and verify fragmented packet
674
675         :param frags: Captured fragments
676         :param src: Source IPv6 address to verify
677         :param dst: Destination IPv6 address to verify
678
679         :returns: Reassembled IPv6 packet
680         """
681         buffer = StringIO.StringIO()
682         for p in frags:
683             self.assertEqual(p[IPv6].src, src)
684             self.assertEqual(p[IPv6].dst, dst)
685             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686             buffer.write(p[IPv6ExtHdrFragment].payload)
687         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688                   nh=frags[0][IPv6ExtHdrFragment].nh)
689         if ip.nh == IP_PROTOS.tcp:
690             p = (ip / TCP(buffer.getvalue()))
691             self.check_tcp_checksum(p)
692         elif ip.nh == IP_PROTOS.udp:
693             p = (ip / UDP(buffer.getvalue()))
694         return p
695
696     def verify_ipfix_nat44_ses(self, data):
697         """
698         Verify IPFIX NAT44 session create/delete event
699
700         :param data: Decoded IPFIX data records
701         """
702         nat44_ses_create_num = 0
703         nat44_ses_delete_num = 0
704         self.assertEqual(6, len(data))
705         for record in data:
706             # natEvent
707             self.assertIn(ord(record[230]), [4, 5])
708             if ord(record[230]) == 4:
709                 nat44_ses_create_num += 1
710             else:
711                 nat44_ses_delete_num += 1
712             # sourceIPv4Address
713             self.assertEqual(self.pg0.remote_ip4n, record[8])
714             # postNATSourceIPv4Address
715             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
716                              record[225])
717             # ingressVRFID
718             self.assertEqual(struct.pack("!I", 0), record[234])
719             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720             if IP_PROTOS.icmp == ord(record[4]):
721                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
723                                  record[227])
724             elif IP_PROTOS.tcp == ord(record[4]):
725                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
726                                  record[7])
727                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
728                                  record[227])
729             elif IP_PROTOS.udp == ord(record[4]):
730                 self.assertEqual(struct.pack("!H", self.udp_port_in),
731                                  record[7])
732                 self.assertEqual(struct.pack("!H", self.udp_port_out),
733                                  record[227])
734             else:
735                 self.fail("Invalid protocol")
736         self.assertEqual(3, nat44_ses_create_num)
737         self.assertEqual(3, nat44_ses_delete_num)
738
739     def verify_ipfix_addr_exhausted(self, data):
740         """
741         Verify IPFIX NAT addresses event
742
743         :param data: Decoded IPFIX data records
744         """
745         self.assertEqual(1, len(data))
746         record = data[0]
747         # natEvent
748         self.assertEqual(ord(record[230]), 3)
749         # natPoolID
750         self.assertEqual(struct.pack("!I", 0), record[283])
751
752     def verify_ipfix_max_sessions(self, data, limit):
753         """
754         Verify IPFIX maximum session entries exceeded event
755
756         :param data: Decoded IPFIX data records
757         :param limit: Number of maximum session entries that can be created.
758         """
759         self.assertEqual(1, len(data))
760         record = data[0]
761         # natEvent
762         self.assertEqual(ord(record[230]), 13)
763         # natQuotaExceededEvent
764         self.assertEqual(struct.pack("I", 1), record[466])
765         # maxSessionEntries
766         self.assertEqual(struct.pack("I", limit), record[471])
767
768     def verify_ipfix_max_bibs(self, data, limit):
769         """
770         Verify IPFIX maximum BIB entries exceeded event
771
772         :param data: Decoded IPFIX data records
773         :param limit: Number of maximum BIB entries that can be created.
774         """
775         self.assertEqual(1, len(data))
776         record = data[0]
777         # natEvent
778         self.assertEqual(ord(record[230]), 13)
779         # natQuotaExceededEvent
780         self.assertEqual(struct.pack("I", 2), record[466])
781         # maxBIBEntries
782         self.assertEqual(struct.pack("I", limit), record[472])
783
784     def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
785         """
786         Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
787
788         :param data: Decoded IPFIX data records
789         :param limit: Number of maximum fragments pending reassembly
790         :param src_addr: IPv6 source address
791         """
792         self.assertEqual(1, len(data))
793         record = data[0]
794         # natEvent
795         self.assertEqual(ord(record[230]), 13)
796         # natQuotaExceededEvent
797         self.assertEqual(struct.pack("I", 5), record[466])
798         # maxFragmentsPendingReassembly
799         self.assertEqual(struct.pack("I", limit), record[475])
800         # sourceIPv6Address
801         self.assertEqual(src_addr, record[27])
802
803     def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
804         """
805         Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
806
807         :param data: Decoded IPFIX data records
808         :param limit: Number of maximum fragments pending reassembly
809         :param src_addr: IPv4 source address
810         """
811         self.assertEqual(1, len(data))
812         record = data[0]
813         # natEvent
814         self.assertEqual(ord(record[230]), 13)
815         # natQuotaExceededEvent
816         self.assertEqual(struct.pack("I", 5), record[466])
817         # maxFragmentsPendingReassembly
818         self.assertEqual(struct.pack("I", limit), record[475])
819         # sourceIPv4Address
820         self.assertEqual(src_addr, record[8])
821
822     def verify_ipfix_bib(self, data, is_create, src_addr):
823         """
824         Verify IPFIX NAT64 BIB create and delete events
825
826         :param data: Decoded IPFIX data records
827         :param is_create: Create event if nonzero value otherwise delete event
828         :param src_addr: IPv6 source address
829         """
830         self.assertEqual(1, len(data))
831         record = data[0]
832         # natEvent
833         if is_create:
834             self.assertEqual(ord(record[230]), 10)
835         else:
836             self.assertEqual(ord(record[230]), 11)
837         # sourceIPv6Address
838         self.assertEqual(src_addr, record[27])
839         # postNATSourceIPv4Address
840         self.assertEqual(self.nat_addr_n, record[225])
841         # protocolIdentifier
842         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
843         # ingressVRFID
844         self.assertEqual(struct.pack("!I", 0), record[234])
845         # sourceTransportPort
846         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847         # postNAPTSourceTransportPort
848         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
849
850     def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
851                                dst_port):
852         """
853         Verify IPFIX NAT64 session create and delete events
854
855         :param data: Decoded IPFIX data records
856         :param is_create: Create event if nonzero value otherwise delete event
857         :param src_addr: IPv6 source address
858         :param dst_addr: IPv4 destination address
859         :param dst_port: destination TCP port
860         """
861         self.assertEqual(1, len(data))
862         record = data[0]
863         # natEvent
864         if is_create:
865             self.assertEqual(ord(record[230]), 6)
866         else:
867             self.assertEqual(ord(record[230]), 7)
868         # sourceIPv6Address
869         self.assertEqual(src_addr, record[27])
870         # destinationIPv6Address
871         self.assertEqual(socket.inet_pton(socket.AF_INET6,
872                                           self.compose_ip6(dst_addr,
873                                                            '64:ff9b::',
874                                                            96)),
875                          record[28])
876         # postNATSourceIPv4Address
877         self.assertEqual(self.nat_addr_n, record[225])
878         # postNATDestinationIPv4Address
879         self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
880                          record[226])
881         # protocolIdentifier
882         self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
883         # ingressVRFID
884         self.assertEqual(struct.pack("!I", 0), record[234])
885         # sourceTransportPort
886         self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887         # postNAPTSourceTransportPort
888         self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889         # destinationTransportPort
890         self.assertEqual(struct.pack("!H", dst_port), record[11])
891         # postNAPTDestinationTransportPort
892         self.assertEqual(struct.pack("!H", dst_port), record[228])
893
894
895 class TestNAT44(MethodHolder):
896     """ NAT44 Test Cases """
897
898     @classmethod
899     def setUpClass(cls):
900         super(TestNAT44, cls).setUpClass()
901
902         try:
903             cls.tcp_port_in = 6303
904             cls.tcp_port_out = 6303
905             cls.udp_port_in = 6304
906             cls.udp_port_out = 6304
907             cls.icmp_id_in = 6305
908             cls.icmp_id_out = 6305
909             cls.nat_addr = '10.0.0.3'
910             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911             cls.ipfix_src_port = 4739
912             cls.ipfix_domain_id = 1
913
914             cls.create_pg_interfaces(range(10))
915             cls.interfaces = list(cls.pg_interfaces[0:4])
916
917             for i in cls.interfaces:
918                 i.admin_up()
919                 i.config_ip4()
920                 i.resolve_arp()
921
922             cls.pg0.generate_remote_hosts(3)
923             cls.pg0.configure_ipv4_neighbors()
924
925             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926             cls.vapi.ip_table_add_del(10, is_add=1)
927             cls.vapi.ip_table_add_del(20, is_add=1)
928
929             cls.pg4._local_ip4 = "172.16.255.1"
930             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932             cls.pg4.set_table_ip4(10)
933             cls.pg5._local_ip4 = "172.17.255.3"
934             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936             cls.pg5.set_table_ip4(10)
937             cls.pg6._local_ip4 = "172.16.255.1"
938             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940             cls.pg6.set_table_ip4(20)
941             for i in cls.overlapping_interfaces:
942                 i.config_ip4()
943                 i.admin_up()
944                 i.resolve_arp()
945
946             cls.pg7.admin_up()
947             cls.pg8.admin_up()
948
949             cls.pg9.generate_remote_hosts(2)
950             cls.pg9.config_ip4()
951             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
953                                                   ip_addr_n,
954                                                   24)
955             cls.pg9.admin_up()
956             cls.pg9.resolve_arp()
957             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959             cls.pg9.resolve_arp()
960
961         except Exception:
962             super(TestNAT44, cls).tearDownClass()
963             raise
964
965     def clear_nat44(self):
966         """
967         Clear NAT44 configuration.
968         """
969         # I found no elegant way to do this
970         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971                                    dst_address_length=32,
972                                    next_hop_address=self.pg7.remote_ip4n,
973                                    next_hop_sw_if_index=self.pg7.sw_if_index,
974                                    is_add=0)
975         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976                                    dst_address_length=32,
977                                    next_hop_address=self.pg8.remote_ip4n,
978                                    next_hop_sw_if_index=self.pg8.sw_if_index,
979                                    is_add=0)
980
981         for intf in [self.pg7, self.pg8]:
982             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
983             for n in neighbors:
984                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
985                                               n.mac_address,
986                                               n.ip_address,
987                                               is_add=0)
988
989         if self.pg7.has_ip4_config:
990             self.pg7.unconfig_ip4()
991
992         self.vapi.nat44_forwarding_enable_disable(0)
993
994         interfaces = self.vapi.nat44_interface_addr_dump()
995         for intf in interfaces:
996             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997                                                twice_nat=intf.twice_nat,
998                                                is_add=0)
999
1000         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001                             domain_id=self.ipfix_domain_id)
1002         self.ipfix_src_port = 4739
1003         self.ipfix_domain_id = 1
1004
1005         interfaces = self.vapi.nat44_interface_dump()
1006         for intf in interfaces:
1007             if intf.is_inside > 1:
1008                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1009                                                           0,
1010                                                           is_add=0)
1011             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1012                                                       intf.is_inside,
1013                                                       is_add=0)
1014
1015         interfaces = self.vapi.nat44_interface_output_feature_dump()
1016         for intf in interfaces:
1017             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1018                                                              intf.is_inside,
1019                                                              is_add=0)
1020
1021         static_mappings = self.vapi.nat44_static_mapping_dump()
1022         for sm in static_mappings:
1023             self.vapi.nat44_add_del_static_mapping(
1024                 sm.local_ip_address,
1025                 sm.external_ip_address,
1026                 local_port=sm.local_port,
1027                 external_port=sm.external_port,
1028                 addr_only=sm.addr_only,
1029                 vrf_id=sm.vrf_id,
1030                 protocol=sm.protocol,
1031                 twice_nat=sm.twice_nat,
1032                 out2in_only=sm.out2in_only,
1033                 tag=sm.tag,
1034                 is_add=0)
1035
1036         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037         for lb_sm in lb_static_mappings:
1038             self.vapi.nat44_add_del_lb_static_mapping(
1039                 lb_sm.external_addr,
1040                 lb_sm.external_port,
1041                 lb_sm.protocol,
1042                 vrf_id=lb_sm.vrf_id,
1043                 twice_nat=lb_sm.twice_nat,
1044                 out2in_only=lb_sm.out2in_only,
1045                 tag=lb_sm.tag,
1046                 is_add=0,
1047                 local_num=0,
1048                 locals=[])
1049
1050         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051         for id_m in identity_mappings:
1052             self.vapi.nat44_add_del_identity_mapping(
1053                 addr_only=id_m.addr_only,
1054                 ip=id_m.ip_address,
1055                 port=id_m.port,
1056                 sw_if_index=id_m.sw_if_index,
1057                 vrf_id=id_m.vrf_id,
1058                 protocol=id_m.protocol,
1059                 is_add=0)
1060
1061         adresses = self.vapi.nat44_address_dump()
1062         for addr in adresses:
1063             self.vapi.nat44_add_del_address_range(addr.ip_address,
1064                                                   addr.ip_address,
1065                                                   twice_nat=addr.twice_nat,
1066                                                   is_add=0)
1067
1068         self.vapi.nat_set_reass()
1069         self.vapi.nat_set_reass(is_ip6=1)
1070
1071     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072                                  local_port=0, external_port=0, vrf_id=0,
1073                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
1074                                  proto=0, twice_nat=0, out2in_only=0, tag=""):
1075         """
1076         Add/delete NAT44 static mapping
1077
1078         :param local_ip: Local IP address
1079         :param external_ip: External IP address
1080         :param local_port: Local port number (Optional)
1081         :param external_port: External port number (Optional)
1082         :param vrf_id: VRF ID (Default 0)
1083         :param is_add: 1 if add, 0 if delete (Default add)
1084         :param external_sw_if_index: External interface instead of IP address
1085         :param proto: IP protocol (Mandatory if port specified)
1086         :param twice_nat: 1 if translate external host address and port
1087         :param out2in_only: if 1 rule is matching only out2in direction
1088         :param tag: Opaque string tag
1089         """
1090         addr_only = 1
1091         if local_port and external_port:
1092             addr_only = 0
1093         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095         self.vapi.nat44_add_del_static_mapping(
1096             l_ip,
1097             e_ip,
1098             external_sw_if_index,
1099             local_port,
1100             external_port,
1101             addr_only,
1102             vrf_id,
1103             proto,
1104             twice_nat,
1105             out2in_only,
1106             tag,
1107             is_add)
1108
1109     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1110         """
1111         Add/delete NAT44 address
1112
1113         :param ip: IP address
1114         :param is_add: 1 if add, 0 if delete (Default add)
1115         :param twice_nat: twice NAT address for extenal hosts
1116         """
1117         nat_addr = socket.inet_pton(socket.AF_INET, ip)
1118         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1119                                               vrf_id=vrf_id,
1120                                               twice_nat=twice_nat)
1121
1122     def test_dynamic(self):
1123         """ NAT44 dynamic translation test """
1124
1125         self.nat44_add_address(self.nat_addr)
1126         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1128                                                   is_inside=0)
1129
1130         # in2out
1131         pkts = self.create_stream_in(self.pg0, self.pg1)
1132         self.pg0.add_stream(pkts)
1133         self.pg_enable_capture(self.pg_interfaces)
1134         self.pg_start()
1135         capture = self.pg1.get_capture(len(pkts))
1136         self.verify_capture_out(capture)
1137
1138         # out2in
1139         pkts = self.create_stream_out(self.pg1)
1140         self.pg1.add_stream(pkts)
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pg_start()
1143         capture = self.pg0.get_capture(len(pkts))
1144         self.verify_capture_in(capture, self.pg0)
1145
1146     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147         """ NAT44 handling of client packets with TTL=1 """
1148
1149         self.nat44_add_address(self.nat_addr)
1150         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1152                                                   is_inside=0)
1153
1154         # Client side - generate traffic
1155         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156         self.pg0.add_stream(pkts)
1157         self.pg_enable_capture(self.pg_interfaces)
1158         self.pg_start()
1159
1160         # Client side - verify ICMP type 11 packets
1161         capture = self.pg0.get_capture(len(pkts))
1162         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1163
1164     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165         """ NAT44 handling of server packets with TTL=1 """
1166
1167         self.nat44_add_address(self.nat_addr)
1168         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1170                                                   is_inside=0)
1171
1172         # Client side - create sessions
1173         pkts = self.create_stream_in(self.pg0, self.pg1)
1174         self.pg0.add_stream(pkts)
1175         self.pg_enable_capture(self.pg_interfaces)
1176         self.pg_start()
1177
1178         # Server side - generate traffic
1179         capture = self.pg1.get_capture(len(pkts))
1180         self.verify_capture_out(capture)
1181         pkts = self.create_stream_out(self.pg1, ttl=1)
1182         self.pg1.add_stream(pkts)
1183         self.pg_enable_capture(self.pg_interfaces)
1184         self.pg_start()
1185
1186         # Server side - verify ICMP type 11 packets
1187         capture = self.pg1.get_capture(len(pkts))
1188         self.verify_capture_out_with_icmp_errors(capture,
1189                                                  src_ip=self.pg1.local_ip4)
1190
1191     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1192         """ NAT44 handling of error responses to client packets with TTL=2 """
1193
1194         self.nat44_add_address(self.nat_addr)
1195         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1196         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1197                                                   is_inside=0)
1198
1199         # Client side - generate traffic
1200         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1201         self.pg0.add_stream(pkts)
1202         self.pg_enable_capture(self.pg_interfaces)
1203         self.pg_start()
1204
1205         # Server side - simulate ICMP type 11 response
1206         capture = self.pg1.get_capture(len(pkts))
1207         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1208                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1209                 ICMP(type=11) / packet[IP] for packet in capture]
1210         self.pg1.add_stream(pkts)
1211         self.pg_enable_capture(self.pg_interfaces)
1212         self.pg_start()
1213
1214         # Client side - verify ICMP type 11 packets
1215         capture = self.pg0.get_capture(len(pkts))
1216         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1217
1218     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1219         """ NAT44 handling of error responses to server packets with TTL=2 """
1220
1221         self.nat44_add_address(self.nat_addr)
1222         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1223         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1224                                                   is_inside=0)
1225
1226         # Client side - create sessions
1227         pkts = self.create_stream_in(self.pg0, self.pg1)
1228         self.pg0.add_stream(pkts)
1229         self.pg_enable_capture(self.pg_interfaces)
1230         self.pg_start()
1231
1232         # Server side - generate traffic
1233         capture = self.pg1.get_capture(len(pkts))
1234         self.verify_capture_out(capture)
1235         pkts = self.create_stream_out(self.pg1, ttl=2)
1236         self.pg1.add_stream(pkts)
1237         self.pg_enable_capture(self.pg_interfaces)
1238         self.pg_start()
1239
1240         # Client side - simulate ICMP type 11 response
1241         capture = self.pg0.get_capture(len(pkts))
1242         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1244                 ICMP(type=11) / packet[IP] for packet in capture]
1245         self.pg0.add_stream(pkts)
1246         self.pg_enable_capture(self.pg_interfaces)
1247         self.pg_start()
1248
1249         # Server side - verify ICMP type 11 packets
1250         capture = self.pg1.get_capture(len(pkts))
1251         self.verify_capture_out_with_icmp_errors(capture)
1252
1253     def test_ping_out_interface_from_outside(self):
1254         """ Ping NAT44 out interface from outside network """
1255
1256         self.nat44_add_address(self.nat_addr)
1257         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1259                                                   is_inside=0)
1260
1261         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1263              ICMP(id=self.icmp_id_out, type='echo-request'))
1264         pkts = [p]
1265         self.pg1.add_stream(pkts)
1266         self.pg_enable_capture(self.pg_interfaces)
1267         self.pg_start()
1268         capture = self.pg1.get_capture(len(pkts))
1269         self.assertEqual(1, len(capture))
1270         packet = capture[0]
1271         try:
1272             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1273             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1274             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1275             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1276         except:
1277             self.logger.error(ppp("Unexpected or invalid packet "
1278                                   "(outside network):", packet))
1279             raise
1280
1281     def test_ping_internal_host_from_outside(self):
1282         """ Ping internal host from outside network """
1283
1284         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1285         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1287                                                   is_inside=0)
1288
1289         # out2in
1290         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1291                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1292                ICMP(id=self.icmp_id_out, type='echo-request'))
1293         self.pg1.add_stream(pkt)
1294         self.pg_enable_capture(self.pg_interfaces)
1295         self.pg_start()
1296         capture = self.pg0.get_capture(1)
1297         self.verify_capture_in(capture, self.pg0, packet_num=1)
1298         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1299
1300         # in2out
1301         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1302                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1303                ICMP(id=self.icmp_id_in, type='echo-reply'))
1304         self.pg0.add_stream(pkt)
1305         self.pg_enable_capture(self.pg_interfaces)
1306         self.pg_start()
1307         capture = self.pg1.get_capture(1)
1308         self.verify_capture_out(capture, same_port=True, packet_num=1)
1309         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1310
1311     def test_forwarding(self):
1312         """ NAT44 forwarding test """
1313
1314         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1315         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1316                                                   is_inside=0)
1317         self.vapi.nat44_forwarding_enable_disable(1)
1318
1319         real_ip = self.pg0.remote_ip4n
1320         alias_ip = self.nat_addr_n
1321         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322                                                external_ip=alias_ip)
1323
1324         try:
1325             # in2out - static mapping match
1326
1327             pkts = self.create_stream_out(self.pg1)
1328             self.pg1.add_stream(pkts)
1329             self.pg_enable_capture(self.pg_interfaces)
1330             self.pg_start()
1331             capture = self.pg0.get_capture(len(pkts))
1332             self.verify_capture_in(capture, self.pg0)
1333
1334             pkts = self.create_stream_in(self.pg0, self.pg1)
1335             self.pg0.add_stream(pkts)
1336             self.pg_enable_capture(self.pg_interfaces)
1337             self.pg_start()
1338             capture = self.pg1.get_capture(len(pkts))
1339             self.verify_capture_out(capture, same_port=True)
1340
1341             # in2out - no static mapping match
1342
1343             host0 = self.pg0.remote_hosts[0]
1344             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1345             try:
1346                 pkts = self.create_stream_out(self.pg1,
1347                                               dst_ip=self.pg0.remote_ip4,
1348                                               use_inside_ports=True)
1349                 self.pg1.add_stream(pkts)
1350                 self.pg_enable_capture(self.pg_interfaces)
1351                 self.pg_start()
1352                 capture = self.pg0.get_capture(len(pkts))
1353                 self.verify_capture_in(capture, self.pg0)
1354
1355                 pkts = self.create_stream_in(self.pg0, self.pg1)
1356                 self.pg0.add_stream(pkts)
1357                 self.pg_enable_capture(self.pg_interfaces)
1358                 self.pg_start()
1359                 capture = self.pg1.get_capture(len(pkts))
1360                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1361                                         same_port=True)
1362             finally:
1363                 self.pg0.remote_hosts[0] = host0
1364
1365         finally:
1366             self.vapi.nat44_forwarding_enable_disable(0)
1367             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1368                                                    external_ip=alias_ip,
1369                                                    is_add=0)
1370
1371     def test_static_in(self):
1372         """ 1:1 NAT initialized from inside network """
1373
1374         nat_ip = "10.0.0.10"
1375         self.tcp_port_out = 6303
1376         self.udp_port_out = 6304
1377         self.icmp_id_out = 6305
1378
1379         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1381         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1382                                                   is_inside=0)
1383         sm = self.vapi.nat44_static_mapping_dump()
1384         self.assertEqual(len(sm), 1)
1385         self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1386         self.assertEqual(sm[0].protocol, 0)
1387         self.assertEqual(sm[0].local_port, 0)
1388         self.assertEqual(sm[0].external_port, 0)
1389
1390         # in2out
1391         pkts = self.create_stream_in(self.pg0, self.pg1)
1392         self.pg0.add_stream(pkts)
1393         self.pg_enable_capture(self.pg_interfaces)
1394         self.pg_start()
1395         capture = self.pg1.get_capture(len(pkts))
1396         self.verify_capture_out(capture, nat_ip, True)
1397
1398         # out2in
1399         pkts = self.create_stream_out(self.pg1, nat_ip)
1400         self.pg1.add_stream(pkts)
1401         self.pg_enable_capture(self.pg_interfaces)
1402         self.pg_start()
1403         capture = self.pg0.get_capture(len(pkts))
1404         self.verify_capture_in(capture, self.pg0)
1405
1406     def test_static_out(self):
1407         """ 1:1 NAT initialized from outside network """
1408
1409         nat_ip = "10.0.0.20"
1410         self.tcp_port_out = 6303
1411         self.udp_port_out = 6304
1412         self.icmp_id_out = 6305
1413         tag = "testTAG"
1414
1415         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1416         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1417         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1418                                                   is_inside=0)
1419         sm = self.vapi.nat44_static_mapping_dump()
1420         self.assertEqual(len(sm), 1)
1421         self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1422
1423         # out2in
1424         pkts = self.create_stream_out(self.pg1, nat_ip)
1425         self.pg1.add_stream(pkts)
1426         self.pg_enable_capture(self.pg_interfaces)
1427         self.pg_start()
1428         capture = self.pg0.get_capture(len(pkts))
1429         self.verify_capture_in(capture, self.pg0)
1430
1431         # in2out
1432         pkts = self.create_stream_in(self.pg0, self.pg1)
1433         self.pg0.add_stream(pkts)
1434         self.pg_enable_capture(self.pg_interfaces)
1435         self.pg_start()
1436         capture = self.pg1.get_capture(len(pkts))
1437         self.verify_capture_out(capture, nat_ip, True)
1438
1439     def test_static_with_port_in(self):
1440         """ 1:1 NAPT initialized from inside network """
1441
1442         self.tcp_port_out = 3606
1443         self.udp_port_out = 3607
1444         self.icmp_id_out = 3608
1445
1446         self.nat44_add_address(self.nat_addr)
1447         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448                                       self.tcp_port_in, self.tcp_port_out,
1449                                       proto=IP_PROTOS.tcp)
1450         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1451                                       self.udp_port_in, self.udp_port_out,
1452                                       proto=IP_PROTOS.udp)
1453         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1454                                       self.icmp_id_in, self.icmp_id_out,
1455                                       proto=IP_PROTOS.icmp)
1456         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1457         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1458                                                   is_inside=0)
1459
1460         # in2out
1461         pkts = self.create_stream_in(self.pg0, self.pg1)
1462         self.pg0.add_stream(pkts)
1463         self.pg_enable_capture(self.pg_interfaces)
1464         self.pg_start()
1465         capture = self.pg1.get_capture(len(pkts))
1466         self.verify_capture_out(capture)
1467
1468         # out2in
1469         pkts = self.create_stream_out(self.pg1)
1470         self.pg1.add_stream(pkts)
1471         self.pg_enable_capture(self.pg_interfaces)
1472         self.pg_start()
1473         capture = self.pg0.get_capture(len(pkts))
1474         self.verify_capture_in(capture, self.pg0)
1475
1476     def test_static_with_port_out(self):
1477         """ 1:1 NAPT initialized from outside network """
1478
1479         self.tcp_port_out = 30606
1480         self.udp_port_out = 30607
1481         self.icmp_id_out = 30608
1482
1483         self.nat44_add_address(self.nat_addr)
1484         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1485                                       self.tcp_port_in, self.tcp_port_out,
1486                                       proto=IP_PROTOS.tcp)
1487         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1488                                       self.udp_port_in, self.udp_port_out,
1489                                       proto=IP_PROTOS.udp)
1490         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1491                                       self.icmp_id_in, self.icmp_id_out,
1492                                       proto=IP_PROTOS.icmp)
1493         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1494         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1495                                                   is_inside=0)
1496
1497         # out2in
1498         pkts = self.create_stream_out(self.pg1)
1499         self.pg1.add_stream(pkts)
1500         self.pg_enable_capture(self.pg_interfaces)
1501         self.pg_start()
1502         capture = self.pg0.get_capture(len(pkts))
1503         self.verify_capture_in(capture, self.pg0)
1504
1505         # in2out
1506         pkts = self.create_stream_in(self.pg0, self.pg1)
1507         self.pg0.add_stream(pkts)
1508         self.pg_enable_capture(self.pg_interfaces)
1509         self.pg_start()
1510         capture = self.pg1.get_capture(len(pkts))
1511         self.verify_capture_out(capture)
1512
1513     def test_static_with_port_out2(self):
1514         """ 1:1 NAPT symmetrical rule """
1515
1516         external_port = 80
1517         local_port = 8080
1518
1519         self.vapi.nat44_forwarding_enable_disable(1)
1520         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1521                                       local_port, external_port,
1522                                       proto=IP_PROTOS.tcp, out2in_only=1)
1523         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1524         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1525                                                   is_inside=0)
1526
1527         # from client to service
1528         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1529              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1530              TCP(sport=12345, dport=external_port))
1531         self.pg1.add_stream(p)
1532         self.pg_enable_capture(self.pg_interfaces)
1533         self.pg_start()
1534         capture = self.pg0.get_capture(1)
1535         p = capture[0]
1536         server = None
1537         try:
1538             ip = p[IP]
1539             tcp = p[TCP]
1540             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1541             self.assertEqual(tcp.dport, local_port)
1542             self.check_tcp_checksum(p)
1543             self.check_ip_checksum(p)
1544         except:
1545             self.logger.error(ppp("Unexpected or invalid packet:", p))
1546             raise
1547
1548         # ICMP error
1549         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1550              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1551              ICMP(type=11) / capture[0][IP])
1552         self.pg0.add_stream(p)
1553         self.pg_enable_capture(self.pg_interfaces)
1554         self.pg_start()
1555         capture = self.pg1.get_capture(1)
1556         p = capture[0]
1557         try:
1558             self.assertEqual(p[IP].src, self.nat_addr)
1559             inner = p[IPerror]
1560             self.assertEqual(inner.dst, self.nat_addr)
1561             self.assertEqual(inner[TCPerror].dport, external_port)
1562         except:
1563             self.logger.error(ppp("Unexpected or invalid packet:", p))
1564             raise
1565
1566         # from service back to client
1567         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1568              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1569              TCP(sport=local_port, dport=12345))
1570         self.pg0.add_stream(p)
1571         self.pg_enable_capture(self.pg_interfaces)
1572         self.pg_start()
1573         capture = self.pg1.get_capture(1)
1574         p = capture[0]
1575         try:
1576             ip = p[IP]
1577             tcp = p[TCP]
1578             self.assertEqual(ip.src, self.nat_addr)
1579             self.assertEqual(tcp.sport, external_port)
1580             self.check_tcp_checksum(p)
1581             self.check_ip_checksum(p)
1582         except:
1583             self.logger.error(ppp("Unexpected or invalid packet:", p))
1584             raise
1585
1586         # ICMP error
1587         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1588              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1589              ICMP(type=11) / capture[0][IP])
1590         self.pg1.add_stream(p)
1591         self.pg_enable_capture(self.pg_interfaces)
1592         self.pg_start()
1593         capture = self.pg0.get_capture(1)
1594         p = capture[0]
1595         try:
1596             self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1597             inner = p[IPerror]
1598             self.assertEqual(inner.src, self.pg0.remote_ip4)
1599             self.assertEqual(inner[TCPerror].sport, local_port)
1600         except:
1601             self.logger.error(ppp("Unexpected or invalid packet:", p))
1602             raise
1603
1604         # from client to server (no translation)
1605         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1606              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1607              TCP(sport=12346, dport=local_port))
1608         self.pg1.add_stream(p)
1609         self.pg_enable_capture(self.pg_interfaces)
1610         self.pg_start()
1611         capture = self.pg0.get_capture(1)
1612         p = capture[0]
1613         server = None
1614         try:
1615             ip = p[IP]
1616             tcp = p[TCP]
1617             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1618             self.assertEqual(tcp.dport, local_port)
1619             self.check_tcp_checksum(p)
1620             self.check_ip_checksum(p)
1621         except:
1622             self.logger.error(ppp("Unexpected or invalid packet:", p))
1623             raise
1624
1625         # from service back to client (no translation)
1626         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1627              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1628              TCP(sport=local_port, dport=12346))
1629         self.pg0.add_stream(p)
1630         self.pg_enable_capture(self.pg_interfaces)
1631         self.pg_start()
1632         capture = self.pg1.get_capture(1)
1633         p = capture[0]
1634         try:
1635             ip = p[IP]
1636             tcp = p[TCP]
1637             self.assertEqual(ip.src, self.pg0.remote_ip4)
1638             self.assertEqual(tcp.sport, local_port)
1639             self.check_tcp_checksum(p)
1640             self.check_ip_checksum(p)
1641         except:
1642             self.logger.error(ppp("Unexpected or invalid packet:", p))
1643             raise
1644
1645     def test_static_vrf_aware(self):
1646         """ 1:1 NAT VRF awareness """
1647
1648         nat_ip1 = "10.0.0.30"
1649         nat_ip2 = "10.0.0.40"
1650         self.tcp_port_out = 6303
1651         self.udp_port_out = 6304
1652         self.icmp_id_out = 6305
1653
1654         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1655                                       vrf_id=10)
1656         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1657                                       vrf_id=10)
1658         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1659                                                   is_inside=0)
1660         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1661         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1662
1663         # inside interface VRF match NAT44 static mapping VRF
1664         pkts = self.create_stream_in(self.pg4, self.pg3)
1665         self.pg4.add_stream(pkts)
1666         self.pg_enable_capture(self.pg_interfaces)
1667         self.pg_start()
1668         capture = self.pg3.get_capture(len(pkts))
1669         self.verify_capture_out(capture, nat_ip1, True)
1670
1671         # inside interface VRF don't match NAT44 static mapping VRF (packets
1672         # are dropped)
1673         pkts = self.create_stream_in(self.pg0, self.pg3)
1674         self.pg0.add_stream(pkts)
1675         self.pg_enable_capture(self.pg_interfaces)
1676         self.pg_start()
1677         self.pg3.assert_nothing_captured()
1678
1679     def test_dynamic_to_static(self):
1680         """ Switch from dynamic translation to 1:1NAT """
1681         nat_ip = "10.0.0.10"
1682         self.tcp_port_out = 6303
1683         self.udp_port_out = 6304
1684         self.icmp_id_out = 6305
1685
1686         self.nat44_add_address(self.nat_addr)
1687         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1688         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1689                                                   is_inside=0)
1690
1691         # dynamic
1692         pkts = self.create_stream_in(self.pg0, self.pg1)
1693         self.pg0.add_stream(pkts)
1694         self.pg_enable_capture(self.pg_interfaces)
1695         self.pg_start()
1696         capture = self.pg1.get_capture(len(pkts))
1697         self.verify_capture_out(capture)
1698
1699         # 1:1NAT
1700         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1701         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1702         self.assertEqual(len(sessions), 0)
1703         pkts = self.create_stream_in(self.pg0, self.pg1)
1704         self.pg0.add_stream(pkts)
1705         self.pg_enable_capture(self.pg_interfaces)
1706         self.pg_start()
1707         capture = self.pg1.get_capture(len(pkts))
1708         self.verify_capture_out(capture, nat_ip, True)
1709
1710     def test_identity_nat(self):
1711         """ Identity NAT """
1712
1713         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1714         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1715         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1716                                                   is_inside=0)
1717
1718         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1719              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1720              TCP(sport=12345, dport=56789))
1721         self.pg1.add_stream(p)
1722         self.pg_enable_capture(self.pg_interfaces)
1723         self.pg_start()
1724         capture = self.pg0.get_capture(1)
1725         p = capture[0]
1726         try:
1727             ip = p[IP]
1728             tcp = p[TCP]
1729             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1730             self.assertEqual(ip.src, self.pg1.remote_ip4)
1731             self.assertEqual(tcp.dport, 56789)
1732             self.assertEqual(tcp.sport, 12345)
1733             self.check_tcp_checksum(p)
1734             self.check_ip_checksum(p)
1735         except:
1736             self.logger.error(ppp("Unexpected or invalid packet:", p))
1737             raise
1738
1739     def test_static_lb(self):
1740         """ NAT44 local service load balancing """
1741         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1742         external_port = 80
1743         local_port = 8080
1744         server1 = self.pg0.remote_hosts[0]
1745         server2 = self.pg0.remote_hosts[1]
1746
1747         locals = [{'addr': server1.ip4n,
1748                    'port': local_port,
1749                    'probability': 70},
1750                   {'addr': server2.ip4n,
1751                    'port': local_port,
1752                    'probability': 30}]
1753
1754         self.nat44_add_address(self.nat_addr)
1755         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1756                                                   external_port,
1757                                                   IP_PROTOS.tcp,
1758                                                   local_num=len(locals),
1759                                                   locals=locals)
1760         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1761         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1762                                                   is_inside=0)
1763
1764         # from client to service
1765         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1766              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1767              TCP(sport=12345, dport=external_port))
1768         self.pg1.add_stream(p)
1769         self.pg_enable_capture(self.pg_interfaces)
1770         self.pg_start()
1771         capture = self.pg0.get_capture(1)
1772         p = capture[0]
1773         server = None
1774         try:
1775             ip = p[IP]
1776             tcp = p[TCP]
1777             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1778             if ip.dst == server1.ip4:
1779                 server = server1
1780             else:
1781                 server = server2
1782             self.assertEqual(tcp.dport, local_port)
1783             self.check_tcp_checksum(p)
1784             self.check_ip_checksum(p)
1785         except:
1786             self.logger.error(ppp("Unexpected or invalid packet:", p))
1787             raise
1788
1789         # from service back to client
1790         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1791              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1792              TCP(sport=local_port, dport=12345))
1793         self.pg0.add_stream(p)
1794         self.pg_enable_capture(self.pg_interfaces)
1795         self.pg_start()
1796         capture = self.pg1.get_capture(1)
1797         p = capture[0]
1798         try:
1799             ip = p[IP]
1800             tcp = p[TCP]
1801             self.assertEqual(ip.src, self.nat_addr)
1802             self.assertEqual(tcp.sport, external_port)
1803             self.check_tcp_checksum(p)
1804             self.check_ip_checksum(p)
1805         except:
1806             self.logger.error(ppp("Unexpected or invalid packet:", p))
1807             raise
1808
1809         # multiple clients
1810         server1_n = 0
1811         server2_n = 0
1812         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1813         pkts = []
1814         for client in clients:
1815             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1816                  IP(src=client, dst=self.nat_addr) /
1817                  TCP(sport=12345, dport=external_port))
1818             pkts.append(p)
1819         self.pg1.add_stream(pkts)
1820         self.pg_enable_capture(self.pg_interfaces)
1821         self.pg_start()
1822         capture = self.pg0.get_capture(len(pkts))
1823         for p in capture:
1824             if p[IP].dst == server1.ip4:
1825                 server1_n += 1
1826             else:
1827                 server2_n += 1
1828         self.assertTrue(server1_n > server2_n)
1829
1830     def test_static_lb_2(self):
1831         """ NAT44 local service load balancing (asymmetrical rule) """
1832         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1833         external_port = 80
1834         local_port = 8080
1835         server1 = self.pg0.remote_hosts[0]
1836         server2 = self.pg0.remote_hosts[1]
1837
1838         locals = [{'addr': server1.ip4n,
1839                    'port': local_port,
1840                    'probability': 70},
1841                   {'addr': server2.ip4n,
1842                    'port': local_port,
1843                    'probability': 30}]
1844
1845         self.vapi.nat44_forwarding_enable_disable(1)
1846         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1847                                                   external_port,
1848                                                   IP_PROTOS.tcp,
1849                                                   out2in_only=1,
1850                                                   local_num=len(locals),
1851                                                   locals=locals)
1852         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1853         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1854                                                   is_inside=0)
1855
1856         # from client to service
1857         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1858              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1859              TCP(sport=12345, dport=external_port))
1860         self.pg1.add_stream(p)
1861         self.pg_enable_capture(self.pg_interfaces)
1862         self.pg_start()
1863         capture = self.pg0.get_capture(1)
1864         p = capture[0]
1865         server = None
1866         try:
1867             ip = p[IP]
1868             tcp = p[TCP]
1869             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1870             if ip.dst == server1.ip4:
1871                 server = server1
1872             else:
1873                 server = server2
1874             self.assertEqual(tcp.dport, local_port)
1875             self.check_tcp_checksum(p)
1876             self.check_ip_checksum(p)
1877         except:
1878             self.logger.error(ppp("Unexpected or invalid packet:", p))
1879             raise
1880
1881         # from service back to client
1882         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1883              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1884              TCP(sport=local_port, dport=12345))
1885         self.pg0.add_stream(p)
1886         self.pg_enable_capture(self.pg_interfaces)
1887         self.pg_start()
1888         capture = self.pg1.get_capture(1)
1889         p = capture[0]
1890         try:
1891             ip = p[IP]
1892             tcp = p[TCP]
1893             self.assertEqual(ip.src, self.nat_addr)
1894             self.assertEqual(tcp.sport, external_port)
1895             self.check_tcp_checksum(p)
1896             self.check_ip_checksum(p)
1897         except:
1898             self.logger.error(ppp("Unexpected or invalid packet:", p))
1899             raise
1900
1901         # from client to server (no translation)
1902         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1903              IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1904              TCP(sport=12346, dport=local_port))
1905         self.pg1.add_stream(p)
1906         self.pg_enable_capture(self.pg_interfaces)
1907         self.pg_start()
1908         capture = self.pg0.get_capture(1)
1909         p = capture[0]
1910         server = None
1911         try:
1912             ip = p[IP]
1913             tcp = p[TCP]
1914             self.assertEqual(ip.dst, server1.ip4)
1915             self.assertEqual(tcp.dport, local_port)
1916             self.check_tcp_checksum(p)
1917             self.check_ip_checksum(p)
1918         except:
1919             self.logger.error(ppp("Unexpected or invalid packet:", p))
1920             raise
1921
1922         # from service back to client (no translation)
1923         p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1924              IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1925              TCP(sport=local_port, dport=12346))
1926         self.pg0.add_stream(p)
1927         self.pg_enable_capture(self.pg_interfaces)
1928         self.pg_start()
1929         capture = self.pg1.get_capture(1)
1930         p = capture[0]
1931         try:
1932             ip = p[IP]
1933             tcp = p[TCP]
1934             self.assertEqual(ip.src, server1.ip4)
1935             self.assertEqual(tcp.sport, local_port)
1936             self.check_tcp_checksum(p)
1937             self.check_ip_checksum(p)
1938         except:
1939             self.logger.error(ppp("Unexpected or invalid packet:", p))
1940             raise
1941
1942     def test_multiple_inside_interfaces(self):
1943         """ NAT44 multiple non-overlapping address space inside interfaces """
1944
1945         self.nat44_add_address(self.nat_addr)
1946         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1947         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1948         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1949                                                   is_inside=0)
1950
1951         # between two NAT44 inside interfaces (no translation)
1952         pkts = self.create_stream_in(self.pg0, self.pg1)
1953         self.pg0.add_stream(pkts)
1954         self.pg_enable_capture(self.pg_interfaces)
1955         self.pg_start()
1956         capture = self.pg1.get_capture(len(pkts))
1957         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1958
1959         # from NAT44 inside to interface without NAT44 feature (no translation)
1960         pkts = self.create_stream_in(self.pg0, self.pg2)
1961         self.pg0.add_stream(pkts)
1962         self.pg_enable_capture(self.pg_interfaces)
1963         self.pg_start()
1964         capture = self.pg2.get_capture(len(pkts))
1965         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1966
1967         # in2out 1st interface
1968         pkts = self.create_stream_in(self.pg0, self.pg3)
1969         self.pg0.add_stream(pkts)
1970         self.pg_enable_capture(self.pg_interfaces)
1971         self.pg_start()
1972         capture = self.pg3.get_capture(len(pkts))
1973         self.verify_capture_out(capture)
1974
1975         # out2in 1st interface
1976         pkts = self.create_stream_out(self.pg3)
1977         self.pg3.add_stream(pkts)
1978         self.pg_enable_capture(self.pg_interfaces)
1979         self.pg_start()
1980         capture = self.pg0.get_capture(len(pkts))
1981         self.verify_capture_in(capture, self.pg0)
1982
1983         # in2out 2nd interface
1984         pkts = self.create_stream_in(self.pg1, self.pg3)
1985         self.pg1.add_stream(pkts)
1986         self.pg_enable_capture(self.pg_interfaces)
1987         self.pg_start()
1988         capture = self.pg3.get_capture(len(pkts))
1989         self.verify_capture_out(capture)
1990
1991         # out2in 2nd interface
1992         pkts = self.create_stream_out(self.pg3)
1993         self.pg3.add_stream(pkts)
1994         self.pg_enable_capture(self.pg_interfaces)
1995         self.pg_start()
1996         capture = self.pg1.get_capture(len(pkts))
1997         self.verify_capture_in(capture, self.pg1)
1998
1999     def test_inside_overlapping_interfaces(self):
2000         """ NAT44 multiple inside interfaces with overlapping address space """
2001
2002         static_nat_ip = "10.0.0.10"
2003         self.nat44_add_address(self.nat_addr)
2004         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
2005                                                   is_inside=0)
2006         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
2007         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
2008         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
2009         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
2010                                       vrf_id=20)
2011
2012         # between NAT44 inside interfaces with same VRF (no translation)
2013         pkts = self.create_stream_in(self.pg4, self.pg5)
2014         self.pg4.add_stream(pkts)
2015         self.pg_enable_capture(self.pg_interfaces)
2016         self.pg_start()
2017         capture = self.pg5.get_capture(len(pkts))
2018         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
2019
2020         # between NAT44 inside interfaces with different VRF (hairpinning)
2021         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2022              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2023              TCP(sport=1234, dport=5678))
2024         self.pg4.add_stream(p)
2025         self.pg_enable_capture(self.pg_interfaces)
2026         self.pg_start()
2027         capture = self.pg6.get_capture(1)
2028         p = capture[0]
2029         try:
2030             ip = p[IP]
2031             tcp = p[TCP]
2032             self.assertEqual(ip.src, self.nat_addr)
2033             self.assertEqual(ip.dst, self.pg6.remote_ip4)
2034             self.assertNotEqual(tcp.sport, 1234)
2035             self.assertEqual(tcp.dport, 5678)
2036         except:
2037             self.logger.error(ppp("Unexpected or invalid packet:", p))
2038             raise
2039
2040         # in2out 1st interface
2041         pkts = self.create_stream_in(self.pg4, self.pg3)
2042         self.pg4.add_stream(pkts)
2043         self.pg_enable_capture(self.pg_interfaces)
2044         self.pg_start()
2045         capture = self.pg3.get_capture(len(pkts))
2046         self.verify_capture_out(capture)
2047
2048         # out2in 1st interface
2049         pkts = self.create_stream_out(self.pg3)
2050         self.pg3.add_stream(pkts)
2051         self.pg_enable_capture(self.pg_interfaces)
2052         self.pg_start()
2053         capture = self.pg4.get_capture(len(pkts))
2054         self.verify_capture_in(capture, self.pg4)
2055
2056         # in2out 2nd interface
2057         pkts = self.create_stream_in(self.pg5, self.pg3)
2058         self.pg5.add_stream(pkts)
2059         self.pg_enable_capture(self.pg_interfaces)
2060         self.pg_start()
2061         capture = self.pg3.get_capture(len(pkts))
2062         self.verify_capture_out(capture)
2063
2064         # out2in 2nd interface
2065         pkts = self.create_stream_out(self.pg3)
2066         self.pg3.add_stream(pkts)
2067         self.pg_enable_capture(self.pg_interfaces)
2068         self.pg_start()
2069         capture = self.pg5.get_capture(len(pkts))
2070         self.verify_capture_in(capture, self.pg5)
2071
2072         # pg5 session dump
2073         addresses = self.vapi.nat44_address_dump()
2074         self.assertEqual(len(addresses), 1)
2075         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2076         self.assertEqual(len(sessions), 3)
2077         for session in sessions:
2078             self.assertFalse(session.is_static)
2079             self.assertEqual(session.inside_ip_address[0:4],
2080                              self.pg5.remote_ip4n)
2081             self.assertEqual(session.outside_ip_address,
2082                              addresses[0].ip_address)
2083         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2084         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2085         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2086         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2087         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2088         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2089         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2090         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2091         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2092
2093         # in2out 3rd interface
2094         pkts = self.create_stream_in(self.pg6, self.pg3)
2095         self.pg6.add_stream(pkts)
2096         self.pg_enable_capture(self.pg_interfaces)
2097         self.pg_start()
2098         capture = self.pg3.get_capture(len(pkts))
2099         self.verify_capture_out(capture, static_nat_ip, True)
2100
2101         # out2in 3rd interface
2102         pkts = self.create_stream_out(self.pg3, static_nat_ip)
2103         self.pg3.add_stream(pkts)
2104         self.pg_enable_capture(self.pg_interfaces)
2105         self.pg_start()
2106         capture = self.pg6.get_capture(len(pkts))
2107         self.verify_capture_in(capture, self.pg6)
2108
2109         # general user and session dump verifications
2110         users = self.vapi.nat44_user_dump()
2111         self.assertTrue(len(users) >= 3)
2112         addresses = self.vapi.nat44_address_dump()
2113         self.assertEqual(len(addresses), 1)
2114         for user in users:
2115             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2116                                                          user.vrf_id)
2117             for session in sessions:
2118                 self.assertEqual(user.ip_address, session.inside_ip_address)
2119                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2120                 self.assertTrue(session.protocol in
2121                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
2122                                  IP_PROTOS.icmp])
2123
2124         # pg4 session dump
2125         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2126         self.assertTrue(len(sessions) >= 4)
2127         for session in sessions:
2128             self.assertFalse(session.is_static)
2129             self.assertEqual(session.inside_ip_address[0:4],
2130                              self.pg4.remote_ip4n)
2131             self.assertEqual(session.outside_ip_address,
2132                              addresses[0].ip_address)
2133
2134         # pg6 session dump
2135         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2136         self.assertTrue(len(sessions) >= 3)
2137         for session in sessions:
2138             self.assertTrue(session.is_static)
2139             self.assertEqual(session.inside_ip_address[0:4],
2140                              self.pg6.remote_ip4n)
2141             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2142                              map(int, static_nat_ip.split('.')))
2143             self.assertTrue(session.inside_port in
2144                             [self.tcp_port_in, self.udp_port_in,
2145                              self.icmp_id_in])
2146
2147     def test_hairpinning(self):
2148         """ NAT44 hairpinning - 1:1 NAPT """
2149
2150         host = self.pg0.remote_hosts[0]
2151         server = self.pg0.remote_hosts[1]
2152         host_in_port = 1234
2153         host_out_port = 0
2154         server_in_port = 5678
2155         server_out_port = 8765
2156
2157         self.nat44_add_address(self.nat_addr)
2158         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2159         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2160                                                   is_inside=0)
2161         # add static mapping for server
2162         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2163                                       server_in_port, server_out_port,
2164                                       proto=IP_PROTOS.tcp)
2165
2166         # send packet from host to server
2167         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2168              IP(src=host.ip4, dst=self.nat_addr) /
2169              TCP(sport=host_in_port, dport=server_out_port))
2170         self.pg0.add_stream(p)
2171         self.pg_enable_capture(self.pg_interfaces)
2172         self.pg_start()
2173         capture = self.pg0.get_capture(1)
2174         p = capture[0]
2175         try:
2176             ip = p[IP]
2177             tcp = p[TCP]
2178             self.assertEqual(ip.src, self.nat_addr)
2179             self.assertEqual(ip.dst, server.ip4)
2180             self.assertNotEqual(tcp.sport, host_in_port)
2181             self.assertEqual(tcp.dport, server_in_port)
2182             self.check_tcp_checksum(p)
2183             host_out_port = tcp.sport
2184         except:
2185             self.logger.error(ppp("Unexpected or invalid packet:", p))
2186             raise
2187
2188         # send reply from server to host
2189         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2190              IP(src=server.ip4, dst=self.nat_addr) /
2191              TCP(sport=server_in_port, dport=host_out_port))
2192         self.pg0.add_stream(p)
2193         self.pg_enable_capture(self.pg_interfaces)
2194         self.pg_start()
2195         capture = self.pg0.get_capture(1)
2196         p = capture[0]
2197         try:
2198             ip = p[IP]
2199             tcp = p[TCP]
2200             self.assertEqual(ip.src, self.nat_addr)
2201             self.assertEqual(ip.dst, host.ip4)
2202             self.assertEqual(tcp.sport, server_out_port)
2203             self.assertEqual(tcp.dport, host_in_port)
2204             self.check_tcp_checksum(p)
2205         except:
2206             self.logger.error(ppp("Unexpected or invalid packet:", p))
2207             raise
2208
2209     def test_hairpinning2(self):
2210         """ NAT44 hairpinning - 1:1 NAT"""
2211
2212         server1_nat_ip = "10.0.0.10"
2213         server2_nat_ip = "10.0.0.11"
2214         host = self.pg0.remote_hosts[0]
2215         server1 = self.pg0.remote_hosts[1]
2216         server2 = self.pg0.remote_hosts[2]
2217         server_tcp_port = 22
2218         server_udp_port = 20
2219
2220         self.nat44_add_address(self.nat_addr)
2221         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2222         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2223                                                   is_inside=0)
2224
2225         # add static mapping for servers
2226         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2227         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2228
2229         # host to server1
2230         pkts = []
2231         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2232              IP(src=host.ip4, dst=server1_nat_ip) /
2233              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2234         pkts.append(p)
2235         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2236              IP(src=host.ip4, dst=server1_nat_ip) /
2237              UDP(sport=self.udp_port_in, dport=server_udp_port))
2238         pkts.append(p)
2239         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2240              IP(src=host.ip4, dst=server1_nat_ip) /
2241              ICMP(id=self.icmp_id_in, type='echo-request'))
2242         pkts.append(p)
2243         self.pg0.add_stream(pkts)
2244         self.pg_enable_capture(self.pg_interfaces)
2245         self.pg_start()
2246         capture = self.pg0.get_capture(len(pkts))
2247         for packet in capture:
2248             try:
2249                 self.assertEqual(packet[IP].src, self.nat_addr)
2250                 self.assertEqual(packet[IP].dst, server1.ip4)
2251                 if packet.haslayer(TCP):
2252                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2253                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2254                     self.tcp_port_out = packet[TCP].sport
2255                     self.check_tcp_checksum(packet)
2256                 elif packet.haslayer(UDP):
2257                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2258                     self.assertEqual(packet[UDP].dport, server_udp_port)
2259                     self.udp_port_out = packet[UDP].sport
2260                 else:
2261                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2262                     self.icmp_id_out = packet[ICMP].id
2263             except:
2264                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2265                 raise
2266
2267         # server1 to host
2268         pkts = []
2269         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270              IP(src=server1.ip4, dst=self.nat_addr) /
2271              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2272         pkts.append(p)
2273         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2274              IP(src=server1.ip4, dst=self.nat_addr) /
2275              UDP(sport=server_udp_port, dport=self.udp_port_out))
2276         pkts.append(p)
2277         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2278              IP(src=server1.ip4, dst=self.nat_addr) /
2279              ICMP(id=self.icmp_id_out, type='echo-reply'))
2280         pkts.append(p)
2281         self.pg0.add_stream(pkts)
2282         self.pg_enable_capture(self.pg_interfaces)
2283         self.pg_start()
2284         capture = self.pg0.get_capture(len(pkts))
2285         for packet in capture:
2286             try:
2287                 self.assertEqual(packet[IP].src, server1_nat_ip)
2288                 self.assertEqual(packet[IP].dst, host.ip4)
2289                 if packet.haslayer(TCP):
2290                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2291                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2292                     self.check_tcp_checksum(packet)
2293                 elif packet.haslayer(UDP):
2294                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2295                     self.assertEqual(packet[UDP].sport, server_udp_port)
2296                 else:
2297                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2298             except:
2299                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2300                 raise
2301
2302         # server2 to server1
2303         pkts = []
2304         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2305              IP(src=server2.ip4, dst=server1_nat_ip) /
2306              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2307         pkts.append(p)
2308         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2309              IP(src=server2.ip4, dst=server1_nat_ip) /
2310              UDP(sport=self.udp_port_in, dport=server_udp_port))
2311         pkts.append(p)
2312         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2313              IP(src=server2.ip4, dst=server1_nat_ip) /
2314              ICMP(id=self.icmp_id_in, type='echo-request'))
2315         pkts.append(p)
2316         self.pg0.add_stream(pkts)
2317         self.pg_enable_capture(self.pg_interfaces)
2318         self.pg_start()
2319         capture = self.pg0.get_capture(len(pkts))
2320         for packet in capture:
2321             try:
2322                 self.assertEqual(packet[IP].src, server2_nat_ip)
2323                 self.assertEqual(packet[IP].dst, server1.ip4)
2324                 if packet.haslayer(TCP):
2325                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2326                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2327                     self.tcp_port_out = packet[TCP].sport
2328                     self.check_tcp_checksum(packet)
2329                 elif packet.haslayer(UDP):
2330                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2331                     self.assertEqual(packet[UDP].dport, server_udp_port)
2332                     self.udp_port_out = packet[UDP].sport
2333                 else:
2334                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2335                     self.icmp_id_out = packet[ICMP].id
2336             except:
2337                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2338                 raise
2339
2340         # server1 to server2
2341         pkts = []
2342         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2343              IP(src=server1.ip4, dst=server2_nat_ip) /
2344              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2345         pkts.append(p)
2346         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2347              IP(src=server1.ip4, dst=server2_nat_ip) /
2348              UDP(sport=server_udp_port, dport=self.udp_port_out))
2349         pkts.append(p)
2350         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2351              IP(src=server1.ip4, dst=server2_nat_ip) /
2352              ICMP(id=self.icmp_id_out, type='echo-reply'))
2353         pkts.append(p)
2354         self.pg0.add_stream(pkts)
2355         self.pg_enable_capture(self.pg_interfaces)
2356         self.pg_start()
2357         capture = self.pg0.get_capture(len(pkts))
2358         for packet in capture:
2359             try:
2360                 self.assertEqual(packet[IP].src, server1_nat_ip)
2361                 self.assertEqual(packet[IP].dst, server2.ip4)
2362                 if packet.haslayer(TCP):
2363                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2364                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2365                     self.check_tcp_checksum(packet)
2366                 elif packet.haslayer(UDP):
2367                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2368                     self.assertEqual(packet[UDP].sport, server_udp_port)
2369                 else:
2370                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2371             except:
2372                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2373                 raise
2374
2375     def test_max_translations_per_user(self):
2376         """ MAX translations per user - recycle the least recently used """
2377
2378         self.nat44_add_address(self.nat_addr)
2379         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2380         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2381                                                   is_inside=0)
2382
2383         # get maximum number of translations per user
2384         nat44_config = self.vapi.nat_show_config()
2385
2386         # send more than maximum number of translations per user packets
2387         pkts_num = nat44_config.max_translations_per_user + 5
2388         pkts = []
2389         for port in range(0, pkts_num):
2390             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2391                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2392                  TCP(sport=1025 + port))
2393             pkts.append(p)
2394         self.pg0.add_stream(pkts)
2395         self.pg_enable_capture(self.pg_interfaces)
2396         self.pg_start()
2397
2398         # verify number of translated packet
2399         self.pg1.get_capture(pkts_num)
2400
2401     def test_interface_addr(self):
2402         """ Acquire NAT44 addresses from interface """
2403         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2404
2405         # no address in NAT pool
2406         adresses = self.vapi.nat44_address_dump()
2407         self.assertEqual(0, len(adresses))
2408
2409         # configure interface address and check NAT address pool
2410         self.pg7.config_ip4()
2411         adresses = self.vapi.nat44_address_dump()
2412         self.assertEqual(1, len(adresses))
2413         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2414
2415         # remove interface address and check NAT address pool
2416         self.pg7.unconfig_ip4()
2417         adresses = self.vapi.nat44_address_dump()
2418         self.assertEqual(0, len(adresses))
2419
2420     def test_interface_addr_static_mapping(self):
2421         """ Static mapping with addresses from interface """
2422         tag = "testTAG"
2423
2424         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2425         self.nat44_add_static_mapping(
2426             '1.2.3.4',
2427             external_sw_if_index=self.pg7.sw_if_index,
2428             tag=tag)
2429
2430         # static mappings with external interface
2431         static_mappings = self.vapi.nat44_static_mapping_dump()
2432         self.assertEqual(1, len(static_mappings))
2433         self.assertEqual(self.pg7.sw_if_index,
2434                          static_mappings[0].external_sw_if_index)
2435         self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2436
2437         # configure interface address and check static mappings
2438         self.pg7.config_ip4()
2439         static_mappings = self.vapi.nat44_static_mapping_dump()
2440         self.assertEqual(1, len(static_mappings))
2441         self.assertEqual(static_mappings[0].external_ip_address[0:4],
2442                          self.pg7.local_ip4n)
2443         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2444         self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2445
2446         # remove interface address and check static mappings
2447         self.pg7.unconfig_ip4()
2448         static_mappings = self.vapi.nat44_static_mapping_dump()
2449         self.assertEqual(0, len(static_mappings))
2450
2451     def test_interface_addr_identity_nat(self):
2452         """ Identity NAT with addresses from interface """
2453
2454         port = 53053
2455         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2456         self.vapi.nat44_add_del_identity_mapping(
2457             sw_if_index=self.pg7.sw_if_index,
2458             port=port,
2459             protocol=IP_PROTOS.tcp,
2460             addr_only=0)
2461
2462         # identity mappings with external interface
2463         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2464         self.assertEqual(1, len(identity_mappings))
2465         self.assertEqual(self.pg7.sw_if_index,
2466                          identity_mappings[0].sw_if_index)
2467
2468         # configure interface address and check identity mappings
2469         self.pg7.config_ip4()
2470         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2471         self.assertEqual(1, len(identity_mappings))
2472         self.assertEqual(identity_mappings[0].ip_address,
2473                          self.pg7.local_ip4n)
2474         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2475         self.assertEqual(port, identity_mappings[0].port)
2476         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2477
2478         # remove interface address and check identity mappings
2479         self.pg7.unconfig_ip4()
2480         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2481         self.assertEqual(0, len(identity_mappings))
2482
2483     def test_ipfix_nat44_sess(self):
2484         """ IPFIX logging NAT44 session created/delted """
2485         self.ipfix_domain_id = 10
2486         self.ipfix_src_port = 20202
2487         colector_port = 30303
2488         bind_layers(UDP, IPFIX, dport=30303)
2489         self.nat44_add_address(self.nat_addr)
2490         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2491         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2492                                                   is_inside=0)
2493         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2494                                      src_address=self.pg3.local_ip4n,
2495                                      path_mtu=512,
2496                                      template_interval=10,
2497                                      collector_port=colector_port)
2498         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2499                             src_port=self.ipfix_src_port)
2500
2501         pkts = self.create_stream_in(self.pg0, self.pg1)
2502         self.pg0.add_stream(pkts)
2503         self.pg_enable_capture(self.pg_interfaces)
2504         self.pg_start()
2505         capture = self.pg1.get_capture(len(pkts))
2506         self.verify_capture_out(capture)
2507         self.nat44_add_address(self.nat_addr, is_add=0)
2508         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2509         capture = self.pg3.get_capture(9)
2510         ipfix = IPFIXDecoder()
2511         # first load template
2512         for p in capture:
2513             self.assertTrue(p.haslayer(IPFIX))
2514             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2515             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2516             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2517             self.assertEqual(p[UDP].dport, colector_port)
2518             self.assertEqual(p[IPFIX].observationDomainID,
2519                              self.ipfix_domain_id)
2520             if p.haslayer(Template):
2521                 ipfix.add_template(p.getlayer(Template))
2522         # verify events in data set
2523         for p in capture:
2524             if p.haslayer(Data):
2525                 data = ipfix.decode_data_set(p.getlayer(Set))
2526                 self.verify_ipfix_nat44_ses(data)
2527
2528     def test_ipfix_addr_exhausted(self):
2529         """ IPFIX logging NAT addresses exhausted """
2530         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2531         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2532                                                   is_inside=0)
2533         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2534                                      src_address=self.pg3.local_ip4n,
2535                                      path_mtu=512,
2536                                      template_interval=10)
2537         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2538                             src_port=self.ipfix_src_port)
2539
2540         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2541              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2542              TCP(sport=3025))
2543         self.pg0.add_stream(p)
2544         self.pg_enable_capture(self.pg_interfaces)
2545         self.pg_start()
2546         capture = self.pg1.get_capture(0)
2547         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2548         capture = self.pg3.get_capture(9)
2549         ipfix = IPFIXDecoder()
2550         # first load template
2551         for p in capture:
2552             self.assertTrue(p.haslayer(IPFIX))
2553             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2554             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2555             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2556             self.assertEqual(p[UDP].dport, 4739)
2557             self.assertEqual(p[IPFIX].observationDomainID,
2558                              self.ipfix_domain_id)
2559             if p.haslayer(Template):
2560                 ipfix.add_template(p.getlayer(Template))
2561         # verify events in data set
2562         for p in capture:
2563             if p.haslayer(Data):
2564                 data = ipfix.decode_data_set(p.getlayer(Set))
2565                 self.verify_ipfix_addr_exhausted(data)
2566
2567     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2568     def test_ipfix_max_sessions(self):
2569         """ IPFIX logging maximum session entries exceeded """
2570         self.nat44_add_address(self.nat_addr)
2571         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2572         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2573                                                   is_inside=0)
2574
2575         nat44_config = self.vapi.nat_show_config()
2576         max_sessions = 10 * nat44_config.translation_buckets
2577
2578         pkts = []
2579         for i in range(0, max_sessions):
2580             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2581             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2582                  IP(src=src, dst=self.pg1.remote_ip4) /
2583                  TCP(sport=1025))
2584             pkts.append(p)
2585         self.pg0.add_stream(pkts)
2586         self.pg_enable_capture(self.pg_interfaces)
2587         self.pg_start()
2588
2589         self.pg1.get_capture(max_sessions)
2590         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2591                                      src_address=self.pg3.local_ip4n,
2592                                      path_mtu=512,
2593                                      template_interval=10)
2594         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2595                             src_port=self.ipfix_src_port)
2596
2597         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2598              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2599              TCP(sport=1025))
2600         self.pg0.add_stream(p)
2601         self.pg_enable_capture(self.pg_interfaces)
2602         self.pg_start()
2603         self.pg1.get_capture(0)
2604         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2605         capture = self.pg3.get_capture(9)
2606         ipfix = IPFIXDecoder()
2607         # first load template
2608         for p in capture:
2609             self.assertTrue(p.haslayer(IPFIX))
2610             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2611             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2612             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2613             self.assertEqual(p[UDP].dport, 4739)
2614             self.assertEqual(p[IPFIX].observationDomainID,
2615                              self.ipfix_domain_id)
2616             if p.haslayer(Template):
2617                 ipfix.add_template(p.getlayer(Template))
2618         # verify events in data set
2619         for p in capture:
2620             if p.haslayer(Data):
2621                 data = ipfix.decode_data_set(p.getlayer(Set))
2622                 self.verify_ipfix_max_sessions(data, max_sessions)
2623
2624     def test_pool_addr_fib(self):
2625         """ NAT44 add pool addresses to FIB """
2626         static_addr = '10.0.0.10'
2627         self.nat44_add_address(self.nat_addr)
2628         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2629         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2630                                                   is_inside=0)
2631         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2632
2633         # NAT44 address
2634         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2635              ARP(op=ARP.who_has, pdst=self.nat_addr,
2636                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2637         self.pg1.add_stream(p)
2638         self.pg_enable_capture(self.pg_interfaces)
2639         self.pg_start()
2640         capture = self.pg1.get_capture(1)
2641         self.assertTrue(capture[0].haslayer(ARP))
2642         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2643
2644         # 1:1 NAT address
2645         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2646              ARP(op=ARP.who_has, pdst=static_addr,
2647                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2648         self.pg1.add_stream(p)
2649         self.pg_enable_capture(self.pg_interfaces)
2650         self.pg_start()
2651         capture = self.pg1.get_capture(1)
2652         self.assertTrue(capture[0].haslayer(ARP))
2653         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2654
2655         # send ARP to non-NAT44 interface
2656         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2657              ARP(op=ARP.who_has, pdst=self.nat_addr,
2658                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2659         self.pg2.add_stream(p)
2660         self.pg_enable_capture(self.pg_interfaces)
2661         self.pg_start()
2662         capture = self.pg1.get_capture(0)
2663
2664         # remove addresses and verify
2665         self.nat44_add_address(self.nat_addr, is_add=0)
2666         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2667                                       is_add=0)
2668
2669         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2670              ARP(op=ARP.who_has, pdst=self.nat_addr,
2671                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2672         self.pg1.add_stream(p)
2673         self.pg_enable_capture(self.pg_interfaces)
2674         self.pg_start()
2675         capture = self.pg1.get_capture(0)
2676
2677         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2678              ARP(op=ARP.who_has, pdst=static_addr,
2679                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2680         self.pg1.add_stream(p)
2681         self.pg_enable_capture(self.pg_interfaces)
2682         self.pg_start()
2683         capture = self.pg1.get_capture(0)
2684
2685     def test_vrf_mode(self):
2686         """ NAT44 tenant VRF aware address pool mode """
2687
2688         vrf_id1 = 1
2689         vrf_id2 = 2
2690         nat_ip1 = "10.0.0.10"
2691         nat_ip2 = "10.0.0.11"
2692
2693         self.pg0.unconfig_ip4()
2694         self.pg1.unconfig_ip4()
2695         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2696         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2697         self.pg0.set_table_ip4(vrf_id1)
2698         self.pg1.set_table_ip4(vrf_id2)
2699         self.pg0.config_ip4()
2700         self.pg1.config_ip4()
2701
2702         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2703         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2704         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2705         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2706         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2707                                                   is_inside=0)
2708
2709         # first VRF
2710         pkts = self.create_stream_in(self.pg0, self.pg2)
2711         self.pg0.add_stream(pkts)
2712         self.pg_enable_capture(self.pg_interfaces)
2713         self.pg_start()
2714         capture = self.pg2.get_capture(len(pkts))
2715         self.verify_capture_out(capture, nat_ip1)
2716
2717         # second VRF
2718         pkts = self.create_stream_in(self.pg1, self.pg2)
2719         self.pg1.add_stream(pkts)
2720         self.pg_enable_capture(self.pg_interfaces)
2721         self.pg_start()
2722         capture = self.pg2.get_capture(len(pkts))
2723         self.verify_capture_out(capture, nat_ip2)
2724
2725         self.pg0.unconfig_ip4()
2726         self.pg1.unconfig_ip4()
2727         self.pg0.set_table_ip4(0)
2728         self.pg1.set_table_ip4(0)
2729         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2730         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2731
2732     def test_vrf_feature_independent(self):
2733         """ NAT44 tenant VRF independent address pool mode """
2734
2735         nat_ip1 = "10.0.0.10"
2736         nat_ip2 = "10.0.0.11"
2737
2738         self.nat44_add_address(nat_ip1)
2739         self.nat44_add_address(nat_ip2, vrf_id=99)
2740         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2741         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2742         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2743                                                   is_inside=0)
2744
2745         # first VRF
2746         pkts = self.create_stream_in(self.pg0, self.pg2)
2747         self.pg0.add_stream(pkts)
2748         self.pg_enable_capture(self.pg_interfaces)
2749         self.pg_start()
2750         capture = self.pg2.get_capture(len(pkts))
2751         self.verify_capture_out(capture, nat_ip1)
2752
2753         # second VRF
2754         pkts = self.create_stream_in(self.pg1, self.pg2)
2755         self.pg1.add_stream(pkts)
2756         self.pg_enable_capture(self.pg_interfaces)
2757         self.pg_start()
2758         capture = self.pg2.get_capture(len(pkts))
2759         self.verify_capture_out(capture, nat_ip1)
2760
2761     def test_dynamic_ipless_interfaces(self):
2762         """ NAT44 interfaces without configured IP address """
2763
2764         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2765                                       mactobinary(self.pg7.remote_mac),
2766                                       self.pg7.remote_ip4n,
2767                                       is_static=1)
2768         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2769                                       mactobinary(self.pg8.remote_mac),
2770                                       self.pg8.remote_ip4n,
2771                                       is_static=1)
2772
2773         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2774                                    dst_address_length=32,
2775                                    next_hop_address=self.pg7.remote_ip4n,
2776                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2777         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2778                                    dst_address_length=32,
2779                                    next_hop_address=self.pg8.remote_ip4n,
2780                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2781
2782         self.nat44_add_address(self.nat_addr)
2783         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2784         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2785                                                   is_inside=0)
2786
2787         # in2out
2788         pkts = self.create_stream_in(self.pg7, self.pg8)
2789         self.pg7.add_stream(pkts)
2790         self.pg_enable_capture(self.pg_interfaces)
2791         self.pg_start()
2792         capture = self.pg8.get_capture(len(pkts))
2793         self.verify_capture_out(capture)
2794
2795         # out2in
2796         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2797         self.pg8.add_stream(pkts)
2798         self.pg_enable_capture(self.pg_interfaces)
2799         self.pg_start()
2800         capture = self.pg7.get_capture(len(pkts))
2801         self.verify_capture_in(capture, self.pg7)
2802
2803     def test_static_ipless_interfaces(self):
2804         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2805
2806         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2807                                       mactobinary(self.pg7.remote_mac),
2808                                       self.pg7.remote_ip4n,
2809                                       is_static=1)
2810         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2811                                       mactobinary(self.pg8.remote_mac),
2812                                       self.pg8.remote_ip4n,
2813                                       is_static=1)
2814
2815         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2816                                    dst_address_length=32,
2817                                    next_hop_address=self.pg7.remote_ip4n,
2818                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2819         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2820                                    dst_address_length=32,
2821                                    next_hop_address=self.pg8.remote_ip4n,
2822                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2823
2824         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2825         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2826         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2827                                                   is_inside=0)
2828
2829         # out2in
2830         pkts = self.create_stream_out(self.pg8)
2831         self.pg8.add_stream(pkts)
2832         self.pg_enable_capture(self.pg_interfaces)
2833         self.pg_start()
2834         capture = self.pg7.get_capture(len(pkts))
2835         self.verify_capture_in(capture, self.pg7)
2836
2837         # in2out
2838         pkts = self.create_stream_in(self.pg7, self.pg8)
2839         self.pg7.add_stream(pkts)
2840         self.pg_enable_capture(self.pg_interfaces)
2841         self.pg_start()
2842         capture = self.pg8.get_capture(len(pkts))
2843         self.verify_capture_out(capture, self.nat_addr, True)
2844
2845     def test_static_with_port_ipless_interfaces(self):
2846         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2847
2848         self.tcp_port_out = 30606
2849         self.udp_port_out = 30607
2850         self.icmp_id_out = 30608
2851
2852         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2853                                       mactobinary(self.pg7.remote_mac),
2854                                       self.pg7.remote_ip4n,
2855                                       is_static=1)
2856         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2857                                       mactobinary(self.pg8.remote_mac),
2858                                       self.pg8.remote_ip4n,
2859                                       is_static=1)
2860
2861         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2862                                    dst_address_length=32,
2863                                    next_hop_address=self.pg7.remote_ip4n,
2864                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2865         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2866                                    dst_address_length=32,
2867                                    next_hop_address=self.pg8.remote_ip4n,
2868                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2869
2870         self.nat44_add_address(self.nat_addr)
2871         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2872                                       self.tcp_port_in, self.tcp_port_out,
2873                                       proto=IP_PROTOS.tcp)
2874         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2875                                       self.udp_port_in, self.udp_port_out,
2876                                       proto=IP_PROTOS.udp)
2877         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2878                                       self.icmp_id_in, self.icmp_id_out,
2879                                       proto=IP_PROTOS.icmp)
2880         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2881         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2882                                                   is_inside=0)
2883
2884         # out2in
2885         pkts = self.create_stream_out(self.pg8)
2886         self.pg8.add_stream(pkts)
2887         self.pg_enable_capture(self.pg_interfaces)
2888         self.pg_start()
2889         capture = self.pg7.get_capture(len(pkts))
2890         self.verify_capture_in(capture, self.pg7)
2891
2892         # in2out
2893         pkts = self.create_stream_in(self.pg7, self.pg8)
2894         self.pg7.add_stream(pkts)
2895         self.pg_enable_capture(self.pg_interfaces)
2896         self.pg_start()
2897         capture = self.pg8.get_capture(len(pkts))
2898         self.verify_capture_out(capture)
2899
2900     def test_static_unknown_proto(self):
2901         """ 1:1 NAT translate packet with unknown protocol """
2902         nat_ip = "10.0.0.10"
2903         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2904         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2905         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2906                                                   is_inside=0)
2907
2908         # in2out
2909         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2910              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2911              GRE() /
2912              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2913              TCP(sport=1234, dport=1234))
2914         self.pg0.add_stream(p)
2915         self.pg_enable_capture(self.pg_interfaces)
2916         self.pg_start()
2917         p = self.pg1.get_capture(1)
2918         packet = p[0]
2919         try:
2920             self.assertEqual(packet[IP].src, nat_ip)
2921             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2922             self.assertTrue(packet.haslayer(GRE))
2923             self.check_ip_checksum(packet)
2924         except:
2925             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2926             raise
2927
2928         # out2in
2929         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2930              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2931              GRE() /
2932              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2933              TCP(sport=1234, dport=1234))
2934         self.pg1.add_stream(p)
2935         self.pg_enable_capture(self.pg_interfaces)
2936         self.pg_start()
2937         p = self.pg0.get_capture(1)
2938         packet = p[0]
2939         try:
2940             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2941             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2942             self.assertTrue(packet.haslayer(GRE))
2943             self.check_ip_checksum(packet)
2944         except:
2945             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2946             raise
2947
2948     def test_hairpinning_static_unknown_proto(self):
2949         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2950
2951         host = self.pg0.remote_hosts[0]
2952         server = self.pg0.remote_hosts[1]
2953
2954         host_nat_ip = "10.0.0.10"
2955         server_nat_ip = "10.0.0.11"
2956
2957         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2958         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2959         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2960         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2961                                                   is_inside=0)
2962
2963         # host to server
2964         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2965              IP(src=host.ip4, dst=server_nat_ip) /
2966              GRE() /
2967              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2968              TCP(sport=1234, dport=1234))
2969         self.pg0.add_stream(p)
2970         self.pg_enable_capture(self.pg_interfaces)
2971         self.pg_start()
2972         p = self.pg0.get_capture(1)
2973         packet = p[0]
2974         try:
2975             self.assertEqual(packet[IP].src, host_nat_ip)
2976             self.assertEqual(packet[IP].dst, server.ip4)
2977             self.assertTrue(packet.haslayer(GRE))
2978             self.check_ip_checksum(packet)
2979         except:
2980             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2981             raise
2982
2983         # server to host
2984         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2985              IP(src=server.ip4, dst=host_nat_ip) /
2986              GRE() /
2987              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2988              TCP(sport=1234, dport=1234))
2989         self.pg0.add_stream(p)
2990         self.pg_enable_capture(self.pg_interfaces)
2991         self.pg_start()
2992         p = self.pg0.get_capture(1)
2993         packet = p[0]
2994         try:
2995             self.assertEqual(packet[IP].src, server_nat_ip)
2996             self.assertEqual(packet[IP].dst, host.ip4)
2997             self.assertTrue(packet.haslayer(GRE))
2998             self.check_ip_checksum(packet)
2999         except:
3000             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3001             raise
3002
3003     def test_unknown_proto(self):
3004         """ NAT44 translate packet with unknown protocol """
3005         self.nat44_add_address(self.nat_addr)
3006         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3007         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3008                                                   is_inside=0)
3009
3010         # in2out
3011         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3012              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3013              TCP(sport=self.tcp_port_in, dport=20))
3014         self.pg0.add_stream(p)
3015         self.pg_enable_capture(self.pg_interfaces)
3016         self.pg_start()
3017         p = self.pg1.get_capture(1)
3018
3019         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3020              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3021              GRE() /
3022              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3023              TCP(sport=1234, dport=1234))
3024         self.pg0.add_stream(p)
3025         self.pg_enable_capture(self.pg_interfaces)
3026         self.pg_start()
3027         p = self.pg1.get_capture(1)
3028         packet = p[0]
3029         try:
3030             self.assertEqual(packet[IP].src, self.nat_addr)
3031             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3032             self.assertTrue(packet.haslayer(GRE))
3033             self.check_ip_checksum(packet)
3034         except:
3035             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3036             raise
3037
3038         # out2in
3039         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3040              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3041              GRE() /
3042              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3043              TCP(sport=1234, dport=1234))
3044         self.pg1.add_stream(p)
3045         self.pg_enable_capture(self.pg_interfaces)
3046         self.pg_start()
3047         p = self.pg0.get_capture(1)
3048         packet = p[0]
3049         try:
3050             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3051             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3052             self.assertTrue(packet.haslayer(GRE))
3053             self.check_ip_checksum(packet)
3054         except:
3055             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3056             raise
3057
3058     def test_hairpinning_unknown_proto(self):
3059         """ NAT44 translate packet with unknown protocol - hairpinning """
3060         host = self.pg0.remote_hosts[0]
3061         server = self.pg0.remote_hosts[1]
3062         host_in_port = 1234
3063         host_out_port = 0
3064         server_in_port = 5678
3065         server_out_port = 8765
3066         server_nat_ip = "10.0.0.11"
3067
3068         self.nat44_add_address(self.nat_addr)
3069         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3070         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3071                                                   is_inside=0)
3072
3073         # add static mapping for server
3074         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3075
3076         # host to server
3077         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3078              IP(src=host.ip4, dst=server_nat_ip) /
3079              TCP(sport=host_in_port, dport=server_out_port))
3080         self.pg0.add_stream(p)
3081         self.pg_enable_capture(self.pg_interfaces)
3082         self.pg_start()
3083         capture = self.pg0.get_capture(1)
3084
3085         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3086              IP(src=host.ip4, dst=server_nat_ip) /
3087              GRE() /
3088              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3089              TCP(sport=1234, dport=1234))
3090         self.pg0.add_stream(p)
3091         self.pg_enable_capture(self.pg_interfaces)
3092         self.pg_start()
3093         p = self.pg0.get_capture(1)
3094         packet = p[0]
3095         try:
3096             self.assertEqual(packet[IP].src, self.nat_addr)
3097             self.assertEqual(packet[IP].dst, server.ip4)
3098             self.assertTrue(packet.haslayer(GRE))
3099             self.check_ip_checksum(packet)
3100         except:
3101             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3102             raise
3103
3104         # server to host
3105         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3106              IP(src=server.ip4, dst=self.nat_addr) /
3107              GRE() /
3108              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3109              TCP(sport=1234, dport=1234))
3110         self.pg0.add_stream(p)
3111         self.pg_enable_capture(self.pg_interfaces)
3112         self.pg_start()
3113         p = self.pg0.get_capture(1)
3114         packet = p[0]
3115         try:
3116             self.assertEqual(packet[IP].src, server_nat_ip)
3117             self.assertEqual(packet[IP].dst, host.ip4)
3118             self.assertTrue(packet.haslayer(GRE))
3119             self.check_ip_checksum(packet)
3120         except:
3121             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3122             raise
3123
3124     def test_output_feature(self):
3125         """ NAT44 interface output feature (in2out postrouting) """
3126         self.nat44_add_address(self.nat_addr)
3127         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3128         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3129         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3130                                                          is_inside=0)
3131
3132         # in2out
3133         pkts = self.create_stream_in(self.pg0, self.pg3)
3134         self.pg0.add_stream(pkts)
3135         self.pg_enable_capture(self.pg_interfaces)
3136         self.pg_start()
3137         capture = self.pg3.get_capture(len(pkts))
3138         self.verify_capture_out(capture)
3139
3140         # out2in
3141         pkts = self.create_stream_out(self.pg3)
3142         self.pg3.add_stream(pkts)
3143         self.pg_enable_capture(self.pg_interfaces)
3144         self.pg_start()
3145         capture = self.pg0.get_capture(len(pkts))
3146         self.verify_capture_in(capture, self.pg0)
3147
3148         # from non-NAT interface to NAT inside interface
3149         pkts = self.create_stream_in(self.pg2, self.pg0)
3150         self.pg2.add_stream(pkts)
3151         self.pg_enable_capture(self.pg_interfaces)
3152         self.pg_start()
3153         capture = self.pg0.get_capture(len(pkts))
3154         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3155
3156     def test_output_feature_vrf_aware(self):
3157         """ NAT44 interface output feature VRF aware (in2out postrouting) """
3158         nat_ip_vrf10 = "10.0.0.10"
3159         nat_ip_vrf20 = "10.0.0.20"
3160
3161         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3162                                    dst_address_length=32,
3163                                    next_hop_address=self.pg3.remote_ip4n,
3164                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3165                                    table_id=10)
3166         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3167                                    dst_address_length=32,
3168                                    next_hop_address=self.pg3.remote_ip4n,
3169                                    next_hop_sw_if_index=self.pg3.sw_if_index,
3170                                    table_id=20)
3171
3172         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3173         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3174         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3175         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3176         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3177                                                          is_inside=0)
3178
3179         # in2out VRF 10
3180         pkts = self.create_stream_in(self.pg4, self.pg3)
3181         self.pg4.add_stream(pkts)
3182         self.pg_enable_capture(self.pg_interfaces)
3183         self.pg_start()
3184         capture = self.pg3.get_capture(len(pkts))
3185         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3186
3187         # out2in VRF 10
3188         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3189         self.pg3.add_stream(pkts)
3190         self.pg_enable_capture(self.pg_interfaces)
3191         self.pg_start()
3192         capture = self.pg4.get_capture(len(pkts))
3193         self.verify_capture_in(capture, self.pg4)
3194
3195         # in2out VRF 20
3196         pkts = self.create_stream_in(self.pg6, self.pg3)
3197         self.pg6.add_stream(pkts)
3198         self.pg_enable_capture(self.pg_interfaces)
3199         self.pg_start()
3200         capture = self.pg3.get_capture(len(pkts))
3201         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3202
3203         # out2in VRF 20
3204         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3205         self.pg3.add_stream(pkts)
3206         self.pg_enable_capture(self.pg_interfaces)
3207         self.pg_start()
3208         capture = self.pg6.get_capture(len(pkts))
3209         self.verify_capture_in(capture, self.pg6)
3210
3211     def test_output_feature_hairpinning(self):
3212         """ NAT44 interface output feature hairpinning (in2out postrouting) """
3213         host = self.pg0.remote_hosts[0]
3214         server = self.pg0.remote_hosts[1]
3215         host_in_port = 1234
3216         host_out_port = 0
3217         server_in_port = 5678
3218         server_out_port = 8765
3219
3220         self.nat44_add_address(self.nat_addr)
3221         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3222         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3223                                                          is_inside=0)
3224
3225         # add static mapping for server
3226         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3227                                       server_in_port, server_out_port,
3228                                       proto=IP_PROTOS.tcp)
3229
3230         # send packet from host to server
3231         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3232              IP(src=host.ip4, dst=self.nat_addr) /
3233              TCP(sport=host_in_port, dport=server_out_port))
3234         self.pg0.add_stream(p)
3235         self.pg_enable_capture(self.pg_interfaces)
3236         self.pg_start()
3237         capture = self.pg0.get_capture(1)
3238         p = capture[0]
3239         try:
3240             ip = p[IP]
3241             tcp = p[TCP]
3242             self.assertEqual(ip.src, self.nat_addr)
3243             self.assertEqual(ip.dst, server.ip4)
3244             self.assertNotEqual(tcp.sport, host_in_port)
3245             self.assertEqual(tcp.dport, server_in_port)
3246             self.check_tcp_checksum(p)
3247             host_out_port = tcp.sport
3248         except:
3249             self.logger.error(ppp("Unexpected or invalid packet:", p))
3250             raise
3251
3252         # send reply from server to host
3253         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3254              IP(src=server.ip4, dst=self.nat_addr) /
3255              TCP(sport=server_in_port, dport=host_out_port))
3256         self.pg0.add_stream(p)
3257         self.pg_enable_capture(self.pg_interfaces)
3258         self.pg_start()
3259         capture = self.pg0.get_capture(1)
3260         p = capture[0]
3261         try:
3262             ip = p[IP]
3263             tcp = p[TCP]
3264             self.assertEqual(ip.src, self.nat_addr)
3265             self.assertEqual(ip.dst, host.ip4)
3266             self.assertEqual(tcp.sport, server_out_port)
3267             self.assertEqual(tcp.dport, host_in_port)
3268             self.check_tcp_checksum(p)
3269         except:
3270             self.logger.error(ppp("Unexpected or invalid packet:", p))
3271             raise
3272
3273     def test_output_feature_and_service(self):
3274         """ NAT44 interface output feature and services """
3275         external_addr = '1.2.3.4'
3276         external_port = 80
3277         local_port = 8080
3278
3279         self.vapi.nat44_forwarding_enable_disable(1)
3280         self.nat44_add_address(self.nat_addr)
3281         self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3282                                       local_port, external_port,
3283                                       proto=IP_PROTOS.tcp, out2in_only=1)
3284         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3285         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3286                                                          is_inside=0)
3287
3288         # from client to service
3289         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3290              IP(src=self.pg1.remote_ip4, dst=external_addr) /
3291              TCP(sport=12345, dport=external_port))
3292         self.pg1.add_stream(p)
3293         self.pg_enable_capture(self.pg_interfaces)
3294         self.pg_start()
3295         capture = self.pg0.get_capture(1)
3296         p = capture[0]
3297         server = None
3298         try:
3299             ip = p[IP]
3300             tcp = p[TCP]
3301             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3302             self.assertEqual(tcp.dport, local_port)
3303             self.check_tcp_checksum(p)
3304             self.check_ip_checksum(p)
3305         except:
3306             self.logger.error(ppp("Unexpected or invalid packet:", p))
3307             raise
3308
3309         # from service back to client
3310         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3311              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3312              TCP(sport=local_port, dport=12345))
3313         self.pg0.add_stream(p)
3314         self.pg_enable_capture(self.pg_interfaces)
3315         self.pg_start()
3316         capture = self.pg1.get_capture(1)
3317         p = capture[0]
3318         try:
3319             ip = p[IP]
3320             tcp = p[TCP]
3321             self.assertEqual(ip.src, external_addr)
3322             self.assertEqual(tcp.sport, external_port)
3323             self.check_tcp_checksum(p)
3324             self.check_ip_checksum(p)
3325         except:
3326             self.logger.error(ppp("Unexpected or invalid packet:", p))
3327             raise
3328
3329         # from local network host to external network
3330         ext_port = 0
3331         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3332              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3333              TCP(sport=23456, dport=34567))
3334         self.pg0.add_stream(p)
3335         self.pg_enable_capture(self.pg_interfaces)
3336         self.pg_start()
3337         capture = self.pg1.get_capture(1)
3338         p = capture[0]
3339         try:
3340             ip = p[IP]
3341             tcp = p[TCP]
3342             self.assertEqual(ip.src, self.nat_addr)
3343             self.assertNotEqual(tcp.sport, 23456)
3344             ext_port = tcp.sport
3345             self.check_tcp_checksum(p)
3346             self.check_ip_checksum(p)
3347         except:
3348             self.logger.error(ppp("Unexpected or invalid packet:", p))
3349             raise
3350
3351         # from external network back to local network host
3352         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3353              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3354              TCP(sport=34567, dport=ext_port))
3355         self.pg1.add_stream(p)
3356         self.pg_enable_capture(self.pg_interfaces)
3357         self.pg_start()
3358         capture = self.pg0.get_capture(1)
3359         p = capture[0]
3360         server = None
3361         try:
3362             ip = p[IP]
3363             tcp = p[TCP]
3364             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3365             self.assertEqual(tcp.dport, 23456)
3366             self.check_tcp_checksum(p)
3367             self.check_ip_checksum(p)
3368         except:
3369             self.logger.error(ppp("Unexpected or invalid packet:", p))
3370             raise
3371
3372     def test_output_feature_and_service2(self):
3373         """ NAT44 interface output feature and service host direct access """
3374         self.vapi.nat44_forwarding_enable_disable(1)
3375         self.nat44_add_address(self.nat_addr)
3376         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3377                                                          is_inside=0)
3378
3379         # session initiaded from service host - translate
3380         pkts = self.create_stream_in(self.pg0, self.pg1)
3381         self.pg0.add_stream(pkts)
3382         self.pg_enable_capture(self.pg_interfaces)
3383         self.pg_start()
3384         capture = self.pg1.get_capture(len(pkts))
3385         self.verify_capture_out(capture)
3386
3387         pkts = self.create_stream_out(self.pg1)
3388         self.pg1.add_stream(pkts)
3389         self.pg_enable_capture(self.pg_interfaces)
3390         self.pg_start()
3391         capture = self.pg0.get_capture(len(pkts))
3392         self.verify_capture_in(capture, self.pg0)
3393
3394         tcp_port_out = self.tcp_port_out
3395         udp_port_out = self.udp_port_out
3396         icmp_id_out = self.icmp_id_out
3397
3398         # session initiaded from remote host - do not translate
3399         pkts = self.create_stream_out(self.pg1,
3400                                       self.pg0.remote_ip4,
3401                                       use_inside_ports=True)
3402         self.pg1.add_stream(pkts)
3403         self.pg_enable_capture(self.pg_interfaces)
3404         self.pg_start()
3405         capture = self.pg0.get_capture(len(pkts))
3406         self.verify_capture_in(capture, self.pg0)
3407
3408         pkts = self.create_stream_in(self.pg0, self.pg1)
3409         self.pg0.add_stream(pkts)
3410         self.pg_enable_capture(self.pg_interfaces)
3411         self.pg_start()
3412         capture = self.pg1.get_capture(len(pkts))
3413         self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3414                                 same_port=True)
3415
3416     def test_one_armed_nat44(self):
3417         """ One armed NAT44 """
3418         remote_host = self.pg9.remote_hosts[0]
3419         local_host = self.pg9.remote_hosts[1]
3420         external_port = 0
3421
3422         self.nat44_add_address(self.nat_addr)
3423         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3424         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3425                                                   is_inside=0)
3426
3427         # in2out
3428         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3429              IP(src=local_host.ip4, dst=remote_host.ip4) /
3430              TCP(sport=12345, dport=80))
3431         self.pg9.add_stream(p)
3432         self.pg_enable_capture(self.pg_interfaces)
3433         self.pg_start()
3434         capture = self.pg9.get_capture(1)
3435         p = capture[0]
3436         try:
3437             ip = p[IP]
3438             tcp = p[TCP]
3439             self.assertEqual(ip.src, self.nat_addr)
3440             self.assertEqual(ip.dst, remote_host.ip4)
3441             self.assertNotEqual(tcp.sport, 12345)
3442             external_port = tcp.sport
3443             self.assertEqual(tcp.dport, 80)
3444             self.check_tcp_checksum(p)
3445             self.check_ip_checksum(p)
3446         except:
3447             self.logger.error(ppp("Unexpected or invalid packet:", p))
3448             raise
3449
3450         # out2in
3451         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3452              IP(src=remote_host.ip4, dst=self.nat_addr) /
3453              TCP(sport=80, dport=external_port))
3454         self.pg9.add_stream(p)
3455         self.pg_enable_capture(self.pg_interfaces)
3456         self.pg_start()
3457         capture = self.pg9.get_capture(1)
3458         p = capture[0]
3459         try:
3460             ip = p[IP]
3461             tcp = p[TCP]
3462             self.assertEqual(ip.src, remote_host.ip4)
3463             self.assertEqual(ip.dst, local_host.ip4)
3464             self.assertEqual(tcp.sport, 80)
3465             self.assertEqual(tcp.dport, 12345)
3466             self.check_tcp_checksum(p)
3467             self.check_ip_checksum(p)
3468         except:
3469             self.logger.error(ppp("Unexpected or invalid packet:", p))
3470             raise
3471
3472     def test_one_armed_nat44_static(self):
3473         """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3474         remote_host = self.pg9.remote_hosts[0]
3475         local_host = self.pg9.remote_hosts[1]
3476         external_port = 80
3477         local_port = 8080
3478         eh_port_in = 0
3479
3480         self.vapi.nat44_forwarding_enable_disable(1)
3481         self.nat44_add_address(self.nat_addr, twice_nat=1)
3482         self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3483                                       local_port, external_port,
3484                                       proto=IP_PROTOS.tcp, out2in_only=1,
3485                                       twice_nat=1)
3486         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3487         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3488                                                   is_inside=0)
3489
3490         # from client to service
3491         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3492              IP(src=remote_host.ip4, dst=self.nat_addr) /
3493              TCP(sport=12345, dport=external_port))
3494         self.pg9.add_stream(p)
3495         self.pg_enable_capture(self.pg_interfaces)
3496         self.pg_start()
3497         capture = self.pg9.get_capture(1)
3498         p = capture[0]
3499         server = None
3500         try:
3501             ip = p[IP]
3502             tcp = p[TCP]
3503             self.assertEqual(ip.dst, local_host.ip4)
3504             self.assertEqual(ip.src, self.nat_addr)
3505             self.assertEqual(tcp.dport, local_port)
3506             self.assertNotEqual(tcp.sport, 12345)
3507             eh_port_in = tcp.sport
3508             self.check_tcp_checksum(p)
3509             self.check_ip_checksum(p)
3510         except:
3511             self.logger.error(ppp("Unexpected or invalid packet:", p))
3512             raise
3513
3514         # from service back to client
3515         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3516              IP(src=local_host.ip4, dst=self.nat_addr) /
3517              TCP(sport=local_port, dport=eh_port_in))
3518         self.pg9.add_stream(p)
3519         self.pg_enable_capture(self.pg_interfaces)
3520         self.pg_start()
3521         capture = self.pg9.get_capture(1)
3522         p = capture[0]
3523         try:
3524             ip = p[IP]
3525             tcp = p[TCP]
3526             self.assertEqual(ip.src, self.nat_addr)
3527             self.assertEqual(ip.dst, remote_host.ip4)
3528             self.assertEqual(tcp.sport, external_port)
3529             self.assertEqual(tcp.dport, 12345)
3530             self.check_tcp_checksum(p)
3531             self.check_ip_checksum(p)
3532         except:
3533             self.logger.error(ppp("Unexpected or invalid packet:", p))
3534             raise
3535
3536     def test_del_session(self):
3537         """ Delete NAT44 session """
3538         self.nat44_add_address(self.nat_addr)
3539         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3540         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3541                                                   is_inside=0)
3542
3543         pkts = self.create_stream_in(self.pg0, self.pg1)
3544         self.pg0.add_stream(pkts)
3545         self.pg_enable_capture(self.pg_interfaces)
3546         self.pg_start()
3547         capture = self.pg1.get_capture(len(pkts))
3548
3549         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3550         nsessions = len(sessions)
3551
3552         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3553                                     sessions[0].inside_port,
3554                                     sessions[0].protocol)
3555         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3556                                     sessions[1].outside_port,
3557                                     sessions[1].protocol,
3558                                     is_in=0)
3559
3560         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3561         self.assertEqual(nsessions - len(sessions), 2)
3562
3563     def test_set_get_reass(self):
3564         """ NAT44 set/get virtual fragmentation reassembly """
3565         reas_cfg1 = self.vapi.nat_get_reass()
3566
3567         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3568                                 max_reass=reas_cfg1.ip4_max_reass * 2,
3569                                 max_frag=reas_cfg1.ip4_max_frag * 2)
3570
3571         reas_cfg2 = self.vapi.nat_get_reass()
3572
3573         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3574         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3575         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3576
3577         self.vapi.nat_set_reass(drop_frag=1)
3578         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3579
3580     def test_frag_in_order(self):
3581         """ NAT44 translate fragments arriving in order """
3582         self.nat44_add_address(self.nat_addr)
3583         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3584         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3585                                                   is_inside=0)
3586
3587         data = "A" * 4 + "B" * 16 + "C" * 3
3588         self.tcp_port_in = random.randint(1025, 65535)
3589
3590         reass = self.vapi.nat_reass_dump()
3591         reass_n_start = len(reass)
3592
3593         # in2out
3594         pkts = self.create_stream_frag(self.pg0,
3595                                        self.pg1.remote_ip4,
3596                                        self.tcp_port_in,
3597                                        20,
3598                                        data)
3599         self.pg0.add_stream(pkts)
3600         self.pg_enable_capture(self.pg_interfaces)
3601         self.pg_start()
3602         frags = self.pg1.get_capture(len(pkts))
3603         p = self.reass_frags_and_verify(frags,
3604                                         self.nat_addr,
3605                                         self.pg1.remote_ip4)
3606         self.assertEqual(p[TCP].dport, 20)
3607         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3608         self.tcp_port_out = p[TCP].sport
3609         self.assertEqual(data, p[Raw].load)
3610
3611         # out2in
3612         pkts = self.create_stream_frag(self.pg1,
3613                                        self.nat_addr,
3614                                        20,
3615                                        self.tcp_port_out,
3616                                        data)
3617         self.pg1.add_stream(pkts)
3618         self.pg_enable_capture(self.pg_interfaces)
3619         self.pg_start()
3620         frags = self.pg0.get_capture(len(pkts))
3621         p = self.reass_frags_and_verify(frags,
3622                                         self.pg1.remote_ip4,
3623                                         self.pg0.remote_ip4)
3624         self.assertEqual(p[TCP].sport, 20)
3625         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3626         self.assertEqual(data, p[Raw].load)
3627
3628         reass = self.vapi.nat_reass_dump()
3629         reass_n_end = len(reass)
3630
3631         self.assertEqual(reass_n_end - reass_n_start, 2)
3632
3633     def test_reass_hairpinning(self):
3634         """ NAT44 fragments hairpinning """
3635         host = self.pg0.remote_hosts[0]
3636         server = self.pg0.remote_hosts[1]
3637         host_in_port = random.randint(1025, 65535)
3638         host_out_port = 0
3639         server_in_port = random.randint(1025, 65535)
3640         server_out_port = random.randint(1025, 65535)
3641         data = "A" * 4 + "B" * 16 + "C" * 3
3642
3643         self.nat44_add_address(self.nat_addr)
3644         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3645         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3646                                                   is_inside=0)
3647         # add static mapping for server
3648         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3649                                       server_in_port, server_out_port,
3650                                       proto=IP_PROTOS.tcp)
3651
3652         # send packet from host to server
3653         pkts = self.create_stream_frag(self.pg0,
3654                                        self.nat_addr,
3655                                        host_in_port,
3656                                        server_out_port,
3657                                        data)
3658         self.pg0.add_stream(pkts)
3659         self.pg_enable_capture(self.pg_interfaces)
3660         self.pg_start()
3661         frags = self.pg0.get_capture(len(pkts))
3662         p = self.reass_frags_and_verify(frags,
3663                                         self.nat_addr,
3664                                         server.ip4)
3665         self.assertNotEqual(p[TCP].sport, host_in_port)
3666         self.assertEqual(p[TCP].dport, server_in_port)
3667         self.assertEqual(data, p[Raw].load)
3668
3669     def test_frag_out_of_order(self):
3670         """ NAT44 translate fragments arriving out of order """
3671         self.nat44_add_address(self.nat_addr)
3672         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3673         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3674                                                   is_inside=0)
3675
3676         data = "A" * 4 + "B" * 16 + "C" * 3
3677         random.randint(1025, 65535)
3678
3679         # in2out
3680         pkts = self.create_stream_frag(self.pg0,
3681                                        self.pg1.remote_ip4,
3682                                        self.tcp_port_in,
3683                                        20,
3684                                        data)
3685         pkts.reverse()
3686         self.pg0.add_stream(pkts)
3687         self.pg_enable_capture(self.pg_interfaces)
3688         self.pg_start()
3689         frags = self.pg1.get_capture(len(pkts))
3690         p = self.reass_frags_and_verify(frags,
3691                                         self.nat_addr,
3692                                         self.pg1.remote_ip4)
3693         self.assertEqual(p[TCP].dport, 20)
3694         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3695         self.tcp_port_out = p[TCP].sport
3696         self.assertEqual(data, p[Raw].load)
3697
3698         # out2in
3699         pkts = self.create_stream_frag(self.pg1,
3700                                        self.nat_addr,
3701                                        20,
3702                                        self.tcp_port_out,
3703                                        data)
3704         pkts.reverse()
3705         self.pg1.add_stream(pkts)
3706         self.pg_enable_capture(self.pg_interfaces)
3707         self.pg_start()
3708         frags = self.pg0.get_capture(len(pkts))
3709         p = self.reass_frags_and_verify(frags,
3710                                         self.pg1.remote_ip4,
3711                                         self.pg0.remote_ip4)
3712         self.assertEqual(p[TCP].sport, 20)
3713         self.assertEqual(p[TCP].dport, self.tcp_port_in)
3714         self.assertEqual(data, p[Raw].load)
3715
3716     def test_port_restricted(self):
3717         """ Port restricted NAT44 (MAP-E CE) """
3718         self.nat44_add_address(self.nat_addr)
3719         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3720         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3721                                                   is_inside=0)
3722         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3723                       "psid-offset 6 psid-len 6")
3724
3725         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3726              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3727              TCP(sport=4567, dport=22))
3728         self.pg0.add_stream(p)
3729         self.pg_enable_capture(self.pg_interfaces)
3730         self.pg_start()
3731         capture = self.pg1.get_capture(1)
3732         p = capture[0]
3733         try:
3734             ip = p[IP]
3735             tcp = p[TCP]
3736             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3737             self.assertEqual(ip.src, self.nat_addr)
3738             self.assertEqual(tcp.dport, 22)
3739             self.assertNotEqual(tcp.sport, 4567)
3740             self.assertEqual((tcp.sport >> 6) & 63, 10)
3741             self.check_tcp_checksum(p)
3742             self.check_ip_checksum(p)
3743         except:
3744             self.logger.error(ppp("Unexpected or invalid packet:", p))
3745             raise
3746
3747     def test_twice_nat(self):
3748         """ Twice NAT44 """
3749         twice_nat_addr = '10.0.1.3'
3750         port_in = 8080
3751         port_out = 80
3752         eh_port_out = 4567
3753         eh_port_in = 0
3754         self.nat44_add_address(self.nat_addr)
3755         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3756         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3757                                       port_in, port_out, proto=IP_PROTOS.tcp,
3758                                       twice_nat=1)
3759         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3760         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3761                                                   is_inside=0)
3762
3763         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3764              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3765              TCP(sport=eh_port_out, dport=port_out))
3766         self.pg1.add_stream(p)
3767         self.pg_enable_capture(self.pg_interfaces)
3768         self.pg_start()
3769         capture = self.pg0.get_capture(1)
3770         p = capture[0]
3771         try:
3772             ip = p[IP]
3773             tcp = p[TCP]
3774             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3775             self.assertEqual(ip.src, twice_nat_addr)
3776             self.assertEqual(tcp.dport, port_in)
3777             self.assertNotEqual(tcp.sport, eh_port_out)
3778             eh_port_in = tcp.sport
3779             self.check_tcp_checksum(p)
3780             self.check_ip_checksum(p)
3781         except:
3782             self.logger.error(ppp("Unexpected or invalid packet:", p))
3783             raise
3784
3785         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3786              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3787              TCP(sport=port_in, dport=eh_port_in))
3788         self.pg0.add_stream(p)
3789         self.pg_enable_capture(self.pg_interfaces)
3790         self.pg_start()
3791         capture = self.pg1.get_capture(1)
3792         p = capture[0]
3793         try:
3794             ip = p[IP]
3795             tcp = p[TCP]
3796             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3797             self.assertEqual(ip.src, self.nat_addr)
3798             self.assertEqual(tcp.dport, eh_port_out)
3799             self.assertEqual(tcp.sport, port_out)
3800             self.check_tcp_checksum(p)
3801             self.check_ip_checksum(p)
3802         except:
3803             self.logger.error(ppp("Unexpected or invalid packet:", p))
3804             raise
3805
3806     def test_twice_nat_lb(self):
3807         """ Twice NAT44 local service load balancing """
3808         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3809         twice_nat_addr = '10.0.1.3'
3810         local_port = 8080
3811         external_port = 80
3812         eh_port_out = 4567
3813         eh_port_in = 0
3814         server1 = self.pg0.remote_hosts[0]
3815         server2 = self.pg0.remote_hosts[1]
3816
3817         locals = [{'addr': server1.ip4n,
3818                    'port': local_port,
3819                    'probability': 50},
3820                   {'addr': server2.ip4n,
3821                    'port': local_port,
3822                    'probability': 50}]
3823
3824         self.nat44_add_address(self.nat_addr)
3825         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3826
3827         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3828                                                   external_port,
3829                                                   IP_PROTOS.tcp,
3830                                                   twice_nat=1,
3831                                                   local_num=len(locals),
3832                                                   locals=locals)
3833         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3834         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3835                                                   is_inside=0)
3836
3837         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3838              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3839              TCP(sport=eh_port_out, dport=external_port))
3840         self.pg1.add_stream(p)
3841         self.pg_enable_capture(self.pg_interfaces)
3842         self.pg_start()
3843         capture = self.pg0.get_capture(1)
3844         p = capture[0]
3845         server = None
3846         try:
3847             ip = p[IP]
3848             tcp = p[TCP]
3849             self.assertEqual(ip.src, twice_nat_addr)
3850             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3851             if ip.dst == server1.ip4:
3852                 server = server1
3853             else:
3854                 server = server2
3855             self.assertNotEqual(tcp.sport, eh_port_out)
3856             eh_port_in = tcp.sport
3857             self.assertEqual(tcp.dport, local_port)
3858             self.check_tcp_checksum(p)
3859             self.check_ip_checksum(p)
3860         except:
3861             self.logger.error(ppp("Unexpected or invalid packet:", p))
3862             raise
3863
3864         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3865              IP(src=server.ip4, dst=twice_nat_addr) /
3866              TCP(sport=local_port, dport=eh_port_in))
3867         self.pg0.add_stream(p)
3868         self.pg_enable_capture(self.pg_interfaces)
3869         self.pg_start()
3870         capture = self.pg1.get_capture(1)
3871         p = capture[0]
3872         try:
3873             ip = p[IP]
3874             tcp = p[TCP]
3875             self.assertEqual(ip.src, self.nat_addr)
3876             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3877             self.assertEqual(tcp.sport, external_port)
3878             self.assertEqual(tcp.dport, eh_port_out)
3879             self.check_tcp_checksum(p)
3880             self.check_ip_checksum(p)
3881         except:
3882             self.logger.error(ppp("Unexpected or invalid packet:", p))
3883             raise
3884
3885     def test_twice_nat_interface_addr(self):
3886         """ Acquire twice NAT44 addresses from interface """
3887         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3888
3889         # no address in NAT pool
3890         adresses = self.vapi.nat44_address_dump()
3891         self.assertEqual(0, len(adresses))
3892
3893         # configure interface address and check NAT address pool
3894         self.pg7.config_ip4()
3895         adresses = self.vapi.nat44_address_dump()
3896         self.assertEqual(1, len(adresses))
3897         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3898         self.assertEqual(adresses[0].twice_nat, 1)
3899
3900         # remove interface address and check NAT address pool
3901         self.pg7.unconfig_ip4()
3902         adresses = self.vapi.nat44_address_dump()
3903         self.assertEqual(0, len(adresses))
3904
3905     def test_ipfix_max_frags(self):
3906         """ IPFIX logging maximum fragments pending reassembly exceeded """
3907         self.nat44_add_address(self.nat_addr)
3908         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3909         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3910                                                   is_inside=0)
3911         self.vapi.nat_set_reass(max_frag=0)
3912         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3913                                      src_address=self.pg3.local_ip4n,
3914                                      path_mtu=512,
3915                                      template_interval=10)
3916         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3917                             src_port=self.ipfix_src_port)
3918
3919         data = "A" * 4 + "B" * 16 + "C" * 3
3920         self.tcp_port_in = random.randint(1025, 65535)
3921         pkts = self.create_stream_frag(self.pg0,
3922                                        self.pg1.remote_ip4,
3923                                        self.tcp_port_in,
3924                                        20,
3925                                        data)
3926         self.pg0.add_stream(pkts[-1])
3927         self.pg_enable_capture(self.pg_interfaces)
3928         self.pg_start()
3929         frags = self.pg1.get_capture(0)
3930         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3931         capture = self.pg3.get_capture(9)
3932         ipfix = IPFIXDecoder()
3933         # first load template
3934         for p in capture:
3935             self.assertTrue(p.haslayer(IPFIX))
3936             self.assertEqual(p[IP].src, self.pg3.local_ip4)
3937             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3938             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3939             self.assertEqual(p[UDP].dport, 4739)
3940             self.assertEqual(p[IPFIX].observationDomainID,
3941                              self.ipfix_domain_id)
3942             if p.haslayer(Template):
3943                 ipfix.add_template(p.getlayer(Template))
3944         # verify events in data set
3945         for p in capture:
3946             if p.haslayer(Data):
3947                 data = ipfix.decode_data_set(p.getlayer(Set))
3948                 self.verify_ipfix_max_fragments_ip4(data, 0,
3949                                                     self.pg0.remote_ip4n)
3950
3951     def tearDown(self):
3952         super(TestNAT44, self).tearDown()
3953         if not self.vpp_dead:
3954             self.logger.info(self.vapi.cli("show nat44 addresses"))
3955             self.logger.info(self.vapi.cli("show nat44 interfaces"))
3956             self.logger.info(self.vapi.cli("show nat44 static mappings"))
3957             self.logger.info(self.vapi.cli("show nat44 interface address"))
3958             self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3959             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3960             self.vapi.cli("nat addr-port-assignment-alg default")
3961             self.clear_nat44()
3962
3963
class TestNAT44Out2InDPO(MethodHolder):
    """ NAT44 Test Cases using out2in DPO """

    @classmethod
    def setUpConstants(cls):
        # start VPP with the "nat { out2in dpo }" startup section
        super(TestNAT44Out2InDPO, cls).setUpConstants()
        nat_section = ["nat", "{", "out2in dpo", "}"]
        cls.vpp_cmdline.extend(nat_section)

    @classmethod
    def setUpClass(cls):
        # pg0 is configured for IPv4 only and pg1 for IPv6 only
        super(TestNAT44Out2InDPO, cls).setUpClass()

        try:
            cls.tcp_port_in = cls.tcp_port_out = 6303
            cls.udp_port_in = cls.udp_port_out = 6304
            cls.icmp_id_in = cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.dst_ip4 = '192.168.70.1'

            cls.create_pg_interfaces(range(2))

            ip4_if = cls.pg0
            ip4_if.admin_up()
            ip4_if.config_ip4()
            ip4_if.resolve_arp()

            ip6_if = cls.pg1
            ip6_if.admin_up()
            ip6_if.config_ip6()
            ip6_if.resolve_ndp()

            # IPv6 route with an all-zero, zero-length prefix via pg1
            cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
                                      dst_address_length=0,
                                      next_hop_address=ip6_if.remote_ip6n,
                                      next_hop_sw_if_index=ip6_if.sw_if_index)

        except Exception:
            super(TestNAT44Out2InDPO, cls).tearDownClass()
            raise

    def configure_xlat(self):
        """
        Add the MAP domain used by the 464XLAT tests

        The prefixes and their packed forms are stored on the test case so
        that the tests can compose matching IPv6 addresses.
        """
        pack_ip6 = socket.inet_pton

        self.dst_ip6_pfx = '1:2:3::'
        self.dst_ip6_pfx_n = pack_ip6(socket.AF_INET6, self.dst_ip6_pfx)
        self.dst_ip6_pfx_len = 96

        self.src_ip6_pfx = '4:5:6::'
        self.src_ip6_pfx_n = pack_ip6(socket.AF_INET6, self.src_ip6_pfx)
        self.src_ip6_pfx_len = 96

        self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
                                 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
                                 '\x00\x00\x00\x00', 0, is_translation=1,
                                 is_rfc6052=1)

    def test_464xlat_ce(self):
        """ Test 464XLAT CE with NAT44 """

        self.configure_xlat()

        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)

        # IPv6 form of the IPv4 destination and of the NAT address
        remote_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                      self.dst_ip6_pfx_len)
        nat_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
                                   self.src_ip6_pfx_len)

        try:
            # IPv4 in on pg0, IPv6 out on pg1
            tx = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
            self.pg0.add_stream(tx)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            rx = self.pg1.get_capture(len(tx))
            self.verify_capture_out_ip6(rx, nat_ip=nat_ip6,
                                        dst_ip=remote_ip6)

            # IPv6 in on pg1, IPv4 out on pg0
            tx = self.create_stream_out_ip6(self.pg1, remote_ip6, nat_ip6)
            self.pg1.add_stream(tx)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            rx = self.pg0.get_capture(len(tx))
            self.verify_capture_in(rx, self.pg0)
        finally:
            # undo the NAT44 configuration added above, pass or fail
            self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
                                                      is_add=0)
            self.vapi.nat44_add_del_address_range(self.nat_addr_n,
                                                  self.nat_addr_n, is_add=0)

    def test_464xlat_ce_no_nat(self):
        """ Test 464XLAT CE without NAT44 """

        self.configure_xlat()

        # no NAT44 here, so the host's own IPv4 address gets embedded
        remote_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                      self.dst_ip6_pfx_len)
        host_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
                                    self.src_ip6_pfx_len)

        # IPv4 in on pg0, IPv6 out on pg1
        tx = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
        self.pg0.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg1.get_capture(len(tx))
        self.verify_capture_out_ip6(rx, dst_ip=remote_ip6,
                                    nat_ip=host_ip6, same_port=True)

        # IPv6 in on pg1, IPv4 out on pg0
        tx = self.create_stream_out_ip6(self.pg1, remote_ip6, host_ip6)
        self.pg1.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = self.pg0.get_capture(len(tx))
        self.verify_capture_in(rx, self.pg0)
4079
4080
4081 class TestDeterministicNAT(MethodHolder):
4082     """ Deterministic NAT Test Cases """
4083
4084     @classmethod
4085     def setUpConstants(cls):
4086         super(TestDeterministicNAT, cls).setUpConstants()
4087         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4088
4089     @classmethod
4090     def setUpClass(cls):
4091         super(TestDeterministicNAT, cls).setUpClass()
4092
4093         try:
4094             cls.tcp_port_in = 6303
4095             cls.tcp_external_port = 6303
4096             cls.udp_port_in = 6304
4097             cls.udp_external_port = 6304
4098             cls.icmp_id_in = 6305
4099             cls.nat_addr = '10.0.0.3'
4100
4101             cls.create_pg_interfaces(range(3))
4102             cls.interfaces = list(cls.pg_interfaces)
4103
4104             for i in cls.interfaces:
4105                 i.admin_up()
4106                 i.config_ip4()
4107                 i.resolve_arp()
4108
4109             cls.pg0.generate_remote_hosts(2)
4110             cls.pg0.configure_ipv4_neighbors()
4111
4112         except Exception:
4113             super(TestDeterministicNAT, cls).tearDownClass()
4114             raise
4115
4116     def create_stream_in(self, in_if, out_if, ttl=64):
4117         """
4118         Create packet stream for inside network
4119
4120         :param in_if: Inside interface
4121         :param out_if: Outside interface
4122         :param ttl: TTL of generated packets
4123         """
4124         pkts = []
4125         # TCP
4126         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4127              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4128              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
4129         pkts.append(p)
4130
4131         # UDP
4132         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4133              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4134              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
4135         pkts.append(p)
4136
4137         # ICMP
4138         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4139              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4140              ICMP(id=self.icmp_id_in, type='echo-request'))
4141         pkts.append(p)
4142
4143         return pkts
4144
4145     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4146         """
4147         Create packet stream for outside network
4148
4149         :param out_if: Outside interface
4150         :param dst_ip: Destination IP address (Default use global NAT address)
4151         :param ttl: TTL of generated packets
4152         """
4153         if dst_ip is None:
4154             dst_ip = self.nat_addr
4155         pkts = []
4156         # TCP
4157         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4158              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4159              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4160         pkts.append(p)
4161
4162         # UDP
4163         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4164              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4165              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4166         pkts.append(p)
4167
4168         # ICMP
4169         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4170              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4171              ICMP(id=self.icmp_external_id, type='echo-reply'))
4172         pkts.append(p)
4173
4174         return pkts
4175
4176     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4177         """
4178         Verify captured packets on outside network
4179
4180         :param capture: Captured packets
4181         :param nat_ip: Translated IP address (Default use global NAT address)
4182         :param same_port: Sorce port number is not translated (Default False)
4183         :param packet_num: Expected number of packets (Default 3)
4184         """
4185         if nat_ip is None:
4186             nat_ip = self.nat_addr
4187         self.assertEqual(packet_num, len(capture))
4188         for packet in capture:
4189             try:
4190                 self.assertEqual(packet[IP].src, nat_ip)
4191                 if packet.haslayer(TCP):
4192                     self.tcp_port_out = packet[TCP].sport
4193                 elif packet.haslayer(UDP):
4194                     self.udp_port_out = packet[UDP].sport
4195                 else:
4196                     self.icmp_external_id = packet[ICMP].id
4197             except:
4198                 self.logger.error(ppp("Unexpected or invalid packet "
4199                                       "(outside network):", packet))
4200                 raise
4201
4202     def initiate_tcp_session(self, in_if, out_if):
4203         """
4204         Initiates TCP session
4205
4206         :param in_if: Inside interface
4207         :param out_if: Outside interface
4208         """
4209         try:
4210             # SYN packet in->out
4211             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4212                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4213                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4214                      flags="S"))
4215             in_if.add_stream(p)
4216             self.pg_enable_capture(self.pg_interfaces)
4217             self.pg_start()
4218             capture = out_if.get_capture(1)
4219             p = capture[0]
4220             self.tcp_port_out = p[TCP].sport
4221
4222             # SYN + ACK packet out->in
4223             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4224                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4225                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4226                      flags="SA"))
4227             out_if.add_stream(p)
4228             self.pg_enable_capture(self.pg_interfaces)
4229             self.pg_start()
4230             in_if.get_capture(1)
4231
4232             # ACK packet in->out
4233             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4234                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4235                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4236                      flags="A"))
4237             in_if.add_stream(p)
4238             self.pg_enable_capture(self.pg_interfaces)
4239             self.pg_start()
4240             out_if.get_capture(1)
4241
4242         except:
4243             self.logger.error("TCP 3 way handshake failed")
4244             raise
4245
4246     def verify_ipfix_max_entries_per_user(self, data):
4247         """
4248         Verify IPFIX maximum entries per user exceeded event
4249
4250         :param data: Decoded IPFIX data records
4251         """
4252         self.assertEqual(1, len(data))
4253         record = data[0]
4254         # natEvent
4255         self.assertEqual(ord(record[230]), 13)
4256         # natQuotaExceededEvent
4257         self.assertEqual('\x03\x00\x00\x00', record[466])
4258         # maxEntriesPerUser
4259         self.assertEqual('\xe8\x03\x00\x00', record[473])
4260         # sourceIPv4Address
4261         self.assertEqual(self.pg0.remote_ip4n, record[8])
4262
4263     def test_deterministic_mode(self):
4264         """ NAT plugin run deterministic mode """
4265         in_addr = '172.16.255.0'
4266         out_addr = '172.17.255.50'
4267         in_addr_t = '172.16.255.20'
4268         in_addr_n = socket.inet_aton(in_addr)
4269         out_addr_n = socket.inet_aton(out_addr)
4270         in_addr_t_n = socket.inet_aton(in_addr_t)
4271         in_plen = 24
4272         out_plen = 32
4273
4274         nat_config = self.vapi.nat_show_config()
4275         self.assertEqual(1, nat_config.deterministic)
4276
4277         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4278
4279         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4280         self.assertEqual(rep1.out_addr[:4], out_addr_n)
4281         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4282         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4283
4284         deterministic_mappings = self.vapi.nat_det_map_dump()
4285         self.assertEqual(len(deterministic_mappings), 1)
4286         dsm = deterministic_mappings[0]
4287         self.assertEqual(in_addr_n, dsm.in_addr[:4])
4288         self.assertEqual(in_plen, dsm.in_plen)
4289         self.assertEqual(out_addr_n, dsm.out_addr[:4])
4290         self.assertEqual(out_plen, dsm.out_plen)
4291
4292         self.clear_nat_det()
4293         deterministic_mappings = self.vapi.nat_det_map_dump()
4294         self.assertEqual(len(deterministic_mappings), 0)
4295
4296     def test_set_timeouts(self):
4297         """ Set deterministic NAT timeouts """
4298         timeouts_before = self.vapi.nat_det_get_timeouts()
4299
4300         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4301                                        timeouts_before.tcp_established + 10,
4302                                        timeouts_before.tcp_transitory + 10,
4303                                        timeouts_before.icmp + 10)
4304
4305         timeouts_after = self.vapi.nat_det_get_timeouts()
4306
4307         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4308         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4309         self.assertNotEqual(timeouts_before.tcp_established,
4310                             timeouts_after.tcp_established)
4311         self.assertNotEqual(timeouts_before.tcp_transitory,
4312                             timeouts_after.tcp_transitory)
4313
4314     def test_det_in(self):
4315         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4316
4317         nat_ip = "10.0.0.10"
4318
4319         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4320                                       32,
4321                                       socket.inet_aton(nat_ip),
4322                                       32)
4323         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4324         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4325                                                   is_inside=0)
4326
4327         # in2out
4328         pkts = self.create_stream_in(self.pg0, self.pg1)
4329         self.pg0.add_stream(pkts)
4330         self.pg_enable_capture(self.pg_interfaces)
4331         self.pg_start()
4332         capture = self.pg1.get_capture(len(pkts))
4333         self.verify_capture_out(capture, nat_ip)
4334
4335         # out2in
4336         pkts = self.create_stream_out(self.pg1, nat_ip)
4337         self.pg1.add_stream(pkts)
4338         self.pg_enable_capture(self.pg_interfaces)
4339         self.pg_start()
4340         capture = self.pg0.get_capture(len(pkts))
4341         self.verify_capture_in(capture, self.pg0)
4342
4343         # session dump test
4344         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4345         self.assertEqual(len(sessions), 3)
4346
4347         # TCP session
4348         s = sessions[0]
4349         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4350         self.assertEqual(s.in_port, self.tcp_port_in)
4351         self.assertEqual(s.out_port, self.tcp_port_out)
4352         self.assertEqual(s.ext_port, self.tcp_external_port)
4353
4354         # UDP session
4355         s = sessions[1]
4356         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4357         self.assertEqual(s.in_port, self.udp_port_in)
4358         self.assertEqual(s.out_port, self.udp_port_out)
4359         self.assertEqual(s.ext_port, self.udp_external_port)
4360
4361         # ICMP session
4362         s = sessions[2]
4363         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4364         self.assertEqual(s.in_port, self.icmp_id_in)
4365         self.assertEqual(s.out_port, self.icmp_external_id)
4366
4367     def test_multiple_users(self):
4368         """ Deterministic NAT multiple users """
4369
4370         nat_ip = "10.0.0.10"
4371         port_in = 80
4372         external_port = 6303
4373
4374         host0 = self.pg0.remote_hosts[0]
4375         host1 = self.pg0.remote_hosts[1]
4376
4377         self.vapi.nat_det_add_del_map(host0.ip4n,
4378                                       24,
4379                                       socket.inet_aton(nat_ip),
4380                                       32)
4381         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4382         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4383                                                   is_inside=0)
4384
4385         # host0 to out
4386         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4387              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4388              TCP(sport=port_in, dport=external_port))
4389         self.pg0.add_stream(p)
4390         self.pg_enable_capture(self.pg_interfaces)
4391         self.pg_start()
4392         capture = self.pg1.get_capture(1)
4393         p = capture[0]
4394         try:
4395             ip = p[IP]
4396             tcp = p[TCP]
4397             self.assertEqual(ip.src, nat_ip)
4398             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4399             self.assertEqual(tcp.dport, external_port)
4400             port_out0 = tcp.sport
4401         except:
4402             self.logger.error(ppp("Unexpected or invalid packet:", p))
4403             raise
4404
4405         # host1 to out
4406         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4407              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4408              TCP(sport=port_in, dport=external_port))
4409         self.pg0.add_stream(p)
4410         self.pg_enable_capture(self.pg_interfaces)
4411         self.pg_start()
4412         capture = self.pg1.get_capture(1)
4413         p = capture[0]
4414         try:
4415             ip = p[IP]
4416             tcp = p[TCP]
4417             self.assertEqual(ip.src, nat_ip)
4418             self.assertEqual(ip.dst, self.pg1.remote_ip4)
4419             self.assertEqual(tcp.dport, external_port)
4420             port_out1 = tcp.sport
4421         except:
4422             self.logger.error(ppp("Unexpected or invalid packet:", p))
4423             raise
4424
4425         dms = self.vapi.nat_det_map_dump()
4426         self.assertEqual(1, len(dms))
4427         self.assertEqual(2, dms[0].ses_num)
4428
4429         # out to host0
4430         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4431              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4432              TCP(sport=external_port, dport=port_out0))
4433         self.pg1.add_stream(p)
4434         self.pg_enable_capture(self.pg_interfaces)
4435         self.pg_start()
4436         capture = self.pg0.get_capture(1)
4437         p = capture[0]
4438         try:
4439             ip = p[IP]
4440             tcp = p[TCP]
4441             self.assertEqual(ip.src, self.pg1.remote_ip4)
4442             self.assertEqual(ip.dst, host0.ip4)
4443             self.assertEqual(tcp.dport, port_in)
4444             self.assertEqual(tcp.sport, external_port)
4445         except:
4446             self.logger.error(ppp("Unexpected or invalid packet:", p))
4447             raise
4448
4449         # out to host1
4450         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4451              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4452              TCP(sport=external_port, dport=port_out1))
4453         self.pg1.add_stream(p)
4454         self.pg_enable_capture(self.pg_interfaces)
4455         self.pg_start()
4456         capture = self.pg0.get_capture(1)
4457         p = capture[0]
4458         try:
4459             ip = p[IP]
4460             tcp = p[TCP]
4461             self.assertEqual(ip.src, self.pg1.remote_ip4)
4462             self.assertEqual(ip.dst, host1.ip4)
4463             self.assertEqual(tcp.dport, port_in)
4464             self.assertEqual(tcp.sport, external_port)
4465         except:
4466             self.logger.error(ppp("Unexpected or invalid packet", p))
4467             raise
4468
4469         # session close api test
4470         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4471                                             port_out1,
4472                                             self.pg1.remote_ip4n,
4473                                             external_port)
4474         dms = self.vapi.nat_det_map_dump()
4475         self.assertEqual(dms[0].ses_num, 1)
4476
4477         self.vapi.nat_det_close_session_in(host0.ip4n,
4478                                            port_in,
4479                                            self.pg1.remote_ip4n,
4480                                            external_port)
4481         dms = self.vapi.nat_det_map_dump()
4482         self.assertEqual(dms[0].ses_num, 0)
4483
4484     def test_tcp_session_close_detection_in(self):
4485         """ Deterministic NAT TCP session close from inside network """
4486         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4487                                       32,
4488                                       socket.inet_aton(self.nat_addr),
4489                                       32)
4490         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4491         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4492                                                   is_inside=0)
4493
4494         self.initiate_tcp_session(self.pg0, self.pg1)
4495
4496         # close the session from inside
4497         try:
4498             # FIN packet in -> out
4499             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4500                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4501                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4502                      flags="F"))
4503             self.pg0.add_stream(p)
4504             self.pg_enable_capture(self.pg_interfaces)
4505             self.pg_start()
4506             self.pg1.get_capture(1)
4507
4508             pkts = []
4509
4510             # ACK packet out -> in
4511             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4512                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4513                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4514                      flags="A"))
4515             pkts.append(p)
4516
4517             # FIN packet out -> in
4518             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4519                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4520                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4521                      flags="F"))
4522             pkts.append(p)
4523
4524             self.pg1.add_stream(pkts)
4525             self.pg_enable_capture(self.pg_interfaces)
4526             self.pg_start()
4527             self.pg0.get_capture(2)
4528
4529             # ACK packet in -> out
4530             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4531                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4532                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4533                      flags="A"))
4534             self.pg0.add_stream(p)
4535             self.pg_enable_capture(self.pg_interfaces)
4536             self.pg_start()
4537             self.pg1.get_capture(1)
4538
4539             # Check if deterministic NAT44 closed the session
4540             dms = self.vapi.nat_det_map_dump()
4541             self.assertEqual(0, dms[0].ses_num)
4542         except:
4543             self.logger.error("TCP session termination failed")
4544             raise
4545
4546     def test_tcp_session_close_detection_out(self):
4547         """ Deterministic NAT TCP session close from outside network """
4548         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4549                                       32,
4550                                       socket.inet_aton(self.nat_addr),
4551                                       32)
4552         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4553         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4554                                                   is_inside=0)
4555
4556         self.initiate_tcp_session(self.pg0, self.pg1)
4557
4558         # close the session from outside
4559         try:
4560             # FIN packet out -> in
4561             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4562                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4563                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4564                      flags="F"))
4565             self.pg1.add_stream(p)
4566             self.pg_enable_capture(self.pg_interfaces)
4567             self.pg_start()
4568             self.pg0.get_capture(1)
4569
4570             pkts = []
4571
4572             # ACK packet in -> out
4573             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4574                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4575                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4576                      flags="A"))
4577             pkts.append(p)
4578
4579             # ACK packet in -> out
4580             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4581                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4582                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4583                      flags="F"))
4584             pkts.append(p)
4585
4586             self.pg0.add_stream(pkts)
4587             self.pg_enable_capture(self.pg_interfaces)
4588             self.pg_start()
4589             self.pg1.get_capture(2)
4590
4591             # ACK packet out -> in
4592             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4593                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4594                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4595                      flags="A"))
4596             self.pg1.add_stream(p)
4597             self.pg_enable_capture(self.pg_interfaces)
4598             self.pg_start()
4599             self.pg0.get_capture(1)
4600
4601             # Check if deterministic NAT44 closed the session
4602             dms = self.vapi.nat_det_map_dump()
4603             self.assertEqual(0, dms[0].ses_num)
4604         except:
4605             self.logger.error("TCP session termination failed")
4606             raise
4607
4608     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4609     def test_session_timeout(self):
4610         """ Deterministic NAT session timeouts """
4611         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4612                                       32,
4613                                       socket.inet_aton(self.nat_addr),
4614                                       32)
4615         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4616         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4617                                                   is_inside=0)
4618
4619         self.initiate_tcp_session(self.pg0, self.pg1)
4620         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4621         pkts = self.create_stream_in(self.pg0, self.pg1)
4622         self.pg0.add_stream(pkts)
4623         self.pg_enable_capture(self.pg_interfaces)
4624         self.pg_start()
4625         capture = self.pg1.get_capture(len(pkts))
4626         sleep(15)
4627
4628         dms = self.vapi.nat_det_map_dump()
4629         self.assertEqual(0, dms[0].ses_num)
4630
4631     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4632     def test_session_limit_per_user(self):
4633         """ Deterministic NAT maximum sessions per user limit """
4634         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4635                                       32,
4636                                       socket.inet_aton(self.nat_addr),
4637                                       32)
4638         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4639         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4640                                                   is_inside=0)
4641         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4642                                      src_address=self.pg2.local_ip4n,
4643                                      path_mtu=512,
4644                                      template_interval=10)
4645         self.vapi.nat_ipfix()
4646
4647         pkts = []
4648         for port in range(1025, 2025):
4649             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4650                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4651                  UDP(sport=port, dport=port))
4652             pkts.append(p)
4653
4654         self.pg0.add_stream(pkts)
4655         self.pg_enable_capture(self.pg_interfaces)
4656         self.pg_start()
4657         capture = self.pg1.get_capture(len(pkts))
4658
4659         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4660              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4661              UDP(sport=3001, dport=3002))
4662         self.pg0.add_stream(p)
4663         self.pg_enable_capture(self.pg_interfaces)
4664         self.pg_start()
4665         capture = self.pg1.assert_nothing_captured()
4666
4667         # verify ICMP error packet
4668         capture = self.pg0.get_capture(1)
4669         p = capture[0]
4670         self.assertTrue(p.haslayer(ICMP))
4671         icmp = p[ICMP]
4672         self.assertEqual(icmp.type, 3)
4673         self.assertEqual(icmp.code, 1)
4674         self.assertTrue(icmp.haslayer(IPerror))
4675         inner_ip = icmp[IPerror]
4676         self.assertEqual(inner_ip[UDPerror].sport, 3001)
4677         self.assertEqual(inner_ip[UDPerror].dport, 3002)
4678
4679         dms = self.vapi.nat_det_map_dump()
4680
4681         self.assertEqual(1000, dms[0].ses_num)
4682
4683         # verify IPFIX logging
4684         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
4685         sleep(1)
4686         capture = self.pg2.get_capture(2)
4687         ipfix = IPFIXDecoder()
4688         # first load template
4689         for p in capture:
4690             self.assertTrue(p.haslayer(IPFIX))
4691             if p.haslayer(Template):
4692                 ipfix.add_template(p.getlayer(Template))
4693         # verify events in data set
4694         for p in capture:
4695             if p.haslayer(Data):
4696                 data = ipfix.decode_data_set(p.getlayer(Set))
4697                 self.verify_ipfix_max_entries_per_user(data)
4698
4699     def clear_nat_det(self):
4700         """
4701         Clear deterministic NAT configuration.
4702         """
4703         self.vapi.nat_ipfix(enable=0)
4704         self.vapi.nat_det_set_timeouts()
4705         deterministic_mappings = self.vapi.nat_det_map_dump()
4706         for dsm in deterministic_mappings:
4707             self.vapi.nat_det_add_del_map(dsm.in_addr,
4708                                           dsm.in_plen,
4709                                           dsm.out_addr,
4710                                           dsm.out_plen,
4711                                           is_add=0)
4712
4713         interfaces = self.vapi.nat44_interface_dump()
4714         for intf in interfaces:
4715             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4716                                                       intf.is_inside,
4717                                                       is_add=0)
4718
4719     def tearDown(self):
4720         super(TestDeterministicNAT, self).tearDown()
4721         if not self.vpp_dead:
4722             self.logger.info(self.vapi.cli("show nat44 interfaces"))
4723             self.logger.info(
4724                 self.vapi.cli("show nat44 deterministic mappings"))
4725             self.logger.info(
4726                 self.vapi.cli("show nat44 deterministic timeouts"))
4727             self.logger.info(
4728                 self.vapi.cli("show nat44 deterministic sessions"))
4729             self.clear_nat_det()
4730
4731
4732 class TestNAT64(MethodHolder):
4733     """ NAT64 Test Cases """
4734
4735     @classmethod
4736     def setUpConstants(cls):
4737         super(TestNAT64, cls).setUpConstants()
4738         cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4739                                 "nat64 st hash buckets 256", "}"])
4740
4741     @classmethod
4742     def setUpClass(cls):
4743         super(TestNAT64, cls).setUpClass()
4744
4745         try:
4746             cls.tcp_port_in = 6303
4747             cls.tcp_port_out = 6303
4748             cls.udp_port_in = 6304
4749             cls.udp_port_out = 6304
4750             cls.icmp_id_in = 6305
4751             cls.icmp_id_out = 6305
4752             cls.nat_addr = '10.0.0.3'
4753             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4754             cls.vrf1_id = 10
4755             cls.vrf1_nat_addr = '10.0.10.3'
4756             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4757                                                    cls.vrf1_nat_addr)
4758             cls.ipfix_src_port = 4739
4759             cls.ipfix_domain_id = 1
4760
4761             cls.create_pg_interfaces(range(5))
4762             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4763             cls.ip6_interfaces.append(cls.pg_interfaces[2])
4764             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4765
4766             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4767
4768             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4769
4770             cls.pg0.generate_remote_hosts(2)
4771
4772             for i in cls.ip6_interfaces:
4773                 i.admin_up()
4774                 i.config_ip6()
4775                 i.configure_ipv6_neighbors()
4776
4777             for i in cls.ip4_interfaces:
4778                 i.admin_up()
4779                 i.config_ip4()
4780                 i.resolve_arp()
4781
4782             cls.pg3.admin_up()
4783             cls.pg3.config_ip4()
4784             cls.pg3.resolve_arp()
4785             cls.pg3.config_ip6()
4786             cls.pg3.configure_ipv6_neighbors()
4787
4788         except Exception:
4789             super(TestNAT64, cls).tearDownClass()
4790             raise
4791
4792     def test_pool(self):
4793         """ Add/delete address to NAT64 pool """
4794         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4795
4796         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4797
4798         addresses = self.vapi.nat64_pool_addr_dump()
4799         self.assertEqual(len(addresses), 1)
4800         self.assertEqual(addresses[0].address, nat_addr)
4801
4802         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4803
4804         addresses = self.vapi.nat64_pool_addr_dump()
4805         self.assertEqual(len(addresses), 0)
4806
4807     def test_interface(self):
4808         """ Enable/disable NAT64 feature on the interface """
4809         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4810         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4811
4812         interfaces = self.vapi.nat64_interface_dump()
4813         self.assertEqual(len(interfaces), 2)
4814         pg0_found = False
4815         pg1_found = False
4816         for intf in interfaces:
4817             if intf.sw_if_index == self.pg0.sw_if_index:
4818                 self.assertEqual(intf.is_inside, 1)
4819                 pg0_found = True
4820             elif intf.sw_if_index == self.pg1.sw_if_index:
4821                 self.assertEqual(intf.is_inside, 0)
4822                 pg1_found = True
4823         self.assertTrue(pg0_found)
4824         self.assertTrue(pg1_found)
4825
4826         features = self.vapi.cli("show interface features pg0")
4827         self.assertNotEqual(features.find('nat64-in2out'), -1)
4828         features = self.vapi.cli("show interface features pg1")
4829         self.assertNotEqual(features.find('nat64-out2in'), -1)
4830
4831         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4832         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4833
4834         interfaces = self.vapi.nat64_interface_dump()
4835         self.assertEqual(len(interfaces), 0)
4836
4837     def test_static_bib(self):
4838         """ Add/delete static BIB entry """
4839         in_addr = socket.inet_pton(socket.AF_INET6,
4840                                    '2001:db8:85a3::8a2e:370:7334')
4841         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4842         in_port = 1234
4843         out_port = 5678
4844         proto = IP_PROTOS.tcp
4845
4846         self.vapi.nat64_add_del_static_bib(in_addr,
4847                                            out_addr,
4848                                            in_port,
4849                                            out_port,
4850                                            proto)
4851         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4852         static_bib_num = 0
4853         for bibe in bib:
4854             if bibe.is_static:
4855                 static_bib_num += 1
4856                 self.assertEqual(bibe.i_addr, in_addr)
4857                 self.assertEqual(bibe.o_addr, out_addr)
4858                 self.assertEqual(bibe.i_port, in_port)
4859                 self.assertEqual(bibe.o_port, out_port)
4860         self.assertEqual(static_bib_num, 1)
4861
4862         self.vapi.nat64_add_del_static_bib(in_addr,
4863                                            out_addr,
4864                                            in_port,
4865                                            out_port,
4866                                            proto,
4867                                            is_add=0)
4868         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4869         static_bib_num = 0
4870         for bibe in bib:
4871             if bibe.is_static:
4872                 static_bib_num += 1
4873         self.assertEqual(static_bib_num, 0)
4874
4875     def test_set_timeouts(self):
4876         """ Set NAT64 timeouts """
4877         # verify default values
4878         timeouts = self.vapi.nat64_get_timeouts()
4879         self.assertEqual(timeouts.udp, 300)
4880         self.assertEqual(timeouts.icmp, 60)
4881         self.assertEqual(timeouts.tcp_trans, 240)
4882         self.assertEqual(timeouts.tcp_est, 7440)
4883         self.assertEqual(timeouts.tcp_incoming_syn, 6)
4884
4885         # set and verify custom values
4886         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4887                                      tcp_est=7450, tcp_incoming_syn=10)
4888         timeouts = self.vapi.nat64_get_timeouts()
4889         self.assertEqual(timeouts.udp, 200)
4890         self.assertEqual(timeouts.icmp, 30)
4891         self.assertEqual(timeouts.tcp_trans, 250)
4892         self.assertEqual(timeouts.tcp_est, 7450)
4893         self.assertEqual(timeouts.tcp_incoming_syn, 10)
4894
4895     def test_dynamic(self):
4896         """ NAT64 dynamic translation test """
4897         self.tcp_port_in = 6303
4898         self.udp_port_in = 6304
4899         self.icmp_id_in = 6305
4900
4901         ses_num_start = self.nat64_get_ses_num()
4902
4903         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4904                                                 self.nat_addr_n)
4905         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4906         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4907
4908         # in2out
4909         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4910         self.pg0.add_stream(pkts)
4911         self.pg_enable_capture(self.pg_interfaces)
4912         self.pg_start()
4913         capture = self.pg1.get_capture(len(pkts))
4914         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4915                                 dst_ip=self.pg1.remote_ip4)
4916
4917         # out2in
4918         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4919         self.pg1.add_stream(pkts)
4920         self.pg_enable_capture(self.pg_interfaces)
4921         self.pg_start()
4922         capture = self.pg0.get_capture(len(pkts))
4923         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4924         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4925
4926         # in2out
4927         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4928         self.pg0.add_stream(pkts)
4929         self.pg_enable_capture(self.pg_interfaces)
4930         self.pg_start()
4931         capture = self.pg1.get_capture(len(pkts))
4932         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4933                                 dst_ip=self.pg1.remote_ip4)
4934
4935         # out2in
4936         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4937         self.pg1.add_stream(pkts)
4938         self.pg_enable_capture(self.pg_interfaces)
4939         self.pg_start()
4940         capture = self.pg0.get_capture(len(pkts))
4941         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4942
4943         ses_num_end = self.nat64_get_ses_num()
4944
4945         self.assertEqual(ses_num_end - ses_num_start, 3)
4946
4947         # tenant with specific VRF
4948         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4949                                                 self.vrf1_nat_addr_n,
4950                                                 vrf_id=self.vrf1_id)
4951         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4952
4953         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4954         self.pg2.add_stream(pkts)
4955         self.pg_enable_capture(self.pg_interfaces)
4956         self.pg_start()
4957         capture = self.pg1.get_capture(len(pkts))
4958         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4959                                 dst_ip=self.pg1.remote_ip4)
4960
4961         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4962         self.pg1.add_stream(pkts)
4963         self.pg_enable_capture(self.pg_interfaces)
4964         self.pg_start()
4965         capture = self.pg2.get_capture(len(pkts))
4966         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4967
4968     def test_static(self):
4969         """ NAT64 static translation test """
4970         self.tcp_port_in = 60303
4971         self.udp_port_in = 60304
4972         self.icmp_id_in = 60305
4973         self.tcp_port_out = 60303
4974         self.udp_port_out = 60304
4975         self.icmp_id_out = 60305
4976
4977         ses_num_start = self.nat64_get_ses_num()
4978
4979         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4980                                                 self.nat_addr_n)
4981         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4982         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4983
4984         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4985                                            self.nat_addr_n,
4986                                            self.tcp_port_in,
4987                                            self.tcp_port_out,
4988                                            IP_PROTOS.tcp)
4989         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4990                                            self.nat_addr_n,
4991                                            self.udp_port_in,
4992                                            self.udp_port_out,
4993                                            IP_PROTOS.udp)
4994         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4995                                            self.nat_addr_n,
4996                                            self.icmp_id_in,
4997                                            self.icmp_id_out,
4998                                            IP_PROTOS.icmp)
4999
5000         # in2out
5001         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5002         self.pg0.add_stream(pkts)
5003         self.pg_enable_capture(self.pg_interfaces)
5004         self.pg_start()
5005         capture = self.pg1.get_capture(len(pkts))
5006         self.verify_capture_out(capture, nat_ip=self.nat_addr,
5007                                 dst_ip=self.pg1.remote_ip4, same_port=True)
5008
5009         # out2in
5010         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5011         self.pg1.add_stream(pkts)
5012         self.pg_enable_capture(self.pg_interfaces)
5013         self.pg_start()
5014         capture = self.pg0.get_capture(len(pkts))
5015         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5016         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5017
5018         ses_num_end = self.nat64_get_ses_num()
5019
5020         self.assertEqual(ses_num_end - ses_num_start, 3)
5021
5022     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5023     def test_session_timeout(self):
5024         """ NAT64 session timeout """
5025         self.icmp_id_in = 1234
5026         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5027                                                 self.nat_addr_n)
5028         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5029         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5030         self.vapi.nat64_set_timeouts(icmp=5)
5031
5032         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5033         self.pg0.add_stream(pkts)
5034         self.pg_enable_capture(self.pg_interfaces)
5035         self.pg_start()
5036         capture = self.pg1.get_capture(len(pkts))
5037
5038         ses_num_before_timeout = self.nat64_get_ses_num()
5039
5040         sleep(15)
5041
5042         # ICMP session after timeout
5043         ses_num_after_timeout = self.nat64_get_ses_num()
5044         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5045
5046     def test_icmp_error(self):
5047         """ NAT64 ICMP Error message translation """
5048         self.tcp_port_in = 6303
5049         self.udp_port_in = 6304
5050         self.icmp_id_in = 6305
5051
5052         ses_num_start = self.nat64_get_ses_num()
5053
5054         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5055                                                 self.nat_addr_n)
5056         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5057         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5058
5059         # send some packets to create sessions
5060         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5061         self.pg0.add_stream(pkts)
5062         self.pg_enable_capture(self.pg_interfaces)
5063         self.pg_start()
5064         capture_ip4 = self.pg1.get_capture(len(pkts))
5065         self.verify_capture_out(capture_ip4,
5066                                 nat_ip=self.nat_addr,
5067                                 dst_ip=self.pg1.remote_ip4)
5068
5069         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5070         self.pg1.add_stream(pkts)
5071         self.pg_enable_capture(self.pg_interfaces)
5072         self.pg_start()
5073         capture_ip6 = self.pg0.get_capture(len(pkts))
5074         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5075         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5076                                    self.pg0.remote_ip6)
5077
5078         # in2out
5079         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5080                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5081                 ICMPv6DestUnreach(code=1) /
5082                 packet[IPv6] for packet in capture_ip6]
5083         self.pg0.add_stream(pkts)
5084         self.pg_enable_capture(self.pg_interfaces)
5085         self.pg_start()
5086         capture = self.pg1.get_capture(len(pkts))
5087         for packet in capture:
5088             try:
5089                 self.assertEqual(packet[IP].src, self.nat_addr)
5090                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5091                 self.assertEqual(packet[ICMP].type, 3)
5092                 self.assertEqual(packet[ICMP].code, 13)
5093                 inner = packet[IPerror]
5094                 self.assertEqual(inner.src, self.pg1.remote_ip4)
5095                 self.assertEqual(inner.dst, self.nat_addr)
5096                 self.check_icmp_checksum(packet)
5097                 if inner.haslayer(TCPerror):
5098                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5099                 elif inner.haslayer(UDPerror):
5100                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5101                 else:
5102                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5103             except:
5104                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5105                 raise
5106
5107         # out2in
5108         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5109                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5110                 ICMP(type=3, code=13) /
5111                 packet[IP] for packet in capture_ip4]
5112         self.pg1.add_stream(pkts)
5113         self.pg_enable_capture(self.pg_interfaces)
5114         self.pg_start()
5115         capture = self.pg0.get_capture(len(pkts))
5116         for packet in capture:
5117             try:
5118                 self.assertEqual(packet[IPv6].src, ip.src)
5119                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5120                 icmp = packet[ICMPv6DestUnreach]
5121                 self.assertEqual(icmp.code, 1)
5122                 inner = icmp[IPerror6]
5123                 self.assertEqual(inner.src, self.pg0.remote_ip6)
5124                 self.assertEqual(inner.dst, ip.src)
5125                 self.check_icmpv6_checksum(packet)
5126                 if inner.haslayer(TCPerror):
5127                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5128                 elif inner.haslayer(UDPerror):
5129                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5130                 else:
5131                     self.assertEqual(inner[ICMPv6EchoRequest].id,
5132                                      self.icmp_id_in)
5133             except:
5134                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5135                 raise
5136
5137     def test_hairpinning(self):
5138         """ NAT64 hairpinning """
5139
5140         client = self.pg0.remote_hosts[0]
5141         server = self.pg0.remote_hosts[1]
5142         server_tcp_in_port = 22
5143         server_tcp_out_port = 4022
5144         server_udp_in_port = 23
5145         server_udp_out_port = 4023
5146         client_tcp_in_port = 1234
5147         client_udp_in_port = 1235
5148         client_tcp_out_port = 0
5149         client_udp_out_port = 0
5150         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5151         nat_addr_ip6 = ip.src
5152
5153         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5154                                                 self.nat_addr_n)
5155         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5156         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5157
5158         self.vapi.nat64_add_del_static_bib(server.ip6n,
5159                                            self.nat_addr_n,
5160                                            server_tcp_in_port,
5161                                            server_tcp_out_port,
5162                                            IP_PROTOS.tcp)
5163         self.vapi.nat64_add_del_static_bib(server.ip6n,
5164                                            self.nat_addr_n,
5165                                            server_udp_in_port,
5166                                            server_udp_out_port,
5167                                            IP_PROTOS.udp)
5168
5169         # client to server
5170         pkts = []
5171         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5172              IPv6(src=client.ip6, dst=nat_addr_ip6) /
5173              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5174         pkts.append(p)
5175         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5176              IPv6(src=client.ip6, dst=nat_addr_ip6) /
5177              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5178         pkts.append(p)
5179         self.pg0.add_stream(pkts)
5180         self.pg_enable_capture(self.pg_interfaces)
5181         self.pg_start()
5182         capture = self.pg0.get_capture(len(pkts))
5183         for packet in capture:
5184             try:
5185                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5186                 self.assertEqual(packet[IPv6].dst, server.ip6)
5187                 if packet.haslayer(TCP):
5188                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5189                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5190                     self.check_tcp_checksum(packet)
5191                     client_tcp_out_port = packet[TCP].sport
5192                 else:
5193                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5194                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
5195                     self.check_udp_checksum(packet)
5196                     client_udp_out_port = packet[UDP].sport
5197             except:
5198                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5199                 raise
5200
5201         # server to client
5202         pkts = []
5203         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5204              IPv6(src=server.ip6, dst=nat_addr_ip6) /
5205              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5206         pkts.append(p)
5207         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5208              IPv6(src=server.ip6, dst=nat_addr_ip6) /
5209              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5210         pkts.append(p)
5211         self.pg0.add_stream(pkts)
5212         self.pg_enable_capture(self.pg_interfaces)
5213         self.pg_start()
5214         capture = self.pg0.get_capture(len(pkts))
5215         for packet in capture:
5216             try:
5217                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5218                 self.assertEqual(packet[IPv6].dst, client.ip6)
5219                 if packet.haslayer(TCP):
5220                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5221                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5222                     self.check_tcp_checksum(packet)
5223                 else:
5224                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
5225                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
5226                     self.check_udp_checksum(packet)
5227             except:
5228                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5229                 raise
5230
5231         # ICMP error
5232         pkts = []
5233         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5234                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5235                 ICMPv6DestUnreach(code=1) /
5236                 packet[IPv6] for packet in capture]
5237         self.pg0.add_stream(pkts)
5238         self.pg_enable_capture(self.pg_interfaces)
5239         self.pg_start()
5240         capture = self.pg0.get_capture(len(pkts))
5241         for packet in capture:
5242             try:
5243                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5244                 self.assertEqual(packet[IPv6].dst, server.ip6)
5245                 icmp = packet[ICMPv6DestUnreach]
5246                 self.assertEqual(icmp.code, 1)
5247                 inner = icmp[IPerror6]
5248                 self.assertEqual(inner.src, server.ip6)
5249                 self.assertEqual(inner.dst, nat_addr_ip6)
5250                 self.check_icmpv6_checksum(packet)
5251                 if inner.haslayer(TCPerror):
5252                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5253                     self.assertEqual(inner[TCPerror].dport,
5254                                      client_tcp_out_port)
5255                 else:
5256                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5257                     self.assertEqual(inner[UDPerror].dport,
5258                                      client_udp_out_port)
5259             except:
5260                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5261                 raise
5262
5263     def test_prefix(self):
5264         """ NAT64 Network-Specific Prefix """
5265
5266         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5267                                                 self.nat_addr_n)
5268         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5269         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5270         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5271                                                 self.vrf1_nat_addr_n,
5272                                                 vrf_id=self.vrf1_id)
5273         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5274
5275         # Add global prefix
5276         global_pref64 = "2001:db8::"
5277         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5278         global_pref64_len = 32
5279         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5280
5281         prefix = self.vapi.nat64_prefix_dump()
5282         self.assertEqual(len(prefix), 1)
5283         self.assertEqual(prefix[0].prefix, global_pref64_n)
5284         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5285         self.assertEqual(prefix[0].vrf_id, 0)
5286
5287         # Add tenant specific prefix
5288         vrf1_pref64 = "2001:db8:122:300::"
5289         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5290         vrf1_pref64_len = 56
5291         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5292                                        vrf1_pref64_len,
5293                                        vrf_id=self.vrf1_id)
5294         prefix = self.vapi.nat64_prefix_dump()
5295         self.assertEqual(len(prefix), 2)
5296
5297         # Global prefix
5298         pkts = self.create_stream_in_ip6(self.pg0,
5299                                          self.pg1,
5300                                          pref=global_pref64,
5301                                          plen=global_pref64_len)
5302         self.pg0.add_stream(pkts)
5303         self.pg_enable_capture(self.pg_interfaces)
5304         self.pg_start()
5305         capture = self.pg1.get_capture(len(pkts))
5306         self.verify_capture_out(capture, nat_ip=self.nat_addr,
5307                                 dst_ip=self.pg1.remote_ip4)
5308
5309         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5310         self.pg1.add_stream(pkts)
5311         self.pg_enable_capture(self.pg_interfaces)
5312         self.pg_start()
5313         capture = self.pg0.get_capture(len(pkts))
5314         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5315                                   global_pref64,
5316                                   global_pref64_len)
5317         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5318
5319         # Tenant specific prefix
5320         pkts = self.create_stream_in_ip6(self.pg2,
5321                                          self.pg1,
5322                                          pref=vrf1_pref64,
5323                                          plen=vrf1_pref64_len)
5324         self.pg2.add_stream(pkts)
5325         self.pg_enable_capture(self.pg_interfaces)
5326         self.pg_start()
5327         capture = self.pg1.get_capture(len(pkts))
5328         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5329                                 dst_ip=self.pg1.remote_ip4)
5330
5331         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5332         self.pg1.add_stream(pkts)
5333         self.pg_enable_capture(self.pg_interfaces)
5334         self.pg_start()
5335         capture = self.pg2.get_capture(len(pkts))
5336         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5337                                   vrf1_pref64,
5338                                   vrf1_pref64_len)
5339         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5340
5341     def test_unknown_proto(self):
5342         """ NAT64 translate packet with unknown protocol """
5343
5344         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5345                                                 self.nat_addr_n)
5346         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5347         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5348         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5349
5350         # in2out
5351         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5352              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5353              TCP(sport=self.tcp_port_in, dport=20))
5354         self.pg0.add_stream(p)
5355         self.pg_enable_capture(self.pg_interfaces)
5356         self.pg_start()
5357         p = self.pg1.get_capture(1)
5358
5359         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5360              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5361              GRE() /
5362              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5363              TCP(sport=1234, dport=1234))
5364         self.pg0.add_stream(p)
5365         self.pg_enable_capture(self.pg_interfaces)
5366         self.pg_start()
5367         p = self.pg1.get_capture(1)
5368         packet = p[0]
5369         try:
5370             self.assertEqual(packet[IP].src, self.nat_addr)
5371             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5372             self.assertTrue(packet.haslayer(GRE))
5373             self.check_ip_checksum(packet)
5374         except:
5375             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5376             raise
5377
5378         # out2in
5379         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5380              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5381              GRE() /
5382              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5383              TCP(sport=1234, dport=1234))
5384         self.pg1.add_stream(p)
5385         self.pg_enable_capture(self.pg_interfaces)
5386         self.pg_start()
5387         p = self.pg0.get_capture(1)
5388         packet = p[0]
5389         try:
5390             self.assertEqual(packet[IPv6].src, remote_ip6)
5391             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5392             self.assertEqual(packet[IPv6].nh, 47)
5393         except:
5394             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5395             raise
5396
5397     def test_hairpinning_unknown_proto(self):
5398         """ NAT64 translate packet with unknown protocol - hairpinning """
5399
5400         client = self.pg0.remote_hosts[0]
5401         server = self.pg0.remote_hosts[1]
5402         server_tcp_in_port = 22
5403         server_tcp_out_port = 4022
5404         client_tcp_in_port = 1234
5405         client_tcp_out_port = 1235
5406         server_nat_ip = "10.0.0.100"
5407         client_nat_ip = "10.0.0.110"
5408         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5409         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5410         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5411         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5412
5413         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5414                                                 client_nat_ip_n)
5415         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5416         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5417
5418         self.vapi.nat64_add_del_static_bib(server.ip6n,
5419                                            server_nat_ip_n,
5420                                            server_tcp_in_port,
5421                                            server_tcp_out_port,
5422                                            IP_PROTOS.tcp)
5423
5424         self.vapi.nat64_add_del_static_bib(server.ip6n,
5425                                            server_nat_ip_n,
5426                                            0,
5427                                            0,
5428                                            IP_PROTOS.gre)
5429
5430         self.vapi.nat64_add_del_static_bib(client.ip6n,
5431                                            client_nat_ip_n,
5432                                            client_tcp_in_port,
5433                                            client_tcp_out_port,
5434                                            IP_PROTOS.tcp)
5435
5436         # client to server
5437         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5438              IPv6(src=client.ip6, dst=server_nat_ip6) /
5439              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5440         self.pg0.add_stream(p)
5441         self.pg_enable_capture(self.pg_interfaces)
5442         self.pg_start()
5443         p = self.pg0.get_capture(1)
5444
5445         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5446              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5447              GRE() /
5448              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5449              TCP(sport=1234, dport=1234))
5450         self.pg0.add_stream(p)
5451         self.pg_enable_capture(self.pg_interfaces)
5452         self.pg_start()
5453         p = self.pg0.get_capture(1)
5454         packet = p[0]
5455         try:
5456             self.assertEqual(packet[IPv6].src, client_nat_ip6)
5457             self.assertEqual(packet[IPv6].dst, server.ip6)
5458             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5459         except:
5460             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5461             raise
5462
5463         # server to client
5464         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5465              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5466              GRE() /
5467              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5468              TCP(sport=1234, dport=1234))
5469         self.pg0.add_stream(p)
5470         self.pg_enable_capture(self.pg_interfaces)
5471         self.pg_start()
5472         p = self.pg0.get_capture(1)
5473         packet = p[0]
5474         try:
5475             self.assertEqual(packet[IPv6].src, server_nat_ip6)
5476             self.assertEqual(packet[IPv6].dst, client.ip6)
5477             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5478         except:
5479             self.logger.error(ppp("Unexpected or invalid packet:", packet))
5480             raise
5481
5482     def test_one_armed_nat64(self):
5483         """ One armed NAT64 """
5484         external_port = 0
5485         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5486                                            '64:ff9b::',
5487                                            96)
5488
5489         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5490                                                 self.nat_addr_n)
5491         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5492         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5493
5494         # in2out
5495         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5496              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5497              TCP(sport=12345, dport=80))
5498         self.pg3.add_stream(p)
5499         self.pg_enable_capture(self.pg_interfaces)
5500         self.pg_start()
5501         capture = self.pg3.get_capture(1)
5502         p = capture[0]
5503         try:
5504             ip = p[IP]
5505             tcp = p[TCP]
5506             self.assertEqual(ip.src, self.nat_addr)
5507             self.assertEqual(ip.dst, self.pg3.remote_ip4)
5508             self.assertNotEqual(tcp.sport, 12345)
5509             external_port = tcp.sport
5510             self.assertEqual(tcp.dport, 80)
5511             self.check_tcp_checksum(p)
5512             self.check_ip_checksum(p)
5513         except:
5514             self.logger.error(ppp("Unexpected or invalid packet:", p))
5515             raise
5516
5517         # out2in
5518         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5519              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5520              TCP(sport=80, dport=external_port))
5521         self.pg3.add_stream(p)
5522         self.pg_enable_capture(self.pg_interfaces)
5523         self.pg_start()
5524         capture = self.pg3.get_capture(1)
5525         p = capture[0]
5526         try:
5527             ip = p[IPv6]
5528             tcp = p[TCP]
5529             self.assertEqual(ip.src, remote_host_ip6)
5530             self.assertEqual(ip.dst, self.pg3.remote_ip6)
5531             self.assertEqual(tcp.sport, 80)
5532             self.assertEqual(tcp.dport, 12345)
5533             self.check_tcp_checksum(p)
5534         except:
5535             self.logger.error(ppp("Unexpected or invalid packet:", p))
5536             raise
5537
5538     def test_frag_in_order(self):
5539         """ NAT64 translate fragments arriving in order """
5540         self.tcp_port_in = random.randint(1025, 65535)
5541
5542         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5543                                                 self.nat_addr_n)
5544         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5545         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5546
5547         reass = self.vapi.nat_reass_dump()
5548         reass_n_start = len(reass)
5549
5550         # in2out
5551         data = 'a' * 200
5552         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5553                                            self.tcp_port_in, 20, data)
5554         self.pg0.add_stream(pkts)
5555         self.pg_enable_capture(self.pg_interfaces)
5556         self.pg_start()
5557         frags = self.pg1.get_capture(len(pkts))
5558         p = self.reass_frags_and_verify(frags,
5559                                         self.nat_addr,
5560                                         self.pg1.remote_ip4)
5561         self.assertEqual(p[TCP].dport, 20)
5562         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5563         self.tcp_port_out = p[TCP].sport
5564         self.assertEqual(data, p[Raw].load)
5565
5566         # out2in
5567         data = "A" * 4 + "b" * 16 + "C" * 3
5568         pkts = self.create_stream_frag(self.pg1,
5569                                        self.nat_addr,
5570                                        20,
5571                                        self.tcp_port_out,
5572                                        data)
5573         self.pg1.add_stream(pkts)
5574         self.pg_enable_capture(self.pg_interfaces)
5575         self.pg_start()
5576         frags = self.pg0.get_capture(len(pkts))
5577         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5578         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5579         self.assertEqual(p[TCP].sport, 20)
5580         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5581         self.assertEqual(data, p[Raw].load)
5582
5583         reass = self.vapi.nat_reass_dump()
5584         reass_n_end = len(reass)
5585
5586         self.assertEqual(reass_n_end - reass_n_start, 2)
5587
5588     def test_reass_hairpinning(self):
5589         """ NAT64 fragments hairpinning """
5590         data = 'a' * 200
5591         client = self.pg0.remote_hosts[0]
5592         server = self.pg0.remote_hosts[1]
5593         server_in_port = random.randint(1025, 65535)
5594         server_out_port = random.randint(1025, 65535)
5595         client_in_port = random.randint(1025, 65535)
5596         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5597         nat_addr_ip6 = ip.src
5598
5599         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5600                                                 self.nat_addr_n)
5601         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5602         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5603
5604         # add static BIB entry for server
5605         self.vapi.nat64_add_del_static_bib(server.ip6n,
5606                                            self.nat_addr_n,
5607                                            server_in_port,
5608                                            server_out_port,
5609                                            IP_PROTOS.tcp)
5610
5611         # send packet from host to server
5612         pkts = self.create_stream_frag_ip6(self.pg0,
5613                                            self.nat_addr,
5614                                            client_in_port,
5615                                            server_out_port,
5616                                            data)
5617         self.pg0.add_stream(pkts)
5618         self.pg_enable_capture(self.pg_interfaces)
5619         self.pg_start()
5620         frags = self.pg0.get_capture(len(pkts))
5621         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5622         self.assertNotEqual(p[TCP].sport, client_in_port)
5623         self.assertEqual(p[TCP].dport, server_in_port)
5624         self.assertEqual(data, p[Raw].load)
5625
5626     def test_frag_out_of_order(self):
5627         """ NAT64 translate fragments arriving out of order """
5628         self.tcp_port_in = random.randint(1025, 65535)
5629
5630         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5631                                                 self.nat_addr_n)
5632         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5633         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5634
5635         # in2out
5636         data = 'a' * 200
5637         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5638                                            self.tcp_port_in, 20, data)
5639         pkts.reverse()
5640         self.pg0.add_stream(pkts)
5641         self.pg_enable_capture(self.pg_interfaces)
5642         self.pg_start()
5643         frags = self.pg1.get_capture(len(pkts))
5644         p = self.reass_frags_and_verify(frags,
5645                                         self.nat_addr,
5646                                         self.pg1.remote_ip4)
5647         self.assertEqual(p[TCP].dport, 20)
5648         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5649         self.tcp_port_out = p[TCP].sport
5650         self.assertEqual(data, p[Raw].load)
5651
5652         # out2in
5653         data = "A" * 4 + "B" * 16 + "C" * 3
5654         pkts = self.create_stream_frag(self.pg1,
5655                                        self.nat_addr,
5656                                        20,
5657                                        self.tcp_port_out,
5658                                        data)
5659         pkts.reverse()
5660         self.pg1.add_stream(pkts)
5661         self.pg_enable_capture(self.pg_interfaces)
5662         self.pg_start()
5663         frags = self.pg0.get_capture(len(pkts))
5664         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5665         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5666         self.assertEqual(p[TCP].sport, 20)
5667         self.assertEqual(p[TCP].dport, self.tcp_port_in)
5668         self.assertEqual(data, p[Raw].load)
5669
5670     def test_interface_addr(self):
5671         """ Acquire NAT64 pool addresses from interface """
5672         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5673
5674         # no address in NAT64 pool
5675         adresses = self.vapi.nat44_address_dump()
5676         self.assertEqual(0, len(adresses))
5677
5678         # configure interface address and check NAT64 address pool
5679         self.pg4.config_ip4()
5680         addresses = self.vapi.nat64_pool_addr_dump()
5681         self.assertEqual(len(addresses), 1)
5682         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5683
5684         # remove interface address and check NAT64 address pool
5685         self.pg4.unconfig_ip4()
5686         addresses = self.vapi.nat64_pool_addr_dump()
5687         self.assertEqual(0, len(adresses))
5688
5689     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5690     def test_ipfix_max_bibs_sessions(self):
5691         """ IPFIX logging maximum session and BIB entries exceeded """
5692         max_bibs = 1280
5693         max_sessions = 2560
5694         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5695                                            '64:ff9b::',
5696                                            96)
5697
5698         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5699                                                 self.nat_addr_n)
5700         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5701         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5702
5703         pkts = []
5704         src = ""
5705         for i in range(0, max_bibs):
5706             src = "fd01:aa::%x" % (i)
5707             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5708                  IPv6(src=src, dst=remote_host_ip6) /
5709                  TCP(sport=12345, dport=80))
5710             pkts.append(p)
5711             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5712                  IPv6(src=src, dst=remote_host_ip6) /
5713                  TCP(sport=12345, dport=22))
5714             pkts.append(p)
5715         self.pg0.add_stream(pkts)
5716         self.pg_enable_capture(self.pg_interfaces)
5717         self.pg_start()
5718         self.pg1.get_capture(max_sessions)
5719
5720         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5721                                      src_address=self.pg3.local_ip4n,
5722                                      path_mtu=512,
5723                                      template_interval=10)
5724         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5725                             src_port=self.ipfix_src_port)
5726
5727         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5728              IPv6(src=src, dst=remote_host_ip6) /
5729              TCP(sport=12345, dport=25))
5730         self.pg0.add_stream(p)
5731         self.pg_enable_capture(self.pg_interfaces)
5732         self.pg_start()
5733         self.pg1.get_capture(0)
5734         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5735         capture = self.pg3.get_capture(9)
5736         ipfix = IPFIXDecoder()
5737         # first load template
5738         for p in capture:
5739             self.assertTrue(p.haslayer(IPFIX))
5740             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5741             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5742             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5743             self.assertEqual(p[UDP].dport, 4739)
5744             self.assertEqual(p[IPFIX].observationDomainID,
5745                              self.ipfix_domain_id)
5746             if p.haslayer(Template):
5747                 ipfix.add_template(p.getlayer(Template))
5748         # verify events in data set
5749         for p in capture:
5750             if p.haslayer(Data):
5751                 data = ipfix.decode_data_set(p.getlayer(Set))
5752                 self.verify_ipfix_max_sessions(data, max_sessions)
5753
5754         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5755              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5756              TCP(sport=12345, dport=80))
5757         self.pg0.add_stream(p)
5758         self.pg_enable_capture(self.pg_interfaces)
5759         self.pg_start()
5760         self.pg1.get_capture(0)
5761         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5762         capture = self.pg3.get_capture(1)
5763         # verify events in data set
5764         for p in capture:
5765             self.assertTrue(p.haslayer(IPFIX))
5766             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5767             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5768             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5769             self.assertEqual(p[UDP].dport, 4739)
5770             self.assertEqual(p[IPFIX].observationDomainID,
5771                              self.ipfix_domain_id)
5772             if p.haslayer(Data):
5773                 data = ipfix.decode_data_set(p.getlayer(Set))
5774                 self.verify_ipfix_max_bibs(data, max_bibs)
5775
5776     def test_ipfix_max_frags(self):
5777         """ IPFIX logging maximum fragments pending reassembly exceeded """
5778         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5779                                                 self.nat_addr_n)
5780         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5781         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5782         self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5783         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5784                                      src_address=self.pg3.local_ip4n,
5785                                      path_mtu=512,
5786                                      template_interval=10)
5787         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5788                             src_port=self.ipfix_src_port)
5789
5790         data = 'a' * 200
5791         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5792                                            self.tcp_port_in, 20, data)
5793         self.pg0.add_stream(pkts[-1])
5794         self.pg_enable_capture(self.pg_interfaces)
5795         self.pg_start()
5796         self.pg1.get_capture(0)
5797         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5798         capture = self.pg3.get_capture(9)
5799         ipfix = IPFIXDecoder()
5800         # first load template
5801         for p in capture:
5802             self.assertTrue(p.haslayer(IPFIX))
5803             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5804             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5805             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5806             self.assertEqual(p[UDP].dport, 4739)
5807             self.assertEqual(p[IPFIX].observationDomainID,
5808                              self.ipfix_domain_id)
5809             if p.haslayer(Template):
5810                 ipfix.add_template(p.getlayer(Template))
5811         # verify events in data set
5812         for p in capture:
5813             if p.haslayer(Data):
5814                 data = ipfix.decode_data_set(p.getlayer(Set))
5815                 self.verify_ipfix_max_fragments_ip6(data, 0,
5816                                                     self.pg0.remote_ip6n)
5817
5818     def test_ipfix_bib_ses(self):
5819         """ IPFIX logging NAT64 BIB/session create and delete events """
5820         self.tcp_port_in = random.randint(1025, 65535)
5821         remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5822                                            '64:ff9b::',
5823                                            96)
5824
5825         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5826                                                 self.nat_addr_n)
5827         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5828         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5829         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5830                                      src_address=self.pg3.local_ip4n,
5831                                      path_mtu=512,
5832                                      template_interval=10)
5833         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5834                             src_port=self.ipfix_src_port)
5835
5836         # Create
5837         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5838              IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5839              TCP(sport=self.tcp_port_in, dport=25))
5840         self.pg0.add_stream(p)
5841         self.pg_enable_capture(self.pg_interfaces)
5842         self.pg_start()
5843         p = self.pg1.get_capture(1)
5844         self.tcp_port_out = p[0][TCP].sport
5845         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5846         capture = self.pg3.get_capture(10)
5847         ipfix = IPFIXDecoder()
5848         # first load template
5849         for p in capture:
5850             self.assertTrue(p.haslayer(IPFIX))
5851             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5852             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5853             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5854             self.assertEqual(p[UDP].dport, 4739)
5855             self.assertEqual(p[IPFIX].observationDomainID,
5856                              self.ipfix_domain_id)
5857             if p.haslayer(Template):
5858                 ipfix.add_template(p.getlayer(Template))
5859         # verify events in data set
5860         for p in capture:
5861             if p.haslayer(Data):
5862                 data = ipfix.decode_data_set(p.getlayer(Set))
5863                 if ord(data[0][230]) == 10:
5864                     self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5865                 elif ord(data[0][230]) == 6:
5866                     self.verify_ipfix_nat64_ses(data,
5867                                                 1,
5868                                                 self.pg0.remote_ip6n,
5869                                                 self.pg1.remote_ip4,
5870                                                 25)
5871                 else:
5872                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5873
5874         # Delete
5875         self.pg_enable_capture(self.pg_interfaces)
5876         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5877                                                 self.nat_addr_n,
5878                                                 is_add=0)
5879         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
5880         capture = self.pg3.get_capture(2)
5881         # verify events in data set
5882         for p in capture:
5883             self.assertTrue(p.haslayer(IPFIX))
5884             self.assertEqual(p[IP].src, self.pg3.local_ip4)
5885             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5886             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5887             self.assertEqual(p[UDP].dport, 4739)
5888             self.assertEqual(p[IPFIX].observationDomainID,
5889                              self.ipfix_domain_id)
5890             if p.haslayer(Data):
5891                 data = ipfix.decode_data_set(p.getlayer(Set))
5892                 if ord(data[0][230]) == 11:
5893                     self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5894                 elif ord(data[0][230]) == 7:
5895                     self.verify_ipfix_nat64_ses(data,
5896                                                 0,
5897                                                 self.pg0.remote_ip6n,
5898                                                 self.pg1.remote_ip4,
5899                                                 25)
5900                 else:
5901                     self.logger.error(ppp("Unexpected or invalid packet: ", p))
5902
5903     def nat64_get_ses_num(self):
5904         """
5905         Return number of active NAT64 sessions.
5906         """
5907         st = self.vapi.nat64_st_dump()
5908         return len(st)
5909
5910     def clear_nat64(self):
5911         """
5912         Clear NAT64 configuration.
5913         """
5914         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5915                             domain_id=self.ipfix_domain_id)
5916         self.ipfix_src_port = 4739
5917         self.ipfix_domain_id = 1
5918
5919         self.vapi.nat64_set_timeouts()
5920
5921         interfaces = self.vapi.nat64_interface_dump()
5922         for intf in interfaces:
5923             if intf.is_inside > 1:
5924                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5925                                                   0,
5926                                                   is_add=0)
5927             self.vapi.nat64_add_del_interface(intf.sw_if_index,
5928                                               intf.is_inside,
5929                                               is_add=0)
5930
5931         bib = self.vapi.nat64_bib_dump(255)
5932         for bibe in bib:
5933             if bibe.is_static:
5934                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5935                                                    bibe.o_addr,
5936                                                    bibe.i_port,
5937                                                    bibe.o_port,
5938                                                    bibe.proto,
5939                                                    bibe.vrf_id,
5940                                                    is_add=0)
5941
5942         adresses = self.vapi.nat64_pool_addr_dump()
5943         for addr in adresses:
5944             self.vapi.nat64_add_del_pool_addr_range(addr.address,
5945                                                     addr.address,
5946                                                     vrf_id=addr.vrf_id,
5947                                                     is_add=0)
5948
5949         prefixes = self.vapi.nat64_prefix_dump()
5950         for prefix in prefixes:
5951             self.vapi.nat64_add_del_prefix(prefix.prefix,
5952                                            prefix.prefix_len,
5953                                            vrf_id=prefix.vrf_id,
5954                                            is_add=0)
5955
5956     def tearDown(self):
5957         super(TestNAT64, self).tearDown()
5958         if not self.vpp_dead:
5959             self.logger.info(self.vapi.cli("show nat64 pool"))
5960             self.logger.info(self.vapi.cli("show nat64 interfaces"))
5961             self.logger.info(self.vapi.cli("show nat64 prefix"))
5962             self.logger.info(self.vapi.cli("show nat64 bib all"))
5963             self.logger.info(self.vapi.cli("show nat64 session table all"))
5964             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5965             self.clear_nat64()
5966
5967
class TestDSlite(MethodHolder):
    """ DS-Lite Test Cases """

    @classmethod
    def setUpClass(cls):
        """ Create pg0 (IPv4) and pg1 (IPv6, two remote hosts) """
        super(TestDSlite, cls).setUpClass()

        try:
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)

            cls.create_pg_interfaces(range(2))

            # pg0: IPv4 side
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

            # pg1: IPv6 side with two remote hosts
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(2)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            # undo the class-level setup before propagating the failure
            super(TestDSlite, cls).tearDownClass()
            raise

    def _dslite_send_and_capture(self, tx_if, pkt, rx_if):
        """
        Send one packet and return the capture taken on the other side

        :param tx_if: Interface the packet is injected on
        :param pkt: Packet to send
        :param rx_if: Interface exactly one packet is expected on
        :returns: Capture from rx_if
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    def _verify_dslite_out(self, pkt):
        """
        Verify addresses of a packet captured on pg0

        :param pkt: Captured packet
        """
        self.assertFalse(pkt.haslayer(IPv6))
        self.assertEqual(pkt[IP].src, self.nat_addr)
        self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)

    def _verify_dslite_in(self, pkt, aftr_ip6, host, inner_dst):
        """
        Verify outer and inner addresses of a packet captured on pg1

        :param pkt: Captured packet
        :param aftr_ip6: Expected outer IPv6 source address
        :param host: Remote host expected as outer IPv6 destination
        :param inner_dst: Expected inner IPv4 destination address
        """
        self.assertEqual(pkt[IPv6].src, aftr_ip6)
        self.assertEqual(pkt[IPv6].dst, host.ip6)
        self.assertEqual(pkt[IP].src, self.pg0.remote_ip4)
        self.assertEqual(pkt[IP].dst, inner_dst)

    def test_dslite(self):
        """ Test DS-Lite """
        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        inner_src = '192.168.1.1'
        udp_host = self.pg1.remote_hosts[0]
        other_host = self.pg1.remote_hosts[1]

        self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
                                                 self.nat_addr_n)
        self.vapi.dslite_set_aftr_addr(
            socket.inet_pton(socket.AF_INET6, aftr_ip6),
            socket.inet_pton(socket.AF_INET, aftr_ip4))

        # UDP
        tx = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(dst=aftr_ip6, src=udp_host.ip6) /
              IP(dst=self.pg0.remote_ip4, src=inner_src) /
              UDP(sport=20000, dport=10000))
        rx = self._dslite_send_and_capture(self.pg1, tx, self.pg0)[0]
        self._verify_dslite_out(rx)
        self.assertNotEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)
        out_port = rx[UDP].sport

        tx = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
              UDP(sport=10000, dport=out_port))
        rx = self._dslite_send_and_capture(self.pg0, tx, self.pg1)[0]
        self._verify_dslite_in(rx, aftr_ip6, udp_host, inner_src)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # TCP
        tx = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(dst=aftr_ip6, src=other_host.ip6) /
              IP(dst=self.pg0.remote_ip4, src=inner_src) /
              TCP(sport=20001, dport=10001))
        rx = self._dslite_send_and_capture(self.pg1, tx, self.pg0)[0]
        self._verify_dslite_out(rx)
        self.assertNotEqual(rx[TCP].sport, 20001)
        self.assertEqual(rx[TCP].dport, 10001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)
        out_port = rx[TCP].sport

        tx = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
              TCP(sport=10001, dport=out_port))
        rx = self._dslite_send_and_capture(self.pg0, tx, self.pg1)[0]
        self._verify_dslite_in(rx, aftr_ip6, other_host, inner_src)
        self.assertEqual(rx[TCP].sport, 10001)
        self.assertEqual(rx[TCP].dport, 20001)
        self.check_ip_checksum(rx)
        self.check_tcp_checksum(rx)

        # ICMP
        tx = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(dst=aftr_ip6, src=other_host.ip6) /
              IP(dst=self.pg0.remote_ip4, src=inner_src) /
              ICMP(id=4000, type='echo-request'))
        rx = self._dslite_send_and_capture(self.pg1, tx, self.pg0)[0]
        self._verify_dslite_out(rx)
        self.assertNotEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)
        out_id = rx[ICMP].id

        tx = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
              ICMP(id=out_id, type='echo-reply'))
        rx = self._dslite_send_and_capture(self.pg0, tx, self.pg1)[0]
        self._verify_dslite_in(rx, aftr_ip6, other_host, inner_src)
        self.assertEqual(rx[ICMP].id, 4000)
        self.check_ip_checksum(rx)
        self.check_icmp_checksum(rx)

        # ping DS-Lite AFTR tunnel endpoint address
        tx = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src=other_host.ip6, dst=aftr_ip6) /
              ICMPv6EchoRequest())
        replies = self._dslite_send_and_capture(self.pg1, tx, self.pg1)
        self.assertEqual(1, len(replies))
        reply = replies[0]
        self.assertEqual(reply[IPv6].src, aftr_ip6)
        self.assertEqual(reply[IPv6].dst, other_host.ip6)
        self.assertTrue(reply.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """ Log DS-Lite state """
        super(TestDSlite, self).tearDown()
        if self.vpp_dead:
            return
        for cmd in ("show dslite pool",
                    "show dslite aftr-tunnel-endpoint-address",
                    "show dslite sessions"):
            self.logger.info(self.vapi.cli(cmd))
6127
6128
class TestDSliteCE(MethodHolder):
    """ DS-Lite CE Test Cases """

    @classmethod
    def setUpConstants(cls):
        """ Append the "nat { dslite ce }" section to the VPP command line """
        super(TestDSliteCE, cls).setUpConstants()
        cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])

    @classmethod
    def setUpClass(cls):
        """ Create pg0 (IPv4) and pg1 (IPv6, one remote host) """
        super(TestDSliteCE, cls).setUpClass()

        try:
            cls.create_pg_interfaces(range(2))

            # pg0: IPv4 side
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

            # pg1: IPv6 side with one remote host
            cls.pg1.admin_up()
            cls.pg1.config_ip6()
            cls.pg1.generate_remote_hosts(1)
            cls.pg1.configure_ipv6_neighbors()

        except Exception:
            # undo the class-level setup before propagating the failure
            super(TestDSliteCE, cls).tearDownClass()
            raise

    def _ce_send_and_capture(self, tx_if, pkt, rx_if):
        """
        Send one packet and return the capture taken on the other side

        :param tx_if: Interface the packet is injected on
        :param pkt: Packet to send
        :param rx_if: Interface exactly one packet is expected on
        :returns: Capture from rx_if
        """
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    def test_dslite_ce(self):
        """ Test DS-Lite CE """
        b4_ip4 = '192.0.0.2'
        b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
        self.vapi.dslite_set_b4_addr(
            socket.inet_pton(socket.AF_INET6, b4_ip6),
            socket.inet_pton(socket.AF_INET, b4_ip4))

        aftr_ip4 = '192.0.0.1'
        aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
        aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
        self.vapi.dslite_set_aftr_addr(
            aftr_ip6_n,
            socket.inet_pton(socket.AF_INET, aftr_ip4))

        # host route to the AFTR address via pg1's remote peer
        self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
                                   dst_address_length=128,
                                   next_hop_address=self.pg1.remote_ip6n,
                                   next_hop_sw_if_index=self.pg1.sw_if_index,
                                   is_ipv6=1)

        inside_ip4 = self.pg0.remote_ip4
        outside_ip4 = self.pg1.remote_ip4
        ping_host = self.pg1.remote_hosts[0]

        # UDP encapsulation
        tx = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
              IP(dst=outside_ip4, src=inside_ip4) /
              UDP(sport=10000, dport=20000))
        rx = self._ce_send_and_capture(self.pg0, tx, self.pg1)[0]
        self.assertEqual(rx[IPv6].src, b4_ip6)
        self.assertEqual(rx[IPv6].dst, aftr_ip6)
        self.assertEqual(rx[IP].src, inside_ip4)
        self.assertEqual(rx[IP].dst, outside_ip4)
        self.assertEqual(rx[UDP].sport, 10000)
        self.assertEqual(rx[UDP].dport, 20000)
        self.check_ip_checksum(rx)

        # UDP decapsulation
        tx = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(dst=b4_ip6, src=aftr_ip6) /
              IP(dst=inside_ip4, src=outside_ip4) /
              UDP(sport=20000, dport=10000))
        rx = self._ce_send_and_capture(self.pg1, tx, self.pg0)[0]
        self.assertFalse(rx.haslayer(IPv6))
        self.assertEqual(rx[IP].src, outside_ip4)
        self.assertEqual(rx[IP].dst, inside_ip4)
        self.assertEqual(rx[UDP].sport, 20000)
        self.assertEqual(rx[UDP].dport, 10000)
        self.check_ip_checksum(rx)

        # ping DS-Lite B4 tunnel endpoint address
        tx = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IPv6(src=ping_host.ip6, dst=b4_ip6) /
              ICMPv6EchoRequest())
        replies = self._ce_send_and_capture(self.pg1, tx, self.pg1)
        self.assertEqual(1, len(replies))
        reply = replies[0]
        self.assertEqual(reply[IPv6].src, b4_ip6)
        self.assertEqual(reply[IPv6].dst, ping_host.ip6)
        self.assertTrue(reply.haslayer(ICMPv6EchoReply))

    def tearDown(self):
        """ Log DS-Lite tunnel endpoint addresses """
        super(TestDSliteCE, self).tearDown()
        if self.vpp_dead:
            return
        for cmd in ("show dslite aftr-tunnel-endpoint-address",
                    "show dslite b4-tunnel-endpoint-address"):
            self.logger.info(self.vapi.cli(cmd))
6231
6232
class TestNAT66(MethodHolder):
    """ NAT66 Test Cases """

    @classmethod
    def setUpClass(cls):
        """
        Create two pg interfaces and bring them up with IPv6 configured

        When any step after the base class setUpClass raises, the base
        class tearDownClass is called and the exception is re-raised.
        """
        super(TestNAT66, cls).setUpClass()

        try:
            # NAT address kept both as text and in packed binary form
            cls.nat_addr = 'fd01:ff::2'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)

            cls.create_pg_interfaces(range(2))
            cls.interfaces = list(cls.pg_interfaces)

            for intf in cls.interfaces:
                intf.admin_up()
                intf.config_ip6()
                intf.configure_ipv6_neighbors()

        except Exception:
            super(TestNAT66, cls).tearDownClass()
            raise

    def test_static(self):
        """ 1:1 NAT66 test """
        # in2out traffic below enters on pg0 and leaves on pg1; pg1 is the
        # interface registered with is_inside=0
        inside, outside = self.pg0, self.pg1
        self.vapi.nat66_add_del_interface(inside.sw_if_index)
        self.vapi.nat66_add_del_interface(outside.sw_if_index, is_inside=0)
        # static mapping between the pg0 remote host address and nat_addr
        self.vapi.nat66_add_del_static_mapping(inside.remote_ip6n,
                                               self.nat_addr_n)

        # in2out
        payloads = (TCP(), UDP(), ICMPv6EchoRequest(), GRE() / IP() / TCP())
        tx = [Ether(dst=inside.local_mac, src=inside.remote_mac) /
              IPv6(src=inside.remote_ip6, dst=outside.remote_ip6) /
              l4
              for l4 in payloads]
        inside.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        for rx in outside.get_capture(len(tx)):
            try:
                # source is expected to be rewritten to the NAT address
                self.assertEqual(rx[IPv6].src, self.nat_addr)
                self.assertEqual(rx[IPv6].dst, outside.remote_ip6)
                if rx.haslayer(TCP):
                    self.check_tcp_checksum(rx)
                elif rx.haslayer(UDP):
                    self.check_udp_checksum(rx)
                elif rx.haslayer(ICMPv6EchoRequest):
                    self.check_icmpv6_checksum(rx)
            except:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", rx))
                raise

        # out2in
        payloads = (TCP(), UDP(), ICMPv6EchoReply(), GRE() / IP() / TCP())
        tx = [Ether(dst=outside.local_mac, src=outside.remote_mac) /
              IPv6(src=outside.remote_ip6, dst=self.nat_addr) /
              l4
              for l4 in payloads]
        outside.add_stream(tx)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        for rx in inside.get_capture(len(tx)):
            try:
                self.assertEqual(rx[IPv6].src, outside.remote_ip6)
                # destination is expected to be the pg0 remote host again
                self.assertEqual(rx[IPv6].dst, inside.remote_ip6)
                if rx.haslayer(TCP):
                    self.check_tcp_checksum(rx)
                elif rx.haslayer(UDP):
                    self.check_udp_checksum(rx)
                elif rx.haslayer(ICMPv6EchoReply):
                    self.check_icmpv6_checksum(rx)
            except:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", rx))
                raise

        # one mapping is expected, with all 4 + 4 packets sent above
        # reflected in its packet counter
        mappings = self.vapi.nat66_static_mapping_dump()
        self.assertEqual(len(mappings), 1)
        self.assertEqual(mappings[0].total_pkts, 8)

    def clear_nat66(self):
        """
        Clear NAT66 configuration.

        Every interface and every static mapping returned by the NAT66
        dump calls is passed back to the matching add/del call with
        is_add=0.
        """
        for intf in self.vapi.nat66_interface_dump():
            self.vapi.nat66_add_del_interface(intf.sw_if_index,
                                              intf.is_inside,
                                              is_add=0)

        for sm in self.vapi.nat66_static_mapping_dump():
            self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
                                                   sm.external_ip_address,
                                                   sm.vrf_id,
                                                   is_add=0)

    def tearDown(self):
        """
        Run the base class tearDown, log NAT66 CLI state and clear NAT66

        Logging and clean-up are skipped once VPP is flagged as dead.
        """
        super(TestNAT66, self).tearDown()
        if self.vpp_dead:
            return
        for cmd in ("show nat66 interfaces",
                    "show nat66 static mappings"):
            self.logger.info(self.vapi.cli(cmd))
        self.clear_nat66()
6362
# When the file is executed directly as a script, hand the test cases in
# this module to unittest using the framework's VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)