SNAT: IP fragmentation (VPP-890)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param ttl: TTL of generated packets
144         """
145         pkts = []
146         # TCP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              TCP(sport=self.tcp_port_in, dport=20))
150         pkts.append(p)
151
152         # UDP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              UDP(sport=self.udp_port_in, dport=20))
156         pkts.append(p)
157
158         # ICMP
159         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
161              ICMP(id=self.icmp_id_in, type='echo-request'))
162         pkts.append(p)
163
164         return pkts
165
166     def compose_ip6(self, ip4, pref, plen):
167         """
168         Compose IPv4-embedded IPv6 addresses
169
170         :param ip4: IPv4 address
171         :param pref: IPv6 prefix
172         :param plen: IPv6 prefix length
173         :returns: IPv4-embedded IPv6 addresses
174         """
175         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
176         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
177         if plen == 32:
178             pref_n[4] = ip4_n[0]
179             pref_n[5] = ip4_n[1]
180             pref_n[6] = ip4_n[2]
181             pref_n[7] = ip4_n[3]
182         elif plen == 40:
183             pref_n[5] = ip4_n[0]
184             pref_n[6] = ip4_n[1]
185             pref_n[7] = ip4_n[2]
186             pref_n[9] = ip4_n[3]
187         elif plen == 48:
188             pref_n[6] = ip4_n[0]
189             pref_n[7] = ip4_n[1]
190             pref_n[9] = ip4_n[2]
191             pref_n[10] = ip4_n[3]
192         elif plen == 56:
193             pref_n[7] = ip4_n[0]
194             pref_n[9] = ip4_n[1]
195             pref_n[10] = ip4_n[2]
196             pref_n[11] = ip4_n[3]
197         elif plen == 64:
198             pref_n[9] = ip4_n[0]
199             pref_n[10] = ip4_n[1]
200             pref_n[11] = ip4_n[2]
201             pref_n[12] = ip4_n[3]
202         elif plen == 96:
203             pref_n[12] = ip4_n[0]
204             pref_n[13] = ip4_n[1]
205             pref_n[14] = ip4_n[2]
206             pref_n[15] = ip4_n[3]
207         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
208
209     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
210         """
211         Create IPv6 packet stream for inside network
212
213         :param in_if: Inside interface
214         :param out_if: Outside interface
215         :param ttl: Hop Limit of generated packets
216         :param pref: NAT64 prefix
217         :param plen: NAT64 prefix length
218         """
219         pkts = []
220         if pref is None:
221             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
222         else:
223             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
224
225         # TCP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              TCP(sport=self.tcp_port_in, dport=20))
229         pkts.append(p)
230
231         # UDP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              UDP(sport=self.udp_port_in, dport=20))
235         pkts.append(p)
236
237         # ICMP
238         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
239              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
240              ICMPv6EchoRequest(id=self.icmp_id_in))
241         pkts.append(p)
242
243         return pkts
244
245     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
246         """
247         Create packet stream for outside network
248
249         :param out_if: Outside interface
250         :param dst_ip: Destination IP address (Default use global NAT address)
251         :param ttl: TTL of generated packets
252         """
253         if dst_ip is None:
254             dst_ip = self.nat_addr
255         pkts = []
256         # TCP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              TCP(dport=self.tcp_port_out, sport=20))
260         pkts.append(p)
261
262         # UDP
263         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265              UDP(dport=self.udp_port_out, sport=20))
266         pkts.append(p)
267
268         # ICMP
269         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
270              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
271              ICMP(id=self.icmp_id_out, type='echo-reply'))
272         pkts.append(p)
273
274         return pkts
275
276     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
277                            packet_num=3, dst_ip=None):
278         """
279         Verify captured packets on outside network
280
281         :param capture: Captured packets
282         :param nat_ip: Translated IP address (Default use global NAT address)
283         :param same_port: Sorce port number is not translated (Default False)
284         :param packet_num: Expected number of packets (Default 3)
285         :param dst_ip: Destination IP address (Default do not verify)
286         """
287         if nat_ip is None:
288             nat_ip = self.nat_addr
289         self.assertEqual(packet_num, len(capture))
290         for packet in capture:
291             try:
292                 self.check_ip_checksum(packet)
293                 self.assertEqual(packet[IP].src, nat_ip)
294                 if dst_ip is not None:
295                     self.assertEqual(packet[IP].dst, dst_ip)
296                 if packet.haslayer(TCP):
297                     if same_port:
298                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
299                     else:
300                         self.assertNotEqual(
301                             packet[TCP].sport, self.tcp_port_in)
302                     self.tcp_port_out = packet[TCP].sport
303                     self.check_tcp_checksum(packet)
304                 elif packet.haslayer(UDP):
305                     if same_port:
306                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
307                     else:
308                         self.assertNotEqual(
309                             packet[UDP].sport, self.udp_port_in)
310                     self.udp_port_out = packet[UDP].sport
311                 else:
312                     if same_port:
313                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
314                     else:
315                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
316                     self.icmp_id_out = packet[ICMP].id
317                     self.check_icmp_checksum(packet)
318             except:
319                 self.logger.error(ppp("Unexpected or invalid packet "
320                                       "(outside network):", packet))
321                 raise
322
323     def verify_capture_in(self, capture, in_if, packet_num=3):
324         """
325         Verify captured packets on inside network
326
327         :param capture: Captured packets
328         :param in_if: Inside interface
329         :param packet_num: Expected number of packets (Default 3)
330         """
331         self.assertEqual(packet_num, len(capture))
332         for packet in capture:
333             try:
334                 self.check_ip_checksum(packet)
335                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
336                 if packet.haslayer(TCP):
337                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
338                     self.check_tcp_checksum(packet)
339                 elif packet.haslayer(UDP):
340                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
341                 else:
342                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
343                     self.check_icmp_checksum(packet)
344             except:
345                 self.logger.error(ppp("Unexpected or invalid packet "
346                                       "(inside network):", packet))
347                 raise
348
349     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
350         """
351         Verify captured IPv6 packets on inside network
352
353         :param capture: Captured packets
354         :param src_ip: Source IP
355         :param dst_ip: Destination IP address
356         :param packet_num: Expected number of packets (Default 3)
357         """
358         self.assertEqual(packet_num, len(capture))
359         for packet in capture:
360             try:
361                 self.assertEqual(packet[IPv6].src, src_ip)
362                 self.assertEqual(packet[IPv6].dst, dst_ip)
363                 if packet.haslayer(TCP):
364                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
365                     self.check_tcp_checksum(packet)
366                 elif packet.haslayer(UDP):
367                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
368                     self.check_udp_checksum(packet)
369                 else:
370                     self.assertEqual(packet[ICMPv6EchoReply].id,
371                                      self.icmp_id_in)
372                     self.check_icmpv6_checksum(packet)
373             except:
374                 self.logger.error(ppp("Unexpected or invalid packet "
375                                       "(inside network):", packet))
376                 raise
377
378     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
379         """
380         Verify captured packet that don't have to be translated
381
382         :param capture: Captured packets
383         :param ingress_if: Ingress interface
384         :param egress_if: Egress interface
385         """
386         for packet in capture:
387             try:
388                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
389                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
390                 if packet.haslayer(TCP):
391                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
392                 elif packet.haslayer(UDP):
393                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
394                 else:
395                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
396             except:
397                 self.logger.error(ppp("Unexpected or invalid packet "
398                                       "(inside network):", packet))
399                 raise
400
401     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
402                                             packet_num=3, icmp_type=11):
403         """
404         Verify captured packets with ICMP errors on outside network
405
406         :param capture: Captured packets
407         :param src_ip: Translated IP address or IP address of VPP
408                        (Default use global NAT address)
409         :param packet_num: Expected number of packets (Default 3)
410         :param icmp_type: Type of error ICMP packet
411                           we are expecting (Default 11)
412         """
413         if src_ip is None:
414             src_ip = self.nat_addr
415         self.assertEqual(packet_num, len(capture))
416         for packet in capture:
417             try:
418                 self.assertEqual(packet[IP].src, src_ip)
419                 self.assertTrue(packet.haslayer(ICMP))
420                 icmp = packet[ICMP]
421                 self.assertEqual(icmp.type, icmp_type)
422                 self.assertTrue(icmp.haslayer(IPerror))
423                 inner_ip = icmp[IPerror]
424                 if inner_ip.haslayer(TCPerror):
425                     self.assertEqual(inner_ip[TCPerror].dport,
426                                      self.tcp_port_out)
427                 elif inner_ip.haslayer(UDPerror):
428                     self.assertEqual(inner_ip[UDPerror].dport,
429                                      self.udp_port_out)
430                 else:
431                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
432             except:
433                 self.logger.error(ppp("Unexpected or invalid packet "
434                                       "(outside network):", packet))
435                 raise
436
437     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
438                                            icmp_type=11):
439         """
440         Verify captured packets with ICMP errors on inside network
441
442         :param capture: Captured packets
443         :param in_if: Inside interface
444         :param packet_num: Expected number of packets (Default 3)
445         :param icmp_type: Type of error ICMP packet
446                           we are expecting (Default 11)
447         """
448         self.assertEqual(packet_num, len(capture))
449         for packet in capture:
450             try:
451                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
452                 self.assertTrue(packet.haslayer(ICMP))
453                 icmp = packet[ICMP]
454                 self.assertEqual(icmp.type, icmp_type)
455                 self.assertTrue(icmp.haslayer(IPerror))
456                 inner_ip = icmp[IPerror]
457                 if inner_ip.haslayer(TCPerror):
458                     self.assertEqual(inner_ip[TCPerror].sport,
459                                      self.tcp_port_in)
460                 elif inner_ip.haslayer(UDPerror):
461                     self.assertEqual(inner_ip[UDPerror].sport,
462                                      self.udp_port_in)
463                 else:
464                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
465             except:
466                 self.logger.error(ppp("Unexpected or invalid packet "
467                                       "(inside network):", packet))
468                 raise
469
470     def create_stream_frag(self, src_if, dst, sport, dport, data):
471         """
472         Create fragmented packet stream
473
474         :param src_if: Source interface
475         :param dst: Destination IPv4 address
476         :param sport: Source TCP port
477         :param dport: Destination TCP port
478         :param data: Payload data
479         :returns: Fragmets
480         """
481         id = random.randint(0, 65535)
482         p = (IP(src=src_if.remote_ip4, dst=dst) /
483              TCP(sport=sport, dport=dport) /
484              Raw(data))
485         p = p.__class__(str(p))
486         chksum = p['TCP'].chksum
487         pkts = []
488         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
490              TCP(sport=sport, dport=dport, chksum=chksum) /
491              Raw(data[0:4]))
492         pkts.append(p)
493         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
495                 proto=IP_PROTOS.tcp) /
496              Raw(data[4:20]))
497         pkts.append(p)
498         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
499              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
500                 id=id) /
501              Raw(data[20:]))
502         pkts.append(p)
503         return pkts
504
505     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
506                                pref=None, plen=0, frag_size=128):
507         """
508         Create fragmented packet stream
509
510         :param src_if: Source interface
511         :param dst: Destination IPv4 address
512         :param sport: Source TCP port
513         :param dport: Destination TCP port
514         :param data: Payload data
515         :param pref: NAT64 prefix
516         :param plen: NAT64 prefix length
517         :param fragsize: size of fragments
518         :returns: Fragmets
519         """
520         if pref is None:
521             dst_ip6 = ''.join(['64:ff9b::', dst])
522         else:
523             dst_ip6 = self.compose_ip6(dst, pref, plen)
524
525         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
526              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
527              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
528              TCP(sport=sport, dport=dport) /
529              Raw(data))
530
531         return fragment6(p, frag_size)
532
533     def reass_frags_and_verify(self, frags, src, dst):
534         """
535         Reassemble and verify fragmented packet
536
537         :param frags: Captured fragments
538         :param src: Source IPv4 address to verify
539         :param dst: Destination IPv4 address to verify
540
541         :returns: Reassembled IPv4 packet
542         """
543         buffer = StringIO.StringIO()
544         for p in frags:
545             self.assertEqual(p[IP].src, src)
546             self.assertEqual(p[IP].dst, dst)
547             self.check_ip_checksum(p)
548             buffer.seek(p[IP].frag * 8)
549             buffer.write(p[IP].payload)
550         ip = frags[0].getlayer(IP)
551         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
552                 proto=frags[0][IP].proto)
553         if ip.proto == IP_PROTOS.tcp:
554             p = (ip / TCP(buffer.getvalue()))
555             self.check_tcp_checksum(p)
556         elif ip.proto == IP_PROTOS.udp:
557             p = (ip / UDP(buffer.getvalue()))
558         return p
559
560     def reass_frags_and_verify_ip6(self, frags, src, dst):
561         """
562         Reassemble and verify fragmented packet
563
564         :param frags: Captured fragments
565         :param src: Source IPv6 address to verify
566         :param dst: Destination IPv6 address to verify
567
568         :returns: Reassembled IPv6 packet
569         """
570         buffer = StringIO.StringIO()
571         for p in frags:
572             self.assertEqual(p[IPv6].src, src)
573             self.assertEqual(p[IPv6].dst, dst)
574             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
575             buffer.write(p[IPv6ExtHdrFragment].payload)
576         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
577                   nh=frags[0][IPv6ExtHdrFragment].nh)
578         if ip.nh == IP_PROTOS.tcp:
579             p = (ip / TCP(buffer.getvalue()))
580             self.check_tcp_checksum(p)
581         elif ip.nh == IP_PROTOS.udp:
582             p = (ip / UDP(buffer.getvalue()))
583         return p
584
585     def verify_ipfix_nat44_ses(self, data):
586         """
587         Verify IPFIX NAT44 session create/delete event
588
589         :param data: Decoded IPFIX data records
590         """
591         nat44_ses_create_num = 0
592         nat44_ses_delete_num = 0
593         self.assertEqual(6, len(data))
594         for record in data:
595             # natEvent
596             self.assertIn(ord(record[230]), [4, 5])
597             if ord(record[230]) == 4:
598                 nat44_ses_create_num += 1
599             else:
600                 nat44_ses_delete_num += 1
601             # sourceIPv4Address
602             self.assertEqual(self.pg0.remote_ip4n, record[8])
603             # postNATSourceIPv4Address
604             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
605                              record[225])
606             # ingressVRFID
607             self.assertEqual(struct.pack("!I", 0), record[234])
608             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
609             if IP_PROTOS.icmp == ord(record[4]):
610                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
611                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
612                                  record[227])
613             elif IP_PROTOS.tcp == ord(record[4]):
614                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
615                                  record[7])
616                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
617                                  record[227])
618             elif IP_PROTOS.udp == ord(record[4]):
619                 self.assertEqual(struct.pack("!H", self.udp_port_in),
620                                  record[7])
621                 self.assertEqual(struct.pack("!H", self.udp_port_out),
622                                  record[227])
623             else:
624                 self.fail("Invalid protocol")
625         self.assertEqual(3, nat44_ses_create_num)
626         self.assertEqual(3, nat44_ses_delete_num)
627
628     def verify_ipfix_addr_exhausted(self, data):
629         """
630         Verify IPFIX NAT addresses event
631
632         :param data: Decoded IPFIX data records
633         """
634         self.assertEqual(1, len(data))
635         record = data[0]
636         # natEvent
637         self.assertEqual(ord(record[230]), 3)
638         # natPoolID
639         self.assertEqual(struct.pack("!I", 0), record[283])
640
641
642 class TestNAT44(MethodHolder):
643     """ NAT44 Test Cases """
644
645     @classmethod
646     def setUpClass(cls):
647         super(TestNAT44, cls).setUpClass()
648
649         try:
650             cls.tcp_port_in = 6303
651             cls.tcp_port_out = 6303
652             cls.udp_port_in = 6304
653             cls.udp_port_out = 6304
654             cls.icmp_id_in = 6305
655             cls.icmp_id_out = 6305
656             cls.nat_addr = '10.0.0.3'
657             cls.ipfix_src_port = 4739
658             cls.ipfix_domain_id = 1
659
660             cls.create_pg_interfaces(range(10))
661             cls.interfaces = list(cls.pg_interfaces[0:4])
662
663             for i in cls.interfaces:
664                 i.admin_up()
665                 i.config_ip4()
666                 i.resolve_arp()
667
668             cls.pg0.generate_remote_hosts(3)
669             cls.pg0.configure_ipv4_neighbors()
670
671             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
672             cls.vapi.ip_table_add_del(10, is_add=1)
673             cls.vapi.ip_table_add_del(20, is_add=1)
674
675             cls.pg4._local_ip4 = "172.16.255.1"
676             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
677             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
678             cls.pg4.set_table_ip4(10)
679             cls.pg5._local_ip4 = "172.17.255.3"
680             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
681             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
682             cls.pg5.set_table_ip4(10)
683             cls.pg6._local_ip4 = "172.16.255.1"
684             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
685             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
686             cls.pg6.set_table_ip4(20)
687             for i in cls.overlapping_interfaces:
688                 i.config_ip4()
689                 i.admin_up()
690                 i.resolve_arp()
691
692             cls.pg7.admin_up()
693             cls.pg8.admin_up()
694
695             cls.pg9.generate_remote_hosts(2)
696             cls.pg9.config_ip4()
697             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
698             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
699                                                   ip_addr_n,
700                                                   24)
701             cls.pg9.admin_up()
702             cls.pg9.resolve_arp()
703             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
704             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
705             cls.pg9.resolve_arp()
706
707             random.seed()
708
709         except Exception:
710             super(TestNAT44, cls).tearDownClass()
711             raise
712
713     def clear_nat44(self):
714         """
715         Clear NAT44 configuration.
716         """
717         # I found no elegant way to do this
718         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
719                                    dst_address_length=32,
720                                    next_hop_address=self.pg7.remote_ip4n,
721                                    next_hop_sw_if_index=self.pg7.sw_if_index,
722                                    is_add=0)
723         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
724                                    dst_address_length=32,
725                                    next_hop_address=self.pg8.remote_ip4n,
726                                    next_hop_sw_if_index=self.pg8.sw_if_index,
727                                    is_add=0)
728
729         for intf in [self.pg7, self.pg8]:
730             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
731             for n in neighbors:
732                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
733                                               n.mac_address,
734                                               n.ip_address,
735                                               is_add=0)
736
737         if self.pg7.has_ip4_config:
738             self.pg7.unconfig_ip4()
739
740         interfaces = self.vapi.nat44_interface_addr_dump()
741         for intf in interfaces:
742             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
743
744         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
745                             domain_id=self.ipfix_domain_id)
746         self.ipfix_src_port = 4739
747         self.ipfix_domain_id = 1
748
749         interfaces = self.vapi.nat44_interface_dump()
750         for intf in interfaces:
751             if intf.is_inside > 1:
752                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
753                                                           0,
754                                                           is_add=0)
755             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
756                                                       intf.is_inside,
757                                                       is_add=0)
758
759         interfaces = self.vapi.nat44_interface_output_feature_dump()
760         for intf in interfaces:
761             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
762                                                              intf.is_inside,
763                                                              is_add=0)
764
765         static_mappings = self.vapi.nat44_static_mapping_dump()
766         for sm in static_mappings:
767             self.vapi.nat44_add_del_static_mapping(
768                 sm.local_ip_address,
769                 sm.external_ip_address,
770                 local_port=sm.local_port,
771                 external_port=sm.external_port,
772                 addr_only=sm.addr_only,
773                 vrf_id=sm.vrf_id,
774                 protocol=sm.protocol,
775                 is_add=0)
776
777         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
778         for lb_sm in lb_static_mappings:
779             self.vapi.nat44_add_del_lb_static_mapping(
780                 lb_sm.external_addr,
781                 lb_sm.external_port,
782                 lb_sm.protocol,
783                 lb_sm.vrf_id,
784                 is_add=0,
785                 local_num=0,
786                 locals=[])
787
788         adresses = self.vapi.nat44_address_dump()
789         for addr in adresses:
790             self.vapi.nat44_add_del_address_range(addr.ip_address,
791                                                   addr.ip_address,
792                                                   is_add=0)
793
794         self.vapi.nat_set_reass()
795         self.vapi.nat_set_reass(is_ip6=1)
796
797     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
798                                  local_port=0, external_port=0, vrf_id=0,
799                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
800                                  proto=0):
801         """
802         Add/delete NAT44 static mapping
803
804         :param local_ip: Local IP address
805         :param external_ip: External IP address
806         :param local_port: Local port number (Optional)
807         :param external_port: External port number (Optional)
808         :param vrf_id: VRF ID (Default 0)
809         :param is_add: 1 if add, 0 if delete (Default add)
810         :param external_sw_if_index: External interface instead of IP address
811         :param proto: IP protocol (Mandatory if port specified)
812         """
813         addr_only = 1
814         if local_port and external_port:
815             addr_only = 0
816         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
817         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
818         self.vapi.nat44_add_del_static_mapping(
819             l_ip,
820             e_ip,
821             external_sw_if_index,
822             local_port,
823             external_port,
824             addr_only,
825             vrf_id,
826             proto,
827             is_add)
828
829     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
830         """
831         Add/delete NAT44 address
832
833         :param ip: IP address
834         :param is_add: 1 if add, 0 if delete (Default add)
835         """
836         nat_addr = socket.inet_pton(socket.AF_INET, ip)
837         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
838                                               vrf_id=vrf_id)
839
840     def test_dynamic(self):
841         """ NAT44 dynamic translation test """
842
843         self.nat44_add_address(self.nat_addr)
844         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
845         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
846                                                   is_inside=0)
847
848         # in2out
849         pkts = self.create_stream_in(self.pg0, self.pg1)
850         self.pg0.add_stream(pkts)
851         self.pg_enable_capture(self.pg_interfaces)
852         self.pg_start()
853         capture = self.pg1.get_capture(len(pkts))
854         self.verify_capture_out(capture)
855
856         # out2in
857         pkts = self.create_stream_out(self.pg1)
858         self.pg1.add_stream(pkts)
859         self.pg_enable_capture(self.pg_interfaces)
860         self.pg_start()
861         capture = self.pg0.get_capture(len(pkts))
862         self.verify_capture_in(capture, self.pg0)
863
864     def test_dynamic_icmp_errors_in2out_ttl_1(self):
865         """ NAT44 handling of client packets with TTL=1 """
866
867         self.nat44_add_address(self.nat_addr)
868         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
869         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
870                                                   is_inside=0)
871
872         # Client side - generate traffic
873         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
874         self.pg0.add_stream(pkts)
875         self.pg_enable_capture(self.pg_interfaces)
876         self.pg_start()
877
878         # Client side - verify ICMP type 11 packets
879         capture = self.pg0.get_capture(len(pkts))
880         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
881
882     def test_dynamic_icmp_errors_out2in_ttl_1(self):
883         """ NAT44 handling of server packets with TTL=1 """
884
885         self.nat44_add_address(self.nat_addr)
886         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
887         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
888                                                   is_inside=0)
889
890         # Client side - create sessions
891         pkts = self.create_stream_in(self.pg0, self.pg1)
892         self.pg0.add_stream(pkts)
893         self.pg_enable_capture(self.pg_interfaces)
894         self.pg_start()
895
896         # Server side - generate traffic
897         capture = self.pg1.get_capture(len(pkts))
898         self.verify_capture_out(capture)
899         pkts = self.create_stream_out(self.pg1, ttl=1)
900         self.pg1.add_stream(pkts)
901         self.pg_enable_capture(self.pg_interfaces)
902         self.pg_start()
903
904         # Server side - verify ICMP type 11 packets
905         capture = self.pg1.get_capture(len(pkts))
906         self.verify_capture_out_with_icmp_errors(capture,
907                                                  src_ip=self.pg1.local_ip4)
908
909     def test_dynamic_icmp_errors_in2out_ttl_2(self):
910         """ NAT44 handling of error responses to client packets with TTL=2 """
911
912         self.nat44_add_address(self.nat_addr)
913         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
914         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
915                                                   is_inside=0)
916
917         # Client side - generate traffic
918         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
919         self.pg0.add_stream(pkts)
920         self.pg_enable_capture(self.pg_interfaces)
921         self.pg_start()
922
923         # Server side - simulate ICMP type 11 response
924         capture = self.pg1.get_capture(len(pkts))
925         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
926                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
927                 ICMP(type=11) / packet[IP] for packet in capture]
928         self.pg1.add_stream(pkts)
929         self.pg_enable_capture(self.pg_interfaces)
930         self.pg_start()
931
932         # Client side - verify ICMP type 11 packets
933         capture = self.pg0.get_capture(len(pkts))
934         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
935
936     def test_dynamic_icmp_errors_out2in_ttl_2(self):
937         """ NAT44 handling of error responses to server packets with TTL=2 """
938
939         self.nat44_add_address(self.nat_addr)
940         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
941         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
942                                                   is_inside=0)
943
944         # Client side - create sessions
945         pkts = self.create_stream_in(self.pg0, self.pg1)
946         self.pg0.add_stream(pkts)
947         self.pg_enable_capture(self.pg_interfaces)
948         self.pg_start()
949
950         # Server side - generate traffic
951         capture = self.pg1.get_capture(len(pkts))
952         self.verify_capture_out(capture)
953         pkts = self.create_stream_out(self.pg1, ttl=2)
954         self.pg1.add_stream(pkts)
955         self.pg_enable_capture(self.pg_interfaces)
956         self.pg_start()
957
958         # Client side - simulate ICMP type 11 response
959         capture = self.pg0.get_capture(len(pkts))
960         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
961                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
962                 ICMP(type=11) / packet[IP] for packet in capture]
963         self.pg0.add_stream(pkts)
964         self.pg_enable_capture(self.pg_interfaces)
965         self.pg_start()
966
967         # Server side - verify ICMP type 11 packets
968         capture = self.pg1.get_capture(len(pkts))
969         self.verify_capture_out_with_icmp_errors(capture)
970
971     def test_ping_out_interface_from_outside(self):
972         """ Ping NAT44 out interface from outside network """
973
974         self.nat44_add_address(self.nat_addr)
975         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
976         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
977                                                   is_inside=0)
978
979         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
980              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
981              ICMP(id=self.icmp_id_out, type='echo-request'))
982         pkts = [p]
983         self.pg1.add_stream(pkts)
984         self.pg_enable_capture(self.pg_interfaces)
985         self.pg_start()
986         capture = self.pg1.get_capture(len(pkts))
987         self.assertEqual(1, len(capture))
988         packet = capture[0]
989         try:
990             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
991             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
992             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
993             self.assertEqual(packet[ICMP].type, 0)  # echo reply
994         except:
995             self.logger.error(ppp("Unexpected or invalid packet "
996                                   "(outside network):", packet))
997             raise
998
999     def test_ping_internal_host_from_outside(self):
1000         """ Ping internal host from outside network """
1001
1002         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1003         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1004         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1005                                                   is_inside=0)
1006
1007         # out2in
1008         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1009                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1010                ICMP(id=self.icmp_id_out, type='echo-request'))
1011         self.pg1.add_stream(pkt)
1012         self.pg_enable_capture(self.pg_interfaces)
1013         self.pg_start()
1014         capture = self.pg0.get_capture(1)
1015         self.verify_capture_in(capture, self.pg0, packet_num=1)
1016         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1017
1018         # in2out
1019         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1020                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1021                ICMP(id=self.icmp_id_in, type='echo-reply'))
1022         self.pg0.add_stream(pkt)
1023         self.pg_enable_capture(self.pg_interfaces)
1024         self.pg_start()
1025         capture = self.pg1.get_capture(1)
1026         self.verify_capture_out(capture, same_port=True, packet_num=1)
1027         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1028
1029     def test_static_in(self):
1030         """ 1:1 NAT initialized from inside network """
1031
1032         nat_ip = "10.0.0.10"
1033         self.tcp_port_out = 6303
1034         self.udp_port_out = 6304
1035         self.icmp_id_out = 6305
1036
1037         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1038         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1039         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1040                                                   is_inside=0)
1041
1042         # in2out
1043         pkts = self.create_stream_in(self.pg0, self.pg1)
1044         self.pg0.add_stream(pkts)
1045         self.pg_enable_capture(self.pg_interfaces)
1046         self.pg_start()
1047         capture = self.pg1.get_capture(len(pkts))
1048         self.verify_capture_out(capture, nat_ip, True)
1049
1050         # out2in
1051         pkts = self.create_stream_out(self.pg1, nat_ip)
1052         self.pg1.add_stream(pkts)
1053         self.pg_enable_capture(self.pg_interfaces)
1054         self.pg_start()
1055         capture = self.pg0.get_capture(len(pkts))
1056         self.verify_capture_in(capture, self.pg0)
1057
1058     def test_static_out(self):
1059         """ 1:1 NAT initialized from outside network """
1060
1061         nat_ip = "10.0.0.20"
1062         self.tcp_port_out = 6303
1063         self.udp_port_out = 6304
1064         self.icmp_id_out = 6305
1065
1066         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1067         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1068         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1069                                                   is_inside=0)
1070
1071         # out2in
1072         pkts = self.create_stream_out(self.pg1, nat_ip)
1073         self.pg1.add_stream(pkts)
1074         self.pg_enable_capture(self.pg_interfaces)
1075         self.pg_start()
1076         capture = self.pg0.get_capture(len(pkts))
1077         self.verify_capture_in(capture, self.pg0)
1078
1079         # in2out
1080         pkts = self.create_stream_in(self.pg0, self.pg1)
1081         self.pg0.add_stream(pkts)
1082         self.pg_enable_capture(self.pg_interfaces)
1083         self.pg_start()
1084         capture = self.pg1.get_capture(len(pkts))
1085         self.verify_capture_out(capture, nat_ip, True)
1086
1087     def test_static_with_port_in(self):
1088         """ 1:1 NAPT initialized from inside network """
1089
1090         self.tcp_port_out = 3606
1091         self.udp_port_out = 3607
1092         self.icmp_id_out = 3608
1093
1094         self.nat44_add_address(self.nat_addr)
1095         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1096                                       self.tcp_port_in, self.tcp_port_out,
1097                                       proto=IP_PROTOS.tcp)
1098         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1099                                       self.udp_port_in, self.udp_port_out,
1100                                       proto=IP_PROTOS.udp)
1101         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1102                                       self.icmp_id_in, self.icmp_id_out,
1103                                       proto=IP_PROTOS.icmp)
1104         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1105         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1106                                                   is_inside=0)
1107
1108         # in2out
1109         pkts = self.create_stream_in(self.pg0, self.pg1)
1110         self.pg0.add_stream(pkts)
1111         self.pg_enable_capture(self.pg_interfaces)
1112         self.pg_start()
1113         capture = self.pg1.get_capture(len(pkts))
1114         self.verify_capture_out(capture)
1115
1116         # out2in
1117         pkts = self.create_stream_out(self.pg1)
1118         self.pg1.add_stream(pkts)
1119         self.pg_enable_capture(self.pg_interfaces)
1120         self.pg_start()
1121         capture = self.pg0.get_capture(len(pkts))
1122         self.verify_capture_in(capture, self.pg0)
1123
1124     def test_static_with_port_out(self):
1125         """ 1:1 NAPT initialized from outside network """
1126
1127         self.tcp_port_out = 30606
1128         self.udp_port_out = 30607
1129         self.icmp_id_out = 30608
1130
1131         self.nat44_add_address(self.nat_addr)
1132         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1133                                       self.tcp_port_in, self.tcp_port_out,
1134                                       proto=IP_PROTOS.tcp)
1135         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1136                                       self.udp_port_in, self.udp_port_out,
1137                                       proto=IP_PROTOS.udp)
1138         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1139                                       self.icmp_id_in, self.icmp_id_out,
1140                                       proto=IP_PROTOS.icmp)
1141         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1142         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1143                                                   is_inside=0)
1144
1145         # out2in
1146         pkts = self.create_stream_out(self.pg1)
1147         self.pg1.add_stream(pkts)
1148         self.pg_enable_capture(self.pg_interfaces)
1149         self.pg_start()
1150         capture = self.pg0.get_capture(len(pkts))
1151         self.verify_capture_in(capture, self.pg0)
1152
1153         # in2out
1154         pkts = self.create_stream_in(self.pg0, self.pg1)
1155         self.pg0.add_stream(pkts)
1156         self.pg_enable_capture(self.pg_interfaces)
1157         self.pg_start()
1158         capture = self.pg1.get_capture(len(pkts))
1159         self.verify_capture_out(capture)
1160
1161     def test_static_vrf_aware(self):
1162         """ 1:1 NAT VRF awareness """
1163
1164         nat_ip1 = "10.0.0.30"
1165         nat_ip2 = "10.0.0.40"
1166         self.tcp_port_out = 6303
1167         self.udp_port_out = 6304
1168         self.icmp_id_out = 6305
1169
1170         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1171                                       vrf_id=10)
1172         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1173                                       vrf_id=10)
1174         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1175                                                   is_inside=0)
1176         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1177         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1178
1179         # inside interface VRF match NAT44 static mapping VRF
1180         pkts = self.create_stream_in(self.pg4, self.pg3)
1181         self.pg4.add_stream(pkts)
1182         self.pg_enable_capture(self.pg_interfaces)
1183         self.pg_start()
1184         capture = self.pg3.get_capture(len(pkts))
1185         self.verify_capture_out(capture, nat_ip1, True)
1186
1187         # inside interface VRF don't match NAT44 static mapping VRF (packets
1188         # are dropped)
1189         pkts = self.create_stream_in(self.pg0, self.pg3)
1190         self.pg0.add_stream(pkts)
1191         self.pg_enable_capture(self.pg_interfaces)
1192         self.pg_start()
1193         self.pg3.assert_nothing_captured()
1194
1195     def test_static_lb(self):
1196         """ NAT44 local service load balancing """
1197         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1198         external_port = 80
1199         local_port = 8080
1200         server1 = self.pg0.remote_hosts[0]
1201         server2 = self.pg0.remote_hosts[1]
1202
1203         locals = [{'addr': server1.ip4n,
1204                    'port': local_port,
1205                    'probability': 70},
1206                   {'addr': server2.ip4n,
1207                    'port': local_port,
1208                    'probability': 30}]
1209
1210         self.nat44_add_address(self.nat_addr)
1211         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1212                                                   external_port,
1213                                                   IP_PROTOS.tcp,
1214                                                   local_num=len(locals),
1215                                                   locals=locals)
1216         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1217         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1218                                                   is_inside=0)
1219
1220         # from client to service
1221         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1222              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1223              TCP(sport=12345, dport=external_port))
1224         self.pg1.add_stream(p)
1225         self.pg_enable_capture(self.pg_interfaces)
1226         self.pg_start()
1227         capture = self.pg0.get_capture(1)
1228         p = capture[0]
1229         server = None
1230         try:
1231             ip = p[IP]
1232             tcp = p[TCP]
1233             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1234             if ip.dst == server1.ip4:
1235                 server = server1
1236             else:
1237                 server = server2
1238             self.assertEqual(tcp.dport, local_port)
1239             self.check_tcp_checksum(p)
1240             self.check_ip_checksum(p)
1241         except:
1242             self.logger.error(ppp("Unexpected or invalid packet:", p))
1243             raise
1244
1245         # from service back to client
1246         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1247              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1248              TCP(sport=local_port, dport=12345))
1249         self.pg0.add_stream(p)
1250         self.pg_enable_capture(self.pg_interfaces)
1251         self.pg_start()
1252         capture = self.pg1.get_capture(1)
1253         p = capture[0]
1254         try:
1255             ip = p[IP]
1256             tcp = p[TCP]
1257             self.assertEqual(ip.src, self.nat_addr)
1258             self.assertEqual(tcp.sport, external_port)
1259             self.check_tcp_checksum(p)
1260             self.check_ip_checksum(p)
1261         except:
1262             self.logger.error(ppp("Unexpected or invalid packet:", p))
1263             raise
1264
1265         # multiple clients
1266         server1_n = 0
1267         server2_n = 0
1268         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1269         pkts = []
1270         for client in clients:
1271             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1272                  IP(src=client, dst=self.nat_addr) /
1273                  TCP(sport=12345, dport=external_port))
1274             pkts.append(p)
1275         self.pg1.add_stream(pkts)
1276         self.pg_enable_capture(self.pg_interfaces)
1277         self.pg_start()
1278         capture = self.pg0.get_capture(len(pkts))
1279         for p in capture:
1280             if p[IP].dst == server1.ip4:
1281                 server1_n += 1
1282             else:
1283                 server2_n += 1
1284         self.assertTrue(server1_n > server2_n)
1285
1286     def test_multiple_inside_interfaces(self):
1287         """ NAT44 multiple non-overlapping address space inside interfaces """
1288
1289         self.nat44_add_address(self.nat_addr)
1290         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1291         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1292         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1293                                                   is_inside=0)
1294
1295         # between two NAT44 inside interfaces (no translation)
1296         pkts = self.create_stream_in(self.pg0, self.pg1)
1297         self.pg0.add_stream(pkts)
1298         self.pg_enable_capture(self.pg_interfaces)
1299         self.pg_start()
1300         capture = self.pg1.get_capture(len(pkts))
1301         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1302
1303         # from NAT44 inside to interface without NAT44 feature (no translation)
1304         pkts = self.create_stream_in(self.pg0, self.pg2)
1305         self.pg0.add_stream(pkts)
1306         self.pg_enable_capture(self.pg_interfaces)
1307         self.pg_start()
1308         capture = self.pg2.get_capture(len(pkts))
1309         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1310
1311         # in2out 1st interface
1312         pkts = self.create_stream_in(self.pg0, self.pg3)
1313         self.pg0.add_stream(pkts)
1314         self.pg_enable_capture(self.pg_interfaces)
1315         self.pg_start()
1316         capture = self.pg3.get_capture(len(pkts))
1317         self.verify_capture_out(capture)
1318
1319         # out2in 1st interface
1320         pkts = self.create_stream_out(self.pg3)
1321         self.pg3.add_stream(pkts)
1322         self.pg_enable_capture(self.pg_interfaces)
1323         self.pg_start()
1324         capture = self.pg0.get_capture(len(pkts))
1325         self.verify_capture_in(capture, self.pg0)
1326
1327         # in2out 2nd interface
1328         pkts = self.create_stream_in(self.pg1, self.pg3)
1329         self.pg1.add_stream(pkts)
1330         self.pg_enable_capture(self.pg_interfaces)
1331         self.pg_start()
1332         capture = self.pg3.get_capture(len(pkts))
1333         self.verify_capture_out(capture)
1334
1335         # out2in 2nd interface
1336         pkts = self.create_stream_out(self.pg3)
1337         self.pg3.add_stream(pkts)
1338         self.pg_enable_capture(self.pg_interfaces)
1339         self.pg_start()
1340         capture = self.pg1.get_capture(len(pkts))
1341         self.verify_capture_in(capture, self.pg1)
1342
1343     def test_inside_overlapping_interfaces(self):
1344         """ NAT44 multiple inside interfaces with overlapping address space """
1345
1346         static_nat_ip = "10.0.0.10"
1347         self.nat44_add_address(self.nat_addr)
1348         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1349                                                   is_inside=0)
1350         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1351         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1352         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1353         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1354                                       vrf_id=20)
1355
1356         # between NAT44 inside interfaces with same VRF (no translation)
1357         pkts = self.create_stream_in(self.pg4, self.pg5)
1358         self.pg4.add_stream(pkts)
1359         self.pg_enable_capture(self.pg_interfaces)
1360         self.pg_start()
1361         capture = self.pg5.get_capture(len(pkts))
1362         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1363
1364         # between NAT44 inside interfaces with different VRF (hairpinning)
1365         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1366              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1367              TCP(sport=1234, dport=5678))
1368         self.pg4.add_stream(p)
1369         self.pg_enable_capture(self.pg_interfaces)
1370         self.pg_start()
1371         capture = self.pg6.get_capture(1)
1372         p = capture[0]
1373         try:
1374             ip = p[IP]
1375             tcp = p[TCP]
1376             self.assertEqual(ip.src, self.nat_addr)
1377             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1378             self.assertNotEqual(tcp.sport, 1234)
1379             self.assertEqual(tcp.dport, 5678)
1380         except:
1381             self.logger.error(ppp("Unexpected or invalid packet:", p))
1382             raise
1383
1384         # in2out 1st interface
1385         pkts = self.create_stream_in(self.pg4, self.pg3)
1386         self.pg4.add_stream(pkts)
1387         self.pg_enable_capture(self.pg_interfaces)
1388         self.pg_start()
1389         capture = self.pg3.get_capture(len(pkts))
1390         self.verify_capture_out(capture)
1391
1392         # out2in 1st interface
1393         pkts = self.create_stream_out(self.pg3)
1394         self.pg3.add_stream(pkts)
1395         self.pg_enable_capture(self.pg_interfaces)
1396         self.pg_start()
1397         capture = self.pg4.get_capture(len(pkts))
1398         self.verify_capture_in(capture, self.pg4)
1399
1400         # in2out 2nd interface
1401         pkts = self.create_stream_in(self.pg5, self.pg3)
1402         self.pg5.add_stream(pkts)
1403         self.pg_enable_capture(self.pg_interfaces)
1404         self.pg_start()
1405         capture = self.pg3.get_capture(len(pkts))
1406         self.verify_capture_out(capture)
1407
1408         # out2in 2nd interface
1409         pkts = self.create_stream_out(self.pg3)
1410         self.pg3.add_stream(pkts)
1411         self.pg_enable_capture(self.pg_interfaces)
1412         self.pg_start()
1413         capture = self.pg5.get_capture(len(pkts))
1414         self.verify_capture_in(capture, self.pg5)
1415
1416         # pg5 session dump
1417         addresses = self.vapi.nat44_address_dump()
1418         self.assertEqual(len(addresses), 1)
1419         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1420         self.assertEqual(len(sessions), 3)
1421         for session in sessions:
1422             self.assertFalse(session.is_static)
1423             self.assertEqual(session.inside_ip_address[0:4],
1424                              self.pg5.remote_ip4n)
1425             self.assertEqual(session.outside_ip_address,
1426                              addresses[0].ip_address)
1427         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1428         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1429         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1430         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1431         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1432         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1433         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1434         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1435         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1436
1437         # in2out 3rd interface
1438         pkts = self.create_stream_in(self.pg6, self.pg3)
1439         self.pg6.add_stream(pkts)
1440         self.pg_enable_capture(self.pg_interfaces)
1441         self.pg_start()
1442         capture = self.pg3.get_capture(len(pkts))
1443         self.verify_capture_out(capture, static_nat_ip, True)
1444
1445         # out2in 3rd interface
1446         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1447         self.pg3.add_stream(pkts)
1448         self.pg_enable_capture(self.pg_interfaces)
1449         self.pg_start()
1450         capture = self.pg6.get_capture(len(pkts))
1451         self.verify_capture_in(capture, self.pg6)
1452
1453         # general user and session dump verifications
1454         users = self.vapi.nat44_user_dump()
1455         self.assertTrue(len(users) >= 3)
1456         addresses = self.vapi.nat44_address_dump()
1457         self.assertEqual(len(addresses), 1)
1458         for user in users:
1459             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1460                                                          user.vrf_id)
1461             for session in sessions:
1462                 self.assertEqual(user.ip_address, session.inside_ip_address)
1463                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1464                 self.assertTrue(session.protocol in
1465                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1466                                  IP_PROTOS.icmp])
1467
1468         # pg4 session dump
1469         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1470         self.assertTrue(len(sessions) >= 4)
1471         for session in sessions:
1472             self.assertFalse(session.is_static)
1473             self.assertEqual(session.inside_ip_address[0:4],
1474                              self.pg4.remote_ip4n)
1475             self.assertEqual(session.outside_ip_address,
1476                              addresses[0].ip_address)
1477
1478         # pg6 session dump
1479         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1480         self.assertTrue(len(sessions) >= 3)
1481         for session in sessions:
1482             self.assertTrue(session.is_static)
1483             self.assertEqual(session.inside_ip_address[0:4],
1484                              self.pg6.remote_ip4n)
1485             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1486                              map(int, static_nat_ip.split('.')))
1487             self.assertTrue(session.inside_port in
1488                             [self.tcp_port_in, self.udp_port_in,
1489                              self.icmp_id_in])
1490
1491     def test_hairpinning(self):
1492         """ NAT44 hairpinning - 1:1 NAPT """
1493
1494         host = self.pg0.remote_hosts[0]
1495         server = self.pg0.remote_hosts[1]
1496         host_in_port = 1234
1497         host_out_port = 0
1498         server_in_port = 5678
1499         server_out_port = 8765
1500
1501         self.nat44_add_address(self.nat_addr)
1502         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1503         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1504                                                   is_inside=0)
1505         # add static mapping for server
1506         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1507                                       server_in_port, server_out_port,
1508                                       proto=IP_PROTOS.tcp)
1509
1510         # send packet from host to server
1511         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1512              IP(src=host.ip4, dst=self.nat_addr) /
1513              TCP(sport=host_in_port, dport=server_out_port))
1514         self.pg0.add_stream(p)
1515         self.pg_enable_capture(self.pg_interfaces)
1516         self.pg_start()
1517         capture = self.pg0.get_capture(1)
1518         p = capture[0]
1519         try:
1520             ip = p[IP]
1521             tcp = p[TCP]
1522             self.assertEqual(ip.src, self.nat_addr)
1523             self.assertEqual(ip.dst, server.ip4)
1524             self.assertNotEqual(tcp.sport, host_in_port)
1525             self.assertEqual(tcp.dport, server_in_port)
1526             self.check_tcp_checksum(p)
1527             host_out_port = tcp.sport
1528         except:
1529             self.logger.error(ppp("Unexpected or invalid packet:", p))
1530             raise
1531
1532         # send reply from server to host
1533         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1534              IP(src=server.ip4, dst=self.nat_addr) /
1535              TCP(sport=server_in_port, dport=host_out_port))
1536         self.pg0.add_stream(p)
1537         self.pg_enable_capture(self.pg_interfaces)
1538         self.pg_start()
1539         capture = self.pg0.get_capture(1)
1540         p = capture[0]
1541         try:
1542             ip = p[IP]
1543             tcp = p[TCP]
1544             self.assertEqual(ip.src, self.nat_addr)
1545             self.assertEqual(ip.dst, host.ip4)
1546             self.assertEqual(tcp.sport, server_out_port)
1547             self.assertEqual(tcp.dport, host_in_port)
1548             self.check_tcp_checksum(p)
1549         except:
1550             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1551             raise
1552
1553     def test_hairpinning2(self):
1554         """ NAT44 hairpinning - 1:1 NAT"""
1555
1556         server1_nat_ip = "10.0.0.10"
1557         server2_nat_ip = "10.0.0.11"
1558         host = self.pg0.remote_hosts[0]
1559         server1 = self.pg0.remote_hosts[1]
1560         server2 = self.pg0.remote_hosts[2]
1561         server_tcp_port = 22
1562         server_udp_port = 20
1563
1564         self.nat44_add_address(self.nat_addr)
1565         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1566         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1567                                                   is_inside=0)
1568
1569         # add static mapping for servers
1570         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1571         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1572
1573         # host to server1
1574         pkts = []
1575         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1576              IP(src=host.ip4, dst=server1_nat_ip) /
1577              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1578         pkts.append(p)
1579         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1580              IP(src=host.ip4, dst=server1_nat_ip) /
1581              UDP(sport=self.udp_port_in, dport=server_udp_port))
1582         pkts.append(p)
1583         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1584              IP(src=host.ip4, dst=server1_nat_ip) /
1585              ICMP(id=self.icmp_id_in, type='echo-request'))
1586         pkts.append(p)
1587         self.pg0.add_stream(pkts)
1588         self.pg_enable_capture(self.pg_interfaces)
1589         self.pg_start()
1590         capture = self.pg0.get_capture(len(pkts))
1591         for packet in capture:
1592             try:
1593                 self.assertEqual(packet[IP].src, self.nat_addr)
1594                 self.assertEqual(packet[IP].dst, server1.ip4)
1595                 if packet.haslayer(TCP):
1596                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1597                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1598                     self.tcp_port_out = packet[TCP].sport
1599                     self.check_tcp_checksum(packet)
1600                 elif packet.haslayer(UDP):
1601                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1602                     self.assertEqual(packet[UDP].dport, server_udp_port)
1603                     self.udp_port_out = packet[UDP].sport
1604                 else:
1605                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1606                     self.icmp_id_out = packet[ICMP].id
1607             except:
1608                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1609                 raise
1610
1611         # server1 to host
1612         pkts = []
1613         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1614              IP(src=server1.ip4, dst=self.nat_addr) /
1615              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1616         pkts.append(p)
1617         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1618              IP(src=server1.ip4, dst=self.nat_addr) /
1619              UDP(sport=server_udp_port, dport=self.udp_port_out))
1620         pkts.append(p)
1621         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1622              IP(src=server1.ip4, dst=self.nat_addr) /
1623              ICMP(id=self.icmp_id_out, type='echo-reply'))
1624         pkts.append(p)
1625         self.pg0.add_stream(pkts)
1626         self.pg_enable_capture(self.pg_interfaces)
1627         self.pg_start()
1628         capture = self.pg0.get_capture(len(pkts))
1629         for packet in capture:
1630             try:
1631                 self.assertEqual(packet[IP].src, server1_nat_ip)
1632                 self.assertEqual(packet[IP].dst, host.ip4)
1633                 if packet.haslayer(TCP):
1634                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1635                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1636                     self.check_tcp_checksum(packet)
1637                 elif packet.haslayer(UDP):
1638                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1639                     self.assertEqual(packet[UDP].sport, server_udp_port)
1640                 else:
1641                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1642             except:
1643                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1644                 raise
1645
1646         # server2 to server1
1647         pkts = []
1648         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1649              IP(src=server2.ip4, dst=server1_nat_ip) /
1650              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1651         pkts.append(p)
1652         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1653              IP(src=server2.ip4, dst=server1_nat_ip) /
1654              UDP(sport=self.udp_port_in, dport=server_udp_port))
1655         pkts.append(p)
1656         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1657              IP(src=server2.ip4, dst=server1_nat_ip) /
1658              ICMP(id=self.icmp_id_in, type='echo-request'))
1659         pkts.append(p)
1660         self.pg0.add_stream(pkts)
1661         self.pg_enable_capture(self.pg_interfaces)
1662         self.pg_start()
1663         capture = self.pg0.get_capture(len(pkts))
1664         for packet in capture:
1665             try:
1666                 self.assertEqual(packet[IP].src, server2_nat_ip)
1667                 self.assertEqual(packet[IP].dst, server1.ip4)
1668                 if packet.haslayer(TCP):
1669                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1670                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1671                     self.tcp_port_out = packet[TCP].sport
1672                     self.check_tcp_checksum(packet)
1673                 elif packet.haslayer(UDP):
1674                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1675                     self.assertEqual(packet[UDP].dport, server_udp_port)
1676                     self.udp_port_out = packet[UDP].sport
1677                 else:
1678                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1679                     self.icmp_id_out = packet[ICMP].id
1680             except:
1681                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1682                 raise
1683
1684         # server1 to server2
1685         pkts = []
1686         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1687              IP(src=server1.ip4, dst=server2_nat_ip) /
1688              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1689         pkts.append(p)
1690         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1691              IP(src=server1.ip4, dst=server2_nat_ip) /
1692              UDP(sport=server_udp_port, dport=self.udp_port_out))
1693         pkts.append(p)
1694         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1695              IP(src=server1.ip4, dst=server2_nat_ip) /
1696              ICMP(id=self.icmp_id_out, type='echo-reply'))
1697         pkts.append(p)
1698         self.pg0.add_stream(pkts)
1699         self.pg_enable_capture(self.pg_interfaces)
1700         self.pg_start()
1701         capture = self.pg0.get_capture(len(pkts))
1702         for packet in capture:
1703             try:
1704                 self.assertEqual(packet[IP].src, server1_nat_ip)
1705                 self.assertEqual(packet[IP].dst, server2.ip4)
1706                 if packet.haslayer(TCP):
1707                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1708                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1709                     self.check_tcp_checksum(packet)
1710                 elif packet.haslayer(UDP):
1711                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1712                     self.assertEqual(packet[UDP].sport, server_udp_port)
1713                 else:
1714                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1715             except:
1716                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1717                 raise
1718
1719     def test_max_translations_per_user(self):
1720         """ MAX translations per user - recycle the least recently used """
1721
1722         self.nat44_add_address(self.nat_addr)
1723         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1724         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1725                                                   is_inside=0)
1726
1727         # get maximum number of translations per user
1728         nat44_config = self.vapi.nat_show_config()
1729
1730         # send more than maximum number of translations per user packets
1731         pkts_num = nat44_config.max_translations_per_user + 5
1732         pkts = []
1733         for port in range(0, pkts_num):
1734             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1735                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1736                  TCP(sport=1025 + port))
1737             pkts.append(p)
1738         self.pg0.add_stream(pkts)
1739         self.pg_enable_capture(self.pg_interfaces)
1740         self.pg_start()
1741
1742         # verify number of translated packet
1743         self.pg1.get_capture(pkts_num)
1744
1745     def test_interface_addr(self):
1746         """ Acquire NAT44 addresses from interface """
1747         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1748
1749         # no address in NAT pool
1750         adresses = self.vapi.nat44_address_dump()
1751         self.assertEqual(0, len(adresses))
1752
1753         # configure interface address and check NAT address pool
1754         self.pg7.config_ip4()
1755         adresses = self.vapi.nat44_address_dump()
1756         self.assertEqual(1, len(adresses))
1757         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1758
1759         # remove interface address and check NAT address pool
1760         self.pg7.unconfig_ip4()
1761         adresses = self.vapi.nat44_address_dump()
1762         self.assertEqual(0, len(adresses))
1763
1764     def test_interface_addr_static_mapping(self):
1765         """ Static mapping with addresses from interface """
1766         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1767         self.nat44_add_static_mapping(
1768             '1.2.3.4',
1769             external_sw_if_index=self.pg7.sw_if_index)
1770
1771         # static mappings with external interface
1772         static_mappings = self.vapi.nat44_static_mapping_dump()
1773         self.assertEqual(1, len(static_mappings))
1774         self.assertEqual(self.pg7.sw_if_index,
1775                          static_mappings[0].external_sw_if_index)
1776
1777         # configure interface address and check static mappings
1778         self.pg7.config_ip4()
1779         static_mappings = self.vapi.nat44_static_mapping_dump()
1780         self.assertEqual(1, len(static_mappings))
1781         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1782                          self.pg7.local_ip4n)
1783         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1784
1785         # remove interface address and check static mappings
1786         self.pg7.unconfig_ip4()
1787         static_mappings = self.vapi.nat44_static_mapping_dump()
1788         self.assertEqual(0, len(static_mappings))
1789
1790     def test_ipfix_nat44_sess(self):
1791         """ IPFIX logging NAT44 session created/delted """
1792         self.ipfix_domain_id = 10
1793         self.ipfix_src_port = 20202
1794         colector_port = 30303
1795         bind_layers(UDP, IPFIX, dport=30303)
1796         self.nat44_add_address(self.nat_addr)
1797         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1798         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1799                                                   is_inside=0)
1800         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1801                                      src_address=self.pg3.local_ip4n,
1802                                      path_mtu=512,
1803                                      template_interval=10,
1804                                      collector_port=colector_port)
1805         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1806                             src_port=self.ipfix_src_port)
1807
1808         pkts = self.create_stream_in(self.pg0, self.pg1)
1809         self.pg0.add_stream(pkts)
1810         self.pg_enable_capture(self.pg_interfaces)
1811         self.pg_start()
1812         capture = self.pg1.get_capture(len(pkts))
1813         self.verify_capture_out(capture)
1814         self.nat44_add_address(self.nat_addr, is_add=0)
1815         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1816         capture = self.pg3.get_capture(3)
1817         ipfix = IPFIXDecoder()
1818         # first load template
1819         for p in capture:
1820             self.assertTrue(p.haslayer(IPFIX))
1821             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1822             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1823             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1824             self.assertEqual(p[UDP].dport, colector_port)
1825             self.assertEqual(p[IPFIX].observationDomainID,
1826                              self.ipfix_domain_id)
1827             if p.haslayer(Template):
1828                 ipfix.add_template(p.getlayer(Template))
1829         # verify events in data set
1830         for p in capture:
1831             if p.haslayer(Data):
1832                 data = ipfix.decode_data_set(p.getlayer(Set))
1833                 self.verify_ipfix_nat44_ses(data)
1834
1835     def test_ipfix_addr_exhausted(self):
1836         """ IPFIX logging NAT addresses exhausted """
1837         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1838         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1839                                                   is_inside=0)
1840         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1841                                      src_address=self.pg3.local_ip4n,
1842                                      path_mtu=512,
1843                                      template_interval=10)
1844         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1845                             src_port=self.ipfix_src_port)
1846
1847         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1848              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1849              TCP(sport=3025))
1850         self.pg0.add_stream(p)
1851         self.pg_enable_capture(self.pg_interfaces)
1852         self.pg_start()
1853         capture = self.pg1.get_capture(0)
1854         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1855         capture = self.pg3.get_capture(3)
1856         ipfix = IPFIXDecoder()
1857         # first load template
1858         for p in capture:
1859             self.assertTrue(p.haslayer(IPFIX))
1860             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1861             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1862             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1863             self.assertEqual(p[UDP].dport, 4739)
1864             self.assertEqual(p[IPFIX].observationDomainID,
1865                              self.ipfix_domain_id)
1866             if p.haslayer(Template):
1867                 ipfix.add_template(p.getlayer(Template))
1868         # verify events in data set
1869         for p in capture:
1870             if p.haslayer(Data):
1871                 data = ipfix.decode_data_set(p.getlayer(Set))
1872                 self.verify_ipfix_addr_exhausted(data)
1873
1874     def test_pool_addr_fib(self):
1875         """ NAT44 add pool addresses to FIB """
1876         static_addr = '10.0.0.10'
1877         self.nat44_add_address(self.nat_addr)
1878         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1879         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1880                                                   is_inside=0)
1881         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1882
1883         # NAT44 address
1884         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1885              ARP(op=ARP.who_has, pdst=self.nat_addr,
1886                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1887         self.pg1.add_stream(p)
1888         self.pg_enable_capture(self.pg_interfaces)
1889         self.pg_start()
1890         capture = self.pg1.get_capture(1)
1891         self.assertTrue(capture[0].haslayer(ARP))
1892         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1893
1894         # 1:1 NAT address
1895         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1896              ARP(op=ARP.who_has, pdst=static_addr,
1897                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1898         self.pg1.add_stream(p)
1899         self.pg_enable_capture(self.pg_interfaces)
1900         self.pg_start()
1901         capture = self.pg1.get_capture(1)
1902         self.assertTrue(capture[0].haslayer(ARP))
1903         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1904
1905         # send ARP to non-NAT44 interface
1906         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1907              ARP(op=ARP.who_has, pdst=self.nat_addr,
1908                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1909         self.pg2.add_stream(p)
1910         self.pg_enable_capture(self.pg_interfaces)
1911         self.pg_start()
1912         capture = self.pg1.get_capture(0)
1913
1914         # remove addresses and verify
1915         self.nat44_add_address(self.nat_addr, is_add=0)
1916         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1917                                       is_add=0)
1918
1919         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1920              ARP(op=ARP.who_has, pdst=self.nat_addr,
1921                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1922         self.pg1.add_stream(p)
1923         self.pg_enable_capture(self.pg_interfaces)
1924         self.pg_start()
1925         capture = self.pg1.get_capture(0)
1926
1927         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1928              ARP(op=ARP.who_has, pdst=static_addr,
1929                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1930         self.pg1.add_stream(p)
1931         self.pg_enable_capture(self.pg_interfaces)
1932         self.pg_start()
1933         capture = self.pg1.get_capture(0)
1934
1935     def test_vrf_mode(self):
1936         """ NAT44 tenant VRF aware address pool mode """
1937
1938         vrf_id1 = 1
1939         vrf_id2 = 2
1940         nat_ip1 = "10.0.0.10"
1941         nat_ip2 = "10.0.0.11"
1942
1943         self.pg0.unconfig_ip4()
1944         self.pg1.unconfig_ip4()
1945         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1946         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1947         self.pg0.set_table_ip4(vrf_id1)
1948         self.pg1.set_table_ip4(vrf_id2)
1949         self.pg0.config_ip4()
1950         self.pg1.config_ip4()
1951
1952         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1953         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1954         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1955         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1956         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1957                                                   is_inside=0)
1958
1959         # first VRF
1960         pkts = self.create_stream_in(self.pg0, self.pg2)
1961         self.pg0.add_stream(pkts)
1962         self.pg_enable_capture(self.pg_interfaces)
1963         self.pg_start()
1964         capture = self.pg2.get_capture(len(pkts))
1965         self.verify_capture_out(capture, nat_ip1)
1966
1967         # second VRF
1968         pkts = self.create_stream_in(self.pg1, self.pg2)
1969         self.pg1.add_stream(pkts)
1970         self.pg_enable_capture(self.pg_interfaces)
1971         self.pg_start()
1972         capture = self.pg2.get_capture(len(pkts))
1973         self.verify_capture_out(capture, nat_ip2)
1974
1975         self.pg0.unconfig_ip4()
1976         self.pg1.unconfig_ip4()
1977         self.pg0.set_table_ip4(0)
1978         self.pg1.set_table_ip4(0)
1979         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1980         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1981
1982     def test_vrf_feature_independent(self):
1983         """ NAT44 tenant VRF independent address pool mode """
1984
1985         nat_ip1 = "10.0.0.10"
1986         nat_ip2 = "10.0.0.11"
1987
1988         self.nat44_add_address(nat_ip1)
1989         self.nat44_add_address(nat_ip2)
1990         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1991         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1992         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1993                                                   is_inside=0)
1994
1995         # first VRF
1996         pkts = self.create_stream_in(self.pg0, self.pg2)
1997         self.pg0.add_stream(pkts)
1998         self.pg_enable_capture(self.pg_interfaces)
1999         self.pg_start()
2000         capture = self.pg2.get_capture(len(pkts))
2001         self.verify_capture_out(capture, nat_ip1)
2002
2003         # second VRF
2004         pkts = self.create_stream_in(self.pg1, self.pg2)
2005         self.pg1.add_stream(pkts)
2006         self.pg_enable_capture(self.pg_interfaces)
2007         self.pg_start()
2008         capture = self.pg2.get_capture(len(pkts))
2009         self.verify_capture_out(capture, nat_ip1)
2010
2011     def test_dynamic_ipless_interfaces(self):
2012         """ NAT44 interfaces without configured IP address """
2013
2014         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2015                                       mactobinary(self.pg7.remote_mac),
2016                                       self.pg7.remote_ip4n,
2017                                       is_static=1)
2018         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2019                                       mactobinary(self.pg8.remote_mac),
2020                                       self.pg8.remote_ip4n,
2021                                       is_static=1)
2022
2023         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2024                                    dst_address_length=32,
2025                                    next_hop_address=self.pg7.remote_ip4n,
2026                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2027         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2028                                    dst_address_length=32,
2029                                    next_hop_address=self.pg8.remote_ip4n,
2030                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2031
2032         self.nat44_add_address(self.nat_addr)
2033         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2034         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2035                                                   is_inside=0)
2036
2037         # in2out
2038         pkts = self.create_stream_in(self.pg7, self.pg8)
2039         self.pg7.add_stream(pkts)
2040         self.pg_enable_capture(self.pg_interfaces)
2041         self.pg_start()
2042         capture = self.pg8.get_capture(len(pkts))
2043         self.verify_capture_out(capture)
2044
2045         # out2in
2046         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2047         self.pg8.add_stream(pkts)
2048         self.pg_enable_capture(self.pg_interfaces)
2049         self.pg_start()
2050         capture = self.pg7.get_capture(len(pkts))
2051         self.verify_capture_in(capture, self.pg7)
2052
2053     def test_static_ipless_interfaces(self):
2054         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2055
2056         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2057                                       mactobinary(self.pg7.remote_mac),
2058                                       self.pg7.remote_ip4n,
2059                                       is_static=1)
2060         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2061                                       mactobinary(self.pg8.remote_mac),
2062                                       self.pg8.remote_ip4n,
2063                                       is_static=1)
2064
2065         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2066                                    dst_address_length=32,
2067                                    next_hop_address=self.pg7.remote_ip4n,
2068                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2069         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2070                                    dst_address_length=32,
2071                                    next_hop_address=self.pg8.remote_ip4n,
2072                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2073
2074         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2075         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2076         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2077                                                   is_inside=0)
2078
2079         # out2in
2080         pkts = self.create_stream_out(self.pg8)
2081         self.pg8.add_stream(pkts)
2082         self.pg_enable_capture(self.pg_interfaces)
2083         self.pg_start()
2084         capture = self.pg7.get_capture(len(pkts))
2085         self.verify_capture_in(capture, self.pg7)
2086
2087         # in2out
2088         pkts = self.create_stream_in(self.pg7, self.pg8)
2089         self.pg7.add_stream(pkts)
2090         self.pg_enable_capture(self.pg_interfaces)
2091         self.pg_start()
2092         capture = self.pg8.get_capture(len(pkts))
2093         self.verify_capture_out(capture, self.nat_addr, True)
2094
2095     def test_static_with_port_ipless_interfaces(self):
2096         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2097
2098         self.tcp_port_out = 30606
2099         self.udp_port_out = 30607
2100         self.icmp_id_out = 30608
2101
2102         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2103                                       mactobinary(self.pg7.remote_mac),
2104                                       self.pg7.remote_ip4n,
2105                                       is_static=1)
2106         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2107                                       mactobinary(self.pg8.remote_mac),
2108                                       self.pg8.remote_ip4n,
2109                                       is_static=1)
2110
2111         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2112                                    dst_address_length=32,
2113                                    next_hop_address=self.pg7.remote_ip4n,
2114                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2115         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2116                                    dst_address_length=32,
2117                                    next_hop_address=self.pg8.remote_ip4n,
2118                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2119
2120         self.nat44_add_address(self.nat_addr)
2121         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2122                                       self.tcp_port_in, self.tcp_port_out,
2123                                       proto=IP_PROTOS.tcp)
2124         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2125                                       self.udp_port_in, self.udp_port_out,
2126                                       proto=IP_PROTOS.udp)
2127         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2128                                       self.icmp_id_in, self.icmp_id_out,
2129                                       proto=IP_PROTOS.icmp)
2130         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2131         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2132                                                   is_inside=0)
2133
2134         # out2in
2135         pkts = self.create_stream_out(self.pg8)
2136         self.pg8.add_stream(pkts)
2137         self.pg_enable_capture(self.pg_interfaces)
2138         self.pg_start()
2139         capture = self.pg7.get_capture(len(pkts))
2140         self.verify_capture_in(capture, self.pg7)
2141
2142         # in2out
2143         pkts = self.create_stream_in(self.pg7, self.pg8)
2144         self.pg7.add_stream(pkts)
2145         self.pg_enable_capture(self.pg_interfaces)
2146         self.pg_start()
2147         capture = self.pg8.get_capture(len(pkts))
2148         self.verify_capture_out(capture)
2149
2150     def test_static_unknown_proto(self):
2151         """ 1:1 NAT translate packet with unknown protocol """
2152         nat_ip = "10.0.0.10"
2153         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2154         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2155         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2156                                                   is_inside=0)
2157
2158         # in2out
2159         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2160              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2161              GRE() /
2162              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2163              TCP(sport=1234, dport=1234))
2164         self.pg0.add_stream(p)
2165         self.pg_enable_capture(self.pg_interfaces)
2166         self.pg_start()
2167         p = self.pg1.get_capture(1)
2168         packet = p[0]
2169         try:
2170             self.assertEqual(packet[IP].src, nat_ip)
2171             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2172             self.assertTrue(packet.haslayer(GRE))
2173             self.check_ip_checksum(packet)
2174         except:
2175             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2176             raise
2177
2178         # out2in
2179         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2180              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2181              GRE() /
2182              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2183              TCP(sport=1234, dport=1234))
2184         self.pg1.add_stream(p)
2185         self.pg_enable_capture(self.pg_interfaces)
2186         self.pg_start()
2187         p = self.pg0.get_capture(1)
2188         packet = p[0]
2189         try:
2190             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2191             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2192             self.assertTrue(packet.haslayer(GRE))
2193             self.check_ip_checksum(packet)
2194         except:
2195             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2196             raise
2197
2198     def test_hairpinning_static_unknown_proto(self):
2199         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2200
2201         host = self.pg0.remote_hosts[0]
2202         server = self.pg0.remote_hosts[1]
2203
2204         host_nat_ip = "10.0.0.10"
2205         server_nat_ip = "10.0.0.11"
2206
2207         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2208         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2209         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2210         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2211                                                   is_inside=0)
2212
2213         # host to server
2214         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2215              IP(src=host.ip4, dst=server_nat_ip) /
2216              GRE() /
2217              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2218              TCP(sport=1234, dport=1234))
2219         self.pg0.add_stream(p)
2220         self.pg_enable_capture(self.pg_interfaces)
2221         self.pg_start()
2222         p = self.pg0.get_capture(1)
2223         packet = p[0]
2224         try:
2225             self.assertEqual(packet[IP].src, host_nat_ip)
2226             self.assertEqual(packet[IP].dst, server.ip4)
2227             self.assertTrue(packet.haslayer(GRE))
2228             self.check_ip_checksum(packet)
2229         except:
2230             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2231             raise
2232
2233         # server to host
2234         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2235              IP(src=server.ip4, dst=host_nat_ip) /
2236              GRE() /
2237              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2238              TCP(sport=1234, dport=1234))
2239         self.pg0.add_stream(p)
2240         self.pg_enable_capture(self.pg_interfaces)
2241         self.pg_start()
2242         p = self.pg0.get_capture(1)
2243         packet = p[0]
2244         try:
2245             self.assertEqual(packet[IP].src, server_nat_ip)
2246             self.assertEqual(packet[IP].dst, host.ip4)
2247             self.assertTrue(packet.haslayer(GRE))
2248             self.check_ip_checksum(packet)
2249         except:
2250             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2251             raise
2252
2253     def test_unknown_proto(self):
2254         """ NAT44 translate packet with unknown protocol """
2255         self.nat44_add_address(self.nat_addr)
2256         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2257         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2258                                                   is_inside=0)
2259
2260         # in2out
2261         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2263              TCP(sport=self.tcp_port_in, dport=20))
2264         self.pg0.add_stream(p)
2265         self.pg_enable_capture(self.pg_interfaces)
2266         self.pg_start()
2267         p = self.pg1.get_capture(1)
2268
2269         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2271              GRE() /
2272              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2273              TCP(sport=1234, dport=1234))
2274         self.pg0.add_stream(p)
2275         self.pg_enable_capture(self.pg_interfaces)
2276         self.pg_start()
2277         p = self.pg1.get_capture(1)
2278         packet = p[0]
2279         try:
2280             self.assertEqual(packet[IP].src, self.nat_addr)
2281             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2282             self.assertTrue(packet.haslayer(GRE))
2283             self.check_ip_checksum(packet)
2284         except:
2285             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2286             raise
2287
2288         # out2in
2289         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2290              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2291              GRE() /
2292              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2293              TCP(sport=1234, dport=1234))
2294         self.pg1.add_stream(p)
2295         self.pg_enable_capture(self.pg_interfaces)
2296         self.pg_start()
2297         p = self.pg0.get_capture(1)
2298         packet = p[0]
2299         try:
2300             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2301             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2302             self.assertTrue(packet.haslayer(GRE))
2303             self.check_ip_checksum(packet)
2304         except:
2305             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2306             raise
2307
2308     def test_hairpinning_unknown_proto(self):
2309         """ NAT44 translate packet with unknown protocol - hairpinning """
2310         host = self.pg0.remote_hosts[0]
2311         server = self.pg0.remote_hosts[1]
2312         host_in_port = 1234
2313         host_out_port = 0
2314         server_in_port = 5678
2315         server_out_port = 8765
2316         server_nat_ip = "10.0.0.11"
2317
2318         self.nat44_add_address(self.nat_addr)
2319         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2320         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2321                                                   is_inside=0)
2322
2323         # add static mapping for server
2324         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2325
2326         # host to server
2327         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2328              IP(src=host.ip4, dst=server_nat_ip) /
2329              TCP(sport=host_in_port, dport=server_out_port))
2330         self.pg0.add_stream(p)
2331         self.pg_enable_capture(self.pg_interfaces)
2332         self.pg_start()
2333         capture = self.pg0.get_capture(1)
2334
2335         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2336              IP(src=host.ip4, dst=server_nat_ip) /
2337              GRE() /
2338              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2339              TCP(sport=1234, dport=1234))
2340         self.pg0.add_stream(p)
2341         self.pg_enable_capture(self.pg_interfaces)
2342         self.pg_start()
2343         p = self.pg0.get_capture(1)
2344         packet = p[0]
2345         try:
2346             self.assertEqual(packet[IP].src, self.nat_addr)
2347             self.assertEqual(packet[IP].dst, server.ip4)
2348             self.assertTrue(packet.haslayer(GRE))
2349             self.check_ip_checksum(packet)
2350         except:
2351             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2352             raise
2353
2354         # server to host
2355         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2356              IP(src=server.ip4, dst=self.nat_addr) /
2357              GRE() /
2358              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2359              TCP(sport=1234, dport=1234))
2360         self.pg0.add_stream(p)
2361         self.pg_enable_capture(self.pg_interfaces)
2362         self.pg_start()
2363         p = self.pg0.get_capture(1)
2364         packet = p[0]
2365         try:
2366             self.assertEqual(packet[IP].src, server_nat_ip)
2367             self.assertEqual(packet[IP].dst, host.ip4)
2368             self.assertTrue(packet.haslayer(GRE))
2369             self.check_ip_checksum(packet)
2370         except:
2371             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2372             raise
2373
2374     def test_output_feature(self):
2375         """ NAT44 interface output feature (in2out postrouting) """
2376         self.nat44_add_address(self.nat_addr)
2377         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2378         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2379         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2380                                                          is_inside=0)
2381
2382         # in2out
2383         pkts = self.create_stream_in(self.pg0, self.pg3)
2384         self.pg0.add_stream(pkts)
2385         self.pg_enable_capture(self.pg_interfaces)
2386         self.pg_start()
2387         capture = self.pg3.get_capture(len(pkts))
2388         self.verify_capture_out(capture)
2389
2390         # out2in
2391         pkts = self.create_stream_out(self.pg3)
2392         self.pg3.add_stream(pkts)
2393         self.pg_enable_capture(self.pg_interfaces)
2394         self.pg_start()
2395         capture = self.pg0.get_capture(len(pkts))
2396         self.verify_capture_in(capture, self.pg0)
2397
2398         # from non-NAT interface to NAT inside interface
2399         pkts = self.create_stream_in(self.pg2, self.pg0)
2400         self.pg2.add_stream(pkts)
2401         self.pg_enable_capture(self.pg_interfaces)
2402         self.pg_start()
2403         capture = self.pg0.get_capture(len(pkts))
2404         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2405
2406     def test_output_feature_vrf_aware(self):
2407         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2408         nat_ip_vrf10 = "10.0.0.10"
2409         nat_ip_vrf20 = "10.0.0.20"
2410
2411         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2412                                    dst_address_length=32,
2413                                    next_hop_address=self.pg3.remote_ip4n,
2414                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2415                                    table_id=10)
2416         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2417                                    dst_address_length=32,
2418                                    next_hop_address=self.pg3.remote_ip4n,
2419                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2420                                    table_id=20)
2421
2422         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2423         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2424         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2425         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2426         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2427                                                          is_inside=0)
2428
2429         # in2out VRF 10
2430         pkts = self.create_stream_in(self.pg4, self.pg3)
2431         self.pg4.add_stream(pkts)
2432         self.pg_enable_capture(self.pg_interfaces)
2433         self.pg_start()
2434         capture = self.pg3.get_capture(len(pkts))
2435         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2436
2437         # out2in VRF 10
2438         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2439         self.pg3.add_stream(pkts)
2440         self.pg_enable_capture(self.pg_interfaces)
2441         self.pg_start()
2442         capture = self.pg4.get_capture(len(pkts))
2443         self.verify_capture_in(capture, self.pg4)
2444
2445         # in2out VRF 20
2446         pkts = self.create_stream_in(self.pg6, self.pg3)
2447         self.pg6.add_stream(pkts)
2448         self.pg_enable_capture(self.pg_interfaces)
2449         self.pg_start()
2450         capture = self.pg3.get_capture(len(pkts))
2451         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2452
2453         # out2in VRF 20
2454         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2455         self.pg3.add_stream(pkts)
2456         self.pg_enable_capture(self.pg_interfaces)
2457         self.pg_start()
2458         capture = self.pg6.get_capture(len(pkts))
2459         self.verify_capture_in(capture, self.pg6)
2460
2461     def test_output_feature_hairpinning(self):
2462         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2463         host = self.pg0.remote_hosts[0]
2464         server = self.pg0.remote_hosts[1]
2465         host_in_port = 1234
2466         host_out_port = 0
2467         server_in_port = 5678
2468         server_out_port = 8765
2469
2470         self.nat44_add_address(self.nat_addr)
2471         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2472         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2473                                                          is_inside=0)
2474
2475         # add static mapping for server
2476         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2477                                       server_in_port, server_out_port,
2478                                       proto=IP_PROTOS.tcp)
2479
2480         # send packet from host to server
2481         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2482              IP(src=host.ip4, dst=self.nat_addr) /
2483              TCP(sport=host_in_port, dport=server_out_port))
2484         self.pg0.add_stream(p)
2485         self.pg_enable_capture(self.pg_interfaces)
2486         self.pg_start()
2487         capture = self.pg0.get_capture(1)
2488         p = capture[0]
2489         try:
2490             ip = p[IP]
2491             tcp = p[TCP]
2492             self.assertEqual(ip.src, self.nat_addr)
2493             self.assertEqual(ip.dst, server.ip4)
2494             self.assertNotEqual(tcp.sport, host_in_port)
2495             self.assertEqual(tcp.dport, server_in_port)
2496             self.check_tcp_checksum(p)
2497             host_out_port = tcp.sport
2498         except:
2499             self.logger.error(ppp("Unexpected or invalid packet:", p))
2500             raise
2501
2502         # send reply from server to host
2503         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2504              IP(src=server.ip4, dst=self.nat_addr) /
2505              TCP(sport=server_in_port, dport=host_out_port))
2506         self.pg0.add_stream(p)
2507         self.pg_enable_capture(self.pg_interfaces)
2508         self.pg_start()
2509         capture = self.pg0.get_capture(1)
2510         p = capture[0]
2511         try:
2512             ip = p[IP]
2513             tcp = p[TCP]
2514             self.assertEqual(ip.src, self.nat_addr)
2515             self.assertEqual(ip.dst, host.ip4)
2516             self.assertEqual(tcp.sport, server_out_port)
2517             self.assertEqual(tcp.dport, host_in_port)
2518             self.check_tcp_checksum(p)
2519         except:
2520             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2521             raise
2522
2523     def test_one_armed_nat44(self):
2524         """ One armed NAT44 """
2525         remote_host = self.pg9.remote_hosts[0]
2526         local_host = self.pg9.remote_hosts[1]
2527         external_port = 0
2528
2529         self.nat44_add_address(self.nat_addr)
2530         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2531         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2532                                                   is_inside=0)
2533
2534         # in2out
2535         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2536              IP(src=local_host.ip4, dst=remote_host.ip4) /
2537              TCP(sport=12345, dport=80))
2538         self.pg9.add_stream(p)
2539         self.pg_enable_capture(self.pg_interfaces)
2540         self.pg_start()
2541         capture = self.pg9.get_capture(1)
2542         p = capture[0]
2543         try:
2544             ip = p[IP]
2545             tcp = p[TCP]
2546             self.assertEqual(ip.src, self.nat_addr)
2547             self.assertEqual(ip.dst, remote_host.ip4)
2548             self.assertNotEqual(tcp.sport, 12345)
2549             external_port = tcp.sport
2550             self.assertEqual(tcp.dport, 80)
2551             self.check_tcp_checksum(p)
2552             self.check_ip_checksum(p)
2553         except:
2554             self.logger.error(ppp("Unexpected or invalid packet:", p))
2555             raise
2556
2557         # out2in
2558         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2559              IP(src=remote_host.ip4, dst=self.nat_addr) /
2560              TCP(sport=80, dport=external_port))
2561         self.pg9.add_stream(p)
2562         self.pg_enable_capture(self.pg_interfaces)
2563         self.pg_start()
2564         capture = self.pg9.get_capture(1)
2565         p = capture[0]
2566         try:
2567             ip = p[IP]
2568             tcp = p[TCP]
2569             self.assertEqual(ip.src, remote_host.ip4)
2570             self.assertEqual(ip.dst, local_host.ip4)
2571             self.assertEqual(tcp.sport, 80)
2572             self.assertEqual(tcp.dport, 12345)
2573             self.check_tcp_checksum(p)
2574             self.check_ip_checksum(p)
2575         except:
2576             self.logger.error(ppp("Unexpected or invalid packet:", p))
2577             raise
2578
2579     def test_del_session(self):
2580         """ Delete NAT44 session """
2581         self.nat44_add_address(self.nat_addr)
2582         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2583         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2584                                                   is_inside=0)
2585
2586         pkts = self.create_stream_in(self.pg0, self.pg1)
2587         self.pg0.add_stream(pkts)
2588         self.pg_enable_capture(self.pg_interfaces)
2589         self.pg_start()
2590         capture = self.pg1.get_capture(len(pkts))
2591
2592         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2593         nsessions = len(sessions)
2594
2595         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2596                                     sessions[0].inside_port,
2597                                     sessions[0].protocol)
2598         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2599                                     sessions[1].outside_port,
2600                                     sessions[1].protocol,
2601                                     is_in=0)
2602
2603         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2604         self.assertEqual(nsessions - len(sessions), 2)
2605
2606     def test_set_get_reass(self):
2607         """ NAT44 set/get virtual fragmentation reassembly """
2608         reas_cfg1 = self.vapi.nat_get_reass()
2609
2610         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2611                                 max_reass=reas_cfg1.ip4_max_reass * 2,
2612                                 max_frag=reas_cfg1.ip4_max_frag * 2)
2613
2614         reas_cfg2 = self.vapi.nat_get_reass()
2615
2616         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2617         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2618         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2619
2620         self.vapi.nat_set_reass(drop_frag=1)
2621         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2622
2623     def test_frag_in_order(self):
2624         """ NAT44 translate fragments arriving in order """
2625         self.nat44_add_address(self.nat_addr)
2626         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2627         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2628                                                   is_inside=0)
2629
2630         data = "A" * 4 + "B" * 16 + "C" * 3
2631         self.tcp_port_in = random.randint(1025, 65535)
2632
2633         reass = self.vapi.nat_reass_dump()
2634         reass_n_start = len(reass)
2635
2636         # in2out
2637         pkts = self.create_stream_frag(self.pg0,
2638                                        self.pg1.remote_ip4,
2639                                        self.tcp_port_in,
2640                                        20,
2641                                        data)
2642         self.pg0.add_stream(pkts)
2643         self.pg_enable_capture(self.pg_interfaces)
2644         self.pg_start()
2645         frags = self.pg1.get_capture(len(pkts))
2646         p = self.reass_frags_and_verify(frags,
2647                                         self.nat_addr,
2648                                         self.pg1.remote_ip4)
2649         self.assertEqual(p[TCP].dport, 20)
2650         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2651         self.tcp_port_out = p[TCP].sport
2652         self.assertEqual(data, p[Raw].load)
2653
2654         # out2in
2655         pkts = self.create_stream_frag(self.pg1,
2656                                        self.nat_addr,
2657                                        20,
2658                                        self.tcp_port_out,
2659                                        data)
2660         self.pg1.add_stream(pkts)
2661         self.pg_enable_capture(self.pg_interfaces)
2662         self.pg_start()
2663         frags = self.pg0.get_capture(len(pkts))
2664         p = self.reass_frags_and_verify(frags,
2665                                         self.pg1.remote_ip4,
2666                                         self.pg0.remote_ip4)
2667         self.assertEqual(p[TCP].sport, 20)
2668         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2669         self.assertEqual(data, p[Raw].load)
2670
2671         reass = self.vapi.nat_reass_dump()
2672         reass_n_end = len(reass)
2673
2674         self.assertEqual(reass_n_end - reass_n_start, 2)
2675
2676     def test_reass_hairpinning(self):
2677         """ NAT44 fragments hairpinning """
2678         host = self.pg0.remote_hosts[0]
2679         server = self.pg0.remote_hosts[1]
2680         host_in_port = random.randint(1025, 65535)
2681         host_out_port = 0
2682         server_in_port = random.randint(1025, 65535)
2683         server_out_port = random.randint(1025, 65535)
2684         data = "A" * 4 + "B" * 16 + "C" * 3
2685
2686         self.nat44_add_address(self.nat_addr)
2687         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2688         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2689                                                   is_inside=0)
2690         # add static mapping for server
2691         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2692                                       server_in_port, server_out_port,
2693                                       proto=IP_PROTOS.tcp)
2694
2695         # send packet from host to server
2696         pkts = self.create_stream_frag(self.pg0,
2697                                        self.nat_addr,
2698                                        host_in_port,
2699                                        server_out_port,
2700                                        data)
2701         self.pg0.add_stream(pkts)
2702         self.pg_enable_capture(self.pg_interfaces)
2703         self.pg_start()
2704         frags = self.pg0.get_capture(len(pkts))
2705         p = self.reass_frags_and_verify(frags,
2706                                         self.nat_addr,
2707                                         server.ip4)
2708         self.assertNotEqual(p[TCP].sport, host_in_port)
2709         self.assertEqual(p[TCP].dport, server_in_port)
2710         self.assertEqual(data, p[Raw].load)
2711
2712     def test_frag_out_of_order(self):
2713         """ NAT44 translate fragments arriving out of order """
2714         self.nat44_add_address(self.nat_addr)
2715         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2716         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2717                                                   is_inside=0)
2718
2719         data = "A" * 4 + "B" * 16 + "C" * 3
2720         random.randint(1025, 65535)
2721
2722         # in2out
2723         pkts = self.create_stream_frag(self.pg0,
2724                                        self.pg1.remote_ip4,
2725                                        self.tcp_port_in,
2726                                        20,
2727                                        data)
2728         pkts.reverse()
2729         self.pg0.add_stream(pkts)
2730         self.pg_enable_capture(self.pg_interfaces)
2731         self.pg_start()
2732         frags = self.pg1.get_capture(len(pkts))
2733         p = self.reass_frags_and_verify(frags,
2734                                         self.nat_addr,
2735                                         self.pg1.remote_ip4)
2736         self.assertEqual(p[TCP].dport, 20)
2737         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2738         self.tcp_port_out = p[TCP].sport
2739         self.assertEqual(data, p[Raw].load)
2740
2741         # out2in
2742         pkts = self.create_stream_frag(self.pg1,
2743                                        self.nat_addr,
2744                                        20,
2745                                        self.tcp_port_out,
2746                                        data)
2747         pkts.reverse()
2748         self.pg1.add_stream(pkts)
2749         self.pg_enable_capture(self.pg_interfaces)
2750         self.pg_start()
2751         frags = self.pg0.get_capture(len(pkts))
2752         p = self.reass_frags_and_verify(frags,
2753                                         self.pg1.remote_ip4,
2754                                         self.pg0.remote_ip4)
2755         self.assertEqual(p[TCP].sport, 20)
2756         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2757         self.assertEqual(data, p[Raw].load)
2758
2759     def tearDown(self):
2760         super(TestNAT44, self).tearDown()
2761         if not self.vpp_dead:
2762             self.logger.info(self.vapi.cli("show nat44 verbose"))
2763             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
2764             self.clear_nat44()
2765
2766
2767 class TestDeterministicNAT(MethodHolder):
2768     """ Deterministic NAT Test Cases """
2769
2770     @classmethod
2771     def setUpConstants(cls):
2772         super(TestDeterministicNAT, cls).setUpConstants()
2773         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2774
2775     @classmethod
2776     def setUpClass(cls):
2777         super(TestDeterministicNAT, cls).setUpClass()
2778
2779         try:
2780             cls.tcp_port_in = 6303
2781             cls.tcp_external_port = 6303
2782             cls.udp_port_in = 6304
2783             cls.udp_external_port = 6304
2784             cls.icmp_id_in = 6305
2785             cls.nat_addr = '10.0.0.3'
2786
2787             cls.create_pg_interfaces(range(3))
2788             cls.interfaces = list(cls.pg_interfaces)
2789
2790             for i in cls.interfaces:
2791                 i.admin_up()
2792                 i.config_ip4()
2793                 i.resolve_arp()
2794
2795             cls.pg0.generate_remote_hosts(2)
2796             cls.pg0.configure_ipv4_neighbors()
2797
2798         except Exception:
2799             super(TestDeterministicNAT, cls).tearDownClass()
2800             raise
2801
2802     def create_stream_in(self, in_if, out_if, ttl=64):
2803         """
2804         Create packet stream for inside network
2805
2806         :param in_if: Inside interface
2807         :param out_if: Outside interface
2808         :param ttl: TTL of generated packets
2809         """
2810         pkts = []
2811         # TCP
2812         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2813              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2814              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2815         pkts.append(p)
2816
2817         # UDP
2818         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2819              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2820              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2821         pkts.append(p)
2822
2823         # ICMP
2824         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2825              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2826              ICMP(id=self.icmp_id_in, type='echo-request'))
2827         pkts.append(p)
2828
2829         return pkts
2830
2831     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2832         """
2833         Create packet stream for outside network
2834
2835         :param out_if: Outside interface
2836         :param dst_ip: Destination IP address (Default use global NAT address)
2837         :param ttl: TTL of generated packets
2838         """
2839         if dst_ip is None:
2840             dst_ip = self.nat_addr
2841         pkts = []
2842         # TCP
2843         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2844              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2845              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2846         pkts.append(p)
2847
2848         # UDP
2849         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2850              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2851              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2852         pkts.append(p)
2853
2854         # ICMP
2855         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2856              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2857              ICMP(id=self.icmp_external_id, type='echo-reply'))
2858         pkts.append(p)
2859
2860         return pkts
2861
2862     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2863         """
2864         Verify captured packets on outside network
2865
2866         :param capture: Captured packets
2867         :param nat_ip: Translated IP address (Default use global NAT address)
2868         :param same_port: Sorce port number is not translated (Default False)
2869         :param packet_num: Expected number of packets (Default 3)
2870         """
2871         if nat_ip is None:
2872             nat_ip = self.nat_addr
2873         self.assertEqual(packet_num, len(capture))
2874         for packet in capture:
2875             try:
2876                 self.assertEqual(packet[IP].src, nat_ip)
2877                 if packet.haslayer(TCP):
2878                     self.tcp_port_out = packet[TCP].sport
2879                 elif packet.haslayer(UDP):
2880                     self.udp_port_out = packet[UDP].sport
2881                 else:
2882                     self.icmp_external_id = packet[ICMP].id
2883             except:
2884                 self.logger.error(ppp("Unexpected or invalid packet "
2885                                       "(outside network):", packet))
2886                 raise
2887
2888     def initiate_tcp_session(self, in_if, out_if):
2889         """
2890         Initiates TCP session
2891
2892         :param in_if: Inside interface
2893         :param out_if: Outside interface
2894         """
2895         try:
2896             # SYN packet in->out
2897             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2898                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2899                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2900                      flags="S"))
2901             in_if.add_stream(p)
2902             self.pg_enable_capture(self.pg_interfaces)
2903             self.pg_start()
2904             capture = out_if.get_capture(1)
2905             p = capture[0]
2906             self.tcp_port_out = p[TCP].sport
2907
2908             # SYN + ACK packet out->in
2909             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2910                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2911                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2912                      flags="SA"))
2913             out_if.add_stream(p)
2914             self.pg_enable_capture(self.pg_interfaces)
2915             self.pg_start()
2916             in_if.get_capture(1)
2917
2918             # ACK packet in->out
2919             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2920                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2921                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2922                      flags="A"))
2923             in_if.add_stream(p)
2924             self.pg_enable_capture(self.pg_interfaces)
2925             self.pg_start()
2926             out_if.get_capture(1)
2927
2928         except:
2929             self.logger.error("TCP 3 way handshake failed")
2930             raise
2931
2932     def verify_ipfix_max_entries_per_user(self, data):
2933         """
2934         Verify IPFIX maximum entries per user exceeded event
2935
2936         :param data: Decoded IPFIX data records
2937         """
2938         self.assertEqual(1, len(data))
2939         record = data[0]
2940         # natEvent
2941         self.assertEqual(ord(record[230]), 13)
2942         # natQuotaExceededEvent
2943         self.assertEqual('\x03\x00\x00\x00', record[466])
2944         # sourceIPv4Address
2945         self.assertEqual(self.pg0.remote_ip4n, record[8])
2946
2947     def test_deterministic_mode(self):
2948         """ NAT plugin run deterministic mode """
2949         in_addr = '172.16.255.0'
2950         out_addr = '172.17.255.50'
2951         in_addr_t = '172.16.255.20'
2952         in_addr_n = socket.inet_aton(in_addr)
2953         out_addr_n = socket.inet_aton(out_addr)
2954         in_addr_t_n = socket.inet_aton(in_addr_t)
2955         in_plen = 24
2956         out_plen = 32
2957
2958         nat_config = self.vapi.nat_show_config()
2959         self.assertEqual(1, nat_config.deterministic)
2960
2961         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2962
2963         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2964         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2965         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2966         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2967
2968         deterministic_mappings = self.vapi.nat_det_map_dump()
2969         self.assertEqual(len(deterministic_mappings), 1)
2970         dsm = deterministic_mappings[0]
2971         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2972         self.assertEqual(in_plen, dsm.in_plen)
2973         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2974         self.assertEqual(out_plen, dsm.out_plen)
2975
2976         self.clear_nat_det()
2977         deterministic_mappings = self.vapi.nat_det_map_dump()
2978         self.assertEqual(len(deterministic_mappings), 0)
2979
2980     def test_set_timeouts(self):
2981         """ Set deterministic NAT timeouts """
2982         timeouts_before = self.vapi.nat_det_get_timeouts()
2983
2984         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2985                                        timeouts_before.tcp_established + 10,
2986                                        timeouts_before.tcp_transitory + 10,
2987                                        timeouts_before.icmp + 10)
2988
2989         timeouts_after = self.vapi.nat_det_get_timeouts()
2990
2991         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2992         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2993         self.assertNotEqual(timeouts_before.tcp_established,
2994                             timeouts_after.tcp_established)
2995         self.assertNotEqual(timeouts_before.tcp_transitory,
2996                             timeouts_after.tcp_transitory)
2997
2998     def test_det_in(self):
2999         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3000
3001         nat_ip = "10.0.0.10"
3002
3003         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3004                                       32,
3005                                       socket.inet_aton(nat_ip),
3006                                       32)
3007         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3008         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3009                                                   is_inside=0)
3010
3011         # in2out
3012         pkts = self.create_stream_in(self.pg0, self.pg1)
3013         self.pg0.add_stream(pkts)
3014         self.pg_enable_capture(self.pg_interfaces)
3015         self.pg_start()
3016         capture = self.pg1.get_capture(len(pkts))
3017         self.verify_capture_out(capture, nat_ip)
3018
3019         # out2in
3020         pkts = self.create_stream_out(self.pg1, nat_ip)
3021         self.pg1.add_stream(pkts)
3022         self.pg_enable_capture(self.pg_interfaces)
3023         self.pg_start()
3024         capture = self.pg0.get_capture(len(pkts))
3025         self.verify_capture_in(capture, self.pg0)
3026
3027         # session dump test
3028         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3029         self.assertEqual(len(sessions), 3)
3030
3031         # TCP session
3032         s = sessions[0]
3033         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3034         self.assertEqual(s.in_port, self.tcp_port_in)
3035         self.assertEqual(s.out_port, self.tcp_port_out)
3036         self.assertEqual(s.ext_port, self.tcp_external_port)
3037
3038         # UDP session
3039         s = sessions[1]
3040         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3041         self.assertEqual(s.in_port, self.udp_port_in)
3042         self.assertEqual(s.out_port, self.udp_port_out)
3043         self.assertEqual(s.ext_port, self.udp_external_port)
3044
3045         # ICMP session
3046         s = sessions[2]
3047         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3048         self.assertEqual(s.in_port, self.icmp_id_in)
3049         self.assertEqual(s.out_port, self.icmp_external_id)
3050
3051     def test_multiple_users(self):
3052         """ Deterministic NAT multiple users """
3053
3054         nat_ip = "10.0.0.10"
3055         port_in = 80
3056         external_port = 6303
3057
3058         host0 = self.pg0.remote_hosts[0]
3059         host1 = self.pg0.remote_hosts[1]
3060
3061         self.vapi.nat_det_add_del_map(host0.ip4n,
3062                                       24,
3063                                       socket.inet_aton(nat_ip),
3064                                       32)
3065         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3066         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3067                                                   is_inside=0)
3068
3069         # host0 to out
3070         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3071              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3072              TCP(sport=port_in, dport=external_port))
3073         self.pg0.add_stream(p)
3074         self.pg_enable_capture(self.pg_interfaces)
3075         self.pg_start()
3076         capture = self.pg1.get_capture(1)
3077         p = capture[0]
3078         try:
3079             ip = p[IP]
3080             tcp = p[TCP]
3081             self.assertEqual(ip.src, nat_ip)
3082             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3083             self.assertEqual(tcp.dport, external_port)
3084             port_out0 = tcp.sport
3085         except:
3086             self.logger.error(ppp("Unexpected or invalid packet:", p))
3087             raise
3088
3089         # host1 to out
3090         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3091              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3092              TCP(sport=port_in, dport=external_port))
3093         self.pg0.add_stream(p)
3094         self.pg_enable_capture(self.pg_interfaces)
3095         self.pg_start()
3096         capture = self.pg1.get_capture(1)
3097         p = capture[0]
3098         try:
3099             ip = p[IP]
3100             tcp = p[TCP]
3101             self.assertEqual(ip.src, nat_ip)
3102             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3103             self.assertEqual(tcp.dport, external_port)
3104             port_out1 = tcp.sport
3105         except:
3106             self.logger.error(ppp("Unexpected or invalid packet:", p))
3107             raise
3108
3109         dms = self.vapi.nat_det_map_dump()
3110         self.assertEqual(1, len(dms))
3111         self.assertEqual(2, dms[0].ses_num)
3112
3113         # out to host0
3114         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3115              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3116              TCP(sport=external_port, dport=port_out0))
3117         self.pg1.add_stream(p)
3118         self.pg_enable_capture(self.pg_interfaces)
3119         self.pg_start()
3120         capture = self.pg0.get_capture(1)
3121         p = capture[0]
3122         try:
3123             ip = p[IP]
3124             tcp = p[TCP]
3125             self.assertEqual(ip.src, self.pg1.remote_ip4)
3126             self.assertEqual(ip.dst, host0.ip4)
3127             self.assertEqual(tcp.dport, port_in)
3128             self.assertEqual(tcp.sport, external_port)
3129         except:
3130             self.logger.error(ppp("Unexpected or invalid packet:", p))
3131             raise
3132
3133         # out to host1
3134         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3135              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3136              TCP(sport=external_port, dport=port_out1))
3137         self.pg1.add_stream(p)
3138         self.pg_enable_capture(self.pg_interfaces)
3139         self.pg_start()
3140         capture = self.pg0.get_capture(1)
3141         p = capture[0]
3142         try:
3143             ip = p[IP]
3144             tcp = p[TCP]
3145             self.assertEqual(ip.src, self.pg1.remote_ip4)
3146             self.assertEqual(ip.dst, host1.ip4)
3147             self.assertEqual(tcp.dport, port_in)
3148             self.assertEqual(tcp.sport, external_port)
3149         except:
3150             self.logger.error(ppp("Unexpected or invalid packet", p))
3151             raise
3152
3153         # session close api test
3154         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3155                                             port_out1,
3156                                             self.pg1.remote_ip4n,
3157                                             external_port)
3158         dms = self.vapi.nat_det_map_dump()
3159         self.assertEqual(dms[0].ses_num, 1)
3160
3161         self.vapi.nat_det_close_session_in(host0.ip4n,
3162                                            port_in,
3163                                            self.pg1.remote_ip4n,
3164                                            external_port)
3165         dms = self.vapi.nat_det_map_dump()
3166         self.assertEqual(dms[0].ses_num, 0)
3167
3168     def test_tcp_session_close_detection_in(self):
3169         """ Deterministic NAT TCP session close from inside network """
3170         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3171                                       32,
3172                                       socket.inet_aton(self.nat_addr),
3173                                       32)
3174         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3175         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3176                                                   is_inside=0)
3177
3178         self.initiate_tcp_session(self.pg0, self.pg1)
3179
3180         # close the session from inside
3181         try:
3182             # FIN packet in -> out
3183             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3184                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3185                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3186                      flags="F"))
3187             self.pg0.add_stream(p)
3188             self.pg_enable_capture(self.pg_interfaces)
3189             self.pg_start()
3190             self.pg1.get_capture(1)
3191
3192             pkts = []
3193
3194             # ACK packet out -> in
3195             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3196                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3197                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3198                      flags="A"))
3199             pkts.append(p)
3200
3201             # FIN packet out -> in
3202             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3203                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3204                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3205                      flags="F"))
3206             pkts.append(p)
3207
3208             self.pg1.add_stream(pkts)
3209             self.pg_enable_capture(self.pg_interfaces)
3210             self.pg_start()
3211             self.pg0.get_capture(2)
3212
3213             # ACK packet in -> out
3214             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3215                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3216                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3217                      flags="A"))
3218             self.pg0.add_stream(p)
3219             self.pg_enable_capture(self.pg_interfaces)
3220             self.pg_start()
3221             self.pg1.get_capture(1)
3222
3223             # Check if deterministic NAT44 closed the session
3224             dms = self.vapi.nat_det_map_dump()
3225             self.assertEqual(0, dms[0].ses_num)
3226         except:
3227             self.logger.error("TCP session termination failed")
3228             raise
3229
3230     def test_tcp_session_close_detection_out(self):
3231         """ Deterministic NAT TCP session close from outside network """
3232         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3233                                       32,
3234                                       socket.inet_aton(self.nat_addr),
3235                                       32)
3236         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3237         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3238                                                   is_inside=0)
3239
3240         self.initiate_tcp_session(self.pg0, self.pg1)
3241
3242         # close the session from outside
3243         try:
3244             # FIN packet out -> in
3245             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3246                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3247                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3248                      flags="F"))
3249             self.pg1.add_stream(p)
3250             self.pg_enable_capture(self.pg_interfaces)
3251             self.pg_start()
3252             self.pg0.get_capture(1)
3253
3254             pkts = []
3255
3256             # ACK packet in -> out
3257             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3258                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3259                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3260                      flags="A"))
3261             pkts.append(p)
3262
3263             # ACK packet in -> out
3264             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3265                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3266                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3267                      flags="F"))
3268             pkts.append(p)
3269
3270             self.pg0.add_stream(pkts)
3271             self.pg_enable_capture(self.pg_interfaces)
3272             self.pg_start()
3273             self.pg1.get_capture(2)
3274
3275             # ACK packet out -> in
3276             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3277                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3278                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3279                      flags="A"))
3280             self.pg1.add_stream(p)
3281             self.pg_enable_capture(self.pg_interfaces)
3282             self.pg_start()
3283             self.pg0.get_capture(1)
3284
3285             # Check if deterministic NAT44 closed the session
3286             dms = self.vapi.nat_det_map_dump()
3287             self.assertEqual(0, dms[0].ses_num)
3288         except:
3289             self.logger.error("TCP session termination failed")
3290             raise
3291
3292     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3293     def test_session_timeout(self):
3294         """ Deterministic NAT session timeouts """
3295         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3296                                       32,
3297                                       socket.inet_aton(self.nat_addr),
3298                                       32)
3299         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3300         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3301                                                   is_inside=0)
3302
3303         self.initiate_tcp_session(self.pg0, self.pg1)
3304         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3305         pkts = self.create_stream_in(self.pg0, self.pg1)
3306         self.pg0.add_stream(pkts)
3307         self.pg_enable_capture(self.pg_interfaces)
3308         self.pg_start()
3309         capture = self.pg1.get_capture(len(pkts))
3310         sleep(15)
3311
3312         dms = self.vapi.nat_det_map_dump()
3313         self.assertEqual(0, dms[0].ses_num)
3314
3315     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3316     def test_session_limit_per_user(self):
3317         """ Deterministic NAT maximum sessions per user limit """
3318         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3319                                       32,
3320                                       socket.inet_aton(self.nat_addr),
3321                                       32)
3322         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3323         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3324                                                   is_inside=0)
3325         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3326                                      src_address=self.pg2.local_ip4n,
3327                                      path_mtu=512,
3328                                      template_interval=10)
3329         self.vapi.nat_ipfix()
3330
3331         pkts = []
3332         for port in range(1025, 2025):
3333             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3334                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3335                  UDP(sport=port, dport=port))
3336             pkts.append(p)
3337
3338         self.pg0.add_stream(pkts)
3339         self.pg_enable_capture(self.pg_interfaces)
3340         self.pg_start()
3341         capture = self.pg1.get_capture(len(pkts))
3342
3343         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3344              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3345              UDP(sport=3001, dport=3002))
3346         self.pg0.add_stream(p)
3347         self.pg_enable_capture(self.pg_interfaces)
3348         self.pg_start()
3349         capture = self.pg1.assert_nothing_captured()
3350
3351         # verify ICMP error packet
3352         capture = self.pg0.get_capture(1)
3353         p = capture[0]
3354         self.assertTrue(p.haslayer(ICMP))
3355         icmp = p[ICMP]
3356         self.assertEqual(icmp.type, 3)
3357         self.assertEqual(icmp.code, 1)
3358         self.assertTrue(icmp.haslayer(IPerror))
3359         inner_ip = icmp[IPerror]
3360         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3361         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3362
3363         dms = self.vapi.nat_det_map_dump()
3364
3365         self.assertEqual(1000, dms[0].ses_num)
3366
3367         # verify IPFIX logging
3368         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3369         sleep(1)
3370         capture = self.pg2.get_capture(2)
3371         ipfix = IPFIXDecoder()
3372         # first load template
3373         for p in capture:
3374             self.assertTrue(p.haslayer(IPFIX))
3375             if p.haslayer(Template):
3376                 ipfix.add_template(p.getlayer(Template))
3377         # verify events in data set
3378         for p in capture:
3379             if p.haslayer(Data):
3380                 data = ipfix.decode_data_set(p.getlayer(Set))
3381                 self.verify_ipfix_max_entries_per_user(data)
3382
3383     def clear_nat_det(self):
3384         """
3385         Clear deterministic NAT configuration.
3386         """
3387         self.vapi.nat_ipfix(enable=0)
3388         self.vapi.nat_det_set_timeouts()
3389         deterministic_mappings = self.vapi.nat_det_map_dump()
3390         for dsm in deterministic_mappings:
3391             self.vapi.nat_det_add_del_map(dsm.in_addr,
3392                                           dsm.in_plen,
3393                                           dsm.out_addr,
3394                                           dsm.out_plen,
3395                                           is_add=0)
3396
3397         interfaces = self.vapi.nat44_interface_dump()
3398         for intf in interfaces:
3399             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3400                                                       intf.is_inside,
3401                                                       is_add=0)
3402
3403     def tearDown(self):
3404         super(TestDeterministicNAT, self).tearDown()
3405         if not self.vpp_dead:
3406             self.logger.info(self.vapi.cli("show nat44 detail"))
3407             self.clear_nat_det()
3408
3409
3410 class TestNAT64(MethodHolder):
3411     """ NAT64 Test Cases """
3412
3413     @classmethod
3414     def setUpClass(cls):
3415         super(TestNAT64, cls).setUpClass()
3416
3417         try:
3418             cls.tcp_port_in = 6303
3419             cls.tcp_port_out = 6303
3420             cls.udp_port_in = 6304
3421             cls.udp_port_out = 6304
3422             cls.icmp_id_in = 6305
3423             cls.icmp_id_out = 6305
3424             cls.nat_addr = '10.0.0.3'
3425             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3426             cls.vrf1_id = 10
3427             cls.vrf1_nat_addr = '10.0.10.3'
3428             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3429                                                    cls.vrf1_nat_addr)
3430
3431             cls.create_pg_interfaces(range(4))
3432             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3433             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3434             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3435
3436             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3437
3438             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3439
3440             cls.pg0.generate_remote_hosts(2)
3441
3442             for i in cls.ip6_interfaces:
3443                 i.admin_up()
3444                 i.config_ip6()
3445                 i.configure_ipv6_neighbors()
3446
3447             for i in cls.ip4_interfaces:
3448                 i.admin_up()
3449                 i.config_ip4()
3450                 i.resolve_arp()
3451
3452             cls.pg3.admin_up()
3453             cls.pg3.config_ip4()
3454             cls.pg3.resolve_arp()
3455             cls.pg3.config_ip6()
3456             cls.pg3.configure_ipv6_neighbors()
3457
3458         except Exception:
3459             super(TestNAT64, cls).tearDownClass()
3460             raise
3461
3462     def test_pool(self):
3463         """ Add/delete address to NAT64 pool """
3464         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3465
3466         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3467
3468         addresses = self.vapi.nat64_pool_addr_dump()
3469         self.assertEqual(len(addresses), 1)
3470         self.assertEqual(addresses[0].address, nat_addr)
3471
3472         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3473
3474         addresses = self.vapi.nat64_pool_addr_dump()
3475         self.assertEqual(len(addresses), 0)
3476
3477     def test_interface(self):
3478         """ Enable/disable NAT64 feature on the interface """
3479         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3480         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3481
3482         interfaces = self.vapi.nat64_interface_dump()
3483         self.assertEqual(len(interfaces), 2)
3484         pg0_found = False
3485         pg1_found = False
3486         for intf in interfaces:
3487             if intf.sw_if_index == self.pg0.sw_if_index:
3488                 self.assertEqual(intf.is_inside, 1)
3489                 pg0_found = True
3490             elif intf.sw_if_index == self.pg1.sw_if_index:
3491                 self.assertEqual(intf.is_inside, 0)
3492                 pg1_found = True
3493         self.assertTrue(pg0_found)
3494         self.assertTrue(pg1_found)
3495
3496         features = self.vapi.cli("show interface features pg0")
3497         self.assertNotEqual(features.find('nat64-in2out'), -1)
3498         features = self.vapi.cli("show interface features pg1")
3499         self.assertNotEqual(features.find('nat64-out2in'), -1)
3500
3501         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3502         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3503
3504         interfaces = self.vapi.nat64_interface_dump()
3505         self.assertEqual(len(interfaces), 0)
3506
3507     def test_static_bib(self):
3508         """ Add/delete static BIB entry """
3509         in_addr = socket.inet_pton(socket.AF_INET6,
3510                                    '2001:db8:85a3::8a2e:370:7334')
3511         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3512         in_port = 1234
3513         out_port = 5678
3514         proto = IP_PROTOS.tcp
3515
3516         self.vapi.nat64_add_del_static_bib(in_addr,
3517                                            out_addr,
3518                                            in_port,
3519                                            out_port,
3520                                            proto)
3521         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3522         static_bib_num = 0
3523         for bibe in bib:
3524             if bibe.is_static:
3525                 static_bib_num += 1
3526                 self.assertEqual(bibe.i_addr, in_addr)
3527                 self.assertEqual(bibe.o_addr, out_addr)
3528                 self.assertEqual(bibe.i_port, in_port)
3529                 self.assertEqual(bibe.o_port, out_port)
3530         self.assertEqual(static_bib_num, 1)
3531
3532         self.vapi.nat64_add_del_static_bib(in_addr,
3533                                            out_addr,
3534                                            in_port,
3535                                            out_port,
3536                                            proto,
3537                                            is_add=0)
3538         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3539         static_bib_num = 0
3540         for bibe in bib:
3541             if bibe.is_static:
3542                 static_bib_num += 1
3543         self.assertEqual(static_bib_num, 0)
3544
3545     def test_set_timeouts(self):
3546         """ Set NAT64 timeouts """
3547         # verify default values
3548         timeouts = self.vapi.nat64_get_timeouts()
3549         self.assertEqual(timeouts.udp, 300)
3550         self.assertEqual(timeouts.icmp, 60)
3551         self.assertEqual(timeouts.tcp_trans, 240)
3552         self.assertEqual(timeouts.tcp_est, 7440)
3553         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3554
3555         # set and verify custom values
3556         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3557                                      tcp_est=7450, tcp_incoming_syn=10)
3558         timeouts = self.vapi.nat64_get_timeouts()
3559         self.assertEqual(timeouts.udp, 200)
3560         self.assertEqual(timeouts.icmp, 30)
3561         self.assertEqual(timeouts.tcp_trans, 250)
3562         self.assertEqual(timeouts.tcp_est, 7450)
3563         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3564
3565     def test_dynamic(self):
3566         """ NAT64 dynamic translation test """
3567         self.tcp_port_in = 6303
3568         self.udp_port_in = 6304
3569         self.icmp_id_in = 6305
3570
3571         ses_num_start = self.nat64_get_ses_num()
3572
3573         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3574                                                 self.nat_addr_n)
3575         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3576         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3577
3578         # in2out
3579         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3580         self.pg0.add_stream(pkts)
3581         self.pg_enable_capture(self.pg_interfaces)
3582         self.pg_start()
3583         capture = self.pg1.get_capture(len(pkts))
3584         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3585                                 dst_ip=self.pg1.remote_ip4)
3586
3587         # out2in
3588         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3589         self.pg1.add_stream(pkts)
3590         self.pg_enable_capture(self.pg_interfaces)
3591         self.pg_start()
3592         capture = self.pg0.get_capture(len(pkts))
3593         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3594         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3595
3596         # in2out
3597         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3598         self.pg0.add_stream(pkts)
3599         self.pg_enable_capture(self.pg_interfaces)
3600         self.pg_start()
3601         capture = self.pg1.get_capture(len(pkts))
3602         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3603                                 dst_ip=self.pg1.remote_ip4)
3604
3605         # out2in
3606         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3607         self.pg1.add_stream(pkts)
3608         self.pg_enable_capture(self.pg_interfaces)
3609         self.pg_start()
3610         capture = self.pg0.get_capture(len(pkts))
3611         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3612
3613         ses_num_end = self.nat64_get_ses_num()
3614
3615         self.assertEqual(ses_num_end - ses_num_start, 3)
3616
3617         # tenant with specific VRF
3618         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3619                                                 self.vrf1_nat_addr_n,
3620                                                 vrf_id=self.vrf1_id)
3621         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3622
3623         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3624         self.pg2.add_stream(pkts)
3625         self.pg_enable_capture(self.pg_interfaces)
3626         self.pg_start()
3627         capture = self.pg1.get_capture(len(pkts))
3628         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3629                                 dst_ip=self.pg1.remote_ip4)
3630
3631         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3632         self.pg1.add_stream(pkts)
3633         self.pg_enable_capture(self.pg_interfaces)
3634         self.pg_start()
3635         capture = self.pg2.get_capture(len(pkts))
3636         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3637
3638     def test_static(self):
3639         """ NAT64 static translation test """
3640         self.tcp_port_in = 60303
3641         self.udp_port_in = 60304
3642         self.icmp_id_in = 60305
3643         self.tcp_port_out = 60303
3644         self.udp_port_out = 60304
3645         self.icmp_id_out = 60305
3646
3647         ses_num_start = self.nat64_get_ses_num()
3648
3649         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3650                                                 self.nat_addr_n)
3651         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3652         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3653
3654         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3655                                            self.nat_addr_n,
3656                                            self.tcp_port_in,
3657                                            self.tcp_port_out,
3658                                            IP_PROTOS.tcp)
3659         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3660                                            self.nat_addr_n,
3661                                            self.udp_port_in,
3662                                            self.udp_port_out,
3663                                            IP_PROTOS.udp)
3664         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3665                                            self.nat_addr_n,
3666                                            self.icmp_id_in,
3667                                            self.icmp_id_out,
3668                                            IP_PROTOS.icmp)
3669
3670         # in2out
3671         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3672         self.pg0.add_stream(pkts)
3673         self.pg_enable_capture(self.pg_interfaces)
3674         self.pg_start()
3675         capture = self.pg1.get_capture(len(pkts))
3676         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3677                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3678
3679         # out2in
3680         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3681         self.pg1.add_stream(pkts)
3682         self.pg_enable_capture(self.pg_interfaces)
3683         self.pg_start()
3684         capture = self.pg0.get_capture(len(pkts))
3685         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3686         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3687
3688         ses_num_end = self.nat64_get_ses_num()
3689
3690         self.assertEqual(ses_num_end - ses_num_start, 3)
3691
3692     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3693     def test_session_timeout(self):
3694         """ NAT64 session timeout """
3695         self.icmp_id_in = 1234
3696         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3697                                                 self.nat_addr_n)
3698         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3699         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3700         self.vapi.nat64_set_timeouts(icmp=5)
3701
3702         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3703         self.pg0.add_stream(pkts)
3704         self.pg_enable_capture(self.pg_interfaces)
3705         self.pg_start()
3706         capture = self.pg1.get_capture(len(pkts))
3707
3708         ses_num_before_timeout = self.nat64_get_ses_num()
3709
3710         sleep(15)
3711
3712         # ICMP session after timeout
3713         ses_num_after_timeout = self.nat64_get_ses_num()
3714         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3715
3716     def test_icmp_error(self):
3717         """ NAT64 ICMP Error message translation """
3718         self.tcp_port_in = 6303
3719         self.udp_port_in = 6304
3720         self.icmp_id_in = 6305
3721
3722         ses_num_start = self.nat64_get_ses_num()
3723
3724         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3725                                                 self.nat_addr_n)
3726         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3727         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3728
3729         # send some packets to create sessions
3730         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3731         self.pg0.add_stream(pkts)
3732         self.pg_enable_capture(self.pg_interfaces)
3733         self.pg_start()
3734         capture_ip4 = self.pg1.get_capture(len(pkts))
3735         self.verify_capture_out(capture_ip4,
3736                                 nat_ip=self.nat_addr,
3737                                 dst_ip=self.pg1.remote_ip4)
3738
3739         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3740         self.pg1.add_stream(pkts)
3741         self.pg_enable_capture(self.pg_interfaces)
3742         self.pg_start()
3743         capture_ip6 = self.pg0.get_capture(len(pkts))
3744         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3745         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3746                                    self.pg0.remote_ip6)
3747
3748         # in2out
3749         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3750                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3751                 ICMPv6DestUnreach(code=1) /
3752                 packet[IPv6] for packet in capture_ip6]
3753         self.pg0.add_stream(pkts)
3754         self.pg_enable_capture(self.pg_interfaces)
3755         self.pg_start()
3756         capture = self.pg1.get_capture(len(pkts))
3757         for packet in capture:
3758             try:
3759                 self.assertEqual(packet[IP].src, self.nat_addr)
3760                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3761                 self.assertEqual(packet[ICMP].type, 3)
3762                 self.assertEqual(packet[ICMP].code, 13)
3763                 inner = packet[IPerror]
3764                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3765                 self.assertEqual(inner.dst, self.nat_addr)
3766                 self.check_icmp_checksum(packet)
3767                 if inner.haslayer(TCPerror):
3768                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3769                 elif inner.haslayer(UDPerror):
3770                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3771                 else:
3772                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3773             except:
3774                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3775                 raise
3776
3777         # out2in
3778         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3779                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3780                 ICMP(type=3, code=13) /
3781                 packet[IP] for packet in capture_ip4]
3782         self.pg1.add_stream(pkts)
3783         self.pg_enable_capture(self.pg_interfaces)
3784         self.pg_start()
3785         capture = self.pg0.get_capture(len(pkts))
3786         for packet in capture:
3787             try:
3788                 self.assertEqual(packet[IPv6].src, ip.src)
3789                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3790                 icmp = packet[ICMPv6DestUnreach]
3791                 self.assertEqual(icmp.code, 1)
3792                 inner = icmp[IPerror6]
3793                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3794                 self.assertEqual(inner.dst, ip.src)
3795                 self.check_icmpv6_checksum(packet)
3796                 if inner.haslayer(TCPerror):
3797                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3798                 elif inner.haslayer(UDPerror):
3799                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3800                 else:
3801                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3802                                      self.icmp_id_in)
3803             except:
3804                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3805                 raise
3806
3807     def test_hairpinning(self):
3808         """ NAT64 hairpinning """
3809
3810         client = self.pg0.remote_hosts[0]
3811         server = self.pg0.remote_hosts[1]
3812         server_tcp_in_port = 22
3813         server_tcp_out_port = 4022
3814         server_udp_in_port = 23
3815         server_udp_out_port = 4023
3816         client_tcp_in_port = 1234
3817         client_udp_in_port = 1235
3818         client_tcp_out_port = 0
3819         client_udp_out_port = 0
3820         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3821         nat_addr_ip6 = ip.src
3822
3823         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3824                                                 self.nat_addr_n)
3825         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3826         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3827
3828         self.vapi.nat64_add_del_static_bib(server.ip6n,
3829                                            self.nat_addr_n,
3830                                            server_tcp_in_port,
3831                                            server_tcp_out_port,
3832                                            IP_PROTOS.tcp)
3833         self.vapi.nat64_add_del_static_bib(server.ip6n,
3834                                            self.nat_addr_n,
3835                                            server_udp_in_port,
3836                                            server_udp_out_port,
3837                                            IP_PROTOS.udp)
3838
3839         # client to server
3840         pkts = []
3841         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3842              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3843              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3844         pkts.append(p)
3845         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3846              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3847              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3848         pkts.append(p)
3849         self.pg0.add_stream(pkts)
3850         self.pg_enable_capture(self.pg_interfaces)
3851         self.pg_start()
3852         capture = self.pg0.get_capture(len(pkts))
3853         for packet in capture:
3854             try:
3855                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3856                 self.assertEqual(packet[IPv6].dst, server.ip6)
3857                 if packet.haslayer(TCP):
3858                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3859                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3860                     self.check_tcp_checksum(packet)
3861                     client_tcp_out_port = packet[TCP].sport
3862                 else:
3863                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3864                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3865                     self.check_udp_checksum(packet)
3866                     client_udp_out_port = packet[UDP].sport
3867             except:
3868                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3869                 raise
3870
3871         # server to client
3872         pkts = []
3873         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3874              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3875              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3876         pkts.append(p)
3877         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3878              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3879              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3880         pkts.append(p)
3881         self.pg0.add_stream(pkts)
3882         self.pg_enable_capture(self.pg_interfaces)
3883         self.pg_start()
3884         capture = self.pg0.get_capture(len(pkts))
3885         for packet in capture:
3886             try:
3887                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3888                 self.assertEqual(packet[IPv6].dst, client.ip6)
3889                 if packet.haslayer(TCP):
3890                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3891                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3892                     self.check_tcp_checksum(packet)
3893                 else:
3894                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3895                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3896                     self.check_udp_checksum(packet)
3897             except:
3898                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3899                 raise
3900
3901         # ICMP error
3902         pkts = []
3903         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3904                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3905                 ICMPv6DestUnreach(code=1) /
3906                 packet[IPv6] for packet in capture]
3907         self.pg0.add_stream(pkts)
3908         self.pg_enable_capture(self.pg_interfaces)
3909         self.pg_start()
3910         capture = self.pg0.get_capture(len(pkts))
3911         for packet in capture:
3912             try:
3913                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3914                 self.assertEqual(packet[IPv6].dst, server.ip6)
3915                 icmp = packet[ICMPv6DestUnreach]
3916                 self.assertEqual(icmp.code, 1)
3917                 inner = icmp[IPerror6]
3918                 self.assertEqual(inner.src, server.ip6)
3919                 self.assertEqual(inner.dst, nat_addr_ip6)
3920                 self.check_icmpv6_checksum(packet)
3921                 if inner.haslayer(TCPerror):
3922                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3923                     self.assertEqual(inner[TCPerror].dport,
3924                                      client_tcp_out_port)
3925                 else:
3926                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3927                     self.assertEqual(inner[UDPerror].dport,
3928                                      client_udp_out_port)
3929             except:
3930                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3931                 raise
3932
3933     def test_prefix(self):
3934         """ NAT64 Network-Specific Prefix """
3935
3936         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3937                                                 self.nat_addr_n)
3938         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3939         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3940         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3941                                                 self.vrf1_nat_addr_n,
3942                                                 vrf_id=self.vrf1_id)
3943         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3944
3945         # Add global prefix
3946         global_pref64 = "2001:db8::"
3947         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3948         global_pref64_len = 32
3949         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3950
3951         prefix = self.vapi.nat64_prefix_dump()
3952         self.assertEqual(len(prefix), 1)
3953         self.assertEqual(prefix[0].prefix, global_pref64_n)
3954         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3955         self.assertEqual(prefix[0].vrf_id, 0)
3956
3957         # Add tenant specific prefix
3958         vrf1_pref64 = "2001:db8:122:300::"
3959         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3960         vrf1_pref64_len = 56
3961         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3962                                        vrf1_pref64_len,
3963                                        vrf_id=self.vrf1_id)
3964         prefix = self.vapi.nat64_prefix_dump()
3965         self.assertEqual(len(prefix), 2)
3966
3967         # Global prefix
3968         pkts = self.create_stream_in_ip6(self.pg0,
3969                                          self.pg1,
3970                                          pref=global_pref64,
3971                                          plen=global_pref64_len)
3972         self.pg0.add_stream(pkts)
3973         self.pg_enable_capture(self.pg_interfaces)
3974         self.pg_start()
3975         capture = self.pg1.get_capture(len(pkts))
3976         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3977                                 dst_ip=self.pg1.remote_ip4)
3978
3979         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3980         self.pg1.add_stream(pkts)
3981         self.pg_enable_capture(self.pg_interfaces)
3982         self.pg_start()
3983         capture = self.pg0.get_capture(len(pkts))
3984         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3985                                   global_pref64,
3986                                   global_pref64_len)
3987         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3988
3989         # Tenant specific prefix
3990         pkts = self.create_stream_in_ip6(self.pg2,
3991                                          self.pg1,
3992                                          pref=vrf1_pref64,
3993                                          plen=vrf1_pref64_len)
3994         self.pg2.add_stream(pkts)
3995         self.pg_enable_capture(self.pg_interfaces)
3996         self.pg_start()
3997         capture = self.pg1.get_capture(len(pkts))
3998         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3999                                 dst_ip=self.pg1.remote_ip4)
4000
4001         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4002         self.pg1.add_stream(pkts)
4003         self.pg_enable_capture(self.pg_interfaces)
4004         self.pg_start()
4005         capture = self.pg2.get_capture(len(pkts))
4006         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4007                                   vrf1_pref64,
4008                                   vrf1_pref64_len)
4009         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4010
4011     def test_unknown_proto(self):
4012         """ NAT64 translate packet with unknown protocol """
4013
4014         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4015                                                 self.nat_addr_n)
4016         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4017         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4018         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4019
4020         # in2out
4021         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4022              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4023              TCP(sport=self.tcp_port_in, dport=20))
4024         self.pg0.add_stream(p)
4025         self.pg_enable_capture(self.pg_interfaces)
4026         self.pg_start()
4027         p = self.pg1.get_capture(1)
4028
4029         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4030              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4031              GRE() /
4032              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4033              TCP(sport=1234, dport=1234))
4034         self.pg0.add_stream(p)
4035         self.pg_enable_capture(self.pg_interfaces)
4036         self.pg_start()
4037         p = self.pg1.get_capture(1)
4038         packet = p[0]
4039         try:
4040             self.assertEqual(packet[IP].src, self.nat_addr)
4041             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4042             self.assertTrue(packet.haslayer(GRE))
4043             self.check_ip_checksum(packet)
4044         except:
4045             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4046             raise
4047
4048         # out2in
4049         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4050              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4051              GRE() /
4052              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4053              TCP(sport=1234, dport=1234))
4054         self.pg1.add_stream(p)
4055         self.pg_enable_capture(self.pg_interfaces)
4056         self.pg_start()
4057         p = self.pg0.get_capture(1)
4058         packet = p[0]
4059         try:
4060             self.assertEqual(packet[IPv6].src, remote_ip6)
4061             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4062             self.assertEqual(packet[IPv6].nh, 47)
4063         except:
4064             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4065             raise
4066
4067     def test_hairpinning_unknown_proto(self):
4068         """ NAT64 translate packet with unknown protocol - hairpinning """
4069
4070         client = self.pg0.remote_hosts[0]
4071         server = self.pg0.remote_hosts[1]
4072         server_tcp_in_port = 22
4073         server_tcp_out_port = 4022
4074         client_tcp_in_port = 1234
4075         client_tcp_out_port = 1235
4076         server_nat_ip = "10.0.0.100"
4077         client_nat_ip = "10.0.0.110"
4078         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4079         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4080         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4081         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4082
4083         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4084                                                 client_nat_ip_n)
4085         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4086         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4087
4088         self.vapi.nat64_add_del_static_bib(server.ip6n,
4089                                            server_nat_ip_n,
4090                                            server_tcp_in_port,
4091                                            server_tcp_out_port,
4092                                            IP_PROTOS.tcp)
4093
4094         self.vapi.nat64_add_del_static_bib(server.ip6n,
4095                                            server_nat_ip_n,
4096                                            0,
4097                                            0,
4098                                            IP_PROTOS.gre)
4099
4100         self.vapi.nat64_add_del_static_bib(client.ip6n,
4101                                            client_nat_ip_n,
4102                                            client_tcp_in_port,
4103                                            client_tcp_out_port,
4104                                            IP_PROTOS.tcp)
4105
4106         # client to server
4107         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4108              IPv6(src=client.ip6, dst=server_nat_ip6) /
4109              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4110         self.pg0.add_stream(p)
4111         self.pg_enable_capture(self.pg_interfaces)
4112         self.pg_start()
4113         p = self.pg0.get_capture(1)
4114
4115         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4116              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4117              GRE() /
4118              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4119              TCP(sport=1234, dport=1234))
4120         self.pg0.add_stream(p)
4121         self.pg_enable_capture(self.pg_interfaces)
4122         self.pg_start()
4123         p = self.pg0.get_capture(1)
4124         packet = p[0]
4125         try:
4126             self.assertEqual(packet[IPv6].src, client_nat_ip6)
4127             self.assertEqual(packet[IPv6].dst, server.ip6)
4128             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4129         except:
4130             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4131             raise
4132
4133         # server to client
4134         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4135              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4136              GRE() /
4137              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4138              TCP(sport=1234, dport=1234))
4139         self.pg0.add_stream(p)
4140         self.pg_enable_capture(self.pg_interfaces)
4141         self.pg_start()
4142         p = self.pg0.get_capture(1)
4143         packet = p[0]
4144         try:
4145             self.assertEqual(packet[IPv6].src, server_nat_ip6)
4146             self.assertEqual(packet[IPv6].dst, client.ip6)
4147             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4148         except:
4149             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4150             raise
4151
4152     def test_one_armed_nat64(self):
4153         """ One armed NAT64 """
4154         external_port = 0
4155         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4156                                            '64:ff9b::',
4157                                            96)
4158
4159         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4160                                                 self.nat_addr_n)
4161         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4162         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4163
4164         # in2out
4165         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4166              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4167              TCP(sport=12345, dport=80))
4168         self.pg3.add_stream(p)
4169         self.pg_enable_capture(self.pg_interfaces)
4170         self.pg_start()
4171         capture = self.pg3.get_capture(1)
4172         p = capture[0]
4173         try:
4174             ip = p[IP]
4175             tcp = p[TCP]
4176             self.assertEqual(ip.src, self.nat_addr)
4177             self.assertEqual(ip.dst, self.pg3.remote_ip4)
4178             self.assertNotEqual(tcp.sport, 12345)
4179             external_port = tcp.sport
4180             self.assertEqual(tcp.dport, 80)
4181             self.check_tcp_checksum(p)
4182             self.check_ip_checksum(p)
4183         except:
4184             self.logger.error(ppp("Unexpected or invalid packet:", p))
4185             raise
4186
4187         # out2in
4188         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4189              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4190              TCP(sport=80, dport=external_port))
4191         self.pg3.add_stream(p)
4192         self.pg_enable_capture(self.pg_interfaces)
4193         self.pg_start()
4194         capture = self.pg3.get_capture(1)
4195         p = capture[0]
4196         try:
4197             ip = p[IPv6]
4198             tcp = p[TCP]
4199             self.assertEqual(ip.src, remote_host_ip6)
4200             self.assertEqual(ip.dst, self.pg3.remote_ip6)
4201             self.assertEqual(tcp.sport, 80)
4202             self.assertEqual(tcp.dport, 12345)
4203             self.check_tcp_checksum(p)
4204         except:
4205             self.logger.error(ppp("Unexpected or invalid packet:", p))
4206             raise
4207
4208     def test_frag_in_order(self):
4209         """ NAT64 translate fragments arriving in order """
4210         self.tcp_port_in = random.randint(1025, 65535)
4211
4212         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4213                                                 self.nat_addr_n)
4214         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4215         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4216
4217         reass = self.vapi.nat_reass_dump()
4218         reass_n_start = len(reass)
4219
4220         # in2out
4221         data = 'a' * 200
4222         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4223                                            self.tcp_port_in, 20, data)
4224         self.pg0.add_stream(pkts)
4225         self.pg_enable_capture(self.pg_interfaces)
4226         self.pg_start()
4227         frags = self.pg1.get_capture(len(pkts))
4228         p = self.reass_frags_and_verify(frags,
4229                                         self.nat_addr,
4230                                         self.pg1.remote_ip4)
4231         self.assertEqual(p[TCP].dport, 20)
4232         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4233         self.tcp_port_out = p[TCP].sport
4234         self.assertEqual(data, p[Raw].load)
4235
4236         # out2in
4237         data = "A" * 4 + "b" * 16 + "C" * 3
4238         pkts = self.create_stream_frag(self.pg1,
4239                                        self.nat_addr,
4240                                        20,
4241                                        self.tcp_port_out,
4242                                        data)
4243         self.pg1.add_stream(pkts)
4244         self.pg_enable_capture(self.pg_interfaces)
4245         self.pg_start()
4246         frags = self.pg0.get_capture(len(pkts))
4247         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4248         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4249         self.assertEqual(p[TCP].sport, 20)
4250         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4251         self.assertEqual(data, p[Raw].load)
4252
4253         reass = self.vapi.nat_reass_dump()
4254         reass_n_end = len(reass)
4255
4256         self.assertEqual(reass_n_end - reass_n_start, 2)
4257
4258     def test_reass_hairpinning(self):
4259         """ NAT64 fragments hairpinning """
4260         data = 'a' * 200
4261         client = self.pg0.remote_hosts[0]
4262         server = self.pg0.remote_hosts[1]
4263         server_in_port = random.randint(1025, 65535)
4264         server_out_port = random.randint(1025, 65535)
4265         client_in_port = random.randint(1025, 65535)
4266         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4267         nat_addr_ip6 = ip.src
4268
4269         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4270                                                 self.nat_addr_n)
4271         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4272         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4273
4274         # add static BIB entry for server
4275         self.vapi.nat64_add_del_static_bib(server.ip6n,
4276                                            self.nat_addr_n,
4277                                            server_in_port,
4278                                            server_out_port,
4279                                            IP_PROTOS.tcp)
4280
4281         # send packet from host to server
4282         pkts = self.create_stream_frag_ip6(self.pg0,
4283                                            self.nat_addr,
4284                                            client_in_port,
4285                                            server_out_port,
4286                                            data)
4287         self.pg0.add_stream(pkts)
4288         self.pg_enable_capture(self.pg_interfaces)
4289         self.pg_start()
4290         frags = self.pg0.get_capture(len(pkts))
4291         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4292         self.assertNotEqual(p[TCP].sport, client_in_port)
4293         self.assertEqual(p[TCP].dport, server_in_port)
4294         self.assertEqual(data, p[Raw].load)
4295
4296     def test_frag_out_of_order(self):
4297         """ NAT64 translate fragments arriving out of order """
4298         self.tcp_port_in = random.randint(1025, 65535)
4299
4300         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4301                                                 self.nat_addr_n)
4302         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4303         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4304
4305         # in2out
4306         data = 'a' * 200
4307         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4308                                            self.tcp_port_in, 20, data)
4309         pkts.reverse()
4310         self.pg0.add_stream(pkts)
4311         self.pg_enable_capture(self.pg_interfaces)
4312         self.pg_start()
4313         frags = self.pg1.get_capture(len(pkts))
4314         p = self.reass_frags_and_verify(frags,
4315                                         self.nat_addr,
4316                                         self.pg1.remote_ip4)
4317         self.assertEqual(p[TCP].dport, 20)
4318         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4319         self.tcp_port_out = p[TCP].sport
4320         self.assertEqual(data, p[Raw].load)
4321
4322         # out2in
4323         data = "A" * 4 + "B" * 16 + "C" * 3
4324         pkts = self.create_stream_frag(self.pg1,
4325                                        self.nat_addr,
4326                                        20,
4327                                        self.tcp_port_out,
4328                                        data)
4329         pkts.reverse()
4330         self.pg1.add_stream(pkts)
4331         self.pg_enable_capture(self.pg_interfaces)
4332         self.pg_start()
4333         frags = self.pg0.get_capture(len(pkts))
4334         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4335         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4336         self.assertEqual(p[TCP].sport, 20)
4337         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4338         self.assertEqual(data, p[Raw].load)
4339
4340     def nat64_get_ses_num(self):
4341         """
4342         Return number of active NAT64 sessions.
4343         """
4344         st = self.vapi.nat64_st_dump()
4345         return len(st)
4346
4347     def clear_nat64(self):
4348         """
4349         Clear NAT64 configuration.
4350         """
4351         self.vapi.nat64_set_timeouts()
4352
4353         interfaces = self.vapi.nat64_interface_dump()
4354         for intf in interfaces:
4355             if intf.is_inside > 1:
4356                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4357                                                   0,
4358                                                   is_add=0)
4359             self.vapi.nat64_add_del_interface(intf.sw_if_index,
4360                                               intf.is_inside,
4361                                               is_add=0)
4362
4363         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4364         for bibe in bib:
4365             if bibe.is_static:
4366                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4367                                                    bibe.o_addr,
4368                                                    bibe.i_port,
4369                                                    bibe.o_port,
4370                                                    bibe.proto,
4371                                                    bibe.vrf_id,
4372                                                    is_add=0)
4373
4374         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4375         for bibe in bib:
4376             if bibe.is_static:
4377                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4378                                                    bibe.o_addr,
4379                                                    bibe.i_port,
4380                                                    bibe.o_port,
4381                                                    bibe.proto,
4382                                                    bibe.vrf_id,
4383                                                    is_add=0)
4384
4385         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4386         for bibe in bib:
4387             if bibe.is_static:
4388                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4389                                                    bibe.o_addr,
4390                                                    bibe.i_port,
4391                                                    bibe.o_port,
4392                                                    bibe.proto,
4393                                                    bibe.vrf_id,
4394                                                    is_add=0)
4395
4396         adresses = self.vapi.nat64_pool_addr_dump()
4397         for addr in adresses:
4398             self.vapi.nat64_add_del_pool_addr_range(addr.address,
4399                                                     addr.address,
4400                                                     vrf_id=addr.vrf_id,
4401                                                     is_add=0)
4402
4403         prefixes = self.vapi.nat64_prefix_dump()
4404         for prefix in prefixes:
4405             self.vapi.nat64_add_del_prefix(prefix.prefix,
4406                                            prefix.prefix_len,
4407                                            vrf_id=prefix.vrf_id,
4408                                            is_add=0)
4409
4410     def tearDown(self):
4411         super(TestNAT64, self).tearDown()
4412         if not self.vpp_dead:
4413             self.logger.info(self.vapi.cli("show nat64 pool"))
4414             self.logger.info(self.vapi.cli("show nat64 interfaces"))
4415             self.logger.info(self.vapi.cli("show nat64 prefix"))
4416             self.logger.info(self.vapi.cli("show nat64 bib all"))
4417             self.logger.info(self.vapi.cli("show nat64 session table all"))
4418             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4419             self.clear_nat64()
4420
4421
4422 class TestDSlite(MethodHolder):
4423     """ DS-Lite Test Cases """
4424
4425     @classmethod
4426     def setUpClass(cls):
4427         super(TestDSlite, cls).setUpClass()
4428
4429         try:
4430             cls.nat_addr = '10.0.0.3'
4431             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4432
4433             cls.create_pg_interfaces(range(2))
4434             cls.pg0.admin_up()
4435             cls.pg0.config_ip4()
4436             cls.pg0.resolve_arp()
4437             cls.pg1.admin_up()
4438             cls.pg1.config_ip6()
4439             cls.pg1.generate_remote_hosts(2)
4440             cls.pg1.configure_ipv6_neighbors()
4441
4442         except Exception:
4443             super(TestDSlite, cls).tearDownClass()
4444             raise
4445
4446     def test_dslite(self):
4447         """ Test DS-Lite """
4448         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4449                                                  self.nat_addr_n)
4450         aftr_ip4 = '192.0.0.1'
4451         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4452         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4453         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4454         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4455
4456         # UDP
4457         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4458              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4459              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4460              UDP(sport=20000, dport=10000))
4461         self.pg1.add_stream(p)
4462         self.pg_enable_capture(self.pg_interfaces)
4463         self.pg_start()
4464         capture = self.pg0.get_capture(1)
4465         capture = capture[0]
4466         self.assertFalse(capture.haslayer(IPv6))
4467         self.assertEqual(capture[IP].src, self.nat_addr)
4468         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4469         self.assertNotEqual(capture[UDP].sport, 20000)
4470         self.assertEqual(capture[UDP].dport, 10000)
4471         self.check_ip_checksum(capture)
4472         out_port = capture[UDP].sport
4473
4474         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4475              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4476              UDP(sport=10000, dport=out_port))
4477         self.pg0.add_stream(p)
4478         self.pg_enable_capture(self.pg_interfaces)
4479         self.pg_start()
4480         capture = self.pg1.get_capture(1)
4481         capture = capture[0]
4482         self.assertEqual(capture[IPv6].src, aftr_ip6)
4483         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4484         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4485         self.assertEqual(capture[IP].dst, '192.168.1.1')
4486         self.assertEqual(capture[UDP].sport, 10000)
4487         self.assertEqual(capture[UDP].dport, 20000)
4488         self.check_ip_checksum(capture)
4489
4490         # TCP
4491         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4492              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4493              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4494              TCP(sport=20001, dport=10001))
4495         self.pg1.add_stream(p)
4496         self.pg_enable_capture(self.pg_interfaces)
4497         self.pg_start()
4498         capture = self.pg0.get_capture(1)
4499         capture = capture[0]
4500         self.assertFalse(capture.haslayer(IPv6))
4501         self.assertEqual(capture[IP].src, self.nat_addr)
4502         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4503         self.assertNotEqual(capture[TCP].sport, 20001)
4504         self.assertEqual(capture[TCP].dport, 10001)
4505         self.check_ip_checksum(capture)
4506         self.check_tcp_checksum(capture)
4507         out_port = capture[TCP].sport
4508
4509         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4510              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4511              TCP(sport=10001, dport=out_port))
4512         self.pg0.add_stream(p)
4513         self.pg_enable_capture(self.pg_interfaces)
4514         self.pg_start()
4515         capture = self.pg1.get_capture(1)
4516         capture = capture[0]
4517         self.assertEqual(capture[IPv6].src, aftr_ip6)
4518         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4519         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4520         self.assertEqual(capture[IP].dst, '192.168.1.1')
4521         self.assertEqual(capture[TCP].sport, 10001)
4522         self.assertEqual(capture[TCP].dport, 20001)
4523         self.check_ip_checksum(capture)
4524         self.check_tcp_checksum(capture)
4525
4526         # ICMP
4527         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4528              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4529              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4530              ICMP(id=4000, type='echo-request'))
4531         self.pg1.add_stream(p)
4532         self.pg_enable_capture(self.pg_interfaces)
4533         self.pg_start()
4534         capture = self.pg0.get_capture(1)
4535         capture = capture[0]
4536         self.assertFalse(capture.haslayer(IPv6))
4537         self.assertEqual(capture[IP].src, self.nat_addr)
4538         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4539         self.assertNotEqual(capture[ICMP].id, 4000)
4540         self.check_ip_checksum(capture)
4541         self.check_icmp_checksum(capture)
4542         out_id = capture[ICMP].id
4543
4544         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4545              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4546              ICMP(id=out_id, type='echo-reply'))
4547         self.pg0.add_stream(p)
4548         self.pg_enable_capture(self.pg_interfaces)
4549         self.pg_start()
4550         capture = self.pg1.get_capture(1)
4551         capture = capture[0]
4552         self.assertEqual(capture[IPv6].src, aftr_ip6)
4553         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4554         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4555         self.assertEqual(capture[IP].dst, '192.168.1.1')
4556         self.assertEqual(capture[ICMP].id, 4000)
4557         self.check_ip_checksum(capture)
4558         self.check_icmp_checksum(capture)
4559
4560     def tearDown(self):
4561         super(TestDSlite, self).tearDown()
4562         if not self.vpp_dead:
4563             self.logger.info(self.vapi.cli("show dslite pool"))
4564             self.logger.info(
4565                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4566             self.logger.info(self.vapi.cli("show dslite sessions"))
4567
4568 if __name__ == '__main__':
4569     unittest.main(testRunner=VppTestRunner)