make test: automatically seed random generator
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param ttl: TTL of generated packets
144         """
145         pkts = []
146         # TCP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              TCP(sport=self.tcp_port_in, dport=20))
150         pkts.append(p)
151
152         # UDP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              UDP(sport=self.udp_port_in, dport=20))
156         pkts.append(p)
157
158         # ICMP
159         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
161              ICMP(id=self.icmp_id_in, type='echo-request'))
162         pkts.append(p)
163
164         return pkts
165
166     def compose_ip6(self, ip4, pref, plen):
167         """
168         Compose IPv4-embedded IPv6 addresses
169
170         :param ip4: IPv4 address
171         :param pref: IPv6 prefix
172         :param plen: IPv6 prefix length
173         :returns: IPv4-embedded IPv6 addresses
174         """
175         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
176         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
177         if plen == 32:
178             pref_n[4] = ip4_n[0]
179             pref_n[5] = ip4_n[1]
180             pref_n[6] = ip4_n[2]
181             pref_n[7] = ip4_n[3]
182         elif plen == 40:
183             pref_n[5] = ip4_n[0]
184             pref_n[6] = ip4_n[1]
185             pref_n[7] = ip4_n[2]
186             pref_n[9] = ip4_n[3]
187         elif plen == 48:
188             pref_n[6] = ip4_n[0]
189             pref_n[7] = ip4_n[1]
190             pref_n[9] = ip4_n[2]
191             pref_n[10] = ip4_n[3]
192         elif plen == 56:
193             pref_n[7] = ip4_n[0]
194             pref_n[9] = ip4_n[1]
195             pref_n[10] = ip4_n[2]
196             pref_n[11] = ip4_n[3]
197         elif plen == 64:
198             pref_n[9] = ip4_n[0]
199             pref_n[10] = ip4_n[1]
200             pref_n[11] = ip4_n[2]
201             pref_n[12] = ip4_n[3]
202         elif plen == 96:
203             pref_n[12] = ip4_n[0]
204             pref_n[13] = ip4_n[1]
205             pref_n[14] = ip4_n[2]
206             pref_n[15] = ip4_n[3]
207         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
208
209     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
210         """
211         Create IPv6 packet stream for inside network
212
213         :param in_if: Inside interface
214         :param out_if: Outside interface
215         :param ttl: Hop Limit of generated packets
216         :param pref: NAT64 prefix
217         :param plen: NAT64 prefix length
218         """
219         pkts = []
220         if pref is None:
221             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
222         else:
223             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
224
225         # TCP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              TCP(sport=self.tcp_port_in, dport=20))
229         pkts.append(p)
230
231         # UDP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              UDP(sport=self.udp_port_in, dport=20))
235         pkts.append(p)
236
237         # ICMP
238         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
239              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
240              ICMPv6EchoRequest(id=self.icmp_id_in))
241         pkts.append(p)
242
243         return pkts
244
245     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
246         """
247         Create packet stream for outside network
248
249         :param out_if: Outside interface
250         :param dst_ip: Destination IP address (Default use global NAT address)
251         :param ttl: TTL of generated packets
252         """
253         if dst_ip is None:
254             dst_ip = self.nat_addr
255         pkts = []
256         # TCP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              TCP(dport=self.tcp_port_out, sport=20))
260         pkts.append(p)
261
262         # UDP
263         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265              UDP(dport=self.udp_port_out, sport=20))
266         pkts.append(p)
267
268         # ICMP
269         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
270              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
271              ICMP(id=self.icmp_id_out, type='echo-reply'))
272         pkts.append(p)
273
274         return pkts
275
276     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
277                            packet_num=3, dst_ip=None):
278         """
279         Verify captured packets on outside network
280
281         :param capture: Captured packets
282         :param nat_ip: Translated IP address (Default use global NAT address)
283         :param same_port: Sorce port number is not translated (Default False)
284         :param packet_num: Expected number of packets (Default 3)
285         :param dst_ip: Destination IP address (Default do not verify)
286         """
287         if nat_ip is None:
288             nat_ip = self.nat_addr
289         self.assertEqual(packet_num, len(capture))
290         for packet in capture:
291             try:
292                 self.check_ip_checksum(packet)
293                 self.assertEqual(packet[IP].src, nat_ip)
294                 if dst_ip is not None:
295                     self.assertEqual(packet[IP].dst, dst_ip)
296                 if packet.haslayer(TCP):
297                     if same_port:
298                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
299                     else:
300                         self.assertNotEqual(
301                             packet[TCP].sport, self.tcp_port_in)
302                     self.tcp_port_out = packet[TCP].sport
303                     self.check_tcp_checksum(packet)
304                 elif packet.haslayer(UDP):
305                     if same_port:
306                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
307                     else:
308                         self.assertNotEqual(
309                             packet[UDP].sport, self.udp_port_in)
310                     self.udp_port_out = packet[UDP].sport
311                 else:
312                     if same_port:
313                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
314                     else:
315                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
316                     self.icmp_id_out = packet[ICMP].id
317                     self.check_icmp_checksum(packet)
318             except:
319                 self.logger.error(ppp("Unexpected or invalid packet "
320                                       "(outside network):", packet))
321                 raise
322
323     def verify_capture_in(self, capture, in_if, packet_num=3):
324         """
325         Verify captured packets on inside network
326
327         :param capture: Captured packets
328         :param in_if: Inside interface
329         :param packet_num: Expected number of packets (Default 3)
330         """
331         self.assertEqual(packet_num, len(capture))
332         for packet in capture:
333             try:
334                 self.check_ip_checksum(packet)
335                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
336                 if packet.haslayer(TCP):
337                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
338                     self.check_tcp_checksum(packet)
339                 elif packet.haslayer(UDP):
340                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
341                 else:
342                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
343                     self.check_icmp_checksum(packet)
344             except:
345                 self.logger.error(ppp("Unexpected or invalid packet "
346                                       "(inside network):", packet))
347                 raise
348
349     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
350         """
351         Verify captured IPv6 packets on inside network
352
353         :param capture: Captured packets
354         :param src_ip: Source IP
355         :param dst_ip: Destination IP address
356         :param packet_num: Expected number of packets (Default 3)
357         """
358         self.assertEqual(packet_num, len(capture))
359         for packet in capture:
360             try:
361                 self.assertEqual(packet[IPv6].src, src_ip)
362                 self.assertEqual(packet[IPv6].dst, dst_ip)
363                 if packet.haslayer(TCP):
364                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
365                     self.check_tcp_checksum(packet)
366                 elif packet.haslayer(UDP):
367                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
368                     self.check_udp_checksum(packet)
369                 else:
370                     self.assertEqual(packet[ICMPv6EchoReply].id,
371                                      self.icmp_id_in)
372                     self.check_icmpv6_checksum(packet)
373             except:
374                 self.logger.error(ppp("Unexpected or invalid packet "
375                                       "(inside network):", packet))
376                 raise
377
378     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
379         """
380         Verify captured packet that don't have to be translated
381
382         :param capture: Captured packets
383         :param ingress_if: Ingress interface
384         :param egress_if: Egress interface
385         """
386         for packet in capture:
387             try:
388                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
389                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
390                 if packet.haslayer(TCP):
391                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
392                 elif packet.haslayer(UDP):
393                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
394                 else:
395                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
396             except:
397                 self.logger.error(ppp("Unexpected or invalid packet "
398                                       "(inside network):", packet))
399                 raise
400
401     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
402                                             packet_num=3, icmp_type=11):
403         """
404         Verify captured packets with ICMP errors on outside network
405
406         :param capture: Captured packets
407         :param src_ip: Translated IP address or IP address of VPP
408                        (Default use global NAT address)
409         :param packet_num: Expected number of packets (Default 3)
410         :param icmp_type: Type of error ICMP packet
411                           we are expecting (Default 11)
412         """
413         if src_ip is None:
414             src_ip = self.nat_addr
415         self.assertEqual(packet_num, len(capture))
416         for packet in capture:
417             try:
418                 self.assertEqual(packet[IP].src, src_ip)
419                 self.assertTrue(packet.haslayer(ICMP))
420                 icmp = packet[ICMP]
421                 self.assertEqual(icmp.type, icmp_type)
422                 self.assertTrue(icmp.haslayer(IPerror))
423                 inner_ip = icmp[IPerror]
424                 if inner_ip.haslayer(TCPerror):
425                     self.assertEqual(inner_ip[TCPerror].dport,
426                                      self.tcp_port_out)
427                 elif inner_ip.haslayer(UDPerror):
428                     self.assertEqual(inner_ip[UDPerror].dport,
429                                      self.udp_port_out)
430                 else:
431                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
432             except:
433                 self.logger.error(ppp("Unexpected or invalid packet "
434                                       "(outside network):", packet))
435                 raise
436
437     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
438                                            icmp_type=11):
439         """
440         Verify captured packets with ICMP errors on inside network
441
442         :param capture: Captured packets
443         :param in_if: Inside interface
444         :param packet_num: Expected number of packets (Default 3)
445         :param icmp_type: Type of error ICMP packet
446                           we are expecting (Default 11)
447         """
448         self.assertEqual(packet_num, len(capture))
449         for packet in capture:
450             try:
451                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
452                 self.assertTrue(packet.haslayer(ICMP))
453                 icmp = packet[ICMP]
454                 self.assertEqual(icmp.type, icmp_type)
455                 self.assertTrue(icmp.haslayer(IPerror))
456                 inner_ip = icmp[IPerror]
457                 if inner_ip.haslayer(TCPerror):
458                     self.assertEqual(inner_ip[TCPerror].sport,
459                                      self.tcp_port_in)
460                 elif inner_ip.haslayer(UDPerror):
461                     self.assertEqual(inner_ip[UDPerror].sport,
462                                      self.udp_port_in)
463                 else:
464                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
465             except:
466                 self.logger.error(ppp("Unexpected or invalid packet "
467                                       "(inside network):", packet))
468                 raise
469
470     def create_stream_frag(self, src_if, dst, sport, dport, data):
471         """
472         Create fragmented packet stream
473
474         :param src_if: Source interface
475         :param dst: Destination IPv4 address
476         :param sport: Source TCP port
477         :param dport: Destination TCP port
478         :param data: Payload data
479         :returns: Fragmets
480         """
481         id = random.randint(0, 65535)
482         p = (IP(src=src_if.remote_ip4, dst=dst) /
483              TCP(sport=sport, dport=dport) /
484              Raw(data))
485         p = p.__class__(str(p))
486         chksum = p['TCP'].chksum
487         pkts = []
488         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
490              TCP(sport=sport, dport=dport, chksum=chksum) /
491              Raw(data[0:4]))
492         pkts.append(p)
493         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
495                 proto=IP_PROTOS.tcp) /
496              Raw(data[4:20]))
497         pkts.append(p)
498         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
499              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
500                 id=id) /
501              Raw(data[20:]))
502         pkts.append(p)
503         return pkts
504
505     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
506                                pref=None, plen=0, frag_size=128):
507         """
508         Create fragmented packet stream
509
510         :param src_if: Source interface
511         :param dst: Destination IPv4 address
512         :param sport: Source TCP port
513         :param dport: Destination TCP port
514         :param data: Payload data
515         :param pref: NAT64 prefix
516         :param plen: NAT64 prefix length
517         :param fragsize: size of fragments
518         :returns: Fragmets
519         """
520         if pref is None:
521             dst_ip6 = ''.join(['64:ff9b::', dst])
522         else:
523             dst_ip6 = self.compose_ip6(dst, pref, plen)
524
525         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
526              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
527              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
528              TCP(sport=sport, dport=dport) /
529              Raw(data))
530
531         return fragment6(p, frag_size)
532
533     def reass_frags_and_verify(self, frags, src, dst):
534         """
535         Reassemble and verify fragmented packet
536
537         :param frags: Captured fragments
538         :param src: Source IPv4 address to verify
539         :param dst: Destination IPv4 address to verify
540
541         :returns: Reassembled IPv4 packet
542         """
543         buffer = StringIO.StringIO()
544         for p in frags:
545             self.assertEqual(p[IP].src, src)
546             self.assertEqual(p[IP].dst, dst)
547             self.check_ip_checksum(p)
548             buffer.seek(p[IP].frag * 8)
549             buffer.write(p[IP].payload)
550         ip = frags[0].getlayer(IP)
551         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
552                 proto=frags[0][IP].proto)
553         if ip.proto == IP_PROTOS.tcp:
554             p = (ip / TCP(buffer.getvalue()))
555             self.check_tcp_checksum(p)
556         elif ip.proto == IP_PROTOS.udp:
557             p = (ip / UDP(buffer.getvalue()))
558         return p
559
560     def reass_frags_and_verify_ip6(self, frags, src, dst):
561         """
562         Reassemble and verify fragmented packet
563
564         :param frags: Captured fragments
565         :param src: Source IPv6 address to verify
566         :param dst: Destination IPv6 address to verify
567
568         :returns: Reassembled IPv6 packet
569         """
570         buffer = StringIO.StringIO()
571         for p in frags:
572             self.assertEqual(p[IPv6].src, src)
573             self.assertEqual(p[IPv6].dst, dst)
574             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
575             buffer.write(p[IPv6ExtHdrFragment].payload)
576         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
577                   nh=frags[0][IPv6ExtHdrFragment].nh)
578         if ip.nh == IP_PROTOS.tcp:
579             p = (ip / TCP(buffer.getvalue()))
580             self.check_tcp_checksum(p)
581         elif ip.nh == IP_PROTOS.udp:
582             p = (ip / UDP(buffer.getvalue()))
583         return p
584
585     def verify_ipfix_nat44_ses(self, data):
586         """
587         Verify IPFIX NAT44 session create/delete event
588
589         :param data: Decoded IPFIX data records
590         """
591         nat44_ses_create_num = 0
592         nat44_ses_delete_num = 0
593         self.assertEqual(6, len(data))
594         for record in data:
595             # natEvent
596             self.assertIn(ord(record[230]), [4, 5])
597             if ord(record[230]) == 4:
598                 nat44_ses_create_num += 1
599             else:
600                 nat44_ses_delete_num += 1
601             # sourceIPv4Address
602             self.assertEqual(self.pg0.remote_ip4n, record[8])
603             # postNATSourceIPv4Address
604             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
605                              record[225])
606             # ingressVRFID
607             self.assertEqual(struct.pack("!I", 0), record[234])
608             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
609             if IP_PROTOS.icmp == ord(record[4]):
610                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
611                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
612                                  record[227])
613             elif IP_PROTOS.tcp == ord(record[4]):
614                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
615                                  record[7])
616                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
617                                  record[227])
618             elif IP_PROTOS.udp == ord(record[4]):
619                 self.assertEqual(struct.pack("!H", self.udp_port_in),
620                                  record[7])
621                 self.assertEqual(struct.pack("!H", self.udp_port_out),
622                                  record[227])
623             else:
624                 self.fail("Invalid protocol")
625         self.assertEqual(3, nat44_ses_create_num)
626         self.assertEqual(3, nat44_ses_delete_num)
627
628     def verify_ipfix_addr_exhausted(self, data):
629         """
630         Verify IPFIX NAT addresses event
631
632         :param data: Decoded IPFIX data records
633         """
634         self.assertEqual(1, len(data))
635         record = data[0]
636         # natEvent
637         self.assertEqual(ord(record[230]), 3)
638         # natPoolID
639         self.assertEqual(struct.pack("!I", 0), record[283])
640
641
642 class TestNAT44(MethodHolder):
643     """ NAT44 Test Cases """
644
645     @classmethod
646     def setUpClass(cls):
647         super(TestNAT44, cls).setUpClass()
648
649         try:
650             cls.tcp_port_in = 6303
651             cls.tcp_port_out = 6303
652             cls.udp_port_in = 6304
653             cls.udp_port_out = 6304
654             cls.icmp_id_in = 6305
655             cls.icmp_id_out = 6305
656             cls.nat_addr = '10.0.0.3'
657             cls.ipfix_src_port = 4739
658             cls.ipfix_domain_id = 1
659
660             cls.create_pg_interfaces(range(10))
661             cls.interfaces = list(cls.pg_interfaces[0:4])
662
663             for i in cls.interfaces:
664                 i.admin_up()
665                 i.config_ip4()
666                 i.resolve_arp()
667
668             cls.pg0.generate_remote_hosts(3)
669             cls.pg0.configure_ipv4_neighbors()
670
671             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
672             cls.vapi.ip_table_add_del(10, is_add=1)
673             cls.vapi.ip_table_add_del(20, is_add=1)
674
675             cls.pg4._local_ip4 = "172.16.255.1"
676             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
677             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
678             cls.pg4.set_table_ip4(10)
679             cls.pg5._local_ip4 = "172.17.255.3"
680             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
681             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
682             cls.pg5.set_table_ip4(10)
683             cls.pg6._local_ip4 = "172.16.255.1"
684             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
685             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
686             cls.pg6.set_table_ip4(20)
687             for i in cls.overlapping_interfaces:
688                 i.config_ip4()
689                 i.admin_up()
690                 i.resolve_arp()
691
692             cls.pg7.admin_up()
693             cls.pg8.admin_up()
694
695             cls.pg9.generate_remote_hosts(2)
696             cls.pg9.config_ip4()
697             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
698             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
699                                                   ip_addr_n,
700                                                   24)
701             cls.pg9.admin_up()
702             cls.pg9.resolve_arp()
703             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
704             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
705             cls.pg9.resolve_arp()
706
707         except Exception:
708             super(TestNAT44, cls).tearDownClass()
709             raise
710
711     def clear_nat44(self):
712         """
713         Clear NAT44 configuration.
714         """
715         # I found no elegant way to do this
716         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
717                                    dst_address_length=32,
718                                    next_hop_address=self.pg7.remote_ip4n,
719                                    next_hop_sw_if_index=self.pg7.sw_if_index,
720                                    is_add=0)
721         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
722                                    dst_address_length=32,
723                                    next_hop_address=self.pg8.remote_ip4n,
724                                    next_hop_sw_if_index=self.pg8.sw_if_index,
725                                    is_add=0)
726
727         for intf in [self.pg7, self.pg8]:
728             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
729             for n in neighbors:
730                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
731                                               n.mac_address,
732                                               n.ip_address,
733                                               is_add=0)
734
735         if self.pg7.has_ip4_config:
736             self.pg7.unconfig_ip4()
737
738         interfaces = self.vapi.nat44_interface_addr_dump()
739         for intf in interfaces:
740             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
741
742         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
743                             domain_id=self.ipfix_domain_id)
744         self.ipfix_src_port = 4739
745         self.ipfix_domain_id = 1
746
747         interfaces = self.vapi.nat44_interface_dump()
748         for intf in interfaces:
749             if intf.is_inside > 1:
750                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
751                                                           0,
752                                                           is_add=0)
753             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
754                                                       intf.is_inside,
755                                                       is_add=0)
756
757         interfaces = self.vapi.nat44_interface_output_feature_dump()
758         for intf in interfaces:
759             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
760                                                              intf.is_inside,
761                                                              is_add=0)
762
763         static_mappings = self.vapi.nat44_static_mapping_dump()
764         for sm in static_mappings:
765             self.vapi.nat44_add_del_static_mapping(
766                 sm.local_ip_address,
767                 sm.external_ip_address,
768                 local_port=sm.local_port,
769                 external_port=sm.external_port,
770                 addr_only=sm.addr_only,
771                 vrf_id=sm.vrf_id,
772                 protocol=sm.protocol,
773                 is_add=0)
774
775         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
776         for lb_sm in lb_static_mappings:
777             self.vapi.nat44_add_del_lb_static_mapping(
778                 lb_sm.external_addr,
779                 lb_sm.external_port,
780                 lb_sm.protocol,
781                 lb_sm.vrf_id,
782                 is_add=0,
783                 local_num=0,
784                 locals=[])
785
786         adresses = self.vapi.nat44_address_dump()
787         for addr in adresses:
788             self.vapi.nat44_add_del_address_range(addr.ip_address,
789                                                   addr.ip_address,
790                                                   is_add=0)
791
792         self.vapi.nat_set_reass()
793         self.vapi.nat_set_reass(is_ip6=1)
794
795     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
796                                  local_port=0, external_port=0, vrf_id=0,
797                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
798                                  proto=0):
799         """
800         Add/delete NAT44 static mapping
801
802         :param local_ip: Local IP address
803         :param external_ip: External IP address
804         :param local_port: Local port number (Optional)
805         :param external_port: External port number (Optional)
806         :param vrf_id: VRF ID (Default 0)
807         :param is_add: 1 if add, 0 if delete (Default add)
808         :param external_sw_if_index: External interface instead of IP address
809         :param proto: IP protocol (Mandatory if port specified)
810         """
811         addr_only = 1
812         if local_port and external_port:
813             addr_only = 0
814         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
815         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
816         self.vapi.nat44_add_del_static_mapping(
817             l_ip,
818             e_ip,
819             external_sw_if_index,
820             local_port,
821             external_port,
822             addr_only,
823             vrf_id,
824             proto,
825             is_add)
826
827     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
828         """
829         Add/delete NAT44 address
830
831         :param ip: IP address
832         :param is_add: 1 if add, 0 if delete (Default add)
833         """
834         nat_addr = socket.inet_pton(socket.AF_INET, ip)
835         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
836                                               vrf_id=vrf_id)
837
838     def test_dynamic(self):
839         """ NAT44 dynamic translation test """
840
841         self.nat44_add_address(self.nat_addr)
842         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
843         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
844                                                   is_inside=0)
845
846         # in2out
847         pkts = self.create_stream_in(self.pg0, self.pg1)
848         self.pg0.add_stream(pkts)
849         self.pg_enable_capture(self.pg_interfaces)
850         self.pg_start()
851         capture = self.pg1.get_capture(len(pkts))
852         self.verify_capture_out(capture)
853
854         # out2in
855         pkts = self.create_stream_out(self.pg1)
856         self.pg1.add_stream(pkts)
857         self.pg_enable_capture(self.pg_interfaces)
858         self.pg_start()
859         capture = self.pg0.get_capture(len(pkts))
860         self.verify_capture_in(capture, self.pg0)
861
862     def test_dynamic_icmp_errors_in2out_ttl_1(self):
863         """ NAT44 handling of client packets with TTL=1 """
864
865         self.nat44_add_address(self.nat_addr)
866         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
867         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
868                                                   is_inside=0)
869
870         # Client side - generate traffic
871         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
872         self.pg0.add_stream(pkts)
873         self.pg_enable_capture(self.pg_interfaces)
874         self.pg_start()
875
876         # Client side - verify ICMP type 11 packets
877         capture = self.pg0.get_capture(len(pkts))
878         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
879
880     def test_dynamic_icmp_errors_out2in_ttl_1(self):
881         """ NAT44 handling of server packets with TTL=1 """
882
883         self.nat44_add_address(self.nat_addr)
884         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
885         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
886                                                   is_inside=0)
887
888         # Client side - create sessions
889         pkts = self.create_stream_in(self.pg0, self.pg1)
890         self.pg0.add_stream(pkts)
891         self.pg_enable_capture(self.pg_interfaces)
892         self.pg_start()
893
894         # Server side - generate traffic
895         capture = self.pg1.get_capture(len(pkts))
896         self.verify_capture_out(capture)
897         pkts = self.create_stream_out(self.pg1, ttl=1)
898         self.pg1.add_stream(pkts)
899         self.pg_enable_capture(self.pg_interfaces)
900         self.pg_start()
901
902         # Server side - verify ICMP type 11 packets
903         capture = self.pg1.get_capture(len(pkts))
904         self.verify_capture_out_with_icmp_errors(capture,
905                                                  src_ip=self.pg1.local_ip4)
906
907     def test_dynamic_icmp_errors_in2out_ttl_2(self):
908         """ NAT44 handling of error responses to client packets with TTL=2 """
909
910         self.nat44_add_address(self.nat_addr)
911         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
912         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
913                                                   is_inside=0)
914
915         # Client side - generate traffic
916         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
917         self.pg0.add_stream(pkts)
918         self.pg_enable_capture(self.pg_interfaces)
919         self.pg_start()
920
921         # Server side - simulate ICMP type 11 response
922         capture = self.pg1.get_capture(len(pkts))
923         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
924                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
925                 ICMP(type=11) / packet[IP] for packet in capture]
926         self.pg1.add_stream(pkts)
927         self.pg_enable_capture(self.pg_interfaces)
928         self.pg_start()
929
930         # Client side - verify ICMP type 11 packets
931         capture = self.pg0.get_capture(len(pkts))
932         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
933
934     def test_dynamic_icmp_errors_out2in_ttl_2(self):
935         """ NAT44 handling of error responses to server packets with TTL=2 """
936
937         self.nat44_add_address(self.nat_addr)
938         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
939         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
940                                                   is_inside=0)
941
942         # Client side - create sessions
943         pkts = self.create_stream_in(self.pg0, self.pg1)
944         self.pg0.add_stream(pkts)
945         self.pg_enable_capture(self.pg_interfaces)
946         self.pg_start()
947
948         # Server side - generate traffic
949         capture = self.pg1.get_capture(len(pkts))
950         self.verify_capture_out(capture)
951         pkts = self.create_stream_out(self.pg1, ttl=2)
952         self.pg1.add_stream(pkts)
953         self.pg_enable_capture(self.pg_interfaces)
954         self.pg_start()
955
956         # Client side - simulate ICMP type 11 response
957         capture = self.pg0.get_capture(len(pkts))
958         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
959                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
960                 ICMP(type=11) / packet[IP] for packet in capture]
961         self.pg0.add_stream(pkts)
962         self.pg_enable_capture(self.pg_interfaces)
963         self.pg_start()
964
965         # Server side - verify ICMP type 11 packets
966         capture = self.pg1.get_capture(len(pkts))
967         self.verify_capture_out_with_icmp_errors(capture)
968
969     def test_ping_out_interface_from_outside(self):
970         """ Ping NAT44 out interface from outside network """
971
972         self.nat44_add_address(self.nat_addr)
973         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
974         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
975                                                   is_inside=0)
976
977         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
978              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
979              ICMP(id=self.icmp_id_out, type='echo-request'))
980         pkts = [p]
981         self.pg1.add_stream(pkts)
982         self.pg_enable_capture(self.pg_interfaces)
983         self.pg_start()
984         capture = self.pg1.get_capture(len(pkts))
985         self.assertEqual(1, len(capture))
986         packet = capture[0]
987         try:
988             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
989             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
990             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
991             self.assertEqual(packet[ICMP].type, 0)  # echo reply
992         except:
993             self.logger.error(ppp("Unexpected or invalid packet "
994                                   "(outside network):", packet))
995             raise
996
997     def test_ping_internal_host_from_outside(self):
998         """ Ping internal host from outside network """
999
1000         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1001         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1002         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1003                                                   is_inside=0)
1004
1005         # out2in
1006         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1007                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1008                ICMP(id=self.icmp_id_out, type='echo-request'))
1009         self.pg1.add_stream(pkt)
1010         self.pg_enable_capture(self.pg_interfaces)
1011         self.pg_start()
1012         capture = self.pg0.get_capture(1)
1013         self.verify_capture_in(capture, self.pg0, packet_num=1)
1014         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1015
1016         # in2out
1017         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1018                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1019                ICMP(id=self.icmp_id_in, type='echo-reply'))
1020         self.pg0.add_stream(pkt)
1021         self.pg_enable_capture(self.pg_interfaces)
1022         self.pg_start()
1023         capture = self.pg1.get_capture(1)
1024         self.verify_capture_out(capture, same_port=True, packet_num=1)
1025         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1026
1027     def test_static_in(self):
1028         """ 1:1 NAT initialized from inside network """
1029
1030         nat_ip = "10.0.0.10"
1031         self.tcp_port_out = 6303
1032         self.udp_port_out = 6304
1033         self.icmp_id_out = 6305
1034
1035         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1036         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1037         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1038                                                   is_inside=0)
1039
1040         # in2out
1041         pkts = self.create_stream_in(self.pg0, self.pg1)
1042         self.pg0.add_stream(pkts)
1043         self.pg_enable_capture(self.pg_interfaces)
1044         self.pg_start()
1045         capture = self.pg1.get_capture(len(pkts))
1046         self.verify_capture_out(capture, nat_ip, True)
1047
1048         # out2in
1049         pkts = self.create_stream_out(self.pg1, nat_ip)
1050         self.pg1.add_stream(pkts)
1051         self.pg_enable_capture(self.pg_interfaces)
1052         self.pg_start()
1053         capture = self.pg0.get_capture(len(pkts))
1054         self.verify_capture_in(capture, self.pg0)
1055
1056     def test_static_out(self):
1057         """ 1:1 NAT initialized from outside network """
1058
1059         nat_ip = "10.0.0.20"
1060         self.tcp_port_out = 6303
1061         self.udp_port_out = 6304
1062         self.icmp_id_out = 6305
1063
1064         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1065         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1066         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1067                                                   is_inside=0)
1068
1069         # out2in
1070         pkts = self.create_stream_out(self.pg1, nat_ip)
1071         self.pg1.add_stream(pkts)
1072         self.pg_enable_capture(self.pg_interfaces)
1073         self.pg_start()
1074         capture = self.pg0.get_capture(len(pkts))
1075         self.verify_capture_in(capture, self.pg0)
1076
1077         # in2out
1078         pkts = self.create_stream_in(self.pg0, self.pg1)
1079         self.pg0.add_stream(pkts)
1080         self.pg_enable_capture(self.pg_interfaces)
1081         self.pg_start()
1082         capture = self.pg1.get_capture(len(pkts))
1083         self.verify_capture_out(capture, nat_ip, True)
1084
1085     def test_static_with_port_in(self):
1086         """ 1:1 NAPT initialized from inside network """
1087
1088         self.tcp_port_out = 3606
1089         self.udp_port_out = 3607
1090         self.icmp_id_out = 3608
1091
1092         self.nat44_add_address(self.nat_addr)
1093         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1094                                       self.tcp_port_in, self.tcp_port_out,
1095                                       proto=IP_PROTOS.tcp)
1096         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1097                                       self.udp_port_in, self.udp_port_out,
1098                                       proto=IP_PROTOS.udp)
1099         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1100                                       self.icmp_id_in, self.icmp_id_out,
1101                                       proto=IP_PROTOS.icmp)
1102         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1103         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1104                                                   is_inside=0)
1105
1106         # in2out
1107         pkts = self.create_stream_in(self.pg0, self.pg1)
1108         self.pg0.add_stream(pkts)
1109         self.pg_enable_capture(self.pg_interfaces)
1110         self.pg_start()
1111         capture = self.pg1.get_capture(len(pkts))
1112         self.verify_capture_out(capture)
1113
1114         # out2in
1115         pkts = self.create_stream_out(self.pg1)
1116         self.pg1.add_stream(pkts)
1117         self.pg_enable_capture(self.pg_interfaces)
1118         self.pg_start()
1119         capture = self.pg0.get_capture(len(pkts))
1120         self.verify_capture_in(capture, self.pg0)
1121
1122     def test_static_with_port_out(self):
1123         """ 1:1 NAPT initialized from outside network """
1124
1125         self.tcp_port_out = 30606
1126         self.udp_port_out = 30607
1127         self.icmp_id_out = 30608
1128
1129         self.nat44_add_address(self.nat_addr)
1130         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1131                                       self.tcp_port_in, self.tcp_port_out,
1132                                       proto=IP_PROTOS.tcp)
1133         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1134                                       self.udp_port_in, self.udp_port_out,
1135                                       proto=IP_PROTOS.udp)
1136         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1137                                       self.icmp_id_in, self.icmp_id_out,
1138                                       proto=IP_PROTOS.icmp)
1139         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1140         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1141                                                   is_inside=0)
1142
1143         # out2in
1144         pkts = self.create_stream_out(self.pg1)
1145         self.pg1.add_stream(pkts)
1146         self.pg_enable_capture(self.pg_interfaces)
1147         self.pg_start()
1148         capture = self.pg0.get_capture(len(pkts))
1149         self.verify_capture_in(capture, self.pg0)
1150
1151         # in2out
1152         pkts = self.create_stream_in(self.pg0, self.pg1)
1153         self.pg0.add_stream(pkts)
1154         self.pg_enable_capture(self.pg_interfaces)
1155         self.pg_start()
1156         capture = self.pg1.get_capture(len(pkts))
1157         self.verify_capture_out(capture)
1158
1159     def test_static_vrf_aware(self):
1160         """ 1:1 NAT VRF awareness """
1161
1162         nat_ip1 = "10.0.0.30"
1163         nat_ip2 = "10.0.0.40"
1164         self.tcp_port_out = 6303
1165         self.udp_port_out = 6304
1166         self.icmp_id_out = 6305
1167
1168         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1169                                       vrf_id=10)
1170         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1171                                       vrf_id=10)
1172         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1173                                                   is_inside=0)
1174         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1175         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1176
1177         # inside interface VRF match NAT44 static mapping VRF
1178         pkts = self.create_stream_in(self.pg4, self.pg3)
1179         self.pg4.add_stream(pkts)
1180         self.pg_enable_capture(self.pg_interfaces)
1181         self.pg_start()
1182         capture = self.pg3.get_capture(len(pkts))
1183         self.verify_capture_out(capture, nat_ip1, True)
1184
1185         # inside interface VRF don't match NAT44 static mapping VRF (packets
1186         # are dropped)
1187         pkts = self.create_stream_in(self.pg0, self.pg3)
1188         self.pg0.add_stream(pkts)
1189         self.pg_enable_capture(self.pg_interfaces)
1190         self.pg_start()
1191         self.pg3.assert_nothing_captured()
1192
1193     def test_static_lb(self):
1194         """ NAT44 local service load balancing """
1195         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1196         external_port = 80
1197         local_port = 8080
1198         server1 = self.pg0.remote_hosts[0]
1199         server2 = self.pg0.remote_hosts[1]
1200
1201         locals = [{'addr': server1.ip4n,
1202                    'port': local_port,
1203                    'probability': 70},
1204                   {'addr': server2.ip4n,
1205                    'port': local_port,
1206                    'probability': 30}]
1207
1208         self.nat44_add_address(self.nat_addr)
1209         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1210                                                   external_port,
1211                                                   IP_PROTOS.tcp,
1212                                                   local_num=len(locals),
1213                                                   locals=locals)
1214         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1215         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1216                                                   is_inside=0)
1217
1218         # from client to service
1219         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1220              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1221              TCP(sport=12345, dport=external_port))
1222         self.pg1.add_stream(p)
1223         self.pg_enable_capture(self.pg_interfaces)
1224         self.pg_start()
1225         capture = self.pg0.get_capture(1)
1226         p = capture[0]
1227         server = None
1228         try:
1229             ip = p[IP]
1230             tcp = p[TCP]
1231             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1232             if ip.dst == server1.ip4:
1233                 server = server1
1234             else:
1235                 server = server2
1236             self.assertEqual(tcp.dport, local_port)
1237             self.check_tcp_checksum(p)
1238             self.check_ip_checksum(p)
1239         except:
1240             self.logger.error(ppp("Unexpected or invalid packet:", p))
1241             raise
1242
1243         # from service back to client
1244         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1245              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1246              TCP(sport=local_port, dport=12345))
1247         self.pg0.add_stream(p)
1248         self.pg_enable_capture(self.pg_interfaces)
1249         self.pg_start()
1250         capture = self.pg1.get_capture(1)
1251         p = capture[0]
1252         try:
1253             ip = p[IP]
1254             tcp = p[TCP]
1255             self.assertEqual(ip.src, self.nat_addr)
1256             self.assertEqual(tcp.sport, external_port)
1257             self.check_tcp_checksum(p)
1258             self.check_ip_checksum(p)
1259         except:
1260             self.logger.error(ppp("Unexpected or invalid packet:", p))
1261             raise
1262
1263         # multiple clients
1264         server1_n = 0
1265         server2_n = 0
1266         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1267         pkts = []
1268         for client in clients:
1269             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1270                  IP(src=client, dst=self.nat_addr) /
1271                  TCP(sport=12345, dport=external_port))
1272             pkts.append(p)
1273         self.pg1.add_stream(pkts)
1274         self.pg_enable_capture(self.pg_interfaces)
1275         self.pg_start()
1276         capture = self.pg0.get_capture(len(pkts))
1277         for p in capture:
1278             if p[IP].dst == server1.ip4:
1279                 server1_n += 1
1280             else:
1281                 server2_n += 1
1282         self.assertTrue(server1_n > server2_n)
1283
1284     def test_multiple_inside_interfaces(self):
1285         """ NAT44 multiple non-overlapping address space inside interfaces """
1286
1287         self.nat44_add_address(self.nat_addr)
1288         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1289         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1290         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1291                                                   is_inside=0)
1292
1293         # between two NAT44 inside interfaces (no translation)
1294         pkts = self.create_stream_in(self.pg0, self.pg1)
1295         self.pg0.add_stream(pkts)
1296         self.pg_enable_capture(self.pg_interfaces)
1297         self.pg_start()
1298         capture = self.pg1.get_capture(len(pkts))
1299         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1300
1301         # from NAT44 inside to interface without NAT44 feature (no translation)
1302         pkts = self.create_stream_in(self.pg0, self.pg2)
1303         self.pg0.add_stream(pkts)
1304         self.pg_enable_capture(self.pg_interfaces)
1305         self.pg_start()
1306         capture = self.pg2.get_capture(len(pkts))
1307         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1308
1309         # in2out 1st interface
1310         pkts = self.create_stream_in(self.pg0, self.pg3)
1311         self.pg0.add_stream(pkts)
1312         self.pg_enable_capture(self.pg_interfaces)
1313         self.pg_start()
1314         capture = self.pg3.get_capture(len(pkts))
1315         self.verify_capture_out(capture)
1316
1317         # out2in 1st interface
1318         pkts = self.create_stream_out(self.pg3)
1319         self.pg3.add_stream(pkts)
1320         self.pg_enable_capture(self.pg_interfaces)
1321         self.pg_start()
1322         capture = self.pg0.get_capture(len(pkts))
1323         self.verify_capture_in(capture, self.pg0)
1324
1325         # in2out 2nd interface
1326         pkts = self.create_stream_in(self.pg1, self.pg3)
1327         self.pg1.add_stream(pkts)
1328         self.pg_enable_capture(self.pg_interfaces)
1329         self.pg_start()
1330         capture = self.pg3.get_capture(len(pkts))
1331         self.verify_capture_out(capture)
1332
1333         # out2in 2nd interface
1334         pkts = self.create_stream_out(self.pg3)
1335         self.pg3.add_stream(pkts)
1336         self.pg_enable_capture(self.pg_interfaces)
1337         self.pg_start()
1338         capture = self.pg1.get_capture(len(pkts))
1339         self.verify_capture_in(capture, self.pg1)
1340
1341     def test_inside_overlapping_interfaces(self):
1342         """ NAT44 multiple inside interfaces with overlapping address space """
1343
1344         static_nat_ip = "10.0.0.10"
1345         self.nat44_add_address(self.nat_addr)
1346         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1347                                                   is_inside=0)
1348         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1349         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1350         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1351         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1352                                       vrf_id=20)
1353
1354         # between NAT44 inside interfaces with same VRF (no translation)
1355         pkts = self.create_stream_in(self.pg4, self.pg5)
1356         self.pg4.add_stream(pkts)
1357         self.pg_enable_capture(self.pg_interfaces)
1358         self.pg_start()
1359         capture = self.pg5.get_capture(len(pkts))
1360         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1361
1362         # between NAT44 inside interfaces with different VRF (hairpinning)
1363         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1364              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1365              TCP(sport=1234, dport=5678))
1366         self.pg4.add_stream(p)
1367         self.pg_enable_capture(self.pg_interfaces)
1368         self.pg_start()
1369         capture = self.pg6.get_capture(1)
1370         p = capture[0]
1371         try:
1372             ip = p[IP]
1373             tcp = p[TCP]
1374             self.assertEqual(ip.src, self.nat_addr)
1375             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1376             self.assertNotEqual(tcp.sport, 1234)
1377             self.assertEqual(tcp.dport, 5678)
1378         except:
1379             self.logger.error(ppp("Unexpected or invalid packet:", p))
1380             raise
1381
1382         # in2out 1st interface
1383         pkts = self.create_stream_in(self.pg4, self.pg3)
1384         self.pg4.add_stream(pkts)
1385         self.pg_enable_capture(self.pg_interfaces)
1386         self.pg_start()
1387         capture = self.pg3.get_capture(len(pkts))
1388         self.verify_capture_out(capture)
1389
1390         # out2in 1st interface
1391         pkts = self.create_stream_out(self.pg3)
1392         self.pg3.add_stream(pkts)
1393         self.pg_enable_capture(self.pg_interfaces)
1394         self.pg_start()
1395         capture = self.pg4.get_capture(len(pkts))
1396         self.verify_capture_in(capture, self.pg4)
1397
1398         # in2out 2nd interface
1399         pkts = self.create_stream_in(self.pg5, self.pg3)
1400         self.pg5.add_stream(pkts)
1401         self.pg_enable_capture(self.pg_interfaces)
1402         self.pg_start()
1403         capture = self.pg3.get_capture(len(pkts))
1404         self.verify_capture_out(capture)
1405
1406         # out2in 2nd interface
1407         pkts = self.create_stream_out(self.pg3)
1408         self.pg3.add_stream(pkts)
1409         self.pg_enable_capture(self.pg_interfaces)
1410         self.pg_start()
1411         capture = self.pg5.get_capture(len(pkts))
1412         self.verify_capture_in(capture, self.pg5)
1413
1414         # pg5 session dump
1415         addresses = self.vapi.nat44_address_dump()
1416         self.assertEqual(len(addresses), 1)
1417         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1418         self.assertEqual(len(sessions), 3)
1419         for session in sessions:
1420             self.assertFalse(session.is_static)
1421             self.assertEqual(session.inside_ip_address[0:4],
1422                              self.pg5.remote_ip4n)
1423             self.assertEqual(session.outside_ip_address,
1424                              addresses[0].ip_address)
1425         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1426         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1427         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1428         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1429         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1430         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1431         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1432         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1433         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1434
1435         # in2out 3rd interface
1436         pkts = self.create_stream_in(self.pg6, self.pg3)
1437         self.pg6.add_stream(pkts)
1438         self.pg_enable_capture(self.pg_interfaces)
1439         self.pg_start()
1440         capture = self.pg3.get_capture(len(pkts))
1441         self.verify_capture_out(capture, static_nat_ip, True)
1442
1443         # out2in 3rd interface
1444         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1445         self.pg3.add_stream(pkts)
1446         self.pg_enable_capture(self.pg_interfaces)
1447         self.pg_start()
1448         capture = self.pg6.get_capture(len(pkts))
1449         self.verify_capture_in(capture, self.pg6)
1450
1451         # general user and session dump verifications
1452         users = self.vapi.nat44_user_dump()
1453         self.assertTrue(len(users) >= 3)
1454         addresses = self.vapi.nat44_address_dump()
1455         self.assertEqual(len(addresses), 1)
1456         for user in users:
1457             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1458                                                          user.vrf_id)
1459             for session in sessions:
1460                 self.assertEqual(user.ip_address, session.inside_ip_address)
1461                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1462                 self.assertTrue(session.protocol in
1463                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1464                                  IP_PROTOS.icmp])
1465
1466         # pg4 session dump
1467         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1468         self.assertTrue(len(sessions) >= 4)
1469         for session in sessions:
1470             self.assertFalse(session.is_static)
1471             self.assertEqual(session.inside_ip_address[0:4],
1472                              self.pg4.remote_ip4n)
1473             self.assertEqual(session.outside_ip_address,
1474                              addresses[0].ip_address)
1475
1476         # pg6 session dump
1477         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1478         self.assertTrue(len(sessions) >= 3)
1479         for session in sessions:
1480             self.assertTrue(session.is_static)
1481             self.assertEqual(session.inside_ip_address[0:4],
1482                              self.pg6.remote_ip4n)
1483             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1484                              map(int, static_nat_ip.split('.')))
1485             self.assertTrue(session.inside_port in
1486                             [self.tcp_port_in, self.udp_port_in,
1487                              self.icmp_id_in])
1488
1489     def test_hairpinning(self):
1490         """ NAT44 hairpinning - 1:1 NAPT """
1491
1492         host = self.pg0.remote_hosts[0]
1493         server = self.pg0.remote_hosts[1]
1494         host_in_port = 1234
1495         host_out_port = 0
1496         server_in_port = 5678
1497         server_out_port = 8765
1498
1499         self.nat44_add_address(self.nat_addr)
1500         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1501         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1502                                                   is_inside=0)
1503         # add static mapping for server
1504         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1505                                       server_in_port, server_out_port,
1506                                       proto=IP_PROTOS.tcp)
1507
1508         # send packet from host to server
1509         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1510              IP(src=host.ip4, dst=self.nat_addr) /
1511              TCP(sport=host_in_port, dport=server_out_port))
1512         self.pg0.add_stream(p)
1513         self.pg_enable_capture(self.pg_interfaces)
1514         self.pg_start()
1515         capture = self.pg0.get_capture(1)
1516         p = capture[0]
1517         try:
1518             ip = p[IP]
1519             tcp = p[TCP]
1520             self.assertEqual(ip.src, self.nat_addr)
1521             self.assertEqual(ip.dst, server.ip4)
1522             self.assertNotEqual(tcp.sport, host_in_port)
1523             self.assertEqual(tcp.dport, server_in_port)
1524             self.check_tcp_checksum(p)
1525             host_out_port = tcp.sport
1526         except:
1527             self.logger.error(ppp("Unexpected or invalid packet:", p))
1528             raise
1529
1530         # send reply from server to host
1531         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1532              IP(src=server.ip4, dst=self.nat_addr) /
1533              TCP(sport=server_in_port, dport=host_out_port))
1534         self.pg0.add_stream(p)
1535         self.pg_enable_capture(self.pg_interfaces)
1536         self.pg_start()
1537         capture = self.pg0.get_capture(1)
1538         p = capture[0]
1539         try:
1540             ip = p[IP]
1541             tcp = p[TCP]
1542             self.assertEqual(ip.src, self.nat_addr)
1543             self.assertEqual(ip.dst, host.ip4)
1544             self.assertEqual(tcp.sport, server_out_port)
1545             self.assertEqual(tcp.dport, host_in_port)
1546             self.check_tcp_checksum(p)
1547         except:
1548             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1549             raise
1550
1551     def test_hairpinning2(self):
1552         """ NAT44 hairpinning - 1:1 NAT"""
1553
1554         server1_nat_ip = "10.0.0.10"
1555         server2_nat_ip = "10.0.0.11"
1556         host = self.pg0.remote_hosts[0]
1557         server1 = self.pg0.remote_hosts[1]
1558         server2 = self.pg0.remote_hosts[2]
1559         server_tcp_port = 22
1560         server_udp_port = 20
1561
1562         self.nat44_add_address(self.nat_addr)
1563         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1564         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1565                                                   is_inside=0)
1566
1567         # add static mapping for servers
1568         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1569         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1570
1571         # host to server1
1572         pkts = []
1573         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1574              IP(src=host.ip4, dst=server1_nat_ip) /
1575              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1576         pkts.append(p)
1577         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1578              IP(src=host.ip4, dst=server1_nat_ip) /
1579              UDP(sport=self.udp_port_in, dport=server_udp_port))
1580         pkts.append(p)
1581         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1582              IP(src=host.ip4, dst=server1_nat_ip) /
1583              ICMP(id=self.icmp_id_in, type='echo-request'))
1584         pkts.append(p)
1585         self.pg0.add_stream(pkts)
1586         self.pg_enable_capture(self.pg_interfaces)
1587         self.pg_start()
1588         capture = self.pg0.get_capture(len(pkts))
1589         for packet in capture:
1590             try:
1591                 self.assertEqual(packet[IP].src, self.nat_addr)
1592                 self.assertEqual(packet[IP].dst, server1.ip4)
1593                 if packet.haslayer(TCP):
1594                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1595                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1596                     self.tcp_port_out = packet[TCP].sport
1597                     self.check_tcp_checksum(packet)
1598                 elif packet.haslayer(UDP):
1599                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1600                     self.assertEqual(packet[UDP].dport, server_udp_port)
1601                     self.udp_port_out = packet[UDP].sport
1602                 else:
1603                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1604                     self.icmp_id_out = packet[ICMP].id
1605             except:
1606                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1607                 raise
1608
1609         # server1 to host
1610         pkts = []
1611         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1612              IP(src=server1.ip4, dst=self.nat_addr) /
1613              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1614         pkts.append(p)
1615         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1616              IP(src=server1.ip4, dst=self.nat_addr) /
1617              UDP(sport=server_udp_port, dport=self.udp_port_out))
1618         pkts.append(p)
1619         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1620              IP(src=server1.ip4, dst=self.nat_addr) /
1621              ICMP(id=self.icmp_id_out, type='echo-reply'))
1622         pkts.append(p)
1623         self.pg0.add_stream(pkts)
1624         self.pg_enable_capture(self.pg_interfaces)
1625         self.pg_start()
1626         capture = self.pg0.get_capture(len(pkts))
1627         for packet in capture:
1628             try:
1629                 self.assertEqual(packet[IP].src, server1_nat_ip)
1630                 self.assertEqual(packet[IP].dst, host.ip4)
1631                 if packet.haslayer(TCP):
1632                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1633                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1634                     self.check_tcp_checksum(packet)
1635                 elif packet.haslayer(UDP):
1636                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1637                     self.assertEqual(packet[UDP].sport, server_udp_port)
1638                 else:
1639                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1640             except:
1641                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1642                 raise
1643
1644         # server2 to server1
1645         pkts = []
1646         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1647              IP(src=server2.ip4, dst=server1_nat_ip) /
1648              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1649         pkts.append(p)
1650         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1651              IP(src=server2.ip4, dst=server1_nat_ip) /
1652              UDP(sport=self.udp_port_in, dport=server_udp_port))
1653         pkts.append(p)
1654         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1655              IP(src=server2.ip4, dst=server1_nat_ip) /
1656              ICMP(id=self.icmp_id_in, type='echo-request'))
1657         pkts.append(p)
1658         self.pg0.add_stream(pkts)
1659         self.pg_enable_capture(self.pg_interfaces)
1660         self.pg_start()
1661         capture = self.pg0.get_capture(len(pkts))
1662         for packet in capture:
1663             try:
1664                 self.assertEqual(packet[IP].src, server2_nat_ip)
1665                 self.assertEqual(packet[IP].dst, server1.ip4)
1666                 if packet.haslayer(TCP):
1667                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1668                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1669                     self.tcp_port_out = packet[TCP].sport
1670                     self.check_tcp_checksum(packet)
1671                 elif packet.haslayer(UDP):
1672                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1673                     self.assertEqual(packet[UDP].dport, server_udp_port)
1674                     self.udp_port_out = packet[UDP].sport
1675                 else:
1676                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1677                     self.icmp_id_out = packet[ICMP].id
1678             except:
1679                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1680                 raise
1681
1682         # server1 to server2
1683         pkts = []
1684         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1685              IP(src=server1.ip4, dst=server2_nat_ip) /
1686              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1687         pkts.append(p)
1688         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1689              IP(src=server1.ip4, dst=server2_nat_ip) /
1690              UDP(sport=server_udp_port, dport=self.udp_port_out))
1691         pkts.append(p)
1692         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1693              IP(src=server1.ip4, dst=server2_nat_ip) /
1694              ICMP(id=self.icmp_id_out, type='echo-reply'))
1695         pkts.append(p)
1696         self.pg0.add_stream(pkts)
1697         self.pg_enable_capture(self.pg_interfaces)
1698         self.pg_start()
1699         capture = self.pg0.get_capture(len(pkts))
1700         for packet in capture:
1701             try:
1702                 self.assertEqual(packet[IP].src, server1_nat_ip)
1703                 self.assertEqual(packet[IP].dst, server2.ip4)
1704                 if packet.haslayer(TCP):
1705                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1706                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1707                     self.check_tcp_checksum(packet)
1708                 elif packet.haslayer(UDP):
1709                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1710                     self.assertEqual(packet[UDP].sport, server_udp_port)
1711                 else:
1712                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1713             except:
1714                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1715                 raise
1716
1717     def test_max_translations_per_user(self):
1718         """ MAX translations per user - recycle the least recently used """
1719
1720         self.nat44_add_address(self.nat_addr)
1721         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1722         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1723                                                   is_inside=0)
1724
1725         # get maximum number of translations per user
1726         nat44_config = self.vapi.nat_show_config()
1727
1728         # send more than maximum number of translations per user packets
1729         pkts_num = nat44_config.max_translations_per_user + 5
1730         pkts = []
1731         for port in range(0, pkts_num):
1732             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1733                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1734                  TCP(sport=1025 + port))
1735             pkts.append(p)
1736         self.pg0.add_stream(pkts)
1737         self.pg_enable_capture(self.pg_interfaces)
1738         self.pg_start()
1739
1740         # verify number of translated packet
1741         self.pg1.get_capture(pkts_num)
1742
1743     def test_interface_addr(self):
1744         """ Acquire NAT44 addresses from interface """
1745         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1746
1747         # no address in NAT pool
1748         adresses = self.vapi.nat44_address_dump()
1749         self.assertEqual(0, len(adresses))
1750
1751         # configure interface address and check NAT address pool
1752         self.pg7.config_ip4()
1753         adresses = self.vapi.nat44_address_dump()
1754         self.assertEqual(1, len(adresses))
1755         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1756
1757         # remove interface address and check NAT address pool
1758         self.pg7.unconfig_ip4()
1759         adresses = self.vapi.nat44_address_dump()
1760         self.assertEqual(0, len(adresses))
1761
1762     def test_interface_addr_static_mapping(self):
1763         """ Static mapping with addresses from interface """
1764         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1765         self.nat44_add_static_mapping(
1766             '1.2.3.4',
1767             external_sw_if_index=self.pg7.sw_if_index)
1768
1769         # static mappings with external interface
1770         static_mappings = self.vapi.nat44_static_mapping_dump()
1771         self.assertEqual(1, len(static_mappings))
1772         self.assertEqual(self.pg7.sw_if_index,
1773                          static_mappings[0].external_sw_if_index)
1774
1775         # configure interface address and check static mappings
1776         self.pg7.config_ip4()
1777         static_mappings = self.vapi.nat44_static_mapping_dump()
1778         self.assertEqual(1, len(static_mappings))
1779         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1780                          self.pg7.local_ip4n)
1781         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1782
1783         # remove interface address and check static mappings
1784         self.pg7.unconfig_ip4()
1785         static_mappings = self.vapi.nat44_static_mapping_dump()
1786         self.assertEqual(0, len(static_mappings))
1787
1788     def test_ipfix_nat44_sess(self):
1789         """ IPFIX logging NAT44 session created/delted """
1790         self.ipfix_domain_id = 10
1791         self.ipfix_src_port = 20202
1792         colector_port = 30303
1793         bind_layers(UDP, IPFIX, dport=30303)
1794         self.nat44_add_address(self.nat_addr)
1795         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1796         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1797                                                   is_inside=0)
1798         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1799                                      src_address=self.pg3.local_ip4n,
1800                                      path_mtu=512,
1801                                      template_interval=10,
1802                                      collector_port=colector_port)
1803         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1804                             src_port=self.ipfix_src_port)
1805
1806         pkts = self.create_stream_in(self.pg0, self.pg1)
1807         self.pg0.add_stream(pkts)
1808         self.pg_enable_capture(self.pg_interfaces)
1809         self.pg_start()
1810         capture = self.pg1.get_capture(len(pkts))
1811         self.verify_capture_out(capture)
1812         self.nat44_add_address(self.nat_addr, is_add=0)
1813         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1814         capture = self.pg3.get_capture(3)
1815         ipfix = IPFIXDecoder()
1816         # first load template
1817         for p in capture:
1818             self.assertTrue(p.haslayer(IPFIX))
1819             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1820             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1821             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1822             self.assertEqual(p[UDP].dport, colector_port)
1823             self.assertEqual(p[IPFIX].observationDomainID,
1824                              self.ipfix_domain_id)
1825             if p.haslayer(Template):
1826                 ipfix.add_template(p.getlayer(Template))
1827         # verify events in data set
1828         for p in capture:
1829             if p.haslayer(Data):
1830                 data = ipfix.decode_data_set(p.getlayer(Set))
1831                 self.verify_ipfix_nat44_ses(data)
1832
1833     def test_ipfix_addr_exhausted(self):
1834         """ IPFIX logging NAT addresses exhausted """
1835         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1836         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1837                                                   is_inside=0)
1838         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1839                                      src_address=self.pg3.local_ip4n,
1840                                      path_mtu=512,
1841                                      template_interval=10)
1842         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1843                             src_port=self.ipfix_src_port)
1844
1845         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1846              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1847              TCP(sport=3025))
1848         self.pg0.add_stream(p)
1849         self.pg_enable_capture(self.pg_interfaces)
1850         self.pg_start()
1851         capture = self.pg1.get_capture(0)
1852         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1853         capture = self.pg3.get_capture(3)
1854         ipfix = IPFIXDecoder()
1855         # first load template
1856         for p in capture:
1857             self.assertTrue(p.haslayer(IPFIX))
1858             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1859             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1860             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1861             self.assertEqual(p[UDP].dport, 4739)
1862             self.assertEqual(p[IPFIX].observationDomainID,
1863                              self.ipfix_domain_id)
1864             if p.haslayer(Template):
1865                 ipfix.add_template(p.getlayer(Template))
1866         # verify events in data set
1867         for p in capture:
1868             if p.haslayer(Data):
1869                 data = ipfix.decode_data_set(p.getlayer(Set))
1870                 self.verify_ipfix_addr_exhausted(data)
1871
1872     def test_pool_addr_fib(self):
1873         """ NAT44 add pool addresses to FIB """
1874         static_addr = '10.0.0.10'
1875         self.nat44_add_address(self.nat_addr)
1876         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1877         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1878                                                   is_inside=0)
1879         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1880
1881         # NAT44 address
1882         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1883              ARP(op=ARP.who_has, pdst=self.nat_addr,
1884                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1885         self.pg1.add_stream(p)
1886         self.pg_enable_capture(self.pg_interfaces)
1887         self.pg_start()
1888         capture = self.pg1.get_capture(1)
1889         self.assertTrue(capture[0].haslayer(ARP))
1890         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1891
1892         # 1:1 NAT address
1893         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1894              ARP(op=ARP.who_has, pdst=static_addr,
1895                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1896         self.pg1.add_stream(p)
1897         self.pg_enable_capture(self.pg_interfaces)
1898         self.pg_start()
1899         capture = self.pg1.get_capture(1)
1900         self.assertTrue(capture[0].haslayer(ARP))
1901         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1902
1903         # send ARP to non-NAT44 interface
1904         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1905              ARP(op=ARP.who_has, pdst=self.nat_addr,
1906                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1907         self.pg2.add_stream(p)
1908         self.pg_enable_capture(self.pg_interfaces)
1909         self.pg_start()
1910         capture = self.pg1.get_capture(0)
1911
1912         # remove addresses and verify
1913         self.nat44_add_address(self.nat_addr, is_add=0)
1914         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1915                                       is_add=0)
1916
1917         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1918              ARP(op=ARP.who_has, pdst=self.nat_addr,
1919                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1920         self.pg1.add_stream(p)
1921         self.pg_enable_capture(self.pg_interfaces)
1922         self.pg_start()
1923         capture = self.pg1.get_capture(0)
1924
1925         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1926              ARP(op=ARP.who_has, pdst=static_addr,
1927                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1928         self.pg1.add_stream(p)
1929         self.pg_enable_capture(self.pg_interfaces)
1930         self.pg_start()
1931         capture = self.pg1.get_capture(0)
1932
1933     def test_vrf_mode(self):
1934         """ NAT44 tenant VRF aware address pool mode """
1935
1936         vrf_id1 = 1
1937         vrf_id2 = 2
1938         nat_ip1 = "10.0.0.10"
1939         nat_ip2 = "10.0.0.11"
1940
1941         self.pg0.unconfig_ip4()
1942         self.pg1.unconfig_ip4()
1943         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1944         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1945         self.pg0.set_table_ip4(vrf_id1)
1946         self.pg1.set_table_ip4(vrf_id2)
1947         self.pg0.config_ip4()
1948         self.pg1.config_ip4()
1949
1950         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1951         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1952         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1953         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1954         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1955                                                   is_inside=0)
1956
1957         # first VRF
1958         pkts = self.create_stream_in(self.pg0, self.pg2)
1959         self.pg0.add_stream(pkts)
1960         self.pg_enable_capture(self.pg_interfaces)
1961         self.pg_start()
1962         capture = self.pg2.get_capture(len(pkts))
1963         self.verify_capture_out(capture, nat_ip1)
1964
1965         # second VRF
1966         pkts = self.create_stream_in(self.pg1, self.pg2)
1967         self.pg1.add_stream(pkts)
1968         self.pg_enable_capture(self.pg_interfaces)
1969         self.pg_start()
1970         capture = self.pg2.get_capture(len(pkts))
1971         self.verify_capture_out(capture, nat_ip2)
1972
1973         self.pg0.unconfig_ip4()
1974         self.pg1.unconfig_ip4()
1975         self.pg0.set_table_ip4(0)
1976         self.pg1.set_table_ip4(0)
1977         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1978         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1979
1980     def test_vrf_feature_independent(self):
1981         """ NAT44 tenant VRF independent address pool mode """
1982
1983         nat_ip1 = "10.0.0.10"
1984         nat_ip2 = "10.0.0.11"
1985
1986         self.nat44_add_address(nat_ip1)
1987         self.nat44_add_address(nat_ip2)
1988         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1989         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1990         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1991                                                   is_inside=0)
1992
1993         # first VRF
1994         pkts = self.create_stream_in(self.pg0, self.pg2)
1995         self.pg0.add_stream(pkts)
1996         self.pg_enable_capture(self.pg_interfaces)
1997         self.pg_start()
1998         capture = self.pg2.get_capture(len(pkts))
1999         self.verify_capture_out(capture, nat_ip1)
2000
2001         # second VRF
2002         pkts = self.create_stream_in(self.pg1, self.pg2)
2003         self.pg1.add_stream(pkts)
2004         self.pg_enable_capture(self.pg_interfaces)
2005         self.pg_start()
2006         capture = self.pg2.get_capture(len(pkts))
2007         self.verify_capture_out(capture, nat_ip1)
2008
2009     def test_dynamic_ipless_interfaces(self):
2010         """ NAT44 interfaces without configured IP address """
2011
2012         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2013                                       mactobinary(self.pg7.remote_mac),
2014                                       self.pg7.remote_ip4n,
2015                                       is_static=1)
2016         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2017                                       mactobinary(self.pg8.remote_mac),
2018                                       self.pg8.remote_ip4n,
2019                                       is_static=1)
2020
2021         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2022                                    dst_address_length=32,
2023                                    next_hop_address=self.pg7.remote_ip4n,
2024                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2025         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2026                                    dst_address_length=32,
2027                                    next_hop_address=self.pg8.remote_ip4n,
2028                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2029
2030         self.nat44_add_address(self.nat_addr)
2031         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2032         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2033                                                   is_inside=0)
2034
2035         # in2out
2036         pkts = self.create_stream_in(self.pg7, self.pg8)
2037         self.pg7.add_stream(pkts)
2038         self.pg_enable_capture(self.pg_interfaces)
2039         self.pg_start()
2040         capture = self.pg8.get_capture(len(pkts))
2041         self.verify_capture_out(capture)
2042
2043         # out2in
2044         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2045         self.pg8.add_stream(pkts)
2046         self.pg_enable_capture(self.pg_interfaces)
2047         self.pg_start()
2048         capture = self.pg7.get_capture(len(pkts))
2049         self.verify_capture_in(capture, self.pg7)
2050
2051     def test_static_ipless_interfaces(self):
2052         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2053
2054         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2055                                       mactobinary(self.pg7.remote_mac),
2056                                       self.pg7.remote_ip4n,
2057                                       is_static=1)
2058         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2059                                       mactobinary(self.pg8.remote_mac),
2060                                       self.pg8.remote_ip4n,
2061                                       is_static=1)
2062
2063         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2064                                    dst_address_length=32,
2065                                    next_hop_address=self.pg7.remote_ip4n,
2066                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2067         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2068                                    dst_address_length=32,
2069                                    next_hop_address=self.pg8.remote_ip4n,
2070                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2071
2072         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2073         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2074         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2075                                                   is_inside=0)
2076
2077         # out2in
2078         pkts = self.create_stream_out(self.pg8)
2079         self.pg8.add_stream(pkts)
2080         self.pg_enable_capture(self.pg_interfaces)
2081         self.pg_start()
2082         capture = self.pg7.get_capture(len(pkts))
2083         self.verify_capture_in(capture, self.pg7)
2084
2085         # in2out
2086         pkts = self.create_stream_in(self.pg7, self.pg8)
2087         self.pg7.add_stream(pkts)
2088         self.pg_enable_capture(self.pg_interfaces)
2089         self.pg_start()
2090         capture = self.pg8.get_capture(len(pkts))
2091         self.verify_capture_out(capture, self.nat_addr, True)
2092
2093     def test_static_with_port_ipless_interfaces(self):
2094         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2095
2096         self.tcp_port_out = 30606
2097         self.udp_port_out = 30607
2098         self.icmp_id_out = 30608
2099
2100         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2101                                       mactobinary(self.pg7.remote_mac),
2102                                       self.pg7.remote_ip4n,
2103                                       is_static=1)
2104         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2105                                       mactobinary(self.pg8.remote_mac),
2106                                       self.pg8.remote_ip4n,
2107                                       is_static=1)
2108
2109         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2110                                    dst_address_length=32,
2111                                    next_hop_address=self.pg7.remote_ip4n,
2112                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2113         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2114                                    dst_address_length=32,
2115                                    next_hop_address=self.pg8.remote_ip4n,
2116                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2117
2118         self.nat44_add_address(self.nat_addr)
2119         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2120                                       self.tcp_port_in, self.tcp_port_out,
2121                                       proto=IP_PROTOS.tcp)
2122         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2123                                       self.udp_port_in, self.udp_port_out,
2124                                       proto=IP_PROTOS.udp)
2125         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2126                                       self.icmp_id_in, self.icmp_id_out,
2127                                       proto=IP_PROTOS.icmp)
2128         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2129         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2130                                                   is_inside=0)
2131
2132         # out2in
2133         pkts = self.create_stream_out(self.pg8)
2134         self.pg8.add_stream(pkts)
2135         self.pg_enable_capture(self.pg_interfaces)
2136         self.pg_start()
2137         capture = self.pg7.get_capture(len(pkts))
2138         self.verify_capture_in(capture, self.pg7)
2139
2140         # in2out
2141         pkts = self.create_stream_in(self.pg7, self.pg8)
2142         self.pg7.add_stream(pkts)
2143         self.pg_enable_capture(self.pg_interfaces)
2144         self.pg_start()
2145         capture = self.pg8.get_capture(len(pkts))
2146         self.verify_capture_out(capture)
2147
2148     def test_static_unknown_proto(self):
2149         """ 1:1 NAT translate packet with unknown protocol """
2150         nat_ip = "10.0.0.10"
2151         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2152         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2153         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2154                                                   is_inside=0)
2155
2156         # in2out
2157         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2158              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2159              GRE() /
2160              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2161              TCP(sport=1234, dport=1234))
2162         self.pg0.add_stream(p)
2163         self.pg_enable_capture(self.pg_interfaces)
2164         self.pg_start()
2165         p = self.pg1.get_capture(1)
2166         packet = p[0]
2167         try:
2168             self.assertEqual(packet[IP].src, nat_ip)
2169             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2170             self.assertTrue(packet.haslayer(GRE))
2171             self.check_ip_checksum(packet)
2172         except:
2173             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2174             raise
2175
2176         # out2in
2177         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2178              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2179              GRE() /
2180              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2181              TCP(sport=1234, dport=1234))
2182         self.pg1.add_stream(p)
2183         self.pg_enable_capture(self.pg_interfaces)
2184         self.pg_start()
2185         p = self.pg0.get_capture(1)
2186         packet = p[0]
2187         try:
2188             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2189             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2190             self.assertTrue(packet.haslayer(GRE))
2191             self.check_ip_checksum(packet)
2192         except:
2193             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2194             raise
2195
2196     def test_hairpinning_static_unknown_proto(self):
2197         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2198
2199         host = self.pg0.remote_hosts[0]
2200         server = self.pg0.remote_hosts[1]
2201
2202         host_nat_ip = "10.0.0.10"
2203         server_nat_ip = "10.0.0.11"
2204
2205         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2206         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2207         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2208         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2209                                                   is_inside=0)
2210
2211         # host to server
2212         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2213              IP(src=host.ip4, dst=server_nat_ip) /
2214              GRE() /
2215              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2216              TCP(sport=1234, dport=1234))
2217         self.pg0.add_stream(p)
2218         self.pg_enable_capture(self.pg_interfaces)
2219         self.pg_start()
2220         p = self.pg0.get_capture(1)
2221         packet = p[0]
2222         try:
2223             self.assertEqual(packet[IP].src, host_nat_ip)
2224             self.assertEqual(packet[IP].dst, server.ip4)
2225             self.assertTrue(packet.haslayer(GRE))
2226             self.check_ip_checksum(packet)
2227         except:
2228             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2229             raise
2230
2231         # server to host
2232         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2233              IP(src=server.ip4, dst=host_nat_ip) /
2234              GRE() /
2235              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2236              TCP(sport=1234, dport=1234))
2237         self.pg0.add_stream(p)
2238         self.pg_enable_capture(self.pg_interfaces)
2239         self.pg_start()
2240         p = self.pg0.get_capture(1)
2241         packet = p[0]
2242         try:
2243             self.assertEqual(packet[IP].src, server_nat_ip)
2244             self.assertEqual(packet[IP].dst, host.ip4)
2245             self.assertTrue(packet.haslayer(GRE))
2246             self.check_ip_checksum(packet)
2247         except:
2248             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2249             raise
2250
2251     def test_unknown_proto(self):
2252         """ NAT44 translate packet with unknown protocol """
2253         self.nat44_add_address(self.nat_addr)
2254         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2255         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2256                                                   is_inside=0)
2257
2258         # in2out
2259         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2260              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2261              TCP(sport=self.tcp_port_in, dport=20))
2262         self.pg0.add_stream(p)
2263         self.pg_enable_capture(self.pg_interfaces)
2264         self.pg_start()
2265         p = self.pg1.get_capture(1)
2266
2267         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2268              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2269              GRE() /
2270              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2271              TCP(sport=1234, dport=1234))
2272         self.pg0.add_stream(p)
2273         self.pg_enable_capture(self.pg_interfaces)
2274         self.pg_start()
2275         p = self.pg1.get_capture(1)
2276         packet = p[0]
2277         try:
2278             self.assertEqual(packet[IP].src, self.nat_addr)
2279             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2280             self.assertTrue(packet.haslayer(GRE))
2281             self.check_ip_checksum(packet)
2282         except:
2283             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2284             raise
2285
2286         # out2in
2287         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2288              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2289              GRE() /
2290              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2291              TCP(sport=1234, dport=1234))
2292         self.pg1.add_stream(p)
2293         self.pg_enable_capture(self.pg_interfaces)
2294         self.pg_start()
2295         p = self.pg0.get_capture(1)
2296         packet = p[0]
2297         try:
2298             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2299             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2300             self.assertTrue(packet.haslayer(GRE))
2301             self.check_ip_checksum(packet)
2302         except:
2303             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2304             raise
2305
2306     def test_hairpinning_unknown_proto(self):
2307         """ NAT44 translate packet with unknown protocol - hairpinning """
2308         host = self.pg0.remote_hosts[0]
2309         server = self.pg0.remote_hosts[1]
2310         host_in_port = 1234
2311         host_out_port = 0
2312         server_in_port = 5678
2313         server_out_port = 8765
2314         server_nat_ip = "10.0.0.11"
2315
2316         self.nat44_add_address(self.nat_addr)
2317         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2318         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2319                                                   is_inside=0)
2320
2321         # add static mapping for server
2322         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2323
2324         # host to server
2325         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2326              IP(src=host.ip4, dst=server_nat_ip) /
2327              TCP(sport=host_in_port, dport=server_out_port))
2328         self.pg0.add_stream(p)
2329         self.pg_enable_capture(self.pg_interfaces)
2330         self.pg_start()
2331         capture = self.pg0.get_capture(1)
2332
2333         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2334              IP(src=host.ip4, dst=server_nat_ip) /
2335              GRE() /
2336              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2337              TCP(sport=1234, dport=1234))
2338         self.pg0.add_stream(p)
2339         self.pg_enable_capture(self.pg_interfaces)
2340         self.pg_start()
2341         p = self.pg0.get_capture(1)
2342         packet = p[0]
2343         try:
2344             self.assertEqual(packet[IP].src, self.nat_addr)
2345             self.assertEqual(packet[IP].dst, server.ip4)
2346             self.assertTrue(packet.haslayer(GRE))
2347             self.check_ip_checksum(packet)
2348         except:
2349             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2350             raise
2351
2352         # server to host
2353         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2354              IP(src=server.ip4, dst=self.nat_addr) /
2355              GRE() /
2356              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2357              TCP(sport=1234, dport=1234))
2358         self.pg0.add_stream(p)
2359         self.pg_enable_capture(self.pg_interfaces)
2360         self.pg_start()
2361         p = self.pg0.get_capture(1)
2362         packet = p[0]
2363         try:
2364             self.assertEqual(packet[IP].src, server_nat_ip)
2365             self.assertEqual(packet[IP].dst, host.ip4)
2366             self.assertTrue(packet.haslayer(GRE))
2367             self.check_ip_checksum(packet)
2368         except:
2369             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2370             raise
2371
2372     def test_output_feature(self):
2373         """ NAT44 interface output feature (in2out postrouting) """
2374         self.nat44_add_address(self.nat_addr)
2375         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2376         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2377         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2378                                                          is_inside=0)
2379
2380         # in2out
2381         pkts = self.create_stream_in(self.pg0, self.pg3)
2382         self.pg0.add_stream(pkts)
2383         self.pg_enable_capture(self.pg_interfaces)
2384         self.pg_start()
2385         capture = self.pg3.get_capture(len(pkts))
2386         self.verify_capture_out(capture)
2387
2388         # out2in
2389         pkts = self.create_stream_out(self.pg3)
2390         self.pg3.add_stream(pkts)
2391         self.pg_enable_capture(self.pg_interfaces)
2392         self.pg_start()
2393         capture = self.pg0.get_capture(len(pkts))
2394         self.verify_capture_in(capture, self.pg0)
2395
2396         # from non-NAT interface to NAT inside interface
2397         pkts = self.create_stream_in(self.pg2, self.pg0)
2398         self.pg2.add_stream(pkts)
2399         self.pg_enable_capture(self.pg_interfaces)
2400         self.pg_start()
2401         capture = self.pg0.get_capture(len(pkts))
2402         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2403
2404     def test_output_feature_vrf_aware(self):
2405         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2406         nat_ip_vrf10 = "10.0.0.10"
2407         nat_ip_vrf20 = "10.0.0.20"
2408
2409         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2410                                    dst_address_length=32,
2411                                    next_hop_address=self.pg3.remote_ip4n,
2412                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2413                                    table_id=10)
2414         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2415                                    dst_address_length=32,
2416                                    next_hop_address=self.pg3.remote_ip4n,
2417                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2418                                    table_id=20)
2419
2420         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2421         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2422         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2423         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2424         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2425                                                          is_inside=0)
2426
2427         # in2out VRF 10
2428         pkts = self.create_stream_in(self.pg4, self.pg3)
2429         self.pg4.add_stream(pkts)
2430         self.pg_enable_capture(self.pg_interfaces)
2431         self.pg_start()
2432         capture = self.pg3.get_capture(len(pkts))
2433         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2434
2435         # out2in VRF 10
2436         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2437         self.pg3.add_stream(pkts)
2438         self.pg_enable_capture(self.pg_interfaces)
2439         self.pg_start()
2440         capture = self.pg4.get_capture(len(pkts))
2441         self.verify_capture_in(capture, self.pg4)
2442
2443         # in2out VRF 20
2444         pkts = self.create_stream_in(self.pg6, self.pg3)
2445         self.pg6.add_stream(pkts)
2446         self.pg_enable_capture(self.pg_interfaces)
2447         self.pg_start()
2448         capture = self.pg3.get_capture(len(pkts))
2449         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2450
2451         # out2in VRF 20
2452         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2453         self.pg3.add_stream(pkts)
2454         self.pg_enable_capture(self.pg_interfaces)
2455         self.pg_start()
2456         capture = self.pg6.get_capture(len(pkts))
2457         self.verify_capture_in(capture, self.pg6)
2458
2459     def test_output_feature_hairpinning(self):
2460         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2461         host = self.pg0.remote_hosts[0]
2462         server = self.pg0.remote_hosts[1]
2463         host_in_port = 1234
2464         host_out_port = 0
2465         server_in_port = 5678
2466         server_out_port = 8765
2467
2468         self.nat44_add_address(self.nat_addr)
2469         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2470         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2471                                                          is_inside=0)
2472
2473         # add static mapping for server
2474         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2475                                       server_in_port, server_out_port,
2476                                       proto=IP_PROTOS.tcp)
2477
2478         # send packet from host to server
2479         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2480              IP(src=host.ip4, dst=self.nat_addr) /
2481              TCP(sport=host_in_port, dport=server_out_port))
2482         self.pg0.add_stream(p)
2483         self.pg_enable_capture(self.pg_interfaces)
2484         self.pg_start()
2485         capture = self.pg0.get_capture(1)
2486         p = capture[0]
2487         try:
2488             ip = p[IP]
2489             tcp = p[TCP]
2490             self.assertEqual(ip.src, self.nat_addr)
2491             self.assertEqual(ip.dst, server.ip4)
2492             self.assertNotEqual(tcp.sport, host_in_port)
2493             self.assertEqual(tcp.dport, server_in_port)
2494             self.check_tcp_checksum(p)
2495             host_out_port = tcp.sport
2496         except:
2497             self.logger.error(ppp("Unexpected or invalid packet:", p))
2498             raise
2499
2500         # send reply from server to host
2501         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2502              IP(src=server.ip4, dst=self.nat_addr) /
2503              TCP(sport=server_in_port, dport=host_out_port))
2504         self.pg0.add_stream(p)
2505         self.pg_enable_capture(self.pg_interfaces)
2506         self.pg_start()
2507         capture = self.pg0.get_capture(1)
2508         p = capture[0]
2509         try:
2510             ip = p[IP]
2511             tcp = p[TCP]
2512             self.assertEqual(ip.src, self.nat_addr)
2513             self.assertEqual(ip.dst, host.ip4)
2514             self.assertEqual(tcp.sport, server_out_port)
2515             self.assertEqual(tcp.dport, host_in_port)
2516             self.check_tcp_checksum(p)
2517         except:
2518             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2519             raise
2520
2521     def test_one_armed_nat44(self):
2522         """ One armed NAT44 """
2523         remote_host = self.pg9.remote_hosts[0]
2524         local_host = self.pg9.remote_hosts[1]
2525         external_port = 0
2526
2527         self.nat44_add_address(self.nat_addr)
2528         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2529         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2530                                                   is_inside=0)
2531
2532         # in2out
2533         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2534              IP(src=local_host.ip4, dst=remote_host.ip4) /
2535              TCP(sport=12345, dport=80))
2536         self.pg9.add_stream(p)
2537         self.pg_enable_capture(self.pg_interfaces)
2538         self.pg_start()
2539         capture = self.pg9.get_capture(1)
2540         p = capture[0]
2541         try:
2542             ip = p[IP]
2543             tcp = p[TCP]
2544             self.assertEqual(ip.src, self.nat_addr)
2545             self.assertEqual(ip.dst, remote_host.ip4)
2546             self.assertNotEqual(tcp.sport, 12345)
2547             external_port = tcp.sport
2548             self.assertEqual(tcp.dport, 80)
2549             self.check_tcp_checksum(p)
2550             self.check_ip_checksum(p)
2551         except:
2552             self.logger.error(ppp("Unexpected or invalid packet:", p))
2553             raise
2554
2555         # out2in
2556         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2557              IP(src=remote_host.ip4, dst=self.nat_addr) /
2558              TCP(sport=80, dport=external_port))
2559         self.pg9.add_stream(p)
2560         self.pg_enable_capture(self.pg_interfaces)
2561         self.pg_start()
2562         capture = self.pg9.get_capture(1)
2563         p = capture[0]
2564         try:
2565             ip = p[IP]
2566             tcp = p[TCP]
2567             self.assertEqual(ip.src, remote_host.ip4)
2568             self.assertEqual(ip.dst, local_host.ip4)
2569             self.assertEqual(tcp.sport, 80)
2570             self.assertEqual(tcp.dport, 12345)
2571             self.check_tcp_checksum(p)
2572             self.check_ip_checksum(p)
2573         except:
2574             self.logger.error(ppp("Unexpected or invalid packet:", p))
2575             raise
2576
2577     def test_del_session(self):
2578         """ Delete NAT44 session """
2579         self.nat44_add_address(self.nat_addr)
2580         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2581         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2582                                                   is_inside=0)
2583
2584         pkts = self.create_stream_in(self.pg0, self.pg1)
2585         self.pg0.add_stream(pkts)
2586         self.pg_enable_capture(self.pg_interfaces)
2587         self.pg_start()
2588         capture = self.pg1.get_capture(len(pkts))
2589
2590         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2591         nsessions = len(sessions)
2592
2593         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2594                                     sessions[0].inside_port,
2595                                     sessions[0].protocol)
2596         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2597                                     sessions[1].outside_port,
2598                                     sessions[1].protocol,
2599                                     is_in=0)
2600
2601         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2602         self.assertEqual(nsessions - len(sessions), 2)
2603
2604     def test_set_get_reass(self):
2605         """ NAT44 set/get virtual fragmentation reassembly """
2606         reas_cfg1 = self.vapi.nat_get_reass()
2607
2608         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2609                                 max_reass=reas_cfg1.ip4_max_reass * 2,
2610                                 max_frag=reas_cfg1.ip4_max_frag * 2)
2611
2612         reas_cfg2 = self.vapi.nat_get_reass()
2613
2614         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2615         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2616         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2617
2618         self.vapi.nat_set_reass(drop_frag=1)
2619         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2620
2621     def test_frag_in_order(self):
2622         """ NAT44 translate fragments arriving in order """
2623         self.nat44_add_address(self.nat_addr)
2624         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2625         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2626                                                   is_inside=0)
2627
2628         data = "A" * 4 + "B" * 16 + "C" * 3
2629         self.tcp_port_in = random.randint(1025, 65535)
2630
2631         reass = self.vapi.nat_reass_dump()
2632         reass_n_start = len(reass)
2633
2634         # in2out
2635         pkts = self.create_stream_frag(self.pg0,
2636                                        self.pg1.remote_ip4,
2637                                        self.tcp_port_in,
2638                                        20,
2639                                        data)
2640         self.pg0.add_stream(pkts)
2641         self.pg_enable_capture(self.pg_interfaces)
2642         self.pg_start()
2643         frags = self.pg1.get_capture(len(pkts))
2644         p = self.reass_frags_and_verify(frags,
2645                                         self.nat_addr,
2646                                         self.pg1.remote_ip4)
2647         self.assertEqual(p[TCP].dport, 20)
2648         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2649         self.tcp_port_out = p[TCP].sport
2650         self.assertEqual(data, p[Raw].load)
2651
2652         # out2in
2653         pkts = self.create_stream_frag(self.pg1,
2654                                        self.nat_addr,
2655                                        20,
2656                                        self.tcp_port_out,
2657                                        data)
2658         self.pg1.add_stream(pkts)
2659         self.pg_enable_capture(self.pg_interfaces)
2660         self.pg_start()
2661         frags = self.pg0.get_capture(len(pkts))
2662         p = self.reass_frags_and_verify(frags,
2663                                         self.pg1.remote_ip4,
2664                                         self.pg0.remote_ip4)
2665         self.assertEqual(p[TCP].sport, 20)
2666         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2667         self.assertEqual(data, p[Raw].load)
2668
2669         reass = self.vapi.nat_reass_dump()
2670         reass_n_end = len(reass)
2671
2672         self.assertEqual(reass_n_end - reass_n_start, 2)
2673
2674     def test_reass_hairpinning(self):
2675         """ NAT44 fragments hairpinning """
2676         host = self.pg0.remote_hosts[0]
2677         server = self.pg0.remote_hosts[1]
2678         host_in_port = random.randint(1025, 65535)
2679         host_out_port = 0
2680         server_in_port = random.randint(1025, 65535)
2681         server_out_port = random.randint(1025, 65535)
2682         data = "A" * 4 + "B" * 16 + "C" * 3
2683
2684         self.nat44_add_address(self.nat_addr)
2685         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2686         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2687                                                   is_inside=0)
2688         # add static mapping for server
2689         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2690                                       server_in_port, server_out_port,
2691                                       proto=IP_PROTOS.tcp)
2692
2693         # send packet from host to server
2694         pkts = self.create_stream_frag(self.pg0,
2695                                        self.nat_addr,
2696                                        host_in_port,
2697                                        server_out_port,
2698                                        data)
2699         self.pg0.add_stream(pkts)
2700         self.pg_enable_capture(self.pg_interfaces)
2701         self.pg_start()
2702         frags = self.pg0.get_capture(len(pkts))
2703         p = self.reass_frags_and_verify(frags,
2704                                         self.nat_addr,
2705                                         server.ip4)
2706         self.assertNotEqual(p[TCP].sport, host_in_port)
2707         self.assertEqual(p[TCP].dport, server_in_port)
2708         self.assertEqual(data, p[Raw].load)
2709
2710     def test_frag_out_of_order(self):
2711         """ NAT44 translate fragments arriving out of order """
2712         self.nat44_add_address(self.nat_addr)
2713         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2714         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2715                                                   is_inside=0)
2716
2717         data = "A" * 4 + "B" * 16 + "C" * 3
2718         random.randint(1025, 65535)
2719
2720         # in2out
2721         pkts = self.create_stream_frag(self.pg0,
2722                                        self.pg1.remote_ip4,
2723                                        self.tcp_port_in,
2724                                        20,
2725                                        data)
2726         pkts.reverse()
2727         self.pg0.add_stream(pkts)
2728         self.pg_enable_capture(self.pg_interfaces)
2729         self.pg_start()
2730         frags = self.pg1.get_capture(len(pkts))
2731         p = self.reass_frags_and_verify(frags,
2732                                         self.nat_addr,
2733                                         self.pg1.remote_ip4)
2734         self.assertEqual(p[TCP].dport, 20)
2735         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2736         self.tcp_port_out = p[TCP].sport
2737         self.assertEqual(data, p[Raw].load)
2738
2739         # out2in
2740         pkts = self.create_stream_frag(self.pg1,
2741                                        self.nat_addr,
2742                                        20,
2743                                        self.tcp_port_out,
2744                                        data)
2745         pkts.reverse()
2746         self.pg1.add_stream(pkts)
2747         self.pg_enable_capture(self.pg_interfaces)
2748         self.pg_start()
2749         frags = self.pg0.get_capture(len(pkts))
2750         p = self.reass_frags_and_verify(frags,
2751                                         self.pg1.remote_ip4,
2752                                         self.pg0.remote_ip4)
2753         self.assertEqual(p[TCP].sport, 20)
2754         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2755         self.assertEqual(data, p[Raw].load)
2756
2757     def test_port_restricted(self):
2758         """ Port restricted NAT44 (MAP-E CE) """
2759         self.nat44_add_address(self.nat_addr)
2760         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2761         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2762                                                   is_inside=0)
2763         self.vapi.cli("nat44 addr-port-assignment-alg map-e psid 10 "
2764                       "psid-offset 6 psid-len 6")
2765
2766         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2767              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2768              TCP(sport=4567, dport=22))
2769         self.pg0.add_stream(p)
2770         self.pg_enable_capture(self.pg_interfaces)
2771         self.pg_start()
2772         capture = self.pg1.get_capture(1)
2773         p = capture[0]
2774         try:
2775             ip = p[IP]
2776             tcp = p[TCP]
2777             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2778             self.assertEqual(ip.src, self.nat_addr)
2779             self.assertEqual(tcp.dport, 22)
2780             self.assertNotEqual(tcp.sport, 4567)
2781             self.assertEqual((tcp.sport >> 6) & 63, 10)
2782             self.check_tcp_checksum(p)
2783             self.check_ip_checksum(p)
2784         except:
2785             self.logger.error(ppp("Unexpected or invalid packet:", p))
2786             raise
2787
2788     def tearDown(self):
2789         super(TestNAT44, self).tearDown()
2790         if not self.vpp_dead:
2791             self.logger.info(self.vapi.cli("show nat44 verbose"))
2792             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
2793             self.vapi.cli("nat44 addr-port-assignment-alg default")
2794             self.clear_nat44()
2795
2796
2797 class TestDeterministicNAT(MethodHolder):
2798     """ Deterministic NAT Test Cases """
2799
2800     @classmethod
2801     def setUpConstants(cls):
2802         super(TestDeterministicNAT, cls).setUpConstants()
2803         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2804
2805     @classmethod
2806     def setUpClass(cls):
2807         super(TestDeterministicNAT, cls).setUpClass()
2808
2809         try:
2810             cls.tcp_port_in = 6303
2811             cls.tcp_external_port = 6303
2812             cls.udp_port_in = 6304
2813             cls.udp_external_port = 6304
2814             cls.icmp_id_in = 6305
2815             cls.nat_addr = '10.0.0.3'
2816
2817             cls.create_pg_interfaces(range(3))
2818             cls.interfaces = list(cls.pg_interfaces)
2819
2820             for i in cls.interfaces:
2821                 i.admin_up()
2822                 i.config_ip4()
2823                 i.resolve_arp()
2824
2825             cls.pg0.generate_remote_hosts(2)
2826             cls.pg0.configure_ipv4_neighbors()
2827
2828         except Exception:
2829             super(TestDeterministicNAT, cls).tearDownClass()
2830             raise
2831
2832     def create_stream_in(self, in_if, out_if, ttl=64):
2833         """
2834         Create packet stream for inside network
2835
2836         :param in_if: Inside interface
2837         :param out_if: Outside interface
2838         :param ttl: TTL of generated packets
2839         """
2840         pkts = []
2841         # TCP
2842         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2843              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2844              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2845         pkts.append(p)
2846
2847         # UDP
2848         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2849              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2850              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2851         pkts.append(p)
2852
2853         # ICMP
2854         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2855              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2856              ICMP(id=self.icmp_id_in, type='echo-request'))
2857         pkts.append(p)
2858
2859         return pkts
2860
2861     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2862         """
2863         Create packet stream for outside network
2864
2865         :param out_if: Outside interface
2866         :param dst_ip: Destination IP address (Default use global NAT address)
2867         :param ttl: TTL of generated packets
2868         """
2869         if dst_ip is None:
2870             dst_ip = self.nat_addr
2871         pkts = []
2872         # TCP
2873         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2874              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2875              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2876         pkts.append(p)
2877
2878         # UDP
2879         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2880              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2881              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2882         pkts.append(p)
2883
2884         # ICMP
2885         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2886              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2887              ICMP(id=self.icmp_external_id, type='echo-reply'))
2888         pkts.append(p)
2889
2890         return pkts
2891
2892     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2893         """
2894         Verify captured packets on outside network
2895
2896         :param capture: Captured packets
2897         :param nat_ip: Translated IP address (Default use global NAT address)
2898         :param same_port: Sorce port number is not translated (Default False)
2899         :param packet_num: Expected number of packets (Default 3)
2900         """
2901         if nat_ip is None:
2902             nat_ip = self.nat_addr
2903         self.assertEqual(packet_num, len(capture))
2904         for packet in capture:
2905             try:
2906                 self.assertEqual(packet[IP].src, nat_ip)
2907                 if packet.haslayer(TCP):
2908                     self.tcp_port_out = packet[TCP].sport
2909                 elif packet.haslayer(UDP):
2910                     self.udp_port_out = packet[UDP].sport
2911                 else:
2912                     self.icmp_external_id = packet[ICMP].id
2913             except:
2914                 self.logger.error(ppp("Unexpected or invalid packet "
2915                                       "(outside network):", packet))
2916                 raise
2917
2918     def initiate_tcp_session(self, in_if, out_if):
2919         """
2920         Initiates TCP session
2921
2922         :param in_if: Inside interface
2923         :param out_if: Outside interface
2924         """
2925         try:
2926             # SYN packet in->out
2927             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2928                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2929                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2930                      flags="S"))
2931             in_if.add_stream(p)
2932             self.pg_enable_capture(self.pg_interfaces)
2933             self.pg_start()
2934             capture = out_if.get_capture(1)
2935             p = capture[0]
2936             self.tcp_port_out = p[TCP].sport
2937
2938             # SYN + ACK packet out->in
2939             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2940                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2941                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2942                      flags="SA"))
2943             out_if.add_stream(p)
2944             self.pg_enable_capture(self.pg_interfaces)
2945             self.pg_start()
2946             in_if.get_capture(1)
2947
2948             # ACK packet in->out
2949             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2950                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2951                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2952                      flags="A"))
2953             in_if.add_stream(p)
2954             self.pg_enable_capture(self.pg_interfaces)
2955             self.pg_start()
2956             out_if.get_capture(1)
2957
2958         except:
2959             self.logger.error("TCP 3 way handshake failed")
2960             raise
2961
2962     def verify_ipfix_max_entries_per_user(self, data):
2963         """
2964         Verify IPFIX maximum entries per user exceeded event
2965
2966         :param data: Decoded IPFIX data records
2967         """
2968         self.assertEqual(1, len(data))
2969         record = data[0]
2970         # natEvent
2971         self.assertEqual(ord(record[230]), 13)
2972         # natQuotaExceededEvent
2973         self.assertEqual('\x03\x00\x00\x00', record[466])
2974         # sourceIPv4Address
2975         self.assertEqual(self.pg0.remote_ip4n, record[8])
2976
2977     def test_deterministic_mode(self):
2978         """ NAT plugin run deterministic mode """
2979         in_addr = '172.16.255.0'
2980         out_addr = '172.17.255.50'
2981         in_addr_t = '172.16.255.20'
2982         in_addr_n = socket.inet_aton(in_addr)
2983         out_addr_n = socket.inet_aton(out_addr)
2984         in_addr_t_n = socket.inet_aton(in_addr_t)
2985         in_plen = 24
2986         out_plen = 32
2987
2988         nat_config = self.vapi.nat_show_config()
2989         self.assertEqual(1, nat_config.deterministic)
2990
2991         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2992
2993         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2994         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2995         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2996         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2997
2998         deterministic_mappings = self.vapi.nat_det_map_dump()
2999         self.assertEqual(len(deterministic_mappings), 1)
3000         dsm = deterministic_mappings[0]
3001         self.assertEqual(in_addr_n, dsm.in_addr[:4])
3002         self.assertEqual(in_plen, dsm.in_plen)
3003         self.assertEqual(out_addr_n, dsm.out_addr[:4])
3004         self.assertEqual(out_plen, dsm.out_plen)
3005
3006         self.clear_nat_det()
3007         deterministic_mappings = self.vapi.nat_det_map_dump()
3008         self.assertEqual(len(deterministic_mappings), 0)
3009
3010     def test_set_timeouts(self):
3011         """ Set deterministic NAT timeouts """
3012         timeouts_before = self.vapi.nat_det_get_timeouts()
3013
3014         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3015                                        timeouts_before.tcp_established + 10,
3016                                        timeouts_before.tcp_transitory + 10,
3017                                        timeouts_before.icmp + 10)
3018
3019         timeouts_after = self.vapi.nat_det_get_timeouts()
3020
3021         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3022         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3023         self.assertNotEqual(timeouts_before.tcp_established,
3024                             timeouts_after.tcp_established)
3025         self.assertNotEqual(timeouts_before.tcp_transitory,
3026                             timeouts_after.tcp_transitory)
3027
3028     def test_det_in(self):
3029         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3030
3031         nat_ip = "10.0.0.10"
3032
3033         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3034                                       32,
3035                                       socket.inet_aton(nat_ip),
3036                                       32)
3037         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3038         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3039                                                   is_inside=0)
3040
3041         # in2out
3042         pkts = self.create_stream_in(self.pg0, self.pg1)
3043         self.pg0.add_stream(pkts)
3044         self.pg_enable_capture(self.pg_interfaces)
3045         self.pg_start()
3046         capture = self.pg1.get_capture(len(pkts))
3047         self.verify_capture_out(capture, nat_ip)
3048
3049         # out2in
3050         pkts = self.create_stream_out(self.pg1, nat_ip)
3051         self.pg1.add_stream(pkts)
3052         self.pg_enable_capture(self.pg_interfaces)
3053         self.pg_start()
3054         capture = self.pg0.get_capture(len(pkts))
3055         self.verify_capture_in(capture, self.pg0)
3056
3057         # session dump test
3058         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3059         self.assertEqual(len(sessions), 3)
3060
3061         # TCP session
3062         s = sessions[0]
3063         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3064         self.assertEqual(s.in_port, self.tcp_port_in)
3065         self.assertEqual(s.out_port, self.tcp_port_out)
3066         self.assertEqual(s.ext_port, self.tcp_external_port)
3067
3068         # UDP session
3069         s = sessions[1]
3070         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3071         self.assertEqual(s.in_port, self.udp_port_in)
3072         self.assertEqual(s.out_port, self.udp_port_out)
3073         self.assertEqual(s.ext_port, self.udp_external_port)
3074
3075         # ICMP session
3076         s = sessions[2]
3077         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3078         self.assertEqual(s.in_port, self.icmp_id_in)
3079         self.assertEqual(s.out_port, self.icmp_external_id)
3080
3081     def test_multiple_users(self):
3082         """ Deterministic NAT multiple users """
3083
3084         nat_ip = "10.0.0.10"
3085         port_in = 80
3086         external_port = 6303
3087
3088         host0 = self.pg0.remote_hosts[0]
3089         host1 = self.pg0.remote_hosts[1]
3090
3091         self.vapi.nat_det_add_del_map(host0.ip4n,
3092                                       24,
3093                                       socket.inet_aton(nat_ip),
3094                                       32)
3095         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3096         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3097                                                   is_inside=0)
3098
3099         # host0 to out
3100         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3101              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3102              TCP(sport=port_in, dport=external_port))
3103         self.pg0.add_stream(p)
3104         self.pg_enable_capture(self.pg_interfaces)
3105         self.pg_start()
3106         capture = self.pg1.get_capture(1)
3107         p = capture[0]
3108         try:
3109             ip = p[IP]
3110             tcp = p[TCP]
3111             self.assertEqual(ip.src, nat_ip)
3112             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3113             self.assertEqual(tcp.dport, external_port)
3114             port_out0 = tcp.sport
3115         except:
3116             self.logger.error(ppp("Unexpected or invalid packet:", p))
3117             raise
3118
3119         # host1 to out
3120         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3121              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3122              TCP(sport=port_in, dport=external_port))
3123         self.pg0.add_stream(p)
3124         self.pg_enable_capture(self.pg_interfaces)
3125         self.pg_start()
3126         capture = self.pg1.get_capture(1)
3127         p = capture[0]
3128         try:
3129             ip = p[IP]
3130             tcp = p[TCP]
3131             self.assertEqual(ip.src, nat_ip)
3132             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3133             self.assertEqual(tcp.dport, external_port)
3134             port_out1 = tcp.sport
3135         except:
3136             self.logger.error(ppp("Unexpected or invalid packet:", p))
3137             raise
3138
3139         dms = self.vapi.nat_det_map_dump()
3140         self.assertEqual(1, len(dms))
3141         self.assertEqual(2, dms[0].ses_num)
3142
3143         # out to host0
3144         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3145              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3146              TCP(sport=external_port, dport=port_out0))
3147         self.pg1.add_stream(p)
3148         self.pg_enable_capture(self.pg_interfaces)
3149         self.pg_start()
3150         capture = self.pg0.get_capture(1)
3151         p = capture[0]
3152         try:
3153             ip = p[IP]
3154             tcp = p[TCP]
3155             self.assertEqual(ip.src, self.pg1.remote_ip4)
3156             self.assertEqual(ip.dst, host0.ip4)
3157             self.assertEqual(tcp.dport, port_in)
3158             self.assertEqual(tcp.sport, external_port)
3159         except:
3160             self.logger.error(ppp("Unexpected or invalid packet:", p))
3161             raise
3162
3163         # out to host1
3164         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3165              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3166              TCP(sport=external_port, dport=port_out1))
3167         self.pg1.add_stream(p)
3168         self.pg_enable_capture(self.pg_interfaces)
3169         self.pg_start()
3170         capture = self.pg0.get_capture(1)
3171         p = capture[0]
3172         try:
3173             ip = p[IP]
3174             tcp = p[TCP]
3175             self.assertEqual(ip.src, self.pg1.remote_ip4)
3176             self.assertEqual(ip.dst, host1.ip4)
3177             self.assertEqual(tcp.dport, port_in)
3178             self.assertEqual(tcp.sport, external_port)
3179         except:
3180             self.logger.error(ppp("Unexpected or invalid packet", p))
3181             raise
3182
3183         # session close api test
3184         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3185                                             port_out1,
3186                                             self.pg1.remote_ip4n,
3187                                             external_port)
3188         dms = self.vapi.nat_det_map_dump()
3189         self.assertEqual(dms[0].ses_num, 1)
3190
3191         self.vapi.nat_det_close_session_in(host0.ip4n,
3192                                            port_in,
3193                                            self.pg1.remote_ip4n,
3194                                            external_port)
3195         dms = self.vapi.nat_det_map_dump()
3196         self.assertEqual(dms[0].ses_num, 0)
3197
3198     def test_tcp_session_close_detection_in(self):
3199         """ Deterministic NAT TCP session close from inside network """
3200         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3201                                       32,
3202                                       socket.inet_aton(self.nat_addr),
3203                                       32)
3204         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3205         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3206                                                   is_inside=0)
3207
3208         self.initiate_tcp_session(self.pg0, self.pg1)
3209
3210         # close the session from inside
3211         try:
3212             # FIN packet in -> out
3213             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3214                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3215                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3216                      flags="F"))
3217             self.pg0.add_stream(p)
3218             self.pg_enable_capture(self.pg_interfaces)
3219             self.pg_start()
3220             self.pg1.get_capture(1)
3221
3222             pkts = []
3223
3224             # ACK packet out -> in
3225             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3226                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3227                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3228                      flags="A"))
3229             pkts.append(p)
3230
3231             # FIN packet out -> in
3232             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3233                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3234                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3235                      flags="F"))
3236             pkts.append(p)
3237
3238             self.pg1.add_stream(pkts)
3239             self.pg_enable_capture(self.pg_interfaces)
3240             self.pg_start()
3241             self.pg0.get_capture(2)
3242
3243             # ACK packet in -> out
3244             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3245                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3246                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3247                      flags="A"))
3248             self.pg0.add_stream(p)
3249             self.pg_enable_capture(self.pg_interfaces)
3250             self.pg_start()
3251             self.pg1.get_capture(1)
3252
3253             # Check if deterministic NAT44 closed the session
3254             dms = self.vapi.nat_det_map_dump()
3255             self.assertEqual(0, dms[0].ses_num)
3256         except:
3257             self.logger.error("TCP session termination failed")
3258             raise
3259
3260     def test_tcp_session_close_detection_out(self):
3261         """ Deterministic NAT TCP session close from outside network """
3262         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3263                                       32,
3264                                       socket.inet_aton(self.nat_addr),
3265                                       32)
3266         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3267         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3268                                                   is_inside=0)
3269
3270         self.initiate_tcp_session(self.pg0, self.pg1)
3271
3272         # close the session from outside
3273         try:
3274             # FIN packet out -> in
3275             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3276                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3277                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3278                      flags="F"))
3279             self.pg1.add_stream(p)
3280             self.pg_enable_capture(self.pg_interfaces)
3281             self.pg_start()
3282             self.pg0.get_capture(1)
3283
3284             pkts = []
3285
3286             # ACK packet in -> out
3287             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3288                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3289                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3290                      flags="A"))
3291             pkts.append(p)
3292
3293             # ACK packet in -> out
3294             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3295                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3296                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3297                      flags="F"))
3298             pkts.append(p)
3299
3300             self.pg0.add_stream(pkts)
3301             self.pg_enable_capture(self.pg_interfaces)
3302             self.pg_start()
3303             self.pg1.get_capture(2)
3304
3305             # ACK packet out -> in
3306             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3307                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3308                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3309                      flags="A"))
3310             self.pg1.add_stream(p)
3311             self.pg_enable_capture(self.pg_interfaces)
3312             self.pg_start()
3313             self.pg0.get_capture(1)
3314
3315             # Check if deterministic NAT44 closed the session
3316             dms = self.vapi.nat_det_map_dump()
3317             self.assertEqual(0, dms[0].ses_num)
3318         except:
3319             self.logger.error("TCP session termination failed")
3320             raise
3321
3322     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3323     def test_session_timeout(self):
3324         """ Deterministic NAT session timeouts """
3325         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3326                                       32,
3327                                       socket.inet_aton(self.nat_addr),
3328                                       32)
3329         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3330         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3331                                                   is_inside=0)
3332
3333         self.initiate_tcp_session(self.pg0, self.pg1)
3334         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3335         pkts = self.create_stream_in(self.pg0, self.pg1)
3336         self.pg0.add_stream(pkts)
3337         self.pg_enable_capture(self.pg_interfaces)
3338         self.pg_start()
3339         capture = self.pg1.get_capture(len(pkts))
3340         sleep(15)
3341
3342         dms = self.vapi.nat_det_map_dump()
3343         self.assertEqual(0, dms[0].ses_num)
3344
3345     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3346     def test_session_limit_per_user(self):
3347         """ Deterministic NAT maximum sessions per user limit """
3348         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3349                                       32,
3350                                       socket.inet_aton(self.nat_addr),
3351                                       32)
3352         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3353         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3354                                                   is_inside=0)
3355         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3356                                      src_address=self.pg2.local_ip4n,
3357                                      path_mtu=512,
3358                                      template_interval=10)
3359         self.vapi.nat_ipfix()
3360
3361         pkts = []
3362         for port in range(1025, 2025):
3363             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3364                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3365                  UDP(sport=port, dport=port))
3366             pkts.append(p)
3367
3368         self.pg0.add_stream(pkts)
3369         self.pg_enable_capture(self.pg_interfaces)
3370         self.pg_start()
3371         capture = self.pg1.get_capture(len(pkts))
3372
3373         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3374              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3375              UDP(sport=3001, dport=3002))
3376         self.pg0.add_stream(p)
3377         self.pg_enable_capture(self.pg_interfaces)
3378         self.pg_start()
3379         capture = self.pg1.assert_nothing_captured()
3380
3381         # verify ICMP error packet
3382         capture = self.pg0.get_capture(1)
3383         p = capture[0]
3384         self.assertTrue(p.haslayer(ICMP))
3385         icmp = p[ICMP]
3386         self.assertEqual(icmp.type, 3)
3387         self.assertEqual(icmp.code, 1)
3388         self.assertTrue(icmp.haslayer(IPerror))
3389         inner_ip = icmp[IPerror]
3390         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3391         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3392
3393         dms = self.vapi.nat_det_map_dump()
3394
3395         self.assertEqual(1000, dms[0].ses_num)
3396
3397         # verify IPFIX logging
3398         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3399         sleep(1)
3400         capture = self.pg2.get_capture(2)
3401         ipfix = IPFIXDecoder()
3402         # first load template
3403         for p in capture:
3404             self.assertTrue(p.haslayer(IPFIX))
3405             if p.haslayer(Template):
3406                 ipfix.add_template(p.getlayer(Template))
3407         # verify events in data set
3408         for p in capture:
3409             if p.haslayer(Data):
3410                 data = ipfix.decode_data_set(p.getlayer(Set))
3411                 self.verify_ipfix_max_entries_per_user(data)
3412
3413     def clear_nat_det(self):
3414         """
3415         Clear deterministic NAT configuration.
3416         """
3417         self.vapi.nat_ipfix(enable=0)
3418         self.vapi.nat_det_set_timeouts()
3419         deterministic_mappings = self.vapi.nat_det_map_dump()
3420         for dsm in deterministic_mappings:
3421             self.vapi.nat_det_add_del_map(dsm.in_addr,
3422                                           dsm.in_plen,
3423                                           dsm.out_addr,
3424                                           dsm.out_plen,
3425                                           is_add=0)
3426
3427         interfaces = self.vapi.nat44_interface_dump()
3428         for intf in interfaces:
3429             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3430                                                       intf.is_inside,
3431                                                       is_add=0)
3432
3433     def tearDown(self):
3434         super(TestDeterministicNAT, self).tearDown()
3435         if not self.vpp_dead:
3436             self.logger.info(self.vapi.cli("show nat44 detail"))
3437             self.clear_nat_det()
3438
3439
3440 class TestNAT64(MethodHolder):
3441     """ NAT64 Test Cases """
3442
3443     @classmethod
3444     def setUpClass(cls):
3445         super(TestNAT64, cls).setUpClass()
3446
3447         try:
3448             cls.tcp_port_in = 6303
3449             cls.tcp_port_out = 6303
3450             cls.udp_port_in = 6304
3451             cls.udp_port_out = 6304
3452             cls.icmp_id_in = 6305
3453             cls.icmp_id_out = 6305
3454             cls.nat_addr = '10.0.0.3'
3455             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3456             cls.vrf1_id = 10
3457             cls.vrf1_nat_addr = '10.0.10.3'
3458             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3459                                                    cls.vrf1_nat_addr)
3460
3461             cls.create_pg_interfaces(range(5))
3462             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3463             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3464             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3465
3466             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3467
3468             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3469
3470             cls.pg0.generate_remote_hosts(2)
3471
3472             for i in cls.ip6_interfaces:
3473                 i.admin_up()
3474                 i.config_ip6()
3475                 i.configure_ipv6_neighbors()
3476
3477             for i in cls.ip4_interfaces:
3478                 i.admin_up()
3479                 i.config_ip4()
3480                 i.resolve_arp()
3481
3482             cls.pg3.admin_up()
3483             cls.pg3.config_ip4()
3484             cls.pg3.resolve_arp()
3485             cls.pg3.config_ip6()
3486             cls.pg3.configure_ipv6_neighbors()
3487
3488         except Exception:
3489             super(TestNAT64, cls).tearDownClass()
3490             raise
3491
3492     def test_pool(self):
3493         """ Add/delete address to NAT64 pool """
3494         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3495
3496         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3497
3498         addresses = self.vapi.nat64_pool_addr_dump()
3499         self.assertEqual(len(addresses), 1)
3500         self.assertEqual(addresses[0].address, nat_addr)
3501
3502         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3503
3504         addresses = self.vapi.nat64_pool_addr_dump()
3505         self.assertEqual(len(addresses), 0)
3506
3507     def test_interface(self):
3508         """ Enable/disable NAT64 feature on the interface """
3509         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3510         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3511
3512         interfaces = self.vapi.nat64_interface_dump()
3513         self.assertEqual(len(interfaces), 2)
3514         pg0_found = False
3515         pg1_found = False
3516         for intf in interfaces:
3517             if intf.sw_if_index == self.pg0.sw_if_index:
3518                 self.assertEqual(intf.is_inside, 1)
3519                 pg0_found = True
3520             elif intf.sw_if_index == self.pg1.sw_if_index:
3521                 self.assertEqual(intf.is_inside, 0)
3522                 pg1_found = True
3523         self.assertTrue(pg0_found)
3524         self.assertTrue(pg1_found)
3525
3526         features = self.vapi.cli("show interface features pg0")
3527         self.assertNotEqual(features.find('nat64-in2out'), -1)
3528         features = self.vapi.cli("show interface features pg1")
3529         self.assertNotEqual(features.find('nat64-out2in'), -1)
3530
3531         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3532         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3533
3534         interfaces = self.vapi.nat64_interface_dump()
3535         self.assertEqual(len(interfaces), 0)
3536
3537     def test_static_bib(self):
3538         """ Add/delete static BIB entry """
3539         in_addr = socket.inet_pton(socket.AF_INET6,
3540                                    '2001:db8:85a3::8a2e:370:7334')
3541         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3542         in_port = 1234
3543         out_port = 5678
3544         proto = IP_PROTOS.tcp
3545
3546         self.vapi.nat64_add_del_static_bib(in_addr,
3547                                            out_addr,
3548                                            in_port,
3549                                            out_port,
3550                                            proto)
3551         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3552         static_bib_num = 0
3553         for bibe in bib:
3554             if bibe.is_static:
3555                 static_bib_num += 1
3556                 self.assertEqual(bibe.i_addr, in_addr)
3557                 self.assertEqual(bibe.o_addr, out_addr)
3558                 self.assertEqual(bibe.i_port, in_port)
3559                 self.assertEqual(bibe.o_port, out_port)
3560         self.assertEqual(static_bib_num, 1)
3561
3562         self.vapi.nat64_add_del_static_bib(in_addr,
3563                                            out_addr,
3564                                            in_port,
3565                                            out_port,
3566                                            proto,
3567                                            is_add=0)
3568         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3569         static_bib_num = 0
3570         for bibe in bib:
3571             if bibe.is_static:
3572                 static_bib_num += 1
3573         self.assertEqual(static_bib_num, 0)
3574
3575     def test_set_timeouts(self):
3576         """ Set NAT64 timeouts """
3577         # verify default values
3578         timeouts = self.vapi.nat64_get_timeouts()
3579         self.assertEqual(timeouts.udp, 300)
3580         self.assertEqual(timeouts.icmp, 60)
3581         self.assertEqual(timeouts.tcp_trans, 240)
3582         self.assertEqual(timeouts.tcp_est, 7440)
3583         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3584
3585         # set and verify custom values
3586         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3587                                      tcp_est=7450, tcp_incoming_syn=10)
3588         timeouts = self.vapi.nat64_get_timeouts()
3589         self.assertEqual(timeouts.udp, 200)
3590         self.assertEqual(timeouts.icmp, 30)
3591         self.assertEqual(timeouts.tcp_trans, 250)
3592         self.assertEqual(timeouts.tcp_est, 7450)
3593         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3594
3595     def test_dynamic(self):
3596         """ NAT64 dynamic translation test """
3597         self.tcp_port_in = 6303
3598         self.udp_port_in = 6304
3599         self.icmp_id_in = 6305
3600
3601         ses_num_start = self.nat64_get_ses_num()
3602
3603         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3604                                                 self.nat_addr_n)
3605         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3606         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3607
3608         # in2out
3609         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3610         self.pg0.add_stream(pkts)
3611         self.pg_enable_capture(self.pg_interfaces)
3612         self.pg_start()
3613         capture = self.pg1.get_capture(len(pkts))
3614         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3615                                 dst_ip=self.pg1.remote_ip4)
3616
3617         # out2in
3618         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3619         self.pg1.add_stream(pkts)
3620         self.pg_enable_capture(self.pg_interfaces)
3621         self.pg_start()
3622         capture = self.pg0.get_capture(len(pkts))
3623         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3624         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3625
3626         # in2out
3627         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3628         self.pg0.add_stream(pkts)
3629         self.pg_enable_capture(self.pg_interfaces)
3630         self.pg_start()
3631         capture = self.pg1.get_capture(len(pkts))
3632         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3633                                 dst_ip=self.pg1.remote_ip4)
3634
3635         # out2in
3636         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3637         self.pg1.add_stream(pkts)
3638         self.pg_enable_capture(self.pg_interfaces)
3639         self.pg_start()
3640         capture = self.pg0.get_capture(len(pkts))
3641         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3642
3643         ses_num_end = self.nat64_get_ses_num()
3644
3645         self.assertEqual(ses_num_end - ses_num_start, 3)
3646
3647         # tenant with specific VRF
3648         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3649                                                 self.vrf1_nat_addr_n,
3650                                                 vrf_id=self.vrf1_id)
3651         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3652
3653         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3654         self.pg2.add_stream(pkts)
3655         self.pg_enable_capture(self.pg_interfaces)
3656         self.pg_start()
3657         capture = self.pg1.get_capture(len(pkts))
3658         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3659                                 dst_ip=self.pg1.remote_ip4)
3660
3661         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3662         self.pg1.add_stream(pkts)
3663         self.pg_enable_capture(self.pg_interfaces)
3664         self.pg_start()
3665         capture = self.pg2.get_capture(len(pkts))
3666         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3667
3668     def test_static(self):
3669         """ NAT64 static translation test """
3670         self.tcp_port_in = 60303
3671         self.udp_port_in = 60304
3672         self.icmp_id_in = 60305
3673         self.tcp_port_out = 60303
3674         self.udp_port_out = 60304
3675         self.icmp_id_out = 60305
3676
3677         ses_num_start = self.nat64_get_ses_num()
3678
3679         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3680                                                 self.nat_addr_n)
3681         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3682         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3683
3684         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3685                                            self.nat_addr_n,
3686                                            self.tcp_port_in,
3687                                            self.tcp_port_out,
3688                                            IP_PROTOS.tcp)
3689         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3690                                            self.nat_addr_n,
3691                                            self.udp_port_in,
3692                                            self.udp_port_out,
3693                                            IP_PROTOS.udp)
3694         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3695                                            self.nat_addr_n,
3696                                            self.icmp_id_in,
3697                                            self.icmp_id_out,
3698                                            IP_PROTOS.icmp)
3699
3700         # in2out
3701         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3702         self.pg0.add_stream(pkts)
3703         self.pg_enable_capture(self.pg_interfaces)
3704         self.pg_start()
3705         capture = self.pg1.get_capture(len(pkts))
3706         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3707                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3708
3709         # out2in
3710         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3711         self.pg1.add_stream(pkts)
3712         self.pg_enable_capture(self.pg_interfaces)
3713         self.pg_start()
3714         capture = self.pg0.get_capture(len(pkts))
3715         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3716         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3717
3718         ses_num_end = self.nat64_get_ses_num()
3719
3720         self.assertEqual(ses_num_end - ses_num_start, 3)
3721
3722     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3723     def test_session_timeout(self):
3724         """ NAT64 session timeout """
3725         self.icmp_id_in = 1234
3726         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3727                                                 self.nat_addr_n)
3728         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3729         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3730         self.vapi.nat64_set_timeouts(icmp=5)
3731
3732         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3733         self.pg0.add_stream(pkts)
3734         self.pg_enable_capture(self.pg_interfaces)
3735         self.pg_start()
3736         capture = self.pg1.get_capture(len(pkts))
3737
3738         ses_num_before_timeout = self.nat64_get_ses_num()
3739
3740         sleep(15)
3741
3742         # ICMP session after timeout
3743         ses_num_after_timeout = self.nat64_get_ses_num()
3744         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3745
3746     def test_icmp_error(self):
3747         """ NAT64 ICMP Error message translation """
3748         self.tcp_port_in = 6303
3749         self.udp_port_in = 6304
3750         self.icmp_id_in = 6305
3751
3752         ses_num_start = self.nat64_get_ses_num()
3753
3754         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3755                                                 self.nat_addr_n)
3756         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3757         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3758
3759         # send some packets to create sessions
3760         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3761         self.pg0.add_stream(pkts)
3762         self.pg_enable_capture(self.pg_interfaces)
3763         self.pg_start()
3764         capture_ip4 = self.pg1.get_capture(len(pkts))
3765         self.verify_capture_out(capture_ip4,
3766                                 nat_ip=self.nat_addr,
3767                                 dst_ip=self.pg1.remote_ip4)
3768
3769         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3770         self.pg1.add_stream(pkts)
3771         self.pg_enable_capture(self.pg_interfaces)
3772         self.pg_start()
3773         capture_ip6 = self.pg0.get_capture(len(pkts))
3774         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3775         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3776                                    self.pg0.remote_ip6)
3777
3778         # in2out
3779         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3780                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3781                 ICMPv6DestUnreach(code=1) /
3782                 packet[IPv6] for packet in capture_ip6]
3783         self.pg0.add_stream(pkts)
3784         self.pg_enable_capture(self.pg_interfaces)
3785         self.pg_start()
3786         capture = self.pg1.get_capture(len(pkts))
3787         for packet in capture:
3788             try:
3789                 self.assertEqual(packet[IP].src, self.nat_addr)
3790                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3791                 self.assertEqual(packet[ICMP].type, 3)
3792                 self.assertEqual(packet[ICMP].code, 13)
3793                 inner = packet[IPerror]
3794                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3795                 self.assertEqual(inner.dst, self.nat_addr)
3796                 self.check_icmp_checksum(packet)
3797                 if inner.haslayer(TCPerror):
3798                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3799                 elif inner.haslayer(UDPerror):
3800                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3801                 else:
3802                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3803             except:
3804                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3805                 raise
3806
3807         # out2in
3808         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3809                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3810                 ICMP(type=3, code=13) /
3811                 packet[IP] for packet in capture_ip4]
3812         self.pg1.add_stream(pkts)
3813         self.pg_enable_capture(self.pg_interfaces)
3814         self.pg_start()
3815         capture = self.pg0.get_capture(len(pkts))
3816         for packet in capture:
3817             try:
3818                 self.assertEqual(packet[IPv6].src, ip.src)
3819                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3820                 icmp = packet[ICMPv6DestUnreach]
3821                 self.assertEqual(icmp.code, 1)
3822                 inner = icmp[IPerror6]
3823                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3824                 self.assertEqual(inner.dst, ip.src)
3825                 self.check_icmpv6_checksum(packet)
3826                 if inner.haslayer(TCPerror):
3827                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3828                 elif inner.haslayer(UDPerror):
3829                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3830                 else:
3831                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3832                                      self.icmp_id_in)
3833             except:
3834                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3835                 raise
3836
3837     def test_hairpinning(self):
3838         """ NAT64 hairpinning """
3839
3840         client = self.pg0.remote_hosts[0]
3841         server = self.pg0.remote_hosts[1]
3842         server_tcp_in_port = 22
3843         server_tcp_out_port = 4022
3844         server_udp_in_port = 23
3845         server_udp_out_port = 4023
3846         client_tcp_in_port = 1234
3847         client_udp_in_port = 1235
3848         client_tcp_out_port = 0
3849         client_udp_out_port = 0
3850         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3851         nat_addr_ip6 = ip.src
3852
3853         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3854                                                 self.nat_addr_n)
3855         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3856         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3857
3858         self.vapi.nat64_add_del_static_bib(server.ip6n,
3859                                            self.nat_addr_n,
3860                                            server_tcp_in_port,
3861                                            server_tcp_out_port,
3862                                            IP_PROTOS.tcp)
3863         self.vapi.nat64_add_del_static_bib(server.ip6n,
3864                                            self.nat_addr_n,
3865                                            server_udp_in_port,
3866                                            server_udp_out_port,
3867                                            IP_PROTOS.udp)
3868
3869         # client to server
3870         pkts = []
3871         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3872              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3873              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3874         pkts.append(p)
3875         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3876              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3877              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3878         pkts.append(p)
3879         self.pg0.add_stream(pkts)
3880         self.pg_enable_capture(self.pg_interfaces)
3881         self.pg_start()
3882         capture = self.pg0.get_capture(len(pkts))
3883         for packet in capture:
3884             try:
3885                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3886                 self.assertEqual(packet[IPv6].dst, server.ip6)
3887                 if packet.haslayer(TCP):
3888                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3889                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3890                     self.check_tcp_checksum(packet)
3891                     client_tcp_out_port = packet[TCP].sport
3892                 else:
3893                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3894                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3895                     self.check_udp_checksum(packet)
3896                     client_udp_out_port = packet[UDP].sport
3897             except:
3898                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3899                 raise
3900
3901         # server to client
3902         pkts = []
3903         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3904              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3905              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3906         pkts.append(p)
3907         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3908              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3909              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3910         pkts.append(p)
3911         self.pg0.add_stream(pkts)
3912         self.pg_enable_capture(self.pg_interfaces)
3913         self.pg_start()
3914         capture = self.pg0.get_capture(len(pkts))
3915         for packet in capture:
3916             try:
3917                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3918                 self.assertEqual(packet[IPv6].dst, client.ip6)
3919                 if packet.haslayer(TCP):
3920                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3921                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3922                     self.check_tcp_checksum(packet)
3923                 else:
3924                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3925                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3926                     self.check_udp_checksum(packet)
3927             except:
3928                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3929                 raise
3930
3931         # ICMP error
3932         pkts = []
3933         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3934                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3935                 ICMPv6DestUnreach(code=1) /
3936                 packet[IPv6] for packet in capture]
3937         self.pg0.add_stream(pkts)
3938         self.pg_enable_capture(self.pg_interfaces)
3939         self.pg_start()
3940         capture = self.pg0.get_capture(len(pkts))
3941         for packet in capture:
3942             try:
3943                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3944                 self.assertEqual(packet[IPv6].dst, server.ip6)
3945                 icmp = packet[ICMPv6DestUnreach]
3946                 self.assertEqual(icmp.code, 1)
3947                 inner = icmp[IPerror6]
3948                 self.assertEqual(inner.src, server.ip6)
3949                 self.assertEqual(inner.dst, nat_addr_ip6)
3950                 self.check_icmpv6_checksum(packet)
3951                 if inner.haslayer(TCPerror):
3952                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3953                     self.assertEqual(inner[TCPerror].dport,
3954                                      client_tcp_out_port)
3955                 else:
3956                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3957                     self.assertEqual(inner[UDPerror].dport,
3958                                      client_udp_out_port)
3959             except:
3960                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3961                 raise
3962
3963     def test_prefix(self):
3964         """ NAT64 Network-Specific Prefix """
3965
3966         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3967                                                 self.nat_addr_n)
3968         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3969         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3970         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3971                                                 self.vrf1_nat_addr_n,
3972                                                 vrf_id=self.vrf1_id)
3973         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3974
3975         # Add global prefix
3976         global_pref64 = "2001:db8::"
3977         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3978         global_pref64_len = 32
3979         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3980
3981         prefix = self.vapi.nat64_prefix_dump()
3982         self.assertEqual(len(prefix), 1)
3983         self.assertEqual(prefix[0].prefix, global_pref64_n)
3984         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3985         self.assertEqual(prefix[0].vrf_id, 0)
3986
3987         # Add tenant specific prefix
3988         vrf1_pref64 = "2001:db8:122:300::"
3989         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3990         vrf1_pref64_len = 56
3991         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3992                                        vrf1_pref64_len,
3993                                        vrf_id=self.vrf1_id)
3994         prefix = self.vapi.nat64_prefix_dump()
3995         self.assertEqual(len(prefix), 2)
3996
3997         # Global prefix
3998         pkts = self.create_stream_in_ip6(self.pg0,
3999                                          self.pg1,
4000                                          pref=global_pref64,
4001                                          plen=global_pref64_len)
4002         self.pg0.add_stream(pkts)
4003         self.pg_enable_capture(self.pg_interfaces)
4004         self.pg_start()
4005         capture = self.pg1.get_capture(len(pkts))
4006         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4007                                 dst_ip=self.pg1.remote_ip4)
4008
4009         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4010         self.pg1.add_stream(pkts)
4011         self.pg_enable_capture(self.pg_interfaces)
4012         self.pg_start()
4013         capture = self.pg0.get_capture(len(pkts))
4014         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4015                                   global_pref64,
4016                                   global_pref64_len)
4017         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4018
4019         # Tenant specific prefix
4020         pkts = self.create_stream_in_ip6(self.pg2,
4021                                          self.pg1,
4022                                          pref=vrf1_pref64,
4023                                          plen=vrf1_pref64_len)
4024         self.pg2.add_stream(pkts)
4025         self.pg_enable_capture(self.pg_interfaces)
4026         self.pg_start()
4027         capture = self.pg1.get_capture(len(pkts))
4028         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4029                                 dst_ip=self.pg1.remote_ip4)
4030
4031         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4032         self.pg1.add_stream(pkts)
4033         self.pg_enable_capture(self.pg_interfaces)
4034         self.pg_start()
4035         capture = self.pg2.get_capture(len(pkts))
4036         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4037                                   vrf1_pref64,
4038                                   vrf1_pref64_len)
4039         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4040
4041     def test_unknown_proto(self):
4042         """ NAT64 translate packet with unknown protocol """
4043
4044         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4045                                                 self.nat_addr_n)
4046         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4047         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4048         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4049
4050         # in2out
4051         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4052              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4053              TCP(sport=self.tcp_port_in, dport=20))
4054         self.pg0.add_stream(p)
4055         self.pg_enable_capture(self.pg_interfaces)
4056         self.pg_start()
4057         p = self.pg1.get_capture(1)
4058
4059         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4060              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4061              GRE() /
4062              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4063              TCP(sport=1234, dport=1234))
4064         self.pg0.add_stream(p)
4065         self.pg_enable_capture(self.pg_interfaces)
4066         self.pg_start()
4067         p = self.pg1.get_capture(1)
4068         packet = p[0]
4069         try:
4070             self.assertEqual(packet[IP].src, self.nat_addr)
4071             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4072             self.assertTrue(packet.haslayer(GRE))
4073             self.check_ip_checksum(packet)
4074         except:
4075             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4076             raise
4077
4078         # out2in
4079         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4080              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4081              GRE() /
4082              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4083              TCP(sport=1234, dport=1234))
4084         self.pg1.add_stream(p)
4085         self.pg_enable_capture(self.pg_interfaces)
4086         self.pg_start()
4087         p = self.pg0.get_capture(1)
4088         packet = p[0]
4089         try:
4090             self.assertEqual(packet[IPv6].src, remote_ip6)
4091             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4092             self.assertEqual(packet[IPv6].nh, 47)
4093         except:
4094             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4095             raise
4096
4097     def test_hairpinning_unknown_proto(self):
4098         """ NAT64 translate packet with unknown protocol - hairpinning """
4099
4100         client = self.pg0.remote_hosts[0]
4101         server = self.pg0.remote_hosts[1]
4102         server_tcp_in_port = 22
4103         server_tcp_out_port = 4022
4104         client_tcp_in_port = 1234
4105         client_tcp_out_port = 1235
4106         server_nat_ip = "10.0.0.100"
4107         client_nat_ip = "10.0.0.110"
4108         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4109         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4110         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4111         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4112
4113         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4114                                                 client_nat_ip_n)
4115         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4116         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4117
4118         self.vapi.nat64_add_del_static_bib(server.ip6n,
4119                                            server_nat_ip_n,
4120                                            server_tcp_in_port,
4121                                            server_tcp_out_port,
4122                                            IP_PROTOS.tcp)
4123
4124         self.vapi.nat64_add_del_static_bib(server.ip6n,
4125                                            server_nat_ip_n,
4126                                            0,
4127                                            0,
4128                                            IP_PROTOS.gre)
4129
4130         self.vapi.nat64_add_del_static_bib(client.ip6n,
4131                                            client_nat_ip_n,
4132                                            client_tcp_in_port,
4133                                            client_tcp_out_port,
4134                                            IP_PROTOS.tcp)
4135
4136         # client to server
4137         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4138              IPv6(src=client.ip6, dst=server_nat_ip6) /
4139              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4140         self.pg0.add_stream(p)
4141         self.pg_enable_capture(self.pg_interfaces)
4142         self.pg_start()
4143         p = self.pg0.get_capture(1)
4144
4145         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4146              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4147              GRE() /
4148              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4149              TCP(sport=1234, dport=1234))
4150         self.pg0.add_stream(p)
4151         self.pg_enable_capture(self.pg_interfaces)
4152         self.pg_start()
4153         p = self.pg0.get_capture(1)
4154         packet = p[0]
4155         try:
4156             self.assertEqual(packet[IPv6].src, client_nat_ip6)
4157             self.assertEqual(packet[IPv6].dst, server.ip6)
4158             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4159         except:
4160             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4161             raise
4162
4163         # server to client
4164         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4165              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4166              GRE() /
4167              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4168              TCP(sport=1234, dport=1234))
4169         self.pg0.add_stream(p)
4170         self.pg_enable_capture(self.pg_interfaces)
4171         self.pg_start()
4172         p = self.pg0.get_capture(1)
4173         packet = p[0]
4174         try:
4175             self.assertEqual(packet[IPv6].src, server_nat_ip6)
4176             self.assertEqual(packet[IPv6].dst, client.ip6)
4177             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4178         except:
4179             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4180             raise
4181
4182     def test_one_armed_nat64(self):
4183         """ One armed NAT64 """
4184         external_port = 0
4185         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4186                                            '64:ff9b::',
4187                                            96)
4188
4189         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4190                                                 self.nat_addr_n)
4191         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4192         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4193
4194         # in2out
4195         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4196              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4197              TCP(sport=12345, dport=80))
4198         self.pg3.add_stream(p)
4199         self.pg_enable_capture(self.pg_interfaces)
4200         self.pg_start()
4201         capture = self.pg3.get_capture(1)
4202         p = capture[0]
4203         try:
4204             ip = p[IP]
4205             tcp = p[TCP]
4206             self.assertEqual(ip.src, self.nat_addr)
4207             self.assertEqual(ip.dst, self.pg3.remote_ip4)
4208             self.assertNotEqual(tcp.sport, 12345)
4209             external_port = tcp.sport
4210             self.assertEqual(tcp.dport, 80)
4211             self.check_tcp_checksum(p)
4212             self.check_ip_checksum(p)
4213         except:
4214             self.logger.error(ppp("Unexpected or invalid packet:", p))
4215             raise
4216
4217         # out2in
4218         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4219              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4220              TCP(sport=80, dport=external_port))
4221         self.pg3.add_stream(p)
4222         self.pg_enable_capture(self.pg_interfaces)
4223         self.pg_start()
4224         capture = self.pg3.get_capture(1)
4225         p = capture[0]
4226         try:
4227             ip = p[IPv6]
4228             tcp = p[TCP]
4229             self.assertEqual(ip.src, remote_host_ip6)
4230             self.assertEqual(ip.dst, self.pg3.remote_ip6)
4231             self.assertEqual(tcp.sport, 80)
4232             self.assertEqual(tcp.dport, 12345)
4233             self.check_tcp_checksum(p)
4234         except:
4235             self.logger.error(ppp("Unexpected or invalid packet:", p))
4236             raise
4237
4238     def test_frag_in_order(self):
4239         """ NAT64 translate fragments arriving in order """
4240         self.tcp_port_in = random.randint(1025, 65535)
4241
4242         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4243                                                 self.nat_addr_n)
4244         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4245         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4246
4247         reass = self.vapi.nat_reass_dump()
4248         reass_n_start = len(reass)
4249
4250         # in2out
4251         data = 'a' * 200
4252         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4253                                            self.tcp_port_in, 20, data)
4254         self.pg0.add_stream(pkts)
4255         self.pg_enable_capture(self.pg_interfaces)
4256         self.pg_start()
4257         frags = self.pg1.get_capture(len(pkts))
4258         p = self.reass_frags_and_verify(frags,
4259                                         self.nat_addr,
4260                                         self.pg1.remote_ip4)
4261         self.assertEqual(p[TCP].dport, 20)
4262         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4263         self.tcp_port_out = p[TCP].sport
4264         self.assertEqual(data, p[Raw].load)
4265
4266         # out2in
4267         data = "A" * 4 + "b" * 16 + "C" * 3
4268         pkts = self.create_stream_frag(self.pg1,
4269                                        self.nat_addr,
4270                                        20,
4271                                        self.tcp_port_out,
4272                                        data)
4273         self.pg1.add_stream(pkts)
4274         self.pg_enable_capture(self.pg_interfaces)
4275         self.pg_start()
4276         frags = self.pg0.get_capture(len(pkts))
4277         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4278         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4279         self.assertEqual(p[TCP].sport, 20)
4280         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4281         self.assertEqual(data, p[Raw].load)
4282
4283         reass = self.vapi.nat_reass_dump()
4284         reass_n_end = len(reass)
4285
4286         self.assertEqual(reass_n_end - reass_n_start, 2)
4287
4288     def test_reass_hairpinning(self):
4289         """ NAT64 fragments hairpinning """
4290         data = 'a' * 200
4291         client = self.pg0.remote_hosts[0]
4292         server = self.pg0.remote_hosts[1]
4293         server_in_port = random.randint(1025, 65535)
4294         server_out_port = random.randint(1025, 65535)
4295         client_in_port = random.randint(1025, 65535)
4296         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4297         nat_addr_ip6 = ip.src
4298
4299         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4300                                                 self.nat_addr_n)
4301         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4302         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4303
4304         # add static BIB entry for server
4305         self.vapi.nat64_add_del_static_bib(server.ip6n,
4306                                            self.nat_addr_n,
4307                                            server_in_port,
4308                                            server_out_port,
4309                                            IP_PROTOS.tcp)
4310
4311         # send packet from host to server
4312         pkts = self.create_stream_frag_ip6(self.pg0,
4313                                            self.nat_addr,
4314                                            client_in_port,
4315                                            server_out_port,
4316                                            data)
4317         self.pg0.add_stream(pkts)
4318         self.pg_enable_capture(self.pg_interfaces)
4319         self.pg_start()
4320         frags = self.pg0.get_capture(len(pkts))
4321         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4322         self.assertNotEqual(p[TCP].sport, client_in_port)
4323         self.assertEqual(p[TCP].dport, server_in_port)
4324         self.assertEqual(data, p[Raw].load)
4325
4326     def test_frag_out_of_order(self):
4327         """ NAT64 translate fragments arriving out of order """
4328         self.tcp_port_in = random.randint(1025, 65535)
4329
4330         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4331                                                 self.nat_addr_n)
4332         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4333         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4334
4335         # in2out
4336         data = 'a' * 200
4337         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4338                                            self.tcp_port_in, 20, data)
4339         pkts.reverse()
4340         self.pg0.add_stream(pkts)
4341         self.pg_enable_capture(self.pg_interfaces)
4342         self.pg_start()
4343         frags = self.pg1.get_capture(len(pkts))
4344         p = self.reass_frags_and_verify(frags,
4345                                         self.nat_addr,
4346                                         self.pg1.remote_ip4)
4347         self.assertEqual(p[TCP].dport, 20)
4348         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4349         self.tcp_port_out = p[TCP].sport
4350         self.assertEqual(data, p[Raw].load)
4351
4352         # out2in
4353         data = "A" * 4 + "B" * 16 + "C" * 3
4354         pkts = self.create_stream_frag(self.pg1,
4355                                        self.nat_addr,
4356                                        20,
4357                                        self.tcp_port_out,
4358                                        data)
4359         pkts.reverse()
4360         self.pg1.add_stream(pkts)
4361         self.pg_enable_capture(self.pg_interfaces)
4362         self.pg_start()
4363         frags = self.pg0.get_capture(len(pkts))
4364         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4365         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4366         self.assertEqual(p[TCP].sport, 20)
4367         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4368         self.assertEqual(data, p[Raw].load)
4369
4370     def test_interface_addr(self):
4371         """ Acquire NAT64 pool addresses from interface """
4372         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4373
4374         # no address in NAT64 pool
4375         adresses = self.vapi.nat44_address_dump()
4376         self.assertEqual(0, len(adresses))
4377
4378         # configure interface address and check NAT64 address pool
4379         self.pg4.config_ip4()
4380         addresses = self.vapi.nat64_pool_addr_dump()
4381         self.assertEqual(len(addresses), 1)
4382         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4383
4384         # remove interface address and check NAT64 address pool
4385         self.pg4.unconfig_ip4()
4386         addresses = self.vapi.nat64_pool_addr_dump()
4387         self.assertEqual(0, len(adresses))
4388
4389     def nat64_get_ses_num(self):
4390         """
4391         Return number of active NAT64 sessions.
4392         """
4393         st = self.vapi.nat64_st_dump()
4394         return len(st)
4395
4396     def clear_nat64(self):
4397         """
4398         Clear NAT64 configuration.
4399         """
4400         self.vapi.nat64_set_timeouts()
4401
4402         interfaces = self.vapi.nat64_interface_dump()
4403         for intf in interfaces:
4404             if intf.is_inside > 1:
4405                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4406                                                   0,
4407                                                   is_add=0)
4408             self.vapi.nat64_add_del_interface(intf.sw_if_index,
4409                                               intf.is_inside,
4410                                               is_add=0)
4411
4412         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4413         for bibe in bib:
4414             if bibe.is_static:
4415                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4416                                                    bibe.o_addr,
4417                                                    bibe.i_port,
4418                                                    bibe.o_port,
4419                                                    bibe.proto,
4420                                                    bibe.vrf_id,
4421                                                    is_add=0)
4422
4423         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4424         for bibe in bib:
4425             if bibe.is_static:
4426                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4427                                                    bibe.o_addr,
4428                                                    bibe.i_port,
4429                                                    bibe.o_port,
4430                                                    bibe.proto,
4431                                                    bibe.vrf_id,
4432                                                    is_add=0)
4433
4434         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4435         for bibe in bib:
4436             if bibe.is_static:
4437                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4438                                                    bibe.o_addr,
4439                                                    bibe.i_port,
4440                                                    bibe.o_port,
4441                                                    bibe.proto,
4442                                                    bibe.vrf_id,
4443                                                    is_add=0)
4444
4445         adresses = self.vapi.nat64_pool_addr_dump()
4446         for addr in adresses:
4447             self.vapi.nat64_add_del_pool_addr_range(addr.address,
4448                                                     addr.address,
4449                                                     vrf_id=addr.vrf_id,
4450                                                     is_add=0)
4451
4452         prefixes = self.vapi.nat64_prefix_dump()
4453         for prefix in prefixes:
4454             self.vapi.nat64_add_del_prefix(prefix.prefix,
4455                                            prefix.prefix_len,
4456                                            vrf_id=prefix.vrf_id,
4457                                            is_add=0)
4458
4459     def tearDown(self):
4460         super(TestNAT64, self).tearDown()
4461         if not self.vpp_dead:
4462             self.logger.info(self.vapi.cli("show nat64 pool"))
4463             self.logger.info(self.vapi.cli("show nat64 interfaces"))
4464             self.logger.info(self.vapi.cli("show nat64 prefix"))
4465             self.logger.info(self.vapi.cli("show nat64 bib all"))
4466             self.logger.info(self.vapi.cli("show nat64 session table all"))
4467             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4468             self.clear_nat64()
4469
4470
4471 class TestDSlite(MethodHolder):
4472     """ DS-Lite Test Cases """
4473
4474     @classmethod
4475     def setUpClass(cls):
4476         super(TestDSlite, cls).setUpClass()
4477
4478         try:
4479             cls.nat_addr = '10.0.0.3'
4480             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4481
4482             cls.create_pg_interfaces(range(2))
4483             cls.pg0.admin_up()
4484             cls.pg0.config_ip4()
4485             cls.pg0.resolve_arp()
4486             cls.pg1.admin_up()
4487             cls.pg1.config_ip6()
4488             cls.pg1.generate_remote_hosts(2)
4489             cls.pg1.configure_ipv6_neighbors()
4490
4491         except Exception:
4492             super(TestDSlite, cls).tearDownClass()
4493             raise
4494
4495     def test_dslite(self):
4496         """ Test DS-Lite """
4497         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4498                                                  self.nat_addr_n)
4499         aftr_ip4 = '192.0.0.1'
4500         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4501         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4502         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4503         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4504
4505         # UDP
4506         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4507              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4508              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4509              UDP(sport=20000, dport=10000))
4510         self.pg1.add_stream(p)
4511         self.pg_enable_capture(self.pg_interfaces)
4512         self.pg_start()
4513         capture = self.pg0.get_capture(1)
4514         capture = capture[0]
4515         self.assertFalse(capture.haslayer(IPv6))
4516         self.assertEqual(capture[IP].src, self.nat_addr)
4517         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4518         self.assertNotEqual(capture[UDP].sport, 20000)
4519         self.assertEqual(capture[UDP].dport, 10000)
4520         self.check_ip_checksum(capture)
4521         out_port = capture[UDP].sport
4522
4523         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4524              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4525              UDP(sport=10000, dport=out_port))
4526         self.pg0.add_stream(p)
4527         self.pg_enable_capture(self.pg_interfaces)
4528         self.pg_start()
4529         capture = self.pg1.get_capture(1)
4530         capture = capture[0]
4531         self.assertEqual(capture[IPv6].src, aftr_ip6)
4532         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4533         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4534         self.assertEqual(capture[IP].dst, '192.168.1.1')
4535         self.assertEqual(capture[UDP].sport, 10000)
4536         self.assertEqual(capture[UDP].dport, 20000)
4537         self.check_ip_checksum(capture)
4538
4539         # TCP
4540         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4541              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4542              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4543              TCP(sport=20001, dport=10001))
4544         self.pg1.add_stream(p)
4545         self.pg_enable_capture(self.pg_interfaces)
4546         self.pg_start()
4547         capture = self.pg0.get_capture(1)
4548         capture = capture[0]
4549         self.assertFalse(capture.haslayer(IPv6))
4550         self.assertEqual(capture[IP].src, self.nat_addr)
4551         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4552         self.assertNotEqual(capture[TCP].sport, 20001)
4553         self.assertEqual(capture[TCP].dport, 10001)
4554         self.check_ip_checksum(capture)
4555         self.check_tcp_checksum(capture)
4556         out_port = capture[TCP].sport
4557
4558         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4559              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4560              TCP(sport=10001, dport=out_port))
4561         self.pg0.add_stream(p)
4562         self.pg_enable_capture(self.pg_interfaces)
4563         self.pg_start()
4564         capture = self.pg1.get_capture(1)
4565         capture = capture[0]
4566         self.assertEqual(capture[IPv6].src, aftr_ip6)
4567         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4568         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4569         self.assertEqual(capture[IP].dst, '192.168.1.1')
4570         self.assertEqual(capture[TCP].sport, 10001)
4571         self.assertEqual(capture[TCP].dport, 20001)
4572         self.check_ip_checksum(capture)
4573         self.check_tcp_checksum(capture)
4574
4575         # ICMP
4576         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4577              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4578              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4579              ICMP(id=4000, type='echo-request'))
4580         self.pg1.add_stream(p)
4581         self.pg_enable_capture(self.pg_interfaces)
4582         self.pg_start()
4583         capture = self.pg0.get_capture(1)
4584         capture = capture[0]
4585         self.assertFalse(capture.haslayer(IPv6))
4586         self.assertEqual(capture[IP].src, self.nat_addr)
4587         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4588         self.assertNotEqual(capture[ICMP].id, 4000)
4589         self.check_ip_checksum(capture)
4590         self.check_icmp_checksum(capture)
4591         out_id = capture[ICMP].id
4592
4593         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4594              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4595              ICMP(id=out_id, type='echo-reply'))
4596         self.pg0.add_stream(p)
4597         self.pg_enable_capture(self.pg_interfaces)
4598         self.pg_start()
4599         capture = self.pg1.get_capture(1)
4600         capture = capture[0]
4601         self.assertEqual(capture[IPv6].src, aftr_ip6)
4602         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4603         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4604         self.assertEqual(capture[IP].dst, '192.168.1.1')
4605         self.assertEqual(capture[ICMP].id, 4000)
4606         self.check_ip_checksum(capture)
4607         self.check_icmp_checksum(capture)
4608
4609     def tearDown(self):
4610         super(TestDSlite, self).tearDown()
4611         if not self.vpp_dead:
4612             self.logger.info(self.vapi.cli("show dslite pool"))
4613             self.logger.info(
4614                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4615             self.logger.info(self.vapi.cli("show dslite sessions"))
4616
4617 if __name__ == '__main__':
4618     unittest.main(testRunner=VppTestRunner)