Translate matching packets using NAT (VPP-1069)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6 import StringIO
7 import random
8
9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
19 from util import ppp
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
24
25
26 class MethodHolder(VppTestCase):
27     """ NAT create capture and verify method holder """
28
    @classmethod
    def setUpClass(cls):
        """ Run the class-level set up of the parent test case class """
        super(MethodHolder, cls).setUpClass()
32
    def tearDown(self):
        """ Run the per-test tear down of the parent test case class """
        super(MethodHolder, self).tearDown()
35
36     def check_ip_checksum(self, pkt):
37         """
38         Check IP checksum of the packet
39
40         :param pkt: Packet to check IP checksum
41         """
42         new = pkt.__class__(str(pkt))
43         del new['IP'].chksum
44         new = new.__class__(str(new))
45         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
46
47     def check_tcp_checksum(self, pkt):
48         """
49         Check TCP checksum in IP packet
50
51         :param pkt: Packet to check TCP checksum
52         """
53         new = pkt.__class__(str(pkt))
54         del new['TCP'].chksum
55         new = new.__class__(str(new))
56         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
57
58     def check_udp_checksum(self, pkt):
59         """
60         Check UDP checksum in IP packet
61
62         :param pkt: Packet to check UDP checksum
63         """
64         new = pkt.__class__(str(pkt))
65         del new['UDP'].chksum
66         new = new.__class__(str(new))
67         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
68
69     def check_icmp_errror_embedded(self, pkt):
70         """
71         Check ICMP error embeded packet checksum
72
73         :param pkt: Packet to check ICMP error embeded packet checksum
74         """
75         if pkt.haslayer(IPerror):
76             new = pkt.__class__(str(pkt))
77             del new['IPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
80
81         if pkt.haslayer(TCPerror):
82             new = pkt.__class__(str(pkt))
83             del new['TCPerror'].chksum
84             new = new.__class__(str(new))
85             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
86
87         if pkt.haslayer(UDPerror):
88             if pkt['UDPerror'].chksum != 0:
89                 new = pkt.__class__(str(pkt))
90                 del new['UDPerror'].chksum
91                 new = new.__class__(str(new))
92                 self.assertEqual(new['UDPerror'].chksum,
93                                  pkt['UDPerror'].chksum)
94
95         if pkt.haslayer(ICMPerror):
96             del new['ICMPerror'].chksum
97             new = new.__class__(str(new))
98             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
99
100     def check_icmp_checksum(self, pkt):
101         """
102         Check ICMP checksum in IPv4 packet
103
104         :param pkt: Packet to check ICMP checksum
105         """
106         new = pkt.__class__(str(pkt))
107         del new['ICMP'].chksum
108         new = new.__class__(str(new))
109         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110         if pkt.haslayer(IPerror):
111             self.check_icmp_errror_embedded(pkt)
112
113     def check_icmpv6_checksum(self, pkt):
114         """
115         Check ICMPv6 checksum in IPv4 packet
116
117         :param pkt: Packet to check ICMPv6 checksum
118         """
119         new = pkt.__class__(str(pkt))
120         if pkt.haslayer(ICMPv6DestUnreach):
121             del new['ICMPv6DestUnreach'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124                              pkt['ICMPv6DestUnreach'].cksum)
125             self.check_icmp_errror_embedded(pkt)
126         if pkt.haslayer(ICMPv6EchoRequest):
127             del new['ICMPv6EchoRequest'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130                              pkt['ICMPv6EchoRequest'].cksum)
131         if pkt.haslayer(ICMPv6EchoReply):
132             del new['ICMPv6EchoReply'].cksum
133             new = new.__class__(str(new))
134             self.assertEqual(new['ICMPv6EchoReply'].cksum,
135                              pkt['ICMPv6EchoReply'].cksum)
136
137     def create_stream_in(self, in_if, out_if, ttl=64):
138         """
139         Create packet stream for inside network
140
141         :param in_if: Inside interface
142         :param out_if: Outside interface
143         :param ttl: TTL of generated packets
144         """
145         pkts = []
146         # TCP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              TCP(sport=self.tcp_port_in, dport=20))
150         pkts.append(p)
151
152         # UDP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              UDP(sport=self.udp_port_in, dport=20))
156         pkts.append(p)
157
158         # ICMP
159         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
161              ICMP(id=self.icmp_id_in, type='echo-request'))
162         pkts.append(p)
163
164         return pkts
165
166     def compose_ip6(self, ip4, pref, plen):
167         """
168         Compose IPv4-embedded IPv6 addresses
169
170         :param ip4: IPv4 address
171         :param pref: IPv6 prefix
172         :param plen: IPv6 prefix length
173         :returns: IPv4-embedded IPv6 addresses
174         """
175         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
176         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
177         if plen == 32:
178             pref_n[4] = ip4_n[0]
179             pref_n[5] = ip4_n[1]
180             pref_n[6] = ip4_n[2]
181             pref_n[7] = ip4_n[3]
182         elif plen == 40:
183             pref_n[5] = ip4_n[0]
184             pref_n[6] = ip4_n[1]
185             pref_n[7] = ip4_n[2]
186             pref_n[9] = ip4_n[3]
187         elif plen == 48:
188             pref_n[6] = ip4_n[0]
189             pref_n[7] = ip4_n[1]
190             pref_n[9] = ip4_n[2]
191             pref_n[10] = ip4_n[3]
192         elif plen == 56:
193             pref_n[7] = ip4_n[0]
194             pref_n[9] = ip4_n[1]
195             pref_n[10] = ip4_n[2]
196             pref_n[11] = ip4_n[3]
197         elif plen == 64:
198             pref_n[9] = ip4_n[0]
199             pref_n[10] = ip4_n[1]
200             pref_n[11] = ip4_n[2]
201             pref_n[12] = ip4_n[3]
202         elif plen == 96:
203             pref_n[12] = ip4_n[0]
204             pref_n[13] = ip4_n[1]
205             pref_n[14] = ip4_n[2]
206             pref_n[15] = ip4_n[3]
207         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
208
209     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
210         """
211         Create IPv6 packet stream for inside network
212
213         :param in_if: Inside interface
214         :param out_if: Outside interface
215         :param ttl: Hop Limit of generated packets
216         :param pref: NAT64 prefix
217         :param plen: NAT64 prefix length
218         """
219         pkts = []
220         if pref is None:
221             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
222         else:
223             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
224
225         # TCP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              TCP(sport=self.tcp_port_in, dport=20))
229         pkts.append(p)
230
231         # UDP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              UDP(sport=self.udp_port_in, dport=20))
235         pkts.append(p)
236
237         # ICMP
238         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
239              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
240              ICMPv6EchoRequest(id=self.icmp_id_in))
241         pkts.append(p)
242
243         return pkts
244
245     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
246                           use_inside_ports=False):
247         """
248         Create packet stream for outside network
249
250         :param out_if: Outside interface
251         :param dst_ip: Destination IP address (Default use global NAT address)
252         :param ttl: TTL of generated packets
253         :param use_inside_ports: Use inside NAT ports as destination ports
254                instead of outside ports
255         """
256         if dst_ip is None:
257             dst_ip = self.nat_addr
258         if not use_inside_ports:
259             tcp_port = self.tcp_port_out
260             udp_port = self.udp_port_out
261             icmp_id = self.icmp_id_out
262         else:
263             tcp_port = self.tcp_port_in
264             udp_port = self.udp_port_in
265             icmp_id = self.icmp_id_in
266         pkts = []
267         # TCP
268         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
269              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
270              TCP(dport=tcp_port, sport=20))
271         pkts.append(p)
272
273         # UDP
274         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
275              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
276              UDP(dport=udp_port, sport=20))
277         pkts.append(p)
278
279         # ICMP
280         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
281              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
282              ICMP(id=icmp_id, type='echo-reply'))
283         pkts.append(p)
284
285         return pkts
286
287     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
288                            packet_num=3, dst_ip=None):
289         """
290         Verify captured packets on outside network
291
292         :param capture: Captured packets
293         :param nat_ip: Translated IP address (Default use global NAT address)
294         :param same_port: Sorce port number is not translated (Default False)
295         :param packet_num: Expected number of packets (Default 3)
296         :param dst_ip: Destination IP address (Default do not verify)
297         """
298         if nat_ip is None:
299             nat_ip = self.nat_addr
300         self.assertEqual(packet_num, len(capture))
301         for packet in capture:
302             try:
303                 self.check_ip_checksum(packet)
304                 self.assertEqual(packet[IP].src, nat_ip)
305                 if dst_ip is not None:
306                     self.assertEqual(packet[IP].dst, dst_ip)
307                 if packet.haslayer(TCP):
308                     if same_port:
309                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
310                     else:
311                         self.assertNotEqual(
312                             packet[TCP].sport, self.tcp_port_in)
313                     self.tcp_port_out = packet[TCP].sport
314                     self.check_tcp_checksum(packet)
315                 elif packet.haslayer(UDP):
316                     if same_port:
317                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
318                     else:
319                         self.assertNotEqual(
320                             packet[UDP].sport, self.udp_port_in)
321                     self.udp_port_out = packet[UDP].sport
322                 else:
323                     if same_port:
324                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
325                     else:
326                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
327                     self.icmp_id_out = packet[ICMP].id
328                     self.check_icmp_checksum(packet)
329             except:
330                 self.logger.error(ppp("Unexpected or invalid packet "
331                                       "(outside network):", packet))
332                 raise
333
334     def verify_capture_in(self, capture, in_if, packet_num=3):
335         """
336         Verify captured packets on inside network
337
338         :param capture: Captured packets
339         :param in_if: Inside interface
340         :param packet_num: Expected number of packets (Default 3)
341         """
342         self.assertEqual(packet_num, len(capture))
343         for packet in capture:
344             try:
345                 self.check_ip_checksum(packet)
346                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
347                 if packet.haslayer(TCP):
348                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
349                     self.check_tcp_checksum(packet)
350                 elif packet.haslayer(UDP):
351                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
352                 else:
353                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
354                     self.check_icmp_checksum(packet)
355             except:
356                 self.logger.error(ppp("Unexpected or invalid packet "
357                                       "(inside network):", packet))
358                 raise
359
360     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
361         """
362         Verify captured IPv6 packets on inside network
363
364         :param capture: Captured packets
365         :param src_ip: Source IP
366         :param dst_ip: Destination IP address
367         :param packet_num: Expected number of packets (Default 3)
368         """
369         self.assertEqual(packet_num, len(capture))
370         for packet in capture:
371             try:
372                 self.assertEqual(packet[IPv6].src, src_ip)
373                 self.assertEqual(packet[IPv6].dst, dst_ip)
374                 if packet.haslayer(TCP):
375                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
376                     self.check_tcp_checksum(packet)
377                 elif packet.haslayer(UDP):
378                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
379                     self.check_udp_checksum(packet)
380                 else:
381                     self.assertEqual(packet[ICMPv6EchoReply].id,
382                                      self.icmp_id_in)
383                     self.check_icmpv6_checksum(packet)
384             except:
385                 self.logger.error(ppp("Unexpected or invalid packet "
386                                       "(inside network):", packet))
387                 raise
388
389     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
390         """
391         Verify captured packet that don't have to be translated
392
393         :param capture: Captured packets
394         :param ingress_if: Ingress interface
395         :param egress_if: Egress interface
396         """
397         for packet in capture:
398             try:
399                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
400                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
401                 if packet.haslayer(TCP):
402                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
403                 elif packet.haslayer(UDP):
404                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
405                 else:
406                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
407             except:
408                 self.logger.error(ppp("Unexpected or invalid packet "
409                                       "(inside network):", packet))
410                 raise
411
412     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
413                                             packet_num=3, icmp_type=11):
414         """
415         Verify captured packets with ICMP errors on outside network
416
417         :param capture: Captured packets
418         :param src_ip: Translated IP address or IP address of VPP
419                        (Default use global NAT address)
420         :param packet_num: Expected number of packets (Default 3)
421         :param icmp_type: Type of error ICMP packet
422                           we are expecting (Default 11)
423         """
424         if src_ip is None:
425             src_ip = self.nat_addr
426         self.assertEqual(packet_num, len(capture))
427         for packet in capture:
428             try:
429                 self.assertEqual(packet[IP].src, src_ip)
430                 self.assertTrue(packet.haslayer(ICMP))
431                 icmp = packet[ICMP]
432                 self.assertEqual(icmp.type, icmp_type)
433                 self.assertTrue(icmp.haslayer(IPerror))
434                 inner_ip = icmp[IPerror]
435                 if inner_ip.haslayer(TCPerror):
436                     self.assertEqual(inner_ip[TCPerror].dport,
437                                      self.tcp_port_out)
438                 elif inner_ip.haslayer(UDPerror):
439                     self.assertEqual(inner_ip[UDPerror].dport,
440                                      self.udp_port_out)
441                 else:
442                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
443             except:
444                 self.logger.error(ppp("Unexpected or invalid packet "
445                                       "(outside network):", packet))
446                 raise
447
448     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
449                                            icmp_type=11):
450         """
451         Verify captured packets with ICMP errors on inside network
452
453         :param capture: Captured packets
454         :param in_if: Inside interface
455         :param packet_num: Expected number of packets (Default 3)
456         :param icmp_type: Type of error ICMP packet
457                           we are expecting (Default 11)
458         """
459         self.assertEqual(packet_num, len(capture))
460         for packet in capture:
461             try:
462                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
463                 self.assertTrue(packet.haslayer(ICMP))
464                 icmp = packet[ICMP]
465                 self.assertEqual(icmp.type, icmp_type)
466                 self.assertTrue(icmp.haslayer(IPerror))
467                 inner_ip = icmp[IPerror]
468                 if inner_ip.haslayer(TCPerror):
469                     self.assertEqual(inner_ip[TCPerror].sport,
470                                      self.tcp_port_in)
471                 elif inner_ip.haslayer(UDPerror):
472                     self.assertEqual(inner_ip[UDPerror].sport,
473                                      self.udp_port_in)
474                 else:
475                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
476             except:
477                 self.logger.error(ppp("Unexpected or invalid packet "
478                                       "(inside network):", packet))
479                 raise
480
481     def create_stream_frag(self, src_if, dst, sport, dport, data):
482         """
483         Create fragmented packet stream
484
485         :param src_if: Source interface
486         :param dst: Destination IPv4 address
487         :param sport: Source TCP port
488         :param dport: Destination TCP port
489         :param data: Payload data
490         :returns: Fragmets
491         """
492         id = random.randint(0, 65535)
493         p = (IP(src=src_if.remote_ip4, dst=dst) /
494              TCP(sport=sport, dport=dport) /
495              Raw(data))
496         p = p.__class__(str(p))
497         chksum = p['TCP'].chksum
498         pkts = []
499         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
500              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
501              TCP(sport=sport, dport=dport, chksum=chksum) /
502              Raw(data[0:4]))
503         pkts.append(p)
504         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
505              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
506                 proto=IP_PROTOS.tcp) /
507              Raw(data[4:20]))
508         pkts.append(p)
509         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
510              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
511                 id=id) /
512              Raw(data[20:]))
513         pkts.append(p)
514         return pkts
515
516     def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
517                                pref=None, plen=0, frag_size=128):
518         """
519         Create fragmented packet stream
520
521         :param src_if: Source interface
522         :param dst: Destination IPv4 address
523         :param sport: Source TCP port
524         :param dport: Destination TCP port
525         :param data: Payload data
526         :param pref: NAT64 prefix
527         :param plen: NAT64 prefix length
528         :param fragsize: size of fragments
529         :returns: Fragmets
530         """
531         if pref is None:
532             dst_ip6 = ''.join(['64:ff9b::', dst])
533         else:
534             dst_ip6 = self.compose_ip6(dst, pref, plen)
535
536         p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
537              IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
538              IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
539              TCP(sport=sport, dport=dport) /
540              Raw(data))
541
542         return fragment6(p, frag_size)
543
544     def reass_frags_and_verify(self, frags, src, dst):
545         """
546         Reassemble and verify fragmented packet
547
548         :param frags: Captured fragments
549         :param src: Source IPv4 address to verify
550         :param dst: Destination IPv4 address to verify
551
552         :returns: Reassembled IPv4 packet
553         """
554         buffer = StringIO.StringIO()
555         for p in frags:
556             self.assertEqual(p[IP].src, src)
557             self.assertEqual(p[IP].dst, dst)
558             self.check_ip_checksum(p)
559             buffer.seek(p[IP].frag * 8)
560             buffer.write(p[IP].payload)
561         ip = frags[0].getlayer(IP)
562         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
563                 proto=frags[0][IP].proto)
564         if ip.proto == IP_PROTOS.tcp:
565             p = (ip / TCP(buffer.getvalue()))
566             self.check_tcp_checksum(p)
567         elif ip.proto == IP_PROTOS.udp:
568             p = (ip / UDP(buffer.getvalue()))
569         return p
570
571     def reass_frags_and_verify_ip6(self, frags, src, dst):
572         """
573         Reassemble and verify fragmented packet
574
575         :param frags: Captured fragments
576         :param src: Source IPv6 address to verify
577         :param dst: Destination IPv6 address to verify
578
579         :returns: Reassembled IPv6 packet
580         """
581         buffer = StringIO.StringIO()
582         for p in frags:
583             self.assertEqual(p[IPv6].src, src)
584             self.assertEqual(p[IPv6].dst, dst)
585             buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
586             buffer.write(p[IPv6ExtHdrFragment].payload)
587         ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
588                   nh=frags[0][IPv6ExtHdrFragment].nh)
589         if ip.nh == IP_PROTOS.tcp:
590             p = (ip / TCP(buffer.getvalue()))
591             self.check_tcp_checksum(p)
592         elif ip.nh == IP_PROTOS.udp:
593             p = (ip / UDP(buffer.getvalue()))
594         return p
595
596     def verify_ipfix_nat44_ses(self, data):
597         """
598         Verify IPFIX NAT44 session create/delete event
599
600         :param data: Decoded IPFIX data records
601         """
602         nat44_ses_create_num = 0
603         nat44_ses_delete_num = 0
604         self.assertEqual(6, len(data))
605         for record in data:
606             # natEvent
607             self.assertIn(ord(record[230]), [4, 5])
608             if ord(record[230]) == 4:
609                 nat44_ses_create_num += 1
610             else:
611                 nat44_ses_delete_num += 1
612             # sourceIPv4Address
613             self.assertEqual(self.pg0.remote_ip4n, record[8])
614             # postNATSourceIPv4Address
615             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
616                              record[225])
617             # ingressVRFID
618             self.assertEqual(struct.pack("!I", 0), record[234])
619             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
620             if IP_PROTOS.icmp == ord(record[4]):
621                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
622                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
623                                  record[227])
624             elif IP_PROTOS.tcp == ord(record[4]):
625                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
626                                  record[7])
627                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
628                                  record[227])
629             elif IP_PROTOS.udp == ord(record[4]):
630                 self.assertEqual(struct.pack("!H", self.udp_port_in),
631                                  record[7])
632                 self.assertEqual(struct.pack("!H", self.udp_port_out),
633                                  record[227])
634             else:
635                 self.fail("Invalid protocol")
636         self.assertEqual(3, nat44_ses_create_num)
637         self.assertEqual(3, nat44_ses_delete_num)
638
639     def verify_ipfix_addr_exhausted(self, data):
640         """
641         Verify IPFIX NAT addresses event
642
643         :param data: Decoded IPFIX data records
644         """
645         self.assertEqual(1, len(data))
646         record = data[0]
647         # natEvent
648         self.assertEqual(ord(record[230]), 3)
649         # natPoolID
650         self.assertEqual(struct.pack("!I", 0), record[283])
651
652
653 class TestNAT44(MethodHolder):
654     """ NAT44 Test Cases """
655
656     @classmethod
657     def setUpClass(cls):
658         super(TestNAT44, cls).setUpClass()
659
660         try:
661             cls.tcp_port_in = 6303
662             cls.tcp_port_out = 6303
663             cls.udp_port_in = 6304
664             cls.udp_port_out = 6304
665             cls.icmp_id_in = 6305
666             cls.icmp_id_out = 6305
667             cls.nat_addr = '10.0.0.3'
668             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
669             cls.ipfix_src_port = 4739
670             cls.ipfix_domain_id = 1
671
672             cls.create_pg_interfaces(range(10))
673             cls.interfaces = list(cls.pg_interfaces[0:4])
674
675             for i in cls.interfaces:
676                 i.admin_up()
677                 i.config_ip4()
678                 i.resolve_arp()
679
680             cls.pg0.generate_remote_hosts(3)
681             cls.pg0.configure_ipv4_neighbors()
682
683             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
684             cls.vapi.ip_table_add_del(10, is_add=1)
685             cls.vapi.ip_table_add_del(20, is_add=1)
686
687             cls.pg4._local_ip4 = "172.16.255.1"
688             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
689             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
690             cls.pg4.set_table_ip4(10)
691             cls.pg5._local_ip4 = "172.17.255.3"
692             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
693             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
694             cls.pg5.set_table_ip4(10)
695             cls.pg6._local_ip4 = "172.16.255.1"
696             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
697             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
698             cls.pg6.set_table_ip4(20)
699             for i in cls.overlapping_interfaces:
700                 i.config_ip4()
701                 i.admin_up()
702                 i.resolve_arp()
703
704             cls.pg7.admin_up()
705             cls.pg8.admin_up()
706
707             cls.pg9.generate_remote_hosts(2)
708             cls.pg9.config_ip4()
709             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
710             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
711                                                   ip_addr_n,
712                                                   24)
713             cls.pg9.admin_up()
714             cls.pg9.resolve_arp()
715             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
716             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
717             cls.pg9.resolve_arp()
718
719         except Exception:
720             super(TestNAT44, cls).tearDownClass()
721             raise
722
723     def clear_nat44(self):
724         """
725         Clear NAT44 configuration.
726         """
727         # I found no elegant way to do this
728         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
729                                    dst_address_length=32,
730                                    next_hop_address=self.pg7.remote_ip4n,
731                                    next_hop_sw_if_index=self.pg7.sw_if_index,
732                                    is_add=0)
733         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
734                                    dst_address_length=32,
735                                    next_hop_address=self.pg8.remote_ip4n,
736                                    next_hop_sw_if_index=self.pg8.sw_if_index,
737                                    is_add=0)
738
739         for intf in [self.pg7, self.pg8]:
740             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
741             for n in neighbors:
742                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
743                                               n.mac_address,
744                                               n.ip_address,
745                                               is_add=0)
746
747         if self.pg7.has_ip4_config:
748             self.pg7.unconfig_ip4()
749
750         interfaces = self.vapi.nat44_interface_addr_dump()
751         for intf in interfaces:
752             self.vapi.nat44_add_interface_addr(intf.sw_if_index,
753                                                twice_nat=intf.twice_nat,
754                                                is_add=0)
755
756         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
757                             domain_id=self.ipfix_domain_id)
758         self.ipfix_src_port = 4739
759         self.ipfix_domain_id = 1
760
761         interfaces = self.vapi.nat44_interface_dump()
762         for intf in interfaces:
763             if intf.is_inside > 1:
764                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
765                                                           0,
766                                                           is_add=0)
767             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
768                                                       intf.is_inside,
769                                                       is_add=0)
770
771         interfaces = self.vapi.nat44_interface_output_feature_dump()
772         for intf in interfaces:
773             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
774                                                              intf.is_inside,
775                                                              is_add=0)
776
777         static_mappings = self.vapi.nat44_static_mapping_dump()
778         for sm in static_mappings:
779             self.vapi.nat44_add_del_static_mapping(
780                 sm.local_ip_address,
781                 sm.external_ip_address,
782                 local_port=sm.local_port,
783                 external_port=sm.external_port,
784                 addr_only=sm.addr_only,
785                 vrf_id=sm.vrf_id,
786                 protocol=sm.protocol,
787                 twice_nat=sm.twice_nat,
788                 is_add=0)
789
790         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
791         for lb_sm in lb_static_mappings:
792             self.vapi.nat44_add_del_lb_static_mapping(
793                 lb_sm.external_addr,
794                 lb_sm.external_port,
795                 lb_sm.protocol,
796                 vrf_id=lb_sm.vrf_id,
797                 twice_nat=lb_sm.twice_nat,
798                 is_add=0,
799                 local_num=0,
800                 locals=[])
801
802         identity_mappings = self.vapi.nat44_identity_mapping_dump()
803         for id_m in identity_mappings:
804             self.vapi.nat44_add_del_identity_mapping(
805                 addr_only=id_m.addr_only,
806                 ip=id_m.ip_address,
807                 port=id_m.port,
808                 sw_if_index=id_m.sw_if_index,
809                 vrf_id=id_m.vrf_id,
810                 protocol=id_m.protocol,
811                 is_add=0)
812
813         adresses = self.vapi.nat44_address_dump()
814         for addr in adresses:
815             self.vapi.nat44_add_del_address_range(addr.ip_address,
816                                                   addr.ip_address,
817                                                   twice_nat=addr.twice_nat,
818                                                   is_add=0)
819
820         self.vapi.nat_set_reass()
821         self.vapi.nat_set_reass(is_ip6=1)
822
823     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
824                                  local_port=0, external_port=0, vrf_id=0,
825                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
826                                  proto=0, twice_nat=0):
827         """
828         Add/delete NAT44 static mapping
829
830         :param local_ip: Local IP address
831         :param external_ip: External IP address
832         :param local_port: Local port number (Optional)
833         :param external_port: External port number (Optional)
834         :param vrf_id: VRF ID (Default 0)
835         :param is_add: 1 if add, 0 if delete (Default add)
836         :param external_sw_if_index: External interface instead of IP address
837         :param proto: IP protocol (Mandatory if port specified)
838         :param twice_nat: 1 if translate external host address and port
839         """
840         addr_only = 1
841         if local_port and external_port:
842             addr_only = 0
843         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
844         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
845         self.vapi.nat44_add_del_static_mapping(
846             l_ip,
847             e_ip,
848             external_sw_if_index,
849             local_port,
850             external_port,
851             addr_only,
852             vrf_id,
853             proto,
854             twice_nat,
855             is_add)
856
857     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
858         """
859         Add/delete NAT44 address
860
861         :param ip: IP address
862         :param is_add: 1 if add, 0 if delete (Default add)
863         :param twice_nat: twice NAT address for extenal hosts
864         """
865         nat_addr = socket.inet_pton(socket.AF_INET, ip)
866         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
867                                               vrf_id=vrf_id,
868                                               twice_nat=twice_nat)
869
870     def test_dynamic(self):
871         """ NAT44 dynamic translation test """
872
873         self.nat44_add_address(self.nat_addr)
874         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
875         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
876                                                   is_inside=0)
877
878         # in2out
879         pkts = self.create_stream_in(self.pg0, self.pg1)
880         self.pg0.add_stream(pkts)
881         self.pg_enable_capture(self.pg_interfaces)
882         self.pg_start()
883         capture = self.pg1.get_capture(len(pkts))
884         self.verify_capture_out(capture)
885
886         # out2in
887         pkts = self.create_stream_out(self.pg1)
888         self.pg1.add_stream(pkts)
889         self.pg_enable_capture(self.pg_interfaces)
890         self.pg_start()
891         capture = self.pg0.get_capture(len(pkts))
892         self.verify_capture_in(capture, self.pg0)
893
894     def test_dynamic_icmp_errors_in2out_ttl_1(self):
895         """ NAT44 handling of client packets with TTL=1 """
896
897         self.nat44_add_address(self.nat_addr)
898         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
899         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
900                                                   is_inside=0)
901
902         # Client side - generate traffic
903         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
904         self.pg0.add_stream(pkts)
905         self.pg_enable_capture(self.pg_interfaces)
906         self.pg_start()
907
908         # Client side - verify ICMP type 11 packets
909         capture = self.pg0.get_capture(len(pkts))
910         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
911
912     def test_dynamic_icmp_errors_out2in_ttl_1(self):
913         """ NAT44 handling of server packets with TTL=1 """
914
915         self.nat44_add_address(self.nat_addr)
916         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
917         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
918                                                   is_inside=0)
919
920         # Client side - create sessions
921         pkts = self.create_stream_in(self.pg0, self.pg1)
922         self.pg0.add_stream(pkts)
923         self.pg_enable_capture(self.pg_interfaces)
924         self.pg_start()
925
926         # Server side - generate traffic
927         capture = self.pg1.get_capture(len(pkts))
928         self.verify_capture_out(capture)
929         pkts = self.create_stream_out(self.pg1, ttl=1)
930         self.pg1.add_stream(pkts)
931         self.pg_enable_capture(self.pg_interfaces)
932         self.pg_start()
933
934         # Server side - verify ICMP type 11 packets
935         capture = self.pg1.get_capture(len(pkts))
936         self.verify_capture_out_with_icmp_errors(capture,
937                                                  src_ip=self.pg1.local_ip4)
938
939     def test_dynamic_icmp_errors_in2out_ttl_2(self):
940         """ NAT44 handling of error responses to client packets with TTL=2 """
941
942         self.nat44_add_address(self.nat_addr)
943         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
944         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
945                                                   is_inside=0)
946
947         # Client side - generate traffic
948         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
949         self.pg0.add_stream(pkts)
950         self.pg_enable_capture(self.pg_interfaces)
951         self.pg_start()
952
953         # Server side - simulate ICMP type 11 response
954         capture = self.pg1.get_capture(len(pkts))
955         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
956                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
957                 ICMP(type=11) / packet[IP] for packet in capture]
958         self.pg1.add_stream(pkts)
959         self.pg_enable_capture(self.pg_interfaces)
960         self.pg_start()
961
962         # Client side - verify ICMP type 11 packets
963         capture = self.pg0.get_capture(len(pkts))
964         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
965
966     def test_dynamic_icmp_errors_out2in_ttl_2(self):
967         """ NAT44 handling of error responses to server packets with TTL=2 """
968
969         self.nat44_add_address(self.nat_addr)
970         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
971         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
972                                                   is_inside=0)
973
974         # Client side - create sessions
975         pkts = self.create_stream_in(self.pg0, self.pg1)
976         self.pg0.add_stream(pkts)
977         self.pg_enable_capture(self.pg_interfaces)
978         self.pg_start()
979
980         # Server side - generate traffic
981         capture = self.pg1.get_capture(len(pkts))
982         self.verify_capture_out(capture)
983         pkts = self.create_stream_out(self.pg1, ttl=2)
984         self.pg1.add_stream(pkts)
985         self.pg_enable_capture(self.pg_interfaces)
986         self.pg_start()
987
988         # Client side - simulate ICMP type 11 response
989         capture = self.pg0.get_capture(len(pkts))
990         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
991                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
992                 ICMP(type=11) / packet[IP] for packet in capture]
993         self.pg0.add_stream(pkts)
994         self.pg_enable_capture(self.pg_interfaces)
995         self.pg_start()
996
997         # Server side - verify ICMP type 11 packets
998         capture = self.pg1.get_capture(len(pkts))
999         self.verify_capture_out_with_icmp_errors(capture)
1000
1001     def test_ping_out_interface_from_outside(self):
1002         """ Ping NAT44 out interface from outside network """
1003
1004         self.nat44_add_address(self.nat_addr)
1005         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1006         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1007                                                   is_inside=0)
1008
1009         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1010              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1011              ICMP(id=self.icmp_id_out, type='echo-request'))
1012         pkts = [p]
1013         self.pg1.add_stream(pkts)
1014         self.pg_enable_capture(self.pg_interfaces)
1015         self.pg_start()
1016         capture = self.pg1.get_capture(len(pkts))
1017         self.assertEqual(1, len(capture))
1018         packet = capture[0]
1019         try:
1020             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1021             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1022             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1023             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1024         except:
1025             self.logger.error(ppp("Unexpected or invalid packet "
1026                                   "(outside network):", packet))
1027             raise
1028
1029     def test_ping_internal_host_from_outside(self):
1030         """ Ping internal host from outside network """
1031
1032         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1033         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1034         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1035                                                   is_inside=0)
1036
1037         # out2in
1038         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1039                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1040                ICMP(id=self.icmp_id_out, type='echo-request'))
1041         self.pg1.add_stream(pkt)
1042         self.pg_enable_capture(self.pg_interfaces)
1043         self.pg_start()
1044         capture = self.pg0.get_capture(1)
1045         self.verify_capture_in(capture, self.pg0, packet_num=1)
1046         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1047
1048         # in2out
1049         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1050                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1051                ICMP(id=self.icmp_id_in, type='echo-reply'))
1052         self.pg0.add_stream(pkt)
1053         self.pg_enable_capture(self.pg_interfaces)
1054         self.pg_start()
1055         capture = self.pg1.get_capture(1)
1056         self.verify_capture_out(capture, same_port=True, packet_num=1)
1057         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1058
1059     def test_forwarding(self):
1060         """ NAT44 forwarding test """
1061
1062         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1063         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1064                                                   is_inside=0)
1065         self.vapi.nat44_forwarding_enable_disable(1)
1066
1067         real_ip = self.pg0.remote_ip4n
1068         alias_ip = self.nat_addr_n
1069         self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1070                                                external_ip=alias_ip)
1071
1072         try:
1073             # in2out - static mapping match
1074
1075             pkts = self.create_stream_out(self.pg1)
1076             self.pg1.add_stream(pkts)
1077             self.pg_enable_capture(self.pg_interfaces)
1078             self.pg_start()
1079             capture = self.pg0.get_capture(len(pkts))
1080             self.verify_capture_in(capture, self.pg0)
1081
1082             pkts = self.create_stream_in(self.pg0, self.pg1)
1083             self.pg0.add_stream(pkts)
1084             self.pg_enable_capture(self.pg_interfaces)
1085             self.pg_start()
1086             capture = self.pg1.get_capture(len(pkts))
1087             self.verify_capture_out(capture, same_port=True)
1088
1089             # in2out - no static mapping match
1090
1091             host0 = self.pg0.remote_hosts[0]
1092             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1093             try:
1094                 pkts = self.create_stream_out(self.pg1,
1095                                               dst_ip=self.pg0.remote_ip4,
1096                                               use_inside_ports=True)
1097                 self.pg1.add_stream(pkts)
1098                 self.pg_enable_capture(self.pg_interfaces)
1099                 self.pg_start()
1100                 capture = self.pg0.get_capture(len(pkts))
1101                 self.verify_capture_in(capture, self.pg0)
1102
1103                 pkts = self.create_stream_in(self.pg0, self.pg1)
1104                 self.pg0.add_stream(pkts)
1105                 self.pg_enable_capture(self.pg_interfaces)
1106                 self.pg_start()
1107                 capture = self.pg1.get_capture(len(pkts))
1108                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1109                                         same_port=True)
1110             finally:
1111                 self.pg0.remote_hosts[0] = host0
1112
1113         finally:
1114             self.vapi.nat44_forwarding_enable_disable(0)
1115             self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1116                                                    external_ip=alias_ip,
1117                                                    is_add=0)
1118
1119     def test_static_in(self):
1120         """ 1:1 NAT initialized from inside network """
1121
1122         nat_ip = "10.0.0.10"
1123         self.tcp_port_out = 6303
1124         self.udp_port_out = 6304
1125         self.icmp_id_out = 6305
1126
1127         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1128         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1129         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1130                                                   is_inside=0)
1131
1132         # in2out
1133         pkts = self.create_stream_in(self.pg0, self.pg1)
1134         self.pg0.add_stream(pkts)
1135         self.pg_enable_capture(self.pg_interfaces)
1136         self.pg_start()
1137         capture = self.pg1.get_capture(len(pkts))
1138         self.verify_capture_out(capture, nat_ip, True)
1139
1140         # out2in
1141         pkts = self.create_stream_out(self.pg1, nat_ip)
1142         self.pg1.add_stream(pkts)
1143         self.pg_enable_capture(self.pg_interfaces)
1144         self.pg_start()
1145         capture = self.pg0.get_capture(len(pkts))
1146         self.verify_capture_in(capture, self.pg0)
1147
1148     def test_static_out(self):
1149         """ 1:1 NAT initialized from outside network """
1150
1151         nat_ip = "10.0.0.20"
1152         self.tcp_port_out = 6303
1153         self.udp_port_out = 6304
1154         self.icmp_id_out = 6305
1155
1156         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1157         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1158         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1159                                                   is_inside=0)
1160
1161         # out2in
1162         pkts = self.create_stream_out(self.pg1, nat_ip)
1163         self.pg1.add_stream(pkts)
1164         self.pg_enable_capture(self.pg_interfaces)
1165         self.pg_start()
1166         capture = self.pg0.get_capture(len(pkts))
1167         self.verify_capture_in(capture, self.pg0)
1168
1169         # in2out
1170         pkts = self.create_stream_in(self.pg0, self.pg1)
1171         self.pg0.add_stream(pkts)
1172         self.pg_enable_capture(self.pg_interfaces)
1173         self.pg_start()
1174         capture = self.pg1.get_capture(len(pkts))
1175         self.verify_capture_out(capture, nat_ip, True)
1176
1177     def test_static_with_port_in(self):
1178         """ 1:1 NAPT initialized from inside network """
1179
1180         self.tcp_port_out = 3606
1181         self.udp_port_out = 3607
1182         self.icmp_id_out = 3608
1183
1184         self.nat44_add_address(self.nat_addr)
1185         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1186                                       self.tcp_port_in, self.tcp_port_out,
1187                                       proto=IP_PROTOS.tcp)
1188         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1189                                       self.udp_port_in, self.udp_port_out,
1190                                       proto=IP_PROTOS.udp)
1191         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1192                                       self.icmp_id_in, self.icmp_id_out,
1193                                       proto=IP_PROTOS.icmp)
1194         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1195         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1196                                                   is_inside=0)
1197
1198         # in2out
1199         pkts = self.create_stream_in(self.pg0, self.pg1)
1200         self.pg0.add_stream(pkts)
1201         self.pg_enable_capture(self.pg_interfaces)
1202         self.pg_start()
1203         capture = self.pg1.get_capture(len(pkts))
1204         self.verify_capture_out(capture)
1205
1206         # out2in
1207         pkts = self.create_stream_out(self.pg1)
1208         self.pg1.add_stream(pkts)
1209         self.pg_enable_capture(self.pg_interfaces)
1210         self.pg_start()
1211         capture = self.pg0.get_capture(len(pkts))
1212         self.verify_capture_in(capture, self.pg0)
1213
1214     def test_static_with_port_out(self):
1215         """ 1:1 NAPT initialized from outside network """
1216
1217         self.tcp_port_out = 30606
1218         self.udp_port_out = 30607
1219         self.icmp_id_out = 30608
1220
1221         self.nat44_add_address(self.nat_addr)
1222         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1223                                       self.tcp_port_in, self.tcp_port_out,
1224                                       proto=IP_PROTOS.tcp)
1225         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1226                                       self.udp_port_in, self.udp_port_out,
1227                                       proto=IP_PROTOS.udp)
1228         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1229                                       self.icmp_id_in, self.icmp_id_out,
1230                                       proto=IP_PROTOS.icmp)
1231         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1232         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1233                                                   is_inside=0)
1234
1235         # out2in
1236         pkts = self.create_stream_out(self.pg1)
1237         self.pg1.add_stream(pkts)
1238         self.pg_enable_capture(self.pg_interfaces)
1239         self.pg_start()
1240         capture = self.pg0.get_capture(len(pkts))
1241         self.verify_capture_in(capture, self.pg0)
1242
1243         # in2out
1244         pkts = self.create_stream_in(self.pg0, self.pg1)
1245         self.pg0.add_stream(pkts)
1246         self.pg_enable_capture(self.pg_interfaces)
1247         self.pg_start()
1248         capture = self.pg1.get_capture(len(pkts))
1249         self.verify_capture_out(capture)
1250
1251     def test_static_vrf_aware(self):
1252         """ 1:1 NAT VRF awareness """
1253
1254         nat_ip1 = "10.0.0.30"
1255         nat_ip2 = "10.0.0.40"
1256         self.tcp_port_out = 6303
1257         self.udp_port_out = 6304
1258         self.icmp_id_out = 6305
1259
1260         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1261                                       vrf_id=10)
1262         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1263                                       vrf_id=10)
1264         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1265                                                   is_inside=0)
1266         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1267         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1268
1269         # inside interface VRF match NAT44 static mapping VRF
1270         pkts = self.create_stream_in(self.pg4, self.pg3)
1271         self.pg4.add_stream(pkts)
1272         self.pg_enable_capture(self.pg_interfaces)
1273         self.pg_start()
1274         capture = self.pg3.get_capture(len(pkts))
1275         self.verify_capture_out(capture, nat_ip1, True)
1276
1277         # inside interface VRF don't match NAT44 static mapping VRF (packets
1278         # are dropped)
1279         pkts = self.create_stream_in(self.pg0, self.pg3)
1280         self.pg0.add_stream(pkts)
1281         self.pg_enable_capture(self.pg_interfaces)
1282         self.pg_start()
1283         self.pg3.assert_nothing_captured()
1284
1285     def test_identity_nat(self):
1286         """ Identity NAT """
1287
1288         self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1289         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1290         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1291                                                   is_inside=0)
1292
1293         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1294              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1295              TCP(sport=12345, dport=56789))
1296         self.pg1.add_stream(p)
1297         self.pg_enable_capture(self.pg_interfaces)
1298         self.pg_start()
1299         capture = self.pg0.get_capture(1)
1300         p = capture[0]
1301         try:
1302             ip = p[IP]
1303             tcp = p[TCP]
1304             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1305             self.assertEqual(ip.src, self.pg1.remote_ip4)
1306             self.assertEqual(tcp.dport, 56789)
1307             self.assertEqual(tcp.sport, 12345)
1308             self.check_tcp_checksum(p)
1309             self.check_ip_checksum(p)
1310         except:
1311             self.logger.error(ppp("Unexpected or invalid packet:", p))
1312             raise
1313
1314     def test_static_lb(self):
1315         """ NAT44 local service load balancing """
1316         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1317         external_port = 80
1318         local_port = 8080
1319         server1 = self.pg0.remote_hosts[0]
1320         server2 = self.pg0.remote_hosts[1]
1321
1322         locals = [{'addr': server1.ip4n,
1323                    'port': local_port,
1324                    'probability': 70},
1325                   {'addr': server2.ip4n,
1326                    'port': local_port,
1327                    'probability': 30}]
1328
1329         self.nat44_add_address(self.nat_addr)
1330         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1331                                                   external_port,
1332                                                   IP_PROTOS.tcp,
1333                                                   local_num=len(locals),
1334                                                   locals=locals)
1335         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1336         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1337                                                   is_inside=0)
1338
1339         # from client to service
1340         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1341              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1342              TCP(sport=12345, dport=external_port))
1343         self.pg1.add_stream(p)
1344         self.pg_enable_capture(self.pg_interfaces)
1345         self.pg_start()
1346         capture = self.pg0.get_capture(1)
1347         p = capture[0]
1348         server = None
1349         try:
1350             ip = p[IP]
1351             tcp = p[TCP]
1352             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1353             if ip.dst == server1.ip4:
1354                 server = server1
1355             else:
1356                 server = server2
1357             self.assertEqual(tcp.dport, local_port)
1358             self.check_tcp_checksum(p)
1359             self.check_ip_checksum(p)
1360         except:
1361             self.logger.error(ppp("Unexpected or invalid packet:", p))
1362             raise
1363
1364         # from service back to client
1365         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1366              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1367              TCP(sport=local_port, dport=12345))
1368         self.pg0.add_stream(p)
1369         self.pg_enable_capture(self.pg_interfaces)
1370         self.pg_start()
1371         capture = self.pg1.get_capture(1)
1372         p = capture[0]
1373         try:
1374             ip = p[IP]
1375             tcp = p[TCP]
1376             self.assertEqual(ip.src, self.nat_addr)
1377             self.assertEqual(tcp.sport, external_port)
1378             self.check_tcp_checksum(p)
1379             self.check_ip_checksum(p)
1380         except:
1381             self.logger.error(ppp("Unexpected or invalid packet:", p))
1382             raise
1383
1384         # multiple clients
1385         server1_n = 0
1386         server2_n = 0
1387         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1388         pkts = []
1389         for client in clients:
1390             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1391                  IP(src=client, dst=self.nat_addr) /
1392                  TCP(sport=12345, dport=external_port))
1393             pkts.append(p)
1394         self.pg1.add_stream(pkts)
1395         self.pg_enable_capture(self.pg_interfaces)
1396         self.pg_start()
1397         capture = self.pg0.get_capture(len(pkts))
1398         for p in capture:
1399             if p[IP].dst == server1.ip4:
1400                 server1_n += 1
1401             else:
1402                 server2_n += 1
1403         self.assertTrue(server1_n > server2_n)
1404
1405     def test_multiple_inside_interfaces(self):
1406         """ NAT44 multiple non-overlapping address space inside interfaces """
1407
1408         self.nat44_add_address(self.nat_addr)
1409         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1410         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1411         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1412                                                   is_inside=0)
1413
1414         # between two NAT44 inside interfaces (no translation)
1415         pkts = self.create_stream_in(self.pg0, self.pg1)
1416         self.pg0.add_stream(pkts)
1417         self.pg_enable_capture(self.pg_interfaces)
1418         self.pg_start()
1419         capture = self.pg1.get_capture(len(pkts))
1420         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1421
1422         # from NAT44 inside to interface without NAT44 feature (no translation)
1423         pkts = self.create_stream_in(self.pg0, self.pg2)
1424         self.pg0.add_stream(pkts)
1425         self.pg_enable_capture(self.pg_interfaces)
1426         self.pg_start()
1427         capture = self.pg2.get_capture(len(pkts))
1428         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1429
1430         # in2out 1st interface
1431         pkts = self.create_stream_in(self.pg0, self.pg3)
1432         self.pg0.add_stream(pkts)
1433         self.pg_enable_capture(self.pg_interfaces)
1434         self.pg_start()
1435         capture = self.pg3.get_capture(len(pkts))
1436         self.verify_capture_out(capture)
1437
1438         # out2in 1st interface
1439         pkts = self.create_stream_out(self.pg3)
1440         self.pg3.add_stream(pkts)
1441         self.pg_enable_capture(self.pg_interfaces)
1442         self.pg_start()
1443         capture = self.pg0.get_capture(len(pkts))
1444         self.verify_capture_in(capture, self.pg0)
1445
1446         # in2out 2nd interface
1447         pkts = self.create_stream_in(self.pg1, self.pg3)
1448         self.pg1.add_stream(pkts)
1449         self.pg_enable_capture(self.pg_interfaces)
1450         self.pg_start()
1451         capture = self.pg3.get_capture(len(pkts))
1452         self.verify_capture_out(capture)
1453
1454         # out2in 2nd interface
1455         pkts = self.create_stream_out(self.pg3)
1456         self.pg3.add_stream(pkts)
1457         self.pg_enable_capture(self.pg_interfaces)
1458         self.pg_start()
1459         capture = self.pg1.get_capture(len(pkts))
1460         self.verify_capture_in(capture, self.pg1)
1461
1462     def test_inside_overlapping_interfaces(self):
1463         """ NAT44 multiple inside interfaces with overlapping address space """
1464
1465         static_nat_ip = "10.0.0.10"
1466         self.nat44_add_address(self.nat_addr)
1467         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1468                                                   is_inside=0)
1469         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1470         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1471         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1472         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1473                                       vrf_id=20)
1474
1475         # between NAT44 inside interfaces with same VRF (no translation)
1476         pkts = self.create_stream_in(self.pg4, self.pg5)
1477         self.pg4.add_stream(pkts)
1478         self.pg_enable_capture(self.pg_interfaces)
1479         self.pg_start()
1480         capture = self.pg5.get_capture(len(pkts))
1481         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1482
1483         # between NAT44 inside interfaces with different VRF (hairpinning)
1484         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1485              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1486              TCP(sport=1234, dport=5678))
1487         self.pg4.add_stream(p)
1488         self.pg_enable_capture(self.pg_interfaces)
1489         self.pg_start()
1490         capture = self.pg6.get_capture(1)
1491         p = capture[0]
1492         try:
1493             ip = p[IP]
1494             tcp = p[TCP]
1495             self.assertEqual(ip.src, self.nat_addr)
1496             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1497             self.assertNotEqual(tcp.sport, 1234)
1498             self.assertEqual(tcp.dport, 5678)
1499         except:
1500             self.logger.error(ppp("Unexpected or invalid packet:", p))
1501             raise
1502
1503         # in2out 1st interface
1504         pkts = self.create_stream_in(self.pg4, self.pg3)
1505         self.pg4.add_stream(pkts)
1506         self.pg_enable_capture(self.pg_interfaces)
1507         self.pg_start()
1508         capture = self.pg3.get_capture(len(pkts))
1509         self.verify_capture_out(capture)
1510
1511         # out2in 1st interface
1512         pkts = self.create_stream_out(self.pg3)
1513         self.pg3.add_stream(pkts)
1514         self.pg_enable_capture(self.pg_interfaces)
1515         self.pg_start()
1516         capture = self.pg4.get_capture(len(pkts))
1517         self.verify_capture_in(capture, self.pg4)
1518
1519         # in2out 2nd interface
1520         pkts = self.create_stream_in(self.pg5, self.pg3)
1521         self.pg5.add_stream(pkts)
1522         self.pg_enable_capture(self.pg_interfaces)
1523         self.pg_start()
1524         capture = self.pg3.get_capture(len(pkts))
1525         self.verify_capture_out(capture)
1526
1527         # out2in 2nd interface
1528         pkts = self.create_stream_out(self.pg3)
1529         self.pg3.add_stream(pkts)
1530         self.pg_enable_capture(self.pg_interfaces)
1531         self.pg_start()
1532         capture = self.pg5.get_capture(len(pkts))
1533         self.verify_capture_in(capture, self.pg5)
1534
1535         # pg5 session dump
1536         addresses = self.vapi.nat44_address_dump()
1537         self.assertEqual(len(addresses), 1)
1538         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1539         self.assertEqual(len(sessions), 3)
1540         for session in sessions:
1541             self.assertFalse(session.is_static)
1542             self.assertEqual(session.inside_ip_address[0:4],
1543                              self.pg5.remote_ip4n)
1544             self.assertEqual(session.outside_ip_address,
1545                              addresses[0].ip_address)
1546         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1547         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1548         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1549         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1550         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1551         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1552         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1553         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1554         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1555
1556         # in2out 3rd interface
1557         pkts = self.create_stream_in(self.pg6, self.pg3)
1558         self.pg6.add_stream(pkts)
1559         self.pg_enable_capture(self.pg_interfaces)
1560         self.pg_start()
1561         capture = self.pg3.get_capture(len(pkts))
1562         self.verify_capture_out(capture, static_nat_ip, True)
1563
1564         # out2in 3rd interface
1565         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1566         self.pg3.add_stream(pkts)
1567         self.pg_enable_capture(self.pg_interfaces)
1568         self.pg_start()
1569         capture = self.pg6.get_capture(len(pkts))
1570         self.verify_capture_in(capture, self.pg6)
1571
1572         # general user and session dump verifications
1573         users = self.vapi.nat44_user_dump()
1574         self.assertTrue(len(users) >= 3)
1575         addresses = self.vapi.nat44_address_dump()
1576         self.assertEqual(len(addresses), 1)
1577         for user in users:
1578             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1579                                                          user.vrf_id)
1580             for session in sessions:
1581                 self.assertEqual(user.ip_address, session.inside_ip_address)
1582                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1583                 self.assertTrue(session.protocol in
1584                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1585                                  IP_PROTOS.icmp])
1586
1587         # pg4 session dump
1588         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1589         self.assertTrue(len(sessions) >= 4)
1590         for session in sessions:
1591             self.assertFalse(session.is_static)
1592             self.assertEqual(session.inside_ip_address[0:4],
1593                              self.pg4.remote_ip4n)
1594             self.assertEqual(session.outside_ip_address,
1595                              addresses[0].ip_address)
1596
1597         # pg6 session dump
1598         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1599         self.assertTrue(len(sessions) >= 3)
1600         for session in sessions:
1601             self.assertTrue(session.is_static)
1602             self.assertEqual(session.inside_ip_address[0:4],
1603                              self.pg6.remote_ip4n)
1604             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1605                              map(int, static_nat_ip.split('.')))
1606             self.assertTrue(session.inside_port in
1607                             [self.tcp_port_in, self.udp_port_in,
1608                              self.icmp_id_in])
1609
1610     def test_hairpinning(self):
1611         """ NAT44 hairpinning - 1:1 NAPT """
1612
1613         host = self.pg0.remote_hosts[0]
1614         server = self.pg0.remote_hosts[1]
1615         host_in_port = 1234
1616         host_out_port = 0
1617         server_in_port = 5678
1618         server_out_port = 8765
1619
1620         self.nat44_add_address(self.nat_addr)
1621         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1622         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1623                                                   is_inside=0)
1624         # add static mapping for server
1625         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1626                                       server_in_port, server_out_port,
1627                                       proto=IP_PROTOS.tcp)
1628
1629         # send packet from host to server
1630         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1631              IP(src=host.ip4, dst=self.nat_addr) /
1632              TCP(sport=host_in_port, dport=server_out_port))
1633         self.pg0.add_stream(p)
1634         self.pg_enable_capture(self.pg_interfaces)
1635         self.pg_start()
1636         capture = self.pg0.get_capture(1)
1637         p = capture[0]
1638         try:
1639             ip = p[IP]
1640             tcp = p[TCP]
1641             self.assertEqual(ip.src, self.nat_addr)
1642             self.assertEqual(ip.dst, server.ip4)
1643             self.assertNotEqual(tcp.sport, host_in_port)
1644             self.assertEqual(tcp.dport, server_in_port)
1645             self.check_tcp_checksum(p)
1646             host_out_port = tcp.sport
1647         except:
1648             self.logger.error(ppp("Unexpected or invalid packet:", p))
1649             raise
1650
1651         # send reply from server to host
1652         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1653              IP(src=server.ip4, dst=self.nat_addr) /
1654              TCP(sport=server_in_port, dport=host_out_port))
1655         self.pg0.add_stream(p)
1656         self.pg_enable_capture(self.pg_interfaces)
1657         self.pg_start()
1658         capture = self.pg0.get_capture(1)
1659         p = capture[0]
1660         try:
1661             ip = p[IP]
1662             tcp = p[TCP]
1663             self.assertEqual(ip.src, self.nat_addr)
1664             self.assertEqual(ip.dst, host.ip4)
1665             self.assertEqual(tcp.sport, server_out_port)
1666             self.assertEqual(tcp.dport, host_in_port)
1667             self.check_tcp_checksum(p)
1668         except:
1669             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1670             raise
1671
1672     def test_hairpinning2(self):
1673         """ NAT44 hairpinning - 1:1 NAT"""
1674
1675         server1_nat_ip = "10.0.0.10"
1676         server2_nat_ip = "10.0.0.11"
1677         host = self.pg0.remote_hosts[0]
1678         server1 = self.pg0.remote_hosts[1]
1679         server2 = self.pg0.remote_hosts[2]
1680         server_tcp_port = 22
1681         server_udp_port = 20
1682
1683         self.nat44_add_address(self.nat_addr)
1684         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1685         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1686                                                   is_inside=0)
1687
1688         # add static mapping for servers
1689         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1690         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1691
1692         # host to server1
1693         pkts = []
1694         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1695              IP(src=host.ip4, dst=server1_nat_ip) /
1696              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1697         pkts.append(p)
1698         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1699              IP(src=host.ip4, dst=server1_nat_ip) /
1700              UDP(sport=self.udp_port_in, dport=server_udp_port))
1701         pkts.append(p)
1702         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1703              IP(src=host.ip4, dst=server1_nat_ip) /
1704              ICMP(id=self.icmp_id_in, type='echo-request'))
1705         pkts.append(p)
1706         self.pg0.add_stream(pkts)
1707         self.pg_enable_capture(self.pg_interfaces)
1708         self.pg_start()
1709         capture = self.pg0.get_capture(len(pkts))
1710         for packet in capture:
1711             try:
1712                 self.assertEqual(packet[IP].src, self.nat_addr)
1713                 self.assertEqual(packet[IP].dst, server1.ip4)
1714                 if packet.haslayer(TCP):
1715                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1716                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1717                     self.tcp_port_out = packet[TCP].sport
1718                     self.check_tcp_checksum(packet)
1719                 elif packet.haslayer(UDP):
1720                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1721                     self.assertEqual(packet[UDP].dport, server_udp_port)
1722                     self.udp_port_out = packet[UDP].sport
1723                 else:
1724                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1725                     self.icmp_id_out = packet[ICMP].id
1726             except:
1727                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1728                 raise
1729
1730         # server1 to host
1731         pkts = []
1732         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1733              IP(src=server1.ip4, dst=self.nat_addr) /
1734              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1735         pkts.append(p)
1736         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1737              IP(src=server1.ip4, dst=self.nat_addr) /
1738              UDP(sport=server_udp_port, dport=self.udp_port_out))
1739         pkts.append(p)
1740         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1741              IP(src=server1.ip4, dst=self.nat_addr) /
1742              ICMP(id=self.icmp_id_out, type='echo-reply'))
1743         pkts.append(p)
1744         self.pg0.add_stream(pkts)
1745         self.pg_enable_capture(self.pg_interfaces)
1746         self.pg_start()
1747         capture = self.pg0.get_capture(len(pkts))
1748         for packet in capture:
1749             try:
1750                 self.assertEqual(packet[IP].src, server1_nat_ip)
1751                 self.assertEqual(packet[IP].dst, host.ip4)
1752                 if packet.haslayer(TCP):
1753                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1754                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1755                     self.check_tcp_checksum(packet)
1756                 elif packet.haslayer(UDP):
1757                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1758                     self.assertEqual(packet[UDP].sport, server_udp_port)
1759                 else:
1760                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1761             except:
1762                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1763                 raise
1764
1765         # server2 to server1
1766         pkts = []
1767         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1768              IP(src=server2.ip4, dst=server1_nat_ip) /
1769              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1770         pkts.append(p)
1771         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1772              IP(src=server2.ip4, dst=server1_nat_ip) /
1773              UDP(sport=self.udp_port_in, dport=server_udp_port))
1774         pkts.append(p)
1775         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1776              IP(src=server2.ip4, dst=server1_nat_ip) /
1777              ICMP(id=self.icmp_id_in, type='echo-request'))
1778         pkts.append(p)
1779         self.pg0.add_stream(pkts)
1780         self.pg_enable_capture(self.pg_interfaces)
1781         self.pg_start()
1782         capture = self.pg0.get_capture(len(pkts))
1783         for packet in capture:
1784             try:
1785                 self.assertEqual(packet[IP].src, server2_nat_ip)
1786                 self.assertEqual(packet[IP].dst, server1.ip4)
1787                 if packet.haslayer(TCP):
1788                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1789                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1790                     self.tcp_port_out = packet[TCP].sport
1791                     self.check_tcp_checksum(packet)
1792                 elif packet.haslayer(UDP):
1793                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1794                     self.assertEqual(packet[UDP].dport, server_udp_port)
1795                     self.udp_port_out = packet[UDP].sport
1796                 else:
1797                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1798                     self.icmp_id_out = packet[ICMP].id
1799             except:
1800                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1801                 raise
1802
1803         # server1 to server2
1804         pkts = []
1805         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1806              IP(src=server1.ip4, dst=server2_nat_ip) /
1807              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1808         pkts.append(p)
1809         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1810              IP(src=server1.ip4, dst=server2_nat_ip) /
1811              UDP(sport=server_udp_port, dport=self.udp_port_out))
1812         pkts.append(p)
1813         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1814              IP(src=server1.ip4, dst=server2_nat_ip) /
1815              ICMP(id=self.icmp_id_out, type='echo-reply'))
1816         pkts.append(p)
1817         self.pg0.add_stream(pkts)
1818         self.pg_enable_capture(self.pg_interfaces)
1819         self.pg_start()
1820         capture = self.pg0.get_capture(len(pkts))
1821         for packet in capture:
1822             try:
1823                 self.assertEqual(packet[IP].src, server1_nat_ip)
1824                 self.assertEqual(packet[IP].dst, server2.ip4)
1825                 if packet.haslayer(TCP):
1826                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1827                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1828                     self.check_tcp_checksum(packet)
1829                 elif packet.haslayer(UDP):
1830                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1831                     self.assertEqual(packet[UDP].sport, server_udp_port)
1832                 else:
1833                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1834             except:
1835                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1836                 raise
1837
1838     def test_max_translations_per_user(self):
1839         """ MAX translations per user - recycle the least recently used """
1840
1841         self.nat44_add_address(self.nat_addr)
1842         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1843         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1844                                                   is_inside=0)
1845
1846         # get maximum number of translations per user
1847         nat44_config = self.vapi.nat_show_config()
1848
1849         # send more than maximum number of translations per user packets
1850         pkts_num = nat44_config.max_translations_per_user + 5
1851         pkts = []
1852         for port in range(0, pkts_num):
1853             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1854                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1855                  TCP(sport=1025 + port))
1856             pkts.append(p)
1857         self.pg0.add_stream(pkts)
1858         self.pg_enable_capture(self.pg_interfaces)
1859         self.pg_start()
1860
1861         # verify number of translated packet
1862         self.pg1.get_capture(pkts_num)
1863
1864     def test_interface_addr(self):
1865         """ Acquire NAT44 addresses from interface """
1866         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1867
1868         # no address in NAT pool
1869         adresses = self.vapi.nat44_address_dump()
1870         self.assertEqual(0, len(adresses))
1871
1872         # configure interface address and check NAT address pool
1873         self.pg7.config_ip4()
1874         adresses = self.vapi.nat44_address_dump()
1875         self.assertEqual(1, len(adresses))
1876         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1877
1878         # remove interface address and check NAT address pool
1879         self.pg7.unconfig_ip4()
1880         adresses = self.vapi.nat44_address_dump()
1881         self.assertEqual(0, len(adresses))
1882
1883     def test_interface_addr_static_mapping(self):
1884         """ Static mapping with addresses from interface """
1885         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1886         self.nat44_add_static_mapping(
1887             '1.2.3.4',
1888             external_sw_if_index=self.pg7.sw_if_index)
1889
1890         # static mappings with external interface
1891         static_mappings = self.vapi.nat44_static_mapping_dump()
1892         self.assertEqual(1, len(static_mappings))
1893         self.assertEqual(self.pg7.sw_if_index,
1894                          static_mappings[0].external_sw_if_index)
1895
1896         # configure interface address and check static mappings
1897         self.pg7.config_ip4()
1898         static_mappings = self.vapi.nat44_static_mapping_dump()
1899         self.assertEqual(1, len(static_mappings))
1900         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1901                          self.pg7.local_ip4n)
1902         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1903
1904         # remove interface address and check static mappings
1905         self.pg7.unconfig_ip4()
1906         static_mappings = self.vapi.nat44_static_mapping_dump()
1907         self.assertEqual(0, len(static_mappings))
1908
1909     def test_interface_addr_identity_nat(self):
1910         """ Identity NAT with addresses from interface """
1911
1912         port = 53053
1913         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1914         self.vapi.nat44_add_del_identity_mapping(
1915             sw_if_index=self.pg7.sw_if_index,
1916             port=port,
1917             protocol=IP_PROTOS.tcp,
1918             addr_only=0)
1919
1920         # identity mappings with external interface
1921         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1922         self.assertEqual(1, len(identity_mappings))
1923         self.assertEqual(self.pg7.sw_if_index,
1924                          identity_mappings[0].sw_if_index)
1925
1926         # configure interface address and check identity mappings
1927         self.pg7.config_ip4()
1928         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1929         self.assertEqual(1, len(identity_mappings))
1930         self.assertEqual(identity_mappings[0].ip_address,
1931                          self.pg7.local_ip4n)
1932         self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
1933         self.assertEqual(port, identity_mappings[0].port)
1934         self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
1935
1936         # remove interface address and check identity mappings
1937         self.pg7.unconfig_ip4()
1938         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1939         self.assertEqual(0, len(identity_mappings))
1940
1941     def test_ipfix_nat44_sess(self):
1942         """ IPFIX logging NAT44 session created/delted """
1943         self.ipfix_domain_id = 10
1944         self.ipfix_src_port = 20202
1945         colector_port = 30303
1946         bind_layers(UDP, IPFIX, dport=30303)
1947         self.nat44_add_address(self.nat_addr)
1948         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1949         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1950                                                   is_inside=0)
1951         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1952                                      src_address=self.pg3.local_ip4n,
1953                                      path_mtu=512,
1954                                      template_interval=10,
1955                                      collector_port=colector_port)
1956         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1957                             src_port=self.ipfix_src_port)
1958
1959         pkts = self.create_stream_in(self.pg0, self.pg1)
1960         self.pg0.add_stream(pkts)
1961         self.pg_enable_capture(self.pg_interfaces)
1962         self.pg_start()
1963         capture = self.pg1.get_capture(len(pkts))
1964         self.verify_capture_out(capture)
1965         self.nat44_add_address(self.nat_addr, is_add=0)
1966         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1967         capture = self.pg3.get_capture(3)
1968         ipfix = IPFIXDecoder()
1969         # first load template
1970         for p in capture:
1971             self.assertTrue(p.haslayer(IPFIX))
1972             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1973             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1974             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1975             self.assertEqual(p[UDP].dport, colector_port)
1976             self.assertEqual(p[IPFIX].observationDomainID,
1977                              self.ipfix_domain_id)
1978             if p.haslayer(Template):
1979                 ipfix.add_template(p.getlayer(Template))
1980         # verify events in data set
1981         for p in capture:
1982             if p.haslayer(Data):
1983                 data = ipfix.decode_data_set(p.getlayer(Set))
1984                 self.verify_ipfix_nat44_ses(data)
1985
1986     def test_ipfix_addr_exhausted(self):
1987         """ IPFIX logging NAT addresses exhausted """
1988         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1989         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1990                                                   is_inside=0)
1991         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1992                                      src_address=self.pg3.local_ip4n,
1993                                      path_mtu=512,
1994                                      template_interval=10)
1995         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1996                             src_port=self.ipfix_src_port)
1997
1998         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1999              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2000              TCP(sport=3025))
2001         self.pg0.add_stream(p)
2002         self.pg_enable_capture(self.pg_interfaces)
2003         self.pg_start()
2004         capture = self.pg1.get_capture(0)
2005         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2006         capture = self.pg3.get_capture(3)
2007         ipfix = IPFIXDecoder()
2008         # first load template
2009         for p in capture:
2010             self.assertTrue(p.haslayer(IPFIX))
2011             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2012             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2013             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2014             self.assertEqual(p[UDP].dport, 4739)
2015             self.assertEqual(p[IPFIX].observationDomainID,
2016                              self.ipfix_domain_id)
2017             if p.haslayer(Template):
2018                 ipfix.add_template(p.getlayer(Template))
2019         # verify events in data set
2020         for p in capture:
2021             if p.haslayer(Data):
2022                 data = ipfix.decode_data_set(p.getlayer(Set))
2023                 self.verify_ipfix_addr_exhausted(data)
2024
2025     def test_pool_addr_fib(self):
2026         """ NAT44 add pool addresses to FIB """
2027         static_addr = '10.0.0.10'
2028         self.nat44_add_address(self.nat_addr)
2029         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2030         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2031                                                   is_inside=0)
2032         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2033
2034         # NAT44 address
2035         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2036              ARP(op=ARP.who_has, pdst=self.nat_addr,
2037                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2038         self.pg1.add_stream(p)
2039         self.pg_enable_capture(self.pg_interfaces)
2040         self.pg_start()
2041         capture = self.pg1.get_capture(1)
2042         self.assertTrue(capture[0].haslayer(ARP))
2043         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2044
2045         # 1:1 NAT address
2046         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2047              ARP(op=ARP.who_has, pdst=static_addr,
2048                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2049         self.pg1.add_stream(p)
2050         self.pg_enable_capture(self.pg_interfaces)
2051         self.pg_start()
2052         capture = self.pg1.get_capture(1)
2053         self.assertTrue(capture[0].haslayer(ARP))
2054         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2055
2056         # send ARP to non-NAT44 interface
2057         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2058              ARP(op=ARP.who_has, pdst=self.nat_addr,
2059                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2060         self.pg2.add_stream(p)
2061         self.pg_enable_capture(self.pg_interfaces)
2062         self.pg_start()
2063         capture = self.pg1.get_capture(0)
2064
2065         # remove addresses and verify
2066         self.nat44_add_address(self.nat_addr, is_add=0)
2067         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2068                                       is_add=0)
2069
2070         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2071              ARP(op=ARP.who_has, pdst=self.nat_addr,
2072                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2073         self.pg1.add_stream(p)
2074         self.pg_enable_capture(self.pg_interfaces)
2075         self.pg_start()
2076         capture = self.pg1.get_capture(0)
2077
2078         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2079              ARP(op=ARP.who_has, pdst=static_addr,
2080                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2081         self.pg1.add_stream(p)
2082         self.pg_enable_capture(self.pg_interfaces)
2083         self.pg_start()
2084         capture = self.pg1.get_capture(0)
2085
2086     def test_vrf_mode(self):
2087         """ NAT44 tenant VRF aware address pool mode """
2088
2089         vrf_id1 = 1
2090         vrf_id2 = 2
2091         nat_ip1 = "10.0.0.10"
2092         nat_ip2 = "10.0.0.11"
2093
2094         self.pg0.unconfig_ip4()
2095         self.pg1.unconfig_ip4()
2096         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2097         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2098         self.pg0.set_table_ip4(vrf_id1)
2099         self.pg1.set_table_ip4(vrf_id2)
2100         self.pg0.config_ip4()
2101         self.pg1.config_ip4()
2102
2103         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2104         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2105         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2106         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2107         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2108                                                   is_inside=0)
2109
2110         # first VRF
2111         pkts = self.create_stream_in(self.pg0, self.pg2)
2112         self.pg0.add_stream(pkts)
2113         self.pg_enable_capture(self.pg_interfaces)
2114         self.pg_start()
2115         capture = self.pg2.get_capture(len(pkts))
2116         self.verify_capture_out(capture, nat_ip1)
2117
2118         # second VRF
2119         pkts = self.create_stream_in(self.pg1, self.pg2)
2120         self.pg1.add_stream(pkts)
2121         self.pg_enable_capture(self.pg_interfaces)
2122         self.pg_start()
2123         capture = self.pg2.get_capture(len(pkts))
2124         self.verify_capture_out(capture, nat_ip2)
2125
2126         self.pg0.unconfig_ip4()
2127         self.pg1.unconfig_ip4()
2128         self.pg0.set_table_ip4(0)
2129         self.pg1.set_table_ip4(0)
2130         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2131         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2132
2133     def test_vrf_feature_independent(self):
2134         """ NAT44 tenant VRF independent address pool mode """
2135
2136         nat_ip1 = "10.0.0.10"
2137         nat_ip2 = "10.0.0.11"
2138
2139         self.nat44_add_address(nat_ip1)
2140         self.nat44_add_address(nat_ip2, vrf_id=99)
2141         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2142         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2143         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2144                                                   is_inside=0)
2145
2146         # first VRF
2147         pkts = self.create_stream_in(self.pg0, self.pg2)
2148         self.pg0.add_stream(pkts)
2149         self.pg_enable_capture(self.pg_interfaces)
2150         self.pg_start()
2151         capture = self.pg2.get_capture(len(pkts))
2152         self.verify_capture_out(capture, nat_ip1)
2153
2154         # second VRF
2155         pkts = self.create_stream_in(self.pg1, self.pg2)
2156         self.pg1.add_stream(pkts)
2157         self.pg_enable_capture(self.pg_interfaces)
2158         self.pg_start()
2159         capture = self.pg2.get_capture(len(pkts))
2160         self.verify_capture_out(capture, nat_ip1)
2161
2162     def test_dynamic_ipless_interfaces(self):
2163         """ NAT44 interfaces without configured IP address """
2164
2165         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2166                                       mactobinary(self.pg7.remote_mac),
2167                                       self.pg7.remote_ip4n,
2168                                       is_static=1)
2169         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2170                                       mactobinary(self.pg8.remote_mac),
2171                                       self.pg8.remote_ip4n,
2172                                       is_static=1)
2173
2174         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2175                                    dst_address_length=32,
2176                                    next_hop_address=self.pg7.remote_ip4n,
2177                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2178         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2179                                    dst_address_length=32,
2180                                    next_hop_address=self.pg8.remote_ip4n,
2181                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2182
2183         self.nat44_add_address(self.nat_addr)
2184         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2185         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2186                                                   is_inside=0)
2187
2188         # in2out
2189         pkts = self.create_stream_in(self.pg7, self.pg8)
2190         self.pg7.add_stream(pkts)
2191         self.pg_enable_capture(self.pg_interfaces)
2192         self.pg_start()
2193         capture = self.pg8.get_capture(len(pkts))
2194         self.verify_capture_out(capture)
2195
2196         # out2in
2197         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2198         self.pg8.add_stream(pkts)
2199         self.pg_enable_capture(self.pg_interfaces)
2200         self.pg_start()
2201         capture = self.pg7.get_capture(len(pkts))
2202         self.verify_capture_in(capture, self.pg7)
2203
2204     def test_static_ipless_interfaces(self):
2205         """ NAT44 interfaces without configured IP address - 1:1 NAT """
2206
2207         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2208                                       mactobinary(self.pg7.remote_mac),
2209                                       self.pg7.remote_ip4n,
2210                                       is_static=1)
2211         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2212                                       mactobinary(self.pg8.remote_mac),
2213                                       self.pg8.remote_ip4n,
2214                                       is_static=1)
2215
2216         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2217                                    dst_address_length=32,
2218                                    next_hop_address=self.pg7.remote_ip4n,
2219                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2220         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2221                                    dst_address_length=32,
2222                                    next_hop_address=self.pg8.remote_ip4n,
2223                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2224
2225         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2226         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2227         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2228                                                   is_inside=0)
2229
2230         # out2in
2231         pkts = self.create_stream_out(self.pg8)
2232         self.pg8.add_stream(pkts)
2233         self.pg_enable_capture(self.pg_interfaces)
2234         self.pg_start()
2235         capture = self.pg7.get_capture(len(pkts))
2236         self.verify_capture_in(capture, self.pg7)
2237
2238         # in2out
2239         pkts = self.create_stream_in(self.pg7, self.pg8)
2240         self.pg7.add_stream(pkts)
2241         self.pg_enable_capture(self.pg_interfaces)
2242         self.pg_start()
2243         capture = self.pg8.get_capture(len(pkts))
2244         self.verify_capture_out(capture, self.nat_addr, True)
2245
2246     def test_static_with_port_ipless_interfaces(self):
2247         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2248
2249         self.tcp_port_out = 30606
2250         self.udp_port_out = 30607
2251         self.icmp_id_out = 30608
2252
2253         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2254                                       mactobinary(self.pg7.remote_mac),
2255                                       self.pg7.remote_ip4n,
2256                                       is_static=1)
2257         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2258                                       mactobinary(self.pg8.remote_mac),
2259                                       self.pg8.remote_ip4n,
2260                                       is_static=1)
2261
2262         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2263                                    dst_address_length=32,
2264                                    next_hop_address=self.pg7.remote_ip4n,
2265                                    next_hop_sw_if_index=self.pg7.sw_if_index)
2266         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2267                                    dst_address_length=32,
2268                                    next_hop_address=self.pg8.remote_ip4n,
2269                                    next_hop_sw_if_index=self.pg8.sw_if_index)
2270
2271         self.nat44_add_address(self.nat_addr)
2272         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2273                                       self.tcp_port_in, self.tcp_port_out,
2274                                       proto=IP_PROTOS.tcp)
2275         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2276                                       self.udp_port_in, self.udp_port_out,
2277                                       proto=IP_PROTOS.udp)
2278         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2279                                       self.icmp_id_in, self.icmp_id_out,
2280                                       proto=IP_PROTOS.icmp)
2281         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2282         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2283                                                   is_inside=0)
2284
2285         # out2in
2286         pkts = self.create_stream_out(self.pg8)
2287         self.pg8.add_stream(pkts)
2288         self.pg_enable_capture(self.pg_interfaces)
2289         self.pg_start()
2290         capture = self.pg7.get_capture(len(pkts))
2291         self.verify_capture_in(capture, self.pg7)
2292
2293         # in2out
2294         pkts = self.create_stream_in(self.pg7, self.pg8)
2295         self.pg7.add_stream(pkts)
2296         self.pg_enable_capture(self.pg_interfaces)
2297         self.pg_start()
2298         capture = self.pg8.get_capture(len(pkts))
2299         self.verify_capture_out(capture)
2300
2301     def test_static_unknown_proto(self):
2302         """ 1:1 NAT translate packet with unknown protocol """
2303         nat_ip = "10.0.0.10"
2304         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2305         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2306         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2307                                                   is_inside=0)
2308
2309         # in2out
2310         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2311              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2312              GRE() /
2313              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2314              TCP(sport=1234, dport=1234))
2315         self.pg0.add_stream(p)
2316         self.pg_enable_capture(self.pg_interfaces)
2317         self.pg_start()
2318         p = self.pg1.get_capture(1)
2319         packet = p[0]
2320         try:
2321             self.assertEqual(packet[IP].src, nat_ip)
2322             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2323             self.assertTrue(packet.haslayer(GRE))
2324             self.check_ip_checksum(packet)
2325         except:
2326             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2327             raise
2328
2329         # out2in
2330         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2331              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2332              GRE() /
2333              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2334              TCP(sport=1234, dport=1234))
2335         self.pg1.add_stream(p)
2336         self.pg_enable_capture(self.pg_interfaces)
2337         self.pg_start()
2338         p = self.pg0.get_capture(1)
2339         packet = p[0]
2340         try:
2341             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2342             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2343             self.assertTrue(packet.haslayer(GRE))
2344             self.check_ip_checksum(packet)
2345         except:
2346             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2347             raise
2348
2349     def test_hairpinning_static_unknown_proto(self):
2350         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2351
2352         host = self.pg0.remote_hosts[0]
2353         server = self.pg0.remote_hosts[1]
2354
2355         host_nat_ip = "10.0.0.10"
2356         server_nat_ip = "10.0.0.11"
2357
2358         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2359         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2360         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2361         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2362                                                   is_inside=0)
2363
2364         # host to server
2365         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2366              IP(src=host.ip4, dst=server_nat_ip) /
2367              GRE() /
2368              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2369              TCP(sport=1234, dport=1234))
2370         self.pg0.add_stream(p)
2371         self.pg_enable_capture(self.pg_interfaces)
2372         self.pg_start()
2373         p = self.pg0.get_capture(1)
2374         packet = p[0]
2375         try:
2376             self.assertEqual(packet[IP].src, host_nat_ip)
2377             self.assertEqual(packet[IP].dst, server.ip4)
2378             self.assertTrue(packet.haslayer(GRE))
2379             self.check_ip_checksum(packet)
2380         except:
2381             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2382             raise
2383
2384         # server to host
2385         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2386              IP(src=server.ip4, dst=host_nat_ip) /
2387              GRE() /
2388              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2389              TCP(sport=1234, dport=1234))
2390         self.pg0.add_stream(p)
2391         self.pg_enable_capture(self.pg_interfaces)
2392         self.pg_start()
2393         p = self.pg0.get_capture(1)
2394         packet = p[0]
2395         try:
2396             self.assertEqual(packet[IP].src, server_nat_ip)
2397             self.assertEqual(packet[IP].dst, host.ip4)
2398             self.assertTrue(packet.haslayer(GRE))
2399             self.check_ip_checksum(packet)
2400         except:
2401             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2402             raise
2403
2404     def test_unknown_proto(self):
2405         """ NAT44 translate packet with unknown protocol """
2406         self.nat44_add_address(self.nat_addr)
2407         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2408         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2409                                                   is_inside=0)
2410
2411         # in2out
2412         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2413              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2414              TCP(sport=self.tcp_port_in, dport=20))
2415         self.pg0.add_stream(p)
2416         self.pg_enable_capture(self.pg_interfaces)
2417         self.pg_start()
2418         p = self.pg1.get_capture(1)
2419
2420         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2421              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2422              GRE() /
2423              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2424              TCP(sport=1234, dport=1234))
2425         self.pg0.add_stream(p)
2426         self.pg_enable_capture(self.pg_interfaces)
2427         self.pg_start()
2428         p = self.pg1.get_capture(1)
2429         packet = p[0]
2430         try:
2431             self.assertEqual(packet[IP].src, self.nat_addr)
2432             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2433             self.assertTrue(packet.haslayer(GRE))
2434             self.check_ip_checksum(packet)
2435         except:
2436             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2437             raise
2438
2439         # out2in
2440         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2441              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2442              GRE() /
2443              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2444              TCP(sport=1234, dport=1234))
2445         self.pg1.add_stream(p)
2446         self.pg_enable_capture(self.pg_interfaces)
2447         self.pg_start()
2448         p = self.pg0.get_capture(1)
2449         packet = p[0]
2450         try:
2451             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2452             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2453             self.assertTrue(packet.haslayer(GRE))
2454             self.check_ip_checksum(packet)
2455         except:
2456             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2457             raise
2458
2459     def test_hairpinning_unknown_proto(self):
2460         """ NAT44 translate packet with unknown protocol - hairpinning """
2461         host = self.pg0.remote_hosts[0]
2462         server = self.pg0.remote_hosts[1]
2463         host_in_port = 1234
2464         host_out_port = 0
2465         server_in_port = 5678
2466         server_out_port = 8765
2467         server_nat_ip = "10.0.0.11"
2468
2469         self.nat44_add_address(self.nat_addr)
2470         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2471         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2472                                                   is_inside=0)
2473
2474         # add static mapping for server
2475         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2476
2477         # host to server
2478         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2479              IP(src=host.ip4, dst=server_nat_ip) /
2480              TCP(sport=host_in_port, dport=server_out_port))
2481         self.pg0.add_stream(p)
2482         self.pg_enable_capture(self.pg_interfaces)
2483         self.pg_start()
2484         capture = self.pg0.get_capture(1)
2485
2486         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2487              IP(src=host.ip4, dst=server_nat_ip) /
2488              GRE() /
2489              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2490              TCP(sport=1234, dport=1234))
2491         self.pg0.add_stream(p)
2492         self.pg_enable_capture(self.pg_interfaces)
2493         self.pg_start()
2494         p = self.pg0.get_capture(1)
2495         packet = p[0]
2496         try:
2497             self.assertEqual(packet[IP].src, self.nat_addr)
2498             self.assertEqual(packet[IP].dst, server.ip4)
2499             self.assertTrue(packet.haslayer(GRE))
2500             self.check_ip_checksum(packet)
2501         except:
2502             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2503             raise
2504
2505         # server to host
2506         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2507              IP(src=server.ip4, dst=self.nat_addr) /
2508              GRE() /
2509              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2510              TCP(sport=1234, dport=1234))
2511         self.pg0.add_stream(p)
2512         self.pg_enable_capture(self.pg_interfaces)
2513         self.pg_start()
2514         p = self.pg0.get_capture(1)
2515         packet = p[0]
2516         try:
2517             self.assertEqual(packet[IP].src, server_nat_ip)
2518             self.assertEqual(packet[IP].dst, host.ip4)
2519             self.assertTrue(packet.haslayer(GRE))
2520             self.check_ip_checksum(packet)
2521         except:
2522             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2523             raise
2524
2525     def test_output_feature(self):
2526         """ NAT44 interface output feature (in2out postrouting) """
2527         self.nat44_add_address(self.nat_addr)
2528         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2529         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2530         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2531                                                          is_inside=0)
2532
2533         # in2out
2534         pkts = self.create_stream_in(self.pg0, self.pg3)
2535         self.pg0.add_stream(pkts)
2536         self.pg_enable_capture(self.pg_interfaces)
2537         self.pg_start()
2538         capture = self.pg3.get_capture(len(pkts))
2539         self.verify_capture_out(capture)
2540
2541         # out2in
2542         pkts = self.create_stream_out(self.pg3)
2543         self.pg3.add_stream(pkts)
2544         self.pg_enable_capture(self.pg_interfaces)
2545         self.pg_start()
2546         capture = self.pg0.get_capture(len(pkts))
2547         self.verify_capture_in(capture, self.pg0)
2548
2549         # from non-NAT interface to NAT inside interface
2550         pkts = self.create_stream_in(self.pg2, self.pg0)
2551         self.pg2.add_stream(pkts)
2552         self.pg_enable_capture(self.pg_interfaces)
2553         self.pg_start()
2554         capture = self.pg0.get_capture(len(pkts))
2555         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2556
2557     def test_output_feature_vrf_aware(self):
2558         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2559         nat_ip_vrf10 = "10.0.0.10"
2560         nat_ip_vrf20 = "10.0.0.20"
2561
2562         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2563                                    dst_address_length=32,
2564                                    next_hop_address=self.pg3.remote_ip4n,
2565                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2566                                    table_id=10)
2567         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2568                                    dst_address_length=32,
2569                                    next_hop_address=self.pg3.remote_ip4n,
2570                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2571                                    table_id=20)
2572
2573         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2574         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2575         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2576         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2577         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2578                                                          is_inside=0)
2579
2580         # in2out VRF 10
2581         pkts = self.create_stream_in(self.pg4, self.pg3)
2582         self.pg4.add_stream(pkts)
2583         self.pg_enable_capture(self.pg_interfaces)
2584         self.pg_start()
2585         capture = self.pg3.get_capture(len(pkts))
2586         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2587
2588         # out2in VRF 10
2589         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2590         self.pg3.add_stream(pkts)
2591         self.pg_enable_capture(self.pg_interfaces)
2592         self.pg_start()
2593         capture = self.pg4.get_capture(len(pkts))
2594         self.verify_capture_in(capture, self.pg4)
2595
2596         # in2out VRF 20
2597         pkts = self.create_stream_in(self.pg6, self.pg3)
2598         self.pg6.add_stream(pkts)
2599         self.pg_enable_capture(self.pg_interfaces)
2600         self.pg_start()
2601         capture = self.pg3.get_capture(len(pkts))
2602         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2603
2604         # out2in VRF 20
2605         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2606         self.pg3.add_stream(pkts)
2607         self.pg_enable_capture(self.pg_interfaces)
2608         self.pg_start()
2609         capture = self.pg6.get_capture(len(pkts))
2610         self.verify_capture_in(capture, self.pg6)
2611
2612     def test_output_feature_hairpinning(self):
2613         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2614         host = self.pg0.remote_hosts[0]
2615         server = self.pg0.remote_hosts[1]
2616         host_in_port = 1234
2617         host_out_port = 0
2618         server_in_port = 5678
2619         server_out_port = 8765
2620
2621         self.nat44_add_address(self.nat_addr)
2622         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2623         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2624                                                          is_inside=0)
2625
2626         # add static mapping for server
2627         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2628                                       server_in_port, server_out_port,
2629                                       proto=IP_PROTOS.tcp)
2630
2631         # send packet from host to server
2632         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2633              IP(src=host.ip4, dst=self.nat_addr) /
2634              TCP(sport=host_in_port, dport=server_out_port))
2635         self.pg0.add_stream(p)
2636         self.pg_enable_capture(self.pg_interfaces)
2637         self.pg_start()
2638         capture = self.pg0.get_capture(1)
2639         p = capture[0]
2640         try:
2641             ip = p[IP]
2642             tcp = p[TCP]
2643             self.assertEqual(ip.src, self.nat_addr)
2644             self.assertEqual(ip.dst, server.ip4)
2645             self.assertNotEqual(tcp.sport, host_in_port)
2646             self.assertEqual(tcp.dport, server_in_port)
2647             self.check_tcp_checksum(p)
2648             host_out_port = tcp.sport
2649         except:
2650             self.logger.error(ppp("Unexpected or invalid packet:", p))
2651             raise
2652
2653         # send reply from server to host
2654         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2655              IP(src=server.ip4, dst=self.nat_addr) /
2656              TCP(sport=server_in_port, dport=host_out_port))
2657         self.pg0.add_stream(p)
2658         self.pg_enable_capture(self.pg_interfaces)
2659         self.pg_start()
2660         capture = self.pg0.get_capture(1)
2661         p = capture[0]
2662         try:
2663             ip = p[IP]
2664             tcp = p[TCP]
2665             self.assertEqual(ip.src, self.nat_addr)
2666             self.assertEqual(ip.dst, host.ip4)
2667             self.assertEqual(tcp.sport, server_out_port)
2668             self.assertEqual(tcp.dport, host_in_port)
2669             self.check_tcp_checksum(p)
2670         except:
2671             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2672             raise
2673
2674     def test_one_armed_nat44(self):
2675         """ One armed NAT44 """
2676         remote_host = self.pg9.remote_hosts[0]
2677         local_host = self.pg9.remote_hosts[1]
2678         external_port = 0
2679
2680         self.nat44_add_address(self.nat_addr)
2681         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2682         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2683                                                   is_inside=0)
2684
2685         # in2out
2686         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2687              IP(src=local_host.ip4, dst=remote_host.ip4) /
2688              TCP(sport=12345, dport=80))
2689         self.pg9.add_stream(p)
2690         self.pg_enable_capture(self.pg_interfaces)
2691         self.pg_start()
2692         capture = self.pg9.get_capture(1)
2693         p = capture[0]
2694         try:
2695             ip = p[IP]
2696             tcp = p[TCP]
2697             self.assertEqual(ip.src, self.nat_addr)
2698             self.assertEqual(ip.dst, remote_host.ip4)
2699             self.assertNotEqual(tcp.sport, 12345)
2700             external_port = tcp.sport
2701             self.assertEqual(tcp.dport, 80)
2702             self.check_tcp_checksum(p)
2703             self.check_ip_checksum(p)
2704         except:
2705             self.logger.error(ppp("Unexpected or invalid packet:", p))
2706             raise
2707
2708         # out2in
2709         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2710              IP(src=remote_host.ip4, dst=self.nat_addr) /
2711              TCP(sport=80, dport=external_port))
2712         self.pg9.add_stream(p)
2713         self.pg_enable_capture(self.pg_interfaces)
2714         self.pg_start()
2715         capture = self.pg9.get_capture(1)
2716         p = capture[0]
2717         try:
2718             ip = p[IP]
2719             tcp = p[TCP]
2720             self.assertEqual(ip.src, remote_host.ip4)
2721             self.assertEqual(ip.dst, local_host.ip4)
2722             self.assertEqual(tcp.sport, 80)
2723             self.assertEqual(tcp.dport, 12345)
2724             self.check_tcp_checksum(p)
2725             self.check_ip_checksum(p)
2726         except:
2727             self.logger.error(ppp("Unexpected or invalid packet:", p))
2728             raise
2729
2730     def test_del_session(self):
2731         """ Delete NAT44 session """
2732         self.nat44_add_address(self.nat_addr)
2733         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2734         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2735                                                   is_inside=0)
2736
2737         pkts = self.create_stream_in(self.pg0, self.pg1)
2738         self.pg0.add_stream(pkts)
2739         self.pg_enable_capture(self.pg_interfaces)
2740         self.pg_start()
2741         capture = self.pg1.get_capture(len(pkts))
2742
2743         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2744         nsessions = len(sessions)
2745
2746         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2747                                     sessions[0].inside_port,
2748                                     sessions[0].protocol)
2749         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2750                                     sessions[1].outside_port,
2751                                     sessions[1].protocol,
2752                                     is_in=0)
2753
2754         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2755         self.assertEqual(nsessions - len(sessions), 2)
2756
2757     def test_set_get_reass(self):
2758         """ NAT44 set/get virtual fragmentation reassembly """
2759         reas_cfg1 = self.vapi.nat_get_reass()
2760
2761         self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2762                                 max_reass=reas_cfg1.ip4_max_reass * 2,
2763                                 max_frag=reas_cfg1.ip4_max_frag * 2)
2764
2765         reas_cfg2 = self.vapi.nat_get_reass()
2766
2767         self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2768         self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2769         self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2770
2771         self.vapi.nat_set_reass(drop_frag=1)
2772         self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2773
2774     def test_frag_in_order(self):
2775         """ NAT44 translate fragments arriving in order """
2776         self.nat44_add_address(self.nat_addr)
2777         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2778         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2779                                                   is_inside=0)
2780
2781         data = "A" * 4 + "B" * 16 + "C" * 3
2782         self.tcp_port_in = random.randint(1025, 65535)
2783
2784         reass = self.vapi.nat_reass_dump()
2785         reass_n_start = len(reass)
2786
2787         # in2out
2788         pkts = self.create_stream_frag(self.pg0,
2789                                        self.pg1.remote_ip4,
2790                                        self.tcp_port_in,
2791                                        20,
2792                                        data)
2793         self.pg0.add_stream(pkts)
2794         self.pg_enable_capture(self.pg_interfaces)
2795         self.pg_start()
2796         frags = self.pg1.get_capture(len(pkts))
2797         p = self.reass_frags_and_verify(frags,
2798                                         self.nat_addr,
2799                                         self.pg1.remote_ip4)
2800         self.assertEqual(p[TCP].dport, 20)
2801         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2802         self.tcp_port_out = p[TCP].sport
2803         self.assertEqual(data, p[Raw].load)
2804
2805         # out2in
2806         pkts = self.create_stream_frag(self.pg1,
2807                                        self.nat_addr,
2808                                        20,
2809                                        self.tcp_port_out,
2810                                        data)
2811         self.pg1.add_stream(pkts)
2812         self.pg_enable_capture(self.pg_interfaces)
2813         self.pg_start()
2814         frags = self.pg0.get_capture(len(pkts))
2815         p = self.reass_frags_and_verify(frags,
2816                                         self.pg1.remote_ip4,
2817                                         self.pg0.remote_ip4)
2818         self.assertEqual(p[TCP].sport, 20)
2819         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2820         self.assertEqual(data, p[Raw].load)
2821
2822         reass = self.vapi.nat_reass_dump()
2823         reass_n_end = len(reass)
2824
2825         self.assertEqual(reass_n_end - reass_n_start, 2)
2826
2827     def test_reass_hairpinning(self):
2828         """ NAT44 fragments hairpinning """
2829         host = self.pg0.remote_hosts[0]
2830         server = self.pg0.remote_hosts[1]
2831         host_in_port = random.randint(1025, 65535)
2832         host_out_port = 0
2833         server_in_port = random.randint(1025, 65535)
2834         server_out_port = random.randint(1025, 65535)
2835         data = "A" * 4 + "B" * 16 + "C" * 3
2836
2837         self.nat44_add_address(self.nat_addr)
2838         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2839         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2840                                                   is_inside=0)
2841         # add static mapping for server
2842         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2843                                       server_in_port, server_out_port,
2844                                       proto=IP_PROTOS.tcp)
2845
2846         # send packet from host to server
2847         pkts = self.create_stream_frag(self.pg0,
2848                                        self.nat_addr,
2849                                        host_in_port,
2850                                        server_out_port,
2851                                        data)
2852         self.pg0.add_stream(pkts)
2853         self.pg_enable_capture(self.pg_interfaces)
2854         self.pg_start()
2855         frags = self.pg0.get_capture(len(pkts))
2856         p = self.reass_frags_and_verify(frags,
2857                                         self.nat_addr,
2858                                         server.ip4)
2859         self.assertNotEqual(p[TCP].sport, host_in_port)
2860         self.assertEqual(p[TCP].dport, server_in_port)
2861         self.assertEqual(data, p[Raw].load)
2862
2863     def test_frag_out_of_order(self):
2864         """ NAT44 translate fragments arriving out of order """
2865         self.nat44_add_address(self.nat_addr)
2866         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2867         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2868                                                   is_inside=0)
2869
2870         data = "A" * 4 + "B" * 16 + "C" * 3
2871         random.randint(1025, 65535)
2872
2873         # in2out
2874         pkts = self.create_stream_frag(self.pg0,
2875                                        self.pg1.remote_ip4,
2876                                        self.tcp_port_in,
2877                                        20,
2878                                        data)
2879         pkts.reverse()
2880         self.pg0.add_stream(pkts)
2881         self.pg_enable_capture(self.pg_interfaces)
2882         self.pg_start()
2883         frags = self.pg1.get_capture(len(pkts))
2884         p = self.reass_frags_and_verify(frags,
2885                                         self.nat_addr,
2886                                         self.pg1.remote_ip4)
2887         self.assertEqual(p[TCP].dport, 20)
2888         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2889         self.tcp_port_out = p[TCP].sport
2890         self.assertEqual(data, p[Raw].load)
2891
2892         # out2in
2893         pkts = self.create_stream_frag(self.pg1,
2894                                        self.nat_addr,
2895                                        20,
2896                                        self.tcp_port_out,
2897                                        data)
2898         pkts.reverse()
2899         self.pg1.add_stream(pkts)
2900         self.pg_enable_capture(self.pg_interfaces)
2901         self.pg_start()
2902         frags = self.pg0.get_capture(len(pkts))
2903         p = self.reass_frags_and_verify(frags,
2904                                         self.pg1.remote_ip4,
2905                                         self.pg0.remote_ip4)
2906         self.assertEqual(p[TCP].sport, 20)
2907         self.assertEqual(p[TCP].dport, self.tcp_port_in)
2908         self.assertEqual(data, p[Raw].load)
2909
2910     def test_port_restricted(self):
2911         """ Port restricted NAT44 (MAP-E CE) """
2912         self.nat44_add_address(self.nat_addr)
2913         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2914         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2915                                                   is_inside=0)
2916         self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
2917                       "psid-offset 6 psid-len 6")
2918
2919         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2920              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2921              TCP(sport=4567, dport=22))
2922         self.pg0.add_stream(p)
2923         self.pg_enable_capture(self.pg_interfaces)
2924         self.pg_start()
2925         capture = self.pg1.get_capture(1)
2926         p = capture[0]
2927         try:
2928             ip = p[IP]
2929             tcp = p[TCP]
2930             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2931             self.assertEqual(ip.src, self.nat_addr)
2932             self.assertEqual(tcp.dport, 22)
2933             self.assertNotEqual(tcp.sport, 4567)
2934             self.assertEqual((tcp.sport >> 6) & 63, 10)
2935             self.check_tcp_checksum(p)
2936             self.check_ip_checksum(p)
2937         except:
2938             self.logger.error(ppp("Unexpected or invalid packet:", p))
2939             raise
2940
2941     def test_twice_nat(self):
2942         """ Twice NAT44 """
2943         twice_nat_addr = '10.0.1.3'
2944         port_in = 8080
2945         port_out = 80
2946         eh_port_out = 4567
2947         eh_port_in = 0
2948         self.nat44_add_address(self.nat_addr)
2949         self.nat44_add_address(twice_nat_addr, twice_nat=1)
2950         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2951                                       port_in, port_out, proto=IP_PROTOS.tcp,
2952                                       twice_nat=1)
2953         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2954         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2955                                                   is_inside=0)
2956
2957         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2958              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2959              TCP(sport=eh_port_out, dport=port_out))
2960         self.pg1.add_stream(p)
2961         self.pg_enable_capture(self.pg_interfaces)
2962         self.pg_start()
2963         capture = self.pg0.get_capture(1)
2964         p = capture[0]
2965         try:
2966             ip = p[IP]
2967             tcp = p[TCP]
2968             self.assertEqual(ip.dst, self.pg0.remote_ip4)
2969             self.assertEqual(ip.src, twice_nat_addr)
2970             self.assertEqual(tcp.dport, port_in)
2971             self.assertNotEqual(tcp.sport, eh_port_out)
2972             eh_port_in = tcp.sport
2973             self.check_tcp_checksum(p)
2974             self.check_ip_checksum(p)
2975         except:
2976             self.logger.error(ppp("Unexpected or invalid packet:", p))
2977             raise
2978
2979         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2980              IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
2981              TCP(sport=port_in, dport=eh_port_in))
2982         self.pg0.add_stream(p)
2983         self.pg_enable_capture(self.pg_interfaces)
2984         self.pg_start()
2985         capture = self.pg1.get_capture(1)
2986         p = capture[0]
2987         try:
2988             ip = p[IP]
2989             tcp = p[TCP]
2990             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2991             self.assertEqual(ip.src, self.nat_addr)
2992             self.assertEqual(tcp.dport, eh_port_out)
2993             self.assertEqual(tcp.sport, port_out)
2994             self.check_tcp_checksum(p)
2995             self.check_ip_checksum(p)
2996         except:
2997             self.logger.error(ppp("Unexpected or invalid packet:", p))
2998             raise
2999
3000     def test_twice_nat_lb(self):
3001         """ Twice NAT44 local service load balancing """
3002         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3003         twice_nat_addr = '10.0.1.3'
3004         local_port = 8080
3005         external_port = 80
3006         eh_port_out = 4567
3007         eh_port_in = 0
3008         server1 = self.pg0.remote_hosts[0]
3009         server2 = self.pg0.remote_hosts[1]
3010
3011         locals = [{'addr': server1.ip4n,
3012                    'port': local_port,
3013                    'probability': 50},
3014                   {'addr': server2.ip4n,
3015                    'port': local_port,
3016                    'probability': 50}]
3017
3018         self.nat44_add_address(self.nat_addr)
3019         self.nat44_add_address(twice_nat_addr, twice_nat=1)
3020
3021         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3022                                                   external_port,
3023                                                   IP_PROTOS.tcp,
3024                                                   twice_nat=1,
3025                                                   local_num=len(locals),
3026                                                   locals=locals)
3027         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3028         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3029                                                   is_inside=0)
3030
3031         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3032              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3033              TCP(sport=eh_port_out, dport=external_port))
3034         self.pg1.add_stream(p)
3035         self.pg_enable_capture(self.pg_interfaces)
3036         self.pg_start()
3037         capture = self.pg0.get_capture(1)
3038         p = capture[0]
3039         server = None
3040         try:
3041             ip = p[IP]
3042             tcp = p[TCP]
3043             self.assertEqual(ip.src, twice_nat_addr)
3044             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3045             if ip.dst == server1.ip4:
3046                 server = server1
3047             else:
3048                 server = server2
3049             self.assertNotEqual(tcp.sport, eh_port_out)
3050             eh_port_in = tcp.sport
3051             self.assertEqual(tcp.dport, local_port)
3052             self.check_tcp_checksum(p)
3053             self.check_ip_checksum(p)
3054         except:
3055             self.logger.error(ppp("Unexpected or invalid packet:", p))
3056             raise
3057
3058         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3059              IP(src=server.ip4, dst=twice_nat_addr) /
3060              TCP(sport=local_port, dport=eh_port_in))
3061         self.pg0.add_stream(p)
3062         self.pg_enable_capture(self.pg_interfaces)
3063         self.pg_start()
3064         capture = self.pg1.get_capture(1)
3065         p = capture[0]
3066         try:
3067             ip = p[IP]
3068             tcp = p[TCP]
3069             self.assertEqual(ip.src, self.nat_addr)
3070             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3071             self.assertEqual(tcp.sport, external_port)
3072             self.assertEqual(tcp.dport, eh_port_out)
3073             self.check_tcp_checksum(p)
3074             self.check_ip_checksum(p)
3075         except:
3076             self.logger.error(ppp("Unexpected or invalid packet:", p))
3077             raise
3078
3079     def test_twice_nat_interface_addr(self):
3080         """ Acquire twice NAT44 addresses from interface """
3081         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3082
3083         # no address in NAT pool
3084         adresses = self.vapi.nat44_address_dump()
3085         self.assertEqual(0, len(adresses))
3086
3087         # configure interface address and check NAT address pool
3088         self.pg7.config_ip4()
3089         adresses = self.vapi.nat44_address_dump()
3090         self.assertEqual(1, len(adresses))
3091         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3092         self.assertEqual(adresses[0].twice_nat, 1)
3093
3094         # remove interface address and check NAT address pool
3095         self.pg7.unconfig_ip4()
3096         adresses = self.vapi.nat44_address_dump()
3097         self.assertEqual(0, len(adresses))
3098
3099     def tearDown(self):
3100         super(TestNAT44, self).tearDown()
3101         if not self.vpp_dead:
3102             self.logger.info(self.vapi.cli("show nat44 verbose"))
3103             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3104             self.vapi.cli("nat addr-port-assignment-alg default")
3105             self.clear_nat44()
3106
3107
3108 class TestDeterministicNAT(MethodHolder):
3109     """ Deterministic NAT Test Cases """
3110
3111     @classmethod
3112     def setUpConstants(cls):
3113         super(TestDeterministicNAT, cls).setUpConstants()
3114         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3115
3116     @classmethod
3117     def setUpClass(cls):
3118         super(TestDeterministicNAT, cls).setUpClass()
3119
3120         try:
3121             cls.tcp_port_in = 6303
3122             cls.tcp_external_port = 6303
3123             cls.udp_port_in = 6304
3124             cls.udp_external_port = 6304
3125             cls.icmp_id_in = 6305
3126             cls.nat_addr = '10.0.0.3'
3127
3128             cls.create_pg_interfaces(range(3))
3129             cls.interfaces = list(cls.pg_interfaces)
3130
3131             for i in cls.interfaces:
3132                 i.admin_up()
3133                 i.config_ip4()
3134                 i.resolve_arp()
3135
3136             cls.pg0.generate_remote_hosts(2)
3137             cls.pg0.configure_ipv4_neighbors()
3138
3139         except Exception:
3140             super(TestDeterministicNAT, cls).tearDownClass()
3141             raise
3142
3143     def create_stream_in(self, in_if, out_if, ttl=64):
3144         """
3145         Create packet stream for inside network
3146
3147         :param in_if: Inside interface
3148         :param out_if: Outside interface
3149         :param ttl: TTL of generated packets
3150         """
3151         pkts = []
3152         # TCP
3153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3155              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3156         pkts.append(p)
3157
3158         # UDP
3159         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3160              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3161              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3162         pkts.append(p)
3163
3164         # ICMP
3165         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3166              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3167              ICMP(id=self.icmp_id_in, type='echo-request'))
3168         pkts.append(p)
3169
3170         return pkts
3171
3172     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3173         """
3174         Create packet stream for outside network
3175
3176         :param out_if: Outside interface
3177         :param dst_ip: Destination IP address (Default use global NAT address)
3178         :param ttl: TTL of generated packets
3179         """
3180         if dst_ip is None:
3181             dst_ip = self.nat_addr
3182         pkts = []
3183         # TCP
3184         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3185              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3186              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3187         pkts.append(p)
3188
3189         # UDP
3190         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3191              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3192              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3193         pkts.append(p)
3194
3195         # ICMP
3196         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3197              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3198              ICMP(id=self.icmp_external_id, type='echo-reply'))
3199         pkts.append(p)
3200
3201         return pkts
3202
3203     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3204         """
3205         Verify captured packets on outside network
3206
3207         :param capture: Captured packets
3208         :param nat_ip: Translated IP address (Default use global NAT address)
3209         :param same_port: Sorce port number is not translated (Default False)
3210         :param packet_num: Expected number of packets (Default 3)
3211         """
3212         if nat_ip is None:
3213             nat_ip = self.nat_addr
3214         self.assertEqual(packet_num, len(capture))
3215         for packet in capture:
3216             try:
3217                 self.assertEqual(packet[IP].src, nat_ip)
3218                 if packet.haslayer(TCP):
3219                     self.tcp_port_out = packet[TCP].sport
3220                 elif packet.haslayer(UDP):
3221                     self.udp_port_out = packet[UDP].sport
3222                 else:
3223                     self.icmp_external_id = packet[ICMP].id
3224             except:
3225                 self.logger.error(ppp("Unexpected or invalid packet "
3226                                       "(outside network):", packet))
3227                 raise
3228
3229     def initiate_tcp_session(self, in_if, out_if):
3230         """
3231         Initiates TCP session
3232
3233         :param in_if: Inside interface
3234         :param out_if: Outside interface
3235         """
3236         try:
3237             # SYN packet in->out
3238             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3239                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3240                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3241                      flags="S"))
3242             in_if.add_stream(p)
3243             self.pg_enable_capture(self.pg_interfaces)
3244             self.pg_start()
3245             capture = out_if.get_capture(1)
3246             p = capture[0]
3247             self.tcp_port_out = p[TCP].sport
3248
3249             # SYN + ACK packet out->in
3250             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3251                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3252                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3253                      flags="SA"))
3254             out_if.add_stream(p)
3255             self.pg_enable_capture(self.pg_interfaces)
3256             self.pg_start()
3257             in_if.get_capture(1)
3258
3259             # ACK packet in->out
3260             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3261                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3262                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3263                      flags="A"))
3264             in_if.add_stream(p)
3265             self.pg_enable_capture(self.pg_interfaces)
3266             self.pg_start()
3267             out_if.get_capture(1)
3268
3269         except:
3270             self.logger.error("TCP 3 way handshake failed")
3271             raise
3272
3273     def verify_ipfix_max_entries_per_user(self, data):
3274         """
3275         Verify IPFIX maximum entries per user exceeded event
3276
3277         :param data: Decoded IPFIX data records
3278         """
3279         self.assertEqual(1, len(data))
3280         record = data[0]
3281         # natEvent
3282         self.assertEqual(ord(record[230]), 13)
3283         # natQuotaExceededEvent
3284         self.assertEqual('\x03\x00\x00\x00', record[466])
3285         # sourceIPv4Address
3286         self.assertEqual(self.pg0.remote_ip4n, record[8])
3287
3288     def test_deterministic_mode(self):
3289         """ NAT plugin run deterministic mode """
3290         in_addr = '172.16.255.0'
3291         out_addr = '172.17.255.50'
3292         in_addr_t = '172.16.255.20'
3293         in_addr_n = socket.inet_aton(in_addr)
3294         out_addr_n = socket.inet_aton(out_addr)
3295         in_addr_t_n = socket.inet_aton(in_addr_t)
3296         in_plen = 24
3297         out_plen = 32
3298
3299         nat_config = self.vapi.nat_show_config()
3300         self.assertEqual(1, nat_config.deterministic)
3301
3302         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3303
3304         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3305         self.assertEqual(rep1.out_addr[:4], out_addr_n)
3306         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3307         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3308
3309         deterministic_mappings = self.vapi.nat_det_map_dump()
3310         self.assertEqual(len(deterministic_mappings), 1)
3311         dsm = deterministic_mappings[0]
3312         self.assertEqual(in_addr_n, dsm.in_addr[:4])
3313         self.assertEqual(in_plen, dsm.in_plen)
3314         self.assertEqual(out_addr_n, dsm.out_addr[:4])
3315         self.assertEqual(out_plen, dsm.out_plen)
3316
3317         self.clear_nat_det()
3318         deterministic_mappings = self.vapi.nat_det_map_dump()
3319         self.assertEqual(len(deterministic_mappings), 0)
3320
3321     def test_set_timeouts(self):
3322         """ Set deterministic NAT timeouts """
3323         timeouts_before = self.vapi.nat_det_get_timeouts()
3324
3325         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3326                                        timeouts_before.tcp_established + 10,
3327                                        timeouts_before.tcp_transitory + 10,
3328                                        timeouts_before.icmp + 10)
3329
3330         timeouts_after = self.vapi.nat_det_get_timeouts()
3331
3332         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3333         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3334         self.assertNotEqual(timeouts_before.tcp_established,
3335                             timeouts_after.tcp_established)
3336         self.assertNotEqual(timeouts_before.tcp_transitory,
3337                             timeouts_after.tcp_transitory)
3338
3339     def test_det_in(self):
3340         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3341
3342         nat_ip = "10.0.0.10"
3343
3344         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3345                                       32,
3346                                       socket.inet_aton(nat_ip),
3347                                       32)
3348         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3349         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3350                                                   is_inside=0)
3351
3352         # in2out
3353         pkts = self.create_stream_in(self.pg0, self.pg1)
3354         self.pg0.add_stream(pkts)
3355         self.pg_enable_capture(self.pg_interfaces)
3356         self.pg_start()
3357         capture = self.pg1.get_capture(len(pkts))
3358         self.verify_capture_out(capture, nat_ip)
3359
3360         # out2in
3361         pkts = self.create_stream_out(self.pg1, nat_ip)
3362         self.pg1.add_stream(pkts)
3363         self.pg_enable_capture(self.pg_interfaces)
3364         self.pg_start()
3365         capture = self.pg0.get_capture(len(pkts))
3366         self.verify_capture_in(capture, self.pg0)
3367
3368         # session dump test
3369         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3370         self.assertEqual(len(sessions), 3)
3371
3372         # TCP session
3373         s = sessions[0]
3374         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3375         self.assertEqual(s.in_port, self.tcp_port_in)
3376         self.assertEqual(s.out_port, self.tcp_port_out)
3377         self.assertEqual(s.ext_port, self.tcp_external_port)
3378
3379         # UDP session
3380         s = sessions[1]
3381         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3382         self.assertEqual(s.in_port, self.udp_port_in)
3383         self.assertEqual(s.out_port, self.udp_port_out)
3384         self.assertEqual(s.ext_port, self.udp_external_port)
3385
3386         # ICMP session
3387         s = sessions[2]
3388         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3389         self.assertEqual(s.in_port, self.icmp_id_in)
3390         self.assertEqual(s.out_port, self.icmp_external_id)
3391
3392     def test_multiple_users(self):
3393         """ Deterministic NAT multiple users """
3394
3395         nat_ip = "10.0.0.10"
3396         port_in = 80
3397         external_port = 6303
3398
3399         host0 = self.pg0.remote_hosts[0]
3400         host1 = self.pg0.remote_hosts[1]
3401
3402         self.vapi.nat_det_add_del_map(host0.ip4n,
3403                                       24,
3404                                       socket.inet_aton(nat_ip),
3405                                       32)
3406         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3407         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3408                                                   is_inside=0)
3409
3410         # host0 to out
3411         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3412              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3413              TCP(sport=port_in, dport=external_port))
3414         self.pg0.add_stream(p)
3415         self.pg_enable_capture(self.pg_interfaces)
3416         self.pg_start()
3417         capture = self.pg1.get_capture(1)
3418         p = capture[0]
3419         try:
3420             ip = p[IP]
3421             tcp = p[TCP]
3422             self.assertEqual(ip.src, nat_ip)
3423             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3424             self.assertEqual(tcp.dport, external_port)
3425             port_out0 = tcp.sport
3426         except:
3427             self.logger.error(ppp("Unexpected or invalid packet:", p))
3428             raise
3429
3430         # host1 to out
3431         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
3432              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
3433              TCP(sport=port_in, dport=external_port))
3434         self.pg0.add_stream(p)
3435         self.pg_enable_capture(self.pg_interfaces)
3436         self.pg_start()
3437         capture = self.pg1.get_capture(1)
3438         p = capture[0]
3439         try:
3440             ip = p[IP]
3441             tcp = p[TCP]
3442             self.assertEqual(ip.src, nat_ip)
3443             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3444             self.assertEqual(tcp.dport, external_port)
3445             port_out1 = tcp.sport
3446         except:
3447             self.logger.error(ppp("Unexpected or invalid packet:", p))
3448             raise
3449
3450         dms = self.vapi.nat_det_map_dump()
3451         self.assertEqual(1, len(dms))
3452         self.assertEqual(2, dms[0].ses_num)
3453
3454         # out to host0
3455         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3456              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3457              TCP(sport=external_port, dport=port_out0))
3458         self.pg1.add_stream(p)
3459         self.pg_enable_capture(self.pg_interfaces)
3460         self.pg_start()
3461         capture = self.pg0.get_capture(1)
3462         p = capture[0]
3463         try:
3464             ip = p[IP]
3465             tcp = p[TCP]
3466             self.assertEqual(ip.src, self.pg1.remote_ip4)
3467             self.assertEqual(ip.dst, host0.ip4)
3468             self.assertEqual(tcp.dport, port_in)
3469             self.assertEqual(tcp.sport, external_port)
3470         except:
3471             self.logger.error(ppp("Unexpected or invalid packet:", p))
3472             raise
3473
3474         # out to host1
3475         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3476              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3477              TCP(sport=external_port, dport=port_out1))
3478         self.pg1.add_stream(p)
3479         self.pg_enable_capture(self.pg_interfaces)
3480         self.pg_start()
3481         capture = self.pg0.get_capture(1)
3482         p = capture[0]
3483         try:
3484             ip = p[IP]
3485             tcp = p[TCP]
3486             self.assertEqual(ip.src, self.pg1.remote_ip4)
3487             self.assertEqual(ip.dst, host1.ip4)
3488             self.assertEqual(tcp.dport, port_in)
3489             self.assertEqual(tcp.sport, external_port)
3490         except:
3491             self.logger.error(ppp("Unexpected or invalid packet", p))
3492             raise
3493
3494         # session close api test
3495         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
3496                                             port_out1,
3497                                             self.pg1.remote_ip4n,
3498                                             external_port)
3499         dms = self.vapi.nat_det_map_dump()
3500         self.assertEqual(dms[0].ses_num, 1)
3501
3502         self.vapi.nat_det_close_session_in(host0.ip4n,
3503                                            port_in,
3504                                            self.pg1.remote_ip4n,
3505                                            external_port)
3506         dms = self.vapi.nat_det_map_dump()
3507         self.assertEqual(dms[0].ses_num, 0)
3508
3509     def test_tcp_session_close_detection_in(self):
3510         """ Deterministic NAT TCP session close from inside network """
3511         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3512                                       32,
3513                                       socket.inet_aton(self.nat_addr),
3514                                       32)
3515         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3516         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3517                                                   is_inside=0)
3518
3519         self.initiate_tcp_session(self.pg0, self.pg1)
3520
3521         # close the session from inside
3522         try:
3523             # FIN packet in -> out
3524             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3525                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3526                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3527                      flags="F"))
3528             self.pg0.add_stream(p)
3529             self.pg_enable_capture(self.pg_interfaces)
3530             self.pg_start()
3531             self.pg1.get_capture(1)
3532
3533             pkts = []
3534
3535             # ACK packet out -> in
3536             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3537                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3538                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3539                      flags="A"))
3540             pkts.append(p)
3541
3542             # FIN packet out -> in
3543             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3544                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3545                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3546                      flags="F"))
3547             pkts.append(p)
3548
3549             self.pg1.add_stream(pkts)
3550             self.pg_enable_capture(self.pg_interfaces)
3551             self.pg_start()
3552             self.pg0.get_capture(2)
3553
3554             # ACK packet in -> out
3555             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3556                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3557                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3558                      flags="A"))
3559             self.pg0.add_stream(p)
3560             self.pg_enable_capture(self.pg_interfaces)
3561             self.pg_start()
3562             self.pg1.get_capture(1)
3563
3564             # Check if deterministic NAT44 closed the session
3565             dms = self.vapi.nat_det_map_dump()
3566             self.assertEqual(0, dms[0].ses_num)
3567         except:
3568             self.logger.error("TCP session termination failed")
3569             raise
3570
3571     def test_tcp_session_close_detection_out(self):
3572         """ Deterministic NAT TCP session close from outside network """
3573         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3574                                       32,
3575                                       socket.inet_aton(self.nat_addr),
3576                                       32)
3577         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3578         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3579                                                   is_inside=0)
3580
3581         self.initiate_tcp_session(self.pg0, self.pg1)
3582
3583         # close the session from outside
3584         try:
3585             # FIN packet out -> in
3586             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3587                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3588                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3589                      flags="F"))
3590             self.pg1.add_stream(p)
3591             self.pg_enable_capture(self.pg_interfaces)
3592             self.pg_start()
3593             self.pg0.get_capture(1)
3594
3595             pkts = []
3596
3597             # ACK packet in -> out
3598             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3599                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3600                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3601                      flags="A"))
3602             pkts.append(p)
3603
3604             # ACK packet in -> out
3605             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3606                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3607                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3608                      flags="F"))
3609             pkts.append(p)
3610
3611             self.pg0.add_stream(pkts)
3612             self.pg_enable_capture(self.pg_interfaces)
3613             self.pg_start()
3614             self.pg1.get_capture(2)
3615
3616             # ACK packet out -> in
3617             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3618                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3619                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3620                      flags="A"))
3621             self.pg1.add_stream(p)
3622             self.pg_enable_capture(self.pg_interfaces)
3623             self.pg_start()
3624             self.pg0.get_capture(1)
3625
3626             # Check if deterministic NAT44 closed the session
3627             dms = self.vapi.nat_det_map_dump()
3628             self.assertEqual(0, dms[0].ses_num)
3629         except:
3630             self.logger.error("TCP session termination failed")
3631             raise
3632
3633     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3634     def test_session_timeout(self):
3635         """ Deterministic NAT session timeouts """
3636         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3637                                       32,
3638                                       socket.inet_aton(self.nat_addr),
3639                                       32)
3640         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3641         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3642                                                   is_inside=0)
3643
3644         self.initiate_tcp_session(self.pg0, self.pg1)
3645         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3646         pkts = self.create_stream_in(self.pg0, self.pg1)
3647         self.pg0.add_stream(pkts)
3648         self.pg_enable_capture(self.pg_interfaces)
3649         self.pg_start()
3650         capture = self.pg1.get_capture(len(pkts))
3651         sleep(15)
3652
3653         dms = self.vapi.nat_det_map_dump()
3654         self.assertEqual(0, dms[0].ses_num)
3655
3656     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3657     def test_session_limit_per_user(self):
3658         """ Deterministic NAT maximum sessions per user limit """
3659         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3660                                       32,
3661                                       socket.inet_aton(self.nat_addr),
3662                                       32)
3663         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3664         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3665                                                   is_inside=0)
3666         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3667                                      src_address=self.pg2.local_ip4n,
3668                                      path_mtu=512,
3669                                      template_interval=10)
3670         self.vapi.nat_ipfix()
3671
3672         pkts = []
3673         for port in range(1025, 2025):
3674             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3675                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3676                  UDP(sport=port, dport=port))
3677             pkts.append(p)
3678
3679         self.pg0.add_stream(pkts)
3680         self.pg_enable_capture(self.pg_interfaces)
3681         self.pg_start()
3682         capture = self.pg1.get_capture(len(pkts))
3683
3684         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3685              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3686              UDP(sport=3001, dport=3002))
3687         self.pg0.add_stream(p)
3688         self.pg_enable_capture(self.pg_interfaces)
3689         self.pg_start()
3690         capture = self.pg1.assert_nothing_captured()
3691
3692         # verify ICMP error packet
3693         capture = self.pg0.get_capture(1)
3694         p = capture[0]
3695         self.assertTrue(p.haslayer(ICMP))
3696         icmp = p[ICMP]
3697         self.assertEqual(icmp.type, 3)
3698         self.assertEqual(icmp.code, 1)
3699         self.assertTrue(icmp.haslayer(IPerror))
3700         inner_ip = icmp[IPerror]
3701         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3702         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3703
3704         dms = self.vapi.nat_det_map_dump()
3705
3706         self.assertEqual(1000, dms[0].ses_num)
3707
3708         # verify IPFIX logging
3709         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3710         sleep(1)
3711         capture = self.pg2.get_capture(2)
3712         ipfix = IPFIXDecoder()
3713         # first load template
3714         for p in capture:
3715             self.assertTrue(p.haslayer(IPFIX))
3716             if p.haslayer(Template):
3717                 ipfix.add_template(p.getlayer(Template))
3718         # verify events in data set
3719         for p in capture:
3720             if p.haslayer(Data):
3721                 data = ipfix.decode_data_set(p.getlayer(Set))
3722                 self.verify_ipfix_max_entries_per_user(data)
3723
3724     def clear_nat_det(self):
3725         """
3726         Clear deterministic NAT configuration.
3727         """
3728         self.vapi.nat_ipfix(enable=0)
3729         self.vapi.nat_det_set_timeouts()
3730         deterministic_mappings = self.vapi.nat_det_map_dump()
3731         for dsm in deterministic_mappings:
3732             self.vapi.nat_det_add_del_map(dsm.in_addr,
3733                                           dsm.in_plen,
3734                                           dsm.out_addr,
3735                                           dsm.out_plen,
3736                                           is_add=0)
3737
3738         interfaces = self.vapi.nat44_interface_dump()
3739         for intf in interfaces:
3740             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3741                                                       intf.is_inside,
3742                                                       is_add=0)
3743
3744     def tearDown(self):
3745         super(TestDeterministicNAT, self).tearDown()
3746         if not self.vpp_dead:
3747             self.logger.info(self.vapi.cli("show nat44 detail"))
3748             self.clear_nat_det()
3749
3750
3751 class TestNAT64(MethodHolder):
3752     """ NAT64 Test Cases """
3753
3754     @classmethod
3755     def setUpClass(cls):
3756         super(TestNAT64, cls).setUpClass()
3757
3758         try:
3759             cls.tcp_port_in = 6303
3760             cls.tcp_port_out = 6303
3761             cls.udp_port_in = 6304
3762             cls.udp_port_out = 6304
3763             cls.icmp_id_in = 6305
3764             cls.icmp_id_out = 6305
3765             cls.nat_addr = '10.0.0.3'
3766             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3767             cls.vrf1_id = 10
3768             cls.vrf1_nat_addr = '10.0.10.3'
3769             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3770                                                    cls.vrf1_nat_addr)
3771
3772             cls.create_pg_interfaces(range(5))
3773             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3774             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3775             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3776
3777             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3778
3779             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3780
3781             cls.pg0.generate_remote_hosts(2)
3782
3783             for i in cls.ip6_interfaces:
3784                 i.admin_up()
3785                 i.config_ip6()
3786                 i.configure_ipv6_neighbors()
3787
3788             for i in cls.ip4_interfaces:
3789                 i.admin_up()
3790                 i.config_ip4()
3791                 i.resolve_arp()
3792
3793             cls.pg3.admin_up()
3794             cls.pg3.config_ip4()
3795             cls.pg3.resolve_arp()
3796             cls.pg3.config_ip6()
3797             cls.pg3.configure_ipv6_neighbors()
3798
3799         except Exception:
3800             super(TestNAT64, cls).tearDownClass()
3801             raise
3802
3803     def test_pool(self):
3804         """ Add/delete address to NAT64 pool """
3805         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3806
3807         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3808
3809         addresses = self.vapi.nat64_pool_addr_dump()
3810         self.assertEqual(len(addresses), 1)
3811         self.assertEqual(addresses[0].address, nat_addr)
3812
3813         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3814
3815         addresses = self.vapi.nat64_pool_addr_dump()
3816         self.assertEqual(len(addresses), 0)
3817
3818     def test_interface(self):
3819         """ Enable/disable NAT64 feature on the interface """
3820         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3821         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3822
3823         interfaces = self.vapi.nat64_interface_dump()
3824         self.assertEqual(len(interfaces), 2)
3825         pg0_found = False
3826         pg1_found = False
3827         for intf in interfaces:
3828             if intf.sw_if_index == self.pg0.sw_if_index:
3829                 self.assertEqual(intf.is_inside, 1)
3830                 pg0_found = True
3831             elif intf.sw_if_index == self.pg1.sw_if_index:
3832                 self.assertEqual(intf.is_inside, 0)
3833                 pg1_found = True
3834         self.assertTrue(pg0_found)
3835         self.assertTrue(pg1_found)
3836
3837         features = self.vapi.cli("show interface features pg0")
3838         self.assertNotEqual(features.find('nat64-in2out'), -1)
3839         features = self.vapi.cli("show interface features pg1")
3840         self.assertNotEqual(features.find('nat64-out2in'), -1)
3841
3842         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3843         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3844
3845         interfaces = self.vapi.nat64_interface_dump()
3846         self.assertEqual(len(interfaces), 0)
3847
3848     def test_static_bib(self):
3849         """ Add/delete static BIB entry """
3850         in_addr = socket.inet_pton(socket.AF_INET6,
3851                                    '2001:db8:85a3::8a2e:370:7334')
3852         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3853         in_port = 1234
3854         out_port = 5678
3855         proto = IP_PROTOS.tcp
3856
3857         self.vapi.nat64_add_del_static_bib(in_addr,
3858                                            out_addr,
3859                                            in_port,
3860                                            out_port,
3861                                            proto)
3862         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3863         static_bib_num = 0
3864         for bibe in bib:
3865             if bibe.is_static:
3866                 static_bib_num += 1
3867                 self.assertEqual(bibe.i_addr, in_addr)
3868                 self.assertEqual(bibe.o_addr, out_addr)
3869                 self.assertEqual(bibe.i_port, in_port)
3870                 self.assertEqual(bibe.o_port, out_port)
3871         self.assertEqual(static_bib_num, 1)
3872
3873         self.vapi.nat64_add_del_static_bib(in_addr,
3874                                            out_addr,
3875                                            in_port,
3876                                            out_port,
3877                                            proto,
3878                                            is_add=0)
3879         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3880         static_bib_num = 0
3881         for bibe in bib:
3882             if bibe.is_static:
3883                 static_bib_num += 1
3884         self.assertEqual(static_bib_num, 0)
3885
3886     def test_set_timeouts(self):
3887         """ Set NAT64 timeouts """
3888         # verify default values
3889         timeouts = self.vapi.nat64_get_timeouts()
3890         self.assertEqual(timeouts.udp, 300)
3891         self.assertEqual(timeouts.icmp, 60)
3892         self.assertEqual(timeouts.tcp_trans, 240)
3893         self.assertEqual(timeouts.tcp_est, 7440)
3894         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3895
3896         # set and verify custom values
3897         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3898                                      tcp_est=7450, tcp_incoming_syn=10)
3899         timeouts = self.vapi.nat64_get_timeouts()
3900         self.assertEqual(timeouts.udp, 200)
3901         self.assertEqual(timeouts.icmp, 30)
3902         self.assertEqual(timeouts.tcp_trans, 250)
3903         self.assertEqual(timeouts.tcp_est, 7450)
3904         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3905
3906     def test_dynamic(self):
3907         """ NAT64 dynamic translation test """
3908         self.tcp_port_in = 6303
3909         self.udp_port_in = 6304
3910         self.icmp_id_in = 6305
3911
3912         ses_num_start = self.nat64_get_ses_num()
3913
3914         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3915                                                 self.nat_addr_n)
3916         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3917         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3918
3919         # in2out
3920         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3921         self.pg0.add_stream(pkts)
3922         self.pg_enable_capture(self.pg_interfaces)
3923         self.pg_start()
3924         capture = self.pg1.get_capture(len(pkts))
3925         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3926                                 dst_ip=self.pg1.remote_ip4)
3927
3928         # out2in
3929         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3930         self.pg1.add_stream(pkts)
3931         self.pg_enable_capture(self.pg_interfaces)
3932         self.pg_start()
3933         capture = self.pg0.get_capture(len(pkts))
3934         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3935         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3936
3937         # in2out
3938         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3939         self.pg0.add_stream(pkts)
3940         self.pg_enable_capture(self.pg_interfaces)
3941         self.pg_start()
3942         capture = self.pg1.get_capture(len(pkts))
3943         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3944                                 dst_ip=self.pg1.remote_ip4)
3945
3946         # out2in
3947         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3948         self.pg1.add_stream(pkts)
3949         self.pg_enable_capture(self.pg_interfaces)
3950         self.pg_start()
3951         capture = self.pg0.get_capture(len(pkts))
3952         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3953
3954         ses_num_end = self.nat64_get_ses_num()
3955
3956         self.assertEqual(ses_num_end - ses_num_start, 3)
3957
3958         # tenant with specific VRF
3959         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3960                                                 self.vrf1_nat_addr_n,
3961                                                 vrf_id=self.vrf1_id)
3962         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3963
3964         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3965         self.pg2.add_stream(pkts)
3966         self.pg_enable_capture(self.pg_interfaces)
3967         self.pg_start()
3968         capture = self.pg1.get_capture(len(pkts))
3969         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3970                                 dst_ip=self.pg1.remote_ip4)
3971
3972         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3973         self.pg1.add_stream(pkts)
3974         self.pg_enable_capture(self.pg_interfaces)
3975         self.pg_start()
3976         capture = self.pg2.get_capture(len(pkts))
3977         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3978
3979     def test_static(self):
3980         """ NAT64 static translation test """
3981         self.tcp_port_in = 60303
3982         self.udp_port_in = 60304
3983         self.icmp_id_in = 60305
3984         self.tcp_port_out = 60303
3985         self.udp_port_out = 60304
3986         self.icmp_id_out = 60305
3987
3988         ses_num_start = self.nat64_get_ses_num()
3989
3990         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3991                                                 self.nat_addr_n)
3992         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3993         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3994
3995         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3996                                            self.nat_addr_n,
3997                                            self.tcp_port_in,
3998                                            self.tcp_port_out,
3999                                            IP_PROTOS.tcp)
4000         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4001                                            self.nat_addr_n,
4002                                            self.udp_port_in,
4003                                            self.udp_port_out,
4004                                            IP_PROTOS.udp)
4005         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4006                                            self.nat_addr_n,
4007                                            self.icmp_id_in,
4008                                            self.icmp_id_out,
4009                                            IP_PROTOS.icmp)
4010
4011         # in2out
4012         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4013         self.pg0.add_stream(pkts)
4014         self.pg_enable_capture(self.pg_interfaces)
4015         self.pg_start()
4016         capture = self.pg1.get_capture(len(pkts))
4017         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4018                                 dst_ip=self.pg1.remote_ip4, same_port=True)
4019
4020         # out2in
4021         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4022         self.pg1.add_stream(pkts)
4023         self.pg_enable_capture(self.pg_interfaces)
4024         self.pg_start()
4025         capture = self.pg0.get_capture(len(pkts))
4026         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4027         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4028
4029         ses_num_end = self.nat64_get_ses_num()
4030
4031         self.assertEqual(ses_num_end - ses_num_start, 3)
4032
4033     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4034     def test_session_timeout(self):
4035         """ NAT64 session timeout """
4036         self.icmp_id_in = 1234
4037         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4038                                                 self.nat_addr_n)
4039         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4040         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4041         self.vapi.nat64_set_timeouts(icmp=5)
4042
4043         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4044         self.pg0.add_stream(pkts)
4045         self.pg_enable_capture(self.pg_interfaces)
4046         self.pg_start()
4047         capture = self.pg1.get_capture(len(pkts))
4048
4049         ses_num_before_timeout = self.nat64_get_ses_num()
4050
4051         sleep(15)
4052
4053         # ICMP session after timeout
4054         ses_num_after_timeout = self.nat64_get_ses_num()
4055         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4056
4057     def test_icmp_error(self):
4058         """ NAT64 ICMP Error message translation """
4059         self.tcp_port_in = 6303
4060         self.udp_port_in = 6304
4061         self.icmp_id_in = 6305
4062
4063         ses_num_start = self.nat64_get_ses_num()
4064
4065         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4066                                                 self.nat_addr_n)
4067         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4068         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4069
4070         # send some packets to create sessions
4071         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4072         self.pg0.add_stream(pkts)
4073         self.pg_enable_capture(self.pg_interfaces)
4074         self.pg_start()
4075         capture_ip4 = self.pg1.get_capture(len(pkts))
4076         self.verify_capture_out(capture_ip4,
4077                                 nat_ip=self.nat_addr,
4078                                 dst_ip=self.pg1.remote_ip4)
4079
4080         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4081         self.pg1.add_stream(pkts)
4082         self.pg_enable_capture(self.pg_interfaces)
4083         self.pg_start()
4084         capture_ip6 = self.pg0.get_capture(len(pkts))
4085         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4086         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4087                                    self.pg0.remote_ip6)
4088
4089         # in2out
4090         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4091                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4092                 ICMPv6DestUnreach(code=1) /
4093                 packet[IPv6] for packet in capture_ip6]
4094         self.pg0.add_stream(pkts)
4095         self.pg_enable_capture(self.pg_interfaces)
4096         self.pg_start()
4097         capture = self.pg1.get_capture(len(pkts))
4098         for packet in capture:
4099             try:
4100                 self.assertEqual(packet[IP].src, self.nat_addr)
4101                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4102                 self.assertEqual(packet[ICMP].type, 3)
4103                 self.assertEqual(packet[ICMP].code, 13)
4104                 inner = packet[IPerror]
4105                 self.assertEqual(inner.src, self.pg1.remote_ip4)
4106                 self.assertEqual(inner.dst, self.nat_addr)
4107                 self.check_icmp_checksum(packet)
4108                 if inner.haslayer(TCPerror):
4109                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4110                 elif inner.haslayer(UDPerror):
4111                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4112                 else:
4113                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4114             except:
4115                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4116                 raise
4117
4118         # out2in
4119         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4120                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4121                 ICMP(type=3, code=13) /
4122                 packet[IP] for packet in capture_ip4]
4123         self.pg1.add_stream(pkts)
4124         self.pg_enable_capture(self.pg_interfaces)
4125         self.pg_start()
4126         capture = self.pg0.get_capture(len(pkts))
4127         for packet in capture:
4128             try:
4129                 self.assertEqual(packet[IPv6].src, ip.src)
4130                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4131                 icmp = packet[ICMPv6DestUnreach]
4132                 self.assertEqual(icmp.code, 1)
4133                 inner = icmp[IPerror6]
4134                 self.assertEqual(inner.src, self.pg0.remote_ip6)
4135                 self.assertEqual(inner.dst, ip.src)
4136                 self.check_icmpv6_checksum(packet)
4137                 if inner.haslayer(TCPerror):
4138                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4139                 elif inner.haslayer(UDPerror):
4140                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4141                 else:
4142                     self.assertEqual(inner[ICMPv6EchoRequest].id,
4143                                      self.icmp_id_in)
4144             except:
4145                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4146                 raise
4147
4148     def test_hairpinning(self):
4149         """ NAT64 hairpinning """
4150
4151         client = self.pg0.remote_hosts[0]
4152         server = self.pg0.remote_hosts[1]
4153         server_tcp_in_port = 22
4154         server_tcp_out_port = 4022
4155         server_udp_in_port = 23
4156         server_udp_out_port = 4023
4157         client_tcp_in_port = 1234
4158         client_udp_in_port = 1235
4159         client_tcp_out_port = 0
4160         client_udp_out_port = 0
4161         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4162         nat_addr_ip6 = ip.src
4163
4164         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4165                                                 self.nat_addr_n)
4166         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4167         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4168
4169         self.vapi.nat64_add_del_static_bib(server.ip6n,
4170                                            self.nat_addr_n,
4171                                            server_tcp_in_port,
4172                                            server_tcp_out_port,
4173                                            IP_PROTOS.tcp)
4174         self.vapi.nat64_add_del_static_bib(server.ip6n,
4175                                            self.nat_addr_n,
4176                                            server_udp_in_port,
4177                                            server_udp_out_port,
4178                                            IP_PROTOS.udp)
4179
4180         # client to server
4181         pkts = []
4182         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4183              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4184              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4185         pkts.append(p)
4186         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4187              IPv6(src=client.ip6, dst=nat_addr_ip6) /
4188              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4189         pkts.append(p)
4190         self.pg0.add_stream(pkts)
4191         self.pg_enable_capture(self.pg_interfaces)
4192         self.pg_start()
4193         capture = self.pg0.get_capture(len(pkts))
4194         for packet in capture:
4195             try:
4196                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4197                 self.assertEqual(packet[IPv6].dst, server.ip6)
4198                 if packet.haslayer(TCP):
4199                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4200                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4201                     self.check_tcp_checksum(packet)
4202                     client_tcp_out_port = packet[TCP].sport
4203                 else:
4204                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4205                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
4206                     self.check_udp_checksum(packet)
4207                     client_udp_out_port = packet[UDP].sport
4208             except:
4209                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4210                 raise
4211
4212         # server to client
4213         pkts = []
4214         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4215              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4216              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4217         pkts.append(p)
4218         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4219              IPv6(src=server.ip6, dst=nat_addr_ip6) /
4220              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4221         pkts.append(p)
4222         self.pg0.add_stream(pkts)
4223         self.pg_enable_capture(self.pg_interfaces)
4224         self.pg_start()
4225         capture = self.pg0.get_capture(len(pkts))
4226         for packet in capture:
4227             try:
4228                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4229                 self.assertEqual(packet[IPv6].dst, client.ip6)
4230                 if packet.haslayer(TCP):
4231                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4232                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4233                     self.check_tcp_checksum(packet)
4234                 else:
4235                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
4236                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
4237                     self.check_udp_checksum(packet)
4238             except:
4239                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4240                 raise
4241
4242         # ICMP error
4243         pkts = []
4244         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4245                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4246                 ICMPv6DestUnreach(code=1) /
4247                 packet[IPv6] for packet in capture]
4248         self.pg0.add_stream(pkts)
4249         self.pg_enable_capture(self.pg_interfaces)
4250         self.pg_start()
4251         capture = self.pg0.get_capture(len(pkts))
4252         for packet in capture:
4253             try:
4254                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4255                 self.assertEqual(packet[IPv6].dst, server.ip6)
4256                 icmp = packet[ICMPv6DestUnreach]
4257                 self.assertEqual(icmp.code, 1)
4258                 inner = icmp[IPerror6]
4259                 self.assertEqual(inner.src, server.ip6)
4260                 self.assertEqual(inner.dst, nat_addr_ip6)
4261                 self.check_icmpv6_checksum(packet)
4262                 if inner.haslayer(TCPerror):
4263                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4264                     self.assertEqual(inner[TCPerror].dport,
4265                                      client_tcp_out_port)
4266                 else:
4267                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4268                     self.assertEqual(inner[UDPerror].dport,
4269                                      client_udp_out_port)
4270             except:
4271                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4272                 raise
4273
4274     def test_prefix(self):
4275         """ NAT64 Network-Specific Prefix """
4276
4277         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4278                                                 self.nat_addr_n)
4279         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4280         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4281         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4282                                                 self.vrf1_nat_addr_n,
4283                                                 vrf_id=self.vrf1_id)
4284         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4285
4286         # Add global prefix
4287         global_pref64 = "2001:db8::"
4288         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4289         global_pref64_len = 32
4290         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4291
4292         prefix = self.vapi.nat64_prefix_dump()
4293         self.assertEqual(len(prefix), 1)
4294         self.assertEqual(prefix[0].prefix, global_pref64_n)
4295         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4296         self.assertEqual(prefix[0].vrf_id, 0)
4297
4298         # Add tenant specific prefix
4299         vrf1_pref64 = "2001:db8:122:300::"
4300         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4301         vrf1_pref64_len = 56
4302         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4303                                        vrf1_pref64_len,
4304                                        vrf_id=self.vrf1_id)
4305         prefix = self.vapi.nat64_prefix_dump()
4306         self.assertEqual(len(prefix), 2)
4307
4308         # Global prefix
4309         pkts = self.create_stream_in_ip6(self.pg0,
4310                                          self.pg1,
4311                                          pref=global_pref64,
4312                                          plen=global_pref64_len)
4313         self.pg0.add_stream(pkts)
4314         self.pg_enable_capture(self.pg_interfaces)
4315         self.pg_start()
4316         capture = self.pg1.get_capture(len(pkts))
4317         self.verify_capture_out(capture, nat_ip=self.nat_addr,
4318                                 dst_ip=self.pg1.remote_ip4)
4319
4320         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4321         self.pg1.add_stream(pkts)
4322         self.pg_enable_capture(self.pg_interfaces)
4323         self.pg_start()
4324         capture = self.pg0.get_capture(len(pkts))
4325         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4326                                   global_pref64,
4327                                   global_pref64_len)
4328         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4329
4330         # Tenant specific prefix
4331         pkts = self.create_stream_in_ip6(self.pg2,
4332                                          self.pg1,
4333                                          pref=vrf1_pref64,
4334                                          plen=vrf1_pref64_len)
4335         self.pg2.add_stream(pkts)
4336         self.pg_enable_capture(self.pg_interfaces)
4337         self.pg_start()
4338         capture = self.pg1.get_capture(len(pkts))
4339         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4340                                 dst_ip=self.pg1.remote_ip4)
4341
4342         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4343         self.pg1.add_stream(pkts)
4344         self.pg_enable_capture(self.pg_interfaces)
4345         self.pg_start()
4346         capture = self.pg2.get_capture(len(pkts))
4347         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4348                                   vrf1_pref64,
4349                                   vrf1_pref64_len)
4350         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4351
4352     def test_unknown_proto(self):
4353         """ NAT64 translate packet with unknown protocol """
4354
4355         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4356                                                 self.nat_addr_n)
4357         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4358         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4359         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4360
4361         # in2out
4362         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4363              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4364              TCP(sport=self.tcp_port_in, dport=20))
4365         self.pg0.add_stream(p)
4366         self.pg_enable_capture(self.pg_interfaces)
4367         self.pg_start()
4368         p = self.pg1.get_capture(1)
4369
4370         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4371              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4372              GRE() /
4373              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4374              TCP(sport=1234, dport=1234))
4375         self.pg0.add_stream(p)
4376         self.pg_enable_capture(self.pg_interfaces)
4377         self.pg_start()
4378         p = self.pg1.get_capture(1)
4379         packet = p[0]
4380         try:
4381             self.assertEqual(packet[IP].src, self.nat_addr)
4382             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4383             self.assertTrue(packet.haslayer(GRE))
4384             self.check_ip_checksum(packet)
4385         except:
4386             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4387             raise
4388
4389         # out2in
4390         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4391              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4392              GRE() /
4393              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4394              TCP(sport=1234, dport=1234))
4395         self.pg1.add_stream(p)
4396         self.pg_enable_capture(self.pg_interfaces)
4397         self.pg_start()
4398         p = self.pg0.get_capture(1)
4399         packet = p[0]
4400         try:
4401             self.assertEqual(packet[IPv6].src, remote_ip6)
4402             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4403             self.assertEqual(packet[IPv6].nh, 47)
4404         except:
4405             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4406             raise
4407
4408     def test_hairpinning_unknown_proto(self):
4409         """ NAT64 translate packet with unknown protocol - hairpinning """
4410
4411         client = self.pg0.remote_hosts[0]
4412         server = self.pg0.remote_hosts[1]
4413         server_tcp_in_port = 22
4414         server_tcp_out_port = 4022
4415         client_tcp_in_port = 1234
4416         client_tcp_out_port = 1235
4417         server_nat_ip = "10.0.0.100"
4418         client_nat_ip = "10.0.0.110"
4419         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
4420         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
4421         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
4422         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
4423
4424         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
4425                                                 client_nat_ip_n)
4426         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4427         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4428
4429         self.vapi.nat64_add_del_static_bib(server.ip6n,
4430                                            server_nat_ip_n,
4431                                            server_tcp_in_port,
4432                                            server_tcp_out_port,
4433                                            IP_PROTOS.tcp)
4434
4435         self.vapi.nat64_add_del_static_bib(server.ip6n,
4436                                            server_nat_ip_n,
4437                                            0,
4438                                            0,
4439                                            IP_PROTOS.gre)
4440
4441         self.vapi.nat64_add_del_static_bib(client.ip6n,
4442                                            client_nat_ip_n,
4443                                            client_tcp_in_port,
4444                                            client_tcp_out_port,
4445                                            IP_PROTOS.tcp)
4446
4447         # client to server
4448         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4449              IPv6(src=client.ip6, dst=server_nat_ip6) /
4450              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4451         self.pg0.add_stream(p)
4452         self.pg_enable_capture(self.pg_interfaces)
4453         self.pg_start()
4454         p = self.pg0.get_capture(1)
4455
4456         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4457              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
4458              GRE() /
4459              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4460              TCP(sport=1234, dport=1234))
4461         self.pg0.add_stream(p)
4462         self.pg_enable_capture(self.pg_interfaces)
4463         self.pg_start()
4464         p = self.pg0.get_capture(1)
4465         packet = p[0]
4466         try:
4467             self.assertEqual(packet[IPv6].src, client_nat_ip6)
4468             self.assertEqual(packet[IPv6].dst, server.ip6)
4469             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4470         except:
4471             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4472             raise
4473
4474         # server to client
4475         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4476              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
4477              GRE() /
4478              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4479              TCP(sport=1234, dport=1234))
4480         self.pg0.add_stream(p)
4481         self.pg_enable_capture(self.pg_interfaces)
4482         self.pg_start()
4483         p = self.pg0.get_capture(1)
4484         packet = p[0]
4485         try:
4486             self.assertEqual(packet[IPv6].src, server_nat_ip6)
4487             self.assertEqual(packet[IPv6].dst, client.ip6)
4488             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
4489         except:
4490             self.logger.error(ppp("Unexpected or invalid packet:", packet))
4491             raise
4492
4493     def test_one_armed_nat64(self):
4494         """ One armed NAT64 """
4495         external_port = 0
4496         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
4497                                            '64:ff9b::',
4498                                            96)
4499
4500         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4501                                                 self.nat_addr_n)
4502         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
4503         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
4504
4505         # in2out
4506         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4507              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
4508              TCP(sport=12345, dport=80))
4509         self.pg3.add_stream(p)
4510         self.pg_enable_capture(self.pg_interfaces)
4511         self.pg_start()
4512         capture = self.pg3.get_capture(1)
4513         p = capture[0]
4514         try:
4515             ip = p[IP]
4516             tcp = p[TCP]
4517             self.assertEqual(ip.src, self.nat_addr)
4518             self.assertEqual(ip.dst, self.pg3.remote_ip4)
4519             self.assertNotEqual(tcp.sport, 12345)
4520             external_port = tcp.sport
4521             self.assertEqual(tcp.dport, 80)
4522             self.check_tcp_checksum(p)
4523             self.check_ip_checksum(p)
4524         except:
4525             self.logger.error(ppp("Unexpected or invalid packet:", p))
4526             raise
4527
4528         # out2in
4529         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
4530              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
4531              TCP(sport=80, dport=external_port))
4532         self.pg3.add_stream(p)
4533         self.pg_enable_capture(self.pg_interfaces)
4534         self.pg_start()
4535         capture = self.pg3.get_capture(1)
4536         p = capture[0]
4537         try:
4538             ip = p[IPv6]
4539             tcp = p[TCP]
4540             self.assertEqual(ip.src, remote_host_ip6)
4541             self.assertEqual(ip.dst, self.pg3.remote_ip6)
4542             self.assertEqual(tcp.sport, 80)
4543             self.assertEqual(tcp.dport, 12345)
4544             self.check_tcp_checksum(p)
4545         except:
4546             self.logger.error(ppp("Unexpected or invalid packet:", p))
4547             raise
4548
4549     def test_frag_in_order(self):
4550         """ NAT64 translate fragments arriving in order """
4551         self.tcp_port_in = random.randint(1025, 65535)
4552
4553         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4554                                                 self.nat_addr_n)
4555         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4556         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4557
4558         reass = self.vapi.nat_reass_dump()
4559         reass_n_start = len(reass)
4560
4561         # in2out
4562         data = 'a' * 200
4563         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4564                                            self.tcp_port_in, 20, data)
4565         self.pg0.add_stream(pkts)
4566         self.pg_enable_capture(self.pg_interfaces)
4567         self.pg_start()
4568         frags = self.pg1.get_capture(len(pkts))
4569         p = self.reass_frags_and_verify(frags,
4570                                         self.nat_addr,
4571                                         self.pg1.remote_ip4)
4572         self.assertEqual(p[TCP].dport, 20)
4573         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4574         self.tcp_port_out = p[TCP].sport
4575         self.assertEqual(data, p[Raw].load)
4576
4577         # out2in
4578         data = "A" * 4 + "b" * 16 + "C" * 3
4579         pkts = self.create_stream_frag(self.pg1,
4580                                        self.nat_addr,
4581                                        20,
4582                                        self.tcp_port_out,
4583                                        data)
4584         self.pg1.add_stream(pkts)
4585         self.pg_enable_capture(self.pg_interfaces)
4586         self.pg_start()
4587         frags = self.pg0.get_capture(len(pkts))
4588         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4589         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4590         self.assertEqual(p[TCP].sport, 20)
4591         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4592         self.assertEqual(data, p[Raw].load)
4593
4594         reass = self.vapi.nat_reass_dump()
4595         reass_n_end = len(reass)
4596
4597         self.assertEqual(reass_n_end - reass_n_start, 2)
4598
4599     def test_reass_hairpinning(self):
4600         """ NAT64 fragments hairpinning """
4601         data = 'a' * 200
4602         client = self.pg0.remote_hosts[0]
4603         server = self.pg0.remote_hosts[1]
4604         server_in_port = random.randint(1025, 65535)
4605         server_out_port = random.randint(1025, 65535)
4606         client_in_port = random.randint(1025, 65535)
4607         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4608         nat_addr_ip6 = ip.src
4609
4610         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4611                                                 self.nat_addr_n)
4612         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4613         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4614
4615         # add static BIB entry for server
4616         self.vapi.nat64_add_del_static_bib(server.ip6n,
4617                                            self.nat_addr_n,
4618                                            server_in_port,
4619                                            server_out_port,
4620                                            IP_PROTOS.tcp)
4621
4622         # send packet from host to server
4623         pkts = self.create_stream_frag_ip6(self.pg0,
4624                                            self.nat_addr,
4625                                            client_in_port,
4626                                            server_out_port,
4627                                            data)
4628         self.pg0.add_stream(pkts)
4629         self.pg_enable_capture(self.pg_interfaces)
4630         self.pg_start()
4631         frags = self.pg0.get_capture(len(pkts))
4632         p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
4633         self.assertNotEqual(p[TCP].sport, client_in_port)
4634         self.assertEqual(p[TCP].dport, server_in_port)
4635         self.assertEqual(data, p[Raw].load)
4636
4637     def test_frag_out_of_order(self):
4638         """ NAT64 translate fragments arriving out of order """
4639         self.tcp_port_in = random.randint(1025, 65535)
4640
4641         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4642                                                 self.nat_addr_n)
4643         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4644         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4645
4646         # in2out
4647         data = 'a' * 200
4648         pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
4649                                            self.tcp_port_in, 20, data)
4650         pkts.reverse()
4651         self.pg0.add_stream(pkts)
4652         self.pg_enable_capture(self.pg_interfaces)
4653         self.pg_start()
4654         frags = self.pg1.get_capture(len(pkts))
4655         p = self.reass_frags_and_verify(frags,
4656                                         self.nat_addr,
4657                                         self.pg1.remote_ip4)
4658         self.assertEqual(p[TCP].dport, 20)
4659         self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
4660         self.tcp_port_out = p[TCP].sport
4661         self.assertEqual(data, p[Raw].load)
4662
4663         # out2in
4664         data = "A" * 4 + "B" * 16 + "C" * 3
4665         pkts = self.create_stream_frag(self.pg1,
4666                                        self.nat_addr,
4667                                        20,
4668                                        self.tcp_port_out,
4669                                        data)
4670         pkts.reverse()
4671         self.pg1.add_stream(pkts)
4672         self.pg_enable_capture(self.pg_interfaces)
4673         self.pg_start()
4674         frags = self.pg0.get_capture(len(pkts))
4675         src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4676         p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
4677         self.assertEqual(p[TCP].sport, 20)
4678         self.assertEqual(p[TCP].dport, self.tcp_port_in)
4679         self.assertEqual(data, p[Raw].load)
4680
4681     def test_interface_addr(self):
4682         """ Acquire NAT64 pool addresses from interface """
4683         self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
4684
4685         # no address in NAT64 pool
4686         adresses = self.vapi.nat44_address_dump()
4687         self.assertEqual(0, len(adresses))
4688
4689         # configure interface address and check NAT64 address pool
4690         self.pg4.config_ip4()
4691         addresses = self.vapi.nat64_pool_addr_dump()
4692         self.assertEqual(len(addresses), 1)
4693         self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
4694
4695         # remove interface address and check NAT64 address pool
4696         self.pg4.unconfig_ip4()
4697         addresses = self.vapi.nat64_pool_addr_dump()
4698         self.assertEqual(0, len(adresses))
4699
4700     def nat64_get_ses_num(self):
4701         """
4702         Return number of active NAT64 sessions.
4703         """
4704         st = self.vapi.nat64_st_dump()
4705         return len(st)
4706
4707     def clear_nat64(self):
4708         """
4709         Clear NAT64 configuration.
4710         """
4711         self.vapi.nat64_set_timeouts()
4712
4713         interfaces = self.vapi.nat64_interface_dump()
4714         for intf in interfaces:
4715             if intf.is_inside > 1:
4716                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
4717                                                   0,
4718                                                   is_add=0)
4719             self.vapi.nat64_add_del_interface(intf.sw_if_index,
4720                                               intf.is_inside,
4721                                               is_add=0)
4722
4723         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4724         for bibe in bib:
4725             if bibe.is_static:
4726                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4727                                                    bibe.o_addr,
4728                                                    bibe.i_port,
4729                                                    bibe.o_port,
4730                                                    bibe.proto,
4731                                                    bibe.vrf_id,
4732                                                    is_add=0)
4733
4734         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
4735         for bibe in bib:
4736             if bibe.is_static:
4737                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4738                                                    bibe.o_addr,
4739                                                    bibe.i_port,
4740                                                    bibe.o_port,
4741                                                    bibe.proto,
4742                                                    bibe.vrf_id,
4743                                                    is_add=0)
4744
4745         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
4746         for bibe in bib:
4747             if bibe.is_static:
4748                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
4749                                                    bibe.o_addr,
4750                                                    bibe.i_port,
4751                                                    bibe.o_port,
4752                                                    bibe.proto,
4753                                                    bibe.vrf_id,
4754                                                    is_add=0)
4755
4756         adresses = self.vapi.nat64_pool_addr_dump()
4757         for addr in adresses:
4758             self.vapi.nat64_add_del_pool_addr_range(addr.address,
4759                                                     addr.address,
4760                                                     vrf_id=addr.vrf_id,
4761                                                     is_add=0)
4762
4763         prefixes = self.vapi.nat64_prefix_dump()
4764         for prefix in prefixes:
4765             self.vapi.nat64_add_del_prefix(prefix.prefix,
4766                                            prefix.prefix_len,
4767                                            vrf_id=prefix.vrf_id,
4768                                            is_add=0)
4769
4770     def tearDown(self):
4771         super(TestNAT64, self).tearDown()
4772         if not self.vpp_dead:
4773             self.logger.info(self.vapi.cli("show nat64 pool"))
4774             self.logger.info(self.vapi.cli("show nat64 interfaces"))
4775             self.logger.info(self.vapi.cli("show nat64 prefix"))
4776             self.logger.info(self.vapi.cli("show nat64 bib all"))
4777             self.logger.info(self.vapi.cli("show nat64 session table all"))
4778             self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4779             self.clear_nat64()
4780
4781
4782 class TestDSlite(MethodHolder):
4783     """ DS-Lite Test Cases """
4784
4785     @classmethod
4786     def setUpClass(cls):
4787         super(TestDSlite, cls).setUpClass()
4788
4789         try:
4790             cls.nat_addr = '10.0.0.3'
4791             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4792
4793             cls.create_pg_interfaces(range(2))
4794             cls.pg0.admin_up()
4795             cls.pg0.config_ip4()
4796             cls.pg0.resolve_arp()
4797             cls.pg1.admin_up()
4798             cls.pg1.config_ip6()
4799             cls.pg1.generate_remote_hosts(2)
4800             cls.pg1.configure_ipv6_neighbors()
4801
4802         except Exception:
4803             super(TestDSlite, cls).tearDownClass()
4804             raise
4805
4806     def test_dslite(self):
4807         """ Test DS-Lite """
4808         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4809                                                  self.nat_addr_n)
4810         aftr_ip4 = '192.0.0.1'
4811         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4812         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4813         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4814         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4815
4816         # UDP
4817         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4818              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4819              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4820              UDP(sport=20000, dport=10000))
4821         self.pg1.add_stream(p)
4822         self.pg_enable_capture(self.pg_interfaces)
4823         self.pg_start()
4824         capture = self.pg0.get_capture(1)
4825         capture = capture[0]
4826         self.assertFalse(capture.haslayer(IPv6))
4827         self.assertEqual(capture[IP].src, self.nat_addr)
4828         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4829         self.assertNotEqual(capture[UDP].sport, 20000)
4830         self.assertEqual(capture[UDP].dport, 10000)
4831         self.check_ip_checksum(capture)
4832         out_port = capture[UDP].sport
4833
4834         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4835              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4836              UDP(sport=10000, dport=out_port))
4837         self.pg0.add_stream(p)
4838         self.pg_enable_capture(self.pg_interfaces)
4839         self.pg_start()
4840         capture = self.pg1.get_capture(1)
4841         capture = capture[0]
4842         self.assertEqual(capture[IPv6].src, aftr_ip6)
4843         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4844         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4845         self.assertEqual(capture[IP].dst, '192.168.1.1')
4846         self.assertEqual(capture[UDP].sport, 10000)
4847         self.assertEqual(capture[UDP].dport, 20000)
4848         self.check_ip_checksum(capture)
4849
4850         # TCP
4851         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4852              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4853              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4854              TCP(sport=20001, dport=10001))
4855         self.pg1.add_stream(p)
4856         self.pg_enable_capture(self.pg_interfaces)
4857         self.pg_start()
4858         capture = self.pg0.get_capture(1)
4859         capture = capture[0]
4860         self.assertFalse(capture.haslayer(IPv6))
4861         self.assertEqual(capture[IP].src, self.nat_addr)
4862         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4863         self.assertNotEqual(capture[TCP].sport, 20001)
4864         self.assertEqual(capture[TCP].dport, 10001)
4865         self.check_ip_checksum(capture)
4866         self.check_tcp_checksum(capture)
4867         out_port = capture[TCP].sport
4868
4869         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4870              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4871              TCP(sport=10001, dport=out_port))
4872         self.pg0.add_stream(p)
4873         self.pg_enable_capture(self.pg_interfaces)
4874         self.pg_start()
4875         capture = self.pg1.get_capture(1)
4876         capture = capture[0]
4877         self.assertEqual(capture[IPv6].src, aftr_ip6)
4878         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4879         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4880         self.assertEqual(capture[IP].dst, '192.168.1.1')
4881         self.assertEqual(capture[TCP].sport, 10001)
4882         self.assertEqual(capture[TCP].dport, 20001)
4883         self.check_ip_checksum(capture)
4884         self.check_tcp_checksum(capture)
4885
4886         # ICMP
4887         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4888              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4889              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4890              ICMP(id=4000, type='echo-request'))
4891         self.pg1.add_stream(p)
4892         self.pg_enable_capture(self.pg_interfaces)
4893         self.pg_start()
4894         capture = self.pg0.get_capture(1)
4895         capture = capture[0]
4896         self.assertFalse(capture.haslayer(IPv6))
4897         self.assertEqual(capture[IP].src, self.nat_addr)
4898         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4899         self.assertNotEqual(capture[ICMP].id, 4000)
4900         self.check_ip_checksum(capture)
4901         self.check_icmp_checksum(capture)
4902         out_id = capture[ICMP].id
4903
4904         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4905              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4906              ICMP(id=out_id, type='echo-reply'))
4907         self.pg0.add_stream(p)
4908         self.pg_enable_capture(self.pg_interfaces)
4909         self.pg_start()
4910         capture = self.pg1.get_capture(1)
4911         capture = capture[0]
4912         self.assertEqual(capture[IPv6].src, aftr_ip6)
4913         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4914         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4915         self.assertEqual(capture[IP].dst, '192.168.1.1')
4916         self.assertEqual(capture[ICMP].id, 4000)
4917         self.check_ip_checksum(capture)
4918         self.check_icmp_checksum(capture)
4919
4920         # ping DS-Lite AFTR tunnel endpoint address
4921         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4922              IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
4923              ICMPv6EchoRequest())
4924         self.pg1.add_stream(p)
4925         self.pg_enable_capture(self.pg_interfaces)
4926         self.pg_start()
4927         capture = self.pg1.get_capture(1)
4928         self.assertEqual(1, len(capture))
4929         capture = capture[0]
4930         self.assertEqual(capture[IPv6].src, aftr_ip6)
4931         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4932         self.assertTrue(capture.haslayer(ICMPv6EchoReply))
4933
4934     def tearDown(self):
4935         super(TestDSlite, self).tearDown()
4936         if not self.vpp_dead:
4937             self.logger.info(self.vapi.cli("show dslite pool"))
4938             self.logger.info(
4939                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4940             self.logger.info(self.vapi.cli("show dslite sessions"))
4941
if __name__ == '__main__':
    # Executed as a script (not imported): hand control to unittest,
    # using the VPP-specific runner imported from the test framework.
    unittest.main(
        testRunner=VppTestRunner
    )