NAT64: multi-thread support (VPP-891)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param ttl: TTL of generated packets
144         """
145         pkts = []
146         # TCP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              TCP(sport=self.tcp_port_in, dport=20))
150         pkts.append(p)
151
152         # UDP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              UDP(sport=self.udp_port_in, dport=20))
156         pkts.append(p)
157
158         # ICMP
159         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
161              ICMP(id=self.icmp_id_in, type='echo-request'))
162         pkts.append(p)
163
164         return pkts
165
166     def compose_ip6(self, ip4, pref, plen):
167         """
168         Compose IPv4-embedded IPv6 addresses
169
170         :param ip4: IPv4 address
171         :param pref: IPv6 prefix
172         :param plen: IPv6 prefix length
173         :returns: IPv4-embedded IPv6 addresses
174         """
175         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
176         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
177         if plen == 32:
178             pref_n[4] = ip4_n[0]
179             pref_n[5] = ip4_n[1]
180             pref_n[6] = ip4_n[2]
181             pref_n[7] = ip4_n[3]
182         elif plen == 40:
183             pref_n[5] = ip4_n[0]
184             pref_n[6] = ip4_n[1]
185             pref_n[7] = ip4_n[2]
186             pref_n[9] = ip4_n[3]
187         elif plen == 48:
188             pref_n[6] = ip4_n[0]
189             pref_n[7] = ip4_n[1]
190             pref_n[9] = ip4_n[2]
191             pref_n[10] = ip4_n[3]
192         elif plen == 56:
193             pref_n[7] = ip4_n[0]
194             pref_n[9] = ip4_n[1]
195             pref_n[10] = ip4_n[2]
196             pref_n[11] = ip4_n[3]
197         elif plen == 64:
198             pref_n[9] = ip4_n[0]
199             pref_n[10] = ip4_n[1]
200             pref_n[11] = ip4_n[2]
201             pref_n[12] = ip4_n[3]
202         elif plen == 96:
203             pref_n[12] = ip4_n[0]
204             pref_n[13] = ip4_n[1]
205             pref_n[14] = ip4_n[2]
206             pref_n[15] = ip4_n[3]
207         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
208
209     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
210         """
211         Create IPv6 packet stream for inside network
212
213         :param in_if: Inside interface
214         :param out_if: Outside interface
215         :param ttl: Hop Limit of generated packets
216         :param pref: NAT64 prefix
217         :param plen: NAT64 prefix length
218         """
219         pkts = []
220         if pref is None:
221             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
222         else:
223             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
224
225         # TCP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              TCP(sport=self.tcp_port_in, dport=20))
229         pkts.append(p)
230
231         # UDP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              UDP(sport=self.udp_port_in, dport=20))
235         pkts.append(p)
236
237         # ICMP
238         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
239              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
240              ICMPv6EchoRequest(id=self.icmp_id_in))
241         pkts.append(p)
242
243         return pkts
244
245     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
246         """
247         Create packet stream for outside network
248
249         :param out_if: Outside interface
250         :param dst_ip: Destination IP address (Default use global NAT address)
251         :param ttl: TTL of generated packets
252         """
253         if dst_ip is None:
254             dst_ip = self.nat_addr
255         pkts = []
256         # TCP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              TCP(dport=self.tcp_port_out, sport=20))
260         pkts.append(p)
261
262         # UDP
263         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265              UDP(dport=self.udp_port_out, sport=20))
266         pkts.append(p)
267
268         # ICMP
269         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
270              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
271              ICMP(id=self.icmp_id_out, type='echo-reply'))
272         pkts.append(p)
273
274         return pkts
275
276     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
277                            packet_num=3, dst_ip=None):
278         """
279         Verify captured packets on outside network
280
281         :param capture: Captured packets
282         :param nat_ip: Translated IP address (Default use global NAT address)
283         :param same_port: Sorce port number is not translated (Default False)
284         :param packet_num: Expected number of packets (Default 3)
285         :param dst_ip: Destination IP address (Default do not verify)
286         """
287         if nat_ip is None:
288             nat_ip = self.nat_addr
289         self.assertEqual(packet_num, len(capture))
290         for packet in capture:
291             try:
292                 self.check_ip_checksum(packet)
293                 self.assertEqual(packet[IP].src, nat_ip)
294                 if dst_ip is not None:
295                     self.assertEqual(packet[IP].dst, dst_ip)
296                 if packet.haslayer(TCP):
297                     if same_port:
298                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
299                     else:
300                         self.assertNotEqual(
301                             packet[TCP].sport, self.tcp_port_in)
302                     self.tcp_port_out = packet[TCP].sport
303                     self.check_tcp_checksum(packet)
304                 elif packet.haslayer(UDP):
305                     if same_port:
306                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
307                     else:
308                         self.assertNotEqual(
309                             packet[UDP].sport, self.udp_port_in)
310                     self.udp_port_out = packet[UDP].sport
311                 else:
312                     if same_port:
313                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
314                     else:
315                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
316                     self.icmp_id_out = packet[ICMP].id
317                     self.check_icmp_checksum(packet)
318             except:
319                 self.logger.error(ppp("Unexpected or invalid packet "
320                                       "(outside network):", packet))
321                 raise
322
323     def verify_capture_in(self, capture, in_if, packet_num=3):
324         """
325         Verify captured packets on inside network
326
327         :param capture: Captured packets
328         :param in_if: Inside interface
329         :param packet_num: Expected number of packets (Default 3)
330         """
331         self.assertEqual(packet_num, len(capture))
332         for packet in capture:
333             try:
334                 self.check_ip_checksum(packet)
335                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
336                 if packet.haslayer(TCP):
337                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
338                     self.check_tcp_checksum(packet)
339                 elif packet.haslayer(UDP):
340                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
341                 else:
342                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
343                     self.check_icmp_checksum(packet)
344             except:
345                 self.logger.error(ppp("Unexpected or invalid packet "
346                                       "(inside network):", packet))
347                 raise
348
349     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
350         """
351         Verify captured IPv6 packets on inside network
352
353         :param capture: Captured packets
354         :param src_ip: Source IP
355         :param dst_ip: Destination IP address
356         :param packet_num: Expected number of packets (Default 3)
357         """
358         self.assertEqual(packet_num, len(capture))
359         for packet in capture:
360             try:
361                 self.assertEqual(packet[IPv6].src, src_ip)
362                 self.assertEqual(packet[IPv6].dst, dst_ip)
363                 if packet.haslayer(TCP):
364                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
365                     self.check_tcp_checksum(packet)
366                 elif packet.haslayer(UDP):
367                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
368                     self.check_udp_checksum(packet)
369                 else:
370                     self.assertEqual(packet[ICMPv6EchoReply].id,
371                                      self.icmp_id_in)
372                     self.check_icmpv6_checksum(packet)
373             except:
374                 self.logger.error(ppp("Unexpected or invalid packet "
375                                       "(inside network):", packet))
376                 raise
377
378     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
379         """
380         Verify captured packet that don't have to be translated
381
382         :param capture: Captured packets
383         :param ingress_if: Ingress interface
384         :param egress_if: Egress interface
385         """
386         for packet in capture:
387             try:
388                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
389                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
390                 if packet.haslayer(TCP):
391                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
392                 elif packet.haslayer(UDP):
393                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
394                 else:
395                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
396             except:
397                 self.logger.error(ppp("Unexpected or invalid packet "
398                                       "(inside network):", packet))
399                 raise
400
401     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
402                                             packet_num=3, icmp_type=11):
403         """
404         Verify captured packets with ICMP errors on outside network
405
406         :param capture: Captured packets
407         :param src_ip: Translated IP address or IP address of VPP
408                        (Default use global NAT address)
409         :param packet_num: Expected number of packets (Default 3)
410         :param icmp_type: Type of error ICMP packet
411                           we are expecting (Default 11)
412         """
413         if src_ip is None:
414             src_ip = self.nat_addr
415         self.assertEqual(packet_num, len(capture))
416         for packet in capture:
417             try:
418                 self.assertEqual(packet[IP].src, src_ip)
419                 self.assertTrue(packet.haslayer(ICMP))
420                 icmp = packet[ICMP]
421                 self.assertEqual(icmp.type, icmp_type)
422                 self.assertTrue(icmp.haslayer(IPerror))
423                 inner_ip = icmp[IPerror]
424                 if inner_ip.haslayer(TCPerror):
425                     self.assertEqual(inner_ip[TCPerror].dport,
426                                      self.tcp_port_out)
427                 elif inner_ip.haslayer(UDPerror):
428                     self.assertEqual(inner_ip[UDPerror].dport,
429                                      self.udp_port_out)
430                 else:
431                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
432             except:
433                 self.logger.error(ppp("Unexpected or invalid packet "
434                                       "(outside network):", packet))
435                 raise
436
437     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
438                                            icmp_type=11):
439         """
440         Verify captured packets with ICMP errors on inside network
441
442         :param capture: Captured packets
443         :param in_if: Inside interface
444         :param packet_num: Expected number of packets (Default 3)
445         :param icmp_type: Type of error ICMP packet
446                           we are expecting (Default 11)
447         """
448         self.assertEqual(packet_num, len(capture))
449         for packet in capture:
450             try:
451                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
452                 self.assertTrue(packet.haslayer(ICMP))
453                 icmp = packet[ICMP]
454                 self.assertEqual(icmp.type, icmp_type)
455                 self.assertTrue(icmp.haslayer(IPerror))
456                 inner_ip = icmp[IPerror]
457                 if inner_ip.haslayer(TCPerror):
458                     self.assertEqual(inner_ip[TCPerror].sport,
459                                      self.tcp_port_in)
460                 elif inner_ip.haslayer(UDPerror):
461                     self.assertEqual(inner_ip[UDPerror].sport,
462                                      self.udp_port_in)
463                 else:
464                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
465             except:
466                 self.logger.error(ppp("Unexpected or invalid packet "
467                                       "(inside network):", packet))
468                 raise
469
470     def create_stream_frag(self, src_if, dst, sport, dport, data):
471         """
472         Create fragmented packet stream
473
474         :param src_if: Source interface
475         :param dst: Destination IPv4 address
476         :param sport: Source TCP port
477         :param dport: Destination TCP port
478         :param data: Payload data
479         :returns: Fragmets
480         """
481         id = random.randint(0, 65535)
482         p = (IP(src=src_if.remote_ip4, dst=dst) /
483              TCP(sport=sport, dport=dport) /
484              Raw(data))
485         p = p.__class__(str(p))
486         chksum = p['TCP'].chksum
487         pkts = []
488         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
490              TCP(sport=sport, dport=dport, chksum=chksum) /
491              Raw(data[0:4]))
492         pkts.append(p)
493         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
495                 proto=IP_PROTOS.tcp) /
496              Raw(data[4:20]))
497         pkts.append(p)
498         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
499              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
500                 id=id) /
501              Raw(data[20:]))
502         pkts.append(p)
503         return pkts
504
505     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
506                                pref=None, plen=0, frag_size=128):
507         """
508         Create fragmented packet stream
509
510         :param src_if: Source interface
511         :param dst: Destination IPv4 address
512         :param sport: Source TCP port
513         :param dport: Destination TCP port
514         :param data: Payload data
515         :param pref: NAT64 prefix
516         :param plen: NAT64 prefix length
517         :param fragsize: size of fragments
518         :returns: Fragmets
519         """
520         if pref is None:
521             dst_ip6 = ''.join(['64:ff9b::', dst])
522         else:
523             dst_ip6 = self.compose_ip6(dst, pref, plen)
524
525         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
526              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
527              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
528              TCP(sport=sport, dport=dport) /
529              Raw(data))
530
531         return fragment6(p, frag_size)
532
533     def reass_frags_and_verify(self, frags, src, dst):
534         """
535         Reassemble and verify fragmented packet
536
537         :param frags: Captured fragments
538         :param src: Source IPv4 address to verify
539         :param dst: Destination IPv4 address to verify
540
541         :returns: Reassembled IPv4 packet
542         """
543         buffer = StringIO.StringIO()
544         for p in frags:
545             self.assertEqual(p[IP].src, src)
546             self.assertEqual(p[IP].dst, dst)
547             self.check_ip_checksum(p)
548             buffer.seek(p[IP].frag * 8)
549             buffer.write(p[IP].payload)
550         ip = frags[0].getlayer(IP)
551         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
552                 proto=frags[0][IP].proto)
553         if ip.proto == IP_PROTOS.tcp:
554             p = (ip / TCP(buffer.getvalue()))
555             self.check_tcp_checksum(p)
556         elif ip.proto == IP_PROTOS.udp:
557             p = (ip / UDP(buffer.getvalue()))
558         return p
559
560     def reass_frags_and_verify_ip6(self, frags, src, dst):
561         """
562         Reassemble and verify fragmented packet
563
564         :param frags: Captured fragments
565         :param src: Source IPv6 address to verify
566         :param dst: Destination IPv6 address to verify
567
568         :returns: Reassembled IPv6 packet
569         """
570         buffer = StringIO.StringIO()
571         for p in frags:
572             self.assertEqual(p[IPv6].src, src)
573             self.assertEqual(p[IPv6].dst, dst)
574             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
575             buffer.write(p[IPv6ExtHdrFragment].payload)
576         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
577                   nh=frags[0][IPv6ExtHdrFragment].nh)
578         if ip.nh == IP_PROTOS.tcp:
579             p = (ip / TCP(buffer.getvalue()))
580             self.check_tcp_checksum(p)
581         elif ip.nh == IP_PROTOS.udp:
582             p = (ip / UDP(buffer.getvalue()))
583         return p
584
585     def verify_ipfix_nat44_ses(self, data):
586         """
587         Verify IPFIX NAT44 session create/delete event
588
589         :param data: Decoded IPFIX data records
590         """
591         nat44_ses_create_num = 0
592         nat44_ses_delete_num = 0
593         self.assertEqual(6, len(data))
594         for record in data:
595             # natEvent
596             self.assertIn(ord(record[230]), [4, 5])
597             if ord(record[230]) == 4:
598                 nat44_ses_create_num += 1
599             else:
600                 nat44_ses_delete_num += 1
601             # sourceIPv4Address
602             self.assertEqual(self.pg0.remote_ip4n, record[8])
603             # postNATSourceIPv4Address
604             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
605                              record[225])
606             # ingressVRFID
607             self.assertEqual(struct.pack("!I", 0), record[234])
608             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
609             if IP_PROTOS.icmp == ord(record[4]):
610                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
611                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
612                                  record[227])
613             elif IP_PROTOS.tcp == ord(record[4]):
614                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
615                                  record[7])
616                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
617                                  record[227])
618             elif IP_PROTOS.udp == ord(record[4]):
619                 self.assertEqual(struct.pack("!H", self.udp_port_in),
620                                  record[7])
621                 self.assertEqual(struct.pack("!H", self.udp_port_out),
622                                  record[227])
623             else:
624                 self.fail("Invalid protocol")
625         self.assertEqual(3, nat44_ses_create_num)
626         self.assertEqual(3, nat44_ses_delete_num)
627
628     def verify_ipfix_addr_exhausted(self, data):
629         """
630         Verify IPFIX NAT addresses event
631
632         :param data: Decoded IPFIX data records
633         """
634         self.assertEqual(1, len(data))
635         record = data[0]
636         # natEvent
637         self.assertEqual(ord(record[230]), 3)
638         # natPoolID
639         self.assertEqual(struct.pack("!I", 0), record[283])
640
641
642 class TestNAT44(MethodHolder):
643     """ NAT44 Test Cases """
644
645     @classmethod
646     def setUpClass(cls):
647         super(TestNAT44, cls).setUpClass()
648
649         try:
650             cls.tcp_port_in = 6303
651             cls.tcp_port_out = 6303
652             cls.udp_port_in = 6304
653             cls.udp_port_out = 6304
654             cls.icmp_id_in = 6305
655             cls.icmp_id_out = 6305
656             cls.nat_addr = '10.0.0.3'
657             cls.ipfix_src_port = 4739
658             cls.ipfix_domain_id = 1
659
660             cls.create_pg_interfaces(range(10))
661             cls.interfaces = list(cls.pg_interfaces[0:4])
662
663             for i in cls.interfaces:
664                 i.admin_up()
665                 i.config_ip4()
666                 i.resolve_arp()
667
668             cls.pg0.generate_remote_hosts(3)
669             cls.pg0.configure_ipv4_neighbors()
670
671             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
672             cls.vapi.ip_table_add_del(10, is_add=1)
673             cls.vapi.ip_table_add_del(20, is_add=1)
674
675             cls.pg4._local_ip4 = "172.16.255.1"
676             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
677             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
678             cls.pg4.set_table_ip4(10)
679             cls.pg5._local_ip4 = "172.17.255.3"
680             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
681             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
682             cls.pg5.set_table_ip4(10)
683             cls.pg6._local_ip4 = "172.16.255.1"
684             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
685             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
686             cls.pg6.set_table_ip4(20)
687             for i in cls.overlapping_interfaces:
688                 i.config_ip4()
689                 i.admin_up()
690                 i.resolve_arp()
691
692             cls.pg7.admin_up()
693             cls.pg8.admin_up()
694
695             cls.pg9.generate_remote_hosts(2)
696             cls.pg9.config_ip4()
697             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
698             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
699                                                   ip_addr_n,
700                                                   24)
701             cls.pg9.admin_up()
702             cls.pg9.resolve_arp()
703             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
704             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
705             cls.pg9.resolve_arp()
706
707         except Exception:
708             super(TestNAT44, cls).tearDownClass()
709             raise
710
711     def clear_nat44(self):
712         """
713         Clear NAT44 configuration.
714         """
715         # I found no elegant way to do this
716         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
717                                    dst_address_length=32,
718                                    next_hop_address=self.pg7.remote_ip4n,
719                                    next_hop_sw_if_index=self.pg7.sw_if_index,
720                                    is_add=0)
721         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
722                                    dst_address_length=32,
723                                    next_hop_address=self.pg8.remote_ip4n,
724                                    next_hop_sw_if_index=self.pg8.sw_if_index,
725                                    is_add=0)
726
727         for intf in [self.pg7, self.pg8]:
728             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
729             for n in neighbors:
730                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
731                                               n.mac_address,
732                                               n.ip_address,
733                                               is_add=0)
734
735         if self.pg7.has_ip4_config:
736             self.pg7.unconfig_ip4()
737
738         interfaces = self.vapi.nat44_interface_addr_dump()
739         for intf in interfaces:
740             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
741
742         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
743                             domain_id=self.ipfix_domain_id)
744         self.ipfix_src_port = 4739
745         self.ipfix_domain_id = 1
746
747         interfaces = self.vapi.nat44_interface_dump()
748         for intf in interfaces:
749             if intf.is_inside > 1:
750                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
751                                                           0,
752                                                           is_add=0)
753             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
754                                                       intf.is_inside,
755                                                       is_add=0)
756
757         interfaces = self.vapi.nat44_interface_output_feature_dump()
758         for intf in interfaces:
759             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
760                                                              intf.is_inside,
761                                                              is_add=0)
762
763         static_mappings = self.vapi.nat44_static_mapping_dump()
764         for sm in static_mappings:
765             self.vapi.nat44_add_del_static_mapping(
766                 sm.local_ip_address,
767                 sm.external_ip_address,
768                 local_port=sm.local_port,
769                 external_port=sm.external_port,
770                 addr_only=sm.addr_only,
771                 vrf_id=sm.vrf_id,
772                 protocol=sm.protocol,
773                 is_add=0)
774
775         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
776         for lb_sm in lb_static_mappings:
777             self.vapi.nat44_add_del_lb_static_mapping(
778                 lb_sm.external_addr,
779                 lb_sm.external_port,
780                 lb_sm.protocol,
781                 lb_sm.vrf_id,
782                 is_add=0,
783                 local_num=0,
784                 locals=[])
785
786         identity_mappings = self.vapi.nat44_identity_mapping_dump()
787         for id_m in identity_mappings:
788             self.vapi.nat44_add_del_identity_mapping(
789                 addr_only=id_m.addr_only,
790                 ip=id_m.ip_address,
791                 port=id_m.port,
792                 sw_if_index=id_m.sw_if_index,
793                 vrf_id=id_m.vrf_id,
794                 protocol=id_m.protocol,
795                 is_add=0)
796
797         adresses = self.vapi.nat44_address_dump()
798         for addr in adresses:
799             self.vapi.nat44_add_del_address_range(addr.ip_address,
800                                                   addr.ip_address,
801                                                   is_add=0)
802
803         self.vapi.nat_set_reass()
804         self.vapi.nat_set_reass(is_ip6=1)
805
806     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
807                                  local_port=0, external_port=0, vrf_id=0,
808                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
809                                  proto=0):
810         """
811         Add/delete NAT44 static mapping
812
813         :param local_ip: Local IP address
814         :param external_ip: External IP address
815         :param local_port: Local port number (Optional)
816         :param external_port: External port number (Optional)
817         :param vrf_id: VRF ID (Default 0)
818         :param is_add: 1 if add, 0 if delete (Default add)
819         :param external_sw_if_index: External interface instead of IP address
820         :param proto: IP protocol (Mandatory if port specified)
821         """
822         addr_only = 1
823         if local_port and external_port:
824             addr_only = 0
825         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
826         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
827         self.vapi.nat44_add_del_static_mapping(
828             l_ip,
829             e_ip,
830             external_sw_if_index,
831             local_port,
832             external_port,
833             addr_only,
834             vrf_id,
835             proto,
836             is_add)
837
838     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
839         """
840         Add/delete NAT44 address
841
842         :param ip: IP address
843         :param is_add: 1 if add, 0 if delete (Default add)
844         """
845         nat_addr = socket.inet_pton(socket.AF_INET, ip)
846         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
847                                               vrf_id=vrf_id)
848
849     def test_dynamic(self):
850         """ NAT44 dynamic translation test """
851
852         self.nat44_add_address(self.nat_addr)
853         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
854         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
855                                                   is_inside=0)
856
857         # in2out
858         pkts = self.create_stream_in(self.pg0, self.pg1)
859         self.pg0.add_stream(pkts)
860         self.pg_enable_capture(self.pg_interfaces)
861         self.pg_start()
862         capture = self.pg1.get_capture(len(pkts))
863         self.verify_capture_out(capture)
864
865         # out2in
866         pkts = self.create_stream_out(self.pg1)
867         self.pg1.add_stream(pkts)
868         self.pg_enable_capture(self.pg_interfaces)
869         self.pg_start()
870         capture = self.pg0.get_capture(len(pkts))
871         self.verify_capture_in(capture, self.pg0)
872
873     def test_dynamic_icmp_errors_in2out_ttl_1(self):
874         """ NAT44 handling of client packets with TTL=1 """
875
876         self.nat44_add_address(self.nat_addr)
877         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
878         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
879                                                   is_inside=0)
880
881         # Client side - generate traffic
882         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
883         self.pg0.add_stream(pkts)
884         self.pg_enable_capture(self.pg_interfaces)
885         self.pg_start()
886
887         # Client side - verify ICMP type 11 packets
888         capture = self.pg0.get_capture(len(pkts))
889         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
890
891     def test_dynamic_icmp_errors_out2in_ttl_1(self):
892         """ NAT44 handling of server packets with TTL=1 """
893
894         self.nat44_add_address(self.nat_addr)
895         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
896         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
897                                                   is_inside=0)
898
899         # Client side - create sessions
900         pkts = self.create_stream_in(self.pg0, self.pg1)
901         self.pg0.add_stream(pkts)
902         self.pg_enable_capture(self.pg_interfaces)
903         self.pg_start()
904
905         # Server side - generate traffic
906         capture = self.pg1.get_capture(len(pkts))
907         self.verify_capture_out(capture)
908         pkts = self.create_stream_out(self.pg1, ttl=1)
909         self.pg1.add_stream(pkts)
910         self.pg_enable_capture(self.pg_interfaces)
911         self.pg_start()
912
913         # Server side - verify ICMP type 11 packets
914         capture = self.pg1.get_capture(len(pkts))
915         self.verify_capture_out_with_icmp_errors(capture,
916                                                  src_ip=self.pg1.local_ip4)
917
918     def test_dynamic_icmp_errors_in2out_ttl_2(self):
919         """ NAT44 handling of error responses to client packets with TTL=2 """
920
921         self.nat44_add_address(self.nat_addr)
922         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
923         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
924                                                   is_inside=0)
925
926         # Client side - generate traffic
927         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
928         self.pg0.add_stream(pkts)
929         self.pg_enable_capture(self.pg_interfaces)
930         self.pg_start()
931
932         # Server side - simulate ICMP type 11 response
933         capture = self.pg1.get_capture(len(pkts))
934         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
935                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
936                 ICMP(type=11) / packet[IP] for packet in capture]
937         self.pg1.add_stream(pkts)
938         self.pg_enable_capture(self.pg_interfaces)
939         self.pg_start()
940
941         # Client side - verify ICMP type 11 packets
942         capture = self.pg0.get_capture(len(pkts))
943         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
944
945     def test_dynamic_icmp_errors_out2in_ttl_2(self):
946         """ NAT44 handling of error responses to server packets with TTL=2 """
947
948         self.nat44_add_address(self.nat_addr)
949         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
950         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
951                                                   is_inside=0)
952
953         # Client side - create sessions
954         pkts = self.create_stream_in(self.pg0, self.pg1)
955         self.pg0.add_stream(pkts)
956         self.pg_enable_capture(self.pg_interfaces)
957         self.pg_start()
958
959         # Server side - generate traffic
960         capture = self.pg1.get_capture(len(pkts))
961         self.verify_capture_out(capture)
962         pkts = self.create_stream_out(self.pg1, ttl=2)
963         self.pg1.add_stream(pkts)
964         self.pg_enable_capture(self.pg_interfaces)
965         self.pg_start()
966
967         # Client side - simulate ICMP type 11 response
968         capture = self.pg0.get_capture(len(pkts))
969         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
970                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
971                 ICMP(type=11) / packet[IP] for packet in capture]
972         self.pg0.add_stream(pkts)
973         self.pg_enable_capture(self.pg_interfaces)
974         self.pg_start()
975
976         # Server side - verify ICMP type 11 packets
977         capture = self.pg1.get_capture(len(pkts))
978         self.verify_capture_out_with_icmp_errors(capture)
979
980     def test_ping_out_interface_from_outside(self):
981         """ Ping NAT44 out interface from outside network """
982
983         self.nat44_add_address(self.nat_addr)
984         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
985         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
986                                                   is_inside=0)
987
988         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
989              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
990              ICMP(id=self.icmp_id_out, type='echo-request'))
991         pkts = [p]
992         self.pg1.add_stream(pkts)
993         self.pg_enable_capture(self.pg_interfaces)
994         self.pg_start()
995         capture = self.pg1.get_capture(len(pkts))
996         self.assertEqual(1, len(capture))
997         packet = capture[0]
998         try:
999             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1000             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1001             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1002             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1003         except:
1004             self.logger.error(ppp("Unexpected or invalid packet "
1005                                   "(outside network):", packet))
1006             raise
1007
1008     def test_ping_internal_host_from_outside(self):
1009         """ Ping internal host from outside network """
1010
1011         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1012         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1013         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1014                                                   is_inside=0)
1015
1016         # out2in
1017         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1018                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1019                ICMP(id=self.icmp_id_out, type='echo-request'))
1020         self.pg1.add_stream(pkt)
1021         self.pg_enable_capture(self.pg_interfaces)
1022         self.pg_start()
1023         capture = self.pg0.get_capture(1)
1024         self.verify_capture_in(capture, self.pg0, packet_num=1)
1025         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1026
1027         # in2out
1028         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1029                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1030                ICMP(id=self.icmp_id_in, type='echo-reply'))
1031         self.pg0.add_stream(pkt)
1032         self.pg_enable_capture(self.pg_interfaces)
1033         self.pg_start()
1034         capture = self.pg1.get_capture(1)
1035         self.verify_capture_out(capture, same_port=True, packet_num=1)
1036         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1037
1038     def test_static_in(self):
1039         """ 1:1 NAT initialized from inside network """
1040
1041         nat_ip = "10.0.0.10"
1042         self.tcp_port_out = 6303
1043         self.udp_port_out = 6304
1044         self.icmp_id_out = 6305
1045
1046         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1047         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1048         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1049                                                   is_inside=0)
1050
1051         # in2out
1052         pkts = self.create_stream_in(self.pg0, self.pg1)
1053         self.pg0.add_stream(pkts)
1054         self.pg_enable_capture(self.pg_interfaces)
1055         self.pg_start()
1056         capture = self.pg1.get_capture(len(pkts))
1057         self.verify_capture_out(capture, nat_ip, True)
1058
1059         # out2in
1060         pkts = self.create_stream_out(self.pg1, nat_ip)
1061         self.pg1.add_stream(pkts)
1062         self.pg_enable_capture(self.pg_interfaces)
1063         self.pg_start()
1064         capture = self.pg0.get_capture(len(pkts))
1065         self.verify_capture_in(capture, self.pg0)
1066
1067     def test_static_out(self):
1068         """ 1:1 NAT initialized from outside network """
1069
1070         nat_ip = "10.0.0.20"
1071         self.tcp_port_out = 6303
1072         self.udp_port_out = 6304
1073         self.icmp_id_out = 6305
1074
1075         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1076         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1077         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1078                                                   is_inside=0)
1079
1080         # out2in
1081         pkts = self.create_stream_out(self.pg1, nat_ip)
1082         self.pg1.add_stream(pkts)
1083         self.pg_enable_capture(self.pg_interfaces)
1084         self.pg_start()
1085         capture = self.pg0.get_capture(len(pkts))
1086         self.verify_capture_in(capture, self.pg0)
1087
1088         # in2out
1089         pkts = self.create_stream_in(self.pg0, self.pg1)
1090         self.pg0.add_stream(pkts)
1091         self.pg_enable_capture(self.pg_interfaces)
1092         self.pg_start()
1093         capture = self.pg1.get_capture(len(pkts))
1094         self.verify_capture_out(capture, nat_ip, True)
1095
1096     def test_static_with_port_in(self):
1097         """ 1:1 NAPT initialized from inside network """
1098
1099         self.tcp_port_out = 3606
1100         self.udp_port_out = 3607
1101         self.icmp_id_out = 3608
1102
1103         self.nat44_add_address(self.nat_addr)
1104         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1105                                       self.tcp_port_in, self.tcp_port_out,
1106                                       proto=IP_PROTOS.tcp)
1107         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1108                                       self.udp_port_in, self.udp_port_out,
1109                                       proto=IP_PROTOS.udp)
1110         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1111                                       self.icmp_id_in, self.icmp_id_out,
1112                                       proto=IP_PROTOS.icmp)
1113         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1114         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1115                                                   is_inside=0)
1116
1117         # in2out
1118         pkts = self.create_stream_in(self.pg0, self.pg1)
1119         self.pg0.add_stream(pkts)
1120         self.pg_enable_capture(self.pg_interfaces)
1121         self.pg_start()
1122         capture = self.pg1.get_capture(len(pkts))
1123         self.verify_capture_out(capture)
1124
1125         # out2in
1126         pkts = self.create_stream_out(self.pg1)
1127         self.pg1.add_stream(pkts)
1128         self.pg_enable_capture(self.pg_interfaces)
1129         self.pg_start()
1130         capture = self.pg0.get_capture(len(pkts))
1131         self.verify_capture_in(capture, self.pg0)
1132
1133     def test_static_with_port_out(self):
1134         """ 1:1 NAPT initialized from outside network """
1135
1136         self.tcp_port_out = 30606
1137         self.udp_port_out = 30607
1138         self.icmp_id_out = 30608
1139
1140         self.nat44_add_address(self.nat_addr)
1141         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1142                                       self.tcp_port_in, self.tcp_port_out,
1143                                       proto=IP_PROTOS.tcp)
1144         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1145                                       self.udp_port_in, self.udp_port_out,
1146                                       proto=IP_PROTOS.udp)
1147         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1148                                       self.icmp_id_in, self.icmp_id_out,
1149                                       proto=IP_PROTOS.icmp)
1150         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1152                                                   is_inside=0)
1153
1154         # out2in
1155         pkts = self.create_stream_out(self.pg1)
1156         self.pg1.add_stream(pkts)
1157         self.pg_enable_capture(self.pg_interfaces)
1158         self.pg_start()
1159         capture = self.pg0.get_capture(len(pkts))
1160         self.verify_capture_in(capture, self.pg0)
1161
1162         # in2out
1163         pkts = self.create_stream_in(self.pg0, self.pg1)
1164         self.pg0.add_stream(pkts)
1165         self.pg_enable_capture(self.pg_interfaces)
1166         self.pg_start()
1167         capture = self.pg1.get_capture(len(pkts))
1168         self.verify_capture_out(capture)
1169
1170     def test_static_vrf_aware(self):
1171         """ 1:1 NAT VRF awareness """
1172
1173         nat_ip1 = "10.0.0.30"
1174         nat_ip2 = "10.0.0.40"
1175         self.tcp_port_out = 6303
1176         self.udp_port_out = 6304
1177         self.icmp_id_out = 6305
1178
1179         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1180                                       vrf_id=10)
1181         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1182                                       vrf_id=10)
1183         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1184                                                   is_inside=0)
1185         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1186         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1187
1188         # inside interface VRF match NAT44 static mapping VRF
1189         pkts = self.create_stream_in(self.pg4, self.pg3)
1190         self.pg4.add_stream(pkts)
1191         self.pg_enable_capture(self.pg_interfaces)
1192         self.pg_start()
1193         capture = self.pg3.get_capture(len(pkts))
1194         self.verify_capture_out(capture, nat_ip1, True)
1195
1196         # inside interface VRF don't match NAT44 static mapping VRF (packets
1197         # are dropped)
1198         pkts = self.create_stream_in(self.pg0, self.pg3)
1199         self.pg0.add_stream(pkts)
1200         self.pg_enable_capture(self.pg_interfaces)
1201         self.pg_start()
1202         self.pg3.assert_nothing_captured()
1203
1204     def test_identity_nat(self):
1205         """ Identity NAT """
1206
1207         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1208         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1209         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1210                                                   is_inside=0)
1211
1212         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1213              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1214              TCP(sport=12345, dport=56789))
1215         self.pg1.add_stream(p)
1216         self.pg_enable_capture(self.pg_interfaces)
1217         self.pg_start()
1218         capture = self.pg0.get_capture(1)
1219         p = capture[0]
1220         try:
1221             ip = p[IP]
1222             tcp = p[TCP]
1223             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1224             self.assertEqual(ip.src, self.pg1.remote_ip4)
1225             self.assertEqual(tcp.dport, 56789)
1226             self.assertEqual(tcp.sport, 12345)
1227             self.check_tcp_checksum(p)
1228             self.check_ip_checksum(p)
1229         except:
1230             self.logger.error(ppp("Unexpected or invalid packet:", p))
1231             raise
1232
1233     def test_static_lb(self):
1234         """ NAT44 local service load balancing """
1235         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1236         external_port = 80
1237         local_port = 8080
1238         server1 = self.pg0.remote_hosts[0]
1239         server2 = self.pg0.remote_hosts[1]
1240
1241         locals = [{'addr': server1.ip4n,
1242                    'port': local_port,
1243                    'probability': 70},
1244                   {'addr': server2.ip4n,
1245                    'port': local_port,
1246                    'probability': 30}]
1247
1248         self.nat44_add_address(self.nat_addr)
1249         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1250                                                   external_port,
1251                                                   IP_PROTOS.tcp,
1252                                                   local_num=len(locals),
1253                                                   locals=locals)
1254         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1255         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1256                                                   is_inside=0)
1257
1258         # from client to service
1259         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1260              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1261              TCP(sport=12345, dport=external_port))
1262         self.pg1.add_stream(p)
1263         self.pg_enable_capture(self.pg_interfaces)
1264         self.pg_start()
1265         capture = self.pg0.get_capture(1)
1266         p = capture[0]
1267         server = None
1268         try:
1269             ip = p[IP]
1270             tcp = p[TCP]
1271             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1272             if ip.dst == server1.ip4:
1273                 server = server1
1274             else:
1275                 server = server2
1276             self.assertEqual(tcp.dport, local_port)
1277             self.check_tcp_checksum(p)
1278             self.check_ip_checksum(p)
1279         except:
1280             self.logger.error(ppp("Unexpected or invalid packet:", p))
1281             raise
1282
1283         # from service back to client
1284         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1285              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1286              TCP(sport=local_port, dport=12345))
1287         self.pg0.add_stream(p)
1288         self.pg_enable_capture(self.pg_interfaces)
1289         self.pg_start()
1290         capture = self.pg1.get_capture(1)
1291         p = capture[0]
1292         try:
1293             ip = p[IP]
1294             tcp = p[TCP]
1295             self.assertEqual(ip.src, self.nat_addr)
1296             self.assertEqual(tcp.sport, external_port)
1297             self.check_tcp_checksum(p)
1298             self.check_ip_checksum(p)
1299         except:
1300             self.logger.error(ppp("Unexpected or invalid packet:", p))
1301             raise
1302
1303         # multiple clients
1304         server1_n = 0
1305         server2_n = 0
1306         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1307         pkts = []
1308         for client in clients:
1309             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1310                  IP(src=client, dst=self.nat_addr) /
1311                  TCP(sport=12345, dport=external_port))
1312             pkts.append(p)
1313         self.pg1.add_stream(pkts)
1314         self.pg_enable_capture(self.pg_interfaces)
1315         self.pg_start()
1316         capture = self.pg0.get_capture(len(pkts))
1317         for p in capture:
1318             if p[IP].dst == server1.ip4:
1319                 server1_n += 1
1320             else:
1321                 server2_n += 1
1322         self.assertTrue(server1_n > server2_n)
1323
1324     def test_multiple_inside_interfaces(self):
1325         """ NAT44 multiple non-overlapping address space inside interfaces """
1326
1327         self.nat44_add_address(self.nat_addr)
1328         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1329         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1330         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1331                                                   is_inside=0)
1332
1333         # between two NAT44 inside interfaces (no translation)
1334         pkts = self.create_stream_in(self.pg0, self.pg1)
1335         self.pg0.add_stream(pkts)
1336         self.pg_enable_capture(self.pg_interfaces)
1337         self.pg_start()
1338         capture = self.pg1.get_capture(len(pkts))
1339         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1340
1341         # from NAT44 inside to interface without NAT44 feature (no translation)
1342         pkts = self.create_stream_in(self.pg0, self.pg2)
1343         self.pg0.add_stream(pkts)
1344         self.pg_enable_capture(self.pg_interfaces)
1345         self.pg_start()
1346         capture = self.pg2.get_capture(len(pkts))
1347         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1348
1349         # in2out 1st interface
1350         pkts = self.create_stream_in(self.pg0, self.pg3)
1351         self.pg0.add_stream(pkts)
1352         self.pg_enable_capture(self.pg_interfaces)
1353         self.pg_start()
1354         capture = self.pg3.get_capture(len(pkts))
1355         self.verify_capture_out(capture)
1356
1357         # out2in 1st interface
1358         pkts = self.create_stream_out(self.pg3)
1359         self.pg3.add_stream(pkts)
1360         self.pg_enable_capture(self.pg_interfaces)
1361         self.pg_start()
1362         capture = self.pg0.get_capture(len(pkts))
1363         self.verify_capture_in(capture, self.pg0)
1364
1365         # in2out 2nd interface
1366         pkts = self.create_stream_in(self.pg1, self.pg3)
1367         self.pg1.add_stream(pkts)
1368         self.pg_enable_capture(self.pg_interfaces)
1369         self.pg_start()
1370         capture = self.pg3.get_capture(len(pkts))
1371         self.verify_capture_out(capture)
1372
1373         # out2in 2nd interface
1374         pkts = self.create_stream_out(self.pg3)
1375         self.pg3.add_stream(pkts)
1376         self.pg_enable_capture(self.pg_interfaces)
1377         self.pg_start()
1378         capture = self.pg1.get_capture(len(pkts))
1379         self.verify_capture_in(capture, self.pg1)
1380
1381     def test_inside_overlapping_interfaces(self):
1382         """ NAT44 multiple inside interfaces with overlapping address space """
1383
1384         static_nat_ip = "10.0.0.10"
1385         self.nat44_add_address(self.nat_addr)
1386         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1387                                                   is_inside=0)
1388         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1389         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1390         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1391         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1392                                       vrf_id=20)
1393
1394         # between NAT44 inside interfaces with same VRF (no translation)
1395         pkts = self.create_stream_in(self.pg4, self.pg5)
1396         self.pg4.add_stream(pkts)
1397         self.pg_enable_capture(self.pg_interfaces)
1398         self.pg_start()
1399         capture = self.pg5.get_capture(len(pkts))
1400         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1401
1402         # between NAT44 inside interfaces with different VRF (hairpinning)
1403         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1404              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1405              TCP(sport=1234, dport=5678))
1406         self.pg4.add_stream(p)
1407         self.pg_enable_capture(self.pg_interfaces)
1408         self.pg_start()
1409         capture = self.pg6.get_capture(1)
1410         p = capture[0]
1411         try:
1412             ip = p[IP]
1413             tcp = p[TCP]
1414             self.assertEqual(ip.src, self.nat_addr)
1415             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1416             self.assertNotEqual(tcp.sport, 1234)
1417             self.assertEqual(tcp.dport, 5678)
1418         except:
1419             self.logger.error(ppp("Unexpected or invalid packet:", p))
1420             raise
1421
1422         # in2out 1st interface
1423         pkts = self.create_stream_in(self.pg4, self.pg3)
1424         self.pg4.add_stream(pkts)
1425         self.pg_enable_capture(self.pg_interfaces)
1426         self.pg_start()
1427         capture = self.pg3.get_capture(len(pkts))
1428         self.verify_capture_out(capture)
1429
1430         # out2in 1st interface
1431         pkts = self.create_stream_out(self.pg3)
1432         self.pg3.add_stream(pkts)
1433         self.pg_enable_capture(self.pg_interfaces)
1434         self.pg_start()
1435         capture = self.pg4.get_capture(len(pkts))
1436         self.verify_capture_in(capture, self.pg4)
1437
1438         # in2out 2nd interface
1439         pkts = self.create_stream_in(self.pg5, self.pg3)
1440         self.pg5.add_stream(pkts)
1441         self.pg_enable_capture(self.pg_interfaces)
1442         self.pg_start()
1443         capture = self.pg3.get_capture(len(pkts))
1444         self.verify_capture_out(capture)
1445
1446         # out2in 2nd interface
1447         pkts = self.create_stream_out(self.pg3)
1448         self.pg3.add_stream(pkts)
1449         self.pg_enable_capture(self.pg_interfaces)
1450         self.pg_start()
1451         capture = self.pg5.get_capture(len(pkts))
1452         self.verify_capture_in(capture, self.pg5)
1453
1454         # pg5 session dump
1455         addresses = self.vapi.nat44_address_dump()
1456         self.assertEqual(len(addresses), 1)
1457         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1458         self.assertEqual(len(sessions), 3)
1459         for session in sessions:
1460             self.assertFalse(session.is_static)
1461             self.assertEqual(session.inside_ip_address[0:4],
1462                              self.pg5.remote_ip4n)
1463             self.assertEqual(session.outside_ip_address,
1464                              addresses[0].ip_address)
1465         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1466         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1467         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1468         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1469         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1470         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1471         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1472         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1473         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1474
1475         # in2out 3rd interface
1476         pkts = self.create_stream_in(self.pg6, self.pg3)
1477         self.pg6.add_stream(pkts)
1478         self.pg_enable_capture(self.pg_interfaces)
1479         self.pg_start()
1480         capture = self.pg3.get_capture(len(pkts))
1481         self.verify_capture_out(capture, static_nat_ip, True)
1482
1483         # out2in 3rd interface
1484         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1485         self.pg3.add_stream(pkts)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488         capture = self.pg6.get_capture(len(pkts))
1489         self.verify_capture_in(capture, self.pg6)
1490
1491         # general user and session dump verifications
1492         users = self.vapi.nat44_user_dump()
1493         self.assertTrue(len(users) >= 3)
1494         addresses = self.vapi.nat44_address_dump()
1495         self.assertEqual(len(addresses), 1)
1496         for user in users:
1497             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1498                                                          user.vrf_id)
1499             for session in sessions:
1500                 self.assertEqual(user.ip_address, session.inside_ip_address)
1501                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1502                 self.assertTrue(session.protocol in
1503                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1504                                  IP_PROTOS.icmp])
1505
1506         # pg4 session dump
1507         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1508         self.assertTrue(len(sessions) >= 4)
1509         for session in sessions:
1510             self.assertFalse(session.is_static)
1511             self.assertEqual(session.inside_ip_address[0:4],
1512                              self.pg4.remote_ip4n)
1513             self.assertEqual(session.outside_ip_address,
1514                              addresses[0].ip_address)
1515
1516         # pg6 session dump
1517         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1518         self.assertTrue(len(sessions) >= 3)
1519         for session in sessions:
1520             self.assertTrue(session.is_static)
1521             self.assertEqual(session.inside_ip_address[0:4],
1522                              self.pg6.remote_ip4n)
1523             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1524                              map(int, static_nat_ip.split('.')))
1525             self.assertTrue(session.inside_port in
1526                             [self.tcp_port_in, self.udp_port_in,
1527                              self.icmp_id_in])
1528
1529     def test_hairpinning(self):
1530         """ NAT44 hairpinning - 1:1 NAPT """
1531
1532         host = self.pg0.remote_hosts[0]
1533         server = self.pg0.remote_hosts[1]
1534         host_in_port = 1234
1535         host_out_port = 0
1536         server_in_port = 5678
1537         server_out_port = 8765
1538
1539         self.nat44_add_address(self.nat_addr)
1540         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1541         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1542                                                   is_inside=0)
1543         # add static mapping for server
1544         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1545                                       server_in_port, server_out_port,
1546                                       proto=IP_PROTOS.tcp)
1547
1548         # send packet from host to server
1549         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1550              IP(src=host.ip4, dst=self.nat_addr) /
1551              TCP(sport=host_in_port, dport=server_out_port))
1552         self.pg0.add_stream(p)
1553         self.pg_enable_capture(self.pg_interfaces)
1554         self.pg_start()
1555         capture = self.pg0.get_capture(1)
1556         p = capture[0]
1557         try:
1558             ip = p[IP]
1559             tcp = p[TCP]
1560             self.assertEqual(ip.src, self.nat_addr)
1561             self.assertEqual(ip.dst, server.ip4)
1562             self.assertNotEqual(tcp.sport, host_in_port)
1563             self.assertEqual(tcp.dport, server_in_port)
1564             self.check_tcp_checksum(p)
1565             host_out_port = tcp.sport
1566         except:
1567             self.logger.error(ppp("Unexpected or invalid packet:", p))
1568             raise
1569
1570         # send reply from server to host
1571         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1572              IP(src=server.ip4, dst=self.nat_addr) /
1573              TCP(sport=server_in_port, dport=host_out_port))
1574         self.pg0.add_stream(p)
1575         self.pg_enable_capture(self.pg_interfaces)
1576         self.pg_start()
1577         capture = self.pg0.get_capture(1)
1578         p = capture[0]
1579         try:
1580             ip = p[IP]
1581             tcp = p[TCP]
1582             self.assertEqual(ip.src, self.nat_addr)
1583             self.assertEqual(ip.dst, host.ip4)
1584             self.assertEqual(tcp.sport, server_out_port)
1585             self.assertEqual(tcp.dport, host_in_port)
1586             self.check_tcp_checksum(p)
1587         except:
1588             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1589             raise
1590
1591     def test_hairpinning2(self):
1592         """ NAT44 hairpinning - 1:1 NAT"""
1593
1594         server1_nat_ip = "10.0.0.10"
1595         server2_nat_ip = "10.0.0.11"
1596         host = self.pg0.remote_hosts[0]
1597         server1 = self.pg0.remote_hosts[1]
1598         server2 = self.pg0.remote_hosts[2]
1599         server_tcp_port = 22
1600         server_udp_port = 20
1601
1602         self.nat44_add_address(self.nat_addr)
1603         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1604         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1605                                                   is_inside=0)
1606
1607         # add static mapping for servers
1608         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1609         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1610
1611         # host to server1
1612         pkts = []
1613         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1614              IP(src=host.ip4, dst=server1_nat_ip) /
1615              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1616         pkts.append(p)
1617         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1618              IP(src=host.ip4, dst=server1_nat_ip) /
1619              UDP(sport=self.udp_port_in, dport=server_udp_port))
1620         pkts.append(p)
1621         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1622              IP(src=host.ip4, dst=server1_nat_ip) /
1623              ICMP(id=self.icmp_id_in, type='echo-request'))
1624         pkts.append(p)
1625         self.pg0.add_stream(pkts)
1626         self.pg_enable_capture(self.pg_interfaces)
1627         self.pg_start()
1628         capture = self.pg0.get_capture(len(pkts))
1629         for packet in capture:
1630             try:
1631                 self.assertEqual(packet[IP].src, self.nat_addr)
1632                 self.assertEqual(packet[IP].dst, server1.ip4)
1633                 if packet.haslayer(TCP):
1634                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1635                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1636                     self.tcp_port_out = packet[TCP].sport
1637                     self.check_tcp_checksum(packet)
1638                 elif packet.haslayer(UDP):
1639                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1640                     self.assertEqual(packet[UDP].dport, server_udp_port)
1641                     self.udp_port_out = packet[UDP].sport
1642                 else:
1643                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1644                     self.icmp_id_out = packet[ICMP].id
1645             except:
1646                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1647                 raise
1648
1649         # server1 to host
1650         pkts = []
1651         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1652              IP(src=server1.ip4, dst=self.nat_addr) /
1653              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1654         pkts.append(p)
1655         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1656              IP(src=server1.ip4, dst=self.nat_addr) /
1657              UDP(sport=server_udp_port, dport=self.udp_port_out))
1658         pkts.append(p)
1659         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1660              IP(src=server1.ip4, dst=self.nat_addr) /
1661              ICMP(id=self.icmp_id_out, type='echo-reply'))
1662         pkts.append(p)
1663         self.pg0.add_stream(pkts)
1664         self.pg_enable_capture(self.pg_interfaces)
1665         self.pg_start()
1666         capture = self.pg0.get_capture(len(pkts))
1667         for packet in capture:
1668             try:
1669                 self.assertEqual(packet[IP].src, server1_nat_ip)
1670                 self.assertEqual(packet[IP].dst, host.ip4)
1671                 if packet.haslayer(TCP):
1672                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1673                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1674                     self.check_tcp_checksum(packet)
1675                 elif packet.haslayer(UDP):
1676                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1677                     self.assertEqual(packet[UDP].sport, server_udp_port)
1678                 else:
1679                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1680             except:
1681                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1682                 raise
1683
1684         # server2 to server1
1685         pkts = []
1686         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1687              IP(src=server2.ip4, dst=server1_nat_ip) /
1688              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1689         pkts.append(p)
1690         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1691              IP(src=server2.ip4, dst=server1_nat_ip) /
1692              UDP(sport=self.udp_port_in, dport=server_udp_port))
1693         pkts.append(p)
1694         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1695              IP(src=server2.ip4, dst=server1_nat_ip) /
1696              ICMP(id=self.icmp_id_in, type='echo-request'))
1697         pkts.append(p)
1698         self.pg0.add_stream(pkts)
1699         self.pg_enable_capture(self.pg_interfaces)
1700         self.pg_start()
1701         capture = self.pg0.get_capture(len(pkts))
1702         for packet in capture:
1703             try:
1704                 self.assertEqual(packet[IP].src, server2_nat_ip)
1705                 self.assertEqual(packet[IP].dst, server1.ip4)
1706                 if packet.haslayer(TCP):
1707                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1708                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1709                     self.tcp_port_out = packet[TCP].sport
1710                     self.check_tcp_checksum(packet)
1711                 elif packet.haslayer(UDP):
1712                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1713                     self.assertEqual(packet[UDP].dport, server_udp_port)
1714                     self.udp_port_out = packet[UDP].sport
1715                 else:
1716                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1717                     self.icmp_id_out = packet[ICMP].id
1718             except:
1719                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1720                 raise
1721
1722         # server1 to server2
1723         pkts = []
1724         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1725              IP(src=server1.ip4, dst=server2_nat_ip) /
1726              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1727         pkts.append(p)
1728         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1729              IP(src=server1.ip4, dst=server2_nat_ip) /
1730              UDP(sport=server_udp_port, dport=self.udp_port_out))
1731         pkts.append(p)
1732         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1733              IP(src=server1.ip4, dst=server2_nat_ip) /
1734              ICMP(id=self.icmp_id_out, type='echo-reply'))
1735         pkts.append(p)
1736         self.pg0.add_stream(pkts)
1737         self.pg_enable_capture(self.pg_interfaces)
1738         self.pg_start()
1739         capture = self.pg0.get_capture(len(pkts))
1740         for packet in capture:
1741             try:
1742                 self.assertEqual(packet[IP].src, server1_nat_ip)
1743                 self.assertEqual(packet[IP].dst, server2.ip4)
1744                 if packet.haslayer(TCP):
1745                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1746                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1747                     self.check_tcp_checksum(packet)
1748                 elif packet.haslayer(UDP):
1749                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1750                     self.assertEqual(packet[UDP].sport, server_udp_port)
1751                 else:
1752                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1753             except:
1754                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1755                 raise
1756
1757     def test_max_translations_per_user(self):
1758         """ MAX translations per user - recycle the least recently used """
1759
1760         self.nat44_add_address(self.nat_addr)
1761         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1762         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1763                                                   is_inside=0)
1764
1765         # get maximum number of translations per user
1766         nat44_config = self.vapi.nat_show_config()
1767
1768         # send more than maximum number of translations per user packets
1769         pkts_num = nat44_config.max_translations_per_user + 5
1770         pkts = []
1771         for port in range(0, pkts_num):
1772             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1773                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1774                  TCP(sport=1025 + port))
1775             pkts.append(p)
1776         self.pg0.add_stream(pkts)
1777         self.pg_enable_capture(self.pg_interfaces)
1778         self.pg_start()
1779
1780         # verify number of translated packet
1781         self.pg1.get_capture(pkts_num)
1782
1783     def test_interface_addr(self):
1784         """ Acquire NAT44 addresses from interface """
1785         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1786
1787         # no address in NAT pool
1788         adresses = self.vapi.nat44_address_dump()
1789         self.assertEqual(0, len(adresses))
1790
1791         # configure interface address and check NAT address pool
1792         self.pg7.config_ip4()
1793         adresses = self.vapi.nat44_address_dump()
1794         self.assertEqual(1, len(adresses))
1795         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1796
1797         # remove interface address and check NAT address pool
1798         self.pg7.unconfig_ip4()
1799         adresses = self.vapi.nat44_address_dump()
1800         self.assertEqual(0, len(adresses))
1801
1802     def test_interface_addr_static_mapping(self):
1803         """ Static mapping with addresses from interface """
1804         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1805         self.nat44_add_static_mapping(
1806             '1.2.3.4',
1807             external_sw_if_index=self.pg7.sw_if_index)
1808
1809         # static mappings with external interface
1810         static_mappings = self.vapi.nat44_static_mapping_dump()
1811         self.assertEqual(1, len(static_mappings))
1812         self.assertEqual(self.pg7.sw_if_index,
1813                          static_mappings[0].external_sw_if_index)
1814
1815         # configure interface address and check static mappings
1816         self.pg7.config_ip4()
1817         static_mappings = self.vapi.nat44_static_mapping_dump()
1818         self.assertEqual(1, len(static_mappings))
1819         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1820                          self.pg7.local_ip4n)
1821         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1822
1823         # remove interface address and check static mappings
1824         self.pg7.unconfig_ip4()
1825         static_mappings = self.vapi.nat44_static_mapping_dump()
1826         self.assertEqual(0, len(static_mappings))
1827
1828     def test_interface_addr_identity_nat(self):
1829         """ Identity NAT with addresses from interface """
1830
1831         port = 53053
1832         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1833         self.vapi.nat44_add_del_identity_mapping(
1834             sw_if_index=self.pg7.sw_if_index,
1835             port=port,
1836             protocol=IP_PROTOS.tcp,
1837             addr_only=0)
1838
1839         # identity mappings with external interface
1840         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1841         self.assertEqual(1, len(identity_mappings))
1842         self.assertEqual(self.pg7.sw_if_index,
1843                          identity_mappings[0].sw_if_index)
1844
1845         # configure interface address and check identity mappings
1846         self.pg7.config_ip4()
1847         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1848         self.assertEqual(1, len(identity_mappings))
1849         self.assertEqual(identity_mappings[0].ip_address,
1850                          self.pg7.local_ip4n)
1851         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
1852         self.assertEqual(port, identity_mappings[0].port)
1853         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
1854
1855         # remove interface address and check identity mappings
1856         self.pg7.unconfig_ip4()
1857         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1858         self.assertEqual(0, len(identity_mappings))
1859
1860     def test_ipfix_nat44_sess(self):
1861         """ IPFIX logging NAT44 session created/delted """
1862         self.ipfix_domain_id = 10
1863         self.ipfix_src_port = 20202
1864         colector_port = 30303
1865         bind_layers(UDP, IPFIX, dport=30303)
1866         self.nat44_add_address(self.nat_addr)
1867         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1868         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1869                                                   is_inside=0)
1870         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1871                                      src_address=self.pg3.local_ip4n,
1872                                      path_mtu=512,
1873                                      template_interval=10,
1874                                      collector_port=colector_port)
1875         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1876                             src_port=self.ipfix_src_port)
1877
1878         pkts = self.create_stream_in(self.pg0, self.pg1)
1879         self.pg0.add_stream(pkts)
1880         self.pg_enable_capture(self.pg_interfaces)
1881         self.pg_start()
1882         capture = self.pg1.get_capture(len(pkts))
1883         self.verify_capture_out(capture)
1884         self.nat44_add_address(self.nat_addr, is_add=0)
1885         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1886         capture = self.pg3.get_capture(3)
1887         ipfix = IPFIXDecoder()
1888         # first load template
1889         for p in capture:
1890             self.assertTrue(p.haslayer(IPFIX))
1891             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1892             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1893             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1894             self.assertEqual(p[UDP].dport, colector_port)
1895             self.assertEqual(p[IPFIX].observationDomainID,
1896                              self.ipfix_domain_id)
1897             if p.haslayer(Template):
1898                 ipfix.add_template(p.getlayer(Template))
1899         # verify events in data set
1900         for p in capture:
1901             if p.haslayer(Data):
1902                 data = ipfix.decode_data_set(p.getlayer(Set))
1903                 self.verify_ipfix_nat44_ses(data)
1904
1905     def test_ipfix_addr_exhausted(self):
1906         """ IPFIX logging NAT addresses exhausted """
1907         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1908         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1909                                                   is_inside=0)
1910         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1911                                      src_address=self.pg3.local_ip4n,
1912                                      path_mtu=512,
1913                                      template_interval=10)
1914         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1915                             src_port=self.ipfix_src_port)
1916
1917         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1918              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1919              TCP(sport=3025))
1920         self.pg0.add_stream(p)
1921         self.pg_enable_capture(self.pg_interfaces)
1922         self.pg_start()
1923         capture = self.pg1.get_capture(0)
1924         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1925         capture = self.pg3.get_capture(3)
1926         ipfix = IPFIXDecoder()
1927         # first load template
1928         for p in capture:
1929             self.assertTrue(p.haslayer(IPFIX))
1930             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1931             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1932             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1933             self.assertEqual(p[UDP].dport, 4739)
1934             self.assertEqual(p[IPFIX].observationDomainID,
1935                              self.ipfix_domain_id)
1936             if p.haslayer(Template):
1937                 ipfix.add_template(p.getlayer(Template))
1938         # verify events in data set
1939         for p in capture:
1940             if p.haslayer(Data):
1941                 data = ipfix.decode_data_set(p.getlayer(Set))
1942                 self.verify_ipfix_addr_exhausted(data)
1943
1944     def test_pool_addr_fib(self):
1945         """ NAT44 add pool addresses to FIB """
1946         static_addr = '10.0.0.10'
1947         self.nat44_add_address(self.nat_addr)
1948         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1949         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1950                                                   is_inside=0)
1951         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1952
1953         # NAT44 address
1954         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1955              ARP(op=ARP.who_has, pdst=self.nat_addr,
1956                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1957         self.pg1.add_stream(p)
1958         self.pg_enable_capture(self.pg_interfaces)
1959         self.pg_start()
1960         capture = self.pg1.get_capture(1)
1961         self.assertTrue(capture[0].haslayer(ARP))
1962         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1963
1964         # 1:1 NAT address
1965         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1966              ARP(op=ARP.who_has, pdst=static_addr,
1967                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1968         self.pg1.add_stream(p)
1969         self.pg_enable_capture(self.pg_interfaces)
1970         self.pg_start()
1971         capture = self.pg1.get_capture(1)
1972         self.assertTrue(capture[0].haslayer(ARP))
1973         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1974
1975         # send ARP to non-NAT44 interface
1976         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1977              ARP(op=ARP.who_has, pdst=self.nat_addr,
1978                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1979         self.pg2.add_stream(p)
1980         self.pg_enable_capture(self.pg_interfaces)
1981         self.pg_start()
1982         capture = self.pg1.get_capture(0)
1983
1984         # remove addresses and verify
1985         self.nat44_add_address(self.nat_addr, is_add=0)
1986         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1987                                       is_add=0)
1988
1989         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1990              ARP(op=ARP.who_has, pdst=self.nat_addr,
1991                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1992         self.pg1.add_stream(p)
1993         self.pg_enable_capture(self.pg_interfaces)
1994         self.pg_start()
1995         capture = self.pg1.get_capture(0)
1996
1997         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1998              ARP(op=ARP.who_has, pdst=static_addr,
1999                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2000         self.pg1.add_stream(p)
2001         self.pg_enable_capture(self.pg_interfaces)
2002         self.pg_start()
2003         capture = self.pg1.get_capture(0)
2004
2005     def test_vrf_mode(self):
2006         """ NAT44 tenant VRF aware address pool mode """
2007
2008         vrf_id1 = 1
2009         vrf_id2 = 2
2010         nat_ip1 = "10.0.0.10"
2011         nat_ip2 = "10.0.0.11"
2012
2013         self.pg0.unconfig_ip4()
2014         self.pg1.unconfig_ip4()
2015         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2016         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2017         self.pg0.set_table_ip4(vrf_id1)
2018         self.pg1.set_table_ip4(vrf_id2)
2019         self.pg0.config_ip4()
2020         self.pg1.config_ip4()
2021
2022         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2023         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2024         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2025         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2026         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2027                                                   is_inside=0)
2028
2029         # first VRF
2030         pkts = self.create_stream_in(self.pg0, self.pg2)
2031         self.pg0.add_stream(pkts)
2032         self.pg_enable_capture(self.pg_interfaces)
2033         self.pg_start()
2034         capture = self.pg2.get_capture(len(pkts))
2035         self.verify_capture_out(capture, nat_ip1)
2036
2037         # second VRF
2038         pkts = self.create_stream_in(self.pg1, self.pg2)
2039         self.pg1.add_stream(pkts)
2040         self.pg_enable_capture(self.pg_interfaces)
2041         self.pg_start()
2042         capture = self.pg2.get_capture(len(pkts))
2043         self.verify_capture_out(capture, nat_ip2)
2044
2045         self.pg0.unconfig_ip4()
2046         self.pg1.unconfig_ip4()
2047         self.pg0.set_table_ip4(0)
2048         self.pg1.set_table_ip4(0)
2049         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2050         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2051
2052     def test_vrf_feature_independent(self):
2053         """ NAT44 tenant VRF independent address pool mode """
2054
2055         nat_ip1 = "10.0.0.10"
2056         nat_ip2 = "10.0.0.11"
2057
2058         self.nat44_add_address(nat_ip1)
2059         self.nat44_add_address(nat_ip2, vrf_id=99)
2060         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2061         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2062         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2063                                                   is_inside=0)
2064
2065         # first VRF
2066         pkts = self.create_stream_in(self.pg0, self.pg2)
2067         self.pg0.add_stream(pkts)
2068         self.pg_enable_capture(self.pg_interfaces)
2069         self.pg_start()
2070         capture = self.pg2.get_capture(len(pkts))
2071         self.verify_capture_out(capture, nat_ip1)
2072
2073         # second VRF
2074         pkts = self.create_stream_in(self.pg1, self.pg2)
2075         self.pg1.add_stream(pkts)
2076         self.pg_enable_capture(self.pg_interfaces)
2077         self.pg_start()
2078         capture = self.pg2.get_capture(len(pkts))
2079         self.verify_capture_out(capture, nat_ip1)
2080
2081     def test_dynamic_ipless_interfaces(self):
2082         """ NAT44 interfaces without configured IP address """
2083
2084         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2085                                       mactobinary(self.pg7.remote_mac),
2086                                       self.pg7.remote_ip4n,
2087                                       is_static=1)
2088         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2089                                       mactobinary(self.pg8.remote_mac),
2090                                       self.pg8.remote_ip4n,
2091                                       is_static=1)
2092
2093         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2094                                    dst_address_length=32,
2095                                    next_hop_address=self.pg7.remote_ip4n,
2096                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2097         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2098                                    dst_address_length=32,
2099                                    next_hop_address=self.pg8.remote_ip4n,
2100                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2101
2102         self.nat44_add_address(self.nat_addr)
2103         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2104         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2105                                                   is_inside=0)
2106
2107         # in2out
2108         pkts = self.create_stream_in(self.pg7, self.pg8)
2109         self.pg7.add_stream(pkts)
2110         self.pg_enable_capture(self.pg_interfaces)
2111         self.pg_start()
2112         capture = self.pg8.get_capture(len(pkts))
2113         self.verify_capture_out(capture)
2114
2115         # out2in
2116         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2117         self.pg8.add_stream(pkts)
2118         self.pg_enable_capture(self.pg_interfaces)
2119         self.pg_start()
2120         capture = self.pg7.get_capture(len(pkts))
2121         self.verify_capture_in(capture, self.pg7)
2122
2123     def test_static_ipless_interfaces(self):
2124         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2125
2126         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2127                                       mactobinary(self.pg7.remote_mac),
2128                                       self.pg7.remote_ip4n,
2129                                       is_static=1)
2130         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2131                                       mactobinary(self.pg8.remote_mac),
2132                                       self.pg8.remote_ip4n,
2133                                       is_static=1)
2134
2135         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2136                                    dst_address_length=32,
2137                                    next_hop_address=self.pg7.remote_ip4n,
2138                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2139         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2140                                    dst_address_length=32,
2141                                    next_hop_address=self.pg8.remote_ip4n,
2142                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2143
2144         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2145         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2146         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2147                                                   is_inside=0)
2148
2149         # out2in
2150         pkts = self.create_stream_out(self.pg8)
2151         self.pg8.add_stream(pkts)
2152         self.pg_enable_capture(self.pg_interfaces)
2153         self.pg_start()
2154         capture = self.pg7.get_capture(len(pkts))
2155         self.verify_capture_in(capture, self.pg7)
2156
2157         # in2out
2158         pkts = self.create_stream_in(self.pg7, self.pg8)
2159         self.pg7.add_stream(pkts)
2160         self.pg_enable_capture(self.pg_interfaces)
2161         self.pg_start()
2162         capture = self.pg8.get_capture(len(pkts))
2163         self.verify_capture_out(capture, self.nat_addr, True)
2164
2165     def test_static_with_port_ipless_interfaces(self):
2166         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2167
2168         self.tcp_port_out = 30606
2169         self.udp_port_out = 30607
2170         self.icmp_id_out = 30608
2171
2172         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2173                                       mactobinary(self.pg7.remote_mac),
2174                                       self.pg7.remote_ip4n,
2175                                       is_static=1)
2176         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2177                                       mactobinary(self.pg8.remote_mac),
2178                                       self.pg8.remote_ip4n,
2179                                       is_static=1)
2180
2181         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2182                                    dst_address_length=32,
2183                                    next_hop_address=self.pg7.remote_ip4n,
2184                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2185         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2186                                    dst_address_length=32,
2187                                    next_hop_address=self.pg8.remote_ip4n,
2188                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2189
2190         self.nat44_add_address(self.nat_addr)
2191         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2192                                       self.tcp_port_in, self.tcp_port_out,
2193                                       proto=IP_PROTOS.tcp)
2194         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2195                                       self.udp_port_in, self.udp_port_out,
2196                                       proto=IP_PROTOS.udp)
2197         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2198                                       self.icmp_id_in, self.icmp_id_out,
2199                                       proto=IP_PROTOS.icmp)
2200         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2201         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2202                                                   is_inside=0)
2203
2204         # out2in
2205         pkts = self.create_stream_out(self.pg8)
2206         self.pg8.add_stream(pkts)
2207         self.pg_enable_capture(self.pg_interfaces)
2208         self.pg_start()
2209         capture = self.pg7.get_capture(len(pkts))
2210         self.verify_capture_in(capture, self.pg7)
2211
2212         # in2out
2213         pkts = self.create_stream_in(self.pg7, self.pg8)
2214         self.pg7.add_stream(pkts)
2215         self.pg_enable_capture(self.pg_interfaces)
2216         self.pg_start()
2217         capture = self.pg8.get_capture(len(pkts))
2218         self.verify_capture_out(capture)
2219
2220     def test_static_unknown_proto(self):
2221         """ 1:1 NAT translate packet with unknown protocol """
2222         nat_ip = "10.0.0.10"
2223         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2224         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2225         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2226                                                   is_inside=0)
2227
2228         # in2out
2229         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2230              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2231              GRE() /
2232              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2233              TCP(sport=1234, dport=1234))
2234         self.pg0.add_stream(p)
2235         self.pg_enable_capture(self.pg_interfaces)
2236         self.pg_start()
2237         p = self.pg1.get_capture(1)
2238         packet = p[0]
2239         try:
2240             self.assertEqual(packet[IP].src, nat_ip)
2241             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2242             self.assertTrue(packet.haslayer(GRE))
2243             self.check_ip_checksum(packet)
2244         except:
2245             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2246             raise
2247
2248         # out2in
2249         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2250              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2251              GRE() /
2252              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2253              TCP(sport=1234, dport=1234))
2254         self.pg1.add_stream(p)
2255         self.pg_enable_capture(self.pg_interfaces)
2256         self.pg_start()
2257         p = self.pg0.get_capture(1)
2258         packet = p[0]
2259         try:
2260             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2261             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2262             self.assertTrue(packet.haslayer(GRE))
2263             self.check_ip_checksum(packet)
2264         except:
2265             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2266             raise
2267
2268     def test_hairpinning_static_unknown_proto(self):
2269         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2270
2271         host = self.pg0.remote_hosts[0]
2272         server = self.pg0.remote_hosts[1]
2273
2274         host_nat_ip = "10.0.0.10"
2275         server_nat_ip = "10.0.0.11"
2276
2277         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2278         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2279         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2280         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2281                                                   is_inside=0)
2282
2283         # host to server
2284         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2285              IP(src=host.ip4, dst=server_nat_ip) /
2286              GRE() /
2287              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2288              TCP(sport=1234, dport=1234))
2289         self.pg0.add_stream(p)
2290         self.pg_enable_capture(self.pg_interfaces)
2291         self.pg_start()
2292         p = self.pg0.get_capture(1)
2293         packet = p[0]
2294         try:
2295             self.assertEqual(packet[IP].src, host_nat_ip)
2296             self.assertEqual(packet[IP].dst, server.ip4)
2297             self.assertTrue(packet.haslayer(GRE))
2298             self.check_ip_checksum(packet)
2299         except:
2300             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2301             raise
2302
2303         # server to host
2304         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2305              IP(src=server.ip4, dst=host_nat_ip) /
2306              GRE() /
2307              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2308              TCP(sport=1234, dport=1234))
2309         self.pg0.add_stream(p)
2310         self.pg_enable_capture(self.pg_interfaces)
2311         self.pg_start()
2312         p = self.pg0.get_capture(1)
2313         packet = p[0]
2314         try:
2315             self.assertEqual(packet[IP].src, server_nat_ip)
2316             self.assertEqual(packet[IP].dst, host.ip4)
2317             self.assertTrue(packet.haslayer(GRE))
2318             self.check_ip_checksum(packet)
2319         except:
2320             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2321             raise
2322
2323     def test_unknown_proto(self):
2324         """ NAT44 translate packet with unknown protocol """
2325         self.nat44_add_address(self.nat_addr)
2326         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2327         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2328                                                   is_inside=0)
2329
2330         # in2out
2331         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2332              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2333              TCP(sport=self.tcp_port_in, dport=20))
2334         self.pg0.add_stream(p)
2335         self.pg_enable_capture(self.pg_interfaces)
2336         self.pg_start()
2337         p = self.pg1.get_capture(1)
2338
2339         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2340              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2341              GRE() /
2342              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2343              TCP(sport=1234, dport=1234))
2344         self.pg0.add_stream(p)
2345         self.pg_enable_capture(self.pg_interfaces)
2346         self.pg_start()
2347         p = self.pg1.get_capture(1)
2348         packet = p[0]
2349         try:
2350             self.assertEqual(packet[IP].src, self.nat_addr)
2351             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2352             self.assertTrue(packet.haslayer(GRE))
2353             self.check_ip_checksum(packet)
2354         except:
2355             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2356             raise
2357
2358         # out2in
2359         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2360              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2361              GRE() /
2362              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2363              TCP(sport=1234, dport=1234))
2364         self.pg1.add_stream(p)
2365         self.pg_enable_capture(self.pg_interfaces)
2366         self.pg_start()
2367         p = self.pg0.get_capture(1)
2368         packet = p[0]
2369         try:
2370             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2371             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2372             self.assertTrue(packet.haslayer(GRE))
2373             self.check_ip_checksum(packet)
2374         except:
2375             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2376             raise
2377
2378     def test_hairpinning_unknown_proto(self):
2379         """ NAT44 translate packet with unknown protocol - hairpinning """
2380         host = self.pg0.remote_hosts[0]
2381         server = self.pg0.remote_hosts[1]
2382         host_in_port = 1234
2383         host_out_port = 0
2384         server_in_port = 5678
2385         server_out_port = 8765
2386         server_nat_ip = "10.0.0.11"
2387
2388         self.nat44_add_address(self.nat_addr)
2389         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2390         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2391                                                   is_inside=0)
2392
2393         # add static mapping for server
2394         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2395
2396         # host to server
2397         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2398              IP(src=host.ip4, dst=server_nat_ip) /
2399              TCP(sport=host_in_port, dport=server_out_port))
2400         self.pg0.add_stream(p)
2401         self.pg_enable_capture(self.pg_interfaces)
2402         self.pg_start()
2403         capture = self.pg0.get_capture(1)
2404
2405         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2406              IP(src=host.ip4, dst=server_nat_ip) /
2407              GRE() /
2408              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2409              TCP(sport=1234, dport=1234))
2410         self.pg0.add_stream(p)
2411         self.pg_enable_capture(self.pg_interfaces)
2412         self.pg_start()
2413         p = self.pg0.get_capture(1)
2414         packet = p[0]
2415         try:
2416             self.assertEqual(packet[IP].src, self.nat_addr)
2417             self.assertEqual(packet[IP].dst, server.ip4)
2418             self.assertTrue(packet.haslayer(GRE))
2419             self.check_ip_checksum(packet)
2420         except:
2421             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2422             raise
2423
2424         # server to host
2425         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2426              IP(src=server.ip4, dst=self.nat_addr) /
2427              GRE() /
2428              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2429              TCP(sport=1234, dport=1234))
2430         self.pg0.add_stream(p)
2431         self.pg_enable_capture(self.pg_interfaces)
2432         self.pg_start()
2433         p = self.pg0.get_capture(1)
2434         packet = p[0]
2435         try:
2436             self.assertEqual(packet[IP].src, server_nat_ip)
2437             self.assertEqual(packet[IP].dst, host.ip4)
2438             self.assertTrue(packet.haslayer(GRE))
2439             self.check_ip_checksum(packet)
2440         except:
2441             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2442             raise
2443
2444     def test_output_feature(self):
2445         """ NAT44 interface output feature (in2out postrouting) """
2446         self.nat44_add_address(self.nat_addr)
2447         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2448         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2449         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2450                                                          is_inside=0)
2451
2452         # in2out
2453         pkts = self.create_stream_in(self.pg0, self.pg3)
2454         self.pg0.add_stream(pkts)
2455         self.pg_enable_capture(self.pg_interfaces)
2456         self.pg_start()
2457         capture = self.pg3.get_capture(len(pkts))
2458         self.verify_capture_out(capture)
2459
2460         # out2in
2461         pkts = self.create_stream_out(self.pg3)
2462         self.pg3.add_stream(pkts)
2463         self.pg_enable_capture(self.pg_interfaces)
2464         self.pg_start()
2465         capture = self.pg0.get_capture(len(pkts))
2466         self.verify_capture_in(capture, self.pg0)
2467
2468         # from non-NAT interface to NAT inside interface
2469         pkts = self.create_stream_in(self.pg2, self.pg0)
2470         self.pg2.add_stream(pkts)
2471         self.pg_enable_capture(self.pg_interfaces)
2472         self.pg_start()
2473         capture = self.pg0.get_capture(len(pkts))
2474         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2475
2476     def test_output_feature_vrf_aware(self):
2477         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2478         nat_ip_vrf10 = "10.0.0.10"
2479         nat_ip_vrf20 = "10.0.0.20"
2480
2481         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2482                                    dst_address_length=32,
2483                                    next_hop_address=self.pg3.remote_ip4n,
2484                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2485                                    table_id=10)
2486         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2487                                    dst_address_length=32,
2488                                    next_hop_address=self.pg3.remote_ip4n,
2489                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2490                                    table_id=20)
2491
2492         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2493         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2494         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2495         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2496         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2497                                                          is_inside=0)
2498
2499         # in2out VRF 10
2500         pkts = self.create_stream_in(self.pg4, self.pg3)
2501         self.pg4.add_stream(pkts)
2502         self.pg_enable_capture(self.pg_interfaces)
2503         self.pg_start()
2504         capture = self.pg3.get_capture(len(pkts))
2505         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2506
2507         # out2in VRF 10
2508         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2509         self.pg3.add_stream(pkts)
2510         self.pg_enable_capture(self.pg_interfaces)
2511         self.pg_start()
2512         capture = self.pg4.get_capture(len(pkts))
2513         self.verify_capture_in(capture, self.pg4)
2514
2515         # in2out VRF 20
2516         pkts = self.create_stream_in(self.pg6, self.pg3)
2517         self.pg6.add_stream(pkts)
2518         self.pg_enable_capture(self.pg_interfaces)
2519         self.pg_start()
2520         capture = self.pg3.get_capture(len(pkts))
2521         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2522
2523         # out2in VRF 20
2524         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2525         self.pg3.add_stream(pkts)
2526         self.pg_enable_capture(self.pg_interfaces)
2527         self.pg_start()
2528         capture = self.pg6.get_capture(len(pkts))
2529         self.verify_capture_in(capture, self.pg6)
2530
2531     def test_output_feature_hairpinning(self):
2532         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2533         host = self.pg0.remote_hosts[0]
2534         server = self.pg0.remote_hosts[1]
2535         host_in_port = 1234
2536         host_out_port = 0
2537         server_in_port = 5678
2538         server_out_port = 8765
2539
2540         self.nat44_add_address(self.nat_addr)
2541         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2542         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2543                                                          is_inside=0)
2544
2545         # add static mapping for server
2546         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2547                                       server_in_port, server_out_port,
2548                                       proto=IP_PROTOS.tcp)
2549
2550         # send packet from host to server
2551         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2552              IP(src=host.ip4, dst=self.nat_addr) /
2553              TCP(sport=host_in_port, dport=server_out_port))
2554         self.pg0.add_stream(p)
2555         self.pg_enable_capture(self.pg_interfaces)
2556         self.pg_start()
2557         capture = self.pg0.get_capture(1)
2558         p = capture[0]
2559         try:
2560             ip = p[IP]
2561             tcp = p[TCP]
2562             self.assertEqual(ip.src, self.nat_addr)
2563             self.assertEqual(ip.dst, server.ip4)
2564             self.assertNotEqual(tcp.sport, host_in_port)
2565             self.assertEqual(tcp.dport, server_in_port)
2566             self.check_tcp_checksum(p)
2567             host_out_port = tcp.sport
2568         except:
2569             self.logger.error(ppp("Unexpected or invalid packet:", p))
2570             raise
2571
2572         # send reply from server to host
2573         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2574              IP(src=server.ip4, dst=self.nat_addr) /
2575              TCP(sport=server_in_port, dport=host_out_port))
2576         self.pg0.add_stream(p)
2577         self.pg_enable_capture(self.pg_interfaces)
2578         self.pg_start()
2579         capture = self.pg0.get_capture(1)
2580         p = capture[0]
2581         try:
2582             ip = p[IP]
2583             tcp = p[TCP]
2584             self.assertEqual(ip.src, self.nat_addr)
2585             self.assertEqual(ip.dst, host.ip4)
2586             self.assertEqual(tcp.sport, server_out_port)
2587             self.assertEqual(tcp.dport, host_in_port)
2588             self.check_tcp_checksum(p)
2589         except:
2590             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2591             raise
2592
2593     def test_one_armed_nat44(self):
2594         """ One armed NAT44 """
2595         remote_host = self.pg9.remote_hosts[0]
2596         local_host = self.pg9.remote_hosts[1]
2597         external_port = 0
2598
2599         self.nat44_add_address(self.nat_addr)
2600         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2601         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2602                                                   is_inside=0)
2603
2604         # in2out
2605         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2606              IP(src=local_host.ip4, dst=remote_host.ip4) /
2607              TCP(sport=12345, dport=80))
2608         self.pg9.add_stream(p)
2609         self.pg_enable_capture(self.pg_interfaces)
2610         self.pg_start()
2611         capture = self.pg9.get_capture(1)
2612         p = capture[0]
2613         try:
2614             ip = p[IP]
2615             tcp = p[TCP]
2616             self.assertEqual(ip.src, self.nat_addr)
2617             self.assertEqual(ip.dst, remote_host.ip4)
2618             self.assertNotEqual(tcp.sport, 12345)
2619             external_port = tcp.sport
2620             self.assertEqual(tcp.dport, 80)
2621             self.check_tcp_checksum(p)
2622             self.check_ip_checksum(p)
2623         except:
2624             self.logger.error(ppp("Unexpected or invalid packet:", p))
2625             raise
2626
2627         # out2in
2628         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2629              IP(src=remote_host.ip4, dst=self.nat_addr) /
2630              TCP(sport=80, dport=external_port))
2631         self.pg9.add_stream(p)
2632         self.pg_enable_capture(self.pg_interfaces)
2633         self.pg_start()
2634         capture = self.pg9.get_capture(1)
2635         p = capture[0]
2636         try:
2637             ip = p[IP]
2638             tcp = p[TCP]
2639             self.assertEqual(ip.src, remote_host.ip4)
2640             self.assertEqual(ip.dst, local_host.ip4)
2641             self.assertEqual(tcp.sport, 80)
2642             self.assertEqual(tcp.dport, 12345)
2643             self.check_tcp_checksum(p)
2644             self.check_ip_checksum(p)
2645         except:
2646             self.logger.error(ppp("Unexpected or invalid packet:", p))
2647             raise
2648
2649     def test_del_session(self):
2650         """ Delete NAT44 session """
2651         self.nat44_add_address(self.nat_addr)
2652         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2653         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2654                                                   is_inside=0)
2655
2656         pkts = self.create_stream_in(self.pg0, self.pg1)
2657         self.pg0.add_stream(pkts)
2658         self.pg_enable_capture(self.pg_interfaces)
2659         self.pg_start()
2660         capture = self.pg1.get_capture(len(pkts))
2661
2662         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2663         nsessions = len(sessions)
2664
2665         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2666                                     sessions[0].inside_port,
2667                                     sessions[0].protocol)
2668         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2669                                     sessions[1].outside_port,
2670                                     sessions[1].protocol,
2671                                     is_in=0)
2672
2673         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2674         self.assertEqual(nsessions - len(sessions), 2)
2675
2676     def test_set_get_reass(self):
2677         """ NAT44 set/get virtual fragmentation reassembly """
2678         reas_cfg1 = self.vapi.nat_get_reass()
2679
2680         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2681                                 max_reass=reas_cfg1.ip4_max_reass * 2,
2682                                 max_frag=reas_cfg1.ip4_max_frag * 2)
2683
2684         reas_cfg2 = self.vapi.nat_get_reass()
2685
2686         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2687         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2688         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2689
2690         self.vapi.nat_set_reass(drop_frag=1)
2691         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2692
2693     def test_frag_in_order(self):
2694         """ NAT44 translate fragments arriving in order """
2695         self.nat44_add_address(self.nat_addr)
2696         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2697         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2698                                                   is_inside=0)
2699
2700         data = "A" * 4 + "B" * 16 + "C" * 3
2701         self.tcp_port_in = random.randint(1025, 65535)
2702
2703         reass = self.vapi.nat_reass_dump()
2704         reass_n_start = len(reass)
2705
2706         # in2out
2707         pkts = self.create_stream_frag(self.pg0,
2708                                        self.pg1.remote_ip4,
2709                                        self.tcp_port_in,
2710                                        20,
2711                                        data)
2712         self.pg0.add_stream(pkts)
2713         self.pg_enable_capture(self.pg_interfaces)
2714         self.pg_start()
2715         frags = self.pg1.get_capture(len(pkts))
2716         p = self.reass_frags_and_verify(frags,
2717                                         self.nat_addr,
2718                                         self.pg1.remote_ip4)
2719         self.assertEqual(p[TCP].dport, 20)
2720         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2721         self.tcp_port_out = p[TCP].sport
2722         self.assertEqual(data, p[Raw].load)
2723
2724         # out2in
2725         pkts = self.create_stream_frag(self.pg1,
2726                                        self.nat_addr,
2727                                        20,
2728                                        self.tcp_port_out,
2729                                        data)
2730         self.pg1.add_stream(pkts)
2731         self.pg_enable_capture(self.pg_interfaces)
2732         self.pg_start()
2733         frags = self.pg0.get_capture(len(pkts))
2734         p = self.reass_frags_and_verify(frags,
2735                                         self.pg1.remote_ip4,
2736                                         self.pg0.remote_ip4)
2737         self.assertEqual(p[TCP].sport, 20)
2738         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2739         self.assertEqual(data, p[Raw].load)
2740
2741         reass = self.vapi.nat_reass_dump()
2742         reass_n_end = len(reass)
2743
2744         self.assertEqual(reass_n_end - reass_n_start, 2)
2745
2746     def test_reass_hairpinning(self):
2747         """ NAT44 fragments hairpinning """
2748         host = self.pg0.remote_hosts[0]
2749         server = self.pg0.remote_hosts[1]
2750         host_in_port = random.randint(1025, 65535)
2751         host_out_port = 0
2752         server_in_port = random.randint(1025, 65535)
2753         server_out_port = random.randint(1025, 65535)
2754         data = "A" * 4 + "B" * 16 + "C" * 3
2755
2756         self.nat44_add_address(self.nat_addr)
2757         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2758         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2759                                                   is_inside=0)
2760         # add static mapping for server
2761         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2762                                       server_in_port, server_out_port,
2763                                       proto=IP_PROTOS.tcp)
2764
2765         # send packet from host to server
2766         pkts = self.create_stream_frag(self.pg0,
2767                                        self.nat_addr,
2768                                        host_in_port,
2769                                        server_out_port,
2770                                        data)
2771         self.pg0.add_stream(pkts)
2772         self.pg_enable_capture(self.pg_interfaces)
2773         self.pg_start()
2774         frags = self.pg0.get_capture(len(pkts))
2775         p = self.reass_frags_and_verify(frags,
2776                                         self.nat_addr,
2777                                         server.ip4)
2778         self.assertNotEqual(p[TCP].sport, host_in_port)
2779         self.assertEqual(p[TCP].dport, server_in_port)
2780         self.assertEqual(data, p[Raw].load)
2781
2782     def test_frag_out_of_order(self):
2783         """ NAT44 translate fragments arriving out of order """
2784         self.nat44_add_address(self.nat_addr)
2785         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2786         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2787                                                   is_inside=0)
2788
2789         data = "A" * 4 + "B" * 16 + "C" * 3
2790         random.randint(1025, 65535)
2791
2792         # in2out
2793         pkts = self.create_stream_frag(self.pg0,
2794                                        self.pg1.remote_ip4,
2795                                        self.tcp_port_in,
2796                                        20,
2797                                        data)
2798         pkts.reverse()
2799         self.pg0.add_stream(pkts)
2800         self.pg_enable_capture(self.pg_interfaces)
2801         self.pg_start()
2802         frags = self.pg1.get_capture(len(pkts))
2803         p = self.reass_frags_and_verify(frags,
2804                                         self.nat_addr,
2805                                         self.pg1.remote_ip4)
2806         self.assertEqual(p[TCP].dport, 20)
2807         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2808         self.tcp_port_out = p[TCP].sport
2809         self.assertEqual(data, p[Raw].load)
2810
2811         # out2in
2812         pkts = self.create_stream_frag(self.pg1,
2813                                        self.nat_addr,
2814                                        20,
2815                                        self.tcp_port_out,
2816                                        data)
2817         pkts.reverse()
2818         self.pg1.add_stream(pkts)
2819         self.pg_enable_capture(self.pg_interfaces)
2820         self.pg_start()
2821         frags = self.pg0.get_capture(len(pkts))
2822         p = self.reass_frags_and_verify(frags,
2823                                         self.pg1.remote_ip4,
2824                                         self.pg0.remote_ip4)
2825         self.assertEqual(p[TCP].sport, 20)
2826         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2827         self.assertEqual(data, p[Raw].load)
2828
2829     def test_port_restricted(self):
2830         """ Port restricted NAT44 (MAP-E CE) """
2831         self.nat44_add_address(self.nat_addr)
2832         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2833         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2834                                                   is_inside=0)
2835         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
2836                       "psid-offset 6 psid-len 6")
2837
2838         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2839              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2840              TCP(sport=4567, dport=22))
2841         self.pg0.add_stream(p)
2842         self.pg_enable_capture(self.pg_interfaces)
2843         self.pg_start()
2844         capture = self.pg1.get_capture(1)
2845         p = capture[0]
2846         try:
2847             ip = p[IP]
2848             tcp = p[TCP]
2849             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2850             self.assertEqual(ip.src, self.nat_addr)
2851             self.assertEqual(tcp.dport, 22)
2852             self.assertNotEqual(tcp.sport, 4567)
2853             self.assertEqual((tcp.sport >> 6) & 63, 10)
2854             self.check_tcp_checksum(p)
2855             self.check_ip_checksum(p)
2856         except:
2857             self.logger.error(ppp("Unexpected or invalid packet:", p))
2858             raise
2859
2860     def tearDown(self):
2861         super(TestNAT44, self).tearDown()
2862         if not self.vpp_dead:
2863             self.logger.info(self.vapi.cli("show nat44 verbose"))
2864             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
2865             self.vapi.cli("nat addr-port-assignment-alg default")
2866             self.clear_nat44()
2867
2868
2869 class TestDeterministicNAT(MethodHolder):
2870     """ Deterministic NAT Test Cases """
2871
2872     @classmethod
2873     def setUpConstants(cls):
2874         super(TestDeterministicNAT, cls).setUpConstants()
2875         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2876
2877     @classmethod
2878     def setUpClass(cls):
2879         super(TestDeterministicNAT, cls).setUpClass()
2880
2881         try:
2882             cls.tcp_port_in = 6303
2883             cls.tcp_external_port = 6303
2884             cls.udp_port_in = 6304
2885             cls.udp_external_port = 6304
2886             cls.icmp_id_in = 6305
2887             cls.nat_addr = '10.0.0.3'
2888
2889             cls.create_pg_interfaces(range(3))
2890             cls.interfaces = list(cls.pg_interfaces)
2891
2892             for i in cls.interfaces:
2893                 i.admin_up()
2894                 i.config_ip4()
2895                 i.resolve_arp()
2896
2897             cls.pg0.generate_remote_hosts(2)
2898             cls.pg0.configure_ipv4_neighbors()
2899
2900         except Exception:
2901             super(TestDeterministicNAT, cls).tearDownClass()
2902             raise
2903
2904     def create_stream_in(self, in_if, out_if, ttl=64):
2905         """
2906         Create packet stream for inside network
2907
2908         :param in_if: Inside interface
2909         :param out_if: Outside interface
2910         :param ttl: TTL of generated packets
2911         """
2912         pkts = []
2913         # TCP
2914         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2915              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2916              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2917         pkts.append(p)
2918
2919         # UDP
2920         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2921              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2922              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2923         pkts.append(p)
2924
2925         # ICMP
2926         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2927              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2928              ICMP(id=self.icmp_id_in, type='echo-request'))
2929         pkts.append(p)
2930
2931         return pkts
2932
2933     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2934         """
2935         Create packet stream for outside network
2936
2937         :param out_if: Outside interface
2938         :param dst_ip: Destination IP address (Default use global NAT address)
2939         :param ttl: TTL of generated packets
2940         """
2941         if dst_ip is None:
2942             dst_ip = self.nat_addr
2943         pkts = []
2944         # TCP
2945         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2946              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2947              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2948         pkts.append(p)
2949
2950         # UDP
2951         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2952              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2953              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2954         pkts.append(p)
2955
2956         # ICMP
2957         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2958              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2959              ICMP(id=self.icmp_external_id, type='echo-reply'))
2960         pkts.append(p)
2961
2962         return pkts
2963
2964     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2965         """
2966         Verify captured packets on outside network
2967
2968         :param capture: Captured packets
2969         :param nat_ip: Translated IP address (Default use global NAT address)
2970         :param same_port: Sorce port number is not translated (Default False)
2971         :param packet_num: Expected number of packets (Default 3)
2972         """
2973         if nat_ip is None:
2974             nat_ip = self.nat_addr
2975         self.assertEqual(packet_num, len(capture))
2976         for packet in capture:
2977             try:
2978                 self.assertEqual(packet[IP].src, nat_ip)
2979                 if packet.haslayer(TCP):
2980                     self.tcp_port_out = packet[TCP].sport
2981                 elif packet.haslayer(UDP):
2982                     self.udp_port_out = packet[UDP].sport
2983                 else:
2984                     self.icmp_external_id = packet[ICMP].id
2985             except:
2986                 self.logger.error(ppp("Unexpected or invalid packet "
2987                                       "(outside network):", packet))
2988                 raise
2989
2990     def initiate_tcp_session(self, in_if, out_if):
2991         """
2992         Initiates TCP session
2993
2994         :param in_if: Inside interface
2995         :param out_if: Outside interface
2996         """
2997         try:
2998             # SYN packet in->out
2999             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3000                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3001                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3002                      flags="S"))
3003             in_if.add_stream(p)
3004             self.pg_enable_capture(self.pg_interfaces)
3005             self.pg_start()
3006             capture = out_if.get_capture(1)
3007             p = capture[0]
3008             self.tcp_port_out = p[TCP].sport
3009
3010             # SYN + ACK packet out->in
3011             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3012                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3013                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3014                      flags="SA"))
3015             out_if.add_stream(p)
3016             self.pg_enable_capture(self.pg_interfaces)
3017             self.pg_start()
3018             in_if.get_capture(1)
3019
3020             # ACK packet in->out
3021             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3022                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3023                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3024                      flags="A"))
3025             in_if.add_stream(p)
3026             self.pg_enable_capture(self.pg_interfaces)
3027             self.pg_start()
3028             out_if.get_capture(1)
3029
3030         except:
3031             self.logger.error("TCP 3 way handshake failed")
3032             raise
3033
3034     def verify_ipfix_max_entries_per_user(self, data):
3035         """
3036         Verify IPFIX maximum entries per user exceeded event
3037
3038         :param data: Decoded IPFIX data records
3039         """
3040         self.assertEqual(1, len(data))
3041         record = data[0]
3042         # natEvent
3043         self.assertEqual(ord(record[230]), 13)
3044         # natQuotaExceededEvent
3045         self.assertEqual('\x03\x00\x00\x00', record[466])
3046         # sourceIPv4Address
3047         self.assertEqual(self.pg0.remote_ip4n, record[8])
3048
3049     def test_deterministic_mode(self):
3050         """ NAT plugin run deterministic mode """
3051         in_addr = '172.16.255.0'
3052         out_addr = '172.17.255.50'
3053         in_addr_t = '172.16.255.20'
3054         in_addr_n = socket.inet_aton(in_addr)
3055         out_addr_n = socket.inet_aton(out_addr)
3056         in_addr_t_n = socket.inet_aton(in_addr_t)
3057         in_plen = 24
3058         out_plen = 32
3059
3060         nat_config = self.vapi.nat_show_config()
3061         self.assertEqual(1, nat_config.deterministic)
3062
3063         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3064
3065         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3066         self.assertEqual(rep1.out_addr[:4], out_addr_n)
3067         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3068         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3069
3070         deterministic_mappings = self.vapi.nat_det_map_dump()
3071         self.assertEqual(len(deterministic_mappings), 1)
3072         dsm = deterministic_mappings[0]
3073         self.assertEqual(in_addr_n, dsm.in_addr[:4])
3074         self.assertEqual(in_plen, dsm.in_plen)
3075         self.assertEqual(out_addr_n, dsm.out_addr[:4])
3076         self.assertEqual(out_plen, dsm.out_plen)
3077
3078         self.clear_nat_det()
3079         deterministic_mappings = self.vapi.nat_det_map_dump()
3080         self.assertEqual(len(deterministic_mappings), 0)
3081
3082     def test_set_timeouts(self):
3083         """ Set deterministic NAT timeouts """
3084         timeouts_before = self.vapi.nat_det_get_timeouts()
3085
3086         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3087                                        timeouts_before.tcp_established + 10,
3088                                        timeouts_before.tcp_transitory + 10,
3089                                        timeouts_before.icmp + 10)
3090
3091         timeouts_after = self.vapi.nat_det_get_timeouts()
3092
3093         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3094         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3095         self.assertNotEqual(timeouts_before.tcp_established,
3096                             timeouts_after.tcp_established)
3097         self.assertNotEqual(timeouts_before.tcp_transitory,
3098                             timeouts_after.tcp_transitory)
3099
3100     def test_det_in(self):
3101         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3102
3103         nat_ip = "10.0.0.10"
3104
3105         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3106                                       32,
3107                                       socket.inet_aton(nat_ip),
3108                                       32)
3109         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3110         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3111                                                   is_inside=0)
3112
3113         # in2out
3114         pkts = self.create_stream_in(self.pg0, self.pg1)
3115         self.pg0.add_stream(pkts)
3116         self.pg_enable_capture(self.pg_interfaces)
3117         self.pg_start()
3118         capture = self.pg1.get_capture(len(pkts))
3119         self.verify_capture_out(capture, nat_ip)
3120
3121         # out2in
3122         pkts = self.create_stream_out(self.pg1, nat_ip)
3123         self.pg1.add_stream(pkts)
3124         self.pg_enable_capture(self.pg_interfaces)
3125         self.pg_start()
3126         capture = self.pg0.get_capture(len(pkts))
3127         self.verify_capture_in(capture, self.pg0)
3128
3129         # session dump test
3130         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3131         self.assertEqual(len(sessions), 3)
3132
3133         # TCP session
3134         s = sessions[0]
3135         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3136         self.assertEqual(s.in_port, self.tcp_port_in)
3137         self.assertEqual(s.out_port, self.tcp_port_out)
3138         self.assertEqual(s.ext_port, self.tcp_external_port)
3139
3140         # UDP session
3141         s = sessions[1]
3142         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3143         self.assertEqual(s.in_port, self.udp_port_in)
3144         self.assertEqual(s.out_port, self.udp_port_out)
3145         self.assertEqual(s.ext_port, self.udp_external_port)
3146
3147         # ICMP session
3148         s = sessions[2]
3149         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3150         self.assertEqual(s.in_port, self.icmp_id_in)
3151         self.assertEqual(s.out_port, self.icmp_external_id)
3152
3153     def test_multiple_users(self):
3154         """ Deterministic NAT multiple users """
3155
3156         nat_ip = "10.0.0.10"
3157         port_in = 80
3158         external_port = 6303
3159
3160         host0 = self.pg0.remote_hosts[0]
3161         host1 = self.pg0.remote_hosts[1]
3162
3163         self.vapi.nat_det_add_del_map(host0.ip4n,
3164                                       24,
3165                                       socket.inet_aton(nat_ip),
3166                                       32)
3167         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3168         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3169                                                   is_inside=0)
3170
3171         # host0 to out
3172         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3173              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3174              TCP(sport=port_in, dport=external_port))
3175         self.pg0.add_stream(p)
3176         self.pg_enable_capture(self.pg_interfaces)
3177         self.pg_start()
3178         capture = self.pg1.get_capture(1)
3179         p = capture[0]
3180         try:
3181             ip = p[IP]
3182             tcp = p[TCP]
3183             self.assertEqual(ip.src, nat_ip)
3184             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3185             self.assertEqual(tcp.dport, external_port)
3186             port_out0 = tcp.sport
3187         except:
3188             self.logger.error(ppp("Unexpected or invalid packet:", p))
3189             raise
3190
3191         # host1 to out
3192         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3193              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3194              TCP(sport=port_in, dport=external_port))
3195         self.pg0.add_stream(p)
3196         self.pg_enable_capture(self.pg_interfaces)
3197         self.pg_start()
3198         capture = self.pg1.get_capture(1)
3199         p = capture[0]
3200         try:
3201             ip = p[IP]
3202             tcp = p[TCP]
3203             self.assertEqual(ip.src, nat_ip)
3204             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3205             self.assertEqual(tcp.dport, external_port)
3206             port_out1 = tcp.sport
3207         except:
3208             self.logger.error(ppp("Unexpected or invalid packet:", p))
3209             raise
3210
3211         dms = self.vapi.nat_det_map_dump()
3212         self.assertEqual(1, len(dms))
3213         self.assertEqual(2, dms[0].ses_num)
3214
3215         # out to host0
3216         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3217              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3218              TCP(sport=external_port, dport=port_out0))
3219         self.pg1.add_stream(p)
3220         self.pg_enable_capture(self.pg_interfaces)
3221         self.pg_start()
3222         capture = self.pg0.get_capture(1)
3223         p = capture[0]
3224         try:
3225             ip = p[IP]
3226             tcp = p[TCP]
3227             self.assertEqual(ip.src, self.pg1.remote_ip4)
3228             self.assertEqual(ip.dst, host0.ip4)
3229             self.assertEqual(tcp.dport, port_in)
3230             self.assertEqual(tcp.sport, external_port)
3231         except:
3232             self.logger.error(ppp("Unexpected or invalid packet:", p))
3233             raise
3234
3235         # out to host1
3236         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3237              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3238              TCP(sport=external_port, dport=port_out1))
3239         self.pg1.add_stream(p)
3240         self.pg_enable_capture(self.pg_interfaces)
3241         self.pg_start()
3242         capture = self.pg0.get_capture(1)
3243         p = capture[0]
3244         try:
3245             ip = p[IP]
3246             tcp = p[TCP]
3247             self.assertEqual(ip.src, self.pg1.remote_ip4)
3248             self.assertEqual(ip.dst, host1.ip4)
3249             self.assertEqual(tcp.dport, port_in)
3250             self.assertEqual(tcp.sport, external_port)
3251         except:
3252             self.logger.error(ppp("Unexpected or invalid packet", p))
3253             raise
3254
3255         # session close api test
3256         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3257                                             port_out1,
3258                                             self.pg1.remote_ip4n,
3259                                             external_port)
3260         dms = self.vapi.nat_det_map_dump()
3261         self.assertEqual(dms[0].ses_num, 1)
3262
3263         self.vapi.nat_det_close_session_in(host0.ip4n,
3264                                            port_in,
3265                                            self.pg1.remote_ip4n,
3266                                            external_port)
3267         dms = self.vapi.nat_det_map_dump()
3268         self.assertEqual(dms[0].ses_num, 0)
3269
3270     def test_tcp_session_close_detection_in(self):
3271         """ Deterministic NAT TCP session close from inside network """
3272         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3273                                       32,
3274                                       socket.inet_aton(self.nat_addr),
3275                                       32)
3276         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3277         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3278                                                   is_inside=0)
3279
3280         self.initiate_tcp_session(self.pg0, self.pg1)
3281
3282         # close the session from inside
3283         try:
3284             # FIN packet in -> out
3285             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3286                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3287                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3288                      flags="F"))
3289             self.pg0.add_stream(p)
3290             self.pg_enable_capture(self.pg_interfaces)
3291             self.pg_start()
3292             self.pg1.get_capture(1)
3293
3294             pkts = []
3295
3296             # ACK packet out -> in
3297             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3298                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3299                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3300                      flags="A"))
3301             pkts.append(p)
3302
3303             # FIN packet out -> in
3304             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3305                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3306                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3307                      flags="F"))
3308             pkts.append(p)
3309
3310             self.pg1.add_stream(pkts)
3311             self.pg_enable_capture(self.pg_interfaces)
3312             self.pg_start()
3313             self.pg0.get_capture(2)
3314
3315             # ACK packet in -> out
3316             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3317                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3318                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3319                      flags="A"))
3320             self.pg0.add_stream(p)
3321             self.pg_enable_capture(self.pg_interfaces)
3322             self.pg_start()
3323             self.pg1.get_capture(1)
3324
3325             # Check if deterministic NAT44 closed the session
3326             dms = self.vapi.nat_det_map_dump()
3327             self.assertEqual(0, dms[0].ses_num)
3328         except:
3329             self.logger.error("TCP session termination failed")
3330             raise
3331
3332     def test_tcp_session_close_detection_out(self):
3333         """ Deterministic NAT TCP session close from outside network """
3334         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3335                                       32,
3336                                       socket.inet_aton(self.nat_addr),
3337                                       32)
3338         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3339         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3340                                                   is_inside=0)
3341
3342         self.initiate_tcp_session(self.pg0, self.pg1)
3343
3344         # close the session from outside
3345         try:
3346             # FIN packet out -> in
3347             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3348                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3349                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3350                      flags="F"))
3351             self.pg1.add_stream(p)
3352             self.pg_enable_capture(self.pg_interfaces)
3353             self.pg_start()
3354             self.pg0.get_capture(1)
3355
3356             pkts = []
3357
3358             # ACK packet in -> out
3359             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3360                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3361                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3362                      flags="A"))
3363             pkts.append(p)
3364
3365             # ACK packet in -> out
3366             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3367                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3368                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3369                      flags="F"))
3370             pkts.append(p)
3371
3372             self.pg0.add_stream(pkts)
3373             self.pg_enable_capture(self.pg_interfaces)
3374             self.pg_start()
3375             self.pg1.get_capture(2)
3376
3377             # ACK packet out -> in
3378             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3379                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3380                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3381                      flags="A"))
3382             self.pg1.add_stream(p)
3383             self.pg_enable_capture(self.pg_interfaces)
3384             self.pg_start()
3385             self.pg0.get_capture(1)
3386
3387             # Check if deterministic NAT44 closed the session
3388             dms = self.vapi.nat_det_map_dump()
3389             self.assertEqual(0, dms[0].ses_num)
3390         except:
3391             self.logger.error("TCP session termination failed")
3392             raise
3393
3394     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3395     def test_session_timeout(self):
3396         """ Deterministic NAT session timeouts """
3397         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3398                                       32,
3399                                       socket.inet_aton(self.nat_addr),
3400                                       32)
3401         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3402         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3403                                                   is_inside=0)
3404
3405         self.initiate_tcp_session(self.pg0, self.pg1)
3406         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3407         pkts = self.create_stream_in(self.pg0, self.pg1)
3408         self.pg0.add_stream(pkts)
3409         self.pg_enable_capture(self.pg_interfaces)
3410         self.pg_start()
3411         capture = self.pg1.get_capture(len(pkts))
3412         sleep(15)
3413
3414         dms = self.vapi.nat_det_map_dump()
3415         self.assertEqual(0, dms[0].ses_num)
3416
3417     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3418     def test_session_limit_per_user(self):
3419         """ Deterministic NAT maximum sessions per user limit """
3420         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3421                                       32,
3422                                       socket.inet_aton(self.nat_addr),
3423                                       32)
3424         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3425         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3426                                                   is_inside=0)
3427         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3428                                      src_address=self.pg2.local_ip4n,
3429                                      path_mtu=512,
3430                                      template_interval=10)
3431         self.vapi.nat_ipfix()
3432
3433         pkts = []
3434         for port in range(1025, 2025):
3435             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3436                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3437                  UDP(sport=port, dport=port))
3438             pkts.append(p)
3439
3440         self.pg0.add_stream(pkts)
3441         self.pg_enable_capture(self.pg_interfaces)
3442         self.pg_start()
3443         capture = self.pg1.get_capture(len(pkts))
3444
3445         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3446              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3447              UDP(sport=3001, dport=3002))
3448         self.pg0.add_stream(p)
3449         self.pg_enable_capture(self.pg_interfaces)
3450         self.pg_start()
3451         capture = self.pg1.assert_nothing_captured()
3452
3453         # verify ICMP error packet
3454         capture = self.pg0.get_capture(1)
3455         p = capture[0]
3456         self.assertTrue(p.haslayer(ICMP))
3457         icmp = p[ICMP]
3458         self.assertEqual(icmp.type, 3)
3459         self.assertEqual(icmp.code, 1)
3460         self.assertTrue(icmp.haslayer(IPerror))
3461         inner_ip = icmp[IPerror]
3462         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3463         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3464
3465         dms = self.vapi.nat_det_map_dump()
3466
3467         self.assertEqual(1000, dms[0].ses_num)
3468
3469         # verify IPFIX logging
3470         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3471         sleep(1)
3472         capture = self.pg2.get_capture(2)
3473         ipfix = IPFIXDecoder()
3474         # first load template
3475         for p in capture:
3476             self.assertTrue(p.haslayer(IPFIX))
3477             if p.haslayer(Template):
3478                 ipfix.add_template(p.getlayer(Template))
3479         # verify events in data set
3480         for p in capture:
3481             if p.haslayer(Data):
3482                 data = ipfix.decode_data_set(p.getlayer(Set))
3483                 self.verify_ipfix_max_entries_per_user(data)
3484
3485     def clear_nat_det(self):
3486         """
3487         Clear deterministic NAT configuration.
3488         """
3489         self.vapi.nat_ipfix(enable=0)
3490         self.vapi.nat_det_set_timeouts()
3491         deterministic_mappings = self.vapi.nat_det_map_dump()
3492         for dsm in deterministic_mappings:
3493             self.vapi.nat_det_add_del_map(dsm.in_addr,
3494                                           dsm.in_plen,
3495                                           dsm.out_addr,
3496                                           dsm.out_plen,
3497                                           is_add=0)
3498
3499         interfaces = self.vapi.nat44_interface_dump()
3500         for intf in interfaces:
3501             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3502                                                       intf.is_inside,
3503                                                       is_add=0)
3504
3505     def tearDown(self):
3506         super(TestDeterministicNAT, self).tearDown()
3507         if not self.vpp_dead:
3508             self.logger.info(self.vapi.cli("show nat44 detail"))
3509             self.clear_nat_det()
3510
3511
3512 class TestNAT64(MethodHolder):
3513     """ NAT64 Test Cases """
3514
3515     @classmethod
3516     def setUpClass(cls):
3517         super(TestNAT64, cls).setUpClass()
3518
3519         try:
3520             cls.tcp_port_in = 6303
3521             cls.tcp_port_out = 6303
3522             cls.udp_port_in = 6304
3523             cls.udp_port_out = 6304
3524             cls.icmp_id_in = 6305
3525             cls.icmp_id_out = 6305
3526             cls.nat_addr = '10.0.0.3'
3527             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3528             cls.vrf1_id = 10
3529             cls.vrf1_nat_addr = '10.0.10.3'
3530             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3531                                                    cls.vrf1_nat_addr)
3532
3533             cls.create_pg_interfaces(range(5))
3534             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3535             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3536             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3537
3538             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3539
3540             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3541
3542             cls.pg0.generate_remote_hosts(2)
3543
3544             for i in cls.ip6_interfaces:
3545                 i.admin_up()
3546                 i.config_ip6()
3547                 i.configure_ipv6_neighbors()
3548
3549             for i in cls.ip4_interfaces:
3550                 i.admin_up()
3551                 i.config_ip4()
3552                 i.resolve_arp()
3553
3554             cls.pg3.admin_up()
3555             cls.pg3.config_ip4()
3556             cls.pg3.resolve_arp()
3557             cls.pg3.config_ip6()
3558             cls.pg3.configure_ipv6_neighbors()
3559
3560         except Exception:
3561             super(TestNAT64, cls).tearDownClass()
3562             raise
3563
3564     def test_pool(self):
3565         """ Add/delete address to NAT64 pool """
3566         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3567
3568         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3569
3570         addresses = self.vapi.nat64_pool_addr_dump()
3571         self.assertEqual(len(addresses), 1)
3572         self.assertEqual(addresses[0].address, nat_addr)
3573
3574         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3575
3576         addresses = self.vapi.nat64_pool_addr_dump()
3577         self.assertEqual(len(addresses), 0)
3578
3579     def test_interface(self):
3580         """ Enable/disable NAT64 feature on the interface """
3581         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3582         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3583
3584         interfaces = self.vapi.nat64_interface_dump()
3585         self.assertEqual(len(interfaces), 2)
3586         pg0_found = False
3587         pg1_found = False
3588         for intf in interfaces:
3589             if intf.sw_if_index == self.pg0.sw_if_index:
3590                 self.assertEqual(intf.is_inside, 1)
3591                 pg0_found = True
3592             elif intf.sw_if_index == self.pg1.sw_if_index:
3593                 self.assertEqual(intf.is_inside, 0)
3594                 pg1_found = True
3595         self.assertTrue(pg0_found)
3596         self.assertTrue(pg1_found)
3597
3598         features = self.vapi.cli("show interface features pg0")
3599         self.assertNotEqual(features.find('nat64-in2out'), -1)
3600         features = self.vapi.cli("show interface features pg1")
3601         self.assertNotEqual(features.find('nat64-out2in'), -1)
3602
3603         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3604         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3605
3606         interfaces = self.vapi.nat64_interface_dump()
3607         self.assertEqual(len(interfaces), 0)
3608
3609     def test_static_bib(self):
3610         """ Add/delete static BIB entry """
3611         in_addr = socket.inet_pton(socket.AF_INET6,
3612                                    '2001:db8:85a3::8a2e:370:7334')
3613         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3614         in_port = 1234
3615         out_port = 5678
3616         proto = IP_PROTOS.tcp
3617
3618         self.vapi.nat64_add_del_static_bib(in_addr,
3619                                            out_addr,
3620                                            in_port,
3621                                            out_port,
3622                                            proto)
3623         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3624         static_bib_num = 0
3625         for bibe in bib:
3626             if bibe.is_static:
3627                 static_bib_num += 1
3628                 self.assertEqual(bibe.i_addr, in_addr)
3629                 self.assertEqual(bibe.o_addr, out_addr)
3630                 self.assertEqual(bibe.i_port, in_port)
3631                 self.assertEqual(bibe.o_port, out_port)
3632         self.assertEqual(static_bib_num, 1)
3633
3634         self.vapi.nat64_add_del_static_bib(in_addr,
3635                                            out_addr,
3636                                            in_port,
3637                                            out_port,
3638                                            proto,
3639                                            is_add=0)
3640         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3641         static_bib_num = 0
3642         for bibe in bib:
3643             if bibe.is_static:
3644                 static_bib_num += 1
3645         self.assertEqual(static_bib_num, 0)
3646
3647     def test_set_timeouts(self):
3648         """ Set NAT64 timeouts """
3649         # verify default values
3650         timeouts = self.vapi.nat64_get_timeouts()
3651         self.assertEqual(timeouts.udp, 300)
3652         self.assertEqual(timeouts.icmp, 60)
3653         self.assertEqual(timeouts.tcp_trans, 240)
3654         self.assertEqual(timeouts.tcp_est, 7440)
3655         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3656
3657         # set and verify custom values
3658         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3659                                      tcp_est=7450, tcp_incoming_syn=10)
3660         timeouts = self.vapi.nat64_get_timeouts()
3661         self.assertEqual(timeouts.udp, 200)
3662         self.assertEqual(timeouts.icmp, 30)
3663         self.assertEqual(timeouts.tcp_trans, 250)
3664         self.assertEqual(timeouts.tcp_est, 7450)
3665         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3666
3667     def test_dynamic(self):
3668         """ NAT64 dynamic translation test """
3669         self.tcp_port_in = 6303
3670         self.udp_port_in = 6304
3671         self.icmp_id_in = 6305
3672
3673         ses_num_start = self.nat64_get_ses_num()
3674
3675         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3676                                                 self.nat_addr_n)
3677         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3678         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3679
3680         # in2out
3681         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3682         self.pg0.add_stream(pkts)
3683         self.pg_enable_capture(self.pg_interfaces)
3684         self.pg_start()
3685         capture = self.pg1.get_capture(len(pkts))
3686         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3687                                 dst_ip=self.pg1.remote_ip4)
3688
3689         # out2in
3690         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3691         self.pg1.add_stream(pkts)
3692         self.pg_enable_capture(self.pg_interfaces)
3693         self.pg_start()
3694         capture = self.pg0.get_capture(len(pkts))
3695         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3696         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3697
3698         # in2out
3699         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3700         self.pg0.add_stream(pkts)
3701         self.pg_enable_capture(self.pg_interfaces)
3702         self.pg_start()
3703         capture = self.pg1.get_capture(len(pkts))
3704         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3705                                 dst_ip=self.pg1.remote_ip4)
3706
3707         # out2in
3708         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3709         self.pg1.add_stream(pkts)
3710         self.pg_enable_capture(self.pg_interfaces)
3711         self.pg_start()
3712         capture = self.pg0.get_capture(len(pkts))
3713         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3714
3715         ses_num_end = self.nat64_get_ses_num()
3716
3717         self.assertEqual(ses_num_end - ses_num_start, 3)
3718
3719         # tenant with specific VRF
3720         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3721                                                 self.vrf1_nat_addr_n,
3722                                                 vrf_id=self.vrf1_id)
3723         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3724
3725         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3726         self.pg2.add_stream(pkts)
3727         self.pg_enable_capture(self.pg_interfaces)
3728         self.pg_start()
3729         capture = self.pg1.get_capture(len(pkts))
3730         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3731                                 dst_ip=self.pg1.remote_ip4)
3732
3733         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3734         self.pg1.add_stream(pkts)
3735         self.pg_enable_capture(self.pg_interfaces)
3736         self.pg_start()
3737         capture = self.pg2.get_capture(len(pkts))
3738         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3739
3740     def test_static(self):
3741         """ NAT64 static translation test """
3742         self.tcp_port_in = 60303
3743         self.udp_port_in = 60304
3744         self.icmp_id_in = 60305
3745         self.tcp_port_out = 60303
3746         self.udp_port_out = 60304
3747         self.icmp_id_out = 60305
3748
3749         ses_num_start = self.nat64_get_ses_num()
3750
3751         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3752                                                 self.nat_addr_n)
3753         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3754         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3755
3756         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3757                                            self.nat_addr_n,
3758                                            self.tcp_port_in,
3759                                            self.tcp_port_out,
3760                                            IP_PROTOS.tcp)
3761         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3762                                            self.nat_addr_n,
3763                                            self.udp_port_in,
3764                                            self.udp_port_out,
3765                                            IP_PROTOS.udp)
3766         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3767                                            self.nat_addr_n,
3768                                            self.icmp_id_in,
3769                                            self.icmp_id_out,
3770                                            IP_PROTOS.icmp)
3771
3772         # in2out
3773         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3774         self.pg0.add_stream(pkts)
3775         self.pg_enable_capture(self.pg_interfaces)
3776         self.pg_start()
3777         capture = self.pg1.get_capture(len(pkts))
3778         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3779                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3780
3781         # out2in
3782         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3783         self.pg1.add_stream(pkts)
3784         self.pg_enable_capture(self.pg_interfaces)
3785         self.pg_start()
3786         capture = self.pg0.get_capture(len(pkts))
3787         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3788         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3789
3790         ses_num_end = self.nat64_get_ses_num()
3791
3792         self.assertEqual(ses_num_end - ses_num_start, 3)
3793
3794     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3795     def test_session_timeout(self):
3796         """ NAT64 session timeout """
3797         self.icmp_id_in = 1234
3798         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3799                                                 self.nat_addr_n)
3800         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3801         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3802         self.vapi.nat64_set_timeouts(icmp=5)
3803
3804         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3805         self.pg0.add_stream(pkts)
3806         self.pg_enable_capture(self.pg_interfaces)
3807         self.pg_start()
3808         capture = self.pg1.get_capture(len(pkts))
3809
3810         ses_num_before_timeout = self.nat64_get_ses_num()
3811
3812         sleep(15)
3813
3814         # ICMP session after timeout
3815         ses_num_after_timeout = self.nat64_get_ses_num()
3816         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3817
3818     def test_icmp_error(self):
3819         """ NAT64 ICMP Error message translation """
3820         self.tcp_port_in = 6303
3821         self.udp_port_in = 6304
3822         self.icmp_id_in = 6305
3823
3824         ses_num_start = self.nat64_get_ses_num()
3825
3826         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3827                                                 self.nat_addr_n)
3828         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3829         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3830
3831         # send some packets to create sessions
3832         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3833         self.pg0.add_stream(pkts)
3834         self.pg_enable_capture(self.pg_interfaces)
3835         self.pg_start()
3836         capture_ip4 = self.pg1.get_capture(len(pkts))
3837         self.verify_capture_out(capture_ip4,
3838                                 nat_ip=self.nat_addr,
3839                                 dst_ip=self.pg1.remote_ip4)
3840
3841         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3842         self.pg1.add_stream(pkts)
3843         self.pg_enable_capture(self.pg_interfaces)
3844         self.pg_start()
3845         capture_ip6 = self.pg0.get_capture(len(pkts))
3846         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3847         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3848                                    self.pg0.remote_ip6)
3849
3850         # in2out
3851         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3852                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3853                 ICMPv6DestUnreach(code=1) /
3854                 packet[IPv6] for packet in capture_ip6]
3855         self.pg0.add_stream(pkts)
3856         self.pg_enable_capture(self.pg_interfaces)
3857         self.pg_start()
3858         capture = self.pg1.get_capture(len(pkts))
3859         for packet in capture:
3860             try:
3861                 self.assertEqual(packet[IP].src, self.nat_addr)
3862                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3863                 self.assertEqual(packet[ICMP].type, 3)
3864                 self.assertEqual(packet[ICMP].code, 13)
3865                 inner = packet[IPerror]
3866                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3867                 self.assertEqual(inner.dst, self.nat_addr)
3868                 self.check_icmp_checksum(packet)
3869                 if inner.haslayer(TCPerror):
3870                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3871                 elif inner.haslayer(UDPerror):
3872                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3873                 else:
3874                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3875             except:
3876                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3877                 raise
3878
3879         # out2in
3880         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3881                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3882                 ICMP(type=3, code=13) /
3883                 packet[IP] for packet in capture_ip4]
3884         self.pg1.add_stream(pkts)
3885         self.pg_enable_capture(self.pg_interfaces)
3886         self.pg_start()
3887         capture = self.pg0.get_capture(len(pkts))
3888         for packet in capture:
3889             try:
3890                 self.assertEqual(packet[IPv6].src, ip.src)
3891                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3892                 icmp = packet[ICMPv6DestUnreach]
3893                 self.assertEqual(icmp.code, 1)
3894                 inner = icmp[IPerror6]
3895                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3896                 self.assertEqual(inner.dst, ip.src)
3897                 self.check_icmpv6_checksum(packet)
3898                 if inner.haslayer(TCPerror):
3899                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3900                 elif inner.haslayer(UDPerror):
3901                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3902                 else:
3903                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3904                                      self.icmp_id_in)
3905             except:
3906                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3907                 raise
3908
3909     def test_hairpinning(self):
3910         """ NAT64 hairpinning """
3911
3912         client = self.pg0.remote_hosts[0]
3913         server = self.pg0.remote_hosts[1]
3914         server_tcp_in_port = 22
3915         server_tcp_out_port = 4022
3916         server_udp_in_port = 23
3917         server_udp_out_port = 4023
3918         client_tcp_in_port = 1234
3919         client_udp_in_port = 1235
3920         client_tcp_out_port = 0
3921         client_udp_out_port = 0
3922         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3923         nat_addr_ip6 = ip.src
3924
3925         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3926                                                 self.nat_addr_n)
3927         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3928         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3929
3930         self.vapi.nat64_add_del_static_bib(server.ip6n,
3931                                            self.nat_addr_n,
3932                                            server_tcp_in_port,
3933                                            server_tcp_out_port,
3934                                            IP_PROTOS.tcp)
3935         self.vapi.nat64_add_del_static_bib(server.ip6n,
3936                                            self.nat_addr_n,
3937                                            server_udp_in_port,
3938                                            server_udp_out_port,
3939                                            IP_PROTOS.udp)
3940
3941         # client to server
3942         pkts = []
3943         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3944              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3945              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3946         pkts.append(p)
3947         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3948              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3949              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3950         pkts.append(p)
3951         self.pg0.add_stream(pkts)
3952         self.pg_enable_capture(self.pg_interfaces)
3953         self.pg_start()
3954         capture = self.pg0.get_capture(len(pkts))
3955         for packet in capture:
3956             try:
3957                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3958                 self.assertEqual(packet[IPv6].dst, server.ip6)
3959                 if packet.haslayer(TCP):
3960                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3961                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3962                     self.check_tcp_checksum(packet)
3963                     client_tcp_out_port = packet[TCP].sport
3964                 else:
3965                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3966                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3967                     self.check_udp_checksum(packet)
3968                     client_udp_out_port = packet[UDP].sport
3969             except:
3970                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3971                 raise
3972
3973         # server to client
3974         pkts = []
3975         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3976              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3977              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3978         pkts.append(p)
3979         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3980              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3981              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3982         pkts.append(p)
3983         self.pg0.add_stream(pkts)
3984         self.pg_enable_capture(self.pg_interfaces)
3985         self.pg_start()
3986         capture = self.pg0.get_capture(len(pkts))
3987         for packet in capture:
3988             try:
3989                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3990                 self.assertEqual(packet[IPv6].dst, client.ip6)
3991                 if packet.haslayer(TCP):
3992                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3993                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3994                     self.check_tcp_checksum(packet)
3995                 else:
3996                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3997                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3998                     self.check_udp_checksum(packet)
3999             except:
4000                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4001                 raise
4002
4003         # ICMP error
4004         pkts = []
4005         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4006                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4007                 ICMPv6DestUnreach(code=1) /
4008                 packet[IPv6] for packet in capture]
4009         self.pg0.add_stream(pkts)
4010         self.pg_enable_capture(self.pg_interfaces)
4011         self.pg_start()
4012         capture = self.pg0.get_capture(len(pkts))
4013         for packet in capture:
4014             try:
4015                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4016                 self.assertEqual(packet[IPv6].dst, server.ip6)
4017                 icmp = packet[ICMPv6DestUnreach]
4018                 self.assertEqual(icmp.code, 1)
4019                 inner = icmp[IPerror6]
4020                 self.assertEqual(inner.src, server.ip6)
4021                 self.assertEqual(inner.dst, nat_addr_ip6)
4022                 self.check_icmpv6_checksum(packet)
4023                 if inner.haslayer(TCPerror):
4024                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4025                     self.assertEqual(inner[TCPerror].dport,
4026                                      client_tcp_out_port)
4027                 else:
4028                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4029                     self.assertEqual(inner[UDPerror].dport,
4030                                      client_udp_out_port)
4031             except:
4032                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4033                 raise
4034
4035     def test_prefix(self):
4036         """ NAT64 Network-Specific Prefix """
4037
4038         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4039                                                 self.nat_addr_n)
4040         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4041         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4042         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4043                                                 self.vrf1_nat_addr_n,
4044                                                 vrf_id=self.vrf1_id)
4045         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4046
4047         # Add global prefix
4048         global_pref64 = "2001:db8::"
4049         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4050         global_pref64_len = 32
4051         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4052
4053         prefix = self.vapi.nat64_prefix_dump()
4054         self.assertEqual(len(prefix), 1)
4055         self.assertEqual(prefix[0].prefix, global_pref64_n)
4056         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4057         self.assertEqual(prefix[0].vrf_id, 0)
4058
4059         # Add tenant specific prefix
4060         vrf1_pref64 = "2001:db8:122:300::"
4061         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4062         vrf1_pref64_len = 56
4063         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4064                                        vrf1_pref64_len,
4065                                        vrf_id=self.vrf1_id)
4066         prefix = self.vapi.nat64_prefix_dump()
4067         self.assertEqual(len(prefix), 2)
4068
4069         # Global prefix
4070         pkts = self.create_stream_in_ip6(self.pg0,
4071                                          self.pg1,
4072                                          pref=global_pref64,
4073                                          plen=global_pref64_len)
4074         self.pg0.add_stream(pkts)
4075         self.pg_enable_capture(self.pg_interfaces)
4076         self.pg_start()
4077         capture = self.pg1.get_capture(len(pkts))
4078         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4079                                 dst_ip=self.pg1.remote_ip4)
4080
4081         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4082         self.pg1.add_stream(pkts)
4083         self.pg_enable_capture(self.pg_interfaces)
4084         self.pg_start()
4085         capture = self.pg0.get_capture(len(pkts))
4086         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4087                                   global_pref64,
4088                                   global_pref64_len)
4089         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4090
4091         # Tenant specific prefix
4092         pkts = self.create_stream_in_ip6(self.pg2,
4093                                          self.pg1,
4094                                          pref=vrf1_pref64,
4095                                          plen=vrf1_pref64_len)
4096         self.pg2.add_stream(pkts)
4097         self.pg_enable_capture(self.pg_interfaces)
4098         self.pg_start()
4099         capture = self.pg1.get_capture(len(pkts))
4100         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4101                                 dst_ip=self.pg1.remote_ip4)
4102
4103         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4104         self.pg1.add_stream(pkts)
4105         self.pg_enable_capture(self.pg_interfaces)
4106         self.pg_start()
4107         capture = self.pg2.get_capture(len(pkts))
4108         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4109                                   vrf1_pref64,
4110                                   vrf1_pref64_len)
4111         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4112
4113     def test_unknown_proto(self):
4114         """ NAT64 translate packet with unknown protocol """
4115
4116         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4117                                                 self.nat_addr_n)
4118         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4119         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4120         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4121
4122         # in2out
4123         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4124              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4125              TCP(sport=self.tcp_port_in, dport=20))
4126         self.pg0.add_stream(p)
4127         self.pg_enable_capture(self.pg_interfaces)
4128         self.pg_start()
4129         p = self.pg1.get_capture(1)
4130
4131         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4132              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4133              GRE() /
4134              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4135              TCP(sport=1234, dport=1234))
4136         self.pg0.add_stream(p)
4137         self.pg_enable_capture(self.pg_interfaces)
4138         self.pg_start()
4139         p = self.pg1.get_capture(1)
4140         packet = p[0]
4141         try:
4142             self.assertEqual(packet[IP].src, self.nat_addr)
4143             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4144             self.assertTrue(packet.haslayer(GRE))
4145             self.check_ip_checksum(packet)
4146         except:
4147             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4148             raise
4149
4150         # out2in
4151         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4152              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4153              GRE() /
4154              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4155              TCP(sport=1234, dport=1234))
4156         self.pg1.add_stream(p)
4157         self.pg_enable_capture(self.pg_interfaces)
4158         self.pg_start()
4159         p = self.pg0.get_capture(1)
4160         packet = p[0]
4161         try:
4162             self.assertEqual(packet[IPv6].src, remote_ip6)
4163             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4164             self.assertEqual(packet[IPv6].nh, 47)
4165         except:
4166             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4167             raise
4168
4169     def test_hairpinning_unknown_proto(self):
4170         """ NAT64 translate packet with unknown protocol - hairpinning """
4171
4172         client = self.pg0.remote_hosts[0]
4173         server = self.pg0.remote_hosts[1]
4174         server_tcp_in_port = 22
4175         server_tcp_out_port = 4022
4176         client_tcp_in_port = 1234
4177         client_tcp_out_port = 1235
4178         server_nat_ip = "10.0.0.100"
4179         client_nat_ip = "10.0.0.110"
4180         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4181         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4182         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4183         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4184
4185         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4186                                                 client_nat_ip_n)
4187         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4188         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4189
4190         self.vapi.nat64_add_del_static_bib(server.ip6n,
4191                                            server_nat_ip_n,
4192                                            server_tcp_in_port,
4193                                            server_tcp_out_port,
4194                                            IP_PROTOS.tcp)
4195
4196         self.vapi.nat64_add_del_static_bib(server.ip6n,
4197                                            server_nat_ip_n,
4198                                            0,
4199                                            0,
4200                                            IP_PROTOS.gre)
4201
4202         self.vapi.nat64_add_del_static_bib(client.ip6n,
4203                                            client_nat_ip_n,
4204                                            client_tcp_in_port,
4205                                            client_tcp_out_port,
4206                                            IP_PROTOS.tcp)
4207
4208         # client to server
4209         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4210              IPv6(src=client.ip6, dst=server_nat_ip6) /
4211              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4212         self.pg0.add_stream(p)
4213         self.pg_enable_capture(self.pg_interfaces)
4214         self.pg_start()
4215         p = self.pg0.get_capture(1)
4216
4217         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4218              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4219              GRE() /
4220              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4221              TCP(sport=1234, dport=1234))
4222         self.pg0.add_stream(p)
4223         self.pg_enable_capture(self.pg_interfaces)
4224         self.pg_start()
4225         p = self.pg0.get_capture(1)
4226         packet = p[0]
4227         try:
4228             self.assertEqual(packet[IPv6].src, client_nat_ip6)
4229             self.assertEqual(packet[IPv6].dst, server.ip6)
4230             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4231         except:
4232             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4233             raise
4234
4235         # server to client
4236         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4237              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4238              GRE() /
4239              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4240              TCP(sport=1234, dport=1234))
4241         self.pg0.add_stream(p)
4242         self.pg_enable_capture(self.pg_interfaces)
4243         self.pg_start()
4244         p = self.pg0.get_capture(1)
4245         packet = p[0]
4246         try:
4247             self.assertEqual(packet[IPv6].src, server_nat_ip6)
4248             self.assertEqual(packet[IPv6].dst, client.ip6)
4249             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4250         except:
4251             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4252             raise
4253
4254     def test_one_armed_nat64(self):
4255         """ One armed NAT64 """
4256         external_port = 0
4257         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4258                                            '64:ff9b::',
4259                                            96)
4260
4261         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4262                                                 self.nat_addr_n)
4263         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4264         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4265
4266         # in2out
4267         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4268              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4269              TCP(sport=12345, dport=80))
4270         self.pg3.add_stream(p)
4271         self.pg_enable_capture(self.pg_interfaces)
4272         self.pg_start()
4273         capture = self.pg3.get_capture(1)
4274         p = capture[0]
4275         try:
4276             ip = p[IP]
4277             tcp = p[TCP]
4278             self.assertEqual(ip.src, self.nat_addr)
4279             self.assertEqual(ip.dst, self.pg3.remote_ip4)
4280             self.assertNotEqual(tcp.sport, 12345)
4281             external_port = tcp.sport
4282             self.assertEqual(tcp.dport, 80)
4283             self.check_tcp_checksum(p)
4284             self.check_ip_checksum(p)
4285         except:
4286             self.logger.error(ppp("Unexpected or invalid packet:", p))
4287             raise
4288
4289         # out2in
4290         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4291              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4292              TCP(sport=80, dport=external_port))
4293         self.pg3.add_stream(p)
4294         self.pg_enable_capture(self.pg_interfaces)
4295         self.pg_start()
4296         capture = self.pg3.get_capture(1)
4297         p = capture[0]
4298         try:
4299             ip = p[IPv6]
4300             tcp = p[TCP]
4301             self.assertEqual(ip.src, remote_host_ip6)
4302             self.assertEqual(ip.dst, self.pg3.remote_ip6)
4303             self.assertEqual(tcp.sport, 80)
4304             self.assertEqual(tcp.dport, 12345)
4305             self.check_tcp_checksum(p)
4306         except:
4307             self.logger.error(ppp("Unexpected or invalid packet:", p))
4308             raise
4309
4310     def test_frag_in_order(self):
4311         """ NAT64 translate fragments arriving in order """
4312         self.tcp_port_in = random.randint(1025, 65535)
4313
4314         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4315                                                 self.nat_addr_n)
4316         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4317         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4318
4319         reass = self.vapi.nat_reass_dump()
4320         reass_n_start = len(reass)
4321
4322         # in2out
4323         data = 'a' * 200
4324         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4325                                            self.tcp_port_in, 20, data)
4326         self.pg0.add_stream(pkts)
4327         self.pg_enable_capture(self.pg_interfaces)
4328         self.pg_start()
4329         frags = self.pg1.get_capture(len(pkts))
4330         p = self.reass_frags_and_verify(frags,
4331                                         self.nat_addr,
4332                                         self.pg1.remote_ip4)
4333         self.assertEqual(p[TCP].dport, 20)
4334         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4335         self.tcp_port_out = p[TCP].sport
4336         self.assertEqual(data, p[Raw].load)
4337
4338         # out2in
4339         data = "A" * 4 + "b" * 16 + "C" * 3
4340         pkts = self.create_stream_frag(self.pg1,
4341                                        self.nat_addr,
4342                                        20,
4343                                        self.tcp_port_out,
4344                                        data)
4345         self.pg1.add_stream(pkts)
4346         self.pg_enable_capture(self.pg_interfaces)
4347         self.pg_start()
4348         frags = self.pg0.get_capture(len(pkts))
4349         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4350         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4351         self.assertEqual(p[TCP].sport, 20)
4352         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4353         self.assertEqual(data, p[Raw].load)
4354
4355         reass = self.vapi.nat_reass_dump()
4356         reass_n_end = len(reass)
4357
4358         self.assertEqual(reass_n_end - reass_n_start, 2)
4359
4360     def test_reass_hairpinning(self):
4361         """ NAT64 fragments hairpinning """
4362         data = 'a' * 200
4363         client = self.pg0.remote_hosts[0]
4364         server = self.pg0.remote_hosts[1]
4365         server_in_port = random.randint(1025, 65535)
4366         server_out_port = random.randint(1025, 65535)
4367         client_in_port = random.randint(1025, 65535)
4368         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4369         nat_addr_ip6 = ip.src
4370
4371         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4372                                                 self.nat_addr_n)
4373         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4374         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4375
4376         # add static BIB entry for server
4377         self.vapi.nat64_add_del_static_bib(server.ip6n,
4378                                            self.nat_addr_n,
4379                                            server_in_port,
4380                                            server_out_port,
4381                                            IP_PROTOS.tcp)
4382
4383         # send packet from host to server
4384         pkts = self.create_stream_frag_ip6(self.pg0,
4385                                            self.nat_addr,
4386                                            client_in_port,
4387                                            server_out_port,
4388                                            data)
4389         self.pg0.add_stream(pkts)
4390         self.pg_enable_capture(self.pg_interfaces)
4391         self.pg_start()
4392         frags = self.pg0.get_capture(len(pkts))
4393         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4394         self.assertNotEqual(p[TCP].sport, client_in_port)
4395         self.assertEqual(p[TCP].dport, server_in_port)
4396         self.assertEqual(data, p[Raw].load)
4397
4398     def test_frag_out_of_order(self):
4399         """ NAT64 translate fragments arriving out of order """
4400         self.tcp_port_in = random.randint(1025, 65535)
4401
4402         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4403                                                 self.nat_addr_n)
4404         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4405         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4406
4407         # in2out
4408         data = 'a' * 200
4409         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4410                                            self.tcp_port_in, 20, data)
4411         pkts.reverse()
4412         self.pg0.add_stream(pkts)
4413         self.pg_enable_capture(self.pg_interfaces)
4414         self.pg_start()
4415         frags = self.pg1.get_capture(len(pkts))
4416         p = self.reass_frags_and_verify(frags,
4417                                         self.nat_addr,
4418                                         self.pg1.remote_ip4)
4419         self.assertEqual(p[TCP].dport, 20)
4420         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4421         self.tcp_port_out = p[TCP].sport
4422         self.assertEqual(data, p[Raw].load)
4423
4424         # out2in
4425         data = "A" * 4 + "B" * 16 + "C" * 3
4426         pkts = self.create_stream_frag(self.pg1,
4427                                        self.nat_addr,
4428                                        20,
4429                                        self.tcp_port_out,
4430                                        data)
4431         pkts.reverse()
4432         self.pg1.add_stream(pkts)
4433         self.pg_enable_capture(self.pg_interfaces)
4434         self.pg_start()
4435         frags = self.pg0.get_capture(len(pkts))
4436         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4437         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4438         self.assertEqual(p[TCP].sport, 20)
4439         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4440         self.assertEqual(data, p[Raw].load)
4441
4442     def test_interface_addr(self):
4443         """ Acquire NAT64 pool addresses from interface """
4444         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4445
4446         # no address in NAT64 pool
4447         adresses = self.vapi.nat44_address_dump()
4448         self.assertEqual(0, len(adresses))
4449
4450         # configure interface address and check NAT64 address pool
4451         self.pg4.config_ip4()
4452         addresses = self.vapi.nat64_pool_addr_dump()
4453         self.assertEqual(len(addresses), 1)
4454         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4455
4456         # remove interface address and check NAT64 address pool
4457         self.pg4.unconfig_ip4()
4458         addresses = self.vapi.nat64_pool_addr_dump()
4459         self.assertEqual(0, len(adresses))
4460
4461     def nat64_get_ses_num(self):
4462         """
4463         Return number of active NAT64 sessions.
4464         """
4465         st = self.vapi.nat64_st_dump()
4466         return len(st)
4467
4468     def clear_nat64(self):
4469         """
4470         Clear NAT64 configuration.
4471         """
4472         self.vapi.nat64_set_timeouts()
4473
4474         interfaces = self.vapi.nat64_interface_dump()
4475         for intf in interfaces:
4476             if intf.is_inside > 1:
4477                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4478                                                   0,
4479                                                   is_add=0)
4480             self.vapi.nat64_add_del_interface(intf.sw_if_index,
4481                                               intf.is_inside,
4482                                               is_add=0)
4483
4484         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4485         for bibe in bib:
4486             if bibe.is_static:
4487                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4488                                                    bibe.o_addr,
4489                                                    bibe.i_port,
4490                                                    bibe.o_port,
4491                                                    bibe.proto,
4492                                                    bibe.vrf_id,
4493                                                    is_add=0)
4494
4495         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4496         for bibe in bib:
4497             if bibe.is_static:
4498                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4499                                                    bibe.o_addr,
4500                                                    bibe.i_port,
4501                                                    bibe.o_port,
4502                                                    bibe.proto,
4503                                                    bibe.vrf_id,
4504                                                    is_add=0)
4505
4506         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4507         for bibe in bib:
4508             if bibe.is_static:
4509                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4510                                                    bibe.o_addr,
4511                                                    bibe.i_port,
4512                                                    bibe.o_port,
4513                                                    bibe.proto,
4514                                                    bibe.vrf_id,
4515                                                    is_add=0)
4516
4517         adresses = self.vapi.nat64_pool_addr_dump()
4518         for addr in adresses:
4519             self.vapi.nat64_add_del_pool_addr_range(addr.address,
4520                                                     addr.address,
4521                                                     vrf_id=addr.vrf_id,
4522                                                     is_add=0)
4523
4524         prefixes = self.vapi.nat64_prefix_dump()
4525         for prefix in prefixes:
4526             self.vapi.nat64_add_del_prefix(prefix.prefix,
4527                                            prefix.prefix_len,
4528                                            vrf_id=prefix.vrf_id,
4529                                            is_add=0)
4530
4531     def tearDown(self):
4532         super(TestNAT64, self).tearDown()
4533         if not self.vpp_dead:
4534             self.logger.info(self.vapi.cli("show nat64 pool"))
4535             self.logger.info(self.vapi.cli("show nat64 interfaces"))
4536             self.logger.info(self.vapi.cli("show nat64 prefix"))
4537             self.logger.info(self.vapi.cli("show nat64 bib all"))
4538             self.logger.info(self.vapi.cli("show nat64 session table all"))
4539             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4540             self.clear_nat64()
4541
4542
4543 class TestDSlite(MethodHolder):
4544     """ DS-Lite Test Cases """
4545
4546     @classmethod
4547     def setUpClass(cls):
4548         super(TestDSlite, cls).setUpClass()
4549
4550         try:
4551             cls.nat_addr = '10.0.0.3'
4552             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4553
4554             cls.create_pg_interfaces(range(2))
4555             cls.pg0.admin_up()
4556             cls.pg0.config_ip4()
4557             cls.pg0.resolve_arp()
4558             cls.pg1.admin_up()
4559             cls.pg1.config_ip6()
4560             cls.pg1.generate_remote_hosts(2)
4561             cls.pg1.configure_ipv6_neighbors()
4562
4563         except Exception:
4564             super(TestDSlite, cls).tearDownClass()
4565             raise
4566
4567     def test_dslite(self):
4568         """ Test DS-Lite """
4569         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4570                                                  self.nat_addr_n)
4571         aftr_ip4 = '192.0.0.1'
4572         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4573         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4574         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4575         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4576
4577         # UDP
4578         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4579              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4580              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4581              UDP(sport=20000, dport=10000))
4582         self.pg1.add_stream(p)
4583         self.pg_enable_capture(self.pg_interfaces)
4584         self.pg_start()
4585         capture = self.pg0.get_capture(1)
4586         capture = capture[0]
4587         self.assertFalse(capture.haslayer(IPv6))
4588         self.assertEqual(capture[IP].src, self.nat_addr)
4589         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4590         self.assertNotEqual(capture[UDP].sport, 20000)
4591         self.assertEqual(capture[UDP].dport, 10000)
4592         self.check_ip_checksum(capture)
4593         out_port = capture[UDP].sport
4594
4595         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4596              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4597              UDP(sport=10000, dport=out_port))
4598         self.pg0.add_stream(p)
4599         self.pg_enable_capture(self.pg_interfaces)
4600         self.pg_start()
4601         capture = self.pg1.get_capture(1)
4602         capture = capture[0]
4603         self.assertEqual(capture[IPv6].src, aftr_ip6)
4604         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4605         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4606         self.assertEqual(capture[IP].dst, '192.168.1.1')
4607         self.assertEqual(capture[UDP].sport, 10000)
4608         self.assertEqual(capture[UDP].dport, 20000)
4609         self.check_ip_checksum(capture)
4610
4611         # TCP
4612         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4613              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4614              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4615              TCP(sport=20001, dport=10001))
4616         self.pg1.add_stream(p)
4617         self.pg_enable_capture(self.pg_interfaces)
4618         self.pg_start()
4619         capture = self.pg0.get_capture(1)
4620         capture = capture[0]
4621         self.assertFalse(capture.haslayer(IPv6))
4622         self.assertEqual(capture[IP].src, self.nat_addr)
4623         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4624         self.assertNotEqual(capture[TCP].sport, 20001)
4625         self.assertEqual(capture[TCP].dport, 10001)
4626         self.check_ip_checksum(capture)
4627         self.check_tcp_checksum(capture)
4628         out_port = capture[TCP].sport
4629
4630         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4631              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4632              TCP(sport=10001, dport=out_port))
4633         self.pg0.add_stream(p)
4634         self.pg_enable_capture(self.pg_interfaces)
4635         self.pg_start()
4636         capture = self.pg1.get_capture(1)
4637         capture = capture[0]
4638         self.assertEqual(capture[IPv6].src, aftr_ip6)
4639         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4640         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4641         self.assertEqual(capture[IP].dst, '192.168.1.1')
4642         self.assertEqual(capture[TCP].sport, 10001)
4643         self.assertEqual(capture[TCP].dport, 20001)
4644         self.check_ip_checksum(capture)
4645         self.check_tcp_checksum(capture)
4646
4647         # ICMP
4648         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4649              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4650              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4651              ICMP(id=4000, type='echo-request'))
4652         self.pg1.add_stream(p)
4653         self.pg_enable_capture(self.pg_interfaces)
4654         self.pg_start()
4655         capture = self.pg0.get_capture(1)
4656         capture = capture[0]
4657         self.assertFalse(capture.haslayer(IPv6))
4658         self.assertEqual(capture[IP].src, self.nat_addr)
4659         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4660         self.assertNotEqual(capture[ICMP].id, 4000)
4661         self.check_ip_checksum(capture)
4662         self.check_icmp_checksum(capture)
4663         out_id = capture[ICMP].id
4664
4665         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4666              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4667              ICMP(id=out_id, type='echo-reply'))
4668         self.pg0.add_stream(p)
4669         self.pg_enable_capture(self.pg_interfaces)
4670         self.pg_start()
4671         capture = self.pg1.get_capture(1)
4672         capture = capture[0]
4673         self.assertEqual(capture[IPv6].src, aftr_ip6)
4674         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4675         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4676         self.assertEqual(capture[IP].dst, '192.168.1.1')
4677         self.assertEqual(capture[ICMP].id, 4000)
4678         self.check_ip_checksum(capture)
4679         self.check_icmp_checksum(capture)
4680
4681         # ping DS-Lite AFTR tunnel endpoint address
4682         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4683              IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
4684              ICMPv6EchoRequest())
4685         self.pg1.add_stream(p)
4686         self.pg_enable_capture(self.pg_interfaces)
4687         self.pg_start()
4688         capture = self.pg1.get_capture(1)
4689         self.assertEqual(1, len(capture))
4690         capture = capture[0]
4691         self.assertEqual(capture[IPv6].src, aftr_ip6)
4692         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4693         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
4694
4695     def tearDown(self):
4696         super(TestDSlite, self).tearDown()
4697         if not self.vpp_dead:
4698             self.logger.info(self.vapi.cli("show dslite pool"))
4699             self.logger.info(
4700                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4701             self.logger.info(self.vapi.cli("show dslite sessions"))
4702
4703 if __name__ == '__main__':
4704     unittest.main(testRunner=VppTestRunner)