NAT: Twice NAT44 (VPP-969)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
29     @classmethod
30     def setUpClass(cls):
31         super(MethodHolder, cls).setUpClass()
32
33     def tearDown(self):
34         super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param ttl: TTL of generated packets
144         """
145         pkts = []
146         # TCP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              TCP(sport=self.tcp_port_in, dport=20))
150         pkts.append(p)
151
152         # UDP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              UDP(sport=self.udp_port_in, dport=20))
156         pkts.append(p)
157
158         # ICMP
159         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
161              ICMP(id=self.icmp_id_in, type='echo-request'))
162         pkts.append(p)
163
164         return pkts
165
166     def compose_ip6(self, ip4, pref, plen):
167         """
168         Compose IPv4-embedded IPv6 addresses
169
170         :param ip4: IPv4 address
171         :param pref: IPv6 prefix
172         :param plen: IPv6 prefix length
173         :returns: IPv4-embedded IPv6 addresses
174         """
175         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
176         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
177         if plen == 32:
178             pref_n[4] = ip4_n[0]
179             pref_n[5] = ip4_n[1]
180             pref_n[6] = ip4_n[2]
181             pref_n[7] = ip4_n[3]
182         elif plen == 40:
183             pref_n[5] = ip4_n[0]
184             pref_n[6] = ip4_n[1]
185             pref_n[7] = ip4_n[2]
186             pref_n[9] = ip4_n[3]
187         elif plen == 48:
188             pref_n[6] = ip4_n[0]
189             pref_n[7] = ip4_n[1]
190             pref_n[9] = ip4_n[2]
191             pref_n[10] = ip4_n[3]
192         elif plen == 56:
193             pref_n[7] = ip4_n[0]
194             pref_n[9] = ip4_n[1]
195             pref_n[10] = ip4_n[2]
196             pref_n[11] = ip4_n[3]
197         elif plen == 64:
198             pref_n[9] = ip4_n[0]
199             pref_n[10] = ip4_n[1]
200             pref_n[11] = ip4_n[2]
201             pref_n[12] = ip4_n[3]
202         elif plen == 96:
203             pref_n[12] = ip4_n[0]
204             pref_n[13] = ip4_n[1]
205             pref_n[14] = ip4_n[2]
206             pref_n[15] = ip4_n[3]
207         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
208
209     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
210         """
211         Create IPv6 packet stream for inside network
212
213         :param in_if: Inside interface
214         :param out_if: Outside interface
215         :param ttl: Hop Limit of generated packets
216         :param pref: NAT64 prefix
217         :param plen: NAT64 prefix length
218         """
219         pkts = []
220         if pref is None:
221             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
222         else:
223             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
224
225         # TCP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              TCP(sport=self.tcp_port_in, dport=20))
229         pkts.append(p)
230
231         # UDP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              UDP(sport=self.udp_port_in, dport=20))
235         pkts.append(p)
236
237         # ICMP
238         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
239              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
240              ICMPv6EchoRequest(id=self.icmp_id_in))
241         pkts.append(p)
242
243         return pkts
244
245     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
246         """
247         Create packet stream for outside network
248
249         :param out_if: Outside interface
250         :param dst_ip: Destination IP address (Default use global NAT address)
251         :param ttl: TTL of generated packets
252         """
253         if dst_ip is None:
254             dst_ip = self.nat_addr
255         pkts = []
256         # TCP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              TCP(dport=self.tcp_port_out, sport=20))
260         pkts.append(p)
261
262         # UDP
263         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265              UDP(dport=self.udp_port_out, sport=20))
266         pkts.append(p)
267
268         # ICMP
269         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
270              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
271              ICMP(id=self.icmp_id_out, type='echo-reply'))
272         pkts.append(p)
273
274         return pkts
275
276     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
277                            packet_num=3, dst_ip=None):
278         """
279         Verify captured packets on outside network
280
281         :param capture: Captured packets
282         :param nat_ip: Translated IP address (Default use global NAT address)
283         :param same_port: Sorce port number is not translated (Default False)
284         :param packet_num: Expected number of packets (Default 3)
285         :param dst_ip: Destination IP address (Default do not verify)
286         """
287         if nat_ip is None:
288             nat_ip = self.nat_addr
289         self.assertEqual(packet_num, len(capture))
290         for packet in capture:
291             try:
292                 self.check_ip_checksum(packet)
293                 self.assertEqual(packet[IP].src, nat_ip)
294                 if dst_ip is not None:
295                     self.assertEqual(packet[IP].dst, dst_ip)
296                 if packet.haslayer(TCP):
297                     if same_port:
298                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
299                     else:
300                         self.assertNotEqual(
301                             packet[TCP].sport, self.tcp_port_in)
302                     self.tcp_port_out = packet[TCP].sport
303                     self.check_tcp_checksum(packet)
304                 elif packet.haslayer(UDP):
305                     if same_port:
306                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
307                     else:
308                         self.assertNotEqual(
309                             packet[UDP].sport, self.udp_port_in)
310                     self.udp_port_out = packet[UDP].sport
311                 else:
312                     if same_port:
313                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
314                     else:
315                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
316                     self.icmp_id_out = packet[ICMP].id
317                     self.check_icmp_checksum(packet)
318             except:
319                 self.logger.error(ppp("Unexpected or invalid packet "
320                                       "(outside network):", packet))
321                 raise
322
323     def verify_capture_in(self, capture, in_if, packet_num=3):
324         """
325         Verify captured packets on inside network
326
327         :param capture: Captured packets
328         :param in_if: Inside interface
329         :param packet_num: Expected number of packets (Default 3)
330         """
331         self.assertEqual(packet_num, len(capture))
332         for packet in capture:
333             try:
334                 self.check_ip_checksum(packet)
335                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
336                 if packet.haslayer(TCP):
337                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
338                     self.check_tcp_checksum(packet)
339                 elif packet.haslayer(UDP):
340                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
341                 else:
342                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
343                     self.check_icmp_checksum(packet)
344             except:
345                 self.logger.error(ppp("Unexpected or invalid packet "
346                                       "(inside network):", packet))
347                 raise
348
349     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
350         """
351         Verify captured IPv6 packets on inside network
352
353         :param capture: Captured packets
354         :param src_ip: Source IP
355         :param dst_ip: Destination IP address
356         :param packet_num: Expected number of packets (Default 3)
357         """
358         self.assertEqual(packet_num, len(capture))
359         for packet in capture:
360             try:
361                 self.assertEqual(packet[IPv6].src, src_ip)
362                 self.assertEqual(packet[IPv6].dst, dst_ip)
363                 if packet.haslayer(TCP):
364                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
365                     self.check_tcp_checksum(packet)
366                 elif packet.haslayer(UDP):
367                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
368                     self.check_udp_checksum(packet)
369                 else:
370                     self.assertEqual(packet[ICMPv6EchoReply].id,
371                                      self.icmp_id_in)
372                     self.check_icmpv6_checksum(packet)
373             except:
374                 self.logger.error(ppp("Unexpected or invalid packet "
375                                       "(inside network):", packet))
376                 raise
377
378     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
379         """
380         Verify captured packet that don't have to be translated
381
382         :param capture: Captured packets
383         :param ingress_if: Ingress interface
384         :param egress_if: Egress interface
385         """
386         for packet in capture:
387             try:
388                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
389                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
390                 if packet.haslayer(TCP):
391                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
392                 elif packet.haslayer(UDP):
393                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
394                 else:
395                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
396             except:
397                 self.logger.error(ppp("Unexpected or invalid packet "
398                                       "(inside network):", packet))
399                 raise
400
401     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
402                                             packet_num=3, icmp_type=11):
403         """
404         Verify captured packets with ICMP errors on outside network
405
406         :param capture: Captured packets
407         :param src_ip: Translated IP address or IP address of VPP
408                        (Default use global NAT address)
409         :param packet_num: Expected number of packets (Default 3)
410         :param icmp_type: Type of error ICMP packet
411                           we are expecting (Default 11)
412         """
413         if src_ip is None:
414             src_ip = self.nat_addr
415         self.assertEqual(packet_num, len(capture))
416         for packet in capture:
417             try:
418                 self.assertEqual(packet[IP].src, src_ip)
419                 self.assertTrue(packet.haslayer(ICMP))
420                 icmp = packet[ICMP]
421                 self.assertEqual(icmp.type, icmp_type)
422                 self.assertTrue(icmp.haslayer(IPerror))
423                 inner_ip = icmp[IPerror]
424                 if inner_ip.haslayer(TCPerror):
425                     self.assertEqual(inner_ip[TCPerror].dport,
426                                      self.tcp_port_out)
427                 elif inner_ip.haslayer(UDPerror):
428                     self.assertEqual(inner_ip[UDPerror].dport,
429                                      self.udp_port_out)
430                 else:
431                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
432             except:
433                 self.logger.error(ppp("Unexpected or invalid packet "
434                                       "(outside network):", packet))
435                 raise
436
437     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
438                                            icmp_type=11):
439         """
440         Verify captured packets with ICMP errors on inside network
441
442         :param capture: Captured packets
443         :param in_if: Inside interface
444         :param packet_num: Expected number of packets (Default 3)
445         :param icmp_type: Type of error ICMP packet
446                           we are expecting (Default 11)
447         """
448         self.assertEqual(packet_num, len(capture))
449         for packet in capture:
450             try:
451                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
452                 self.assertTrue(packet.haslayer(ICMP))
453                 icmp = packet[ICMP]
454                 self.assertEqual(icmp.type, icmp_type)
455                 self.assertTrue(icmp.haslayer(IPerror))
456                 inner_ip = icmp[IPerror]
457                 if inner_ip.haslayer(TCPerror):
458                     self.assertEqual(inner_ip[TCPerror].sport,
459                                      self.tcp_port_in)
460                 elif inner_ip.haslayer(UDPerror):
461                     self.assertEqual(inner_ip[UDPerror].sport,
462                                      self.udp_port_in)
463                 else:
464                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
465             except:
466                 self.logger.error(ppp("Unexpected or invalid packet "
467                                       "(inside network):", packet))
468                 raise
469
470     def create_stream_frag(self, src_if, dst, sport, dport, data):
471         """
472         Create fragmented packet stream
473
474         :param src_if: Source interface
475         :param dst: Destination IPv4 address
476         :param sport: Source TCP port
477         :param dport: Destination TCP port
478         :param data: Payload data
479         :returns: Fragmets
480         """
481         id = random.randint(0, 65535)
482         p = (IP(src=src_if.remote_ip4, dst=dst) /
483              TCP(sport=sport, dport=dport) /
484              Raw(data))
485         p = p.__class__(str(p))
486         chksum = p['TCP'].chksum
487         pkts = []
488         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
490              TCP(sport=sport, dport=dport, chksum=chksum) /
491              Raw(data[0:4]))
492         pkts.append(p)
493         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
495                 proto=IP_PROTOS.tcp) /
496              Raw(data[4:20]))
497         pkts.append(p)
498         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
499              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
500                 id=id) /
501              Raw(data[20:]))
502         pkts.append(p)
503         return pkts
504
505     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
506                                pref=None, plen=0, frag_size=128):
507         """
508         Create fragmented packet stream
509
510         :param src_if: Source interface
511         :param dst: Destination IPv4 address
512         :param sport: Source TCP port
513         :param dport: Destination TCP port
514         :param data: Payload data
515         :param pref: NAT64 prefix
516         :param plen: NAT64 prefix length
517         :param fragsize: size of fragments
518         :returns: Fragmets
519         """
520         if pref is None:
521             dst_ip6 = ''.join(['64:ff9b::', dst])
522         else:
523             dst_ip6 = self.compose_ip6(dst, pref, plen)
524
525         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
526              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
527              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
528              TCP(sport=sport, dport=dport) /
529              Raw(data))
530
531         return fragment6(p, frag_size)
532
533     def reass_frags_and_verify(self, frags, src, dst):
534         """
535         Reassemble and verify fragmented packet
536
537         :param frags: Captured fragments
538         :param src: Source IPv4 address to verify
539         :param dst: Destination IPv4 address to verify
540
541         :returns: Reassembled IPv4 packet
542         """
543         buffer = StringIO.StringIO()
544         for p in frags:
545             self.assertEqual(p[IP].src, src)
546             self.assertEqual(p[IP].dst, dst)
547             self.check_ip_checksum(p)
548             buffer.seek(p[IP].frag * 8)
549             buffer.write(p[IP].payload)
550         ip = frags[0].getlayer(IP)
551         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
552                 proto=frags[0][IP].proto)
553         if ip.proto == IP_PROTOS.tcp:
554             p = (ip / TCP(buffer.getvalue()))
555             self.check_tcp_checksum(p)
556         elif ip.proto == IP_PROTOS.udp:
557             p = (ip / UDP(buffer.getvalue()))
558         return p
559
560     def reass_frags_and_verify_ip6(self, frags, src, dst):
561         """
562         Reassemble and verify fragmented packet
563
564         :param frags: Captured fragments
565         :param src: Source IPv6 address to verify
566         :param dst: Destination IPv6 address to verify
567
568         :returns: Reassembled IPv6 packet
569         """
570         buffer = StringIO.StringIO()
571         for p in frags:
572             self.assertEqual(p[IPv6].src, src)
573             self.assertEqual(p[IPv6].dst, dst)
574             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
575             buffer.write(p[IPv6ExtHdrFragment].payload)
576         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
577                   nh=frags[0][IPv6ExtHdrFragment].nh)
578         if ip.nh == IP_PROTOS.tcp:
579             p = (ip / TCP(buffer.getvalue()))
580             self.check_tcp_checksum(p)
581         elif ip.nh == IP_PROTOS.udp:
582             p = (ip / UDP(buffer.getvalue()))
583         return p
584
585     def verify_ipfix_nat44_ses(self, data):
586         """
587         Verify IPFIX NAT44 session create/delete event
588
589         :param data: Decoded IPFIX data records
590         """
591         nat44_ses_create_num = 0
592         nat44_ses_delete_num = 0
593         self.assertEqual(6, len(data))
594         for record in data:
595             # natEvent
596             self.assertIn(ord(record[230]), [4, 5])
597             if ord(record[230]) == 4:
598                 nat44_ses_create_num += 1
599             else:
600                 nat44_ses_delete_num += 1
601             # sourceIPv4Address
602             self.assertEqual(self.pg0.remote_ip4n, record[8])
603             # postNATSourceIPv4Address
604             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
605                              record[225])
606             # ingressVRFID
607             self.assertEqual(struct.pack("!I", 0), record[234])
608             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
609             if IP_PROTOS.icmp == ord(record[4]):
610                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
611                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
612                                  record[227])
613             elif IP_PROTOS.tcp == ord(record[4]):
614                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
615                                  record[7])
616                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
617                                  record[227])
618             elif IP_PROTOS.udp == ord(record[4]):
619                 self.assertEqual(struct.pack("!H", self.udp_port_in),
620                                  record[7])
621                 self.assertEqual(struct.pack("!H", self.udp_port_out),
622                                  record[227])
623             else:
624                 self.fail("Invalid protocol")
625         self.assertEqual(3, nat44_ses_create_num)
626         self.assertEqual(3, nat44_ses_delete_num)
627
628     def verify_ipfix_addr_exhausted(self, data):
629         """
630         Verify IPFIX NAT addresses event
631
632         :param data: Decoded IPFIX data records
633         """
634         self.assertEqual(1, len(data))
635         record = data[0]
636         # natEvent
637         self.assertEqual(ord(record[230]), 3)
638         # natPoolID
639         self.assertEqual(struct.pack("!I", 0), record[283])
640
641
642 class TestNAT44(MethodHolder):
643     """ NAT44 Test Cases """
644
645     @classmethod
646     def setUpClass(cls):
647         super(TestNAT44, cls).setUpClass()
648
649         try:
650             cls.tcp_port_in = 6303
651             cls.tcp_port_out = 6303
652             cls.udp_port_in = 6304
653             cls.udp_port_out = 6304
654             cls.icmp_id_in = 6305
655             cls.icmp_id_out = 6305
656             cls.nat_addr = '10.0.0.3'
657             cls.ipfix_src_port = 4739
658             cls.ipfix_domain_id = 1
659
660             cls.create_pg_interfaces(range(10))
661             cls.interfaces = list(cls.pg_interfaces[0:4])
662
663             for i in cls.interfaces:
664                 i.admin_up()
665                 i.config_ip4()
666                 i.resolve_arp()
667
668             cls.pg0.generate_remote_hosts(3)
669             cls.pg0.configure_ipv4_neighbors()
670
671             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
672             cls.vapi.ip_table_add_del(10, is_add=1)
673             cls.vapi.ip_table_add_del(20, is_add=1)
674
675             cls.pg4._local_ip4 = "172.16.255.1"
676             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
677             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
678             cls.pg4.set_table_ip4(10)
679             cls.pg5._local_ip4 = "172.17.255.3"
680             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
681             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
682             cls.pg5.set_table_ip4(10)
683             cls.pg6._local_ip4 = "172.16.255.1"
684             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
685             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
686             cls.pg6.set_table_ip4(20)
687             for i in cls.overlapping_interfaces:
688                 i.config_ip4()
689                 i.admin_up()
690                 i.resolve_arp()
691
692             cls.pg7.admin_up()
693             cls.pg8.admin_up()
694
695             cls.pg9.generate_remote_hosts(2)
696             cls.pg9.config_ip4()
697             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
698             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
699                                                   ip_addr_n,
700                                                   24)
701             cls.pg9.admin_up()
702             cls.pg9.resolve_arp()
703             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
704             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
705             cls.pg9.resolve_arp()
706
707         except Exception:
708             super(TestNAT44, cls).tearDownClass()
709             raise
710
711     def clear_nat44(self):
712         """
713         Clear NAT44 configuration.
714         """
715         # I found no elegant way to do this
716         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
717                                    dst_address_length=32,
718                                    next_hop_address=self.pg7.remote_ip4n,
719                                    next_hop_sw_if_index=self.pg7.sw_if_index,
720                                    is_add=0)
721         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
722                                    dst_address_length=32,
723                                    next_hop_address=self.pg8.remote_ip4n,
724                                    next_hop_sw_if_index=self.pg8.sw_if_index,
725                                    is_add=0)
726
727         for intf in [self.pg7, self.pg8]:
728             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
729             for n in neighbors:
730                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
731                                               n.mac_address,
732                                               n.ip_address,
733                                               is_add=0)
734
735         if self.pg7.has_ip4_config:
736             self.pg7.unconfig_ip4()
737
738         interfaces = self.vapi.nat44_interface_addr_dump()
739         for intf in interfaces:
740             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
741                                                twice_nat=intf.twice_nat,
742                                                is_add=0)
743
744         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
745                             domain_id=self.ipfix_domain_id)
746         self.ipfix_src_port = 4739
747         self.ipfix_domain_id = 1
748
749         interfaces = self.vapi.nat44_interface_dump()
750         for intf in interfaces:
751             if intf.is_inside > 1:
752                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
753                                                           0,
754                                                           is_add=0)
755             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
756                                                       intf.is_inside,
757                                                       is_add=0)
758
759         interfaces = self.vapi.nat44_interface_output_feature_dump()
760         for intf in interfaces:
761             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
762                                                              intf.is_inside,
763                                                              is_add=0)
764
765         static_mappings = self.vapi.nat44_static_mapping_dump()
766         for sm in static_mappings:
767             self.vapi.nat44_add_del_static_mapping(
768                 sm.local_ip_address,
769                 sm.external_ip_address,
770                 local_port=sm.local_port,
771                 external_port=sm.external_port,
772                 addr_only=sm.addr_only,
773                 vrf_id=sm.vrf_id,
774                 protocol=sm.protocol,
775                 twice_nat=sm.twice_nat,
776                 is_add=0)
777
778         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
779         for lb_sm in lb_static_mappings:
780             self.vapi.nat44_add_del_lb_static_mapping(
781                 lb_sm.external_addr,
782                 lb_sm.external_port,
783                 lb_sm.protocol,
784                 vrf_id=lb_sm.vrf_id,
785                 twice_nat=lb_sm.twice_nat,
786                 is_add=0,
787                 local_num=0,
788                 locals=[])
789
790         identity_mappings = self.vapi.nat44_identity_mapping_dump()
791         for id_m in identity_mappings:
792             self.vapi.nat44_add_del_identity_mapping(
793                 addr_only=id_m.addr_only,
794                 ip=id_m.ip_address,
795                 port=id_m.port,
796                 sw_if_index=id_m.sw_if_index,
797                 vrf_id=id_m.vrf_id,
798                 protocol=id_m.protocol,
799                 is_add=0)
800
801         adresses = self.vapi.nat44_address_dump()
802         for addr in adresses:
803             self.vapi.nat44_add_del_address_range(addr.ip_address,
804                                                   addr.ip_address,
805                                                   twice_nat=addr.twice_nat,
806                                                   is_add=0)
807
808         self.vapi.nat_set_reass()
809         self.vapi.nat_set_reass(is_ip6=1)
810
811     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
812                                  local_port=0, external_port=0, vrf_id=0,
813                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
814                                  proto=0, twice_nat=0):
815         """
816         Add/delete NAT44 static mapping
817
818         :param local_ip: Local IP address
819         :param external_ip: External IP address
820         :param local_port: Local port number (Optional)
821         :param external_port: External port number (Optional)
822         :param vrf_id: VRF ID (Default 0)
823         :param is_add: 1 if add, 0 if delete (Default add)
824         :param external_sw_if_index: External interface instead of IP address
825         :param proto: IP protocol (Mandatory if port specified)
826         :param twice_nat: 1 if translate external host address and port
827         """
828         addr_only = 1
829         if local_port and external_port:
830             addr_only = 0
831         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
832         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
833         self.vapi.nat44_add_del_static_mapping(
834             l_ip,
835             e_ip,
836             external_sw_if_index,
837             local_port,
838             external_port,
839             addr_only,
840             vrf_id,
841             proto,
842             twice_nat,
843             is_add)
844
845     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
846         """
847         Add/delete NAT44 address
848
849         :param ip: IP address
850         :param is_add: 1 if add, 0 if delete (Default add)
851         :param twice_nat: twice NAT address for extenal hosts
852         """
853         nat_addr = socket.inet_pton(socket.AF_INET, ip)
854         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
855                                               vrf_id=vrf_id,
856                                               twice_nat=twice_nat)
857
858     def test_dynamic(self):
859         """ NAT44 dynamic translation test """
860
861         self.nat44_add_address(self.nat_addr)
862         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
863         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
864                                                   is_inside=0)
865
866         # in2out
867         pkts = self.create_stream_in(self.pg0, self.pg1)
868         self.pg0.add_stream(pkts)
869         self.pg_enable_capture(self.pg_interfaces)
870         self.pg_start()
871         capture = self.pg1.get_capture(len(pkts))
872         self.verify_capture_out(capture)
873
874         # out2in
875         pkts = self.create_stream_out(self.pg1)
876         self.pg1.add_stream(pkts)
877         self.pg_enable_capture(self.pg_interfaces)
878         self.pg_start()
879         capture = self.pg0.get_capture(len(pkts))
880         self.verify_capture_in(capture, self.pg0)
881
882     def test_dynamic_icmp_errors_in2out_ttl_1(self):
883         """ NAT44 handling of client packets with TTL=1 """
884
885         self.nat44_add_address(self.nat_addr)
886         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
887         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
888                                                   is_inside=0)
889
890         # Client side - generate traffic
891         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
892         self.pg0.add_stream(pkts)
893         self.pg_enable_capture(self.pg_interfaces)
894         self.pg_start()
895
896         # Client side - verify ICMP type 11 packets
897         capture = self.pg0.get_capture(len(pkts))
898         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
899
900     def test_dynamic_icmp_errors_out2in_ttl_1(self):
901         """ NAT44 handling of server packets with TTL=1 """
902
903         self.nat44_add_address(self.nat_addr)
904         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
905         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
906                                                   is_inside=0)
907
908         # Client side - create sessions
909         pkts = self.create_stream_in(self.pg0, self.pg1)
910         self.pg0.add_stream(pkts)
911         self.pg_enable_capture(self.pg_interfaces)
912         self.pg_start()
913
914         # Server side - generate traffic
915         capture = self.pg1.get_capture(len(pkts))
916         self.verify_capture_out(capture)
917         pkts = self.create_stream_out(self.pg1, ttl=1)
918         self.pg1.add_stream(pkts)
919         self.pg_enable_capture(self.pg_interfaces)
920         self.pg_start()
921
922         # Server side - verify ICMP type 11 packets
923         capture = self.pg1.get_capture(len(pkts))
924         self.verify_capture_out_with_icmp_errors(capture,
925                                                  src_ip=self.pg1.local_ip4)
926
927     def test_dynamic_icmp_errors_in2out_ttl_2(self):
928         """ NAT44 handling of error responses to client packets with TTL=2 """
929
930         self.nat44_add_address(self.nat_addr)
931         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
932         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
933                                                   is_inside=0)
934
935         # Client side - generate traffic
936         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
937         self.pg0.add_stream(pkts)
938         self.pg_enable_capture(self.pg_interfaces)
939         self.pg_start()
940
941         # Server side - simulate ICMP type 11 response
942         capture = self.pg1.get_capture(len(pkts))
943         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
944                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
945                 ICMP(type=11) / packet[IP] for packet in capture]
946         self.pg1.add_stream(pkts)
947         self.pg_enable_capture(self.pg_interfaces)
948         self.pg_start()
949
950         # Client side - verify ICMP type 11 packets
951         capture = self.pg0.get_capture(len(pkts))
952         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
953
954     def test_dynamic_icmp_errors_out2in_ttl_2(self):
955         """ NAT44 handling of error responses to server packets with TTL=2 """
956
957         self.nat44_add_address(self.nat_addr)
958         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
959         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
960                                                   is_inside=0)
961
962         # Client side - create sessions
963         pkts = self.create_stream_in(self.pg0, self.pg1)
964         self.pg0.add_stream(pkts)
965         self.pg_enable_capture(self.pg_interfaces)
966         self.pg_start()
967
968         # Server side - generate traffic
969         capture = self.pg1.get_capture(len(pkts))
970         self.verify_capture_out(capture)
971         pkts = self.create_stream_out(self.pg1, ttl=2)
972         self.pg1.add_stream(pkts)
973         self.pg_enable_capture(self.pg_interfaces)
974         self.pg_start()
975
976         # Client side - simulate ICMP type 11 response
977         capture = self.pg0.get_capture(len(pkts))
978         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
979                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
980                 ICMP(type=11) / packet[IP] for packet in capture]
981         self.pg0.add_stream(pkts)
982         self.pg_enable_capture(self.pg_interfaces)
983         self.pg_start()
984
985         # Server side - verify ICMP type 11 packets
986         capture = self.pg1.get_capture(len(pkts))
987         self.verify_capture_out_with_icmp_errors(capture)
988
989     def test_ping_out_interface_from_outside(self):
990         """ Ping NAT44 out interface from outside network """
991
992         self.nat44_add_address(self.nat_addr)
993         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
994         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
995                                                   is_inside=0)
996
997         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
998              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
999              ICMP(id=self.icmp_id_out, type='echo-request'))
1000         pkts = [p]
1001         self.pg1.add_stream(pkts)
1002         self.pg_enable_capture(self.pg_interfaces)
1003         self.pg_start()
1004         capture = self.pg1.get_capture(len(pkts))
1005         self.assertEqual(1, len(capture))
1006         packet = capture[0]
1007         try:
1008             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1009             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1010             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1011             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1012         except:
1013             self.logger.error(ppp("Unexpected or invalid packet "
1014                                   "(outside network):", packet))
1015             raise
1016
1017     def test_ping_internal_host_from_outside(self):
1018         """ Ping internal host from outside network """
1019
1020         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1021         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1022         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1023                                                   is_inside=0)
1024
1025         # out2in
1026         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1027                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1028                ICMP(id=self.icmp_id_out, type='echo-request'))
1029         self.pg1.add_stream(pkt)
1030         self.pg_enable_capture(self.pg_interfaces)
1031         self.pg_start()
1032         capture = self.pg0.get_capture(1)
1033         self.verify_capture_in(capture, self.pg0, packet_num=1)
1034         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1035
1036         # in2out
1037         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1038                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1039                ICMP(id=self.icmp_id_in, type='echo-reply'))
1040         self.pg0.add_stream(pkt)
1041         self.pg_enable_capture(self.pg_interfaces)
1042         self.pg_start()
1043         capture = self.pg1.get_capture(1)
1044         self.verify_capture_out(capture, same_port=True, packet_num=1)
1045         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1046
1047     def test_static_in(self):
1048         """ 1:1 NAT initialized from inside network """
1049
1050         nat_ip = "10.0.0.10"
1051         self.tcp_port_out = 6303
1052         self.udp_port_out = 6304
1053         self.icmp_id_out = 6305
1054
1055         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1056         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1057         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1058                                                   is_inside=0)
1059
1060         # in2out
1061         pkts = self.create_stream_in(self.pg0, self.pg1)
1062         self.pg0.add_stream(pkts)
1063         self.pg_enable_capture(self.pg_interfaces)
1064         self.pg_start()
1065         capture = self.pg1.get_capture(len(pkts))
1066         self.verify_capture_out(capture, nat_ip, True)
1067
1068         # out2in
1069         pkts = self.create_stream_out(self.pg1, nat_ip)
1070         self.pg1.add_stream(pkts)
1071         self.pg_enable_capture(self.pg_interfaces)
1072         self.pg_start()
1073         capture = self.pg0.get_capture(len(pkts))
1074         self.verify_capture_in(capture, self.pg0)
1075
1076     def test_static_out(self):
1077         """ 1:1 NAT initialized from outside network """
1078
1079         nat_ip = "10.0.0.20"
1080         self.tcp_port_out = 6303
1081         self.udp_port_out = 6304
1082         self.icmp_id_out = 6305
1083
1084         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1085         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1086         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1087                                                   is_inside=0)
1088
1089         # out2in
1090         pkts = self.create_stream_out(self.pg1, nat_ip)
1091         self.pg1.add_stream(pkts)
1092         self.pg_enable_capture(self.pg_interfaces)
1093         self.pg_start()
1094         capture = self.pg0.get_capture(len(pkts))
1095         self.verify_capture_in(capture, self.pg0)
1096
1097         # in2out
1098         pkts = self.create_stream_in(self.pg0, self.pg1)
1099         self.pg0.add_stream(pkts)
1100         self.pg_enable_capture(self.pg_interfaces)
1101         self.pg_start()
1102         capture = self.pg1.get_capture(len(pkts))
1103         self.verify_capture_out(capture, nat_ip, True)
1104
1105     def test_static_with_port_in(self):
1106         """ 1:1 NAPT initialized from inside network """
1107
1108         self.tcp_port_out = 3606
1109         self.udp_port_out = 3607
1110         self.icmp_id_out = 3608
1111
1112         self.nat44_add_address(self.nat_addr)
1113         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1114                                       self.tcp_port_in, self.tcp_port_out,
1115                                       proto=IP_PROTOS.tcp)
1116         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1117                                       self.udp_port_in, self.udp_port_out,
1118                                       proto=IP_PROTOS.udp)
1119         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1120                                       self.icmp_id_in, self.icmp_id_out,
1121                                       proto=IP_PROTOS.icmp)
1122         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1123         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1124                                                   is_inside=0)
1125
1126         # in2out
1127         pkts = self.create_stream_in(self.pg0, self.pg1)
1128         self.pg0.add_stream(pkts)
1129         self.pg_enable_capture(self.pg_interfaces)
1130         self.pg_start()
1131         capture = self.pg1.get_capture(len(pkts))
1132         self.verify_capture_out(capture)
1133
1134         # out2in
1135         pkts = self.create_stream_out(self.pg1)
1136         self.pg1.add_stream(pkts)
1137         self.pg_enable_capture(self.pg_interfaces)
1138         self.pg_start()
1139         capture = self.pg0.get_capture(len(pkts))
1140         self.verify_capture_in(capture, self.pg0)
1141
1142     def test_static_with_port_out(self):
1143         """ 1:1 NAPT initialized from outside network """
1144
1145         self.tcp_port_out = 30606
1146         self.udp_port_out = 30607
1147         self.icmp_id_out = 30608
1148
1149         self.nat44_add_address(self.nat_addr)
1150         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1151                                       self.tcp_port_in, self.tcp_port_out,
1152                                       proto=IP_PROTOS.tcp)
1153         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1154                                       self.udp_port_in, self.udp_port_out,
1155                                       proto=IP_PROTOS.udp)
1156         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1157                                       self.icmp_id_in, self.icmp_id_out,
1158                                       proto=IP_PROTOS.icmp)
1159         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1160         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1161                                                   is_inside=0)
1162
1163         # out2in
1164         pkts = self.create_stream_out(self.pg1)
1165         self.pg1.add_stream(pkts)
1166         self.pg_enable_capture(self.pg_interfaces)
1167         self.pg_start()
1168         capture = self.pg0.get_capture(len(pkts))
1169         self.verify_capture_in(capture, self.pg0)
1170
1171         # in2out
1172         pkts = self.create_stream_in(self.pg0, self.pg1)
1173         self.pg0.add_stream(pkts)
1174         self.pg_enable_capture(self.pg_interfaces)
1175         self.pg_start()
1176         capture = self.pg1.get_capture(len(pkts))
1177         self.verify_capture_out(capture)
1178
1179     def test_static_vrf_aware(self):
1180         """ 1:1 NAT VRF awareness """
1181
1182         nat_ip1 = "10.0.0.30"
1183         nat_ip2 = "10.0.0.40"
1184         self.tcp_port_out = 6303
1185         self.udp_port_out = 6304
1186         self.icmp_id_out = 6305
1187
1188         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1189                                       vrf_id=10)
1190         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1191                                       vrf_id=10)
1192         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1193                                                   is_inside=0)
1194         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1195         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1196
1197         # inside interface VRF match NAT44 static mapping VRF
1198         pkts = self.create_stream_in(self.pg4, self.pg3)
1199         self.pg4.add_stream(pkts)
1200         self.pg_enable_capture(self.pg_interfaces)
1201         self.pg_start()
1202         capture = self.pg3.get_capture(len(pkts))
1203         self.verify_capture_out(capture, nat_ip1, True)
1204
1205         # inside interface VRF don't match NAT44 static mapping VRF (packets
1206         # are dropped)
1207         pkts = self.create_stream_in(self.pg0, self.pg3)
1208         self.pg0.add_stream(pkts)
1209         self.pg_enable_capture(self.pg_interfaces)
1210         self.pg_start()
1211         self.pg3.assert_nothing_captured()
1212
1213     def test_identity_nat(self):
1214         """ Identity NAT """
1215
1216         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1217         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1218         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1219                                                   is_inside=0)
1220
1221         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1222              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1223              TCP(sport=12345, dport=56789))
1224         self.pg1.add_stream(p)
1225         self.pg_enable_capture(self.pg_interfaces)
1226         self.pg_start()
1227         capture = self.pg0.get_capture(1)
1228         p = capture[0]
1229         try:
1230             ip = p[IP]
1231             tcp = p[TCP]
1232             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1233             self.assertEqual(ip.src, self.pg1.remote_ip4)
1234             self.assertEqual(tcp.dport, 56789)
1235             self.assertEqual(tcp.sport, 12345)
1236             self.check_tcp_checksum(p)
1237             self.check_ip_checksum(p)
1238         except:
1239             self.logger.error(ppp("Unexpected or invalid packet:", p))
1240             raise
1241
1242     def test_static_lb(self):
1243         """ NAT44 local service load balancing """
1244         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1245         external_port = 80
1246         local_port = 8080
1247         server1 = self.pg0.remote_hosts[0]
1248         server2 = self.pg0.remote_hosts[1]
1249
1250         locals = [{'addr': server1.ip4n,
1251                    'port': local_port,
1252                    'probability': 70},
1253                   {'addr': server2.ip4n,
1254                    'port': local_port,
1255                    'probability': 30}]
1256
1257         self.nat44_add_address(self.nat_addr)
1258         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1259                                                   external_port,
1260                                                   IP_PROTOS.tcp,
1261                                                   local_num=len(locals),
1262                                                   locals=locals)
1263         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1264         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1265                                                   is_inside=0)
1266
1267         # from client to service
1268         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1269              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1270              TCP(sport=12345, dport=external_port))
1271         self.pg1.add_stream(p)
1272         self.pg_enable_capture(self.pg_interfaces)
1273         self.pg_start()
1274         capture = self.pg0.get_capture(1)
1275         p = capture[0]
1276         server = None
1277         try:
1278             ip = p[IP]
1279             tcp = p[TCP]
1280             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1281             if ip.dst == server1.ip4:
1282                 server = server1
1283             else:
1284                 server = server2
1285             self.assertEqual(tcp.dport, local_port)
1286             self.check_tcp_checksum(p)
1287             self.check_ip_checksum(p)
1288         except:
1289             self.logger.error(ppp("Unexpected or invalid packet:", p))
1290             raise
1291
1292         # from service back to client
1293         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1294              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1295              TCP(sport=local_port, dport=12345))
1296         self.pg0.add_stream(p)
1297         self.pg_enable_capture(self.pg_interfaces)
1298         self.pg_start()
1299         capture = self.pg1.get_capture(1)
1300         p = capture[0]
1301         try:
1302             ip = p[IP]
1303             tcp = p[TCP]
1304             self.assertEqual(ip.src, self.nat_addr)
1305             self.assertEqual(tcp.sport, external_port)
1306             self.check_tcp_checksum(p)
1307             self.check_ip_checksum(p)
1308         except:
1309             self.logger.error(ppp("Unexpected or invalid packet:", p))
1310             raise
1311
1312         # multiple clients
1313         server1_n = 0
1314         server2_n = 0
1315         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1316         pkts = []
1317         for client in clients:
1318             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1319                  IP(src=client, dst=self.nat_addr) /
1320                  TCP(sport=12345, dport=external_port))
1321             pkts.append(p)
1322         self.pg1.add_stream(pkts)
1323         self.pg_enable_capture(self.pg_interfaces)
1324         self.pg_start()
1325         capture = self.pg0.get_capture(len(pkts))
1326         for p in capture:
1327             if p[IP].dst == server1.ip4:
1328                 server1_n += 1
1329             else:
1330                 server2_n += 1
1331         self.assertTrue(server1_n > server2_n)
1332
1333     def test_multiple_inside_interfaces(self):
1334         """ NAT44 multiple non-overlapping address space inside interfaces """
1335
1336         self.nat44_add_address(self.nat_addr)
1337         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1338         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1339         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1340                                                   is_inside=0)
1341
1342         # between two NAT44 inside interfaces (no translation)
1343         pkts = self.create_stream_in(self.pg0, self.pg1)
1344         self.pg0.add_stream(pkts)
1345         self.pg_enable_capture(self.pg_interfaces)
1346         self.pg_start()
1347         capture = self.pg1.get_capture(len(pkts))
1348         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1349
1350         # from NAT44 inside to interface without NAT44 feature (no translation)
1351         pkts = self.create_stream_in(self.pg0, self.pg2)
1352         self.pg0.add_stream(pkts)
1353         self.pg_enable_capture(self.pg_interfaces)
1354         self.pg_start()
1355         capture = self.pg2.get_capture(len(pkts))
1356         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1357
1358         # in2out 1st interface
1359         pkts = self.create_stream_in(self.pg0, self.pg3)
1360         self.pg0.add_stream(pkts)
1361         self.pg_enable_capture(self.pg_interfaces)
1362         self.pg_start()
1363         capture = self.pg3.get_capture(len(pkts))
1364         self.verify_capture_out(capture)
1365
1366         # out2in 1st interface
1367         pkts = self.create_stream_out(self.pg3)
1368         self.pg3.add_stream(pkts)
1369         self.pg_enable_capture(self.pg_interfaces)
1370         self.pg_start()
1371         capture = self.pg0.get_capture(len(pkts))
1372         self.verify_capture_in(capture, self.pg0)
1373
1374         # in2out 2nd interface
1375         pkts = self.create_stream_in(self.pg1, self.pg3)
1376         self.pg1.add_stream(pkts)
1377         self.pg_enable_capture(self.pg_interfaces)
1378         self.pg_start()
1379         capture = self.pg3.get_capture(len(pkts))
1380         self.verify_capture_out(capture)
1381
1382         # out2in 2nd interface
1383         pkts = self.create_stream_out(self.pg3)
1384         self.pg3.add_stream(pkts)
1385         self.pg_enable_capture(self.pg_interfaces)
1386         self.pg_start()
1387         capture = self.pg1.get_capture(len(pkts))
1388         self.verify_capture_in(capture, self.pg1)
1389
1390     def test_inside_overlapping_interfaces(self):
1391         """ NAT44 multiple inside interfaces with overlapping address space """
1392
1393         static_nat_ip = "10.0.0.10"
1394         self.nat44_add_address(self.nat_addr)
1395         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1396                                                   is_inside=0)
1397         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1398         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1399         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1400         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1401                                       vrf_id=20)
1402
1403         # between NAT44 inside interfaces with same VRF (no translation)
1404         pkts = self.create_stream_in(self.pg4, self.pg5)
1405         self.pg4.add_stream(pkts)
1406         self.pg_enable_capture(self.pg_interfaces)
1407         self.pg_start()
1408         capture = self.pg5.get_capture(len(pkts))
1409         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1410
1411         # between NAT44 inside interfaces with different VRF (hairpinning)
1412         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1413              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1414              TCP(sport=1234, dport=5678))
1415         self.pg4.add_stream(p)
1416         self.pg_enable_capture(self.pg_interfaces)
1417         self.pg_start()
1418         capture = self.pg6.get_capture(1)
1419         p = capture[0]
1420         try:
1421             ip = p[IP]
1422             tcp = p[TCP]
1423             self.assertEqual(ip.src, self.nat_addr)
1424             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1425             self.assertNotEqual(tcp.sport, 1234)
1426             self.assertEqual(tcp.dport, 5678)
1427         except:
1428             self.logger.error(ppp("Unexpected or invalid packet:", p))
1429             raise
1430
1431         # in2out 1st interface
1432         pkts = self.create_stream_in(self.pg4, self.pg3)
1433         self.pg4.add_stream(pkts)
1434         self.pg_enable_capture(self.pg_interfaces)
1435         self.pg_start()
1436         capture = self.pg3.get_capture(len(pkts))
1437         self.verify_capture_out(capture)
1438
1439         # out2in 1st interface
1440         pkts = self.create_stream_out(self.pg3)
1441         self.pg3.add_stream(pkts)
1442         self.pg_enable_capture(self.pg_interfaces)
1443         self.pg_start()
1444         capture = self.pg4.get_capture(len(pkts))
1445         self.verify_capture_in(capture, self.pg4)
1446
1447         # in2out 2nd interface
1448         pkts = self.create_stream_in(self.pg5, self.pg3)
1449         self.pg5.add_stream(pkts)
1450         self.pg_enable_capture(self.pg_interfaces)
1451         self.pg_start()
1452         capture = self.pg3.get_capture(len(pkts))
1453         self.verify_capture_out(capture)
1454
1455         # out2in 2nd interface
1456         pkts = self.create_stream_out(self.pg3)
1457         self.pg3.add_stream(pkts)
1458         self.pg_enable_capture(self.pg_interfaces)
1459         self.pg_start()
1460         capture = self.pg5.get_capture(len(pkts))
1461         self.verify_capture_in(capture, self.pg5)
1462
1463         # pg5 session dump
1464         addresses = self.vapi.nat44_address_dump()
1465         self.assertEqual(len(addresses), 1)
1466         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1467         self.assertEqual(len(sessions), 3)
1468         for session in sessions:
1469             self.assertFalse(session.is_static)
1470             self.assertEqual(session.inside_ip_address[0:4],
1471                              self.pg5.remote_ip4n)
1472             self.assertEqual(session.outside_ip_address,
1473                              addresses[0].ip_address)
1474         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1475         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1476         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1477         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1478         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1479         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1480         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1481         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1482         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1483
1484         # in2out 3rd interface
1485         pkts = self.create_stream_in(self.pg6, self.pg3)
1486         self.pg6.add_stream(pkts)
1487         self.pg_enable_capture(self.pg_interfaces)
1488         self.pg_start()
1489         capture = self.pg3.get_capture(len(pkts))
1490         self.verify_capture_out(capture, static_nat_ip, True)
1491
1492         # out2in 3rd interface
1493         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1494         self.pg3.add_stream(pkts)
1495         self.pg_enable_capture(self.pg_interfaces)
1496         self.pg_start()
1497         capture = self.pg6.get_capture(len(pkts))
1498         self.verify_capture_in(capture, self.pg6)
1499
1500         # general user and session dump verifications
1501         users = self.vapi.nat44_user_dump()
1502         self.assertTrue(len(users) >= 3)
1503         addresses = self.vapi.nat44_address_dump()
1504         self.assertEqual(len(addresses), 1)
1505         for user in users:
1506             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1507                                                          user.vrf_id)
1508             for session in sessions:
1509                 self.assertEqual(user.ip_address, session.inside_ip_address)
1510                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1511                 self.assertTrue(session.protocol in
1512                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1513                                  IP_PROTOS.icmp])
1514
1515         # pg4 session dump
1516         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1517         self.assertTrue(len(sessions) >= 4)
1518         for session in sessions:
1519             self.assertFalse(session.is_static)
1520             self.assertEqual(session.inside_ip_address[0:4],
1521                              self.pg4.remote_ip4n)
1522             self.assertEqual(session.outside_ip_address,
1523                              addresses[0].ip_address)
1524
1525         # pg6 session dump
1526         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1527         self.assertTrue(len(sessions) >= 3)
1528         for session in sessions:
1529             self.assertTrue(session.is_static)
1530             self.assertEqual(session.inside_ip_address[0:4],
1531                              self.pg6.remote_ip4n)
1532             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1533                              map(int, static_nat_ip.split('.')))
1534             self.assertTrue(session.inside_port in
1535                             [self.tcp_port_in, self.udp_port_in,
1536                              self.icmp_id_in])
1537
1538     def test_hairpinning(self):
1539         """ NAT44 hairpinning - 1:1 NAPT """
1540
1541         host = self.pg0.remote_hosts[0]
1542         server = self.pg0.remote_hosts[1]
1543         host_in_port = 1234
1544         host_out_port = 0
1545         server_in_port = 5678
1546         server_out_port = 8765
1547
1548         self.nat44_add_address(self.nat_addr)
1549         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1550         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1551                                                   is_inside=0)
1552         # add static mapping for server
1553         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1554                                       server_in_port, server_out_port,
1555                                       proto=IP_PROTOS.tcp)
1556
1557         # send packet from host to server
1558         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1559              IP(src=host.ip4, dst=self.nat_addr) /
1560              TCP(sport=host_in_port, dport=server_out_port))
1561         self.pg0.add_stream(p)
1562         self.pg_enable_capture(self.pg_interfaces)
1563         self.pg_start()
1564         capture = self.pg0.get_capture(1)
1565         p = capture[0]
1566         try:
1567             ip = p[IP]
1568             tcp = p[TCP]
1569             self.assertEqual(ip.src, self.nat_addr)
1570             self.assertEqual(ip.dst, server.ip4)
1571             self.assertNotEqual(tcp.sport, host_in_port)
1572             self.assertEqual(tcp.dport, server_in_port)
1573             self.check_tcp_checksum(p)
1574             host_out_port = tcp.sport
1575         except:
1576             self.logger.error(ppp("Unexpected or invalid packet:", p))
1577             raise
1578
1579         # send reply from server to host
1580         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1581              IP(src=server.ip4, dst=self.nat_addr) /
1582              TCP(sport=server_in_port, dport=host_out_port))
1583         self.pg0.add_stream(p)
1584         self.pg_enable_capture(self.pg_interfaces)
1585         self.pg_start()
1586         capture = self.pg0.get_capture(1)
1587         p = capture[0]
1588         try:
1589             ip = p[IP]
1590             tcp = p[TCP]
1591             self.assertEqual(ip.src, self.nat_addr)
1592             self.assertEqual(ip.dst, host.ip4)
1593             self.assertEqual(tcp.sport, server_out_port)
1594             self.assertEqual(tcp.dport, host_in_port)
1595             self.check_tcp_checksum(p)
1596         except:
1597             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1598             raise
1599
1600     def test_hairpinning2(self):
1601         """ NAT44 hairpinning - 1:1 NAT"""
1602
1603         server1_nat_ip = "10.0.0.10"
1604         server2_nat_ip = "10.0.0.11"
1605         host = self.pg0.remote_hosts[0]
1606         server1 = self.pg0.remote_hosts[1]
1607         server2 = self.pg0.remote_hosts[2]
1608         server_tcp_port = 22
1609         server_udp_port = 20
1610
1611         self.nat44_add_address(self.nat_addr)
1612         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1613         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1614                                                   is_inside=0)
1615
1616         # add static mapping for servers
1617         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1618         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1619
1620         # host to server1
1621         pkts = []
1622         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1623              IP(src=host.ip4, dst=server1_nat_ip) /
1624              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1625         pkts.append(p)
1626         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1627              IP(src=host.ip4, dst=server1_nat_ip) /
1628              UDP(sport=self.udp_port_in, dport=server_udp_port))
1629         pkts.append(p)
1630         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1631              IP(src=host.ip4, dst=server1_nat_ip) /
1632              ICMP(id=self.icmp_id_in, type='echo-request'))
1633         pkts.append(p)
1634         self.pg0.add_stream(pkts)
1635         self.pg_enable_capture(self.pg_interfaces)
1636         self.pg_start()
1637         capture = self.pg0.get_capture(len(pkts))
1638         for packet in capture:
1639             try:
1640                 self.assertEqual(packet[IP].src, self.nat_addr)
1641                 self.assertEqual(packet[IP].dst, server1.ip4)
1642                 if packet.haslayer(TCP):
1643                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1644                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1645                     self.tcp_port_out = packet[TCP].sport
1646                     self.check_tcp_checksum(packet)
1647                 elif packet.haslayer(UDP):
1648                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1649                     self.assertEqual(packet[UDP].dport, server_udp_port)
1650                     self.udp_port_out = packet[UDP].sport
1651                 else:
1652                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1653                     self.icmp_id_out = packet[ICMP].id
1654             except:
1655                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1656                 raise
1657
1658         # server1 to host
1659         pkts = []
1660         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1661              IP(src=server1.ip4, dst=self.nat_addr) /
1662              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1663         pkts.append(p)
1664         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1665              IP(src=server1.ip4, dst=self.nat_addr) /
1666              UDP(sport=server_udp_port, dport=self.udp_port_out))
1667         pkts.append(p)
1668         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1669              IP(src=server1.ip4, dst=self.nat_addr) /
1670              ICMP(id=self.icmp_id_out, type='echo-reply'))
1671         pkts.append(p)
1672         self.pg0.add_stream(pkts)
1673         self.pg_enable_capture(self.pg_interfaces)
1674         self.pg_start()
1675         capture = self.pg0.get_capture(len(pkts))
1676         for packet in capture:
1677             try:
1678                 self.assertEqual(packet[IP].src, server1_nat_ip)
1679                 self.assertEqual(packet[IP].dst, host.ip4)
1680                 if packet.haslayer(TCP):
1681                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1682                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1683                     self.check_tcp_checksum(packet)
1684                 elif packet.haslayer(UDP):
1685                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1686                     self.assertEqual(packet[UDP].sport, server_udp_port)
1687                 else:
1688                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1689             except:
1690                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1691                 raise
1692
1693         # server2 to server1
1694         pkts = []
1695         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1696              IP(src=server2.ip4, dst=server1_nat_ip) /
1697              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1698         pkts.append(p)
1699         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1700              IP(src=server2.ip4, dst=server1_nat_ip) /
1701              UDP(sport=self.udp_port_in, dport=server_udp_port))
1702         pkts.append(p)
1703         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1704              IP(src=server2.ip4, dst=server1_nat_ip) /
1705              ICMP(id=self.icmp_id_in, type='echo-request'))
1706         pkts.append(p)
1707         self.pg0.add_stream(pkts)
1708         self.pg_enable_capture(self.pg_interfaces)
1709         self.pg_start()
1710         capture = self.pg0.get_capture(len(pkts))
1711         for packet in capture:
1712             try:
1713                 self.assertEqual(packet[IP].src, server2_nat_ip)
1714                 self.assertEqual(packet[IP].dst, server1.ip4)
1715                 if packet.haslayer(TCP):
1716                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1717                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1718                     self.tcp_port_out = packet[TCP].sport
1719                     self.check_tcp_checksum(packet)
1720                 elif packet.haslayer(UDP):
1721                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1722                     self.assertEqual(packet[UDP].dport, server_udp_port)
1723                     self.udp_port_out = packet[UDP].sport
1724                 else:
1725                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1726                     self.icmp_id_out = packet[ICMP].id
1727             except:
1728                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1729                 raise
1730
1731         # server1 to server2
1732         pkts = []
1733         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1734              IP(src=server1.ip4, dst=server2_nat_ip) /
1735              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1736         pkts.append(p)
1737         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1738              IP(src=server1.ip4, dst=server2_nat_ip) /
1739              UDP(sport=server_udp_port, dport=self.udp_port_out))
1740         pkts.append(p)
1741         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1742              IP(src=server1.ip4, dst=server2_nat_ip) /
1743              ICMP(id=self.icmp_id_out, type='echo-reply'))
1744         pkts.append(p)
1745         self.pg0.add_stream(pkts)
1746         self.pg_enable_capture(self.pg_interfaces)
1747         self.pg_start()
1748         capture = self.pg0.get_capture(len(pkts))
1749         for packet in capture:
1750             try:
1751                 self.assertEqual(packet[IP].src, server1_nat_ip)
1752                 self.assertEqual(packet[IP].dst, server2.ip4)
1753                 if packet.haslayer(TCP):
1754                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1755                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1756                     self.check_tcp_checksum(packet)
1757                 elif packet.haslayer(UDP):
1758                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1759                     self.assertEqual(packet[UDP].sport, server_udp_port)
1760                 else:
1761                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1762             except:
1763                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1764                 raise
1765
1766     def test_max_translations_per_user(self):
1767         """ MAX translations per user - recycle the least recently used """
1768
1769         self.nat44_add_address(self.nat_addr)
1770         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1771         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1772                                                   is_inside=0)
1773
1774         # get maximum number of translations per user
1775         nat44_config = self.vapi.nat_show_config()
1776
1777         # send more than maximum number of translations per user packets
1778         pkts_num = nat44_config.max_translations_per_user + 5
1779         pkts = []
1780         for port in range(0, pkts_num):
1781             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1782                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1783                  TCP(sport=1025 + port))
1784             pkts.append(p)
1785         self.pg0.add_stream(pkts)
1786         self.pg_enable_capture(self.pg_interfaces)
1787         self.pg_start()
1788
1789         # verify number of translated packet
1790         self.pg1.get_capture(pkts_num)
1791
1792     def test_interface_addr(self):
1793         """ Acquire NAT44 addresses from interface """
1794         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1795
1796         # no address in NAT pool
1797         adresses = self.vapi.nat44_address_dump()
1798         self.assertEqual(0, len(adresses))
1799
1800         # configure interface address and check NAT address pool
1801         self.pg7.config_ip4()
1802         adresses = self.vapi.nat44_address_dump()
1803         self.assertEqual(1, len(adresses))
1804         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1805
1806         # remove interface address and check NAT address pool
1807         self.pg7.unconfig_ip4()
1808         adresses = self.vapi.nat44_address_dump()
1809         self.assertEqual(0, len(adresses))
1810
1811     def test_interface_addr_static_mapping(self):
1812         """ Static mapping with addresses from interface """
1813         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1814         self.nat44_add_static_mapping(
1815             '1.2.3.4',
1816             external_sw_if_index=self.pg7.sw_if_index)
1817
1818         # static mappings with external interface
1819         static_mappings = self.vapi.nat44_static_mapping_dump()
1820         self.assertEqual(1, len(static_mappings))
1821         self.assertEqual(self.pg7.sw_if_index,
1822                          static_mappings[0].external_sw_if_index)
1823
1824         # configure interface address and check static mappings
1825         self.pg7.config_ip4()
1826         static_mappings = self.vapi.nat44_static_mapping_dump()
1827         self.assertEqual(1, len(static_mappings))
1828         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1829                          self.pg7.local_ip4n)
1830         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1831
1832         # remove interface address and check static mappings
1833         self.pg7.unconfig_ip4()
1834         static_mappings = self.vapi.nat44_static_mapping_dump()
1835         self.assertEqual(0, len(static_mappings))
1836
1837     def test_interface_addr_identity_nat(self):
1838         """ Identity NAT with addresses from interface """
1839
1840         port = 53053
1841         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1842         self.vapi.nat44_add_del_identity_mapping(
1843             sw_if_index=self.pg7.sw_if_index,
1844             port=port,
1845             protocol=IP_PROTOS.tcp,
1846             addr_only=0)
1847
1848         # identity mappings with external interface
1849         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1850         self.assertEqual(1, len(identity_mappings))
1851         self.assertEqual(self.pg7.sw_if_index,
1852                          identity_mappings[0].sw_if_index)
1853
1854         # configure interface address and check identity mappings
1855         self.pg7.config_ip4()
1856         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1857         self.assertEqual(1, len(identity_mappings))
1858         self.assertEqual(identity_mappings[0].ip_address,
1859                          self.pg7.local_ip4n)
1860         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
1861         self.assertEqual(port, identity_mappings[0].port)
1862         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
1863
1864         # remove interface address and check identity mappings
1865         self.pg7.unconfig_ip4()
1866         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1867         self.assertEqual(0, len(identity_mappings))
1868
1869     def test_ipfix_nat44_sess(self):
1870         """ IPFIX logging NAT44 session created/delted """
1871         self.ipfix_domain_id = 10
1872         self.ipfix_src_port = 20202
1873         colector_port = 30303
1874         bind_layers(UDP, IPFIX, dport=30303)
1875         self.nat44_add_address(self.nat_addr)
1876         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1877         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1878                                                   is_inside=0)
1879         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1880                                      src_address=self.pg3.local_ip4n,
1881                                      path_mtu=512,
1882                                      template_interval=10,
1883                                      collector_port=colector_port)
1884         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1885                             src_port=self.ipfix_src_port)
1886
1887         pkts = self.create_stream_in(self.pg0, self.pg1)
1888         self.pg0.add_stream(pkts)
1889         self.pg_enable_capture(self.pg_interfaces)
1890         self.pg_start()
1891         capture = self.pg1.get_capture(len(pkts))
1892         self.verify_capture_out(capture)
1893         self.nat44_add_address(self.nat_addr, is_add=0)
1894         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1895         capture = self.pg3.get_capture(3)
1896         ipfix = IPFIXDecoder()
1897         # first load template
1898         for p in capture:
1899             self.assertTrue(p.haslayer(IPFIX))
1900             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1901             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1902             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1903             self.assertEqual(p[UDP].dport, colector_port)
1904             self.assertEqual(p[IPFIX].observationDomainID,
1905                              self.ipfix_domain_id)
1906             if p.haslayer(Template):
1907                 ipfix.add_template(p.getlayer(Template))
1908         # verify events in data set
1909         for p in capture:
1910             if p.haslayer(Data):
1911                 data = ipfix.decode_data_set(p.getlayer(Set))
1912                 self.verify_ipfix_nat44_ses(data)
1913
1914     def test_ipfix_addr_exhausted(self):
1915         """ IPFIX logging NAT addresses exhausted """
1916         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1917         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1918                                                   is_inside=0)
1919         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1920                                      src_address=self.pg3.local_ip4n,
1921                                      path_mtu=512,
1922                                      template_interval=10)
1923         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1924                             src_port=self.ipfix_src_port)
1925
1926         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1927              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1928              TCP(sport=3025))
1929         self.pg0.add_stream(p)
1930         self.pg_enable_capture(self.pg_interfaces)
1931         self.pg_start()
1932         capture = self.pg1.get_capture(0)
1933         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1934         capture = self.pg3.get_capture(3)
1935         ipfix = IPFIXDecoder()
1936         # first load template
1937         for p in capture:
1938             self.assertTrue(p.haslayer(IPFIX))
1939             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1940             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1941             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1942             self.assertEqual(p[UDP].dport, 4739)
1943             self.assertEqual(p[IPFIX].observationDomainID,
1944                              self.ipfix_domain_id)
1945             if p.haslayer(Template):
1946                 ipfix.add_template(p.getlayer(Template))
1947         # verify events in data set
1948         for p in capture:
1949             if p.haslayer(Data):
1950                 data = ipfix.decode_data_set(p.getlayer(Set))
1951                 self.verify_ipfix_addr_exhausted(data)
1952
1953     def test_pool_addr_fib(self):
1954         """ NAT44 add pool addresses to FIB """
1955         static_addr = '10.0.0.10'
1956         self.nat44_add_address(self.nat_addr)
1957         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1958         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1959                                                   is_inside=0)
1960         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1961
1962         # NAT44 address
1963         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1964              ARP(op=ARP.who_has, pdst=self.nat_addr,
1965                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1966         self.pg1.add_stream(p)
1967         self.pg_enable_capture(self.pg_interfaces)
1968         self.pg_start()
1969         capture = self.pg1.get_capture(1)
1970         self.assertTrue(capture[0].haslayer(ARP))
1971         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1972
1973         # 1:1 NAT address
1974         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1975              ARP(op=ARP.who_has, pdst=static_addr,
1976                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1977         self.pg1.add_stream(p)
1978         self.pg_enable_capture(self.pg_interfaces)
1979         self.pg_start()
1980         capture = self.pg1.get_capture(1)
1981         self.assertTrue(capture[0].haslayer(ARP))
1982         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1983
1984         # send ARP to non-NAT44 interface
1985         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1986              ARP(op=ARP.who_has, pdst=self.nat_addr,
1987                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1988         self.pg2.add_stream(p)
1989         self.pg_enable_capture(self.pg_interfaces)
1990         self.pg_start()
1991         capture = self.pg1.get_capture(0)
1992
1993         # remove addresses and verify
1994         self.nat44_add_address(self.nat_addr, is_add=0)
1995         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1996                                       is_add=0)
1997
1998         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1999              ARP(op=ARP.who_has, pdst=self.nat_addr,
2000                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2001         self.pg1.add_stream(p)
2002         self.pg_enable_capture(self.pg_interfaces)
2003         self.pg_start()
2004         capture = self.pg1.get_capture(0)
2005
2006         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2007              ARP(op=ARP.who_has, pdst=static_addr,
2008                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2009         self.pg1.add_stream(p)
2010         self.pg_enable_capture(self.pg_interfaces)
2011         self.pg_start()
2012         capture = self.pg1.get_capture(0)
2013
2014     def test_vrf_mode(self):
2015         """ NAT44 tenant VRF aware address pool mode """
2016
2017         vrf_id1 = 1
2018         vrf_id2 = 2
2019         nat_ip1 = "10.0.0.10"
2020         nat_ip2 = "10.0.0.11"
2021
2022         self.pg0.unconfig_ip4()
2023         self.pg1.unconfig_ip4()
2024         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2025         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2026         self.pg0.set_table_ip4(vrf_id1)
2027         self.pg1.set_table_ip4(vrf_id2)
2028         self.pg0.config_ip4()
2029         self.pg1.config_ip4()
2030
2031         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2032         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2033         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2034         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2035         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2036                                                   is_inside=0)
2037
2038         # first VRF
2039         pkts = self.create_stream_in(self.pg0, self.pg2)
2040         self.pg0.add_stream(pkts)
2041         self.pg_enable_capture(self.pg_interfaces)
2042         self.pg_start()
2043         capture = self.pg2.get_capture(len(pkts))
2044         self.verify_capture_out(capture, nat_ip1)
2045
2046         # second VRF
2047         pkts = self.create_stream_in(self.pg1, self.pg2)
2048         self.pg1.add_stream(pkts)
2049         self.pg_enable_capture(self.pg_interfaces)
2050         self.pg_start()
2051         capture = self.pg2.get_capture(len(pkts))
2052         self.verify_capture_out(capture, nat_ip2)
2053
2054         self.pg0.unconfig_ip4()
2055         self.pg1.unconfig_ip4()
2056         self.pg0.set_table_ip4(0)
2057         self.pg1.set_table_ip4(0)
2058         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2059         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2060
2061     def test_vrf_feature_independent(self):
2062         """ NAT44 tenant VRF independent address pool mode """
2063
2064         nat_ip1 = "10.0.0.10"
2065         nat_ip2 = "10.0.0.11"
2066
2067         self.nat44_add_address(nat_ip1)
2068         self.nat44_add_address(nat_ip2, vrf_id=99)
2069         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2070         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2071         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2072                                                   is_inside=0)
2073
2074         # first VRF
2075         pkts = self.create_stream_in(self.pg0, self.pg2)
2076         self.pg0.add_stream(pkts)
2077         self.pg_enable_capture(self.pg_interfaces)
2078         self.pg_start()
2079         capture = self.pg2.get_capture(len(pkts))
2080         self.verify_capture_out(capture, nat_ip1)
2081
2082         # second VRF
2083         pkts = self.create_stream_in(self.pg1, self.pg2)
2084         self.pg1.add_stream(pkts)
2085         self.pg_enable_capture(self.pg_interfaces)
2086         self.pg_start()
2087         capture = self.pg2.get_capture(len(pkts))
2088         self.verify_capture_out(capture, nat_ip1)
2089
2090     def test_dynamic_ipless_interfaces(self):
2091         """ NAT44 interfaces without configured IP address """
2092
2093         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2094                                       mactobinary(self.pg7.remote_mac),
2095                                       self.pg7.remote_ip4n,
2096                                       is_static=1)
2097         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2098                                       mactobinary(self.pg8.remote_mac),
2099                                       self.pg8.remote_ip4n,
2100                                       is_static=1)
2101
2102         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2103                                    dst_address_length=32,
2104                                    next_hop_address=self.pg7.remote_ip4n,
2105                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2106         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2107                                    dst_address_length=32,
2108                                    next_hop_address=self.pg8.remote_ip4n,
2109                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2110
2111         self.nat44_add_address(self.nat_addr)
2112         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2113         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2114                                                   is_inside=0)
2115
2116         # in2out
2117         pkts = self.create_stream_in(self.pg7, self.pg8)
2118         self.pg7.add_stream(pkts)
2119         self.pg_enable_capture(self.pg_interfaces)
2120         self.pg_start()
2121         capture = self.pg8.get_capture(len(pkts))
2122         self.verify_capture_out(capture)
2123
2124         # out2in
2125         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2126         self.pg8.add_stream(pkts)
2127         self.pg_enable_capture(self.pg_interfaces)
2128         self.pg_start()
2129         capture = self.pg7.get_capture(len(pkts))
2130         self.verify_capture_in(capture, self.pg7)
2131
2132     def test_static_ipless_interfaces(self):
2133         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2134
2135         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2136                                       mactobinary(self.pg7.remote_mac),
2137                                       self.pg7.remote_ip4n,
2138                                       is_static=1)
2139         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2140                                       mactobinary(self.pg8.remote_mac),
2141                                       self.pg8.remote_ip4n,
2142                                       is_static=1)
2143
2144         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2145                                    dst_address_length=32,
2146                                    next_hop_address=self.pg7.remote_ip4n,
2147                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2148         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2149                                    dst_address_length=32,
2150                                    next_hop_address=self.pg8.remote_ip4n,
2151                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2152
2153         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2154         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2155         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2156                                                   is_inside=0)
2157
2158         # out2in
2159         pkts = self.create_stream_out(self.pg8)
2160         self.pg8.add_stream(pkts)
2161         self.pg_enable_capture(self.pg_interfaces)
2162         self.pg_start()
2163         capture = self.pg7.get_capture(len(pkts))
2164         self.verify_capture_in(capture, self.pg7)
2165
2166         # in2out
2167         pkts = self.create_stream_in(self.pg7, self.pg8)
2168         self.pg7.add_stream(pkts)
2169         self.pg_enable_capture(self.pg_interfaces)
2170         self.pg_start()
2171         capture = self.pg8.get_capture(len(pkts))
2172         self.verify_capture_out(capture, self.nat_addr, True)
2173
2174     def test_static_with_port_ipless_interfaces(self):
2175         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2176
2177         self.tcp_port_out = 30606
2178         self.udp_port_out = 30607
2179         self.icmp_id_out = 30608
2180
2181         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2182                                       mactobinary(self.pg7.remote_mac),
2183                                       self.pg7.remote_ip4n,
2184                                       is_static=1)
2185         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2186                                       mactobinary(self.pg8.remote_mac),
2187                                       self.pg8.remote_ip4n,
2188                                       is_static=1)
2189
2190         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2191                                    dst_address_length=32,
2192                                    next_hop_address=self.pg7.remote_ip4n,
2193                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2194         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2195                                    dst_address_length=32,
2196                                    next_hop_address=self.pg8.remote_ip4n,
2197                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2198
2199         self.nat44_add_address(self.nat_addr)
2200         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2201                                       self.tcp_port_in, self.tcp_port_out,
2202                                       proto=IP_PROTOS.tcp)
2203         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2204                                       self.udp_port_in, self.udp_port_out,
2205                                       proto=IP_PROTOS.udp)
2206         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2207                                       self.icmp_id_in, self.icmp_id_out,
2208                                       proto=IP_PROTOS.icmp)
2209         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2210         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2211                                                   is_inside=0)
2212
2213         # out2in
2214         pkts = self.create_stream_out(self.pg8)
2215         self.pg8.add_stream(pkts)
2216         self.pg_enable_capture(self.pg_interfaces)
2217         self.pg_start()
2218         capture = self.pg7.get_capture(len(pkts))
2219         self.verify_capture_in(capture, self.pg7)
2220
2221         # in2out
2222         pkts = self.create_stream_in(self.pg7, self.pg8)
2223         self.pg7.add_stream(pkts)
2224         self.pg_enable_capture(self.pg_interfaces)
2225         self.pg_start()
2226         capture = self.pg8.get_capture(len(pkts))
2227         self.verify_capture_out(capture)
2228
2229     def test_static_unknown_proto(self):
2230         """ 1:1 NAT translate packet with unknown protocol """
2231         nat_ip = "10.0.0.10"
2232         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2233         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2234         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2235                                                   is_inside=0)
2236
2237         # in2out
2238         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2239              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2240              GRE() /
2241              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2242              TCP(sport=1234, dport=1234))
2243         self.pg0.add_stream(p)
2244         self.pg_enable_capture(self.pg_interfaces)
2245         self.pg_start()
2246         p = self.pg1.get_capture(1)
2247         packet = p[0]
2248         try:
2249             self.assertEqual(packet[IP].src, nat_ip)
2250             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2251             self.assertTrue(packet.haslayer(GRE))
2252             self.check_ip_checksum(packet)
2253         except:
2254             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2255             raise
2256
2257         # out2in
2258         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2259              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2260              GRE() /
2261              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2262              TCP(sport=1234, dport=1234))
2263         self.pg1.add_stream(p)
2264         self.pg_enable_capture(self.pg_interfaces)
2265         self.pg_start()
2266         p = self.pg0.get_capture(1)
2267         packet = p[0]
2268         try:
2269             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2270             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2271             self.assertTrue(packet.haslayer(GRE))
2272             self.check_ip_checksum(packet)
2273         except:
2274             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2275             raise
2276
2277     def test_hairpinning_static_unknown_proto(self):
2278         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2279
2280         host = self.pg0.remote_hosts[0]
2281         server = self.pg0.remote_hosts[1]
2282
2283         host_nat_ip = "10.0.0.10"
2284         server_nat_ip = "10.0.0.11"
2285
2286         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2287         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2288         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2289         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2290                                                   is_inside=0)
2291
2292         # host to server
2293         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2294              IP(src=host.ip4, dst=server_nat_ip) /
2295              GRE() /
2296              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2297              TCP(sport=1234, dport=1234))
2298         self.pg0.add_stream(p)
2299         self.pg_enable_capture(self.pg_interfaces)
2300         self.pg_start()
2301         p = self.pg0.get_capture(1)
2302         packet = p[0]
2303         try:
2304             self.assertEqual(packet[IP].src, host_nat_ip)
2305             self.assertEqual(packet[IP].dst, server.ip4)
2306             self.assertTrue(packet.haslayer(GRE))
2307             self.check_ip_checksum(packet)
2308         except:
2309             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2310             raise
2311
2312         # server to host
2313         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2314              IP(src=server.ip4, dst=host_nat_ip) /
2315              GRE() /
2316              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2317              TCP(sport=1234, dport=1234))
2318         self.pg0.add_stream(p)
2319         self.pg_enable_capture(self.pg_interfaces)
2320         self.pg_start()
2321         p = self.pg0.get_capture(1)
2322         packet = p[0]
2323         try:
2324             self.assertEqual(packet[IP].src, server_nat_ip)
2325             self.assertEqual(packet[IP].dst, host.ip4)
2326             self.assertTrue(packet.haslayer(GRE))
2327             self.check_ip_checksum(packet)
2328         except:
2329             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2330             raise
2331
2332     def test_unknown_proto(self):
2333         """ NAT44 translate packet with unknown protocol """
2334         self.nat44_add_address(self.nat_addr)
2335         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2336         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2337                                                   is_inside=0)
2338
2339         # in2out
2340         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2341              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2342              TCP(sport=self.tcp_port_in, dport=20))
2343         self.pg0.add_stream(p)
2344         self.pg_enable_capture(self.pg_interfaces)
2345         self.pg_start()
2346         p = self.pg1.get_capture(1)
2347
2348         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2349              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2350              GRE() /
2351              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2352              TCP(sport=1234, dport=1234))
2353         self.pg0.add_stream(p)
2354         self.pg_enable_capture(self.pg_interfaces)
2355         self.pg_start()
2356         p = self.pg1.get_capture(1)
2357         packet = p[0]
2358         try:
2359             self.assertEqual(packet[IP].src, self.nat_addr)
2360             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2361             self.assertTrue(packet.haslayer(GRE))
2362             self.check_ip_checksum(packet)
2363         except:
2364             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2365             raise
2366
2367         # out2in
2368         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2369              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2370              GRE() /
2371              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2372              TCP(sport=1234, dport=1234))
2373         self.pg1.add_stream(p)
2374         self.pg_enable_capture(self.pg_interfaces)
2375         self.pg_start()
2376         p = self.pg0.get_capture(1)
2377         packet = p[0]
2378         try:
2379             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2380             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2381             self.assertTrue(packet.haslayer(GRE))
2382             self.check_ip_checksum(packet)
2383         except:
2384             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2385             raise
2386
2387     def test_hairpinning_unknown_proto(self):
2388         """ NAT44 translate packet with unknown protocol - hairpinning """
2389         host = self.pg0.remote_hosts[0]
2390         server = self.pg0.remote_hosts[1]
2391         host_in_port = 1234
2392         host_out_port = 0
2393         server_in_port = 5678
2394         server_out_port = 8765
2395         server_nat_ip = "10.0.0.11"
2396
2397         self.nat44_add_address(self.nat_addr)
2398         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2399         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2400                                                   is_inside=0)
2401
2402         # add static mapping for server
2403         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2404
2405         # host to server
2406         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2407              IP(src=host.ip4, dst=server_nat_ip) /
2408              TCP(sport=host_in_port, dport=server_out_port))
2409         self.pg0.add_stream(p)
2410         self.pg_enable_capture(self.pg_interfaces)
2411         self.pg_start()
2412         capture = self.pg0.get_capture(1)
2413
2414         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2415              IP(src=host.ip4, dst=server_nat_ip) /
2416              GRE() /
2417              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2418              TCP(sport=1234, dport=1234))
2419         self.pg0.add_stream(p)
2420         self.pg_enable_capture(self.pg_interfaces)
2421         self.pg_start()
2422         p = self.pg0.get_capture(1)
2423         packet = p[0]
2424         try:
2425             self.assertEqual(packet[IP].src, self.nat_addr)
2426             self.assertEqual(packet[IP].dst, server.ip4)
2427             self.assertTrue(packet.haslayer(GRE))
2428             self.check_ip_checksum(packet)
2429         except:
2430             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2431             raise
2432
2433         # server to host
2434         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2435              IP(src=server.ip4, dst=self.nat_addr) /
2436              GRE() /
2437              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2438              TCP(sport=1234, dport=1234))
2439         self.pg0.add_stream(p)
2440         self.pg_enable_capture(self.pg_interfaces)
2441         self.pg_start()
2442         p = self.pg0.get_capture(1)
2443         packet = p[0]
2444         try:
2445             self.assertEqual(packet[IP].src, server_nat_ip)
2446             self.assertEqual(packet[IP].dst, host.ip4)
2447             self.assertTrue(packet.haslayer(GRE))
2448             self.check_ip_checksum(packet)
2449         except:
2450             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2451             raise
2452
2453     def test_output_feature(self):
2454         """ NAT44 interface output feature (in2out postrouting) """
2455         self.nat44_add_address(self.nat_addr)
2456         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2457         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2458         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2459                                                          is_inside=0)
2460
2461         # in2out
2462         pkts = self.create_stream_in(self.pg0, self.pg3)
2463         self.pg0.add_stream(pkts)
2464         self.pg_enable_capture(self.pg_interfaces)
2465         self.pg_start()
2466         capture = self.pg3.get_capture(len(pkts))
2467         self.verify_capture_out(capture)
2468
2469         # out2in
2470         pkts = self.create_stream_out(self.pg3)
2471         self.pg3.add_stream(pkts)
2472         self.pg_enable_capture(self.pg_interfaces)
2473         self.pg_start()
2474         capture = self.pg0.get_capture(len(pkts))
2475         self.verify_capture_in(capture, self.pg0)
2476
2477         # from non-NAT interface to NAT inside interface
2478         pkts = self.create_stream_in(self.pg2, self.pg0)
2479         self.pg2.add_stream(pkts)
2480         self.pg_enable_capture(self.pg_interfaces)
2481         self.pg_start()
2482         capture = self.pg0.get_capture(len(pkts))
2483         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2484
2485     def test_output_feature_vrf_aware(self):
2486         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2487         nat_ip_vrf10 = "10.0.0.10"
2488         nat_ip_vrf20 = "10.0.0.20"
2489
2490         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2491                                    dst_address_length=32,
2492                                    next_hop_address=self.pg3.remote_ip4n,
2493                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2494                                    table_id=10)
2495         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2496                                    dst_address_length=32,
2497                                    next_hop_address=self.pg3.remote_ip4n,
2498                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2499                                    table_id=20)
2500
2501         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2502         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2503         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2504         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2505         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2506                                                          is_inside=0)
2507
2508         # in2out VRF 10
2509         pkts = self.create_stream_in(self.pg4, self.pg3)
2510         self.pg4.add_stream(pkts)
2511         self.pg_enable_capture(self.pg_interfaces)
2512         self.pg_start()
2513         capture = self.pg3.get_capture(len(pkts))
2514         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2515
2516         # out2in VRF 10
2517         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2518         self.pg3.add_stream(pkts)
2519         self.pg_enable_capture(self.pg_interfaces)
2520         self.pg_start()
2521         capture = self.pg4.get_capture(len(pkts))
2522         self.verify_capture_in(capture, self.pg4)
2523
2524         # in2out VRF 20
2525         pkts = self.create_stream_in(self.pg6, self.pg3)
2526         self.pg6.add_stream(pkts)
2527         self.pg_enable_capture(self.pg_interfaces)
2528         self.pg_start()
2529         capture = self.pg3.get_capture(len(pkts))
2530         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2531
2532         # out2in VRF 20
2533         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2534         self.pg3.add_stream(pkts)
2535         self.pg_enable_capture(self.pg_interfaces)
2536         self.pg_start()
2537         capture = self.pg6.get_capture(len(pkts))
2538         self.verify_capture_in(capture, self.pg6)
2539
2540     def test_output_feature_hairpinning(self):
2541         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2542         host = self.pg0.remote_hosts[0]
2543         server = self.pg0.remote_hosts[1]
2544         host_in_port = 1234
2545         host_out_port = 0
2546         server_in_port = 5678
2547         server_out_port = 8765
2548
2549         self.nat44_add_address(self.nat_addr)
2550         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2551         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2552                                                          is_inside=0)
2553
2554         # add static mapping for server
2555         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2556                                       server_in_port, server_out_port,
2557                                       proto=IP_PROTOS.tcp)
2558
2559         # send packet from host to server
2560         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2561              IP(src=host.ip4, dst=self.nat_addr) /
2562              TCP(sport=host_in_port, dport=server_out_port))
2563         self.pg0.add_stream(p)
2564         self.pg_enable_capture(self.pg_interfaces)
2565         self.pg_start()
2566         capture = self.pg0.get_capture(1)
2567         p = capture[0]
2568         try:
2569             ip = p[IP]
2570             tcp = p[TCP]
2571             self.assertEqual(ip.src, self.nat_addr)
2572             self.assertEqual(ip.dst, server.ip4)
2573             self.assertNotEqual(tcp.sport, host_in_port)
2574             self.assertEqual(tcp.dport, server_in_port)
2575             self.check_tcp_checksum(p)
2576             host_out_port = tcp.sport
2577         except:
2578             self.logger.error(ppp("Unexpected or invalid packet:", p))
2579             raise
2580
2581         # send reply from server to host
2582         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2583              IP(src=server.ip4, dst=self.nat_addr) /
2584              TCP(sport=server_in_port, dport=host_out_port))
2585         self.pg0.add_stream(p)
2586         self.pg_enable_capture(self.pg_interfaces)
2587         self.pg_start()
2588         capture = self.pg0.get_capture(1)
2589         p = capture[0]
2590         try:
2591             ip = p[IP]
2592             tcp = p[TCP]
2593             self.assertEqual(ip.src, self.nat_addr)
2594             self.assertEqual(ip.dst, host.ip4)
2595             self.assertEqual(tcp.sport, server_out_port)
2596             self.assertEqual(tcp.dport, host_in_port)
2597             self.check_tcp_checksum(p)
2598         except:
2599             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2600             raise
2601
2602     def test_one_armed_nat44(self):
2603         """ One armed NAT44 """
2604         remote_host = self.pg9.remote_hosts[0]
2605         local_host = self.pg9.remote_hosts[1]
2606         external_port = 0
2607
2608         self.nat44_add_address(self.nat_addr)
2609         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2610         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2611                                                   is_inside=0)
2612
2613         # in2out
2614         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2615              IP(src=local_host.ip4, dst=remote_host.ip4) /
2616              TCP(sport=12345, dport=80))
2617         self.pg9.add_stream(p)
2618         self.pg_enable_capture(self.pg_interfaces)
2619         self.pg_start()
2620         capture = self.pg9.get_capture(1)
2621         p = capture[0]
2622         try:
2623             ip = p[IP]
2624             tcp = p[TCP]
2625             self.assertEqual(ip.src, self.nat_addr)
2626             self.assertEqual(ip.dst, remote_host.ip4)
2627             self.assertNotEqual(tcp.sport, 12345)
2628             external_port = tcp.sport
2629             self.assertEqual(tcp.dport, 80)
2630             self.check_tcp_checksum(p)
2631             self.check_ip_checksum(p)
2632         except:
2633             self.logger.error(ppp("Unexpected or invalid packet:", p))
2634             raise
2635
2636         # out2in
2637         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2638              IP(src=remote_host.ip4, dst=self.nat_addr) /
2639              TCP(sport=80, dport=external_port))
2640         self.pg9.add_stream(p)
2641         self.pg_enable_capture(self.pg_interfaces)
2642         self.pg_start()
2643         capture = self.pg9.get_capture(1)
2644         p = capture[0]
2645         try:
2646             ip = p[IP]
2647             tcp = p[TCP]
2648             self.assertEqual(ip.src, remote_host.ip4)
2649             self.assertEqual(ip.dst, local_host.ip4)
2650             self.assertEqual(tcp.sport, 80)
2651             self.assertEqual(tcp.dport, 12345)
2652             self.check_tcp_checksum(p)
2653             self.check_ip_checksum(p)
2654         except:
2655             self.logger.error(ppp("Unexpected or invalid packet:", p))
2656             raise
2657
2658     def test_del_session(self):
2659         """ Delete NAT44 session """
2660         self.nat44_add_address(self.nat_addr)
2661         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2662         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2663                                                   is_inside=0)
2664
2665         pkts = self.create_stream_in(self.pg0, self.pg1)
2666         self.pg0.add_stream(pkts)
2667         self.pg_enable_capture(self.pg_interfaces)
2668         self.pg_start()
2669         capture = self.pg1.get_capture(len(pkts))
2670
2671         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2672         nsessions = len(sessions)
2673
2674         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2675                                     sessions[0].inside_port,
2676                                     sessions[0].protocol)
2677         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2678                                     sessions[1].outside_port,
2679                                     sessions[1].protocol,
2680                                     is_in=0)
2681
2682         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2683         self.assertEqual(nsessions - len(sessions), 2)
2684
2685     def test_set_get_reass(self):
2686         """ NAT44 set/get virtual fragmentation reassembly """
2687         reas_cfg1 = self.vapi.nat_get_reass()
2688
2689         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2690                                 max_reass=reas_cfg1.ip4_max_reass * 2,
2691                                 max_frag=reas_cfg1.ip4_max_frag * 2)
2692
2693         reas_cfg2 = self.vapi.nat_get_reass()
2694
2695         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2696         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2697         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2698
2699         self.vapi.nat_set_reass(drop_frag=1)
2700         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2701
2702     def test_frag_in_order(self):
2703         """ NAT44 translate fragments arriving in order """
2704         self.nat44_add_address(self.nat_addr)
2705         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2706         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2707                                                   is_inside=0)
2708
2709         data = "A" * 4 + "B" * 16 + "C" * 3
2710         self.tcp_port_in = random.randint(1025, 65535)
2711
2712         reass = self.vapi.nat_reass_dump()
2713         reass_n_start = len(reass)
2714
2715         # in2out
2716         pkts = self.create_stream_frag(self.pg0,
2717                                        self.pg1.remote_ip4,
2718                                        self.tcp_port_in,
2719                                        20,
2720                                        data)
2721         self.pg0.add_stream(pkts)
2722         self.pg_enable_capture(self.pg_interfaces)
2723         self.pg_start()
2724         frags = self.pg1.get_capture(len(pkts))
2725         p = self.reass_frags_and_verify(frags,
2726                                         self.nat_addr,
2727                                         self.pg1.remote_ip4)
2728         self.assertEqual(p[TCP].dport, 20)
2729         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2730         self.tcp_port_out = p[TCP].sport
2731         self.assertEqual(data, p[Raw].load)
2732
2733         # out2in
2734         pkts = self.create_stream_frag(self.pg1,
2735                                        self.nat_addr,
2736                                        20,
2737                                        self.tcp_port_out,
2738                                        data)
2739         self.pg1.add_stream(pkts)
2740         self.pg_enable_capture(self.pg_interfaces)
2741         self.pg_start()
2742         frags = self.pg0.get_capture(len(pkts))
2743         p = self.reass_frags_and_verify(frags,
2744                                         self.pg1.remote_ip4,
2745                                         self.pg0.remote_ip4)
2746         self.assertEqual(p[TCP].sport, 20)
2747         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2748         self.assertEqual(data, p[Raw].load)
2749
2750         reass = self.vapi.nat_reass_dump()
2751         reass_n_end = len(reass)
2752
2753         self.assertEqual(reass_n_end - reass_n_start, 2)
2754
2755     def test_reass_hairpinning(self):
2756         """ NAT44 fragments hairpinning """
2757         host = self.pg0.remote_hosts[0]
2758         server = self.pg0.remote_hosts[1]
2759         host_in_port = random.randint(1025, 65535)
2760         host_out_port = 0
2761         server_in_port = random.randint(1025, 65535)
2762         server_out_port = random.randint(1025, 65535)
2763         data = "A" * 4 + "B" * 16 + "C" * 3
2764
2765         self.nat44_add_address(self.nat_addr)
2766         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2767         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2768                                                   is_inside=0)
2769         # add static mapping for server
2770         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2771                                       server_in_port, server_out_port,
2772                                       proto=IP_PROTOS.tcp)
2773
2774         # send packet from host to server
2775         pkts = self.create_stream_frag(self.pg0,
2776                                        self.nat_addr,
2777                                        host_in_port,
2778                                        server_out_port,
2779                                        data)
2780         self.pg0.add_stream(pkts)
2781         self.pg_enable_capture(self.pg_interfaces)
2782         self.pg_start()
2783         frags = self.pg0.get_capture(len(pkts))
2784         p = self.reass_frags_and_verify(frags,
2785                                         self.nat_addr,
2786                                         server.ip4)
2787         self.assertNotEqual(p[TCP].sport, host_in_port)
2788         self.assertEqual(p[TCP].dport, server_in_port)
2789         self.assertEqual(data, p[Raw].load)
2790
2791     def test_frag_out_of_order(self):
2792         """ NAT44 translate fragments arriving out of order """
2793         self.nat44_add_address(self.nat_addr)
2794         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2795         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2796                                                   is_inside=0)
2797
2798         data = "A" * 4 + "B" * 16 + "C" * 3
2799         random.randint(1025, 65535)
2800
2801         # in2out
2802         pkts = self.create_stream_frag(self.pg0,
2803                                        self.pg1.remote_ip4,
2804                                        self.tcp_port_in,
2805                                        20,
2806                                        data)
2807         pkts.reverse()
2808         self.pg0.add_stream(pkts)
2809         self.pg_enable_capture(self.pg_interfaces)
2810         self.pg_start()
2811         frags = self.pg1.get_capture(len(pkts))
2812         p = self.reass_frags_and_verify(frags,
2813                                         self.nat_addr,
2814                                         self.pg1.remote_ip4)
2815         self.assertEqual(p[TCP].dport, 20)
2816         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2817         self.tcp_port_out = p[TCP].sport
2818         self.assertEqual(data, p[Raw].load)
2819
2820         # out2in
2821         pkts = self.create_stream_frag(self.pg1,
2822                                        self.nat_addr,
2823                                        20,
2824                                        self.tcp_port_out,
2825                                        data)
2826         pkts.reverse()
2827         self.pg1.add_stream(pkts)
2828         self.pg_enable_capture(self.pg_interfaces)
2829         self.pg_start()
2830         frags = self.pg0.get_capture(len(pkts))
2831         p = self.reass_frags_and_verify(frags,
2832                                         self.pg1.remote_ip4,
2833                                         self.pg0.remote_ip4)
2834         self.assertEqual(p[TCP].sport, 20)
2835         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2836         self.assertEqual(data, p[Raw].load)
2837
2838     def test_port_restricted(self):
2839         """ Port restricted NAT44 (MAP-E CE) """
2840         self.nat44_add_address(self.nat_addr)
2841         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2842         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2843                                                   is_inside=0)
2844         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
2845                       "psid-offset 6 psid-len 6")
2846
2847         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2848              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2849              TCP(sport=4567, dport=22))
2850         self.pg0.add_stream(p)
2851         self.pg_enable_capture(self.pg_interfaces)
2852         self.pg_start()
2853         capture = self.pg1.get_capture(1)
2854         p = capture[0]
2855         try:
2856             ip = p[IP]
2857             tcp = p[TCP]
2858             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2859             self.assertEqual(ip.src, self.nat_addr)
2860             self.assertEqual(tcp.dport, 22)
2861             self.assertNotEqual(tcp.sport, 4567)
2862             self.assertEqual((tcp.sport >> 6) & 63, 10)
2863             self.check_tcp_checksum(p)
2864             self.check_ip_checksum(p)
2865         except:
2866             self.logger.error(ppp("Unexpected or invalid packet:", p))
2867             raise
2868
2869     def test_twice_nat(self):
2870         """ Twice NAT44 """
2871         twice_nat_addr = '10.0.1.3'
2872         port_in = 8080
2873         port_out = 80
2874         eh_port_out = 4567
2875         eh_port_in = 0
2876         self.nat44_add_address(self.nat_addr)
2877         self.nat44_add_address(twice_nat_addr, twice_nat=1)
2878         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2879                                       port_in, port_out, proto=IP_PROTOS.tcp,
2880                                       twice_nat=1)
2881         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2882         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2883                                                   is_inside=0)
2884
2885         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2886              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2887              TCP(sport=eh_port_out, dport=port_out))
2888         self.pg1.add_stream(p)
2889         self.pg_enable_capture(self.pg_interfaces)
2890         self.pg_start()
2891         capture = self.pg0.get_capture(1)
2892         p = capture[0]
2893         try:
2894             ip = p[IP]
2895             tcp = p[TCP]
2896             self.assertEqual(ip.dst, self.pg0.remote_ip4)
2897             self.assertEqual(ip.src, twice_nat_addr)
2898             self.assertEqual(tcp.dport, port_in)
2899             self.assertNotEqual(tcp.sport, eh_port_out)
2900             eh_port_in = tcp.sport
2901             self.check_tcp_checksum(p)
2902             self.check_ip_checksum(p)
2903         except:
2904             self.logger.error(ppp("Unexpected or invalid packet:", p))
2905             raise
2906
2907         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2908              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
2909              TCP(sport=port_in, dport=eh_port_in))
2910         self.pg0.add_stream(p)
2911         self.pg_enable_capture(self.pg_interfaces)
2912         self.pg_start()
2913         capture = self.pg1.get_capture(1)
2914         p = capture[0]
2915         try:
2916             ip = p[IP]
2917             tcp = p[TCP]
2918             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2919             self.assertEqual(ip.src, self.nat_addr)
2920             self.assertEqual(tcp.dport, eh_port_out)
2921             self.assertEqual(tcp.sport, port_out)
2922             self.check_tcp_checksum(p)
2923             self.check_ip_checksum(p)
2924         except:
2925             self.logger.error(ppp("Unexpected or invalid packet:", p))
2926             raise
2927
2928     def test_twice_nat_lb(self):
2929         """ Twice NAT44 local service load balancing """
2930         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
2931         twice_nat_addr = '10.0.1.3'
2932         local_port = 8080
2933         external_port = 80
2934         eh_port_out = 4567
2935         eh_port_in = 0
2936         server1 = self.pg0.remote_hosts[0]
2937         server2 = self.pg0.remote_hosts[1]
2938
2939         locals = [{'addr': server1.ip4n,
2940                    'port': local_port,
2941                    'probability': 50},
2942                   {'addr': server2.ip4n,
2943                    'port': local_port,
2944                    'probability': 50}]
2945
2946         self.nat44_add_address(self.nat_addr)
2947         self.nat44_add_address(twice_nat_addr, twice_nat=1)
2948
2949         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
2950                                                   external_port,
2951                                                   IP_PROTOS.tcp,
2952                                                   twice_nat=1,
2953                                                   local_num=len(locals),
2954                                                   locals=locals)
2955         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2956         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2957                                                   is_inside=0)
2958
2959         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2960              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2961              TCP(sport=eh_port_out, dport=external_port))
2962         self.pg1.add_stream(p)
2963         self.pg_enable_capture(self.pg_interfaces)
2964         self.pg_start()
2965         capture = self.pg0.get_capture(1)
2966         p = capture[0]
2967         server = None
2968         try:
2969             ip = p[IP]
2970             tcp = p[TCP]
2971             self.assertEqual(ip.src, twice_nat_addr)
2972             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
2973             if ip.dst == server1.ip4:
2974                 server = server1
2975             else:
2976                 server = server2
2977             self.assertNotEqual(tcp.sport, eh_port_out)
2978             eh_port_in = tcp.sport
2979             self.assertEqual(tcp.dport, local_port)
2980             self.check_tcp_checksum(p)
2981             self.check_ip_checksum(p)
2982         except:
2983             self.logger.error(ppp("Unexpected or invalid packet:", p))
2984             raise
2985
2986         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2987              IP(src=server.ip4, dst=twice_nat_addr) /
2988              TCP(sport=local_port, dport=eh_port_in))
2989         self.pg0.add_stream(p)
2990         self.pg_enable_capture(self.pg_interfaces)
2991         self.pg_start()
2992         capture = self.pg1.get_capture(1)
2993         p = capture[0]
2994         try:
2995             ip = p[IP]
2996             tcp = p[TCP]
2997             self.assertEqual(ip.src, self.nat_addr)
2998             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2999             self.assertEqual(tcp.sport, external_port)
3000             self.assertEqual(tcp.dport, eh_port_out)
3001             self.check_tcp_checksum(p)
3002             self.check_ip_checksum(p)
3003         except:
3004             self.logger.error(ppp("Unexpected or invalid packet:", p))
3005             raise
3006
3007     def test_twice_nat_interface_addr(self):
3008         """ Acquire twice NAT44 addresses from interface """
3009         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3010
3011         # no address in NAT pool
3012         adresses = self.vapi.nat44_address_dump()
3013         self.assertEqual(0, len(adresses))
3014
3015         # configure interface address and check NAT address pool
3016         self.pg7.config_ip4()
3017         adresses = self.vapi.nat44_address_dump()
3018         self.assertEqual(1, len(adresses))
3019         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3020         self.assertEqual(adresses[0].twice_nat, 1)
3021
3022         # remove interface address and check NAT address pool
3023         self.pg7.unconfig_ip4()
3024         adresses = self.vapi.nat44_address_dump()
3025         self.assertEqual(0, len(adresses))
3026
3027     def tearDown(self):
3028         super(TestNAT44, self).tearDown()
3029         if not self.vpp_dead:
3030             self.logger.info(self.vapi.cli("show nat44 verbose"))
3031             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3032             self.vapi.cli("nat addr-port-assignment-alg default")
3033             self.clear_nat44()
3034
3035
3036 class TestDeterministicNAT(MethodHolder):
3037     """ Deterministic NAT Test Cases """
3038
3039     @classmethod
3040     def setUpConstants(cls):
3041         super(TestDeterministicNAT, cls).setUpConstants()
3042         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3043
3044     @classmethod
3045     def setUpClass(cls):
3046         super(TestDeterministicNAT, cls).setUpClass()
3047
3048         try:
3049             cls.tcp_port_in = 6303
3050             cls.tcp_external_port = 6303
3051             cls.udp_port_in = 6304
3052             cls.udp_external_port = 6304
3053             cls.icmp_id_in = 6305
3054             cls.nat_addr = '10.0.0.3'
3055
3056             cls.create_pg_interfaces(range(3))
3057             cls.interfaces = list(cls.pg_interfaces)
3058
3059             for i in cls.interfaces:
3060                 i.admin_up()
3061                 i.config_ip4()
3062                 i.resolve_arp()
3063
3064             cls.pg0.generate_remote_hosts(2)
3065             cls.pg0.configure_ipv4_neighbors()
3066
3067         except Exception:
3068             super(TestDeterministicNAT, cls).tearDownClass()
3069             raise
3070
3071     def create_stream_in(self, in_if, out_if, ttl=64):
3072         """
3073         Create packet stream for inside network
3074
3075         :param in_if: Inside interface
3076         :param out_if: Outside interface
3077         :param ttl: TTL of generated packets
3078         """
3079         pkts = []
3080         # TCP
3081         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3082              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3083              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3084         pkts.append(p)
3085
3086         # UDP
3087         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3088              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3089              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3090         pkts.append(p)
3091
3092         # ICMP
3093         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3094              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3095              ICMP(id=self.icmp_id_in, type='echo-request'))
3096         pkts.append(p)
3097
3098         return pkts
3099
3100     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3101         """
3102         Create packet stream for outside network
3103
3104         :param out_if: Outside interface
3105         :param dst_ip: Destination IP address (Default use global NAT address)
3106         :param ttl: TTL of generated packets
3107         """
3108         if dst_ip is None:
3109             dst_ip = self.nat_addr
3110         pkts = []
3111         # TCP
3112         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3113              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3114              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3115         pkts.append(p)
3116
3117         # UDP
3118         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3119              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3120              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3121         pkts.append(p)
3122
3123         # ICMP
3124         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3125              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3126              ICMP(id=self.icmp_external_id, type='echo-reply'))
3127         pkts.append(p)
3128
3129         return pkts
3130
3131     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3132         """
3133         Verify captured packets on outside network
3134
3135         :param capture: Captured packets
3136         :param nat_ip: Translated IP address (Default use global NAT address)
3137         :param same_port: Sorce port number is not translated (Default False)
3138         :param packet_num: Expected number of packets (Default 3)
3139         """
3140         if nat_ip is None:
3141             nat_ip = self.nat_addr
3142         self.assertEqual(packet_num, len(capture))
3143         for packet in capture:
3144             try:
3145                 self.assertEqual(packet[IP].src, nat_ip)
3146                 if packet.haslayer(TCP):
3147                     self.tcp_port_out = packet[TCP].sport
3148                 elif packet.haslayer(UDP):
3149                     self.udp_port_out = packet[UDP].sport
3150                 else:
3151                     self.icmp_external_id = packet[ICMP].id
3152             except:
3153                 self.logger.error(ppp("Unexpected or invalid packet "
3154                                       "(outside network):", packet))
3155                 raise
3156
3157     def initiate_tcp_session(self, in_if, out_if):
3158         """
3159         Initiates TCP session
3160
3161         :param in_if: Inside interface
3162         :param out_if: Outside interface
3163         """
3164         try:
3165             # SYN packet in->out
3166             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3167                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3168                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3169                      flags="S"))
3170             in_if.add_stream(p)
3171             self.pg_enable_capture(self.pg_interfaces)
3172             self.pg_start()
3173             capture = out_if.get_capture(1)
3174             p = capture[0]
3175             self.tcp_port_out = p[TCP].sport
3176
3177             # SYN + ACK packet out->in
3178             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3179                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3180                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3181                      flags="SA"))
3182             out_if.add_stream(p)
3183             self.pg_enable_capture(self.pg_interfaces)
3184             self.pg_start()
3185             in_if.get_capture(1)
3186
3187             # ACK packet in->out
3188             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3189                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3190                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3191                      flags="A"))
3192             in_if.add_stream(p)
3193             self.pg_enable_capture(self.pg_interfaces)
3194             self.pg_start()
3195             out_if.get_capture(1)
3196
3197         except:
3198             self.logger.error("TCP 3 way handshake failed")
3199             raise
3200
3201     def verify_ipfix_max_entries_per_user(self, data):
3202         """
3203         Verify IPFIX maximum entries per user exceeded event
3204
3205         :param data: Decoded IPFIX data records
3206         """
3207         self.assertEqual(1, len(data))
3208         record = data[0]
3209         # natEvent
3210         self.assertEqual(ord(record[230]), 13)
3211         # natQuotaExceededEvent
3212         self.assertEqual('\x03\x00\x00\x00', record[466])
3213         # sourceIPv4Address
3214         self.assertEqual(self.pg0.remote_ip4n, record[8])
3215
3216     def test_deterministic_mode(self):
3217         """ NAT plugin run deterministic mode """
3218         in_addr = '172.16.255.0'
3219         out_addr = '172.17.255.50'
3220         in_addr_t = '172.16.255.20'
3221         in_addr_n = socket.inet_aton(in_addr)
3222         out_addr_n = socket.inet_aton(out_addr)
3223         in_addr_t_n = socket.inet_aton(in_addr_t)
3224         in_plen = 24
3225         out_plen = 32
3226
3227         nat_config = self.vapi.nat_show_config()
3228         self.assertEqual(1, nat_config.deterministic)
3229
3230         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3231
3232         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3233         self.assertEqual(rep1.out_addr[:4], out_addr_n)
3234         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3235         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3236
3237         deterministic_mappings = self.vapi.nat_det_map_dump()
3238         self.assertEqual(len(deterministic_mappings), 1)
3239         dsm = deterministic_mappings[0]
3240         self.assertEqual(in_addr_n, dsm.in_addr[:4])
3241         self.assertEqual(in_plen, dsm.in_plen)
3242         self.assertEqual(out_addr_n, dsm.out_addr[:4])
3243         self.assertEqual(out_plen, dsm.out_plen)
3244
3245         self.clear_nat_det()
3246         deterministic_mappings = self.vapi.nat_det_map_dump()
3247         self.assertEqual(len(deterministic_mappings), 0)
3248
3249     def test_set_timeouts(self):
3250         """ Set deterministic NAT timeouts """
3251         timeouts_before = self.vapi.nat_det_get_timeouts()
3252
3253         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3254                                        timeouts_before.tcp_established + 10,
3255                                        timeouts_before.tcp_transitory + 10,
3256                                        timeouts_before.icmp + 10)
3257
3258         timeouts_after = self.vapi.nat_det_get_timeouts()
3259
3260         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3261         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3262         self.assertNotEqual(timeouts_before.tcp_established,
3263                             timeouts_after.tcp_established)
3264         self.assertNotEqual(timeouts_before.tcp_transitory,
3265                             timeouts_after.tcp_transitory)
3266
3267     def test_det_in(self):
3268         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3269
3270         nat_ip = "10.0.0.10"
3271
3272         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3273                                       32,
3274                                       socket.inet_aton(nat_ip),
3275                                       32)
3276         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3277         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3278                                                   is_inside=0)
3279
3280         # in2out
3281         pkts = self.create_stream_in(self.pg0, self.pg1)
3282         self.pg0.add_stream(pkts)
3283         self.pg_enable_capture(self.pg_interfaces)
3284         self.pg_start()
3285         capture = self.pg1.get_capture(len(pkts))
3286         self.verify_capture_out(capture, nat_ip)
3287
3288         # out2in
3289         pkts = self.create_stream_out(self.pg1, nat_ip)
3290         self.pg1.add_stream(pkts)
3291         self.pg_enable_capture(self.pg_interfaces)
3292         self.pg_start()
3293         capture = self.pg0.get_capture(len(pkts))
3294         self.verify_capture_in(capture, self.pg0)
3295
3296         # session dump test
3297         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3298         self.assertEqual(len(sessions), 3)
3299
3300         # TCP session
3301         s = sessions[0]
3302         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3303         self.assertEqual(s.in_port, self.tcp_port_in)
3304         self.assertEqual(s.out_port, self.tcp_port_out)
3305         self.assertEqual(s.ext_port, self.tcp_external_port)
3306
3307         # UDP session
3308         s = sessions[1]
3309         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3310         self.assertEqual(s.in_port, self.udp_port_in)
3311         self.assertEqual(s.out_port, self.udp_port_out)
3312         self.assertEqual(s.ext_port, self.udp_external_port)
3313
3314         # ICMP session
3315         s = sessions[2]
3316         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3317         self.assertEqual(s.in_port, self.icmp_id_in)
3318         self.assertEqual(s.out_port, self.icmp_external_id)
3319
3320     def test_multiple_users(self):
3321         """ Deterministic NAT multiple users """
3322
3323         nat_ip = "10.0.0.10"
3324         port_in = 80
3325         external_port = 6303
3326
3327         host0 = self.pg0.remote_hosts[0]
3328         host1 = self.pg0.remote_hosts[1]
3329
3330         self.vapi.nat_det_add_del_map(host0.ip4n,
3331                                       24,
3332                                       socket.inet_aton(nat_ip),
3333                                       32)
3334         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3335         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3336                                                   is_inside=0)
3337
3338         # host0 to out
3339         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3340              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3341              TCP(sport=port_in, dport=external_port))
3342         self.pg0.add_stream(p)
3343         self.pg_enable_capture(self.pg_interfaces)
3344         self.pg_start()
3345         capture = self.pg1.get_capture(1)
3346         p = capture[0]
3347         try:
3348             ip = p[IP]
3349             tcp = p[TCP]
3350             self.assertEqual(ip.src, nat_ip)
3351             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3352             self.assertEqual(tcp.dport, external_port)
3353             port_out0 = tcp.sport
3354         except:
3355             self.logger.error(ppp("Unexpected or invalid packet:", p))
3356             raise
3357
3358         # host1 to out
3359         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3360              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3361              TCP(sport=port_in, dport=external_port))
3362         self.pg0.add_stream(p)
3363         self.pg_enable_capture(self.pg_interfaces)
3364         self.pg_start()
3365         capture = self.pg1.get_capture(1)
3366         p = capture[0]
3367         try:
3368             ip = p[IP]
3369             tcp = p[TCP]
3370             self.assertEqual(ip.src, nat_ip)
3371             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3372             self.assertEqual(tcp.dport, external_port)
3373             port_out1 = tcp.sport
3374         except:
3375             self.logger.error(ppp("Unexpected or invalid packet:", p))
3376             raise
3377
3378         dms = self.vapi.nat_det_map_dump()
3379         self.assertEqual(1, len(dms))
3380         self.assertEqual(2, dms[0].ses_num)
3381
3382         # out to host0
3383         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3384              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3385              TCP(sport=external_port, dport=port_out0))
3386         self.pg1.add_stream(p)
3387         self.pg_enable_capture(self.pg_interfaces)
3388         self.pg_start()
3389         capture = self.pg0.get_capture(1)
3390         p = capture[0]
3391         try:
3392             ip = p[IP]
3393             tcp = p[TCP]
3394             self.assertEqual(ip.src, self.pg1.remote_ip4)
3395             self.assertEqual(ip.dst, host0.ip4)
3396             self.assertEqual(tcp.dport, port_in)
3397             self.assertEqual(tcp.sport, external_port)
3398         except:
3399             self.logger.error(ppp("Unexpected or invalid packet:", p))
3400             raise
3401
3402         # out to host1
3403         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3404              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3405              TCP(sport=external_port, dport=port_out1))
3406         self.pg1.add_stream(p)
3407         self.pg_enable_capture(self.pg_interfaces)
3408         self.pg_start()
3409         capture = self.pg0.get_capture(1)
3410         p = capture[0]
3411         try:
3412             ip = p[IP]
3413             tcp = p[TCP]
3414             self.assertEqual(ip.src, self.pg1.remote_ip4)
3415             self.assertEqual(ip.dst, host1.ip4)
3416             self.assertEqual(tcp.dport, port_in)
3417             self.assertEqual(tcp.sport, external_port)
3418         except:
3419             self.logger.error(ppp("Unexpected or invalid packet", p))
3420             raise
3421
3422         # session close api test
3423         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3424                                             port_out1,
3425                                             self.pg1.remote_ip4n,
3426                                             external_port)
3427         dms = self.vapi.nat_det_map_dump()
3428         self.assertEqual(dms[0].ses_num, 1)
3429
3430         self.vapi.nat_det_close_session_in(host0.ip4n,
3431                                            port_in,
3432                                            self.pg1.remote_ip4n,
3433                                            external_port)
3434         dms = self.vapi.nat_det_map_dump()
3435         self.assertEqual(dms[0].ses_num, 0)
3436
3437     def test_tcp_session_close_detection_in(self):
3438         """ Deterministic NAT TCP session close from inside network """
3439         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3440                                       32,
3441                                       socket.inet_aton(self.nat_addr),
3442                                       32)
3443         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3444         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3445                                                   is_inside=0)
3446
3447         self.initiate_tcp_session(self.pg0, self.pg1)
3448
3449         # close the session from inside
3450         try:
3451             # FIN packet in -> out
3452             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3453                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3454                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3455                      flags="F"))
3456             self.pg0.add_stream(p)
3457             self.pg_enable_capture(self.pg_interfaces)
3458             self.pg_start()
3459             self.pg1.get_capture(1)
3460
3461             pkts = []
3462
3463             # ACK packet out -> in
3464             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3465                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3466                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3467                      flags="A"))
3468             pkts.append(p)
3469
3470             # FIN packet out -> in
3471             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3472                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3473                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3474                      flags="F"))
3475             pkts.append(p)
3476
3477             self.pg1.add_stream(pkts)
3478             self.pg_enable_capture(self.pg_interfaces)
3479             self.pg_start()
3480             self.pg0.get_capture(2)
3481
3482             # ACK packet in -> out
3483             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3484                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3485                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3486                      flags="A"))
3487             self.pg0.add_stream(p)
3488             self.pg_enable_capture(self.pg_interfaces)
3489             self.pg_start()
3490             self.pg1.get_capture(1)
3491
3492             # Check if deterministic NAT44 closed the session
3493             dms = self.vapi.nat_det_map_dump()
3494             self.assertEqual(0, dms[0].ses_num)
3495         except:
3496             self.logger.error("TCP session termination failed")
3497             raise
3498
3499     def test_tcp_session_close_detection_out(self):
3500         """ Deterministic NAT TCP session close from outside network """
3501         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3502                                       32,
3503                                       socket.inet_aton(self.nat_addr),
3504                                       32)
3505         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3506         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3507                                                   is_inside=0)
3508
3509         self.initiate_tcp_session(self.pg0, self.pg1)
3510
3511         # close the session from outside
3512         try:
3513             # FIN packet out -> in
3514             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3515                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3516                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3517                      flags="F"))
3518             self.pg1.add_stream(p)
3519             self.pg_enable_capture(self.pg_interfaces)
3520             self.pg_start()
3521             self.pg0.get_capture(1)
3522
3523             pkts = []
3524
3525             # ACK packet in -> out
3526             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3527                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3528                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3529                      flags="A"))
3530             pkts.append(p)
3531
3532             # ACK packet in -> out
3533             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3534                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3535                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3536                      flags="F"))
3537             pkts.append(p)
3538
3539             self.pg0.add_stream(pkts)
3540             self.pg_enable_capture(self.pg_interfaces)
3541             self.pg_start()
3542             self.pg1.get_capture(2)
3543
3544             # ACK packet out -> in
3545             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3546                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3547                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3548                      flags="A"))
3549             self.pg1.add_stream(p)
3550             self.pg_enable_capture(self.pg_interfaces)
3551             self.pg_start()
3552             self.pg0.get_capture(1)
3553
3554             # Check if deterministic NAT44 closed the session
3555             dms = self.vapi.nat_det_map_dump()
3556             self.assertEqual(0, dms[0].ses_num)
3557         except:
3558             self.logger.error("TCP session termination failed")
3559             raise
3560
3561     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3562     def test_session_timeout(self):
3563         """ Deterministic NAT session timeouts """
3564         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3565                                       32,
3566                                       socket.inet_aton(self.nat_addr),
3567                                       32)
3568         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3569         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3570                                                   is_inside=0)
3571
3572         self.initiate_tcp_session(self.pg0, self.pg1)
3573         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3574         pkts = self.create_stream_in(self.pg0, self.pg1)
3575         self.pg0.add_stream(pkts)
3576         self.pg_enable_capture(self.pg_interfaces)
3577         self.pg_start()
3578         capture = self.pg1.get_capture(len(pkts))
3579         sleep(15)
3580
3581         dms = self.vapi.nat_det_map_dump()
3582         self.assertEqual(0, dms[0].ses_num)
3583
3584     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3585     def test_session_limit_per_user(self):
3586         """ Deterministic NAT maximum sessions per user limit """
3587         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3588                                       32,
3589                                       socket.inet_aton(self.nat_addr),
3590                                       32)
3591         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3592         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3593                                                   is_inside=0)
3594         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3595                                      src_address=self.pg2.local_ip4n,
3596                                      path_mtu=512,
3597                                      template_interval=10)
3598         self.vapi.nat_ipfix()
3599
3600         pkts = []
3601         for port in range(1025, 2025):
3602             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3603                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3604                  UDP(sport=port, dport=port))
3605             pkts.append(p)
3606
3607         self.pg0.add_stream(pkts)
3608         self.pg_enable_capture(self.pg_interfaces)
3609         self.pg_start()
3610         capture = self.pg1.get_capture(len(pkts))
3611
3612         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3613              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3614              UDP(sport=3001, dport=3002))
3615         self.pg0.add_stream(p)
3616         self.pg_enable_capture(self.pg_interfaces)
3617         self.pg_start()
3618         capture = self.pg1.assert_nothing_captured()
3619
3620         # verify ICMP error packet
3621         capture = self.pg0.get_capture(1)
3622         p = capture[0]
3623         self.assertTrue(p.haslayer(ICMP))
3624         icmp = p[ICMP]
3625         self.assertEqual(icmp.type, 3)
3626         self.assertEqual(icmp.code, 1)
3627         self.assertTrue(icmp.haslayer(IPerror))
3628         inner_ip = icmp[IPerror]
3629         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3630         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3631
3632         dms = self.vapi.nat_det_map_dump()
3633
3634         self.assertEqual(1000, dms[0].ses_num)
3635
3636         # verify IPFIX logging
3637         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3638         sleep(1)
3639         capture = self.pg2.get_capture(2)
3640         ipfix = IPFIXDecoder()
3641         # first load template
3642         for p in capture:
3643             self.assertTrue(p.haslayer(IPFIX))
3644             if p.haslayer(Template):
3645                 ipfix.add_template(p.getlayer(Template))
3646         # verify events in data set
3647         for p in capture:
3648             if p.haslayer(Data):
3649                 data = ipfix.decode_data_set(p.getlayer(Set))
3650                 self.verify_ipfix_max_entries_per_user(data)
3651
3652     def clear_nat_det(self):
3653         """
3654         Clear deterministic NAT configuration.
3655         """
3656         self.vapi.nat_ipfix(enable=0)
3657         self.vapi.nat_det_set_timeouts()
3658         deterministic_mappings = self.vapi.nat_det_map_dump()
3659         for dsm in deterministic_mappings:
3660             self.vapi.nat_det_add_del_map(dsm.in_addr,
3661                                           dsm.in_plen,
3662                                           dsm.out_addr,
3663                                           dsm.out_plen,
3664                                           is_add=0)
3665
3666         interfaces = self.vapi.nat44_interface_dump()
3667         for intf in interfaces:
3668             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3669                                                       intf.is_inside,
3670                                                       is_add=0)
3671
3672     def tearDown(self):
3673         super(TestDeterministicNAT, self).tearDown()
3674         if not self.vpp_dead:
3675             self.logger.info(self.vapi.cli("show nat44 detail"))
3676             self.clear_nat_det()
3677
3678
3679 class TestNAT64(MethodHolder):
3680     """ NAT64 Test Cases """
3681
3682     @classmethod
3683     def setUpClass(cls):
3684         super(TestNAT64, cls).setUpClass()
3685
3686         try:
3687             cls.tcp_port_in = 6303
3688             cls.tcp_port_out = 6303
3689             cls.udp_port_in = 6304
3690             cls.udp_port_out = 6304
3691             cls.icmp_id_in = 6305
3692             cls.icmp_id_out = 6305
3693             cls.nat_addr = '10.0.0.3'
3694             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3695             cls.vrf1_id = 10
3696             cls.vrf1_nat_addr = '10.0.10.3'
3697             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3698                                                    cls.vrf1_nat_addr)
3699
3700             cls.create_pg_interfaces(range(5))
3701             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3702             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3703             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3704
3705             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3706
3707             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3708
3709             cls.pg0.generate_remote_hosts(2)
3710
3711             for i in cls.ip6_interfaces:
3712                 i.admin_up()
3713                 i.config_ip6()
3714                 i.configure_ipv6_neighbors()
3715
3716             for i in cls.ip4_interfaces:
3717                 i.admin_up()
3718                 i.config_ip4()
3719                 i.resolve_arp()
3720
3721             cls.pg3.admin_up()
3722             cls.pg3.config_ip4()
3723             cls.pg3.resolve_arp()
3724             cls.pg3.config_ip6()
3725             cls.pg3.configure_ipv6_neighbors()
3726
3727         except Exception:
3728             super(TestNAT64, cls).tearDownClass()
3729             raise
3730
3731     def test_pool(self):
3732         """ Add/delete address to NAT64 pool """
3733         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3734
3735         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3736
3737         addresses = self.vapi.nat64_pool_addr_dump()
3738         self.assertEqual(len(addresses), 1)
3739         self.assertEqual(addresses[0].address, nat_addr)
3740
3741         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3742
3743         addresses = self.vapi.nat64_pool_addr_dump()
3744         self.assertEqual(len(addresses), 0)
3745
3746     def test_interface(self):
3747         """ Enable/disable NAT64 feature on the interface """
3748         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3749         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3750
3751         interfaces = self.vapi.nat64_interface_dump()
3752         self.assertEqual(len(interfaces), 2)
3753         pg0_found = False
3754         pg1_found = False
3755         for intf in interfaces:
3756             if intf.sw_if_index == self.pg0.sw_if_index:
3757                 self.assertEqual(intf.is_inside, 1)
3758                 pg0_found = True
3759             elif intf.sw_if_index == self.pg1.sw_if_index:
3760                 self.assertEqual(intf.is_inside, 0)
3761                 pg1_found = True
3762         self.assertTrue(pg0_found)
3763         self.assertTrue(pg1_found)
3764
3765         features = self.vapi.cli("show interface features pg0")
3766         self.assertNotEqual(features.find('nat64-in2out'), -1)
3767         features = self.vapi.cli("show interface features pg1")
3768         self.assertNotEqual(features.find('nat64-out2in'), -1)
3769
3770         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3771         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3772
3773         interfaces = self.vapi.nat64_interface_dump()
3774         self.assertEqual(len(interfaces), 0)
3775
3776     def test_static_bib(self):
3777         """ Add/delete static BIB entry """
3778         in_addr = socket.inet_pton(socket.AF_INET6,
3779                                    '2001:db8:85a3::8a2e:370:7334')
3780         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3781         in_port = 1234
3782         out_port = 5678
3783         proto = IP_PROTOS.tcp
3784
3785         self.vapi.nat64_add_del_static_bib(in_addr,
3786                                            out_addr,
3787                                            in_port,
3788                                            out_port,
3789                                            proto)
3790         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3791         static_bib_num = 0
3792         for bibe in bib:
3793             if bibe.is_static:
3794                 static_bib_num += 1
3795                 self.assertEqual(bibe.i_addr, in_addr)
3796                 self.assertEqual(bibe.o_addr, out_addr)
3797                 self.assertEqual(bibe.i_port, in_port)
3798                 self.assertEqual(bibe.o_port, out_port)
3799         self.assertEqual(static_bib_num, 1)
3800
3801         self.vapi.nat64_add_del_static_bib(in_addr,
3802                                            out_addr,
3803                                            in_port,
3804                                            out_port,
3805                                            proto,
3806                                            is_add=0)
3807         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3808         static_bib_num = 0
3809         for bibe in bib:
3810             if bibe.is_static:
3811                 static_bib_num += 1
3812         self.assertEqual(static_bib_num, 0)
3813
3814     def test_set_timeouts(self):
3815         """ Set NAT64 timeouts """
3816         # verify default values
3817         timeouts = self.vapi.nat64_get_timeouts()
3818         self.assertEqual(timeouts.udp, 300)
3819         self.assertEqual(timeouts.icmp, 60)
3820         self.assertEqual(timeouts.tcp_trans, 240)
3821         self.assertEqual(timeouts.tcp_est, 7440)
3822         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3823
3824         # set and verify custom values
3825         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3826                                      tcp_est=7450, tcp_incoming_syn=10)
3827         timeouts = self.vapi.nat64_get_timeouts()
3828         self.assertEqual(timeouts.udp, 200)
3829         self.assertEqual(timeouts.icmp, 30)
3830         self.assertEqual(timeouts.tcp_trans, 250)
3831         self.assertEqual(timeouts.tcp_est, 7450)
3832         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3833
3834     def test_dynamic(self):
3835         """ NAT64 dynamic translation test """
3836         self.tcp_port_in = 6303
3837         self.udp_port_in = 6304
3838         self.icmp_id_in = 6305
3839
3840         ses_num_start = self.nat64_get_ses_num()
3841
3842         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3843                                                 self.nat_addr_n)
3844         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3845         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3846
3847         # in2out
3848         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3849         self.pg0.add_stream(pkts)
3850         self.pg_enable_capture(self.pg_interfaces)
3851         self.pg_start()
3852         capture = self.pg1.get_capture(len(pkts))
3853         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3854                                 dst_ip=self.pg1.remote_ip4)
3855
3856         # out2in
3857         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3858         self.pg1.add_stream(pkts)
3859         self.pg_enable_capture(self.pg_interfaces)
3860         self.pg_start()
3861         capture = self.pg0.get_capture(len(pkts))
3862         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3863         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3864
3865         # in2out
3866         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3867         self.pg0.add_stream(pkts)
3868         self.pg_enable_capture(self.pg_interfaces)
3869         self.pg_start()
3870         capture = self.pg1.get_capture(len(pkts))
3871         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3872                                 dst_ip=self.pg1.remote_ip4)
3873
3874         # out2in
3875         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3876         self.pg1.add_stream(pkts)
3877         self.pg_enable_capture(self.pg_interfaces)
3878         self.pg_start()
3879         capture = self.pg0.get_capture(len(pkts))
3880         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3881
3882         ses_num_end = self.nat64_get_ses_num()
3883
3884         self.assertEqual(ses_num_end - ses_num_start, 3)
3885
3886         # tenant with specific VRF
3887         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3888                                                 self.vrf1_nat_addr_n,
3889                                                 vrf_id=self.vrf1_id)
3890         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3891
3892         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3893         self.pg2.add_stream(pkts)
3894         self.pg_enable_capture(self.pg_interfaces)
3895         self.pg_start()
3896         capture = self.pg1.get_capture(len(pkts))
3897         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3898                                 dst_ip=self.pg1.remote_ip4)
3899
3900         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3901         self.pg1.add_stream(pkts)
3902         self.pg_enable_capture(self.pg_interfaces)
3903         self.pg_start()
3904         capture = self.pg2.get_capture(len(pkts))
3905         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3906
3907     def test_static(self):
3908         """ NAT64 static translation test """
3909         self.tcp_port_in = 60303
3910         self.udp_port_in = 60304
3911         self.icmp_id_in = 60305
3912         self.tcp_port_out = 60303
3913         self.udp_port_out = 60304
3914         self.icmp_id_out = 60305
3915
3916         ses_num_start = self.nat64_get_ses_num()
3917
3918         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3919                                                 self.nat_addr_n)
3920         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3921         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3922
3923         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3924                                            self.nat_addr_n,
3925                                            self.tcp_port_in,
3926                                            self.tcp_port_out,
3927                                            IP_PROTOS.tcp)
3928         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3929                                            self.nat_addr_n,
3930                                            self.udp_port_in,
3931                                            self.udp_port_out,
3932                                            IP_PROTOS.udp)
3933         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3934                                            self.nat_addr_n,
3935                                            self.icmp_id_in,
3936                                            self.icmp_id_out,
3937                                            IP_PROTOS.icmp)
3938
3939         # in2out
3940         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3941         self.pg0.add_stream(pkts)
3942         self.pg_enable_capture(self.pg_interfaces)
3943         self.pg_start()
3944         capture = self.pg1.get_capture(len(pkts))
3945         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3946                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3947
3948         # out2in
3949         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3950         self.pg1.add_stream(pkts)
3951         self.pg_enable_capture(self.pg_interfaces)
3952         self.pg_start()
3953         capture = self.pg0.get_capture(len(pkts))
3954         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3955         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3956
3957         ses_num_end = self.nat64_get_ses_num()
3958
3959         self.assertEqual(ses_num_end - ses_num_start, 3)
3960
3961     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3962     def test_session_timeout(self):
3963         """ NAT64 session timeout """
3964         self.icmp_id_in = 1234
3965         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3966                                                 self.nat_addr_n)
3967         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3968         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3969         self.vapi.nat64_set_timeouts(icmp=5)
3970
3971         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3972         self.pg0.add_stream(pkts)
3973         self.pg_enable_capture(self.pg_interfaces)
3974         self.pg_start()
3975         capture = self.pg1.get_capture(len(pkts))
3976
3977         ses_num_before_timeout = self.nat64_get_ses_num()
3978
3979         sleep(15)
3980
3981         # ICMP session after timeout
3982         ses_num_after_timeout = self.nat64_get_ses_num()
3983         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3984
3985     def test_icmp_error(self):
3986         """ NAT64 ICMP Error message translation """
3987         self.tcp_port_in = 6303
3988         self.udp_port_in = 6304
3989         self.icmp_id_in = 6305
3990
3991         ses_num_start = self.nat64_get_ses_num()
3992
3993         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3994                                                 self.nat_addr_n)
3995         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3996         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3997
3998         # send some packets to create sessions
3999         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4000         self.pg0.add_stream(pkts)
4001         self.pg_enable_capture(self.pg_interfaces)
4002         self.pg_start()
4003         capture_ip4 = self.pg1.get_capture(len(pkts))
4004         self.verify_capture_out(capture_ip4,
4005                                 nat_ip=self.nat_addr,
4006                                 dst_ip=self.pg1.remote_ip4)
4007
4008         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4009         self.pg1.add_stream(pkts)
4010         self.pg_enable_capture(self.pg_interfaces)
4011         self.pg_start()
4012         capture_ip6 = self.pg0.get_capture(len(pkts))
4013         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4014         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4015                                    self.pg0.remote_ip6)
4016
4017         # in2out
4018         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4019                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4020                 ICMPv6DestUnreach(code=1) /
4021                 packet[IPv6] for packet in capture_ip6]
4022         self.pg0.add_stream(pkts)
4023         self.pg_enable_capture(self.pg_interfaces)
4024         self.pg_start()
4025         capture = self.pg1.get_capture(len(pkts))
4026         for packet in capture:
4027             try:
4028                 self.assertEqual(packet[IP].src, self.nat_addr)
4029                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4030                 self.assertEqual(packet[ICMP].type, 3)
4031                 self.assertEqual(packet[ICMP].code, 13)
4032                 inner = packet[IPerror]
4033                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4034                 self.assertEqual(inner.dst, self.nat_addr)
4035                 self.check_icmp_checksum(packet)
4036                 if inner.haslayer(TCPerror):
4037                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4038                 elif inner.haslayer(UDPerror):
4039                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4040                 else:
4041                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4042             except:
4043                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4044                 raise
4045
4046         # out2in
4047         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4048                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4049                 ICMP(type=3, code=13) /
4050                 packet[IP] for packet in capture_ip4]
4051         self.pg1.add_stream(pkts)
4052         self.pg_enable_capture(self.pg_interfaces)
4053         self.pg_start()
4054         capture = self.pg0.get_capture(len(pkts))
4055         for packet in capture:
4056             try:
4057                 self.assertEqual(packet[IPv6].src, ip.src)
4058                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4059                 icmp = packet[ICMPv6DestUnreach]
4060                 self.assertEqual(icmp.code, 1)
4061                 inner = icmp[IPerror6]
4062                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4063                 self.assertEqual(inner.dst, ip.src)
4064                 self.check_icmpv6_checksum(packet)
4065                 if inner.haslayer(TCPerror):
4066                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4067                 elif inner.haslayer(UDPerror):
4068                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4069                 else:
4070                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4071                                      self.icmp_id_in)
4072             except:
4073                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4074                 raise
4075
4076     def test_hairpinning(self):
4077         """ NAT64 hairpinning """
4078
4079         client = self.pg0.remote_hosts[0]
4080         server = self.pg0.remote_hosts[1]
4081         server_tcp_in_port = 22
4082         server_tcp_out_port = 4022
4083         server_udp_in_port = 23
4084         server_udp_out_port = 4023
4085         client_tcp_in_port = 1234
4086         client_udp_in_port = 1235
4087         client_tcp_out_port = 0
4088         client_udp_out_port = 0
4089         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4090         nat_addr_ip6 = ip.src
4091
4092         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4093                                                 self.nat_addr_n)
4094         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4095         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4096
4097         self.vapi.nat64_add_del_static_bib(server.ip6n,
4098                                            self.nat_addr_n,
4099                                            server_tcp_in_port,
4100                                            server_tcp_out_port,
4101                                            IP_PROTOS.tcp)
4102         self.vapi.nat64_add_del_static_bib(server.ip6n,
4103                                            self.nat_addr_n,
4104                                            server_udp_in_port,
4105                                            server_udp_out_port,
4106                                            IP_PROTOS.udp)
4107
4108         # client to server
4109         pkts = []
4110         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4111              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4112              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4113         pkts.append(p)
4114         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4115              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4116              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4117         pkts.append(p)
4118         self.pg0.add_stream(pkts)
4119         self.pg_enable_capture(self.pg_interfaces)
4120         self.pg_start()
4121         capture = self.pg0.get_capture(len(pkts))
4122         for packet in capture:
4123             try:
4124                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4125                 self.assertEqual(packet[IPv6].dst, server.ip6)
4126                 if packet.haslayer(TCP):
4127                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4128                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4129                     self.check_tcp_checksum(packet)
4130                     client_tcp_out_port = packet[TCP].sport
4131                 else:
4132                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4133                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
4134                     self.check_udp_checksum(packet)
4135                     client_udp_out_port = packet[UDP].sport
4136             except:
4137                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4138                 raise
4139
4140         # server to client
4141         pkts = []
4142         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4143              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4144              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4145         pkts.append(p)
4146         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4147              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4148              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4149         pkts.append(p)
4150         self.pg0.add_stream(pkts)
4151         self.pg_enable_capture(self.pg_interfaces)
4152         self.pg_start()
4153         capture = self.pg0.get_capture(len(pkts))
4154         for packet in capture:
4155             try:
4156                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4157                 self.assertEqual(packet[IPv6].dst, client.ip6)
4158                 if packet.haslayer(TCP):
4159                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4160                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4161                     self.check_tcp_checksum(packet)
4162                 else:
4163                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
4164                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
4165                     self.check_udp_checksum(packet)
4166             except:
4167                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4168                 raise
4169
4170         # ICMP error
4171         pkts = []
4172         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4173                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4174                 ICMPv6DestUnreach(code=1) /
4175                 packet[IPv6] for packet in capture]
4176         self.pg0.add_stream(pkts)
4177         self.pg_enable_capture(self.pg_interfaces)
4178         self.pg_start()
4179         capture = self.pg0.get_capture(len(pkts))
4180         for packet in capture:
4181             try:
4182                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4183                 self.assertEqual(packet[IPv6].dst, server.ip6)
4184                 icmp = packet[ICMPv6DestUnreach]
4185                 self.assertEqual(icmp.code, 1)
4186                 inner = icmp[IPerror6]
4187                 self.assertEqual(inner.src, server.ip6)
4188                 self.assertEqual(inner.dst, nat_addr_ip6)
4189                 self.check_icmpv6_checksum(packet)
4190                 if inner.haslayer(TCPerror):
4191                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4192                     self.assertEqual(inner[TCPerror].dport,
4193                                      client_tcp_out_port)
4194                 else:
4195                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4196                     self.assertEqual(inner[UDPerror].dport,
4197                                      client_udp_out_port)
4198             except:
4199                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4200                 raise
4201
4202     def test_prefix(self):
4203         """ NAT64 Network-Specific Prefix """
4204
4205         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4206                                                 self.nat_addr_n)
4207         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4208         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4209         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4210                                                 self.vrf1_nat_addr_n,
4211                                                 vrf_id=self.vrf1_id)
4212         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4213
4214         # Add global prefix
4215         global_pref64 = "2001:db8::"
4216         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4217         global_pref64_len = 32
4218         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4219
4220         prefix = self.vapi.nat64_prefix_dump()
4221         self.assertEqual(len(prefix), 1)
4222         self.assertEqual(prefix[0].prefix, global_pref64_n)
4223         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4224         self.assertEqual(prefix[0].vrf_id, 0)
4225
4226         # Add tenant specific prefix
4227         vrf1_pref64 = "2001:db8:122:300::"
4228         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4229         vrf1_pref64_len = 56
4230         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4231                                        vrf1_pref64_len,
4232                                        vrf_id=self.vrf1_id)
4233         prefix = self.vapi.nat64_prefix_dump()
4234         self.assertEqual(len(prefix), 2)
4235
4236         # Global prefix
4237         pkts = self.create_stream_in_ip6(self.pg0,
4238                                          self.pg1,
4239                                          pref=global_pref64,
4240                                          plen=global_pref64_len)
4241         self.pg0.add_stream(pkts)
4242         self.pg_enable_capture(self.pg_interfaces)
4243         self.pg_start()
4244         capture = self.pg1.get_capture(len(pkts))
4245         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4246                                 dst_ip=self.pg1.remote_ip4)
4247
4248         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4249         self.pg1.add_stream(pkts)
4250         self.pg_enable_capture(self.pg_interfaces)
4251         self.pg_start()
4252         capture = self.pg0.get_capture(len(pkts))
4253         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4254                                   global_pref64,
4255                                   global_pref64_len)
4256         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4257
4258         # Tenant specific prefix
4259         pkts = self.create_stream_in_ip6(self.pg2,
4260                                          self.pg1,
4261                                          pref=vrf1_pref64,
4262                                          plen=vrf1_pref64_len)
4263         self.pg2.add_stream(pkts)
4264         self.pg_enable_capture(self.pg_interfaces)
4265         self.pg_start()
4266         capture = self.pg1.get_capture(len(pkts))
4267         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4268                                 dst_ip=self.pg1.remote_ip4)
4269
4270         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4271         self.pg1.add_stream(pkts)
4272         self.pg_enable_capture(self.pg_interfaces)
4273         self.pg_start()
4274         capture = self.pg2.get_capture(len(pkts))
4275         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4276                                   vrf1_pref64,
4277                                   vrf1_pref64_len)
4278         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4279
4280     def test_unknown_proto(self):
4281         """ NAT64 translate packet with unknown protocol """
4282
4283         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4284                                                 self.nat_addr_n)
4285         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4286         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4287         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4288
4289         # in2out
4290         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4291              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4292              TCP(sport=self.tcp_port_in, dport=20))
4293         self.pg0.add_stream(p)
4294         self.pg_enable_capture(self.pg_interfaces)
4295         self.pg_start()
4296         p = self.pg1.get_capture(1)
4297
4298         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4299              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4300              GRE() /
4301              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4302              TCP(sport=1234, dport=1234))
4303         self.pg0.add_stream(p)
4304         self.pg_enable_capture(self.pg_interfaces)
4305         self.pg_start()
4306         p = self.pg1.get_capture(1)
4307         packet = p[0]
4308         try:
4309             self.assertEqual(packet[IP].src, self.nat_addr)
4310             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4311             self.assertTrue(packet.haslayer(GRE))
4312             self.check_ip_checksum(packet)
4313         except:
4314             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4315             raise
4316
4317         # out2in
4318         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4319              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4320              GRE() /
4321              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4322              TCP(sport=1234, dport=1234))
4323         self.pg1.add_stream(p)
4324         self.pg_enable_capture(self.pg_interfaces)
4325         self.pg_start()
4326         p = self.pg0.get_capture(1)
4327         packet = p[0]
4328         try:
4329             self.assertEqual(packet[IPv6].src, remote_ip6)
4330             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4331             self.assertEqual(packet[IPv6].nh, 47)
4332         except:
4333             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4334             raise
4335
4336     def test_hairpinning_unknown_proto(self):
4337         """ NAT64 translate packet with unknown protocol - hairpinning """
4338
4339         client = self.pg0.remote_hosts[0]
4340         server = self.pg0.remote_hosts[1]
4341         server_tcp_in_port = 22
4342         server_tcp_out_port = 4022
4343         client_tcp_in_port = 1234
4344         client_tcp_out_port = 1235
4345         server_nat_ip = "10.0.0.100"
4346         client_nat_ip = "10.0.0.110"
4347         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4348         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4349         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4350         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4351
4352         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4353                                                 client_nat_ip_n)
4354         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4355         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4356
4357         self.vapi.nat64_add_del_static_bib(server.ip6n,
4358                                            server_nat_ip_n,
4359                                            server_tcp_in_port,
4360                                            server_tcp_out_port,
4361                                            IP_PROTOS.tcp)
4362
4363         self.vapi.nat64_add_del_static_bib(server.ip6n,
4364                                            server_nat_ip_n,
4365                                            0,
4366                                            0,
4367                                            IP_PROTOS.gre)
4368
4369         self.vapi.nat64_add_del_static_bib(client.ip6n,
4370                                            client_nat_ip_n,
4371                                            client_tcp_in_port,
4372                                            client_tcp_out_port,
4373                                            IP_PROTOS.tcp)
4374
4375         # client to server
4376         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4377              IPv6(src=client.ip6, dst=server_nat_ip6) /
4378              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4379         self.pg0.add_stream(p)
4380         self.pg_enable_capture(self.pg_interfaces)
4381         self.pg_start()
4382         p = self.pg0.get_capture(1)
4383
4384         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4385              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4386              GRE() /
4387              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4388              TCP(sport=1234, dport=1234))
4389         self.pg0.add_stream(p)
4390         self.pg_enable_capture(self.pg_interfaces)
4391         self.pg_start()
4392         p = self.pg0.get_capture(1)
4393         packet = p[0]
4394         try:
4395             self.assertEqual(packet[IPv6].src, client_nat_ip6)
4396             self.assertEqual(packet[IPv6].dst, server.ip6)
4397             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4398         except:
4399             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4400             raise
4401
4402         # server to client
4403         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4404              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4405              GRE() /
4406              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4407              TCP(sport=1234, dport=1234))
4408         self.pg0.add_stream(p)
4409         self.pg_enable_capture(self.pg_interfaces)
4410         self.pg_start()
4411         p = self.pg0.get_capture(1)
4412         packet = p[0]
4413         try:
4414             self.assertEqual(packet[IPv6].src, server_nat_ip6)
4415             self.assertEqual(packet[IPv6].dst, client.ip6)
4416             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4417         except:
4418             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4419             raise
4420
4421     def test_one_armed_nat64(self):
4422         """ One armed NAT64 """
4423         external_port = 0
4424         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4425                                            '64:ff9b::',
4426                                            96)
4427
4428         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4429                                                 self.nat_addr_n)
4430         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4431         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4432
4433         # in2out
4434         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4435              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4436              TCP(sport=12345, dport=80))
4437         self.pg3.add_stream(p)
4438         self.pg_enable_capture(self.pg_interfaces)
4439         self.pg_start()
4440         capture = self.pg3.get_capture(1)
4441         p = capture[0]
4442         try:
4443             ip = p[IP]
4444             tcp = p[TCP]
4445             self.assertEqual(ip.src, self.nat_addr)
4446             self.assertEqual(ip.dst, self.pg3.remote_ip4)
4447             self.assertNotEqual(tcp.sport, 12345)
4448             external_port = tcp.sport
4449             self.assertEqual(tcp.dport, 80)
4450             self.check_tcp_checksum(p)
4451             self.check_ip_checksum(p)
4452         except:
4453             self.logger.error(ppp("Unexpected or invalid packet:", p))
4454             raise
4455
4456         # out2in
4457         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4458              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4459              TCP(sport=80, dport=external_port))
4460         self.pg3.add_stream(p)
4461         self.pg_enable_capture(self.pg_interfaces)
4462         self.pg_start()
4463         capture = self.pg3.get_capture(1)
4464         p = capture[0]
4465         try:
4466             ip = p[IPv6]
4467             tcp = p[TCP]
4468             self.assertEqual(ip.src, remote_host_ip6)
4469             self.assertEqual(ip.dst, self.pg3.remote_ip6)
4470             self.assertEqual(tcp.sport, 80)
4471             self.assertEqual(tcp.dport, 12345)
4472             self.check_tcp_checksum(p)
4473         except:
4474             self.logger.error(ppp("Unexpected or invalid packet:", p))
4475             raise
4476
4477     def test_frag_in_order(self):
4478         """ NAT64 translate fragments arriving in order """
4479         self.tcp_port_in = random.randint(1025, 65535)
4480
4481         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4482                                                 self.nat_addr_n)
4483         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4484         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4485
4486         reass = self.vapi.nat_reass_dump()
4487         reass_n_start = len(reass)
4488
4489         # in2out
4490         data = 'a' * 200
4491         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4492                                            self.tcp_port_in, 20, data)
4493         self.pg0.add_stream(pkts)
4494         self.pg_enable_capture(self.pg_interfaces)
4495         self.pg_start()
4496         frags = self.pg1.get_capture(len(pkts))
4497         p = self.reass_frags_and_verify(frags,
4498                                         self.nat_addr,
4499                                         self.pg1.remote_ip4)
4500         self.assertEqual(p[TCP].dport, 20)
4501         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4502         self.tcp_port_out = p[TCP].sport
4503         self.assertEqual(data, p[Raw].load)
4504
4505         # out2in
4506         data = "A" * 4 + "b" * 16 + "C" * 3
4507         pkts = self.create_stream_frag(self.pg1,
4508                                        self.nat_addr,
4509                                        20,
4510                                        self.tcp_port_out,
4511                                        data)
4512         self.pg1.add_stream(pkts)
4513         self.pg_enable_capture(self.pg_interfaces)
4514         self.pg_start()
4515         frags = self.pg0.get_capture(len(pkts))
4516         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4517         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4518         self.assertEqual(p[TCP].sport, 20)
4519         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4520         self.assertEqual(data, p[Raw].load)
4521
4522         reass = self.vapi.nat_reass_dump()
4523         reass_n_end = len(reass)
4524
4525         self.assertEqual(reass_n_end - reass_n_start, 2)
4526
4527     def test_reass_hairpinning(self):
4528         """ NAT64 fragments hairpinning """
4529         data = 'a' * 200
4530         client = self.pg0.remote_hosts[0]
4531         server = self.pg0.remote_hosts[1]
4532         server_in_port = random.randint(1025, 65535)
4533         server_out_port = random.randint(1025, 65535)
4534         client_in_port = random.randint(1025, 65535)
4535         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4536         nat_addr_ip6 = ip.src
4537
4538         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4539                                                 self.nat_addr_n)
4540         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4541         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4542
4543         # add static BIB entry for server
4544         self.vapi.nat64_add_del_static_bib(server.ip6n,
4545                                            self.nat_addr_n,
4546                                            server_in_port,
4547                                            server_out_port,
4548                                            IP_PROTOS.tcp)
4549
4550         # send packet from host to server
4551         pkts = self.create_stream_frag_ip6(self.pg0,
4552                                            self.nat_addr,
4553                                            client_in_port,
4554                                            server_out_port,
4555                                            data)
4556         self.pg0.add_stream(pkts)
4557         self.pg_enable_capture(self.pg_interfaces)
4558         self.pg_start()
4559         frags = self.pg0.get_capture(len(pkts))
4560         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4561         self.assertNotEqual(p[TCP].sport, client_in_port)
4562         self.assertEqual(p[TCP].dport, server_in_port)
4563         self.assertEqual(data, p[Raw].load)
4564
4565     def test_frag_out_of_order(self):
4566         """ NAT64 translate fragments arriving out of order """
4567         self.tcp_port_in = random.randint(1025, 65535)
4568
4569         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4570                                                 self.nat_addr_n)
4571         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4572         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4573
4574         # in2out
4575         data = 'a' * 200
4576         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4577                                            self.tcp_port_in, 20, data)
4578         pkts.reverse()
4579         self.pg0.add_stream(pkts)
4580         self.pg_enable_capture(self.pg_interfaces)
4581         self.pg_start()
4582         frags = self.pg1.get_capture(len(pkts))
4583         p = self.reass_frags_and_verify(frags,
4584                                         self.nat_addr,
4585                                         self.pg1.remote_ip4)
4586         self.assertEqual(p[TCP].dport, 20)
4587         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4588         self.tcp_port_out = p[TCP].sport
4589         self.assertEqual(data, p[Raw].load)
4590
4591         # out2in
4592         data = "A" * 4 + "B" * 16 + "C" * 3
4593         pkts = self.create_stream_frag(self.pg1,
4594                                        self.nat_addr,
4595                                        20,
4596                                        self.tcp_port_out,
4597                                        data)
4598         pkts.reverse()
4599         self.pg1.add_stream(pkts)
4600         self.pg_enable_capture(self.pg_interfaces)
4601         self.pg_start()
4602         frags = self.pg0.get_capture(len(pkts))
4603         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4604         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4605         self.assertEqual(p[TCP].sport, 20)
4606         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4607         self.assertEqual(data, p[Raw].load)
4608
4609     def test_interface_addr(self):
4610         """ Acquire NAT64 pool addresses from interface """
4611         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4612
4613         # no address in NAT64 pool
4614         adresses = self.vapi.nat44_address_dump()
4615         self.assertEqual(0, len(adresses))
4616
4617         # configure interface address and check NAT64 address pool
4618         self.pg4.config_ip4()
4619         addresses = self.vapi.nat64_pool_addr_dump()
4620         self.assertEqual(len(addresses), 1)
4621         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4622
4623         # remove interface address and check NAT64 address pool
4624         self.pg4.unconfig_ip4()
4625         addresses = self.vapi.nat64_pool_addr_dump()
4626         self.assertEqual(0, len(adresses))
4627
4628     def nat64_get_ses_num(self):
4629         """
4630         Return number of active NAT64 sessions.
4631         """
4632         st = self.vapi.nat64_st_dump()
4633         return len(st)
4634
4635     def clear_nat64(self):
4636         """
4637         Clear NAT64 configuration.
4638         """
4639         self.vapi.nat64_set_timeouts()
4640
4641         interfaces = self.vapi.nat64_interface_dump()
4642         for intf in interfaces:
4643             if intf.is_inside > 1:
4644                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4645                                                   0,
4646                                                   is_add=0)
4647             self.vapi.nat64_add_del_interface(intf.sw_if_index,
4648                                               intf.is_inside,
4649                                               is_add=0)
4650
4651         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4652         for bibe in bib:
4653             if bibe.is_static:
4654                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4655                                                    bibe.o_addr,
4656                                                    bibe.i_port,
4657                                                    bibe.o_port,
4658                                                    bibe.proto,
4659                                                    bibe.vrf_id,
4660                                                    is_add=0)
4661
4662         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4663         for bibe in bib:
4664             if bibe.is_static:
4665                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4666                                                    bibe.o_addr,
4667                                                    bibe.i_port,
4668                                                    bibe.o_port,
4669                                                    bibe.proto,
4670                                                    bibe.vrf_id,
4671                                                    is_add=0)
4672
4673         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4674         for bibe in bib:
4675             if bibe.is_static:
4676                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4677                                                    bibe.o_addr,
4678                                                    bibe.i_port,
4679                                                    bibe.o_port,
4680                                                    bibe.proto,
4681                                                    bibe.vrf_id,
4682                                                    is_add=0)
4683
4684         adresses = self.vapi.nat64_pool_addr_dump()
4685         for addr in adresses:
4686             self.vapi.nat64_add_del_pool_addr_range(addr.address,
4687                                                     addr.address,
4688                                                     vrf_id=addr.vrf_id,
4689                                                     is_add=0)
4690
4691         prefixes = self.vapi.nat64_prefix_dump()
4692         for prefix in prefixes:
4693             self.vapi.nat64_add_del_prefix(prefix.prefix,
4694                                            prefix.prefix_len,
4695                                            vrf_id=prefix.vrf_id,
4696                                            is_add=0)
4697
4698     def tearDown(self):
4699         super(TestNAT64, self).tearDown()
4700         if not self.vpp_dead:
4701             self.logger.info(self.vapi.cli("show nat64 pool"))
4702             self.logger.info(self.vapi.cli("show nat64 interfaces"))
4703             self.logger.info(self.vapi.cli("show nat64 prefix"))
4704             self.logger.info(self.vapi.cli("show nat64 bib all"))
4705             self.logger.info(self.vapi.cli("show nat64 session table all"))
4706             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4707             self.clear_nat64()
4708
4709
4710 class TestDSlite(MethodHolder):
4711     """ DS-Lite Test Cases """
4712
4713     @classmethod
4714     def setUpClass(cls):
4715         super(TestDSlite, cls).setUpClass()
4716
4717         try:
4718             cls.nat_addr = '10.0.0.3'
4719             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4720
4721             cls.create_pg_interfaces(range(2))
4722             cls.pg0.admin_up()
4723             cls.pg0.config_ip4()
4724             cls.pg0.resolve_arp()
4725             cls.pg1.admin_up()
4726             cls.pg1.config_ip6()
4727             cls.pg1.generate_remote_hosts(2)
4728             cls.pg1.configure_ipv6_neighbors()
4729
4730         except Exception:
4731             super(TestDSlite, cls).tearDownClass()
4732             raise
4733
4734     def test_dslite(self):
4735         """ Test DS-Lite """
4736         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4737                                                  self.nat_addr_n)
4738         aftr_ip4 = '192.0.0.1'
4739         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4740         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4741         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4742         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4743
4744         # UDP
4745         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4746              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4747              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4748              UDP(sport=20000, dport=10000))
4749         self.pg1.add_stream(p)
4750         self.pg_enable_capture(self.pg_interfaces)
4751         self.pg_start()
4752         capture = self.pg0.get_capture(1)
4753         capture = capture[0]
4754         self.assertFalse(capture.haslayer(IPv6))
4755         self.assertEqual(capture[IP].src, self.nat_addr)
4756         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4757         self.assertNotEqual(capture[UDP].sport, 20000)
4758         self.assertEqual(capture[UDP].dport, 10000)
4759         self.check_ip_checksum(capture)
4760         out_port = capture[UDP].sport
4761
4762         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4763              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4764              UDP(sport=10000, dport=out_port))
4765         self.pg0.add_stream(p)
4766         self.pg_enable_capture(self.pg_interfaces)
4767         self.pg_start()
4768         capture = self.pg1.get_capture(1)
4769         capture = capture[0]
4770         self.assertEqual(capture[IPv6].src, aftr_ip6)
4771         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4772         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4773         self.assertEqual(capture[IP].dst, '192.168.1.1')
4774         self.assertEqual(capture[UDP].sport, 10000)
4775         self.assertEqual(capture[UDP].dport, 20000)
4776         self.check_ip_checksum(capture)
4777
4778         # TCP
4779         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4780              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4781              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4782              TCP(sport=20001, dport=10001))
4783         self.pg1.add_stream(p)
4784         self.pg_enable_capture(self.pg_interfaces)
4785         self.pg_start()
4786         capture = self.pg0.get_capture(1)
4787         capture = capture[0]
4788         self.assertFalse(capture.haslayer(IPv6))
4789         self.assertEqual(capture[IP].src, self.nat_addr)
4790         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4791         self.assertNotEqual(capture[TCP].sport, 20001)
4792         self.assertEqual(capture[TCP].dport, 10001)
4793         self.check_ip_checksum(capture)
4794         self.check_tcp_checksum(capture)
4795         out_port = capture[TCP].sport
4796
4797         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4798              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4799              TCP(sport=10001, dport=out_port))
4800         self.pg0.add_stream(p)
4801         self.pg_enable_capture(self.pg_interfaces)
4802         self.pg_start()
4803         capture = self.pg1.get_capture(1)
4804         capture = capture[0]
4805         self.assertEqual(capture[IPv6].src, aftr_ip6)
4806         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4807         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4808         self.assertEqual(capture[IP].dst, '192.168.1.1')
4809         self.assertEqual(capture[TCP].sport, 10001)
4810         self.assertEqual(capture[TCP].dport, 20001)
4811         self.check_ip_checksum(capture)
4812         self.check_tcp_checksum(capture)
4813
4814         # ICMP
4815         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4816              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4817              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4818              ICMP(id=4000, type='echo-request'))
4819         self.pg1.add_stream(p)
4820         self.pg_enable_capture(self.pg_interfaces)
4821         self.pg_start()
4822         capture = self.pg0.get_capture(1)
4823         capture = capture[0]
4824         self.assertFalse(capture.haslayer(IPv6))
4825         self.assertEqual(capture[IP].src, self.nat_addr)
4826         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4827         self.assertNotEqual(capture[ICMP].id, 4000)
4828         self.check_ip_checksum(capture)
4829         self.check_icmp_checksum(capture)
4830         out_id = capture[ICMP].id
4831
4832         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4833              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4834              ICMP(id=out_id, type='echo-reply'))
4835         self.pg0.add_stream(p)
4836         self.pg_enable_capture(self.pg_interfaces)
4837         self.pg_start()
4838         capture = self.pg1.get_capture(1)
4839         capture = capture[0]
4840         self.assertEqual(capture[IPv6].src, aftr_ip6)
4841         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4842         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4843         self.assertEqual(capture[IP].dst, '192.168.1.1')
4844         self.assertEqual(capture[ICMP].id, 4000)
4845         self.check_ip_checksum(capture)
4846         self.check_icmp_checksum(capture)
4847
4848         # ping DS-Lite AFTR tunnel endpoint address
4849         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4850              IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
4851              ICMPv6EchoRequest())
4852         self.pg1.add_stream(p)
4853         self.pg_enable_capture(self.pg_interfaces)
4854         self.pg_start()
4855         capture = self.pg1.get_capture(1)
4856         self.assertEqual(1, len(capture))
4857         capture = capture[0]
4858         self.assertEqual(capture[IPv6].src, aftr_ip6)
4859         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4860         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
4861
4862     def tearDown(self):
4863         super(TestDSlite, self).tearDown()
4864         if not self.vpp_dead:
4865             self.logger.info(self.vapi.cli("show dslite pool"))
4866             self.logger.info(
4867                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4868             self.logger.info(self.vapi.cli("show dslite sessions"))
4869
4870 if __name__ == '__main__':
4871     unittest.main(testRunner=VppTestRunner)