Port restricted NAT44 (VPP-1048)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param ttl: TTL of generated packets
144         """
145         pkts = []
146         # TCP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              TCP(sport=self.tcp_port_in, dport=20))
150         pkts.append(p)
151
152         # UDP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              UDP(sport=self.udp_port_in, dport=20))
156         pkts.append(p)
157
158         # ICMP
159         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
161              ICMP(id=self.icmp_id_in, type='echo-request'))
162         pkts.append(p)
163
164         return pkts
165
166     def compose_ip6(self, ip4, pref, plen):
167         """
168         Compose IPv4-embedded IPv6 addresses
169
170         :param ip4: IPv4 address
171         :param pref: IPv6 prefix
172         :param plen: IPv6 prefix length
173         :returns: IPv4-embedded IPv6 addresses
174         """
175         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
176         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
177         if plen == 32:
178             pref_n[4] = ip4_n[0]
179             pref_n[5] = ip4_n[1]
180             pref_n[6] = ip4_n[2]
181             pref_n[7] = ip4_n[3]
182         elif plen == 40:
183             pref_n[5] = ip4_n[0]
184             pref_n[6] = ip4_n[1]
185             pref_n[7] = ip4_n[2]
186             pref_n[9] = ip4_n[3]
187         elif plen == 48:
188             pref_n[6] = ip4_n[0]
189             pref_n[7] = ip4_n[1]
190             pref_n[9] = ip4_n[2]
191             pref_n[10] = ip4_n[3]
192         elif plen == 56:
193             pref_n[7] = ip4_n[0]
194             pref_n[9] = ip4_n[1]
195             pref_n[10] = ip4_n[2]
196             pref_n[11] = ip4_n[3]
197         elif plen == 64:
198             pref_n[9] = ip4_n[0]
199             pref_n[10] = ip4_n[1]
200             pref_n[11] = ip4_n[2]
201             pref_n[12] = ip4_n[3]
202         elif plen == 96:
203             pref_n[12] = ip4_n[0]
204             pref_n[13] = ip4_n[1]
205             pref_n[14] = ip4_n[2]
206             pref_n[15] = ip4_n[3]
207         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
208
209     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
210         """
211         Create IPv6 packet stream for inside network
212
213         :param in_if: Inside interface
214         :param out_if: Outside interface
215         :param ttl: Hop Limit of generated packets
216         :param pref: NAT64 prefix
217         :param plen: NAT64 prefix length
218         """
219         pkts = []
220         if pref is None:
221             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
222         else:
223             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
224
225         # TCP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              TCP(sport=self.tcp_port_in, dport=20))
229         pkts.append(p)
230
231         # UDP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              UDP(sport=self.udp_port_in, dport=20))
235         pkts.append(p)
236
237         # ICMP
238         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
239              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
240              ICMPv6EchoRequest(id=self.icmp_id_in))
241         pkts.append(p)
242
243         return pkts
244
245     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
246         """
247         Create packet stream for outside network
248
249         :param out_if: Outside interface
250         :param dst_ip: Destination IP address (Default use global NAT address)
251         :param ttl: TTL of generated packets
252         """
253         if dst_ip is None:
254             dst_ip = self.nat_addr
255         pkts = []
256         # TCP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              TCP(dport=self.tcp_port_out, sport=20))
260         pkts.append(p)
261
262         # UDP
263         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265              UDP(dport=self.udp_port_out, sport=20))
266         pkts.append(p)
267
268         # ICMP
269         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
270              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
271              ICMP(id=self.icmp_id_out, type='echo-reply'))
272         pkts.append(p)
273
274         return pkts
275
276     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
277                            packet_num=3, dst_ip=None):
278         """
279         Verify captured packets on outside network
280
281         :param capture: Captured packets
282         :param nat_ip: Translated IP address (Default use global NAT address)
283         :param same_port: Sorce port number is not translated (Default False)
284         :param packet_num: Expected number of packets (Default 3)
285         :param dst_ip: Destination IP address (Default do not verify)
286         """
287         if nat_ip is None:
288             nat_ip = self.nat_addr
289         self.assertEqual(packet_num, len(capture))
290         for packet in capture:
291             try:
292                 self.check_ip_checksum(packet)
293                 self.assertEqual(packet[IP].src, nat_ip)
294                 if dst_ip is not None:
295                     self.assertEqual(packet[IP].dst, dst_ip)
296                 if packet.haslayer(TCP):
297                     if same_port:
298                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
299                     else:
300                         self.assertNotEqual(
301                             packet[TCP].sport, self.tcp_port_in)
302                     self.tcp_port_out = packet[TCP].sport
303                     self.check_tcp_checksum(packet)
304                 elif packet.haslayer(UDP):
305                     if same_port:
306                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
307                     else:
308                         self.assertNotEqual(
309                             packet[UDP].sport, self.udp_port_in)
310                     self.udp_port_out = packet[UDP].sport
311                 else:
312                     if same_port:
313                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
314                     else:
315                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
316                     self.icmp_id_out = packet[ICMP].id
317                     self.check_icmp_checksum(packet)
318             except:
319                 self.logger.error(ppp("Unexpected or invalid packet "
320                                       "(outside network):", packet))
321                 raise
322
323     def verify_capture_in(self, capture, in_if, packet_num=3):
324         """
325         Verify captured packets on inside network
326
327         :param capture: Captured packets
328         :param in_if: Inside interface
329         :param packet_num: Expected number of packets (Default 3)
330         """
331         self.assertEqual(packet_num, len(capture))
332         for packet in capture:
333             try:
334                 self.check_ip_checksum(packet)
335                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
336                 if packet.haslayer(TCP):
337                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
338                     self.check_tcp_checksum(packet)
339                 elif packet.haslayer(UDP):
340                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
341                 else:
342                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
343                     self.check_icmp_checksum(packet)
344             except:
345                 self.logger.error(ppp("Unexpected or invalid packet "
346                                       "(inside network):", packet))
347                 raise
348
349     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
350         """
351         Verify captured IPv6 packets on inside network
352
353         :param capture: Captured packets
354         :param src_ip: Source IP
355         :param dst_ip: Destination IP address
356         :param packet_num: Expected number of packets (Default 3)
357         """
358         self.assertEqual(packet_num, len(capture))
359         for packet in capture:
360             try:
361                 self.assertEqual(packet[IPv6].src, src_ip)
362                 self.assertEqual(packet[IPv6].dst, dst_ip)
363                 if packet.haslayer(TCP):
364                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
365                     self.check_tcp_checksum(packet)
366                 elif packet.haslayer(UDP):
367                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
368                     self.check_udp_checksum(packet)
369                 else:
370                     self.assertEqual(packet[ICMPv6EchoReply].id,
371                                      self.icmp_id_in)
372                     self.check_icmpv6_checksum(packet)
373             except:
374                 self.logger.error(ppp("Unexpected or invalid packet "
375                                       "(inside network):", packet))
376                 raise
377
378     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
379         """
380         Verify captured packet that don't have to be translated
381
382         :param capture: Captured packets
383         :param ingress_if: Ingress interface
384         :param egress_if: Egress interface
385         """
386         for packet in capture:
387             try:
388                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
389                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
390                 if packet.haslayer(TCP):
391                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
392                 elif packet.haslayer(UDP):
393                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
394                 else:
395                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
396             except:
397                 self.logger.error(ppp("Unexpected or invalid packet "
398                                       "(inside network):", packet))
399                 raise
400
401     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
402                                             packet_num=3, icmp_type=11):
403         """
404         Verify captured packets with ICMP errors on outside network
405
406         :param capture: Captured packets
407         :param src_ip: Translated IP address or IP address of VPP
408                        (Default use global NAT address)
409         :param packet_num: Expected number of packets (Default 3)
410         :param icmp_type: Type of error ICMP packet
411                           we are expecting (Default 11)
412         """
413         if src_ip is None:
414             src_ip = self.nat_addr
415         self.assertEqual(packet_num, len(capture))
416         for packet in capture:
417             try:
418                 self.assertEqual(packet[IP].src, src_ip)
419                 self.assertTrue(packet.haslayer(ICMP))
420                 icmp = packet[ICMP]
421                 self.assertEqual(icmp.type, icmp_type)
422                 self.assertTrue(icmp.haslayer(IPerror))
423                 inner_ip = icmp[IPerror]
424                 if inner_ip.haslayer(TCPerror):
425                     self.assertEqual(inner_ip[TCPerror].dport,
426                                      self.tcp_port_out)
427                 elif inner_ip.haslayer(UDPerror):
428                     self.assertEqual(inner_ip[UDPerror].dport,
429                                      self.udp_port_out)
430                 else:
431                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
432             except:
433                 self.logger.error(ppp("Unexpected or invalid packet "
434                                       "(outside network):", packet))
435                 raise
436
437     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
438                                            icmp_type=11):
439         """
440         Verify captured packets with ICMP errors on inside network
441
442         :param capture: Captured packets
443         :param in_if: Inside interface
444         :param packet_num: Expected number of packets (Default 3)
445         :param icmp_type: Type of error ICMP packet
446                           we are expecting (Default 11)
447         """
448         self.assertEqual(packet_num, len(capture))
449         for packet in capture:
450             try:
451                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
452                 self.assertTrue(packet.haslayer(ICMP))
453                 icmp = packet[ICMP]
454                 self.assertEqual(icmp.type, icmp_type)
455                 self.assertTrue(icmp.haslayer(IPerror))
456                 inner_ip = icmp[IPerror]
457                 if inner_ip.haslayer(TCPerror):
458                     self.assertEqual(inner_ip[TCPerror].sport,
459                                      self.tcp_port_in)
460                 elif inner_ip.haslayer(UDPerror):
461                     self.assertEqual(inner_ip[UDPerror].sport,
462                                      self.udp_port_in)
463                 else:
464                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
465             except:
466                 self.logger.error(ppp("Unexpected or invalid packet "
467                                       "(inside network):", packet))
468                 raise
469
470     def create_stream_frag(self, src_if, dst, sport, dport, data):
471         """
472         Create fragmented packet stream
473
474         :param src_if: Source interface
475         :param dst: Destination IPv4 address
476         :param sport: Source TCP port
477         :param dport: Destination TCP port
478         :param data: Payload data
479         :returns: Fragmets
480         """
481         id = random.randint(0, 65535)
482         p = (IP(src=src_if.remote_ip4, dst=dst) /
483              TCP(sport=sport, dport=dport) /
484              Raw(data))
485         p = p.__class__(str(p))
486         chksum = p['TCP'].chksum
487         pkts = []
488         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
490              TCP(sport=sport, dport=dport, chksum=chksum) /
491              Raw(data[0:4]))
492         pkts.append(p)
493         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
495                 proto=IP_PROTOS.tcp) /
496              Raw(data[4:20]))
497         pkts.append(p)
498         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
499              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
500                 id=id) /
501              Raw(data[20:]))
502         pkts.append(p)
503         return pkts
504
505     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
506                                pref=None, plen=0, frag_size=128):
507         """
508         Create fragmented packet stream
509
510         :param src_if: Source interface
511         :param dst: Destination IPv4 address
512         :param sport: Source TCP port
513         :param dport: Destination TCP port
514         :param data: Payload data
515         :param pref: NAT64 prefix
516         :param plen: NAT64 prefix length
517         :param fragsize: size of fragments
518         :returns: Fragmets
519         """
520         if pref is None:
521             dst_ip6 = ''.join(['64:ff9b::', dst])
522         else:
523             dst_ip6 = self.compose_ip6(dst, pref, plen)
524
525         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
526              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
527              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
528              TCP(sport=sport, dport=dport) /
529              Raw(data))
530
531         return fragment6(p, frag_size)
532
533     def reass_frags_and_verify(self, frags, src, dst):
534         """
535         Reassemble and verify fragmented packet
536
537         :param frags: Captured fragments
538         :param src: Source IPv4 address to verify
539         :param dst: Destination IPv4 address to verify
540
541         :returns: Reassembled IPv4 packet
542         """
543         buffer = StringIO.StringIO()
544         for p in frags:
545             self.assertEqual(p[IP].src, src)
546             self.assertEqual(p[IP].dst, dst)
547             self.check_ip_checksum(p)
548             buffer.seek(p[IP].frag * 8)
549             buffer.write(p[IP].payload)
550         ip = frags[0].getlayer(IP)
551         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
552                 proto=frags[0][IP].proto)
553         if ip.proto == IP_PROTOS.tcp:
554             p = (ip / TCP(buffer.getvalue()))
555             self.check_tcp_checksum(p)
556         elif ip.proto == IP_PROTOS.udp:
557             p = (ip / UDP(buffer.getvalue()))
558         return p
559
560     def reass_frags_and_verify_ip6(self, frags, src, dst):
561         """
562         Reassemble and verify fragmented packet
563
564         :param frags: Captured fragments
565         :param src: Source IPv6 address to verify
566         :param dst: Destination IPv6 address to verify
567
568         :returns: Reassembled IPv6 packet
569         """
570         buffer = StringIO.StringIO()
571         for p in frags:
572             self.assertEqual(p[IPv6].src, src)
573             self.assertEqual(p[IPv6].dst, dst)
574             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
575             buffer.write(p[IPv6ExtHdrFragment].payload)
576         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
577                   nh=frags[0][IPv6ExtHdrFragment].nh)
578         if ip.nh == IP_PROTOS.tcp:
579             p = (ip / TCP(buffer.getvalue()))
580             self.check_tcp_checksum(p)
581         elif ip.nh == IP_PROTOS.udp:
582             p = (ip / UDP(buffer.getvalue()))
583         return p
584
585     def verify_ipfix_nat44_ses(self, data):
586         """
587         Verify IPFIX NAT44 session create/delete event
588
589         :param data: Decoded IPFIX data records
590         """
591         nat44_ses_create_num = 0
592         nat44_ses_delete_num = 0
593         self.assertEqual(6, len(data))
594         for record in data:
595             # natEvent
596             self.assertIn(ord(record[230]), [4, 5])
597             if ord(record[230]) == 4:
598                 nat44_ses_create_num += 1
599             else:
600                 nat44_ses_delete_num += 1
601             # sourceIPv4Address
602             self.assertEqual(self.pg0.remote_ip4n, record[8])
603             # postNATSourceIPv4Address
604             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
605                              record[225])
606             # ingressVRFID
607             self.assertEqual(struct.pack("!I", 0), record[234])
608             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
609             if IP_PROTOS.icmp == ord(record[4]):
610                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
611                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
612                                  record[227])
613             elif IP_PROTOS.tcp == ord(record[4]):
614                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
615                                  record[7])
616                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
617                                  record[227])
618             elif IP_PROTOS.udp == ord(record[4]):
619                 self.assertEqual(struct.pack("!H", self.udp_port_in),
620                                  record[7])
621                 self.assertEqual(struct.pack("!H", self.udp_port_out),
622                                  record[227])
623             else:
624                 self.fail("Invalid protocol")
625         self.assertEqual(3, nat44_ses_create_num)
626         self.assertEqual(3, nat44_ses_delete_num)
627
628     def verify_ipfix_addr_exhausted(self, data):
629         """
630         Verify IPFIX NAT addresses event
631
632         :param data: Decoded IPFIX data records
633         """
634         self.assertEqual(1, len(data))
635         record = data[0]
636         # natEvent
637         self.assertEqual(ord(record[230]), 3)
638         # natPoolID
639         self.assertEqual(struct.pack("!I", 0), record[283])
640
641
642 class TestNAT44(MethodHolder):
643     """ NAT44 Test Cases """
644
645     @classmethod
646     def setUpClass(cls):
647         super(TestNAT44, cls).setUpClass()
648
649         try:
650             cls.tcp_port_in = 6303
651             cls.tcp_port_out = 6303
652             cls.udp_port_in = 6304
653             cls.udp_port_out = 6304
654             cls.icmp_id_in = 6305
655             cls.icmp_id_out = 6305
656             cls.nat_addr = '10.0.0.3'
657             cls.ipfix_src_port = 4739
658             cls.ipfix_domain_id = 1
659
660             cls.create_pg_interfaces(range(10))
661             cls.interfaces = list(cls.pg_interfaces[0:4])
662
663             for i in cls.interfaces:
664                 i.admin_up()
665                 i.config_ip4()
666                 i.resolve_arp()
667
668             cls.pg0.generate_remote_hosts(3)
669             cls.pg0.configure_ipv4_neighbors()
670
671             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
672             cls.vapi.ip_table_add_del(10, is_add=1)
673             cls.vapi.ip_table_add_del(20, is_add=1)
674
675             cls.pg4._local_ip4 = "172.16.255.1"
676             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
677             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
678             cls.pg4.set_table_ip4(10)
679             cls.pg5._local_ip4 = "172.17.255.3"
680             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
681             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
682             cls.pg5.set_table_ip4(10)
683             cls.pg6._local_ip4 = "172.16.255.1"
684             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
685             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
686             cls.pg6.set_table_ip4(20)
687             for i in cls.overlapping_interfaces:
688                 i.config_ip4()
689                 i.admin_up()
690                 i.resolve_arp()
691
692             cls.pg7.admin_up()
693             cls.pg8.admin_up()
694
695             cls.pg9.generate_remote_hosts(2)
696             cls.pg9.config_ip4()
697             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
698             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
699                                                   ip_addr_n,
700                                                   24)
701             cls.pg9.admin_up()
702             cls.pg9.resolve_arp()
703             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
704             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
705             cls.pg9.resolve_arp()
706
707             random.seed()
708
709         except Exception:
710             super(TestNAT44, cls).tearDownClass()
711             raise
712
713     def clear_nat44(self):
714         """
715         Clear NAT44 configuration.
716         """
717         # I found no elegant way to do this
718         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
719                                    dst_address_length=32,
720                                    next_hop_address=self.pg7.remote_ip4n,
721                                    next_hop_sw_if_index=self.pg7.sw_if_index,
722                                    is_add=0)
723         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
724                                    dst_address_length=32,
725                                    next_hop_address=self.pg8.remote_ip4n,
726                                    next_hop_sw_if_index=self.pg8.sw_if_index,
727                                    is_add=0)
728
729         for intf in [self.pg7, self.pg8]:
730             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
731             for n in neighbors:
732                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
733                                               n.mac_address,
734                                               n.ip_address,
735                                               is_add=0)
736
737         if self.pg7.has_ip4_config:
738             self.pg7.unconfig_ip4()
739
740         interfaces = self.vapi.nat44_interface_addr_dump()
741         for intf in interfaces:
742             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
743
744         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
745                             domain_id=self.ipfix_domain_id)
746         self.ipfix_src_port = 4739
747         self.ipfix_domain_id = 1
748
749         interfaces = self.vapi.nat44_interface_dump()
750         for intf in interfaces:
751             if intf.is_inside > 1:
752                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
753                                                           0,
754                                                           is_add=0)
755             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
756                                                       intf.is_inside,
757                                                       is_add=0)
758
759         interfaces = self.vapi.nat44_interface_output_feature_dump()
760         for intf in interfaces:
761             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
762                                                              intf.is_inside,
763                                                              is_add=0)
764
765         static_mappings = self.vapi.nat44_static_mapping_dump()
766         for sm in static_mappings:
767             self.vapi.nat44_add_del_static_mapping(
768                 sm.local_ip_address,
769                 sm.external_ip_address,
770                 local_port=sm.local_port,
771                 external_port=sm.external_port,
772                 addr_only=sm.addr_only,
773                 vrf_id=sm.vrf_id,
774                 protocol=sm.protocol,
775                 is_add=0)
776
777         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
778         for lb_sm in lb_static_mappings:
779             self.vapi.nat44_add_del_lb_static_mapping(
780                 lb_sm.external_addr,
781                 lb_sm.external_port,
782                 lb_sm.protocol,
783                 lb_sm.vrf_id,
784                 is_add=0,
785                 local_num=0,
786                 locals=[])
787
788         adresses = self.vapi.nat44_address_dump()
789         for addr in adresses:
790             self.vapi.nat44_add_del_address_range(addr.ip_address,
791                                                   addr.ip_address,
792                                                   is_add=0)
793
794         self.vapi.nat_set_reass()
795         self.vapi.nat_set_reass(is_ip6=1)
796
797     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
798                                  local_port=0, external_port=0, vrf_id=0,
799                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
800                                  proto=0):
801         """
802         Add/delete NAT44 static mapping
803
804         :param local_ip: Local IP address
805         :param external_ip: External IP address
806         :param local_port: Local port number (Optional)
807         :param external_port: External port number (Optional)
808         :param vrf_id: VRF ID (Default 0)
809         :param is_add: 1 if add, 0 if delete (Default add)
810         :param external_sw_if_index: External interface instead of IP address
811         :param proto: IP protocol (Mandatory if port specified)
812         """
813         addr_only = 1
814         if local_port and external_port:
815             addr_only = 0
816         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
817         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
818         self.vapi.nat44_add_del_static_mapping(
819             l_ip,
820             e_ip,
821             external_sw_if_index,
822             local_port,
823             external_port,
824             addr_only,
825             vrf_id,
826             proto,
827             is_add)
828
829     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
830         """
831         Add/delete NAT44 address
832
833         :param ip: IP address
834         :param is_add: 1 if add, 0 if delete (Default add)
835         """
836         nat_addr = socket.inet_pton(socket.AF_INET, ip)
837         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
838                                               vrf_id=vrf_id)
839
840     def test_dynamic(self):
841         """ NAT44 dynamic translation test """
842
843         self.nat44_add_address(self.nat_addr)
844         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
845         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
846                                                   is_inside=0)
847
848         # in2out
849         pkts = self.create_stream_in(self.pg0, self.pg1)
850         self.pg0.add_stream(pkts)
851         self.pg_enable_capture(self.pg_interfaces)
852         self.pg_start()
853         capture = self.pg1.get_capture(len(pkts))
854         self.verify_capture_out(capture)
855
856         # out2in
857         pkts = self.create_stream_out(self.pg1)
858         self.pg1.add_stream(pkts)
859         self.pg_enable_capture(self.pg_interfaces)
860         self.pg_start()
861         capture = self.pg0.get_capture(len(pkts))
862         self.verify_capture_in(capture, self.pg0)
863
864     def test_dynamic_icmp_errors_in2out_ttl_1(self):
865         """ NAT44 handling of client packets with TTL=1 """
866
867         self.nat44_add_address(self.nat_addr)
868         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
869         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
870                                                   is_inside=0)
871
872         # Client side - generate traffic
873         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
874         self.pg0.add_stream(pkts)
875         self.pg_enable_capture(self.pg_interfaces)
876         self.pg_start()
877
878         # Client side - verify ICMP type 11 packets
879         capture = self.pg0.get_capture(len(pkts))
880         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
881
882     def test_dynamic_icmp_errors_out2in_ttl_1(self):
883         """ NAT44 handling of server packets with TTL=1 """
884
885         self.nat44_add_address(self.nat_addr)
886         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
887         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
888                                                   is_inside=0)
889
890         # Client side - create sessions
891         pkts = self.create_stream_in(self.pg0, self.pg1)
892         self.pg0.add_stream(pkts)
893         self.pg_enable_capture(self.pg_interfaces)
894         self.pg_start()
895
896         # Server side - generate traffic
897         capture = self.pg1.get_capture(len(pkts))
898         self.verify_capture_out(capture)
899         pkts = self.create_stream_out(self.pg1, ttl=1)
900         self.pg1.add_stream(pkts)
901         self.pg_enable_capture(self.pg_interfaces)
902         self.pg_start()
903
904         # Server side - verify ICMP type 11 packets
905         capture = self.pg1.get_capture(len(pkts))
906         self.verify_capture_out_with_icmp_errors(capture,
907                                                  src_ip=self.pg1.local_ip4)
908
909     def test_dynamic_icmp_errors_in2out_ttl_2(self):
910         """ NAT44 handling of error responses to client packets with TTL=2 """
911
912         self.nat44_add_address(self.nat_addr)
913         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
914         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
915                                                   is_inside=0)
916
917         # Client side - generate traffic
918         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
919         self.pg0.add_stream(pkts)
920         self.pg_enable_capture(self.pg_interfaces)
921         self.pg_start()
922
923         # Server side - simulate ICMP type 11 response
924         capture = self.pg1.get_capture(len(pkts))
925         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
926                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
927                 ICMP(type=11) / packet[IP] for packet in capture]
928         self.pg1.add_stream(pkts)
929         self.pg_enable_capture(self.pg_interfaces)
930         self.pg_start()
931
932         # Client side - verify ICMP type 11 packets
933         capture = self.pg0.get_capture(len(pkts))
934         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
935
936     def test_dynamic_icmp_errors_out2in_ttl_2(self):
937         """ NAT44 handling of error responses to server packets with TTL=2 """
938
939         self.nat44_add_address(self.nat_addr)
940         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
941         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
942                                                   is_inside=0)
943
944         # Client side - create sessions
945         pkts = self.create_stream_in(self.pg0, self.pg1)
946         self.pg0.add_stream(pkts)
947         self.pg_enable_capture(self.pg_interfaces)
948         self.pg_start()
949
950         # Server side - generate traffic
951         capture = self.pg1.get_capture(len(pkts))
952         self.verify_capture_out(capture)
953         pkts = self.create_stream_out(self.pg1, ttl=2)
954         self.pg1.add_stream(pkts)
955         self.pg_enable_capture(self.pg_interfaces)
956         self.pg_start()
957
958         # Client side - simulate ICMP type 11 response
959         capture = self.pg0.get_capture(len(pkts))
960         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
961                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
962                 ICMP(type=11) / packet[IP] for packet in capture]
963         self.pg0.add_stream(pkts)
964         self.pg_enable_capture(self.pg_interfaces)
965         self.pg_start()
966
967         # Server side - verify ICMP type 11 packets
968         capture = self.pg1.get_capture(len(pkts))
969         self.verify_capture_out_with_icmp_errors(capture)
970
971     def test_ping_out_interface_from_outside(self):
972         """ Ping NAT44 out interface from outside network """
973
974         self.nat44_add_address(self.nat_addr)
975         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
976         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
977                                                   is_inside=0)
978
979         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
980              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
981              ICMP(id=self.icmp_id_out, type='echo-request'))
982         pkts = [p]
983         self.pg1.add_stream(pkts)
984         self.pg_enable_capture(self.pg_interfaces)
985         self.pg_start()
986         capture = self.pg1.get_capture(len(pkts))
987         self.assertEqual(1, len(capture))
988         packet = capture[0]
989         try:
990             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
991             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
992             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
993             self.assertEqual(packet[ICMP].type, 0)  # echo reply
994         except:
995             self.logger.error(ppp("Unexpected or invalid packet "
996                                   "(outside network):", packet))
997             raise
998
999     def test_ping_internal_host_from_outside(self):
1000         """ Ping internal host from outside network """
1001
1002         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1003         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1004         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1005                                                   is_inside=0)
1006
1007         # out2in
1008         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1009                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1010                ICMP(id=self.icmp_id_out, type='echo-request'))
1011         self.pg1.add_stream(pkt)
1012         self.pg_enable_capture(self.pg_interfaces)
1013         self.pg_start()
1014         capture = self.pg0.get_capture(1)
1015         self.verify_capture_in(capture, self.pg0, packet_num=1)
1016         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1017
1018         # in2out
1019         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1020                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1021                ICMP(id=self.icmp_id_in, type='echo-reply'))
1022         self.pg0.add_stream(pkt)
1023         self.pg_enable_capture(self.pg_interfaces)
1024         self.pg_start()
1025         capture = self.pg1.get_capture(1)
1026         self.verify_capture_out(capture, same_port=True, packet_num=1)
1027         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1028
1029     def test_static_in(self):
1030         """ 1:1 NAT initialized from inside network """
1031
1032         nat_ip = "10.0.0.10"
1033         self.tcp_port_out = 6303
1034         self.udp_port_out = 6304
1035         self.icmp_id_out = 6305
1036
1037         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1038         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1039         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1040                                                   is_inside=0)
1041
1042         # in2out
1043         pkts = self.create_stream_in(self.pg0, self.pg1)
1044         self.pg0.add_stream(pkts)
1045         self.pg_enable_capture(self.pg_interfaces)
1046         self.pg_start()
1047         capture = self.pg1.get_capture(len(pkts))
1048         self.verify_capture_out(capture, nat_ip, True)
1049
1050         # out2in
1051         pkts = self.create_stream_out(self.pg1, nat_ip)
1052         self.pg1.add_stream(pkts)
1053         self.pg_enable_capture(self.pg_interfaces)
1054         self.pg_start()
1055         capture = self.pg0.get_capture(len(pkts))
1056         self.verify_capture_in(capture, self.pg0)
1057
1058     def test_static_out(self):
1059         """ 1:1 NAT initialized from outside network """
1060
1061         nat_ip = "10.0.0.20"
1062         self.tcp_port_out = 6303
1063         self.udp_port_out = 6304
1064         self.icmp_id_out = 6305
1065
1066         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1067         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1068         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1069                                                   is_inside=0)
1070
1071         # out2in
1072         pkts = self.create_stream_out(self.pg1, nat_ip)
1073         self.pg1.add_stream(pkts)
1074         self.pg_enable_capture(self.pg_interfaces)
1075         self.pg_start()
1076         capture = self.pg0.get_capture(len(pkts))
1077         self.verify_capture_in(capture, self.pg0)
1078
1079         # in2out
1080         pkts = self.create_stream_in(self.pg0, self.pg1)
1081         self.pg0.add_stream(pkts)
1082         self.pg_enable_capture(self.pg_interfaces)
1083         self.pg_start()
1084         capture = self.pg1.get_capture(len(pkts))
1085         self.verify_capture_out(capture, nat_ip, True)
1086
1087     def test_static_with_port_in(self):
1088         """ 1:1 NAPT initialized from inside network """
1089
1090         self.tcp_port_out = 3606
1091         self.udp_port_out = 3607
1092         self.icmp_id_out = 3608
1093
1094         self.nat44_add_address(self.nat_addr)
1095         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1096                                       self.tcp_port_in, self.tcp_port_out,
1097                                       proto=IP_PROTOS.tcp)
1098         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1099                                       self.udp_port_in, self.udp_port_out,
1100                                       proto=IP_PROTOS.udp)
1101         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1102                                       self.icmp_id_in, self.icmp_id_out,
1103                                       proto=IP_PROTOS.icmp)
1104         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1105         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1106                                                   is_inside=0)
1107
1108         # in2out
1109         pkts = self.create_stream_in(self.pg0, self.pg1)
1110         self.pg0.add_stream(pkts)
1111         self.pg_enable_capture(self.pg_interfaces)
1112         self.pg_start()
1113         capture = self.pg1.get_capture(len(pkts))
1114         self.verify_capture_out(capture)
1115
1116         # out2in
1117         pkts = self.create_stream_out(self.pg1)
1118         self.pg1.add_stream(pkts)
1119         self.pg_enable_capture(self.pg_interfaces)
1120         self.pg_start()
1121         capture = self.pg0.get_capture(len(pkts))
1122         self.verify_capture_in(capture, self.pg0)
1123
1124     def test_static_with_port_out(self):
1125         """ 1:1 NAPT initialized from outside network """
1126
1127         self.tcp_port_out = 30606
1128         self.udp_port_out = 30607
1129         self.icmp_id_out = 30608
1130
1131         self.nat44_add_address(self.nat_addr)
1132         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1133                                       self.tcp_port_in, self.tcp_port_out,
1134                                       proto=IP_PROTOS.tcp)
1135         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1136                                       self.udp_port_in, self.udp_port_out,
1137                                       proto=IP_PROTOS.udp)
1138         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1139                                       self.icmp_id_in, self.icmp_id_out,
1140                                       proto=IP_PROTOS.icmp)
1141         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1142         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1143                                                   is_inside=0)
1144
1145         # out2in
1146         pkts = self.create_stream_out(self.pg1)
1147         self.pg1.add_stream(pkts)
1148         self.pg_enable_capture(self.pg_interfaces)
1149         self.pg_start()
1150         capture = self.pg0.get_capture(len(pkts))
1151         self.verify_capture_in(capture, self.pg0)
1152
1153         # in2out
1154         pkts = self.create_stream_in(self.pg0, self.pg1)
1155         self.pg0.add_stream(pkts)
1156         self.pg_enable_capture(self.pg_interfaces)
1157         self.pg_start()
1158         capture = self.pg1.get_capture(len(pkts))
1159         self.verify_capture_out(capture)
1160
1161     def test_static_vrf_aware(self):
1162         """ 1:1 NAT VRF awareness """
1163
1164         nat_ip1 = "10.0.0.30"
1165         nat_ip2 = "10.0.0.40"
1166         self.tcp_port_out = 6303
1167         self.udp_port_out = 6304
1168         self.icmp_id_out = 6305
1169
1170         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1171                                       vrf_id=10)
1172         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1173                                       vrf_id=10)
1174         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1175                                                   is_inside=0)
1176         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1177         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1178
1179         # inside interface VRF match NAT44 static mapping VRF
1180         pkts = self.create_stream_in(self.pg4, self.pg3)
1181         self.pg4.add_stream(pkts)
1182         self.pg_enable_capture(self.pg_interfaces)
1183         self.pg_start()
1184         capture = self.pg3.get_capture(len(pkts))
1185         self.verify_capture_out(capture, nat_ip1, True)
1186
1187         # inside interface VRF don't match NAT44 static mapping VRF (packets
1188         # are dropped)
1189         pkts = self.create_stream_in(self.pg0, self.pg3)
1190         self.pg0.add_stream(pkts)
1191         self.pg_enable_capture(self.pg_interfaces)
1192         self.pg_start()
1193         self.pg3.assert_nothing_captured()
1194
1195     def test_static_lb(self):
1196         """ NAT44 local service load balancing """
1197         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1198         external_port = 80
1199         local_port = 8080
1200         server1 = self.pg0.remote_hosts[0]
1201         server2 = self.pg0.remote_hosts[1]
1202
1203         locals = [{'addr': server1.ip4n,
1204                    'port': local_port,
1205                    'probability': 70},
1206                   {'addr': server2.ip4n,
1207                    'port': local_port,
1208                    'probability': 30}]
1209
1210         self.nat44_add_address(self.nat_addr)
1211         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1212                                                   external_port,
1213                                                   IP_PROTOS.tcp,
1214                                                   local_num=len(locals),
1215                                                   locals=locals)
1216         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1217         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1218                                                   is_inside=0)
1219
1220         # from client to service
1221         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1222              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1223              TCP(sport=12345, dport=external_port))
1224         self.pg1.add_stream(p)
1225         self.pg_enable_capture(self.pg_interfaces)
1226         self.pg_start()
1227         capture = self.pg0.get_capture(1)
1228         p = capture[0]
1229         server = None
1230         try:
1231             ip = p[IP]
1232             tcp = p[TCP]
1233             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1234             if ip.dst == server1.ip4:
1235                 server = server1
1236             else:
1237                 server = server2
1238             self.assertEqual(tcp.dport, local_port)
1239             self.check_tcp_checksum(p)
1240             self.check_ip_checksum(p)
1241         except:
1242             self.logger.error(ppp("Unexpected or invalid packet:", p))
1243             raise
1244
1245         # from service back to client
1246         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1247              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1248              TCP(sport=local_port, dport=12345))
1249         self.pg0.add_stream(p)
1250         self.pg_enable_capture(self.pg_interfaces)
1251         self.pg_start()
1252         capture = self.pg1.get_capture(1)
1253         p = capture[0]
1254         try:
1255             ip = p[IP]
1256             tcp = p[TCP]
1257             self.assertEqual(ip.src, self.nat_addr)
1258             self.assertEqual(tcp.sport, external_port)
1259             self.check_tcp_checksum(p)
1260             self.check_ip_checksum(p)
1261         except:
1262             self.logger.error(ppp("Unexpected or invalid packet:", p))
1263             raise
1264
1265         # multiple clients
1266         server1_n = 0
1267         server2_n = 0
1268         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1269         pkts = []
1270         for client in clients:
1271             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1272                  IP(src=client, dst=self.nat_addr) /
1273                  TCP(sport=12345, dport=external_port))
1274             pkts.append(p)
1275         self.pg1.add_stream(pkts)
1276         self.pg_enable_capture(self.pg_interfaces)
1277         self.pg_start()
1278         capture = self.pg0.get_capture(len(pkts))
1279         for p in capture:
1280             if p[IP].dst == server1.ip4:
1281                 server1_n += 1
1282             else:
1283                 server2_n += 1
1284         self.assertTrue(server1_n > server2_n)
1285
1286     def test_multiple_inside_interfaces(self):
1287         """ NAT44 multiple non-overlapping address space inside interfaces """
1288
1289         self.nat44_add_address(self.nat_addr)
1290         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1291         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1292         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1293                                                   is_inside=0)
1294
1295         # between two NAT44 inside interfaces (no translation)
1296         pkts = self.create_stream_in(self.pg0, self.pg1)
1297         self.pg0.add_stream(pkts)
1298         self.pg_enable_capture(self.pg_interfaces)
1299         self.pg_start()
1300         capture = self.pg1.get_capture(len(pkts))
1301         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1302
1303         # from NAT44 inside to interface without NAT44 feature (no translation)
1304         pkts = self.create_stream_in(self.pg0, self.pg2)
1305         self.pg0.add_stream(pkts)
1306         self.pg_enable_capture(self.pg_interfaces)
1307         self.pg_start()
1308         capture = self.pg2.get_capture(len(pkts))
1309         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1310
1311         # in2out 1st interface
1312         pkts = self.create_stream_in(self.pg0, self.pg3)
1313         self.pg0.add_stream(pkts)
1314         self.pg_enable_capture(self.pg_interfaces)
1315         self.pg_start()
1316         capture = self.pg3.get_capture(len(pkts))
1317         self.verify_capture_out(capture)
1318
1319         # out2in 1st interface
1320         pkts = self.create_stream_out(self.pg3)
1321         self.pg3.add_stream(pkts)
1322         self.pg_enable_capture(self.pg_interfaces)
1323         self.pg_start()
1324         capture = self.pg0.get_capture(len(pkts))
1325         self.verify_capture_in(capture, self.pg0)
1326
1327         # in2out 2nd interface
1328         pkts = self.create_stream_in(self.pg1, self.pg3)
1329         self.pg1.add_stream(pkts)
1330         self.pg_enable_capture(self.pg_interfaces)
1331         self.pg_start()
1332         capture = self.pg3.get_capture(len(pkts))
1333         self.verify_capture_out(capture)
1334
1335         # out2in 2nd interface
1336         pkts = self.create_stream_out(self.pg3)
1337         self.pg3.add_stream(pkts)
1338         self.pg_enable_capture(self.pg_interfaces)
1339         self.pg_start()
1340         capture = self.pg1.get_capture(len(pkts))
1341         self.verify_capture_in(capture, self.pg1)
1342
1343     def test_inside_overlapping_interfaces(self):
1344         """ NAT44 multiple inside interfaces with overlapping address space """
1345
1346         static_nat_ip = "10.0.0.10"
1347         self.nat44_add_address(self.nat_addr)
1348         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1349                                                   is_inside=0)
1350         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1351         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1352         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1353         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1354                                       vrf_id=20)
1355
1356         # between NAT44 inside interfaces with same VRF (no translation)
1357         pkts = self.create_stream_in(self.pg4, self.pg5)
1358         self.pg4.add_stream(pkts)
1359         self.pg_enable_capture(self.pg_interfaces)
1360         self.pg_start()
1361         capture = self.pg5.get_capture(len(pkts))
1362         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1363
1364         # between NAT44 inside interfaces with different VRF (hairpinning)
1365         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1366              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1367              TCP(sport=1234, dport=5678))
1368         self.pg4.add_stream(p)
1369         self.pg_enable_capture(self.pg_interfaces)
1370         self.pg_start()
1371         capture = self.pg6.get_capture(1)
1372         p = capture[0]
1373         try:
1374             ip = p[IP]
1375             tcp = p[TCP]
1376             self.assertEqual(ip.src, self.nat_addr)
1377             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1378             self.assertNotEqual(tcp.sport, 1234)
1379             self.assertEqual(tcp.dport, 5678)
1380         except:
1381             self.logger.error(ppp("Unexpected or invalid packet:", p))
1382             raise
1383
1384         # in2out 1st interface
1385         pkts = self.create_stream_in(self.pg4, self.pg3)
1386         self.pg4.add_stream(pkts)
1387         self.pg_enable_capture(self.pg_interfaces)
1388         self.pg_start()
1389         capture = self.pg3.get_capture(len(pkts))
1390         self.verify_capture_out(capture)
1391
1392         # out2in 1st interface
1393         pkts = self.create_stream_out(self.pg3)
1394         self.pg3.add_stream(pkts)
1395         self.pg_enable_capture(self.pg_interfaces)
1396         self.pg_start()
1397         capture = self.pg4.get_capture(len(pkts))
1398         self.verify_capture_in(capture, self.pg4)
1399
1400         # in2out 2nd interface
1401         pkts = self.create_stream_in(self.pg5, self.pg3)
1402         self.pg5.add_stream(pkts)
1403         self.pg_enable_capture(self.pg_interfaces)
1404         self.pg_start()
1405         capture = self.pg3.get_capture(len(pkts))
1406         self.verify_capture_out(capture)
1407
1408         # out2in 2nd interface
1409         pkts = self.create_stream_out(self.pg3)
1410         self.pg3.add_stream(pkts)
1411         self.pg_enable_capture(self.pg_interfaces)
1412         self.pg_start()
1413         capture = self.pg5.get_capture(len(pkts))
1414         self.verify_capture_in(capture, self.pg5)
1415
1416         # pg5 session dump
1417         addresses = self.vapi.nat44_address_dump()
1418         self.assertEqual(len(addresses), 1)
1419         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1420         self.assertEqual(len(sessions), 3)
1421         for session in sessions:
1422             self.assertFalse(session.is_static)
1423             self.assertEqual(session.inside_ip_address[0:4],
1424                              self.pg5.remote_ip4n)
1425             self.assertEqual(session.outside_ip_address,
1426                              addresses[0].ip_address)
1427         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1428         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1429         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1430         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1431         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1432         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1433         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1434         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1435         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1436
1437         # in2out 3rd interface
1438         pkts = self.create_stream_in(self.pg6, self.pg3)
1439         self.pg6.add_stream(pkts)
1440         self.pg_enable_capture(self.pg_interfaces)
1441         self.pg_start()
1442         capture = self.pg3.get_capture(len(pkts))
1443         self.verify_capture_out(capture, static_nat_ip, True)
1444
1445         # out2in 3rd interface
1446         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1447         self.pg3.add_stream(pkts)
1448         self.pg_enable_capture(self.pg_interfaces)
1449         self.pg_start()
1450         capture = self.pg6.get_capture(len(pkts))
1451         self.verify_capture_in(capture, self.pg6)
1452
1453         # general user and session dump verifications
1454         users = self.vapi.nat44_user_dump()
1455         self.assertTrue(len(users) >= 3)
1456         addresses = self.vapi.nat44_address_dump()
1457         self.assertEqual(len(addresses), 1)
1458         for user in users:
1459             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1460                                                          user.vrf_id)
1461             for session in sessions:
1462                 self.assertEqual(user.ip_address, session.inside_ip_address)
1463                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1464                 self.assertTrue(session.protocol in
1465                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1466                                  IP_PROTOS.icmp])
1467
1468         # pg4 session dump
1469         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1470         self.assertTrue(len(sessions) >= 4)
1471         for session in sessions:
1472             self.assertFalse(session.is_static)
1473             self.assertEqual(session.inside_ip_address[0:4],
1474                              self.pg4.remote_ip4n)
1475             self.assertEqual(session.outside_ip_address,
1476                              addresses[0].ip_address)
1477
1478         # pg6 session dump
1479         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1480         self.assertTrue(len(sessions) >= 3)
1481         for session in sessions:
1482             self.assertTrue(session.is_static)
1483             self.assertEqual(session.inside_ip_address[0:4],
1484                              self.pg6.remote_ip4n)
1485             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1486                              map(int, static_nat_ip.split('.')))
1487             self.assertTrue(session.inside_port in
1488                             [self.tcp_port_in, self.udp_port_in,
1489                              self.icmp_id_in])
1490
1491     def test_hairpinning(self):
1492         """ NAT44 hairpinning - 1:1 NAPT """
1493
1494         host = self.pg0.remote_hosts[0]
1495         server = self.pg0.remote_hosts[1]
1496         host_in_port = 1234
1497         host_out_port = 0
1498         server_in_port = 5678
1499         server_out_port = 8765
1500
1501         self.nat44_add_address(self.nat_addr)
1502         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1503         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1504                                                   is_inside=0)
1505         # add static mapping for server
1506         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1507                                       server_in_port, server_out_port,
1508                                       proto=IP_PROTOS.tcp)
1509
1510         # send packet from host to server
1511         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1512              IP(src=host.ip4, dst=self.nat_addr) /
1513              TCP(sport=host_in_port, dport=server_out_port))
1514         self.pg0.add_stream(p)
1515         self.pg_enable_capture(self.pg_interfaces)
1516         self.pg_start()
1517         capture = self.pg0.get_capture(1)
1518         p = capture[0]
1519         try:
1520             ip = p[IP]
1521             tcp = p[TCP]
1522             self.assertEqual(ip.src, self.nat_addr)
1523             self.assertEqual(ip.dst, server.ip4)
1524             self.assertNotEqual(tcp.sport, host_in_port)
1525             self.assertEqual(tcp.dport, server_in_port)
1526             self.check_tcp_checksum(p)
1527             host_out_port = tcp.sport
1528         except:
1529             self.logger.error(ppp("Unexpected or invalid packet:", p))
1530             raise
1531
1532         # send reply from server to host
1533         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1534              IP(src=server.ip4, dst=self.nat_addr) /
1535              TCP(sport=server_in_port, dport=host_out_port))
1536         self.pg0.add_stream(p)
1537         self.pg_enable_capture(self.pg_interfaces)
1538         self.pg_start()
1539         capture = self.pg0.get_capture(1)
1540         p = capture[0]
1541         try:
1542             ip = p[IP]
1543             tcp = p[TCP]
1544             self.assertEqual(ip.src, self.nat_addr)
1545             self.assertEqual(ip.dst, host.ip4)
1546             self.assertEqual(tcp.sport, server_out_port)
1547             self.assertEqual(tcp.dport, host_in_port)
1548             self.check_tcp_checksum(p)
1549         except:
1550             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1551             raise
1552
1553     def test_hairpinning2(self):
1554         """ NAT44 hairpinning - 1:1 NAT"""
1555
1556         server1_nat_ip = "10.0.0.10"
1557         server2_nat_ip = "10.0.0.11"
1558         host = self.pg0.remote_hosts[0]
1559         server1 = self.pg0.remote_hosts[1]
1560         server2 = self.pg0.remote_hosts[2]
1561         server_tcp_port = 22
1562         server_udp_port = 20
1563
1564         self.nat44_add_address(self.nat_addr)
1565         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1566         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1567                                                   is_inside=0)
1568
1569         # add static mapping for servers
1570         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1571         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1572
1573         # host to server1
1574         pkts = []
1575         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1576              IP(src=host.ip4, dst=server1_nat_ip) /
1577              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1578         pkts.append(p)
1579         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1580              IP(src=host.ip4, dst=server1_nat_ip) /
1581              UDP(sport=self.udp_port_in, dport=server_udp_port))
1582         pkts.append(p)
1583         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1584              IP(src=host.ip4, dst=server1_nat_ip) /
1585              ICMP(id=self.icmp_id_in, type='echo-request'))
1586         pkts.append(p)
1587         self.pg0.add_stream(pkts)
1588         self.pg_enable_capture(self.pg_interfaces)
1589         self.pg_start()
1590         capture = self.pg0.get_capture(len(pkts))
1591         for packet in capture:
1592             try:
1593                 self.assertEqual(packet[IP].src, self.nat_addr)
1594                 self.assertEqual(packet[IP].dst, server1.ip4)
1595                 if packet.haslayer(TCP):
1596                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1597                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1598                     self.tcp_port_out = packet[TCP].sport
1599                     self.check_tcp_checksum(packet)
1600                 elif packet.haslayer(UDP):
1601                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1602                     self.assertEqual(packet[UDP].dport, server_udp_port)
1603                     self.udp_port_out = packet[UDP].sport
1604                 else:
1605                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1606                     self.icmp_id_out = packet[ICMP].id
1607             except:
1608                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1609                 raise
1610
1611         # server1 to host
1612         pkts = []
1613         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1614              IP(src=server1.ip4, dst=self.nat_addr) /
1615              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1616         pkts.append(p)
1617         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1618              IP(src=server1.ip4, dst=self.nat_addr) /
1619              UDP(sport=server_udp_port, dport=self.udp_port_out))
1620         pkts.append(p)
1621         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1622              IP(src=server1.ip4, dst=self.nat_addr) /
1623              ICMP(id=self.icmp_id_out, type='echo-reply'))
1624         pkts.append(p)
1625         self.pg0.add_stream(pkts)
1626         self.pg_enable_capture(self.pg_interfaces)
1627         self.pg_start()
1628         capture = self.pg0.get_capture(len(pkts))
1629         for packet in capture:
1630             try:
1631                 self.assertEqual(packet[IP].src, server1_nat_ip)
1632                 self.assertEqual(packet[IP].dst, host.ip4)
1633                 if packet.haslayer(TCP):
1634                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1635                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1636                     self.check_tcp_checksum(packet)
1637                 elif packet.haslayer(UDP):
1638                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1639                     self.assertEqual(packet[UDP].sport, server_udp_port)
1640                 else:
1641                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1642             except:
1643                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1644                 raise
1645
1646         # server2 to server1
1647         pkts = []
1648         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1649              IP(src=server2.ip4, dst=server1_nat_ip) /
1650              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1651         pkts.append(p)
1652         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1653              IP(src=server2.ip4, dst=server1_nat_ip) /
1654              UDP(sport=self.udp_port_in, dport=server_udp_port))
1655         pkts.append(p)
1656         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1657              IP(src=server2.ip4, dst=server1_nat_ip) /
1658              ICMP(id=self.icmp_id_in, type='echo-request'))
1659         pkts.append(p)
1660         self.pg0.add_stream(pkts)
1661         self.pg_enable_capture(self.pg_interfaces)
1662         self.pg_start()
1663         capture = self.pg0.get_capture(len(pkts))
1664         for packet in capture:
1665             try:
1666                 self.assertEqual(packet[IP].src, server2_nat_ip)
1667                 self.assertEqual(packet[IP].dst, server1.ip4)
1668                 if packet.haslayer(TCP):
1669                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1670                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1671                     self.tcp_port_out = packet[TCP].sport
1672                     self.check_tcp_checksum(packet)
1673                 elif packet.haslayer(UDP):
1674                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1675                     self.assertEqual(packet[UDP].dport, server_udp_port)
1676                     self.udp_port_out = packet[UDP].sport
1677                 else:
1678                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1679                     self.icmp_id_out = packet[ICMP].id
1680             except:
1681                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1682                 raise
1683
1684         # server1 to server2
1685         pkts = []
1686         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1687              IP(src=server1.ip4, dst=server2_nat_ip) /
1688              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1689         pkts.append(p)
1690         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1691              IP(src=server1.ip4, dst=server2_nat_ip) /
1692              UDP(sport=server_udp_port, dport=self.udp_port_out))
1693         pkts.append(p)
1694         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1695              IP(src=server1.ip4, dst=server2_nat_ip) /
1696              ICMP(id=self.icmp_id_out, type='echo-reply'))
1697         pkts.append(p)
1698         self.pg0.add_stream(pkts)
1699         self.pg_enable_capture(self.pg_interfaces)
1700         self.pg_start()
1701         capture = self.pg0.get_capture(len(pkts))
1702         for packet in capture:
1703             try:
1704                 self.assertEqual(packet[IP].src, server1_nat_ip)
1705                 self.assertEqual(packet[IP].dst, server2.ip4)
1706                 if packet.haslayer(TCP):
1707                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1708                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1709                     self.check_tcp_checksum(packet)
1710                 elif packet.haslayer(UDP):
1711                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1712                     self.assertEqual(packet[UDP].sport, server_udp_port)
1713                 else:
1714                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1715             except:
1716                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1717                 raise
1718
1719     def test_max_translations_per_user(self):
1720         """ MAX translations per user - recycle the least recently used """
1721
1722         self.nat44_add_address(self.nat_addr)
1723         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1724         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1725                                                   is_inside=0)
1726
1727         # get maximum number of translations per user
1728         nat44_config = self.vapi.nat_show_config()
1729
1730         # send more than maximum number of translations per user packets
1731         pkts_num = nat44_config.max_translations_per_user + 5
1732         pkts = []
1733         for port in range(0, pkts_num):
1734             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1735                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1736                  TCP(sport=1025 + port))
1737             pkts.append(p)
1738         self.pg0.add_stream(pkts)
1739         self.pg_enable_capture(self.pg_interfaces)
1740         self.pg_start()
1741
1742         # verify number of translated packet
1743         self.pg1.get_capture(pkts_num)
1744
1745     def test_interface_addr(self):
1746         """ Acquire NAT44 addresses from interface """
1747         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1748
1749         # no address in NAT pool
1750         adresses = self.vapi.nat44_address_dump()
1751         self.assertEqual(0, len(adresses))
1752
1753         # configure interface address and check NAT address pool
1754         self.pg7.config_ip4()
1755         adresses = self.vapi.nat44_address_dump()
1756         self.assertEqual(1, len(adresses))
1757         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1758
1759         # remove interface address and check NAT address pool
1760         self.pg7.unconfig_ip4()
1761         adresses = self.vapi.nat44_address_dump()
1762         self.assertEqual(0, len(adresses))
1763
1764     def test_interface_addr_static_mapping(self):
1765         """ Static mapping with addresses from interface """
1766         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1767         self.nat44_add_static_mapping(
1768             '1.2.3.4',
1769             external_sw_if_index=self.pg7.sw_if_index)
1770
1771         # static mappings with external interface
1772         static_mappings = self.vapi.nat44_static_mapping_dump()
1773         self.assertEqual(1, len(static_mappings))
1774         self.assertEqual(self.pg7.sw_if_index,
1775                          static_mappings[0].external_sw_if_index)
1776
1777         # configure interface address and check static mappings
1778         self.pg7.config_ip4()
1779         static_mappings = self.vapi.nat44_static_mapping_dump()
1780         self.assertEqual(1, len(static_mappings))
1781         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1782                          self.pg7.local_ip4n)
1783         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1784
1785         # remove interface address and check static mappings
1786         self.pg7.unconfig_ip4()
1787         static_mappings = self.vapi.nat44_static_mapping_dump()
1788         self.assertEqual(0, len(static_mappings))
1789
1790     def test_ipfix_nat44_sess(self):
1791         """ IPFIX logging NAT44 session created/delted """
1792         self.ipfix_domain_id = 10
1793         self.ipfix_src_port = 20202
1794         colector_port = 30303
1795         bind_layers(UDP, IPFIX, dport=30303)
1796         self.nat44_add_address(self.nat_addr)
1797         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1798         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1799                                                   is_inside=0)
1800         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1801                                      src_address=self.pg3.local_ip4n,
1802                                      path_mtu=512,
1803                                      template_interval=10,
1804                                      collector_port=colector_port)
1805         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1806                             src_port=self.ipfix_src_port)
1807
1808         pkts = self.create_stream_in(self.pg0, self.pg1)
1809         self.pg0.add_stream(pkts)
1810         self.pg_enable_capture(self.pg_interfaces)
1811         self.pg_start()
1812         capture = self.pg1.get_capture(len(pkts))
1813         self.verify_capture_out(capture)
1814         self.nat44_add_address(self.nat_addr, is_add=0)
1815         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1816         capture = self.pg3.get_capture(3)
1817         ipfix = IPFIXDecoder()
1818         # first load template
1819         for p in capture:
1820             self.assertTrue(p.haslayer(IPFIX))
1821             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1822             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1823             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1824             self.assertEqual(p[UDP].dport, colector_port)
1825             self.assertEqual(p[IPFIX].observationDomainID,
1826                              self.ipfix_domain_id)
1827             if p.haslayer(Template):
1828                 ipfix.add_template(p.getlayer(Template))
1829         # verify events in data set
1830         for p in capture:
1831             if p.haslayer(Data):
1832                 data = ipfix.decode_data_set(p.getlayer(Set))
1833                 self.verify_ipfix_nat44_ses(data)
1834
1835     def test_ipfix_addr_exhausted(self):
1836         """ IPFIX logging NAT addresses exhausted """
1837         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1838         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1839                                                   is_inside=0)
1840         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1841                                      src_address=self.pg3.local_ip4n,
1842                                      path_mtu=512,
1843                                      template_interval=10)
1844         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1845                             src_port=self.ipfix_src_port)
1846
1847         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1848              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1849              TCP(sport=3025))
1850         self.pg0.add_stream(p)
1851         self.pg_enable_capture(self.pg_interfaces)
1852         self.pg_start()
1853         capture = self.pg1.get_capture(0)
1854         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1855         capture = self.pg3.get_capture(3)
1856         ipfix = IPFIXDecoder()
1857         # first load template
1858         for p in capture:
1859             self.assertTrue(p.haslayer(IPFIX))
1860             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1861             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1862             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1863             self.assertEqual(p[UDP].dport, 4739)
1864             self.assertEqual(p[IPFIX].observationDomainID,
1865                              self.ipfix_domain_id)
1866             if p.haslayer(Template):
1867                 ipfix.add_template(p.getlayer(Template))
1868         # verify events in data set
1869         for p in capture:
1870             if p.haslayer(Data):
1871                 data = ipfix.decode_data_set(p.getlayer(Set))
1872                 self.verify_ipfix_addr_exhausted(data)
1873
1874     def test_pool_addr_fib(self):
1875         """ NAT44 add pool addresses to FIB """
1876         static_addr = '10.0.0.10'
1877         self.nat44_add_address(self.nat_addr)
1878         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1879         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1880                                                   is_inside=0)
1881         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1882
1883         # NAT44 address
1884         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1885              ARP(op=ARP.who_has, pdst=self.nat_addr,
1886                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1887         self.pg1.add_stream(p)
1888         self.pg_enable_capture(self.pg_interfaces)
1889         self.pg_start()
1890         capture = self.pg1.get_capture(1)
1891         self.assertTrue(capture[0].haslayer(ARP))
1892         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1893
1894         # 1:1 NAT address
1895         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1896              ARP(op=ARP.who_has, pdst=static_addr,
1897                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1898         self.pg1.add_stream(p)
1899         self.pg_enable_capture(self.pg_interfaces)
1900         self.pg_start()
1901         capture = self.pg1.get_capture(1)
1902         self.assertTrue(capture[0].haslayer(ARP))
1903         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1904
1905         # send ARP to non-NAT44 interface
1906         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1907              ARP(op=ARP.who_has, pdst=self.nat_addr,
1908                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1909         self.pg2.add_stream(p)
1910         self.pg_enable_capture(self.pg_interfaces)
1911         self.pg_start()
1912         capture = self.pg1.get_capture(0)
1913
1914         # remove addresses and verify
1915         self.nat44_add_address(self.nat_addr, is_add=0)
1916         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1917                                       is_add=0)
1918
1919         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1920              ARP(op=ARP.who_has, pdst=self.nat_addr,
1921                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1922         self.pg1.add_stream(p)
1923         self.pg_enable_capture(self.pg_interfaces)
1924         self.pg_start()
1925         capture = self.pg1.get_capture(0)
1926
1927         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1928              ARP(op=ARP.who_has, pdst=static_addr,
1929                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1930         self.pg1.add_stream(p)
1931         self.pg_enable_capture(self.pg_interfaces)
1932         self.pg_start()
1933         capture = self.pg1.get_capture(0)
1934
1935     def test_vrf_mode(self):
1936         """ NAT44 tenant VRF aware address pool mode """
1937
1938         vrf_id1 = 1
1939         vrf_id2 = 2
1940         nat_ip1 = "10.0.0.10"
1941         nat_ip2 = "10.0.0.11"
1942
1943         self.pg0.unconfig_ip4()
1944         self.pg1.unconfig_ip4()
1945         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1946         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1947         self.pg0.set_table_ip4(vrf_id1)
1948         self.pg1.set_table_ip4(vrf_id2)
1949         self.pg0.config_ip4()
1950         self.pg1.config_ip4()
1951
1952         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1953         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1954         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1955         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1956         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1957                                                   is_inside=0)
1958
1959         # first VRF
1960         pkts = self.create_stream_in(self.pg0, self.pg2)
1961         self.pg0.add_stream(pkts)
1962         self.pg_enable_capture(self.pg_interfaces)
1963         self.pg_start()
1964         capture = self.pg2.get_capture(len(pkts))
1965         self.verify_capture_out(capture, nat_ip1)
1966
1967         # second VRF
1968         pkts = self.create_stream_in(self.pg1, self.pg2)
1969         self.pg1.add_stream(pkts)
1970         self.pg_enable_capture(self.pg_interfaces)
1971         self.pg_start()
1972         capture = self.pg2.get_capture(len(pkts))
1973         self.verify_capture_out(capture, nat_ip2)
1974
1975         self.pg0.unconfig_ip4()
1976         self.pg1.unconfig_ip4()
1977         self.pg0.set_table_ip4(0)
1978         self.pg1.set_table_ip4(0)
1979         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1980         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1981
1982     def test_vrf_feature_independent(self):
1983         """ NAT44 tenant VRF independent address pool mode """
1984
1985         nat_ip1 = "10.0.0.10"
1986         nat_ip2 = "10.0.0.11"
1987
1988         self.nat44_add_address(nat_ip1)
1989         self.nat44_add_address(nat_ip2)
1990         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1991         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1992         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1993                                                   is_inside=0)
1994
1995         # first VRF
1996         pkts = self.create_stream_in(self.pg0, self.pg2)
1997         self.pg0.add_stream(pkts)
1998         self.pg_enable_capture(self.pg_interfaces)
1999         self.pg_start()
2000         capture = self.pg2.get_capture(len(pkts))
2001         self.verify_capture_out(capture, nat_ip1)
2002
2003         # second VRF
2004         pkts = self.create_stream_in(self.pg1, self.pg2)
2005         self.pg1.add_stream(pkts)
2006         self.pg_enable_capture(self.pg_interfaces)
2007         self.pg_start()
2008         capture = self.pg2.get_capture(len(pkts))
2009         self.verify_capture_out(capture, nat_ip1)
2010
2011     def test_dynamic_ipless_interfaces(self):
2012         """ NAT44 interfaces without configured IP address """
2013
2014         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2015                                       mactobinary(self.pg7.remote_mac),
2016                                       self.pg7.remote_ip4n,
2017                                       is_static=1)
2018         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2019                                       mactobinary(self.pg8.remote_mac),
2020                                       self.pg8.remote_ip4n,
2021                                       is_static=1)
2022
2023         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2024                                    dst_address_length=32,
2025                                    next_hop_address=self.pg7.remote_ip4n,
2026                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2027         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2028                                    dst_address_length=32,
2029                                    next_hop_address=self.pg8.remote_ip4n,
2030                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2031
2032         self.nat44_add_address(self.nat_addr)
2033         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2034         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2035                                                   is_inside=0)
2036
2037         # in2out
2038         pkts = self.create_stream_in(self.pg7, self.pg8)
2039         self.pg7.add_stream(pkts)
2040         self.pg_enable_capture(self.pg_interfaces)
2041         self.pg_start()
2042         capture = self.pg8.get_capture(len(pkts))
2043         self.verify_capture_out(capture)
2044
2045         # out2in
2046         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2047         self.pg8.add_stream(pkts)
2048         self.pg_enable_capture(self.pg_interfaces)
2049         self.pg_start()
2050         capture = self.pg7.get_capture(len(pkts))
2051         self.verify_capture_in(capture, self.pg7)
2052
2053     def test_static_ipless_interfaces(self):
2054         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2055
2056         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2057                                       mactobinary(self.pg7.remote_mac),
2058                                       self.pg7.remote_ip4n,
2059                                       is_static=1)
2060         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2061                                       mactobinary(self.pg8.remote_mac),
2062                                       self.pg8.remote_ip4n,
2063                                       is_static=1)
2064
2065         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2066                                    dst_address_length=32,
2067                                    next_hop_address=self.pg7.remote_ip4n,
2068                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2069         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2070                                    dst_address_length=32,
2071                                    next_hop_address=self.pg8.remote_ip4n,
2072                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2073
2074         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2075         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2076         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2077                                                   is_inside=0)
2078
2079         # out2in
2080         pkts = self.create_stream_out(self.pg8)
2081         self.pg8.add_stream(pkts)
2082         self.pg_enable_capture(self.pg_interfaces)
2083         self.pg_start()
2084         capture = self.pg7.get_capture(len(pkts))
2085         self.verify_capture_in(capture, self.pg7)
2086
2087         # in2out
2088         pkts = self.create_stream_in(self.pg7, self.pg8)
2089         self.pg7.add_stream(pkts)
2090         self.pg_enable_capture(self.pg_interfaces)
2091         self.pg_start()
2092         capture = self.pg8.get_capture(len(pkts))
2093         self.verify_capture_out(capture, self.nat_addr, True)
2094
2095     def test_static_with_port_ipless_interfaces(self):
2096         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2097
2098         self.tcp_port_out = 30606
2099         self.udp_port_out = 30607
2100         self.icmp_id_out = 30608
2101
2102         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2103                                       mactobinary(self.pg7.remote_mac),
2104                                       self.pg7.remote_ip4n,
2105                                       is_static=1)
2106         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2107                                       mactobinary(self.pg8.remote_mac),
2108                                       self.pg8.remote_ip4n,
2109                                       is_static=1)
2110
2111         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2112                                    dst_address_length=32,
2113                                    next_hop_address=self.pg7.remote_ip4n,
2114                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2115         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2116                                    dst_address_length=32,
2117                                    next_hop_address=self.pg8.remote_ip4n,
2118                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2119
2120         self.nat44_add_address(self.nat_addr)
2121         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2122                                       self.tcp_port_in, self.tcp_port_out,
2123                                       proto=IP_PROTOS.tcp)
2124         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2125                                       self.udp_port_in, self.udp_port_out,
2126                                       proto=IP_PROTOS.udp)
2127         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2128                                       self.icmp_id_in, self.icmp_id_out,
2129                                       proto=IP_PROTOS.icmp)
2130         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2131         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2132                                                   is_inside=0)
2133
2134         # out2in
2135         pkts = self.create_stream_out(self.pg8)
2136         self.pg8.add_stream(pkts)
2137         self.pg_enable_capture(self.pg_interfaces)
2138         self.pg_start()
2139         capture = self.pg7.get_capture(len(pkts))
2140         self.verify_capture_in(capture, self.pg7)
2141
2142         # in2out
2143         pkts = self.create_stream_in(self.pg7, self.pg8)
2144         self.pg7.add_stream(pkts)
2145         self.pg_enable_capture(self.pg_interfaces)
2146         self.pg_start()
2147         capture = self.pg8.get_capture(len(pkts))
2148         self.verify_capture_out(capture)
2149
2150     def test_static_unknown_proto(self):
2151         """ 1:1 NAT translate packet with unknown protocol """
2152         nat_ip = "10.0.0.10"
2153         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2154         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2155         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2156                                                   is_inside=0)
2157
2158         # in2out
2159         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2160              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2161              GRE() /
2162              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2163              TCP(sport=1234, dport=1234))
2164         self.pg0.add_stream(p)
2165         self.pg_enable_capture(self.pg_interfaces)
2166         self.pg_start()
2167         p = self.pg1.get_capture(1)
2168         packet = p[0]
2169         try:
2170             self.assertEqual(packet[IP].src, nat_ip)
2171             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2172             self.assertTrue(packet.haslayer(GRE))
2173             self.check_ip_checksum(packet)
2174         except:
2175             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2176             raise
2177
2178         # out2in
2179         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2180              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2181              GRE() /
2182              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2183              TCP(sport=1234, dport=1234))
2184         self.pg1.add_stream(p)
2185         self.pg_enable_capture(self.pg_interfaces)
2186         self.pg_start()
2187         p = self.pg0.get_capture(1)
2188         packet = p[0]
2189         try:
2190             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2191             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2192             self.assertTrue(packet.haslayer(GRE))
2193             self.check_ip_checksum(packet)
2194         except:
2195             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2196             raise
2197
2198     def test_hairpinning_static_unknown_proto(self):
2199         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2200
2201         host = self.pg0.remote_hosts[0]
2202         server = self.pg0.remote_hosts[1]
2203
2204         host_nat_ip = "10.0.0.10"
2205         server_nat_ip = "10.0.0.11"
2206
2207         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2208         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2209         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2210         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2211                                                   is_inside=0)
2212
2213         # host to server
2214         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2215              IP(src=host.ip4, dst=server_nat_ip) /
2216              GRE() /
2217              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2218              TCP(sport=1234, dport=1234))
2219         self.pg0.add_stream(p)
2220         self.pg_enable_capture(self.pg_interfaces)
2221         self.pg_start()
2222         p = self.pg0.get_capture(1)
2223         packet = p[0]
2224         try:
2225             self.assertEqual(packet[IP].src, host_nat_ip)
2226             self.assertEqual(packet[IP].dst, server.ip4)
2227             self.assertTrue(packet.haslayer(GRE))
2228             self.check_ip_checksum(packet)
2229         except:
2230             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2231             raise
2232
2233         # server to host
2234         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2235              IP(src=server.ip4, dst=host_nat_ip) /
2236              GRE() /
2237              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2238              TCP(sport=1234, dport=1234))
2239         self.pg0.add_stream(p)
2240         self.pg_enable_capture(self.pg_interfaces)
2241         self.pg_start()
2242         p = self.pg0.get_capture(1)
2243         packet = p[0]
2244         try:
2245             self.assertEqual(packet[IP].src, server_nat_ip)
2246             self.assertEqual(packet[IP].dst, host.ip4)
2247             self.assertTrue(packet.haslayer(GRE))
2248             self.check_ip_checksum(packet)
2249         except:
2250             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2251             raise
2252
2253     def test_unknown_proto(self):
2254         """ NAT44 translate packet with unknown protocol """
2255         self.nat44_add_address(self.nat_addr)
2256         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2257         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2258                                                   is_inside=0)
2259
2260         # in2out
2261         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2263              TCP(sport=self.tcp_port_in, dport=20))
2264         self.pg0.add_stream(p)
2265         self.pg_enable_capture(self.pg_interfaces)
2266         self.pg_start()
2267         p = self.pg1.get_capture(1)
2268
2269         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2271              GRE() /
2272              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2273              TCP(sport=1234, dport=1234))
2274         self.pg0.add_stream(p)
2275         self.pg_enable_capture(self.pg_interfaces)
2276         self.pg_start()
2277         p = self.pg1.get_capture(1)
2278         packet = p[0]
2279         try:
2280             self.assertEqual(packet[IP].src, self.nat_addr)
2281             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2282             self.assertTrue(packet.haslayer(GRE))
2283             self.check_ip_checksum(packet)
2284         except:
2285             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2286             raise
2287
2288         # out2in
2289         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2290              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2291              GRE() /
2292              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2293              TCP(sport=1234, dport=1234))
2294         self.pg1.add_stream(p)
2295         self.pg_enable_capture(self.pg_interfaces)
2296         self.pg_start()
2297         p = self.pg0.get_capture(1)
2298         packet = p[0]
2299         try:
2300             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2301             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2302             self.assertTrue(packet.haslayer(GRE))
2303             self.check_ip_checksum(packet)
2304         except:
2305             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2306             raise
2307
2308     def test_hairpinning_unknown_proto(self):
2309         """ NAT44 translate packet with unknown protocol - hairpinning """
2310         host = self.pg0.remote_hosts[0]
2311         server = self.pg0.remote_hosts[1]
2312         host_in_port = 1234
2313         host_out_port = 0
2314         server_in_port = 5678
2315         server_out_port = 8765
2316         server_nat_ip = "10.0.0.11"
2317
2318         self.nat44_add_address(self.nat_addr)
2319         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2320         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2321                                                   is_inside=0)
2322
2323         # add static mapping for server
2324         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2325
2326         # host to server
2327         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2328              IP(src=host.ip4, dst=server_nat_ip) /
2329              TCP(sport=host_in_port, dport=server_out_port))
2330         self.pg0.add_stream(p)
2331         self.pg_enable_capture(self.pg_interfaces)
2332         self.pg_start()
2333         capture = self.pg0.get_capture(1)
2334
2335         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2336              IP(src=host.ip4, dst=server_nat_ip) /
2337              GRE() /
2338              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2339              TCP(sport=1234, dport=1234))
2340         self.pg0.add_stream(p)
2341         self.pg_enable_capture(self.pg_interfaces)
2342         self.pg_start()
2343         p = self.pg0.get_capture(1)
2344         packet = p[0]
2345         try:
2346             self.assertEqual(packet[IP].src, self.nat_addr)
2347             self.assertEqual(packet[IP].dst, server.ip4)
2348             self.assertTrue(packet.haslayer(GRE))
2349             self.check_ip_checksum(packet)
2350         except:
2351             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2352             raise
2353
2354         # server to host
2355         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2356              IP(src=server.ip4, dst=self.nat_addr) /
2357              GRE() /
2358              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2359              TCP(sport=1234, dport=1234))
2360         self.pg0.add_stream(p)
2361         self.pg_enable_capture(self.pg_interfaces)
2362         self.pg_start()
2363         p = self.pg0.get_capture(1)
2364         packet = p[0]
2365         try:
2366             self.assertEqual(packet[IP].src, server_nat_ip)
2367             self.assertEqual(packet[IP].dst, host.ip4)
2368             self.assertTrue(packet.haslayer(GRE))
2369             self.check_ip_checksum(packet)
2370         except:
2371             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2372             raise
2373
2374     def test_output_feature(self):
2375         """ NAT44 interface output feature (in2out postrouting) """
2376         self.nat44_add_address(self.nat_addr)
2377         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2378         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2379         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2380                                                          is_inside=0)
2381
2382         # in2out
2383         pkts = self.create_stream_in(self.pg0, self.pg3)
2384         self.pg0.add_stream(pkts)
2385         self.pg_enable_capture(self.pg_interfaces)
2386         self.pg_start()
2387         capture = self.pg3.get_capture(len(pkts))
2388         self.verify_capture_out(capture)
2389
2390         # out2in
2391         pkts = self.create_stream_out(self.pg3)
2392         self.pg3.add_stream(pkts)
2393         self.pg_enable_capture(self.pg_interfaces)
2394         self.pg_start()
2395         capture = self.pg0.get_capture(len(pkts))
2396         self.verify_capture_in(capture, self.pg0)
2397
2398         # from non-NAT interface to NAT inside interface
2399         pkts = self.create_stream_in(self.pg2, self.pg0)
2400         self.pg2.add_stream(pkts)
2401         self.pg_enable_capture(self.pg_interfaces)
2402         self.pg_start()
2403         capture = self.pg0.get_capture(len(pkts))
2404         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2405
2406     def test_output_feature_vrf_aware(self):
2407         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2408         nat_ip_vrf10 = "10.0.0.10"
2409         nat_ip_vrf20 = "10.0.0.20"
2410
2411         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2412                                    dst_address_length=32,
2413                                    next_hop_address=self.pg3.remote_ip4n,
2414                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2415                                    table_id=10)
2416         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2417                                    dst_address_length=32,
2418                                    next_hop_address=self.pg3.remote_ip4n,
2419                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2420                                    table_id=20)
2421
2422         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2423         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2424         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2425         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2426         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2427                                                          is_inside=0)
2428
2429         # in2out VRF 10
2430         pkts = self.create_stream_in(self.pg4, self.pg3)
2431         self.pg4.add_stream(pkts)
2432         self.pg_enable_capture(self.pg_interfaces)
2433         self.pg_start()
2434         capture = self.pg3.get_capture(len(pkts))
2435         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2436
2437         # out2in VRF 10
2438         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2439         self.pg3.add_stream(pkts)
2440         self.pg_enable_capture(self.pg_interfaces)
2441         self.pg_start()
2442         capture = self.pg4.get_capture(len(pkts))
2443         self.verify_capture_in(capture, self.pg4)
2444
2445         # in2out VRF 20
2446         pkts = self.create_stream_in(self.pg6, self.pg3)
2447         self.pg6.add_stream(pkts)
2448         self.pg_enable_capture(self.pg_interfaces)
2449         self.pg_start()
2450         capture = self.pg3.get_capture(len(pkts))
2451         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2452
2453         # out2in VRF 20
2454         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2455         self.pg3.add_stream(pkts)
2456         self.pg_enable_capture(self.pg_interfaces)
2457         self.pg_start()
2458         capture = self.pg6.get_capture(len(pkts))
2459         self.verify_capture_in(capture, self.pg6)
2460
2461     def test_output_feature_hairpinning(self):
2462         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2463         host = self.pg0.remote_hosts[0]
2464         server = self.pg0.remote_hosts[1]
2465         host_in_port = 1234
2466         host_out_port = 0
2467         server_in_port = 5678
2468         server_out_port = 8765
2469
2470         self.nat44_add_address(self.nat_addr)
2471         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2472         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2473                                                          is_inside=0)
2474
2475         # add static mapping for server
2476         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2477                                       server_in_port, server_out_port,
2478                                       proto=IP_PROTOS.tcp)
2479
2480         # send packet from host to server
2481         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2482              IP(src=host.ip4, dst=self.nat_addr) /
2483              TCP(sport=host_in_port, dport=server_out_port))
2484         self.pg0.add_stream(p)
2485         self.pg_enable_capture(self.pg_interfaces)
2486         self.pg_start()
2487         capture = self.pg0.get_capture(1)
2488         p = capture[0]
2489         try:
2490             ip = p[IP]
2491             tcp = p[TCP]
2492             self.assertEqual(ip.src, self.nat_addr)
2493             self.assertEqual(ip.dst, server.ip4)
2494             self.assertNotEqual(tcp.sport, host_in_port)
2495             self.assertEqual(tcp.dport, server_in_port)
2496             self.check_tcp_checksum(p)
2497             host_out_port = tcp.sport
2498         except:
2499             self.logger.error(ppp("Unexpected or invalid packet:", p))
2500             raise
2501
2502         # send reply from server to host
2503         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2504              IP(src=server.ip4, dst=self.nat_addr) /
2505              TCP(sport=server_in_port, dport=host_out_port))
2506         self.pg0.add_stream(p)
2507         self.pg_enable_capture(self.pg_interfaces)
2508         self.pg_start()
2509         capture = self.pg0.get_capture(1)
2510         p = capture[0]
2511         try:
2512             ip = p[IP]
2513             tcp = p[TCP]
2514             self.assertEqual(ip.src, self.nat_addr)
2515             self.assertEqual(ip.dst, host.ip4)
2516             self.assertEqual(tcp.sport, server_out_port)
2517             self.assertEqual(tcp.dport, host_in_port)
2518             self.check_tcp_checksum(p)
2519         except:
2520             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2521             raise
2522
2523     def test_one_armed_nat44(self):
2524         """ One armed NAT44 """
2525         remote_host = self.pg9.remote_hosts[0]
2526         local_host = self.pg9.remote_hosts[1]
2527         external_port = 0
2528
2529         self.nat44_add_address(self.nat_addr)
2530         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2531         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2532                                                   is_inside=0)
2533
2534         # in2out
2535         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2536              IP(src=local_host.ip4, dst=remote_host.ip4) /
2537              TCP(sport=12345, dport=80))
2538         self.pg9.add_stream(p)
2539         self.pg_enable_capture(self.pg_interfaces)
2540         self.pg_start()
2541         capture = self.pg9.get_capture(1)
2542         p = capture[0]
2543         try:
2544             ip = p[IP]
2545             tcp = p[TCP]
2546             self.assertEqual(ip.src, self.nat_addr)
2547             self.assertEqual(ip.dst, remote_host.ip4)
2548             self.assertNotEqual(tcp.sport, 12345)
2549             external_port = tcp.sport
2550             self.assertEqual(tcp.dport, 80)
2551             self.check_tcp_checksum(p)
2552             self.check_ip_checksum(p)
2553         except:
2554             self.logger.error(ppp("Unexpected or invalid packet:", p))
2555             raise
2556
2557         # out2in
2558         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2559              IP(src=remote_host.ip4, dst=self.nat_addr) /
2560              TCP(sport=80, dport=external_port))
2561         self.pg9.add_stream(p)
2562         self.pg_enable_capture(self.pg_interfaces)
2563         self.pg_start()
2564         capture = self.pg9.get_capture(1)
2565         p = capture[0]
2566         try:
2567             ip = p[IP]
2568             tcp = p[TCP]
2569             self.assertEqual(ip.src, remote_host.ip4)
2570             self.assertEqual(ip.dst, local_host.ip4)
2571             self.assertEqual(tcp.sport, 80)
2572             self.assertEqual(tcp.dport, 12345)
2573             self.check_tcp_checksum(p)
2574             self.check_ip_checksum(p)
2575         except:
2576             self.logger.error(ppp("Unexpected or invalid packet:", p))
2577             raise
2578
2579     def test_del_session(self):
2580         """ Delete NAT44 session """
2581         self.nat44_add_address(self.nat_addr)
2582         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2583         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2584                                                   is_inside=0)
2585
2586         pkts = self.create_stream_in(self.pg0, self.pg1)
2587         self.pg0.add_stream(pkts)
2588         self.pg_enable_capture(self.pg_interfaces)
2589         self.pg_start()
2590         capture = self.pg1.get_capture(len(pkts))
2591
2592         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2593         nsessions = len(sessions)
2594
2595         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2596                                     sessions[0].inside_port,
2597                                     sessions[0].protocol)
2598         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2599                                     sessions[1].outside_port,
2600                                     sessions[1].protocol,
2601                                     is_in=0)
2602
2603         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2604         self.assertEqual(nsessions - len(sessions), 2)
2605
2606     def test_set_get_reass(self):
2607         """ NAT44 set/get virtual fragmentation reassembly """
2608         reas_cfg1 = self.vapi.nat_get_reass()
2609
2610         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2611                                 max_reass=reas_cfg1.ip4_max_reass * 2,
2612                                 max_frag=reas_cfg1.ip4_max_frag * 2)
2613
2614         reas_cfg2 = self.vapi.nat_get_reass()
2615
2616         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2617         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2618         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2619
2620         self.vapi.nat_set_reass(drop_frag=1)
2621         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2622
2623     def test_frag_in_order(self):
2624         """ NAT44 translate fragments arriving in order """
2625         self.nat44_add_address(self.nat_addr)
2626         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2627         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2628                                                   is_inside=0)
2629
2630         data = "A" * 4 + "B" * 16 + "C" * 3
2631         self.tcp_port_in = random.randint(1025, 65535)
2632
2633         reass = self.vapi.nat_reass_dump()
2634         reass_n_start = len(reass)
2635
2636         # in2out
2637         pkts = self.create_stream_frag(self.pg0,
2638                                        self.pg1.remote_ip4,
2639                                        self.tcp_port_in,
2640                                        20,
2641                                        data)
2642         self.pg0.add_stream(pkts)
2643         self.pg_enable_capture(self.pg_interfaces)
2644         self.pg_start()
2645         frags = self.pg1.get_capture(len(pkts))
2646         p = self.reass_frags_and_verify(frags,
2647                                         self.nat_addr,
2648                                         self.pg1.remote_ip4)
2649         self.assertEqual(p[TCP].dport, 20)
2650         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2651         self.tcp_port_out = p[TCP].sport
2652         self.assertEqual(data, p[Raw].load)
2653
2654         # out2in
2655         pkts = self.create_stream_frag(self.pg1,
2656                                        self.nat_addr,
2657                                        20,
2658                                        self.tcp_port_out,
2659                                        data)
2660         self.pg1.add_stream(pkts)
2661         self.pg_enable_capture(self.pg_interfaces)
2662         self.pg_start()
2663         frags = self.pg0.get_capture(len(pkts))
2664         p = self.reass_frags_and_verify(frags,
2665                                         self.pg1.remote_ip4,
2666                                         self.pg0.remote_ip4)
2667         self.assertEqual(p[TCP].sport, 20)
2668         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2669         self.assertEqual(data, p[Raw].load)
2670
2671         reass = self.vapi.nat_reass_dump()
2672         reass_n_end = len(reass)
2673
2674         self.assertEqual(reass_n_end - reass_n_start, 2)
2675
2676     def test_reass_hairpinning(self):
2677         """ NAT44 fragments hairpinning """
2678         host = self.pg0.remote_hosts[0]
2679         server = self.pg0.remote_hosts[1]
2680         host_in_port = random.randint(1025, 65535)
2681         host_out_port = 0
2682         server_in_port = random.randint(1025, 65535)
2683         server_out_port = random.randint(1025, 65535)
2684         data = "A" * 4 + "B" * 16 + "C" * 3
2685
2686         self.nat44_add_address(self.nat_addr)
2687         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2688         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2689                                                   is_inside=0)
2690         # add static mapping for server
2691         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2692                                       server_in_port, server_out_port,
2693                                       proto=IP_PROTOS.tcp)
2694
2695         # send packet from host to server
2696         pkts = self.create_stream_frag(self.pg0,
2697                                        self.nat_addr,
2698                                        host_in_port,
2699                                        server_out_port,
2700                                        data)
2701         self.pg0.add_stream(pkts)
2702         self.pg_enable_capture(self.pg_interfaces)
2703         self.pg_start()
2704         frags = self.pg0.get_capture(len(pkts))
2705         p = self.reass_frags_and_verify(frags,
2706                                         self.nat_addr,
2707                                         server.ip4)
2708         self.assertNotEqual(p[TCP].sport, host_in_port)
2709         self.assertEqual(p[TCP].dport, server_in_port)
2710         self.assertEqual(data, p[Raw].load)
2711
2712     def test_frag_out_of_order(self):
2713         """ NAT44 translate fragments arriving out of order """
2714         self.nat44_add_address(self.nat_addr)
2715         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2716         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2717                                                   is_inside=0)
2718
2719         data = "A" * 4 + "B" * 16 + "C" * 3
2720         random.randint(1025, 65535)
2721
2722         # in2out
2723         pkts = self.create_stream_frag(self.pg0,
2724                                        self.pg1.remote_ip4,
2725                                        self.tcp_port_in,
2726                                        20,
2727                                        data)
2728         pkts.reverse()
2729         self.pg0.add_stream(pkts)
2730         self.pg_enable_capture(self.pg_interfaces)
2731         self.pg_start()
2732         frags = self.pg1.get_capture(len(pkts))
2733         p = self.reass_frags_and_verify(frags,
2734                                         self.nat_addr,
2735                                         self.pg1.remote_ip4)
2736         self.assertEqual(p[TCP].dport, 20)
2737         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2738         self.tcp_port_out = p[TCP].sport
2739         self.assertEqual(data, p[Raw].load)
2740
2741         # out2in
2742         pkts = self.create_stream_frag(self.pg1,
2743                                        self.nat_addr,
2744                                        20,
2745                                        self.tcp_port_out,
2746                                        data)
2747         pkts.reverse()
2748         self.pg1.add_stream(pkts)
2749         self.pg_enable_capture(self.pg_interfaces)
2750         self.pg_start()
2751         frags = self.pg0.get_capture(len(pkts))
2752         p = self.reass_frags_and_verify(frags,
2753                                         self.pg1.remote_ip4,
2754                                         self.pg0.remote_ip4)
2755         self.assertEqual(p[TCP].sport, 20)
2756         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2757         self.assertEqual(data, p[Raw].load)
2758
2759     def test_port_restricted(self):
2760         """ Port restricted NAT44 (MAP-E CE) """
2761         self.nat44_add_address(self.nat_addr)
2762         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2763         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2764                                                   is_inside=0)
2765         self.vapi.cli("nat44 addr-port-assignment-alg map-e psid 10 "
2766                       "psid-offset 6 psid-len 6")
2767
2768         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2769              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2770              TCP(sport=4567, dport=22))
2771         self.pg0.add_stream(p)
2772         self.pg_enable_capture(self.pg_interfaces)
2773         self.pg_start()
2774         capture = self.pg1.get_capture(1)
2775         p = capture[0]
2776         try:
2777             ip = p[IP]
2778             tcp = p[TCP]
2779             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2780             self.assertEqual(ip.src, self.nat_addr)
2781             self.assertEqual(tcp.dport, 22)
2782             self.assertNotEqual(tcp.sport, 4567)
2783             self.assertEqual((tcp.sport >> 6) & 63, 10)
2784             self.check_tcp_checksum(p)
2785             self.check_ip_checksum(p)
2786         except:
2787             self.logger.error(ppp("Unexpected or invalid packet:", p))
2788             raise
2789
2790     def tearDown(self):
2791         super(TestNAT44, self).tearDown()
2792         if not self.vpp_dead:
2793             self.logger.info(self.vapi.cli("show nat44 verbose"))
2794             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
2795             self.vapi.cli("nat44 addr-port-assignment-alg default")
2796             self.clear_nat44()
2797
2798
2799 class TestDeterministicNAT(MethodHolder):
2800     """ Deterministic NAT Test Cases """
2801
2802     @classmethod
2803     def setUpConstants(cls):
2804         super(TestDeterministicNAT, cls).setUpConstants()
2805         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2806
2807     @classmethod
2808     def setUpClass(cls):
2809         super(TestDeterministicNAT, cls).setUpClass()
2810
2811         try:
2812             cls.tcp_port_in = 6303
2813             cls.tcp_external_port = 6303
2814             cls.udp_port_in = 6304
2815             cls.udp_external_port = 6304
2816             cls.icmp_id_in = 6305
2817             cls.nat_addr = '10.0.0.3'
2818
2819             cls.create_pg_interfaces(range(3))
2820             cls.interfaces = list(cls.pg_interfaces)
2821
2822             for i in cls.interfaces:
2823                 i.admin_up()
2824                 i.config_ip4()
2825                 i.resolve_arp()
2826
2827             cls.pg0.generate_remote_hosts(2)
2828             cls.pg0.configure_ipv4_neighbors()
2829
2830         except Exception:
2831             super(TestDeterministicNAT, cls).tearDownClass()
2832             raise
2833
2834     def create_stream_in(self, in_if, out_if, ttl=64):
2835         """
2836         Create packet stream for inside network
2837
2838         :param in_if: Inside interface
2839         :param out_if: Outside interface
2840         :param ttl: TTL of generated packets
2841         """
2842         pkts = []
2843         # TCP
2844         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2845              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2846              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2847         pkts.append(p)
2848
2849         # UDP
2850         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2851              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2852              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2853         pkts.append(p)
2854
2855         # ICMP
2856         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2857              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2858              ICMP(id=self.icmp_id_in, type='echo-request'))
2859         pkts.append(p)
2860
2861         return pkts
2862
2863     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2864         """
2865         Create packet stream for outside network
2866
2867         :param out_if: Outside interface
2868         :param dst_ip: Destination IP address (Default use global NAT address)
2869         :param ttl: TTL of generated packets
2870         """
2871         if dst_ip is None:
2872             dst_ip = self.nat_addr
2873         pkts = []
2874         # TCP
2875         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2876              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2877              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2878         pkts.append(p)
2879
2880         # UDP
2881         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2882              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2883              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2884         pkts.append(p)
2885
2886         # ICMP
2887         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2888              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2889              ICMP(id=self.icmp_external_id, type='echo-reply'))
2890         pkts.append(p)
2891
2892         return pkts
2893
2894     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2895         """
2896         Verify captured packets on outside network
2897
2898         :param capture: Captured packets
2899         :param nat_ip: Translated IP address (Default use global NAT address)
2900         :param same_port: Sorce port number is not translated (Default False)
2901         :param packet_num: Expected number of packets (Default 3)
2902         """
2903         if nat_ip is None:
2904             nat_ip = self.nat_addr
2905         self.assertEqual(packet_num, len(capture))
2906         for packet in capture:
2907             try:
2908                 self.assertEqual(packet[IP].src, nat_ip)
2909                 if packet.haslayer(TCP):
2910                     self.tcp_port_out = packet[TCP].sport
2911                 elif packet.haslayer(UDP):
2912                     self.udp_port_out = packet[UDP].sport
2913                 else:
2914                     self.icmp_external_id = packet[ICMP].id
2915             except:
2916                 self.logger.error(ppp("Unexpected or invalid packet "
2917                                       "(outside network):", packet))
2918                 raise
2919
2920     def initiate_tcp_session(self, in_if, out_if):
2921         """
2922         Initiates TCP session
2923
2924         :param in_if: Inside interface
2925         :param out_if: Outside interface
2926         """
2927         try:
2928             # SYN packet in->out
2929             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2930                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2931                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2932                      flags="S"))
2933             in_if.add_stream(p)
2934             self.pg_enable_capture(self.pg_interfaces)
2935             self.pg_start()
2936             capture = out_if.get_capture(1)
2937             p = capture[0]
2938             self.tcp_port_out = p[TCP].sport
2939
2940             # SYN + ACK packet out->in
2941             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2942                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2943                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2944                      flags="SA"))
2945             out_if.add_stream(p)
2946             self.pg_enable_capture(self.pg_interfaces)
2947             self.pg_start()
2948             in_if.get_capture(1)
2949
2950             # ACK packet in->out
2951             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2952                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2953                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2954                      flags="A"))
2955             in_if.add_stream(p)
2956             self.pg_enable_capture(self.pg_interfaces)
2957             self.pg_start()
2958             out_if.get_capture(1)
2959
2960         except:
2961             self.logger.error("TCP 3 way handshake failed")
2962             raise
2963
2964     def verify_ipfix_max_entries_per_user(self, data):
2965         """
2966         Verify IPFIX maximum entries per user exceeded event
2967
2968         :param data: Decoded IPFIX data records
2969         """
2970         self.assertEqual(1, len(data))
2971         record = data[0]
2972         # natEvent
2973         self.assertEqual(ord(record[230]), 13)
2974         # natQuotaExceededEvent
2975         self.assertEqual('\x03\x00\x00\x00', record[466])
2976         # sourceIPv4Address
2977         self.assertEqual(self.pg0.remote_ip4n, record[8])
2978
2979     def test_deterministic_mode(self):
2980         """ NAT plugin run deterministic mode """
2981         in_addr = '172.16.255.0'
2982         out_addr = '172.17.255.50'
2983         in_addr_t = '172.16.255.20'
2984         in_addr_n = socket.inet_aton(in_addr)
2985         out_addr_n = socket.inet_aton(out_addr)
2986         in_addr_t_n = socket.inet_aton(in_addr_t)
2987         in_plen = 24
2988         out_plen = 32
2989
2990         nat_config = self.vapi.nat_show_config()
2991         self.assertEqual(1, nat_config.deterministic)
2992
2993         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2994
2995         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2996         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2997         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2998         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2999
3000         deterministic_mappings = self.vapi.nat_det_map_dump()
3001         self.assertEqual(len(deterministic_mappings), 1)
3002         dsm = deterministic_mappings[0]
3003         self.assertEqual(in_addr_n, dsm.in_addr[:4])
3004         self.assertEqual(in_plen, dsm.in_plen)
3005         self.assertEqual(out_addr_n, dsm.out_addr[:4])
3006         self.assertEqual(out_plen, dsm.out_plen)
3007
3008         self.clear_nat_det()
3009         deterministic_mappings = self.vapi.nat_det_map_dump()
3010         self.assertEqual(len(deterministic_mappings), 0)
3011
3012     def test_set_timeouts(self):
3013         """ Set deterministic NAT timeouts """
3014         timeouts_before = self.vapi.nat_det_get_timeouts()
3015
3016         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3017                                        timeouts_before.tcp_established + 10,
3018                                        timeouts_before.tcp_transitory + 10,
3019                                        timeouts_before.icmp + 10)
3020
3021         timeouts_after = self.vapi.nat_det_get_timeouts()
3022
3023         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3024         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3025         self.assertNotEqual(timeouts_before.tcp_established,
3026                             timeouts_after.tcp_established)
3027         self.assertNotEqual(timeouts_before.tcp_transitory,
3028                             timeouts_after.tcp_transitory)
3029
3030     def test_det_in(self):
3031         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3032
3033         nat_ip = "10.0.0.10"
3034
3035         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3036                                       32,
3037                                       socket.inet_aton(nat_ip),
3038                                       32)
3039         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3040         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3041                                                   is_inside=0)
3042
3043         # in2out
3044         pkts = self.create_stream_in(self.pg0, self.pg1)
3045         self.pg0.add_stream(pkts)
3046         self.pg_enable_capture(self.pg_interfaces)
3047         self.pg_start()
3048         capture = self.pg1.get_capture(len(pkts))
3049         self.verify_capture_out(capture, nat_ip)
3050
3051         # out2in
3052         pkts = self.create_stream_out(self.pg1, nat_ip)
3053         self.pg1.add_stream(pkts)
3054         self.pg_enable_capture(self.pg_interfaces)
3055         self.pg_start()
3056         capture = self.pg0.get_capture(len(pkts))
3057         self.verify_capture_in(capture, self.pg0)
3058
3059         # session dump test
3060         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3061         self.assertEqual(len(sessions), 3)
3062
3063         # TCP session
3064         s = sessions[0]
3065         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3066         self.assertEqual(s.in_port, self.tcp_port_in)
3067         self.assertEqual(s.out_port, self.tcp_port_out)
3068         self.assertEqual(s.ext_port, self.tcp_external_port)
3069
3070         # UDP session
3071         s = sessions[1]
3072         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3073         self.assertEqual(s.in_port, self.udp_port_in)
3074         self.assertEqual(s.out_port, self.udp_port_out)
3075         self.assertEqual(s.ext_port, self.udp_external_port)
3076
3077         # ICMP session
3078         s = sessions[2]
3079         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3080         self.assertEqual(s.in_port, self.icmp_id_in)
3081         self.assertEqual(s.out_port, self.icmp_external_id)
3082
3083     def test_multiple_users(self):
3084         """ Deterministic NAT multiple users """
3085
3086         nat_ip = "10.0.0.10"
3087         port_in = 80
3088         external_port = 6303
3089
3090         host0 = self.pg0.remote_hosts[0]
3091         host1 = self.pg0.remote_hosts[1]
3092
3093         self.vapi.nat_det_add_del_map(host0.ip4n,
3094                                       24,
3095                                       socket.inet_aton(nat_ip),
3096                                       32)
3097         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3098         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3099                                                   is_inside=0)
3100
3101         # host0 to out
3102         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3103              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3104              TCP(sport=port_in, dport=external_port))
3105         self.pg0.add_stream(p)
3106         self.pg_enable_capture(self.pg_interfaces)
3107         self.pg_start()
3108         capture = self.pg1.get_capture(1)
3109         p = capture[0]
3110         try:
3111             ip = p[IP]
3112             tcp = p[TCP]
3113             self.assertEqual(ip.src, nat_ip)
3114             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3115             self.assertEqual(tcp.dport, external_port)
3116             port_out0 = tcp.sport
3117         except:
3118             self.logger.error(ppp("Unexpected or invalid packet:", p))
3119             raise
3120
3121         # host1 to out
3122         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3123              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3124              TCP(sport=port_in, dport=external_port))
3125         self.pg0.add_stream(p)
3126         self.pg_enable_capture(self.pg_interfaces)
3127         self.pg_start()
3128         capture = self.pg1.get_capture(1)
3129         p = capture[0]
3130         try:
3131             ip = p[IP]
3132             tcp = p[TCP]
3133             self.assertEqual(ip.src, nat_ip)
3134             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3135             self.assertEqual(tcp.dport, external_port)
3136             port_out1 = tcp.sport
3137         except:
3138             self.logger.error(ppp("Unexpected or invalid packet:", p))
3139             raise
3140
3141         dms = self.vapi.nat_det_map_dump()
3142         self.assertEqual(1, len(dms))
3143         self.assertEqual(2, dms[0].ses_num)
3144
3145         # out to host0
3146         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3147              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3148              TCP(sport=external_port, dport=port_out0))
3149         self.pg1.add_stream(p)
3150         self.pg_enable_capture(self.pg_interfaces)
3151         self.pg_start()
3152         capture = self.pg0.get_capture(1)
3153         p = capture[0]
3154         try:
3155             ip = p[IP]
3156             tcp = p[TCP]
3157             self.assertEqual(ip.src, self.pg1.remote_ip4)
3158             self.assertEqual(ip.dst, host0.ip4)
3159             self.assertEqual(tcp.dport, port_in)
3160             self.assertEqual(tcp.sport, external_port)
3161         except:
3162             self.logger.error(ppp("Unexpected or invalid packet:", p))
3163             raise
3164
3165         # out to host1
3166         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3167              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3168              TCP(sport=external_port, dport=port_out1))
3169         self.pg1.add_stream(p)
3170         self.pg_enable_capture(self.pg_interfaces)
3171         self.pg_start()
3172         capture = self.pg0.get_capture(1)
3173         p = capture[0]
3174         try:
3175             ip = p[IP]
3176             tcp = p[TCP]
3177             self.assertEqual(ip.src, self.pg1.remote_ip4)
3178             self.assertEqual(ip.dst, host1.ip4)
3179             self.assertEqual(tcp.dport, port_in)
3180             self.assertEqual(tcp.sport, external_port)
3181         except:
3182             self.logger.error(ppp("Unexpected or invalid packet", p))
3183             raise
3184
3185         # session close api test
3186         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3187                                             port_out1,
3188                                             self.pg1.remote_ip4n,
3189                                             external_port)
3190         dms = self.vapi.nat_det_map_dump()
3191         self.assertEqual(dms[0].ses_num, 1)
3192
3193         self.vapi.nat_det_close_session_in(host0.ip4n,
3194                                            port_in,
3195                                            self.pg1.remote_ip4n,
3196                                            external_port)
3197         dms = self.vapi.nat_det_map_dump()
3198         self.assertEqual(dms[0].ses_num, 0)
3199
3200     def test_tcp_session_close_detection_in(self):
3201         """ Deterministic NAT TCP session close from inside network """
3202         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3203                                       32,
3204                                       socket.inet_aton(self.nat_addr),
3205                                       32)
3206         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3207         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3208                                                   is_inside=0)
3209
3210         self.initiate_tcp_session(self.pg0, self.pg1)
3211
3212         # close the session from inside
3213         try:
3214             # FIN packet in -> out
3215             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3216                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3217                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3218                      flags="F"))
3219             self.pg0.add_stream(p)
3220             self.pg_enable_capture(self.pg_interfaces)
3221             self.pg_start()
3222             self.pg1.get_capture(1)
3223
3224             pkts = []
3225
3226             # ACK packet out -> in
3227             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3228                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3229                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3230                      flags="A"))
3231             pkts.append(p)
3232
3233             # FIN packet out -> in
3234             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3235                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3236                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3237                      flags="F"))
3238             pkts.append(p)
3239
3240             self.pg1.add_stream(pkts)
3241             self.pg_enable_capture(self.pg_interfaces)
3242             self.pg_start()
3243             self.pg0.get_capture(2)
3244
3245             # ACK packet in -> out
3246             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3247                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3248                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3249                      flags="A"))
3250             self.pg0.add_stream(p)
3251             self.pg_enable_capture(self.pg_interfaces)
3252             self.pg_start()
3253             self.pg1.get_capture(1)
3254
3255             # Check if deterministic NAT44 closed the session
3256             dms = self.vapi.nat_det_map_dump()
3257             self.assertEqual(0, dms[0].ses_num)
3258         except:
3259             self.logger.error("TCP session termination failed")
3260             raise
3261
3262     def test_tcp_session_close_detection_out(self):
3263         """ Deterministic NAT TCP session close from outside network """
3264         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3265                                       32,
3266                                       socket.inet_aton(self.nat_addr),
3267                                       32)
3268         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3269         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3270                                                   is_inside=0)
3271
3272         self.initiate_tcp_session(self.pg0, self.pg1)
3273
3274         # close the session from outside
3275         try:
3276             # FIN packet out -> in
3277             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3278                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3279                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3280                      flags="F"))
3281             self.pg1.add_stream(p)
3282             self.pg_enable_capture(self.pg_interfaces)
3283             self.pg_start()
3284             self.pg0.get_capture(1)
3285
3286             pkts = []
3287
3288             # ACK packet in -> out
3289             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3290                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3291                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3292                      flags="A"))
3293             pkts.append(p)
3294
3295             # ACK packet in -> out
3296             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3297                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3298                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3299                      flags="F"))
3300             pkts.append(p)
3301
3302             self.pg0.add_stream(pkts)
3303             self.pg_enable_capture(self.pg_interfaces)
3304             self.pg_start()
3305             self.pg1.get_capture(2)
3306
3307             # ACK packet out -> in
3308             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3309                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3310                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3311                      flags="A"))
3312             self.pg1.add_stream(p)
3313             self.pg_enable_capture(self.pg_interfaces)
3314             self.pg_start()
3315             self.pg0.get_capture(1)
3316
3317             # Check if deterministic NAT44 closed the session
3318             dms = self.vapi.nat_det_map_dump()
3319             self.assertEqual(0, dms[0].ses_num)
3320         except:
3321             self.logger.error("TCP session termination failed")
3322             raise
3323
3324     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3325     def test_session_timeout(self):
3326         """ Deterministic NAT session timeouts """
3327         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3328                                       32,
3329                                       socket.inet_aton(self.nat_addr),
3330                                       32)
3331         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3332         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3333                                                   is_inside=0)
3334
3335         self.initiate_tcp_session(self.pg0, self.pg1)
3336         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3337         pkts = self.create_stream_in(self.pg0, self.pg1)
3338         self.pg0.add_stream(pkts)
3339         self.pg_enable_capture(self.pg_interfaces)
3340         self.pg_start()
3341         capture = self.pg1.get_capture(len(pkts))
3342         sleep(15)
3343
3344         dms = self.vapi.nat_det_map_dump()
3345         self.assertEqual(0, dms[0].ses_num)
3346
3347     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3348     def test_session_limit_per_user(self):
3349         """ Deterministic NAT maximum sessions per user limit """
3350         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3351                                       32,
3352                                       socket.inet_aton(self.nat_addr),
3353                                       32)
3354         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3355         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3356                                                   is_inside=0)
3357         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3358                                      src_address=self.pg2.local_ip4n,
3359                                      path_mtu=512,
3360                                      template_interval=10)
3361         self.vapi.nat_ipfix()
3362
3363         pkts = []
3364         for port in range(1025, 2025):
3365             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3366                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3367                  UDP(sport=port, dport=port))
3368             pkts.append(p)
3369
3370         self.pg0.add_stream(pkts)
3371         self.pg_enable_capture(self.pg_interfaces)
3372         self.pg_start()
3373         capture = self.pg1.get_capture(len(pkts))
3374
3375         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3376              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3377              UDP(sport=3001, dport=3002))
3378         self.pg0.add_stream(p)
3379         self.pg_enable_capture(self.pg_interfaces)
3380         self.pg_start()
3381         capture = self.pg1.assert_nothing_captured()
3382
3383         # verify ICMP error packet
3384         capture = self.pg0.get_capture(1)
3385         p = capture[0]
3386         self.assertTrue(p.haslayer(ICMP))
3387         icmp = p[ICMP]
3388         self.assertEqual(icmp.type, 3)
3389         self.assertEqual(icmp.code, 1)
3390         self.assertTrue(icmp.haslayer(IPerror))
3391         inner_ip = icmp[IPerror]
3392         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3393         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3394
3395         dms = self.vapi.nat_det_map_dump()
3396
3397         self.assertEqual(1000, dms[0].ses_num)
3398
3399         # verify IPFIX logging
3400         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3401         sleep(1)
3402         capture = self.pg2.get_capture(2)
3403         ipfix = IPFIXDecoder()
3404         # first load template
3405         for p in capture:
3406             self.assertTrue(p.haslayer(IPFIX))
3407             if p.haslayer(Template):
3408                 ipfix.add_template(p.getlayer(Template))
3409         # verify events in data set
3410         for p in capture:
3411             if p.haslayer(Data):
3412                 data = ipfix.decode_data_set(p.getlayer(Set))
3413                 self.verify_ipfix_max_entries_per_user(data)
3414
3415     def clear_nat_det(self):
3416         """
3417         Clear deterministic NAT configuration.
3418         """
3419         self.vapi.nat_ipfix(enable=0)
3420         self.vapi.nat_det_set_timeouts()
3421         deterministic_mappings = self.vapi.nat_det_map_dump()
3422         for dsm in deterministic_mappings:
3423             self.vapi.nat_det_add_del_map(dsm.in_addr,
3424                                           dsm.in_plen,
3425                                           dsm.out_addr,
3426                                           dsm.out_plen,
3427                                           is_add=0)
3428
3429         interfaces = self.vapi.nat44_interface_dump()
3430         for intf in interfaces:
3431             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3432                                                       intf.is_inside,
3433                                                       is_add=0)
3434
3435     def tearDown(self):
3436         super(TestDeterministicNAT, self).tearDown()
3437         if not self.vpp_dead:
3438             self.logger.info(self.vapi.cli("show nat44 detail"))
3439             self.clear_nat_det()
3440
3441
3442 class TestNAT64(MethodHolder):
3443     """ NAT64 Test Cases """
3444
3445     @classmethod
3446     def setUpClass(cls):
3447         super(TestNAT64, cls).setUpClass()
3448
3449         try:
3450             cls.tcp_port_in = 6303
3451             cls.tcp_port_out = 6303
3452             cls.udp_port_in = 6304
3453             cls.udp_port_out = 6304
3454             cls.icmp_id_in = 6305
3455             cls.icmp_id_out = 6305
3456             cls.nat_addr = '10.0.0.3'
3457             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3458             cls.vrf1_id = 10
3459             cls.vrf1_nat_addr = '10.0.10.3'
3460             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3461                                                    cls.vrf1_nat_addr)
3462
3463             cls.create_pg_interfaces(range(5))
3464             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3465             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3466             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3467
3468             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3469
3470             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3471
3472             cls.pg0.generate_remote_hosts(2)
3473
3474             for i in cls.ip6_interfaces:
3475                 i.admin_up()
3476                 i.config_ip6()
3477                 i.configure_ipv6_neighbors()
3478
3479             for i in cls.ip4_interfaces:
3480                 i.admin_up()
3481                 i.config_ip4()
3482                 i.resolve_arp()
3483
3484             cls.pg3.admin_up()
3485             cls.pg3.config_ip4()
3486             cls.pg3.resolve_arp()
3487             cls.pg3.config_ip6()
3488             cls.pg3.configure_ipv6_neighbors()
3489
3490         except Exception:
3491             super(TestNAT64, cls).tearDownClass()
3492             raise
3493
3494     def test_pool(self):
3495         """ Add/delete address to NAT64 pool """
3496         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3497
3498         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3499
3500         addresses = self.vapi.nat64_pool_addr_dump()
3501         self.assertEqual(len(addresses), 1)
3502         self.assertEqual(addresses[0].address, nat_addr)
3503
3504         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3505
3506         addresses = self.vapi.nat64_pool_addr_dump()
3507         self.assertEqual(len(addresses), 0)
3508
3509     def test_interface(self):
3510         """ Enable/disable NAT64 feature on the interface """
3511         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3512         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3513
3514         interfaces = self.vapi.nat64_interface_dump()
3515         self.assertEqual(len(interfaces), 2)
3516         pg0_found = False
3517         pg1_found = False
3518         for intf in interfaces:
3519             if intf.sw_if_index == self.pg0.sw_if_index:
3520                 self.assertEqual(intf.is_inside, 1)
3521                 pg0_found = True
3522             elif intf.sw_if_index == self.pg1.sw_if_index:
3523                 self.assertEqual(intf.is_inside, 0)
3524                 pg1_found = True
3525         self.assertTrue(pg0_found)
3526         self.assertTrue(pg1_found)
3527
3528         features = self.vapi.cli("show interface features pg0")
3529         self.assertNotEqual(features.find('nat64-in2out'), -1)
3530         features = self.vapi.cli("show interface features pg1")
3531         self.assertNotEqual(features.find('nat64-out2in'), -1)
3532
3533         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3534         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3535
3536         interfaces = self.vapi.nat64_interface_dump()
3537         self.assertEqual(len(interfaces), 0)
3538
3539     def test_static_bib(self):
3540         """ Add/delete static BIB entry """
3541         in_addr = socket.inet_pton(socket.AF_INET6,
3542                                    '2001:db8:85a3::8a2e:370:7334')
3543         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3544         in_port = 1234
3545         out_port = 5678
3546         proto = IP_PROTOS.tcp
3547
3548         self.vapi.nat64_add_del_static_bib(in_addr,
3549                                            out_addr,
3550                                            in_port,
3551                                            out_port,
3552                                            proto)
3553         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3554         static_bib_num = 0
3555         for bibe in bib:
3556             if bibe.is_static:
3557                 static_bib_num += 1
3558                 self.assertEqual(bibe.i_addr, in_addr)
3559                 self.assertEqual(bibe.o_addr, out_addr)
3560                 self.assertEqual(bibe.i_port, in_port)
3561                 self.assertEqual(bibe.o_port, out_port)
3562         self.assertEqual(static_bib_num, 1)
3563
3564         self.vapi.nat64_add_del_static_bib(in_addr,
3565                                            out_addr,
3566                                            in_port,
3567                                            out_port,
3568                                            proto,
3569                                            is_add=0)
3570         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3571         static_bib_num = 0
3572         for bibe in bib:
3573             if bibe.is_static:
3574                 static_bib_num += 1
3575         self.assertEqual(static_bib_num, 0)
3576
3577     def test_set_timeouts(self):
3578         """ Set NAT64 timeouts """
3579         # verify default values
3580         timeouts = self.vapi.nat64_get_timeouts()
3581         self.assertEqual(timeouts.udp, 300)
3582         self.assertEqual(timeouts.icmp, 60)
3583         self.assertEqual(timeouts.tcp_trans, 240)
3584         self.assertEqual(timeouts.tcp_est, 7440)
3585         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3586
3587         # set and verify custom values
3588         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3589                                      tcp_est=7450, tcp_incoming_syn=10)
3590         timeouts = self.vapi.nat64_get_timeouts()
3591         self.assertEqual(timeouts.udp, 200)
3592         self.assertEqual(timeouts.icmp, 30)
3593         self.assertEqual(timeouts.tcp_trans, 250)
3594         self.assertEqual(timeouts.tcp_est, 7450)
3595         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3596
3597     def test_dynamic(self):
3598         """ NAT64 dynamic translation test """
3599         self.tcp_port_in = 6303
3600         self.udp_port_in = 6304
3601         self.icmp_id_in = 6305
3602
3603         ses_num_start = self.nat64_get_ses_num()
3604
3605         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3606                                                 self.nat_addr_n)
3607         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3608         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3609
3610         # in2out
3611         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3612         self.pg0.add_stream(pkts)
3613         self.pg_enable_capture(self.pg_interfaces)
3614         self.pg_start()
3615         capture = self.pg1.get_capture(len(pkts))
3616         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3617                                 dst_ip=self.pg1.remote_ip4)
3618
3619         # out2in
3620         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3621         self.pg1.add_stream(pkts)
3622         self.pg_enable_capture(self.pg_interfaces)
3623         self.pg_start()
3624         capture = self.pg0.get_capture(len(pkts))
3625         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3626         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3627
3628         # in2out
3629         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3630         self.pg0.add_stream(pkts)
3631         self.pg_enable_capture(self.pg_interfaces)
3632         self.pg_start()
3633         capture = self.pg1.get_capture(len(pkts))
3634         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3635                                 dst_ip=self.pg1.remote_ip4)
3636
3637         # out2in
3638         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3639         self.pg1.add_stream(pkts)
3640         self.pg_enable_capture(self.pg_interfaces)
3641         self.pg_start()
3642         capture = self.pg0.get_capture(len(pkts))
3643         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3644
3645         ses_num_end = self.nat64_get_ses_num()
3646
3647         self.assertEqual(ses_num_end - ses_num_start, 3)
3648
3649         # tenant with specific VRF
3650         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3651                                                 self.vrf1_nat_addr_n,
3652                                                 vrf_id=self.vrf1_id)
3653         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3654
3655         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3656         self.pg2.add_stream(pkts)
3657         self.pg_enable_capture(self.pg_interfaces)
3658         self.pg_start()
3659         capture = self.pg1.get_capture(len(pkts))
3660         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3661                                 dst_ip=self.pg1.remote_ip4)
3662
3663         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3664         self.pg1.add_stream(pkts)
3665         self.pg_enable_capture(self.pg_interfaces)
3666         self.pg_start()
3667         capture = self.pg2.get_capture(len(pkts))
3668         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3669
3670     def test_static(self):
3671         """ NAT64 static translation test """
3672         self.tcp_port_in = 60303
3673         self.udp_port_in = 60304
3674         self.icmp_id_in = 60305
3675         self.tcp_port_out = 60303
3676         self.udp_port_out = 60304
3677         self.icmp_id_out = 60305
3678
3679         ses_num_start = self.nat64_get_ses_num()
3680
3681         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3682                                                 self.nat_addr_n)
3683         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3684         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3685
3686         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3687                                            self.nat_addr_n,
3688                                            self.tcp_port_in,
3689                                            self.tcp_port_out,
3690                                            IP_PROTOS.tcp)
3691         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3692                                            self.nat_addr_n,
3693                                            self.udp_port_in,
3694                                            self.udp_port_out,
3695                                            IP_PROTOS.udp)
3696         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3697                                            self.nat_addr_n,
3698                                            self.icmp_id_in,
3699                                            self.icmp_id_out,
3700                                            IP_PROTOS.icmp)
3701
3702         # in2out
3703         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3704         self.pg0.add_stream(pkts)
3705         self.pg_enable_capture(self.pg_interfaces)
3706         self.pg_start()
3707         capture = self.pg1.get_capture(len(pkts))
3708         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3709                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3710
3711         # out2in
3712         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3713         self.pg1.add_stream(pkts)
3714         self.pg_enable_capture(self.pg_interfaces)
3715         self.pg_start()
3716         capture = self.pg0.get_capture(len(pkts))
3717         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3718         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3719
3720         ses_num_end = self.nat64_get_ses_num()
3721
3722         self.assertEqual(ses_num_end - ses_num_start, 3)
3723
3724     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3725     def test_session_timeout(self):
3726         """ NAT64 session timeout """
3727         self.icmp_id_in = 1234
3728         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3729                                                 self.nat_addr_n)
3730         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3731         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3732         self.vapi.nat64_set_timeouts(icmp=5)
3733
3734         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3735         self.pg0.add_stream(pkts)
3736         self.pg_enable_capture(self.pg_interfaces)
3737         self.pg_start()
3738         capture = self.pg1.get_capture(len(pkts))
3739
3740         ses_num_before_timeout = self.nat64_get_ses_num()
3741
3742         sleep(15)
3743
3744         # ICMP session after timeout
3745         ses_num_after_timeout = self.nat64_get_ses_num()
3746         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3747
3748     def test_icmp_error(self):
3749         """ NAT64 ICMP Error message translation """
3750         self.tcp_port_in = 6303
3751         self.udp_port_in = 6304
3752         self.icmp_id_in = 6305
3753
3754         ses_num_start = self.nat64_get_ses_num()
3755
3756         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3757                                                 self.nat_addr_n)
3758         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3759         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3760
3761         # send some packets to create sessions
3762         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3763         self.pg0.add_stream(pkts)
3764         self.pg_enable_capture(self.pg_interfaces)
3765         self.pg_start()
3766         capture_ip4 = self.pg1.get_capture(len(pkts))
3767         self.verify_capture_out(capture_ip4,
3768                                 nat_ip=self.nat_addr,
3769                                 dst_ip=self.pg1.remote_ip4)
3770
3771         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3772         self.pg1.add_stream(pkts)
3773         self.pg_enable_capture(self.pg_interfaces)
3774         self.pg_start()
3775         capture_ip6 = self.pg0.get_capture(len(pkts))
3776         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3777         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3778                                    self.pg0.remote_ip6)
3779
3780         # in2out
3781         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3782                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3783                 ICMPv6DestUnreach(code=1) /
3784                 packet[IPv6] for packet in capture_ip6]
3785         self.pg0.add_stream(pkts)
3786         self.pg_enable_capture(self.pg_interfaces)
3787         self.pg_start()
3788         capture = self.pg1.get_capture(len(pkts))
3789         for packet in capture:
3790             try:
3791                 self.assertEqual(packet[IP].src, self.nat_addr)
3792                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3793                 self.assertEqual(packet[ICMP].type, 3)
3794                 self.assertEqual(packet[ICMP].code, 13)
3795                 inner = packet[IPerror]
3796                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3797                 self.assertEqual(inner.dst, self.nat_addr)
3798                 self.check_icmp_checksum(packet)
3799                 if inner.haslayer(TCPerror):
3800                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3801                 elif inner.haslayer(UDPerror):
3802                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3803                 else:
3804                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3805             except:
3806                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3807                 raise
3808
3809         # out2in
3810         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3811                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3812                 ICMP(type=3, code=13) /
3813                 packet[IP] for packet in capture_ip4]
3814         self.pg1.add_stream(pkts)
3815         self.pg_enable_capture(self.pg_interfaces)
3816         self.pg_start()
3817         capture = self.pg0.get_capture(len(pkts))
3818         for packet in capture:
3819             try:
3820                 self.assertEqual(packet[IPv6].src, ip.src)
3821                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3822                 icmp = packet[ICMPv6DestUnreach]
3823                 self.assertEqual(icmp.code, 1)
3824                 inner = icmp[IPerror6]
3825                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3826                 self.assertEqual(inner.dst, ip.src)
3827                 self.check_icmpv6_checksum(packet)
3828                 if inner.haslayer(TCPerror):
3829                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3830                 elif inner.haslayer(UDPerror):
3831                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3832                 else:
3833                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3834                                      self.icmp_id_in)
3835             except:
3836                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3837                 raise
3838
3839     def test_hairpinning(self):
3840         """ NAT64 hairpinning """
3841
3842         client = self.pg0.remote_hosts[0]
3843         server = self.pg0.remote_hosts[1]
3844         server_tcp_in_port = 22
3845         server_tcp_out_port = 4022
3846         server_udp_in_port = 23
3847         server_udp_out_port = 4023
3848         client_tcp_in_port = 1234
3849         client_udp_in_port = 1235
3850         client_tcp_out_port = 0
3851         client_udp_out_port = 0
3852         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3853         nat_addr_ip6 = ip.src
3854
3855         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3856                                                 self.nat_addr_n)
3857         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3858         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3859
3860         self.vapi.nat64_add_del_static_bib(server.ip6n,
3861                                            self.nat_addr_n,
3862                                            server_tcp_in_port,
3863                                            server_tcp_out_port,
3864                                            IP_PROTOS.tcp)
3865         self.vapi.nat64_add_del_static_bib(server.ip6n,
3866                                            self.nat_addr_n,
3867                                            server_udp_in_port,
3868                                            server_udp_out_port,
3869                                            IP_PROTOS.udp)
3870
3871         # client to server
3872         pkts = []
3873         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3874              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3875              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3876         pkts.append(p)
3877         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3878              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3879              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3880         pkts.append(p)
3881         self.pg0.add_stream(pkts)
3882         self.pg_enable_capture(self.pg_interfaces)
3883         self.pg_start()
3884         capture = self.pg0.get_capture(len(pkts))
3885         for packet in capture:
3886             try:
3887                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3888                 self.assertEqual(packet[IPv6].dst, server.ip6)
3889                 if packet.haslayer(TCP):
3890                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3891                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3892                     self.check_tcp_checksum(packet)
3893                     client_tcp_out_port = packet[TCP].sport
3894                 else:
3895                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3896                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3897                     self.check_udp_checksum(packet)
3898                     client_udp_out_port = packet[UDP].sport
3899             except:
3900                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3901                 raise
3902
3903         # server to client
3904         pkts = []
3905         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3906              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3907              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3908         pkts.append(p)
3909         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3910              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3911              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3912         pkts.append(p)
3913         self.pg0.add_stream(pkts)
3914         self.pg_enable_capture(self.pg_interfaces)
3915         self.pg_start()
3916         capture = self.pg0.get_capture(len(pkts))
3917         for packet in capture:
3918             try:
3919                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3920                 self.assertEqual(packet[IPv6].dst, client.ip6)
3921                 if packet.haslayer(TCP):
3922                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3923                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3924                     self.check_tcp_checksum(packet)
3925                 else:
3926                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3927                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3928                     self.check_udp_checksum(packet)
3929             except:
3930                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3931                 raise
3932
3933         # ICMP error
3934         pkts = []
3935         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3936                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3937                 ICMPv6DestUnreach(code=1) /
3938                 packet[IPv6] for packet in capture]
3939         self.pg0.add_stream(pkts)
3940         self.pg_enable_capture(self.pg_interfaces)
3941         self.pg_start()
3942         capture = self.pg0.get_capture(len(pkts))
3943         for packet in capture:
3944             try:
3945                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3946                 self.assertEqual(packet[IPv6].dst, server.ip6)
3947                 icmp = packet[ICMPv6DestUnreach]
3948                 self.assertEqual(icmp.code, 1)
3949                 inner = icmp[IPerror6]
3950                 self.assertEqual(inner.src, server.ip6)
3951                 self.assertEqual(inner.dst, nat_addr_ip6)
3952                 self.check_icmpv6_checksum(packet)
3953                 if inner.haslayer(TCPerror):
3954                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3955                     self.assertEqual(inner[TCPerror].dport,
3956                                      client_tcp_out_port)
3957                 else:
3958                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3959                     self.assertEqual(inner[UDPerror].dport,
3960                                      client_udp_out_port)
3961             except:
3962                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3963                 raise
3964
3965     def test_prefix(self):
3966         """ NAT64 Network-Specific Prefix """
3967
3968         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3969                                                 self.nat_addr_n)
3970         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3971         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3972         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3973                                                 self.vrf1_nat_addr_n,
3974                                                 vrf_id=self.vrf1_id)
3975         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3976
3977         # Add global prefix
3978         global_pref64 = "2001:db8::"
3979         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3980         global_pref64_len = 32
3981         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3982
3983         prefix = self.vapi.nat64_prefix_dump()
3984         self.assertEqual(len(prefix), 1)
3985         self.assertEqual(prefix[0].prefix, global_pref64_n)
3986         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3987         self.assertEqual(prefix[0].vrf_id, 0)
3988
3989         # Add tenant specific prefix
3990         vrf1_pref64 = "2001:db8:122:300::"
3991         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3992         vrf1_pref64_len = 56
3993         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3994                                        vrf1_pref64_len,
3995                                        vrf_id=self.vrf1_id)
3996         prefix = self.vapi.nat64_prefix_dump()
3997         self.assertEqual(len(prefix), 2)
3998
3999         # Global prefix
4000         pkts = self.create_stream_in_ip6(self.pg0,
4001                                          self.pg1,
4002                                          pref=global_pref64,
4003                                          plen=global_pref64_len)
4004         self.pg0.add_stream(pkts)
4005         self.pg_enable_capture(self.pg_interfaces)
4006         self.pg_start()
4007         capture = self.pg1.get_capture(len(pkts))
4008         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4009                                 dst_ip=self.pg1.remote_ip4)
4010
4011         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4012         self.pg1.add_stream(pkts)
4013         self.pg_enable_capture(self.pg_interfaces)
4014         self.pg_start()
4015         capture = self.pg0.get_capture(len(pkts))
4016         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4017                                   global_pref64,
4018                                   global_pref64_len)
4019         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4020
4021         # Tenant specific prefix
4022         pkts = self.create_stream_in_ip6(self.pg2,
4023                                          self.pg1,
4024                                          pref=vrf1_pref64,
4025                                          plen=vrf1_pref64_len)
4026         self.pg2.add_stream(pkts)
4027         self.pg_enable_capture(self.pg_interfaces)
4028         self.pg_start()
4029         capture = self.pg1.get_capture(len(pkts))
4030         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4031                                 dst_ip=self.pg1.remote_ip4)
4032
4033         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4034         self.pg1.add_stream(pkts)
4035         self.pg_enable_capture(self.pg_interfaces)
4036         self.pg_start()
4037         capture = self.pg2.get_capture(len(pkts))
4038         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4039                                   vrf1_pref64,
4040                                   vrf1_pref64_len)
4041         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4042
4043     def test_unknown_proto(self):
4044         """ NAT64 translate packet with unknown protocol """
4045
4046         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4047                                                 self.nat_addr_n)
4048         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4049         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4050         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4051
4052         # in2out
4053         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4054              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4055              TCP(sport=self.tcp_port_in, dport=20))
4056         self.pg0.add_stream(p)
4057         self.pg_enable_capture(self.pg_interfaces)
4058         self.pg_start()
4059         p = self.pg1.get_capture(1)
4060
4061         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4062              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4063              GRE() /
4064              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4065              TCP(sport=1234, dport=1234))
4066         self.pg0.add_stream(p)
4067         self.pg_enable_capture(self.pg_interfaces)
4068         self.pg_start()
4069         p = self.pg1.get_capture(1)
4070         packet = p[0]
4071         try:
4072             self.assertEqual(packet[IP].src, self.nat_addr)
4073             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4074             self.assertTrue(packet.haslayer(GRE))
4075             self.check_ip_checksum(packet)
4076         except:
4077             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4078             raise
4079
4080         # out2in
4081         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4082              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4083              GRE() /
4084              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4085              TCP(sport=1234, dport=1234))
4086         self.pg1.add_stream(p)
4087         self.pg_enable_capture(self.pg_interfaces)
4088         self.pg_start()
4089         p = self.pg0.get_capture(1)
4090         packet = p[0]
4091         try:
4092             self.assertEqual(packet[IPv6].src, remote_ip6)
4093             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4094             self.assertEqual(packet[IPv6].nh, 47)
4095         except:
4096             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4097             raise
4098
4099     def test_hairpinning_unknown_proto(self):
4100         """ NAT64 translate packet with unknown protocol - hairpinning """
4101
4102         client = self.pg0.remote_hosts[0]
4103         server = self.pg0.remote_hosts[1]
4104         server_tcp_in_port = 22
4105         server_tcp_out_port = 4022
4106         client_tcp_in_port = 1234
4107         client_tcp_out_port = 1235
4108         server_nat_ip = "10.0.0.100"
4109         client_nat_ip = "10.0.0.110"
4110         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4111         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4112         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4113         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4114
4115         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4116                                                 client_nat_ip_n)
4117         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4118         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4119
4120         self.vapi.nat64_add_del_static_bib(server.ip6n,
4121                                            server_nat_ip_n,
4122                                            server_tcp_in_port,
4123                                            server_tcp_out_port,
4124                                            IP_PROTOS.tcp)
4125
4126         self.vapi.nat64_add_del_static_bib(server.ip6n,
4127                                            server_nat_ip_n,
4128                                            0,
4129                                            0,
4130                                            IP_PROTOS.gre)
4131
4132         self.vapi.nat64_add_del_static_bib(client.ip6n,
4133                                            client_nat_ip_n,
4134                                            client_tcp_in_port,
4135                                            client_tcp_out_port,
4136                                            IP_PROTOS.tcp)
4137
4138         # client to server
4139         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4140              IPv6(src=client.ip6, dst=server_nat_ip6) /
4141              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4142         self.pg0.add_stream(p)
4143         self.pg_enable_capture(self.pg_interfaces)
4144         self.pg_start()
4145         p = self.pg0.get_capture(1)
4146
4147         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4148              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4149              GRE() /
4150              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4151              TCP(sport=1234, dport=1234))
4152         self.pg0.add_stream(p)
4153         self.pg_enable_capture(self.pg_interfaces)
4154         self.pg_start()
4155         p = self.pg0.get_capture(1)
4156         packet = p[0]
4157         try:
4158             self.assertEqual(packet[IPv6].src, client_nat_ip6)
4159             self.assertEqual(packet[IPv6].dst, server.ip6)
4160             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4161         except:
4162             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4163             raise
4164
4165         # server to client
4166         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4167              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4168              GRE() /
4169              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4170              TCP(sport=1234, dport=1234))
4171         self.pg0.add_stream(p)
4172         self.pg_enable_capture(self.pg_interfaces)
4173         self.pg_start()
4174         p = self.pg0.get_capture(1)
4175         packet = p[0]
4176         try:
4177             self.assertEqual(packet[IPv6].src, server_nat_ip6)
4178             self.assertEqual(packet[IPv6].dst, client.ip6)
4179             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4180         except:
4181             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4182             raise
4183
4184     def test_one_armed_nat64(self):
4185         """ One armed NAT64 """
4186         external_port = 0
4187         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4188                                            '64:ff9b::',
4189                                            96)
4190
4191         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4192                                                 self.nat_addr_n)
4193         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4194         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4195
4196         # in2out
4197         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4198              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4199              TCP(sport=12345, dport=80))
4200         self.pg3.add_stream(p)
4201         self.pg_enable_capture(self.pg_interfaces)
4202         self.pg_start()
4203         capture = self.pg3.get_capture(1)
4204         p = capture[0]
4205         try:
4206             ip = p[IP]
4207             tcp = p[TCP]
4208             self.assertEqual(ip.src, self.nat_addr)
4209             self.assertEqual(ip.dst, self.pg3.remote_ip4)
4210             self.assertNotEqual(tcp.sport, 12345)
4211             external_port = tcp.sport
4212             self.assertEqual(tcp.dport, 80)
4213             self.check_tcp_checksum(p)
4214             self.check_ip_checksum(p)
4215         except:
4216             self.logger.error(ppp("Unexpected or invalid packet:", p))
4217             raise
4218
4219         # out2in
4220         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4221              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4222              TCP(sport=80, dport=external_port))
4223         self.pg3.add_stream(p)
4224         self.pg_enable_capture(self.pg_interfaces)
4225         self.pg_start()
4226         capture = self.pg3.get_capture(1)
4227         p = capture[0]
4228         try:
4229             ip = p[IPv6]
4230             tcp = p[TCP]
4231             self.assertEqual(ip.src, remote_host_ip6)
4232             self.assertEqual(ip.dst, self.pg3.remote_ip6)
4233             self.assertEqual(tcp.sport, 80)
4234             self.assertEqual(tcp.dport, 12345)
4235             self.check_tcp_checksum(p)
4236         except:
4237             self.logger.error(ppp("Unexpected or invalid packet:", p))
4238             raise
4239
4240     def test_frag_in_order(self):
4241         """ NAT64 translate fragments arriving in order """
4242         self.tcp_port_in = random.randint(1025, 65535)
4243
4244         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4245                                                 self.nat_addr_n)
4246         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4247         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4248
4249         reass = self.vapi.nat_reass_dump()
4250         reass_n_start = len(reass)
4251
4252         # in2out
4253         data = 'a' * 200
4254         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4255                                            self.tcp_port_in, 20, data)
4256         self.pg0.add_stream(pkts)
4257         self.pg_enable_capture(self.pg_interfaces)
4258         self.pg_start()
4259         frags = self.pg1.get_capture(len(pkts))
4260         p = self.reass_frags_and_verify(frags,
4261                                         self.nat_addr,
4262                                         self.pg1.remote_ip4)
4263         self.assertEqual(p[TCP].dport, 20)
4264         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4265         self.tcp_port_out = p[TCP].sport
4266         self.assertEqual(data, p[Raw].load)
4267
4268         # out2in
4269         data = "A" * 4 + "b" * 16 + "C" * 3
4270         pkts = self.create_stream_frag(self.pg1,
4271                                        self.nat_addr,
4272                                        20,
4273                                        self.tcp_port_out,
4274                                        data)
4275         self.pg1.add_stream(pkts)
4276         self.pg_enable_capture(self.pg_interfaces)
4277         self.pg_start()
4278         frags = self.pg0.get_capture(len(pkts))
4279         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4280         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4281         self.assertEqual(p[TCP].sport, 20)
4282         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4283         self.assertEqual(data, p[Raw].load)
4284
4285         reass = self.vapi.nat_reass_dump()
4286         reass_n_end = len(reass)
4287
4288         self.assertEqual(reass_n_end - reass_n_start, 2)
4289
4290     def test_reass_hairpinning(self):
4291         """ NAT64 fragments hairpinning """
4292         data = 'a' * 200
4293         client = self.pg0.remote_hosts[0]
4294         server = self.pg0.remote_hosts[1]
4295         server_in_port = random.randint(1025, 65535)
4296         server_out_port = random.randint(1025, 65535)
4297         client_in_port = random.randint(1025, 65535)
4298         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4299         nat_addr_ip6 = ip.src
4300
4301         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4302                                                 self.nat_addr_n)
4303         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4304         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4305
4306         # add static BIB entry for server
4307         self.vapi.nat64_add_del_static_bib(server.ip6n,
4308                                            self.nat_addr_n,
4309                                            server_in_port,
4310                                            server_out_port,
4311                                            IP_PROTOS.tcp)
4312
4313         # send packet from host to server
4314         pkts = self.create_stream_frag_ip6(self.pg0,
4315                                            self.nat_addr,
4316                                            client_in_port,
4317                                            server_out_port,
4318                                            data)
4319         self.pg0.add_stream(pkts)
4320         self.pg_enable_capture(self.pg_interfaces)
4321         self.pg_start()
4322         frags = self.pg0.get_capture(len(pkts))
4323         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4324         self.assertNotEqual(p[TCP].sport, client_in_port)
4325         self.assertEqual(p[TCP].dport, server_in_port)
4326         self.assertEqual(data, p[Raw].load)
4327
4328     def test_frag_out_of_order(self):
4329         """ NAT64 translate fragments arriving out of order """
4330         self.tcp_port_in = random.randint(1025, 65535)
4331
4332         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4333                                                 self.nat_addr_n)
4334         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4335         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4336
4337         # in2out
4338         data = 'a' * 200
4339         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4340                                            self.tcp_port_in, 20, data)
4341         pkts.reverse()
4342         self.pg0.add_stream(pkts)
4343         self.pg_enable_capture(self.pg_interfaces)
4344         self.pg_start()
4345         frags = self.pg1.get_capture(len(pkts))
4346         p = self.reass_frags_and_verify(frags,
4347                                         self.nat_addr,
4348                                         self.pg1.remote_ip4)
4349         self.assertEqual(p[TCP].dport, 20)
4350         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4351         self.tcp_port_out = p[TCP].sport
4352         self.assertEqual(data, p[Raw].load)
4353
4354         # out2in
4355         data = "A" * 4 + "B" * 16 + "C" * 3
4356         pkts = self.create_stream_frag(self.pg1,
4357                                        self.nat_addr,
4358                                        20,
4359                                        self.tcp_port_out,
4360                                        data)
4361         pkts.reverse()
4362         self.pg1.add_stream(pkts)
4363         self.pg_enable_capture(self.pg_interfaces)
4364         self.pg_start()
4365         frags = self.pg0.get_capture(len(pkts))
4366         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4367         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4368         self.assertEqual(p[TCP].sport, 20)
4369         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4370         self.assertEqual(data, p[Raw].load)
4371
4372     def test_interface_addr(self):
4373         """ Acquire NAT64 pool addresses from interface """
4374         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4375
4376         # no address in NAT64 pool
4377         adresses = self.vapi.nat44_address_dump()
4378         self.assertEqual(0, len(adresses))
4379
4380         # configure interface address and check NAT64 address pool
4381         self.pg4.config_ip4()
4382         addresses = self.vapi.nat64_pool_addr_dump()
4383         self.assertEqual(len(addresses), 1)
4384         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4385
4386         # remove interface address and check NAT64 address pool
4387         self.pg4.unconfig_ip4()
4388         addresses = self.vapi.nat64_pool_addr_dump()
4389         self.assertEqual(0, len(adresses))
4390
4391     def nat64_get_ses_num(self):
4392         """
4393         Return number of active NAT64 sessions.
4394         """
4395         st = self.vapi.nat64_st_dump()
4396         return len(st)
4397
4398     def clear_nat64(self):
4399         """
4400         Clear NAT64 configuration.
4401         """
4402         self.vapi.nat64_set_timeouts()
4403
4404         interfaces = self.vapi.nat64_interface_dump()
4405         for intf in interfaces:
4406             if intf.is_inside > 1:
4407                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4408                                                   0,
4409                                                   is_add=0)
4410             self.vapi.nat64_add_del_interface(intf.sw_if_index,
4411                                               intf.is_inside,
4412                                               is_add=0)
4413
4414         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4415         for bibe in bib:
4416             if bibe.is_static:
4417                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4418                                                    bibe.o_addr,
4419                                                    bibe.i_port,
4420                                                    bibe.o_port,
4421                                                    bibe.proto,
4422                                                    bibe.vrf_id,
4423                                                    is_add=0)
4424
4425         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4426         for bibe in bib:
4427             if bibe.is_static:
4428                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4429                                                    bibe.o_addr,
4430                                                    bibe.i_port,
4431                                                    bibe.o_port,
4432                                                    bibe.proto,
4433                                                    bibe.vrf_id,
4434                                                    is_add=0)
4435
4436         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4437         for bibe in bib:
4438             if bibe.is_static:
4439                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4440                                                    bibe.o_addr,
4441                                                    bibe.i_port,
4442                                                    bibe.o_port,
4443                                                    bibe.proto,
4444                                                    bibe.vrf_id,
4445                                                    is_add=0)
4446
4447         adresses = self.vapi.nat64_pool_addr_dump()
4448         for addr in adresses:
4449             self.vapi.nat64_add_del_pool_addr_range(addr.address,
4450                                                     addr.address,
4451                                                     vrf_id=addr.vrf_id,
4452                                                     is_add=0)
4453
4454         prefixes = self.vapi.nat64_prefix_dump()
4455         for prefix in prefixes:
4456             self.vapi.nat64_add_del_prefix(prefix.prefix,
4457                                            prefix.prefix_len,
4458                                            vrf_id=prefix.vrf_id,
4459                                            is_add=0)
4460
4461     def tearDown(self):
4462         super(TestNAT64, self).tearDown()
4463         if not self.vpp_dead:
4464             self.logger.info(self.vapi.cli("show nat64 pool"))
4465             self.logger.info(self.vapi.cli("show nat64 interfaces"))
4466             self.logger.info(self.vapi.cli("show nat64 prefix"))
4467             self.logger.info(self.vapi.cli("show nat64 bib all"))
4468             self.logger.info(self.vapi.cli("show nat64 session table all"))
4469             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4470             self.clear_nat64()
4471
4472
4473 class TestDSlite(MethodHolder):
4474     """ DS-Lite Test Cases """
4475
4476     @classmethod
4477     def setUpClass(cls):
4478         super(TestDSlite, cls).setUpClass()
4479
4480         try:
4481             cls.nat_addr = '10.0.0.3'
4482             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4483
4484             cls.create_pg_interfaces(range(2))
4485             cls.pg0.admin_up()
4486             cls.pg0.config_ip4()
4487             cls.pg0.resolve_arp()
4488             cls.pg1.admin_up()
4489             cls.pg1.config_ip6()
4490             cls.pg1.generate_remote_hosts(2)
4491             cls.pg1.configure_ipv6_neighbors()
4492
4493         except Exception:
4494             super(TestDSlite, cls).tearDownClass()
4495             raise
4496
4497     def test_dslite(self):
4498         """ Test DS-Lite """
4499         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4500                                                  self.nat_addr_n)
4501         aftr_ip4 = '192.0.0.1'
4502         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4503         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4504         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4505         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4506
4507         # UDP
4508         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4509              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4510              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4511              UDP(sport=20000, dport=10000))
4512         self.pg1.add_stream(p)
4513         self.pg_enable_capture(self.pg_interfaces)
4514         self.pg_start()
4515         capture = self.pg0.get_capture(1)
4516         capture = capture[0]
4517         self.assertFalse(capture.haslayer(IPv6))
4518         self.assertEqual(capture[IP].src, self.nat_addr)
4519         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4520         self.assertNotEqual(capture[UDP].sport, 20000)
4521         self.assertEqual(capture[UDP].dport, 10000)
4522         self.check_ip_checksum(capture)
4523         out_port = capture[UDP].sport
4524
4525         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4526              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4527              UDP(sport=10000, dport=out_port))
4528         self.pg0.add_stream(p)
4529         self.pg_enable_capture(self.pg_interfaces)
4530         self.pg_start()
4531         capture = self.pg1.get_capture(1)
4532         capture = capture[0]
4533         self.assertEqual(capture[IPv6].src, aftr_ip6)
4534         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4535         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4536         self.assertEqual(capture[IP].dst, '192.168.1.1')
4537         self.assertEqual(capture[UDP].sport, 10000)
4538         self.assertEqual(capture[UDP].dport, 20000)
4539         self.check_ip_checksum(capture)
4540
4541         # TCP
4542         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4543              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4544              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4545              TCP(sport=20001, dport=10001))
4546         self.pg1.add_stream(p)
4547         self.pg_enable_capture(self.pg_interfaces)
4548         self.pg_start()
4549         capture = self.pg0.get_capture(1)
4550         capture = capture[0]
4551         self.assertFalse(capture.haslayer(IPv6))
4552         self.assertEqual(capture[IP].src, self.nat_addr)
4553         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4554         self.assertNotEqual(capture[TCP].sport, 20001)
4555         self.assertEqual(capture[TCP].dport, 10001)
4556         self.check_ip_checksum(capture)
4557         self.check_tcp_checksum(capture)
4558         out_port = capture[TCP].sport
4559
4560         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4561              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4562              TCP(sport=10001, dport=out_port))
4563         self.pg0.add_stream(p)
4564         self.pg_enable_capture(self.pg_interfaces)
4565         self.pg_start()
4566         capture = self.pg1.get_capture(1)
4567         capture = capture[0]
4568         self.assertEqual(capture[IPv6].src, aftr_ip6)
4569         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4570         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4571         self.assertEqual(capture[IP].dst, '192.168.1.1')
4572         self.assertEqual(capture[TCP].sport, 10001)
4573         self.assertEqual(capture[TCP].dport, 20001)
4574         self.check_ip_checksum(capture)
4575         self.check_tcp_checksum(capture)
4576
4577         # ICMP
4578         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4579              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4580              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4581              ICMP(id=4000, type='echo-request'))
4582         self.pg1.add_stream(p)
4583         self.pg_enable_capture(self.pg_interfaces)
4584         self.pg_start()
4585         capture = self.pg0.get_capture(1)
4586         capture = capture[0]
4587         self.assertFalse(capture.haslayer(IPv6))
4588         self.assertEqual(capture[IP].src, self.nat_addr)
4589         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4590         self.assertNotEqual(capture[ICMP].id, 4000)
4591         self.check_ip_checksum(capture)
4592         self.check_icmp_checksum(capture)
4593         out_id = capture[ICMP].id
4594
4595         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4596              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4597              ICMP(id=out_id, type='echo-reply'))
4598         self.pg0.add_stream(p)
4599         self.pg_enable_capture(self.pg_interfaces)
4600         self.pg_start()
4601         capture = self.pg1.get_capture(1)
4602         capture = capture[0]
4603         self.assertEqual(capture[IPv6].src, aftr_ip6)
4604         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4605         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4606         self.assertEqual(capture[IP].dst, '192.168.1.1')
4607         self.assertEqual(capture[ICMP].id, 4000)
4608         self.check_ip_checksum(capture)
4609         self.check_icmp_checksum(capture)
4610
4611     def tearDown(self):
4612         super(TestDSlite, self).tearDown()
4613         if not self.vpp_dead:
4614             self.logger.info(self.vapi.cli("show dslite pool"))
4615             self.logger.info(
4616                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4617             self.logger.info(self.vapi.cli("show dslite sessions"))
4618
4619 if __name__ == '__main__':
4620     unittest.main(testRunner=VppTestRunner)