NAT: DS-Lite (VPP-1040)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
9 from scapy.layers.inet import IP, TCP, UDP, ICMP
10 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
11 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
12 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
13 from scapy.layers.l2 import Ether, ARP, GRE
14 from scapy.data import IP_PROTOS
15 from scapy.packet import bind_layers
16 from util import ppp
17 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
18 from time import sleep
19 from util import ip4_range
20 from util import mactobinary
21
22
23 class MethodHolder(VppTestCase):
24     """ NAT create capture and verify method holder """
25
26     @classmethod
27     def setUpClass(cls):
28         super(MethodHolder, cls).setUpClass()
29
30     def tearDown(self):
31         super(MethodHolder, self).tearDown()
32
33     def check_ip_checksum(self, pkt):
34         """
35         Check IP checksum of the packet
36
37         :param pkt: Packet to check IP checksum
38         """
39         new = pkt.__class__(str(pkt))
40         del new['IP'].chksum
41         new = new.__class__(str(new))
42         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
43
44     def check_tcp_checksum(self, pkt):
45         """
46         Check TCP checksum in IP packet
47
48         :param pkt: Packet to check TCP checksum
49         """
50         new = pkt.__class__(str(pkt))
51         del new['TCP'].chksum
52         new = new.__class__(str(new))
53         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
54
55     def check_udp_checksum(self, pkt):
56         """
57         Check UDP checksum in IP packet
58
59         :param pkt: Packet to check UDP checksum
60         """
61         new = pkt.__class__(str(pkt))
62         del new['UDP'].chksum
63         new = new.__class__(str(new))
64         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
65
66     def check_icmp_errror_embedded(self, pkt):
67         """
68         Check ICMP error embeded packet checksum
69
70         :param pkt: Packet to check ICMP error embeded packet checksum
71         """
72         if pkt.haslayer(IPerror):
73             new = pkt.__class__(str(pkt))
74             del new['IPerror'].chksum
75             new = new.__class__(str(new))
76             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
77
78         if pkt.haslayer(TCPerror):
79             new = pkt.__class__(str(pkt))
80             del new['TCPerror'].chksum
81             new = new.__class__(str(new))
82             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
83
84         if pkt.haslayer(UDPerror):
85             if pkt['UDPerror'].chksum != 0:
86                 new = pkt.__class__(str(pkt))
87                 del new['UDPerror'].chksum
88                 new = new.__class__(str(new))
89                 self.assertEqual(new['UDPerror'].chksum,
90                                  pkt['UDPerror'].chksum)
91
92         if pkt.haslayer(ICMPerror):
93             del new['ICMPerror'].chksum
94             new = new.__class__(str(new))
95             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
96
97     def check_icmp_checksum(self, pkt):
98         """
99         Check ICMP checksum in IPv4 packet
100
101         :param pkt: Packet to check ICMP checksum
102         """
103         new = pkt.__class__(str(pkt))
104         del new['ICMP'].chksum
105         new = new.__class__(str(new))
106         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
107         if pkt.haslayer(IPerror):
108             self.check_icmp_errror_embedded(pkt)
109
110     def check_icmpv6_checksum(self, pkt):
111         """
112         Check ICMPv6 checksum in IPv4 packet
113
114         :param pkt: Packet to check ICMPv6 checksum
115         """
116         new = pkt.__class__(str(pkt))
117         if pkt.haslayer(ICMPv6DestUnreach):
118             del new['ICMPv6DestUnreach'].cksum
119             new = new.__class__(str(new))
120             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
121                              pkt['ICMPv6DestUnreach'].cksum)
122             self.check_icmp_errror_embedded(pkt)
123         if pkt.haslayer(ICMPv6EchoRequest):
124             del new['ICMPv6EchoRequest'].cksum
125             new = new.__class__(str(new))
126             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
127                              pkt['ICMPv6EchoRequest'].cksum)
128         if pkt.haslayer(ICMPv6EchoReply):
129             del new['ICMPv6EchoReply'].cksum
130             new = new.__class__(str(new))
131             self.assertEqual(new['ICMPv6EchoReply'].cksum,
132                              pkt['ICMPv6EchoReply'].cksum)
133
134     def create_stream_in(self, in_if, out_if, ttl=64):
135         """
136         Create packet stream for inside network
137
138         :param in_if: Inside interface
139         :param out_if: Outside interface
140         :param ttl: TTL of generated packets
141         """
142         pkts = []
143         # TCP
144         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
145              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
146              TCP(sport=self.tcp_port_in, dport=20))
147         pkts.append(p)
148
149         # UDP
150         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
151              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
152              UDP(sport=self.udp_port_in, dport=20))
153         pkts.append(p)
154
155         # ICMP
156         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
157              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
158              ICMP(id=self.icmp_id_in, type='echo-request'))
159         pkts.append(p)
160
161         return pkts
162
163     def compose_ip6(self, ip4, pref, plen):
164         """
165         Compose IPv4-embedded IPv6 addresses
166
167         :param ip4: IPv4 address
168         :param pref: IPv6 prefix
169         :param plen: IPv6 prefix length
170         :returns: IPv4-embedded IPv6 addresses
171         """
172         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
173         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
174         if plen == 32:
175             pref_n[4] = ip4_n[0]
176             pref_n[5] = ip4_n[1]
177             pref_n[6] = ip4_n[2]
178             pref_n[7] = ip4_n[3]
179         elif plen == 40:
180             pref_n[5] = ip4_n[0]
181             pref_n[6] = ip4_n[1]
182             pref_n[7] = ip4_n[2]
183             pref_n[9] = ip4_n[3]
184         elif plen == 48:
185             pref_n[6] = ip4_n[0]
186             pref_n[7] = ip4_n[1]
187             pref_n[9] = ip4_n[2]
188             pref_n[10] = ip4_n[3]
189         elif plen == 56:
190             pref_n[7] = ip4_n[0]
191             pref_n[9] = ip4_n[1]
192             pref_n[10] = ip4_n[2]
193             pref_n[11] = ip4_n[3]
194         elif plen == 64:
195             pref_n[9] = ip4_n[0]
196             pref_n[10] = ip4_n[1]
197             pref_n[11] = ip4_n[2]
198             pref_n[12] = ip4_n[3]
199         elif plen == 96:
200             pref_n[12] = ip4_n[0]
201             pref_n[13] = ip4_n[1]
202             pref_n[14] = ip4_n[2]
203             pref_n[15] = ip4_n[3]
204         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
205
206     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
207         """
208         Create IPv6 packet stream for inside network
209
210         :param in_if: Inside interface
211         :param out_if: Outside interface
212         :param ttl: Hop Limit of generated packets
213         :param pref: NAT64 prefix
214         :param plen: NAT64 prefix length
215         """
216         pkts = []
217         if pref is None:
218             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
219         else:
220             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
221
222         # TCP
223         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
224              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
225              TCP(sport=self.tcp_port_in, dport=20))
226         pkts.append(p)
227
228         # UDP
229         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
230              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
231              UDP(sport=self.udp_port_in, dport=20))
232         pkts.append(p)
233
234         # ICMP
235         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
236              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
237              ICMPv6EchoRequest(id=self.icmp_id_in))
238         pkts.append(p)
239
240         return pkts
241
242     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
243         """
244         Create packet stream for outside network
245
246         :param out_if: Outside interface
247         :param dst_ip: Destination IP address (Default use global NAT address)
248         :param ttl: TTL of generated packets
249         """
250         if dst_ip is None:
251             dst_ip = self.nat_addr
252         pkts = []
253         # TCP
254         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
255              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
256              TCP(dport=self.tcp_port_out, sport=20))
257         pkts.append(p)
258
259         # UDP
260         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
261              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
262              UDP(dport=self.udp_port_out, sport=20))
263         pkts.append(p)
264
265         # ICMP
266         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
267              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
268              ICMP(id=self.icmp_id_out, type='echo-reply'))
269         pkts.append(p)
270
271         return pkts
272
273     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
274                            packet_num=3, dst_ip=None):
275         """
276         Verify captured packets on outside network
277
278         :param capture: Captured packets
279         :param nat_ip: Translated IP address (Default use global NAT address)
280         :param same_port: Sorce port number is not translated (Default False)
281         :param packet_num: Expected number of packets (Default 3)
282         :param dst_ip: Destination IP address (Default do not verify)
283         """
284         if nat_ip is None:
285             nat_ip = self.nat_addr
286         self.assertEqual(packet_num, len(capture))
287         for packet in capture:
288             try:
289                 self.check_ip_checksum(packet)
290                 self.assertEqual(packet[IP].src, nat_ip)
291                 if dst_ip is not None:
292                     self.assertEqual(packet[IP].dst, dst_ip)
293                 if packet.haslayer(TCP):
294                     if same_port:
295                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
296                     else:
297                         self.assertNotEqual(
298                             packet[TCP].sport, self.tcp_port_in)
299                     self.tcp_port_out = packet[TCP].sport
300                     self.check_tcp_checksum(packet)
301                 elif packet.haslayer(UDP):
302                     if same_port:
303                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
304                     else:
305                         self.assertNotEqual(
306                             packet[UDP].sport, self.udp_port_in)
307                     self.udp_port_out = packet[UDP].sport
308                 else:
309                     if same_port:
310                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
311                     else:
312                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
313                     self.icmp_id_out = packet[ICMP].id
314                     self.check_icmp_checksum(packet)
315             except:
316                 self.logger.error(ppp("Unexpected or invalid packet "
317                                       "(outside network):", packet))
318                 raise
319
320     def verify_capture_in(self, capture, in_if, packet_num=3):
321         """
322         Verify captured packets on inside network
323
324         :param capture: Captured packets
325         :param in_if: Inside interface
326         :param packet_num: Expected number of packets (Default 3)
327         """
328         self.assertEqual(packet_num, len(capture))
329         for packet in capture:
330             try:
331                 self.check_ip_checksum(packet)
332                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
333                 if packet.haslayer(TCP):
334                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
335                     self.check_tcp_checksum(packet)
336                 elif packet.haslayer(UDP):
337                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
338                 else:
339                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
340                     self.check_icmp_checksum(packet)
341             except:
342                 self.logger.error(ppp("Unexpected or invalid packet "
343                                       "(inside network):", packet))
344                 raise
345
346     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
347         """
348         Verify captured IPv6 packets on inside network
349
350         :param capture: Captured packets
351         :param src_ip: Source IP
352         :param dst_ip: Destination IP address
353         :param packet_num: Expected number of packets (Default 3)
354         """
355         self.assertEqual(packet_num, len(capture))
356         for packet in capture:
357             try:
358                 self.assertEqual(packet[IPv6].src, src_ip)
359                 self.assertEqual(packet[IPv6].dst, dst_ip)
360                 if packet.haslayer(TCP):
361                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
362                     self.check_tcp_checksum(packet)
363                 elif packet.haslayer(UDP):
364                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
365                     self.check_udp_checksum(packet)
366                 else:
367                     self.assertEqual(packet[ICMPv6EchoReply].id,
368                                      self.icmp_id_in)
369                     self.check_icmpv6_checksum(packet)
370             except:
371                 self.logger.error(ppp("Unexpected or invalid packet "
372                                       "(inside network):", packet))
373                 raise
374
375     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
376         """
377         Verify captured packet that don't have to be translated
378
379         :param capture: Captured packets
380         :param ingress_if: Ingress interface
381         :param egress_if: Egress interface
382         """
383         for packet in capture:
384             try:
385                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
386                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
387                 if packet.haslayer(TCP):
388                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
389                 elif packet.haslayer(UDP):
390                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
391                 else:
392                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
393             except:
394                 self.logger.error(ppp("Unexpected or invalid packet "
395                                       "(inside network):", packet))
396                 raise
397
398     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
399                                             packet_num=3, icmp_type=11):
400         """
401         Verify captured packets with ICMP errors on outside network
402
403         :param capture: Captured packets
404         :param src_ip: Translated IP address or IP address of VPP
405                        (Default use global NAT address)
406         :param packet_num: Expected number of packets (Default 3)
407         :param icmp_type: Type of error ICMP packet
408                           we are expecting (Default 11)
409         """
410         if src_ip is None:
411             src_ip = self.nat_addr
412         self.assertEqual(packet_num, len(capture))
413         for packet in capture:
414             try:
415                 self.assertEqual(packet[IP].src, src_ip)
416                 self.assertTrue(packet.haslayer(ICMP))
417                 icmp = packet[ICMP]
418                 self.assertEqual(icmp.type, icmp_type)
419                 self.assertTrue(icmp.haslayer(IPerror))
420                 inner_ip = icmp[IPerror]
421                 if inner_ip.haslayer(TCPerror):
422                     self.assertEqual(inner_ip[TCPerror].dport,
423                                      self.tcp_port_out)
424                 elif inner_ip.haslayer(UDPerror):
425                     self.assertEqual(inner_ip[UDPerror].dport,
426                                      self.udp_port_out)
427                 else:
428                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
429             except:
430                 self.logger.error(ppp("Unexpected or invalid packet "
431                                       "(outside network):", packet))
432                 raise
433
434     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
435                                            icmp_type=11):
436         """
437         Verify captured packets with ICMP errors on inside network
438
439         :param capture: Captured packets
440         :param in_if: Inside interface
441         :param packet_num: Expected number of packets (Default 3)
442         :param icmp_type: Type of error ICMP packet
443                           we are expecting (Default 11)
444         """
445         self.assertEqual(packet_num, len(capture))
446         for packet in capture:
447             try:
448                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
449                 self.assertTrue(packet.haslayer(ICMP))
450                 icmp = packet[ICMP]
451                 self.assertEqual(icmp.type, icmp_type)
452                 self.assertTrue(icmp.haslayer(IPerror))
453                 inner_ip = icmp[IPerror]
454                 if inner_ip.haslayer(TCPerror):
455                     self.assertEqual(inner_ip[TCPerror].sport,
456                                      self.tcp_port_in)
457                 elif inner_ip.haslayer(UDPerror):
458                     self.assertEqual(inner_ip[UDPerror].sport,
459                                      self.udp_port_in)
460                 else:
461                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
462             except:
463                 self.logger.error(ppp("Unexpected or invalid packet "
464                                       "(inside network):", packet))
465                 raise
466
467     def verify_ipfix_nat44_ses(self, data):
468         """
469         Verify IPFIX NAT44 session create/delete event
470
471         :param data: Decoded IPFIX data records
472         """
473         nat44_ses_create_num = 0
474         nat44_ses_delete_num = 0
475         self.assertEqual(6, len(data))
476         for record in data:
477             # natEvent
478             self.assertIn(ord(record[230]), [4, 5])
479             if ord(record[230]) == 4:
480                 nat44_ses_create_num += 1
481             else:
482                 nat44_ses_delete_num += 1
483             # sourceIPv4Address
484             self.assertEqual(self.pg0.remote_ip4n, record[8])
485             # postNATSourceIPv4Address
486             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
487                              record[225])
488             # ingressVRFID
489             self.assertEqual(struct.pack("!I", 0), record[234])
490             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
491             if IP_PROTOS.icmp == ord(record[4]):
492                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
493                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
494                                  record[227])
495             elif IP_PROTOS.tcp == ord(record[4]):
496                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
497                                  record[7])
498                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
499                                  record[227])
500             elif IP_PROTOS.udp == ord(record[4]):
501                 self.assertEqual(struct.pack("!H", self.udp_port_in),
502                                  record[7])
503                 self.assertEqual(struct.pack("!H", self.udp_port_out),
504                                  record[227])
505             else:
506                 self.fail("Invalid protocol")
507         self.assertEqual(3, nat44_ses_create_num)
508         self.assertEqual(3, nat44_ses_delete_num)
509
510     def verify_ipfix_addr_exhausted(self, data):
511         """
512         Verify IPFIX NAT addresses event
513
514         :param data: Decoded IPFIX data records
515         """
516         self.assertEqual(1, len(data))
517         record = data[0]
518         # natEvent
519         self.assertEqual(ord(record[230]), 3)
520         # natPoolID
521         self.assertEqual(struct.pack("!I", 0), record[283])
522
523
524 class TestNAT44(MethodHolder):
525     """ NAT44 Test Cases """
526
527     @classmethod
528     def setUpClass(cls):
529         super(TestNAT44, cls).setUpClass()
530
531         try:
532             cls.tcp_port_in = 6303
533             cls.tcp_port_out = 6303
534             cls.udp_port_in = 6304
535             cls.udp_port_out = 6304
536             cls.icmp_id_in = 6305
537             cls.icmp_id_out = 6305
538             cls.nat_addr = '10.0.0.3'
539             cls.ipfix_src_port = 4739
540             cls.ipfix_domain_id = 1
541
542             cls.create_pg_interfaces(range(10))
543             cls.interfaces = list(cls.pg_interfaces[0:4])
544
545             for i in cls.interfaces:
546                 i.admin_up()
547                 i.config_ip4()
548                 i.resolve_arp()
549
550             cls.pg0.generate_remote_hosts(3)
551             cls.pg0.configure_ipv4_neighbors()
552
553             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
554             cls.vapi.ip_table_add_del(10, is_add=1)
555             cls.vapi.ip_table_add_del(20, is_add=1)
556
557             cls.pg4._local_ip4 = "172.16.255.1"
558             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
559             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
560             cls.pg4.set_table_ip4(10)
561             cls.pg5._local_ip4 = "172.17.255.3"
562             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
563             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
564             cls.pg5.set_table_ip4(10)
565             cls.pg6._local_ip4 = "172.16.255.1"
566             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
567             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
568             cls.pg6.set_table_ip4(20)
569             for i in cls.overlapping_interfaces:
570                 i.config_ip4()
571                 i.admin_up()
572                 i.resolve_arp()
573
574             cls.pg7.admin_up()
575             cls.pg8.admin_up()
576
577             cls.pg9.generate_remote_hosts(2)
578             cls.pg9.config_ip4()
579             ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
580             cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
581                                                   ip_addr_n,
582                                                   24)
583             cls.pg9.admin_up()
584             cls.pg9.resolve_arp()
585             cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
586             cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
587             cls.pg9.resolve_arp()
588
589         except Exception:
590             super(TestNAT44, cls).tearDownClass()
591             raise
592
593     def clear_nat44(self):
594         """
595         Clear NAT44 configuration.
596         """
597         # I found no elegant way to do this
598         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
599                                    dst_address_length=32,
600                                    next_hop_address=self.pg7.remote_ip4n,
601                                    next_hop_sw_if_index=self.pg7.sw_if_index,
602                                    is_add=0)
603         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
604                                    dst_address_length=32,
605                                    next_hop_address=self.pg8.remote_ip4n,
606                                    next_hop_sw_if_index=self.pg8.sw_if_index,
607                                    is_add=0)
608
609         for intf in [self.pg7, self.pg8]:
610             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
611             for n in neighbors:
612                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
613                                               n.mac_address,
614                                               n.ip_address,
615                                               is_add=0)
616
617         if self.pg7.has_ip4_config:
618             self.pg7.unconfig_ip4()
619
620         interfaces = self.vapi.nat44_interface_addr_dump()
621         for intf in interfaces:
622             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
623
624         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
625                             domain_id=self.ipfix_domain_id)
626         self.ipfix_src_port = 4739
627         self.ipfix_domain_id = 1
628
629         interfaces = self.vapi.nat44_interface_dump()
630         for intf in interfaces:
631             if intf.is_inside > 1:
632                 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
633                                                           0,
634                                                           is_add=0)
635             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
636                                                       intf.is_inside,
637                                                       is_add=0)
638
639         interfaces = self.vapi.nat44_interface_output_feature_dump()
640         for intf in interfaces:
641             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
642                                                              intf.is_inside,
643                                                              is_add=0)
644
645         static_mappings = self.vapi.nat44_static_mapping_dump()
646         for sm in static_mappings:
647             self.vapi.nat44_add_del_static_mapping(
648                 sm.local_ip_address,
649                 sm.external_ip_address,
650                 local_port=sm.local_port,
651                 external_port=sm.external_port,
652                 addr_only=sm.addr_only,
653                 vrf_id=sm.vrf_id,
654                 protocol=sm.protocol,
655                 is_add=0)
656
657         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
658         for lb_sm in lb_static_mappings:
659             self.vapi.nat44_add_del_lb_static_mapping(
660                 lb_sm.external_addr,
661                 lb_sm.external_port,
662                 lb_sm.protocol,
663                 lb_sm.vrf_id,
664                 is_add=0,
665                 local_num=0,
666                 locals=[])
667
668         adresses = self.vapi.nat44_address_dump()
669         for addr in adresses:
670             self.vapi.nat44_add_del_address_range(addr.ip_address,
671                                                   addr.ip_address,
672                                                   is_add=0)
673
674     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
675                                  local_port=0, external_port=0, vrf_id=0,
676                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
677                                  proto=0):
678         """
679         Add/delete NAT44 static mapping
680
681         :param local_ip: Local IP address
682         :param external_ip: External IP address
683         :param local_port: Local port number (Optional)
684         :param external_port: External port number (Optional)
685         :param vrf_id: VRF ID (Default 0)
686         :param is_add: 1 if add, 0 if delete (Default add)
687         :param external_sw_if_index: External interface instead of IP address
688         :param proto: IP protocol (Mandatory if port specified)
689         """
690         addr_only = 1
691         if local_port and external_port:
692             addr_only = 0
693         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
694         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
695         self.vapi.nat44_add_del_static_mapping(
696             l_ip,
697             e_ip,
698             external_sw_if_index,
699             local_port,
700             external_port,
701             addr_only,
702             vrf_id,
703             proto,
704             is_add)
705
706     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
707         """
708         Add/delete NAT44 address
709
710         :param ip: IP address
711         :param is_add: 1 if add, 0 if delete (Default add)
712         """
713         nat_addr = socket.inet_pton(socket.AF_INET, ip)
714         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
715                                               vrf_id=vrf_id)
716
717     def test_dynamic(self):
718         """ NAT44 dynamic translation test """
719
720         self.nat44_add_address(self.nat_addr)
721         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
722         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
723                                                   is_inside=0)
724
725         # in2out
726         pkts = self.create_stream_in(self.pg0, self.pg1)
727         self.pg0.add_stream(pkts)
728         self.pg_enable_capture(self.pg_interfaces)
729         self.pg_start()
730         capture = self.pg1.get_capture(len(pkts))
731         self.verify_capture_out(capture)
732
733         # out2in
734         pkts = self.create_stream_out(self.pg1)
735         self.pg1.add_stream(pkts)
736         self.pg_enable_capture(self.pg_interfaces)
737         self.pg_start()
738         capture = self.pg0.get_capture(len(pkts))
739         self.verify_capture_in(capture, self.pg0)
740
741     def test_dynamic_icmp_errors_in2out_ttl_1(self):
742         """ NAT44 handling of client packets with TTL=1 """
743
744         self.nat44_add_address(self.nat_addr)
745         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
746         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
747                                                   is_inside=0)
748
749         # Client side - generate traffic
750         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
751         self.pg0.add_stream(pkts)
752         self.pg_enable_capture(self.pg_interfaces)
753         self.pg_start()
754
755         # Client side - verify ICMP type 11 packets
756         capture = self.pg0.get_capture(len(pkts))
757         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
758
759     def test_dynamic_icmp_errors_out2in_ttl_1(self):
760         """ NAT44 handling of server packets with TTL=1 """
761
762         self.nat44_add_address(self.nat_addr)
763         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
764         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
765                                                   is_inside=0)
766
767         # Client side - create sessions
768         pkts = self.create_stream_in(self.pg0, self.pg1)
769         self.pg0.add_stream(pkts)
770         self.pg_enable_capture(self.pg_interfaces)
771         self.pg_start()
772
773         # Server side - generate traffic
774         capture = self.pg1.get_capture(len(pkts))
775         self.verify_capture_out(capture)
776         pkts = self.create_stream_out(self.pg1, ttl=1)
777         self.pg1.add_stream(pkts)
778         self.pg_enable_capture(self.pg_interfaces)
779         self.pg_start()
780
781         # Server side - verify ICMP type 11 packets
782         capture = self.pg1.get_capture(len(pkts))
783         self.verify_capture_out_with_icmp_errors(capture,
784                                                  src_ip=self.pg1.local_ip4)
785
786     def test_dynamic_icmp_errors_in2out_ttl_2(self):
787         """ NAT44 handling of error responses to client packets with TTL=2 """
788
789         self.nat44_add_address(self.nat_addr)
790         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
791         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
792                                                   is_inside=0)
793
794         # Client side - generate traffic
795         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
796         self.pg0.add_stream(pkts)
797         self.pg_enable_capture(self.pg_interfaces)
798         self.pg_start()
799
800         # Server side - simulate ICMP type 11 response
801         capture = self.pg1.get_capture(len(pkts))
802         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
803                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
804                 ICMP(type=11) / packet[IP] for packet in capture]
805         self.pg1.add_stream(pkts)
806         self.pg_enable_capture(self.pg_interfaces)
807         self.pg_start()
808
809         # Client side - verify ICMP type 11 packets
810         capture = self.pg0.get_capture(len(pkts))
811         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
812
813     def test_dynamic_icmp_errors_out2in_ttl_2(self):
814         """ NAT44 handling of error responses to server packets with TTL=2 """
815
816         self.nat44_add_address(self.nat_addr)
817         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
818         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
819                                                   is_inside=0)
820
821         # Client side - create sessions
822         pkts = self.create_stream_in(self.pg0, self.pg1)
823         self.pg0.add_stream(pkts)
824         self.pg_enable_capture(self.pg_interfaces)
825         self.pg_start()
826
827         # Server side - generate traffic
828         capture = self.pg1.get_capture(len(pkts))
829         self.verify_capture_out(capture)
830         pkts = self.create_stream_out(self.pg1, ttl=2)
831         self.pg1.add_stream(pkts)
832         self.pg_enable_capture(self.pg_interfaces)
833         self.pg_start()
834
835         # Client side - simulate ICMP type 11 response
836         capture = self.pg0.get_capture(len(pkts))
837         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
838                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
839                 ICMP(type=11) / packet[IP] for packet in capture]
840         self.pg0.add_stream(pkts)
841         self.pg_enable_capture(self.pg_interfaces)
842         self.pg_start()
843
844         # Server side - verify ICMP type 11 packets
845         capture = self.pg1.get_capture(len(pkts))
846         self.verify_capture_out_with_icmp_errors(capture)
847
848     def test_ping_out_interface_from_outside(self):
849         """ Ping NAT44 out interface from outside network """
850
851         self.nat44_add_address(self.nat_addr)
852         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
853         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
854                                                   is_inside=0)
855
856         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
857              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
858              ICMP(id=self.icmp_id_out, type='echo-request'))
859         pkts = [p]
860         self.pg1.add_stream(pkts)
861         self.pg_enable_capture(self.pg_interfaces)
862         self.pg_start()
863         capture = self.pg1.get_capture(len(pkts))
864         self.assertEqual(1, len(capture))
865         packet = capture[0]
866         try:
867             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
868             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
869             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
870             self.assertEqual(packet[ICMP].type, 0)  # echo reply
871         except:
872             self.logger.error(ppp("Unexpected or invalid packet "
873                                   "(outside network):", packet))
874             raise
875
876     def test_ping_internal_host_from_outside(self):
877         """ Ping internal host from outside network """
878
879         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
880         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
881         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
882                                                   is_inside=0)
883
884         # out2in
885         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
886                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
887                ICMP(id=self.icmp_id_out, type='echo-request'))
888         self.pg1.add_stream(pkt)
889         self.pg_enable_capture(self.pg_interfaces)
890         self.pg_start()
891         capture = self.pg0.get_capture(1)
892         self.verify_capture_in(capture, self.pg0, packet_num=1)
893         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
894
895         # in2out
896         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
897                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
898                ICMP(id=self.icmp_id_in, type='echo-reply'))
899         self.pg0.add_stream(pkt)
900         self.pg_enable_capture(self.pg_interfaces)
901         self.pg_start()
902         capture = self.pg1.get_capture(1)
903         self.verify_capture_out(capture, same_port=True, packet_num=1)
904         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
905
906     def test_static_in(self):
907         """ 1:1 NAT initialized from inside network """
908
909         nat_ip = "10.0.0.10"
910         self.tcp_port_out = 6303
911         self.udp_port_out = 6304
912         self.icmp_id_out = 6305
913
914         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
915         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
916         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
917                                                   is_inside=0)
918
919         # in2out
920         pkts = self.create_stream_in(self.pg0, self.pg1)
921         self.pg0.add_stream(pkts)
922         self.pg_enable_capture(self.pg_interfaces)
923         self.pg_start()
924         capture = self.pg1.get_capture(len(pkts))
925         self.verify_capture_out(capture, nat_ip, True)
926
927         # out2in
928         pkts = self.create_stream_out(self.pg1, nat_ip)
929         self.pg1.add_stream(pkts)
930         self.pg_enable_capture(self.pg_interfaces)
931         self.pg_start()
932         capture = self.pg0.get_capture(len(pkts))
933         self.verify_capture_in(capture, self.pg0)
934
935     def test_static_out(self):
936         """ 1:1 NAT initialized from outside network """
937
938         nat_ip = "10.0.0.20"
939         self.tcp_port_out = 6303
940         self.udp_port_out = 6304
941         self.icmp_id_out = 6305
942
943         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
944         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
945         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
946                                                   is_inside=0)
947
948         # out2in
949         pkts = self.create_stream_out(self.pg1, nat_ip)
950         self.pg1.add_stream(pkts)
951         self.pg_enable_capture(self.pg_interfaces)
952         self.pg_start()
953         capture = self.pg0.get_capture(len(pkts))
954         self.verify_capture_in(capture, self.pg0)
955
956         # in2out
957         pkts = self.create_stream_in(self.pg0, self.pg1)
958         self.pg0.add_stream(pkts)
959         self.pg_enable_capture(self.pg_interfaces)
960         self.pg_start()
961         capture = self.pg1.get_capture(len(pkts))
962         self.verify_capture_out(capture, nat_ip, True)
963
964     def test_static_with_port_in(self):
965         """ 1:1 NAPT initialized from inside network """
966
967         self.tcp_port_out = 3606
968         self.udp_port_out = 3607
969         self.icmp_id_out = 3608
970
971         self.nat44_add_address(self.nat_addr)
972         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
973                                       self.tcp_port_in, self.tcp_port_out,
974                                       proto=IP_PROTOS.tcp)
975         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
976                                       self.udp_port_in, self.udp_port_out,
977                                       proto=IP_PROTOS.udp)
978         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
979                                       self.icmp_id_in, self.icmp_id_out,
980                                       proto=IP_PROTOS.icmp)
981         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
982         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
983                                                   is_inside=0)
984
985         # in2out
986         pkts = self.create_stream_in(self.pg0, self.pg1)
987         self.pg0.add_stream(pkts)
988         self.pg_enable_capture(self.pg_interfaces)
989         self.pg_start()
990         capture = self.pg1.get_capture(len(pkts))
991         self.verify_capture_out(capture)
992
993         # out2in
994         pkts = self.create_stream_out(self.pg1)
995         self.pg1.add_stream(pkts)
996         self.pg_enable_capture(self.pg_interfaces)
997         self.pg_start()
998         capture = self.pg0.get_capture(len(pkts))
999         self.verify_capture_in(capture, self.pg0)
1000
1001     def test_static_with_port_out(self):
1002         """ 1:1 NAPT initialized from outside network """
1003
1004         self.tcp_port_out = 30606
1005         self.udp_port_out = 30607
1006         self.icmp_id_out = 30608
1007
1008         self.nat44_add_address(self.nat_addr)
1009         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1010                                       self.tcp_port_in, self.tcp_port_out,
1011                                       proto=IP_PROTOS.tcp)
1012         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1013                                       self.udp_port_in, self.udp_port_out,
1014                                       proto=IP_PROTOS.udp)
1015         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1016                                       self.icmp_id_in, self.icmp_id_out,
1017                                       proto=IP_PROTOS.icmp)
1018         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1019         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1020                                                   is_inside=0)
1021
1022         # out2in
1023         pkts = self.create_stream_out(self.pg1)
1024         self.pg1.add_stream(pkts)
1025         self.pg_enable_capture(self.pg_interfaces)
1026         self.pg_start()
1027         capture = self.pg0.get_capture(len(pkts))
1028         self.verify_capture_in(capture, self.pg0)
1029
1030         # in2out
1031         pkts = self.create_stream_in(self.pg0, self.pg1)
1032         self.pg0.add_stream(pkts)
1033         self.pg_enable_capture(self.pg_interfaces)
1034         self.pg_start()
1035         capture = self.pg1.get_capture(len(pkts))
1036         self.verify_capture_out(capture)
1037
1038     def test_static_vrf_aware(self):
1039         """ 1:1 NAT VRF awareness """
1040
1041         nat_ip1 = "10.0.0.30"
1042         nat_ip2 = "10.0.0.40"
1043         self.tcp_port_out = 6303
1044         self.udp_port_out = 6304
1045         self.icmp_id_out = 6305
1046
1047         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1048                                       vrf_id=10)
1049         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1050                                       vrf_id=10)
1051         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1052                                                   is_inside=0)
1053         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1054         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1055
1056         # inside interface VRF match NAT44 static mapping VRF
1057         pkts = self.create_stream_in(self.pg4, self.pg3)
1058         self.pg4.add_stream(pkts)
1059         self.pg_enable_capture(self.pg_interfaces)
1060         self.pg_start()
1061         capture = self.pg3.get_capture(len(pkts))
1062         self.verify_capture_out(capture, nat_ip1, True)
1063
1064         # inside interface VRF don't match NAT44 static mapping VRF (packets
1065         # are dropped)
1066         pkts = self.create_stream_in(self.pg0, self.pg3)
1067         self.pg0.add_stream(pkts)
1068         self.pg_enable_capture(self.pg_interfaces)
1069         self.pg_start()
1070         self.pg3.assert_nothing_captured()
1071
1072     def test_static_lb(self):
1073         """ NAT44 local service load balancing """
1074         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1075         external_port = 80
1076         local_port = 8080
1077         server1 = self.pg0.remote_hosts[0]
1078         server2 = self.pg0.remote_hosts[1]
1079
1080         locals = [{'addr': server1.ip4n,
1081                    'port': local_port,
1082                    'probability': 70},
1083                   {'addr': server2.ip4n,
1084                    'port': local_port,
1085                    'probability': 30}]
1086
1087         self.nat44_add_address(self.nat_addr)
1088         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1089                                                   external_port,
1090                                                   IP_PROTOS.tcp,
1091                                                   local_num=len(locals),
1092                                                   locals=locals)
1093         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1094         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1095                                                   is_inside=0)
1096
1097         # from client to service
1098         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1099              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1100              TCP(sport=12345, dport=external_port))
1101         self.pg1.add_stream(p)
1102         self.pg_enable_capture(self.pg_interfaces)
1103         self.pg_start()
1104         capture = self.pg0.get_capture(1)
1105         p = capture[0]
1106         server = None
1107         try:
1108             ip = p[IP]
1109             tcp = p[TCP]
1110             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1111             if ip.dst == server1.ip4:
1112                 server = server1
1113             else:
1114                 server = server2
1115             self.assertEqual(tcp.dport, local_port)
1116             self.check_tcp_checksum(p)
1117             self.check_ip_checksum(p)
1118         except:
1119             self.logger.error(ppp("Unexpected or invalid packet:", p))
1120             raise
1121
1122         # from service back to client
1123         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1124              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1125              TCP(sport=local_port, dport=12345))
1126         self.pg0.add_stream(p)
1127         self.pg_enable_capture(self.pg_interfaces)
1128         self.pg_start()
1129         capture = self.pg1.get_capture(1)
1130         p = capture[0]
1131         try:
1132             ip = p[IP]
1133             tcp = p[TCP]
1134             self.assertEqual(ip.src, self.nat_addr)
1135             self.assertEqual(tcp.sport, external_port)
1136             self.check_tcp_checksum(p)
1137             self.check_ip_checksum(p)
1138         except:
1139             self.logger.error(ppp("Unexpected or invalid packet:", p))
1140             raise
1141
1142         # multiple clients
1143         server1_n = 0
1144         server2_n = 0
1145         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1146         pkts = []
1147         for client in clients:
1148             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1149                  IP(src=client, dst=self.nat_addr) /
1150                  TCP(sport=12345, dport=external_port))
1151             pkts.append(p)
1152         self.pg1.add_stream(pkts)
1153         self.pg_enable_capture(self.pg_interfaces)
1154         self.pg_start()
1155         capture = self.pg0.get_capture(len(pkts))
1156         for p in capture:
1157             if p[IP].dst == server1.ip4:
1158                 server1_n += 1
1159             else:
1160                 server2_n += 1
1161         self.assertTrue(server1_n > server2_n)
1162
1163     def test_multiple_inside_interfaces(self):
1164         """ NAT44 multiple non-overlapping address space inside interfaces """
1165
1166         self.nat44_add_address(self.nat_addr)
1167         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1168         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1169         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1170                                                   is_inside=0)
1171
1172         # between two NAT44 inside interfaces (no translation)
1173         pkts = self.create_stream_in(self.pg0, self.pg1)
1174         self.pg0.add_stream(pkts)
1175         self.pg_enable_capture(self.pg_interfaces)
1176         self.pg_start()
1177         capture = self.pg1.get_capture(len(pkts))
1178         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1179
1180         # from NAT44 inside to interface without NAT44 feature (no translation)
1181         pkts = self.create_stream_in(self.pg0, self.pg2)
1182         self.pg0.add_stream(pkts)
1183         self.pg_enable_capture(self.pg_interfaces)
1184         self.pg_start()
1185         capture = self.pg2.get_capture(len(pkts))
1186         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1187
1188         # in2out 1st interface
1189         pkts = self.create_stream_in(self.pg0, self.pg3)
1190         self.pg0.add_stream(pkts)
1191         self.pg_enable_capture(self.pg_interfaces)
1192         self.pg_start()
1193         capture = self.pg3.get_capture(len(pkts))
1194         self.verify_capture_out(capture)
1195
1196         # out2in 1st interface
1197         pkts = self.create_stream_out(self.pg3)
1198         self.pg3.add_stream(pkts)
1199         self.pg_enable_capture(self.pg_interfaces)
1200         self.pg_start()
1201         capture = self.pg0.get_capture(len(pkts))
1202         self.verify_capture_in(capture, self.pg0)
1203
1204         # in2out 2nd interface
1205         pkts = self.create_stream_in(self.pg1, self.pg3)
1206         self.pg1.add_stream(pkts)
1207         self.pg_enable_capture(self.pg_interfaces)
1208         self.pg_start()
1209         capture = self.pg3.get_capture(len(pkts))
1210         self.verify_capture_out(capture)
1211
1212         # out2in 2nd interface
1213         pkts = self.create_stream_out(self.pg3)
1214         self.pg3.add_stream(pkts)
1215         self.pg_enable_capture(self.pg_interfaces)
1216         self.pg_start()
1217         capture = self.pg1.get_capture(len(pkts))
1218         self.verify_capture_in(capture, self.pg1)
1219
1220     def test_inside_overlapping_interfaces(self):
1221         """ NAT44 multiple inside interfaces with overlapping address space """
1222
1223         static_nat_ip = "10.0.0.10"
1224         self.nat44_add_address(self.nat_addr)
1225         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1226                                                   is_inside=0)
1227         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1228         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1229         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1230         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1231                                       vrf_id=20)
1232
1233         # between NAT44 inside interfaces with same VRF (no translation)
1234         pkts = self.create_stream_in(self.pg4, self.pg5)
1235         self.pg4.add_stream(pkts)
1236         self.pg_enable_capture(self.pg_interfaces)
1237         self.pg_start()
1238         capture = self.pg5.get_capture(len(pkts))
1239         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1240
1241         # between NAT44 inside interfaces with different VRF (hairpinning)
1242         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1243              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1244              TCP(sport=1234, dport=5678))
1245         self.pg4.add_stream(p)
1246         self.pg_enable_capture(self.pg_interfaces)
1247         self.pg_start()
1248         capture = self.pg6.get_capture(1)
1249         p = capture[0]
1250         try:
1251             ip = p[IP]
1252             tcp = p[TCP]
1253             self.assertEqual(ip.src, self.nat_addr)
1254             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1255             self.assertNotEqual(tcp.sport, 1234)
1256             self.assertEqual(tcp.dport, 5678)
1257         except:
1258             self.logger.error(ppp("Unexpected or invalid packet:", p))
1259             raise
1260
1261         # in2out 1st interface
1262         pkts = self.create_stream_in(self.pg4, self.pg3)
1263         self.pg4.add_stream(pkts)
1264         self.pg_enable_capture(self.pg_interfaces)
1265         self.pg_start()
1266         capture = self.pg3.get_capture(len(pkts))
1267         self.verify_capture_out(capture)
1268
1269         # out2in 1st interface
1270         pkts = self.create_stream_out(self.pg3)
1271         self.pg3.add_stream(pkts)
1272         self.pg_enable_capture(self.pg_interfaces)
1273         self.pg_start()
1274         capture = self.pg4.get_capture(len(pkts))
1275         self.verify_capture_in(capture, self.pg4)
1276
1277         # in2out 2nd interface
1278         pkts = self.create_stream_in(self.pg5, self.pg3)
1279         self.pg5.add_stream(pkts)
1280         self.pg_enable_capture(self.pg_interfaces)
1281         self.pg_start()
1282         capture = self.pg3.get_capture(len(pkts))
1283         self.verify_capture_out(capture)
1284
1285         # out2in 2nd interface
1286         pkts = self.create_stream_out(self.pg3)
1287         self.pg3.add_stream(pkts)
1288         self.pg_enable_capture(self.pg_interfaces)
1289         self.pg_start()
1290         capture = self.pg5.get_capture(len(pkts))
1291         self.verify_capture_in(capture, self.pg5)
1292
1293         # pg5 session dump
1294         addresses = self.vapi.nat44_address_dump()
1295         self.assertEqual(len(addresses), 1)
1296         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1297         self.assertEqual(len(sessions), 3)
1298         for session in sessions:
1299             self.assertFalse(session.is_static)
1300             self.assertEqual(session.inside_ip_address[0:4],
1301                              self.pg5.remote_ip4n)
1302             self.assertEqual(session.outside_ip_address,
1303                              addresses[0].ip_address)
1304         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1305         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1306         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1307         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1308         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1309         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1310         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1311         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1312         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1313
1314         # in2out 3rd interface
1315         pkts = self.create_stream_in(self.pg6, self.pg3)
1316         self.pg6.add_stream(pkts)
1317         self.pg_enable_capture(self.pg_interfaces)
1318         self.pg_start()
1319         capture = self.pg3.get_capture(len(pkts))
1320         self.verify_capture_out(capture, static_nat_ip, True)
1321
1322         # out2in 3rd interface
1323         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1324         self.pg3.add_stream(pkts)
1325         self.pg_enable_capture(self.pg_interfaces)
1326         self.pg_start()
1327         capture = self.pg6.get_capture(len(pkts))
1328         self.verify_capture_in(capture, self.pg6)
1329
1330         # general user and session dump verifications
1331         users = self.vapi.nat44_user_dump()
1332         self.assertTrue(len(users) >= 3)
1333         addresses = self.vapi.nat44_address_dump()
1334         self.assertEqual(len(addresses), 1)
1335         for user in users:
1336             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1337                                                          user.vrf_id)
1338             for session in sessions:
1339                 self.assertEqual(user.ip_address, session.inside_ip_address)
1340                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1341                 self.assertTrue(session.protocol in
1342                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1343                                  IP_PROTOS.icmp])
1344
1345         # pg4 session dump
1346         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1347         self.assertTrue(len(sessions) >= 4)
1348         for session in sessions:
1349             self.assertFalse(session.is_static)
1350             self.assertEqual(session.inside_ip_address[0:4],
1351                              self.pg4.remote_ip4n)
1352             self.assertEqual(session.outside_ip_address,
1353                              addresses[0].ip_address)
1354
1355         # pg6 session dump
1356         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1357         self.assertTrue(len(sessions) >= 3)
1358         for session in sessions:
1359             self.assertTrue(session.is_static)
1360             self.assertEqual(session.inside_ip_address[0:4],
1361                              self.pg6.remote_ip4n)
1362             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1363                              map(int, static_nat_ip.split('.')))
1364             self.assertTrue(session.inside_port in
1365                             [self.tcp_port_in, self.udp_port_in,
1366                              self.icmp_id_in])
1367
1368     def test_hairpinning(self):
1369         """ NAT44 hairpinning - 1:1 NAPT """
1370
1371         host = self.pg0.remote_hosts[0]
1372         server = self.pg0.remote_hosts[1]
1373         host_in_port = 1234
1374         host_out_port = 0
1375         server_in_port = 5678
1376         server_out_port = 8765
1377
1378         self.nat44_add_address(self.nat_addr)
1379         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1380         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1381                                                   is_inside=0)
1382         # add static mapping for server
1383         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1384                                       server_in_port, server_out_port,
1385                                       proto=IP_PROTOS.tcp)
1386
1387         # send packet from host to server
1388         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1389              IP(src=host.ip4, dst=self.nat_addr) /
1390              TCP(sport=host_in_port, dport=server_out_port))
1391         self.pg0.add_stream(p)
1392         self.pg_enable_capture(self.pg_interfaces)
1393         self.pg_start()
1394         capture = self.pg0.get_capture(1)
1395         p = capture[0]
1396         try:
1397             ip = p[IP]
1398             tcp = p[TCP]
1399             self.assertEqual(ip.src, self.nat_addr)
1400             self.assertEqual(ip.dst, server.ip4)
1401             self.assertNotEqual(tcp.sport, host_in_port)
1402             self.assertEqual(tcp.dport, server_in_port)
1403             self.check_tcp_checksum(p)
1404             host_out_port = tcp.sport
1405         except:
1406             self.logger.error(ppp("Unexpected or invalid packet:", p))
1407             raise
1408
1409         # send reply from server to host
1410         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1411              IP(src=server.ip4, dst=self.nat_addr) /
1412              TCP(sport=server_in_port, dport=host_out_port))
1413         self.pg0.add_stream(p)
1414         self.pg_enable_capture(self.pg_interfaces)
1415         self.pg_start()
1416         capture = self.pg0.get_capture(1)
1417         p = capture[0]
1418         try:
1419             ip = p[IP]
1420             tcp = p[TCP]
1421             self.assertEqual(ip.src, self.nat_addr)
1422             self.assertEqual(ip.dst, host.ip4)
1423             self.assertEqual(tcp.sport, server_out_port)
1424             self.assertEqual(tcp.dport, host_in_port)
1425             self.check_tcp_checksum(p)
1426         except:
1427             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1428             raise
1429
1430     def test_hairpinning2(self):
1431         """ NAT44 hairpinning - 1:1 NAT"""
1432
1433         server1_nat_ip = "10.0.0.10"
1434         server2_nat_ip = "10.0.0.11"
1435         host = self.pg0.remote_hosts[0]
1436         server1 = self.pg0.remote_hosts[1]
1437         server2 = self.pg0.remote_hosts[2]
1438         server_tcp_port = 22
1439         server_udp_port = 20
1440
1441         self.nat44_add_address(self.nat_addr)
1442         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1443         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1444                                                   is_inside=0)
1445
1446         # add static mapping for servers
1447         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1448         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1449
1450         # host to server1
1451         pkts = []
1452         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1453              IP(src=host.ip4, dst=server1_nat_ip) /
1454              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1455         pkts.append(p)
1456         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1457              IP(src=host.ip4, dst=server1_nat_ip) /
1458              UDP(sport=self.udp_port_in, dport=server_udp_port))
1459         pkts.append(p)
1460         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1461              IP(src=host.ip4, dst=server1_nat_ip) /
1462              ICMP(id=self.icmp_id_in, type='echo-request'))
1463         pkts.append(p)
1464         self.pg0.add_stream(pkts)
1465         self.pg_enable_capture(self.pg_interfaces)
1466         self.pg_start()
1467         capture = self.pg0.get_capture(len(pkts))
1468         for packet in capture:
1469             try:
1470                 self.assertEqual(packet[IP].src, self.nat_addr)
1471                 self.assertEqual(packet[IP].dst, server1.ip4)
1472                 if packet.haslayer(TCP):
1473                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1474                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1475                     self.tcp_port_out = packet[TCP].sport
1476                     self.check_tcp_checksum(packet)
1477                 elif packet.haslayer(UDP):
1478                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1479                     self.assertEqual(packet[UDP].dport, server_udp_port)
1480                     self.udp_port_out = packet[UDP].sport
1481                 else:
1482                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1483                     self.icmp_id_out = packet[ICMP].id
1484             except:
1485                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1486                 raise
1487
1488         # server1 to host
1489         pkts = []
1490         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1491              IP(src=server1.ip4, dst=self.nat_addr) /
1492              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1493         pkts.append(p)
1494         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1495              IP(src=server1.ip4, dst=self.nat_addr) /
1496              UDP(sport=server_udp_port, dport=self.udp_port_out))
1497         pkts.append(p)
1498         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1499              IP(src=server1.ip4, dst=self.nat_addr) /
1500              ICMP(id=self.icmp_id_out, type='echo-reply'))
1501         pkts.append(p)
1502         self.pg0.add_stream(pkts)
1503         self.pg_enable_capture(self.pg_interfaces)
1504         self.pg_start()
1505         capture = self.pg0.get_capture(len(pkts))
1506         for packet in capture:
1507             try:
1508                 self.assertEqual(packet[IP].src, server1_nat_ip)
1509                 self.assertEqual(packet[IP].dst, host.ip4)
1510                 if packet.haslayer(TCP):
1511                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1512                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1513                     self.check_tcp_checksum(packet)
1514                 elif packet.haslayer(UDP):
1515                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1516                     self.assertEqual(packet[UDP].sport, server_udp_port)
1517                 else:
1518                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1519             except:
1520                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1521                 raise
1522
1523         # server2 to server1
1524         pkts = []
1525         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1526              IP(src=server2.ip4, dst=server1_nat_ip) /
1527              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1528         pkts.append(p)
1529         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1530              IP(src=server2.ip4, dst=server1_nat_ip) /
1531              UDP(sport=self.udp_port_in, dport=server_udp_port))
1532         pkts.append(p)
1533         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1534              IP(src=server2.ip4, dst=server1_nat_ip) /
1535              ICMP(id=self.icmp_id_in, type='echo-request'))
1536         pkts.append(p)
1537         self.pg0.add_stream(pkts)
1538         self.pg_enable_capture(self.pg_interfaces)
1539         self.pg_start()
1540         capture = self.pg0.get_capture(len(pkts))
1541         for packet in capture:
1542             try:
1543                 self.assertEqual(packet[IP].src, server2_nat_ip)
1544                 self.assertEqual(packet[IP].dst, server1.ip4)
1545                 if packet.haslayer(TCP):
1546                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1547                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1548                     self.tcp_port_out = packet[TCP].sport
1549                     self.check_tcp_checksum(packet)
1550                 elif packet.haslayer(UDP):
1551                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1552                     self.assertEqual(packet[UDP].dport, server_udp_port)
1553                     self.udp_port_out = packet[UDP].sport
1554                 else:
1555                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1556                     self.icmp_id_out = packet[ICMP].id
1557             except:
1558                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1559                 raise
1560
1561         # server1 to server2
1562         pkts = []
1563         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1564              IP(src=server1.ip4, dst=server2_nat_ip) /
1565              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1566         pkts.append(p)
1567         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1568              IP(src=server1.ip4, dst=server2_nat_ip) /
1569              UDP(sport=server_udp_port, dport=self.udp_port_out))
1570         pkts.append(p)
1571         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1572              IP(src=server1.ip4, dst=server2_nat_ip) /
1573              ICMP(id=self.icmp_id_out, type='echo-reply'))
1574         pkts.append(p)
1575         self.pg0.add_stream(pkts)
1576         self.pg_enable_capture(self.pg_interfaces)
1577         self.pg_start()
1578         capture = self.pg0.get_capture(len(pkts))
1579         for packet in capture:
1580             try:
1581                 self.assertEqual(packet[IP].src, server1_nat_ip)
1582                 self.assertEqual(packet[IP].dst, server2.ip4)
1583                 if packet.haslayer(TCP):
1584                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1585                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1586                     self.check_tcp_checksum(packet)
1587                 elif packet.haslayer(UDP):
1588                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1589                     self.assertEqual(packet[UDP].sport, server_udp_port)
1590                 else:
1591                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1592             except:
1593                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1594                 raise
1595
1596     def test_max_translations_per_user(self):
1597         """ MAX translations per user - recycle the least recently used """
1598
1599         self.nat44_add_address(self.nat_addr)
1600         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1601         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1602                                                   is_inside=0)
1603
1604         # get maximum number of translations per user
1605         nat44_config = self.vapi.nat_show_config()
1606
1607         # send more than maximum number of translations per user packets
1608         pkts_num = nat44_config.max_translations_per_user + 5
1609         pkts = []
1610         for port in range(0, pkts_num):
1611             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1612                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1613                  TCP(sport=1025 + port))
1614             pkts.append(p)
1615         self.pg0.add_stream(pkts)
1616         self.pg_enable_capture(self.pg_interfaces)
1617         self.pg_start()
1618
1619         # verify number of translated packet
1620         self.pg1.get_capture(pkts_num)
1621
1622     def test_interface_addr(self):
1623         """ Acquire NAT44 addresses from interface """
1624         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1625
1626         # no address in NAT pool
1627         adresses = self.vapi.nat44_address_dump()
1628         self.assertEqual(0, len(adresses))
1629
1630         # configure interface address and check NAT address pool
1631         self.pg7.config_ip4()
1632         adresses = self.vapi.nat44_address_dump()
1633         self.assertEqual(1, len(adresses))
1634         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1635
1636         # remove interface address and check NAT address pool
1637         self.pg7.unconfig_ip4()
1638         adresses = self.vapi.nat44_address_dump()
1639         self.assertEqual(0, len(adresses))
1640
1641     def test_interface_addr_static_mapping(self):
1642         """ Static mapping with addresses from interface """
1643         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1644         self.nat44_add_static_mapping(
1645             '1.2.3.4',
1646             external_sw_if_index=self.pg7.sw_if_index)
1647
1648         # static mappings with external interface
1649         static_mappings = self.vapi.nat44_static_mapping_dump()
1650         self.assertEqual(1, len(static_mappings))
1651         self.assertEqual(self.pg7.sw_if_index,
1652                          static_mappings[0].external_sw_if_index)
1653
1654         # configure interface address and check static mappings
1655         self.pg7.config_ip4()
1656         static_mappings = self.vapi.nat44_static_mapping_dump()
1657         self.assertEqual(1, len(static_mappings))
1658         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1659                          self.pg7.local_ip4n)
1660         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1661
1662         # remove interface address and check static mappings
1663         self.pg7.unconfig_ip4()
1664         static_mappings = self.vapi.nat44_static_mapping_dump()
1665         self.assertEqual(0, len(static_mappings))
1666
1667     def test_ipfix_nat44_sess(self):
1668         """ IPFIX logging NAT44 session created/delted """
1669         self.ipfix_domain_id = 10
1670         self.ipfix_src_port = 20202
1671         colector_port = 30303
1672         bind_layers(UDP, IPFIX, dport=30303)
1673         self.nat44_add_address(self.nat_addr)
1674         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1675         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1676                                                   is_inside=0)
1677         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1678                                      src_address=self.pg3.local_ip4n,
1679                                      path_mtu=512,
1680                                      template_interval=10,
1681                                      collector_port=colector_port)
1682         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1683                             src_port=self.ipfix_src_port)
1684
1685         pkts = self.create_stream_in(self.pg0, self.pg1)
1686         self.pg0.add_stream(pkts)
1687         self.pg_enable_capture(self.pg_interfaces)
1688         self.pg_start()
1689         capture = self.pg1.get_capture(len(pkts))
1690         self.verify_capture_out(capture)
1691         self.nat44_add_address(self.nat_addr, is_add=0)
1692         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1693         capture = self.pg3.get_capture(3)
1694         ipfix = IPFIXDecoder()
1695         # first load template
1696         for p in capture:
1697             self.assertTrue(p.haslayer(IPFIX))
1698             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1699             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1700             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1701             self.assertEqual(p[UDP].dport, colector_port)
1702             self.assertEqual(p[IPFIX].observationDomainID,
1703                              self.ipfix_domain_id)
1704             if p.haslayer(Template):
1705                 ipfix.add_template(p.getlayer(Template))
1706         # verify events in data set
1707         for p in capture:
1708             if p.haslayer(Data):
1709                 data = ipfix.decode_data_set(p.getlayer(Set))
1710                 self.verify_ipfix_nat44_ses(data)
1711
1712     def test_ipfix_addr_exhausted(self):
1713         """ IPFIX logging NAT addresses exhausted """
1714         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1715         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1716                                                   is_inside=0)
1717         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1718                                      src_address=self.pg3.local_ip4n,
1719                                      path_mtu=512,
1720                                      template_interval=10)
1721         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1722                             src_port=self.ipfix_src_port)
1723
1724         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1725              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1726              TCP(sport=3025))
1727         self.pg0.add_stream(p)
1728         self.pg_enable_capture(self.pg_interfaces)
1729         self.pg_start()
1730         capture = self.pg1.get_capture(0)
1731         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1732         capture = self.pg3.get_capture(3)
1733         ipfix = IPFIXDecoder()
1734         # first load template
1735         for p in capture:
1736             self.assertTrue(p.haslayer(IPFIX))
1737             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1738             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1739             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1740             self.assertEqual(p[UDP].dport, 4739)
1741             self.assertEqual(p[IPFIX].observationDomainID,
1742                              self.ipfix_domain_id)
1743             if p.haslayer(Template):
1744                 ipfix.add_template(p.getlayer(Template))
1745         # verify events in data set
1746         for p in capture:
1747             if p.haslayer(Data):
1748                 data = ipfix.decode_data_set(p.getlayer(Set))
1749                 self.verify_ipfix_addr_exhausted(data)
1750
1751     def test_pool_addr_fib(self):
1752         """ NAT44 add pool addresses to FIB """
1753         static_addr = '10.0.0.10'
1754         self.nat44_add_address(self.nat_addr)
1755         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1756         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1757                                                   is_inside=0)
1758         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1759
1760         # NAT44 address
1761         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1762              ARP(op=ARP.who_has, pdst=self.nat_addr,
1763                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1764         self.pg1.add_stream(p)
1765         self.pg_enable_capture(self.pg_interfaces)
1766         self.pg_start()
1767         capture = self.pg1.get_capture(1)
1768         self.assertTrue(capture[0].haslayer(ARP))
1769         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1770
1771         # 1:1 NAT address
1772         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1773              ARP(op=ARP.who_has, pdst=static_addr,
1774                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1775         self.pg1.add_stream(p)
1776         self.pg_enable_capture(self.pg_interfaces)
1777         self.pg_start()
1778         capture = self.pg1.get_capture(1)
1779         self.assertTrue(capture[0].haslayer(ARP))
1780         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1781
1782         # send ARP to non-NAT44 interface
1783         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1784              ARP(op=ARP.who_has, pdst=self.nat_addr,
1785                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1786         self.pg2.add_stream(p)
1787         self.pg_enable_capture(self.pg_interfaces)
1788         self.pg_start()
1789         capture = self.pg1.get_capture(0)
1790
1791         # remove addresses and verify
1792         self.nat44_add_address(self.nat_addr, is_add=0)
1793         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1794                                       is_add=0)
1795
1796         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1797              ARP(op=ARP.who_has, pdst=self.nat_addr,
1798                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1799         self.pg1.add_stream(p)
1800         self.pg_enable_capture(self.pg_interfaces)
1801         self.pg_start()
1802         capture = self.pg1.get_capture(0)
1803
1804         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1805              ARP(op=ARP.who_has, pdst=static_addr,
1806                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1807         self.pg1.add_stream(p)
1808         self.pg_enable_capture(self.pg_interfaces)
1809         self.pg_start()
1810         capture = self.pg1.get_capture(0)
1811
1812     def test_vrf_mode(self):
1813         """ NAT44 tenant VRF aware address pool mode """
1814
1815         vrf_id1 = 1
1816         vrf_id2 = 2
1817         nat_ip1 = "10.0.0.10"
1818         nat_ip2 = "10.0.0.11"
1819
1820         self.pg0.unconfig_ip4()
1821         self.pg1.unconfig_ip4()
1822         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1823         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1824         self.pg0.set_table_ip4(vrf_id1)
1825         self.pg1.set_table_ip4(vrf_id2)
1826         self.pg0.config_ip4()
1827         self.pg1.config_ip4()
1828
1829         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1830         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1831         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1832         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1833         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1834                                                   is_inside=0)
1835
1836         # first VRF
1837         pkts = self.create_stream_in(self.pg0, self.pg2)
1838         self.pg0.add_stream(pkts)
1839         self.pg_enable_capture(self.pg_interfaces)
1840         self.pg_start()
1841         capture = self.pg2.get_capture(len(pkts))
1842         self.verify_capture_out(capture, nat_ip1)
1843
1844         # second VRF
1845         pkts = self.create_stream_in(self.pg1, self.pg2)
1846         self.pg1.add_stream(pkts)
1847         self.pg_enable_capture(self.pg_interfaces)
1848         self.pg_start()
1849         capture = self.pg2.get_capture(len(pkts))
1850         self.verify_capture_out(capture, nat_ip2)
1851
1852         self.pg0.unconfig_ip4()
1853         self.pg1.unconfig_ip4()
1854         self.pg0.set_table_ip4(0)
1855         self.pg1.set_table_ip4(0)
1856         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1857         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1858
1859     def test_vrf_feature_independent(self):
1860         """ NAT44 tenant VRF independent address pool mode """
1861
1862         nat_ip1 = "10.0.0.10"
1863         nat_ip2 = "10.0.0.11"
1864
1865         self.nat44_add_address(nat_ip1)
1866         self.nat44_add_address(nat_ip2)
1867         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1868         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1869         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1870                                                   is_inside=0)
1871
1872         # first VRF
1873         pkts = self.create_stream_in(self.pg0, self.pg2)
1874         self.pg0.add_stream(pkts)
1875         self.pg_enable_capture(self.pg_interfaces)
1876         self.pg_start()
1877         capture = self.pg2.get_capture(len(pkts))
1878         self.verify_capture_out(capture, nat_ip1)
1879
1880         # second VRF
1881         pkts = self.create_stream_in(self.pg1, self.pg2)
1882         self.pg1.add_stream(pkts)
1883         self.pg_enable_capture(self.pg_interfaces)
1884         self.pg_start()
1885         capture = self.pg2.get_capture(len(pkts))
1886         self.verify_capture_out(capture, nat_ip1)
1887
1888     def test_dynamic_ipless_interfaces(self):
1889         """ NAT44 interfaces without configured IP address """
1890
1891         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1892                                       mactobinary(self.pg7.remote_mac),
1893                                       self.pg7.remote_ip4n,
1894                                       is_static=1)
1895         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1896                                       mactobinary(self.pg8.remote_mac),
1897                                       self.pg8.remote_ip4n,
1898                                       is_static=1)
1899
1900         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1901                                    dst_address_length=32,
1902                                    next_hop_address=self.pg7.remote_ip4n,
1903                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1904         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1905                                    dst_address_length=32,
1906                                    next_hop_address=self.pg8.remote_ip4n,
1907                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1908
1909         self.nat44_add_address(self.nat_addr)
1910         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1911         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1912                                                   is_inside=0)
1913
1914         # in2out
1915         pkts = self.create_stream_in(self.pg7, self.pg8)
1916         self.pg7.add_stream(pkts)
1917         self.pg_enable_capture(self.pg_interfaces)
1918         self.pg_start()
1919         capture = self.pg8.get_capture(len(pkts))
1920         self.verify_capture_out(capture)
1921
1922         # out2in
1923         pkts = self.create_stream_out(self.pg8, self.nat_addr)
1924         self.pg8.add_stream(pkts)
1925         self.pg_enable_capture(self.pg_interfaces)
1926         self.pg_start()
1927         capture = self.pg7.get_capture(len(pkts))
1928         self.verify_capture_in(capture, self.pg7)
1929
1930     def test_static_ipless_interfaces(self):
1931         """ NAT44 interfaces without configured IP address - 1:1 NAT """
1932
1933         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1934                                       mactobinary(self.pg7.remote_mac),
1935                                       self.pg7.remote_ip4n,
1936                                       is_static=1)
1937         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1938                                       mactobinary(self.pg8.remote_mac),
1939                                       self.pg8.remote_ip4n,
1940                                       is_static=1)
1941
1942         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1943                                    dst_address_length=32,
1944                                    next_hop_address=self.pg7.remote_ip4n,
1945                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1946         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1947                                    dst_address_length=32,
1948                                    next_hop_address=self.pg8.remote_ip4n,
1949                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1950
1951         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1952         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1953         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1954                                                   is_inside=0)
1955
1956         # out2in
1957         pkts = self.create_stream_out(self.pg8)
1958         self.pg8.add_stream(pkts)
1959         self.pg_enable_capture(self.pg_interfaces)
1960         self.pg_start()
1961         capture = self.pg7.get_capture(len(pkts))
1962         self.verify_capture_in(capture, self.pg7)
1963
1964         # in2out
1965         pkts = self.create_stream_in(self.pg7, self.pg8)
1966         self.pg7.add_stream(pkts)
1967         self.pg_enable_capture(self.pg_interfaces)
1968         self.pg_start()
1969         capture = self.pg8.get_capture(len(pkts))
1970         self.verify_capture_out(capture, self.nat_addr, True)
1971
1972     def test_static_with_port_ipless_interfaces(self):
1973         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1974
1975         self.tcp_port_out = 30606
1976         self.udp_port_out = 30607
1977         self.icmp_id_out = 30608
1978
1979         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1980                                       mactobinary(self.pg7.remote_mac),
1981                                       self.pg7.remote_ip4n,
1982                                       is_static=1)
1983         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1984                                       mactobinary(self.pg8.remote_mac),
1985                                       self.pg8.remote_ip4n,
1986                                       is_static=1)
1987
1988         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1989                                    dst_address_length=32,
1990                                    next_hop_address=self.pg7.remote_ip4n,
1991                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1992         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1993                                    dst_address_length=32,
1994                                    next_hop_address=self.pg8.remote_ip4n,
1995                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1996
1997         self.nat44_add_address(self.nat_addr)
1998         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1999                                       self.tcp_port_in, self.tcp_port_out,
2000                                       proto=IP_PROTOS.tcp)
2001         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2002                                       self.udp_port_in, self.udp_port_out,
2003                                       proto=IP_PROTOS.udp)
2004         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2005                                       self.icmp_id_in, self.icmp_id_out,
2006                                       proto=IP_PROTOS.icmp)
2007         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2008         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2009                                                   is_inside=0)
2010
2011         # out2in
2012         pkts = self.create_stream_out(self.pg8)
2013         self.pg8.add_stream(pkts)
2014         self.pg_enable_capture(self.pg_interfaces)
2015         self.pg_start()
2016         capture = self.pg7.get_capture(len(pkts))
2017         self.verify_capture_in(capture, self.pg7)
2018
2019         # in2out
2020         pkts = self.create_stream_in(self.pg7, self.pg8)
2021         self.pg7.add_stream(pkts)
2022         self.pg_enable_capture(self.pg_interfaces)
2023         self.pg_start()
2024         capture = self.pg8.get_capture(len(pkts))
2025         self.verify_capture_out(capture)
2026
2027     def test_static_unknown_proto(self):
2028         """ 1:1 NAT translate packet with unknown protocol """
2029         nat_ip = "10.0.0.10"
2030         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2031         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2032         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2033                                                   is_inside=0)
2034
2035         # in2out
2036         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2037              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2038              GRE() /
2039              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2040              TCP(sport=1234, dport=1234))
2041         self.pg0.add_stream(p)
2042         self.pg_enable_capture(self.pg_interfaces)
2043         self.pg_start()
2044         p = self.pg1.get_capture(1)
2045         packet = p[0]
2046         try:
2047             self.assertEqual(packet[IP].src, nat_ip)
2048             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2049             self.assertTrue(packet.haslayer(GRE))
2050             self.check_ip_checksum(packet)
2051         except:
2052             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2053             raise
2054
2055         # out2in
2056         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2057              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2058              GRE() /
2059              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2060              TCP(sport=1234, dport=1234))
2061         self.pg1.add_stream(p)
2062         self.pg_enable_capture(self.pg_interfaces)
2063         self.pg_start()
2064         p = self.pg0.get_capture(1)
2065         packet = p[0]
2066         try:
2067             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2068             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2069             self.assertTrue(packet.haslayer(GRE))
2070             self.check_ip_checksum(packet)
2071         except:
2072             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2073             raise
2074
2075     def test_hairpinning_static_unknown_proto(self):
2076         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2077
2078         host = self.pg0.remote_hosts[0]
2079         server = self.pg0.remote_hosts[1]
2080
2081         host_nat_ip = "10.0.0.10"
2082         server_nat_ip = "10.0.0.11"
2083
2084         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2085         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2086         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2087         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2088                                                   is_inside=0)
2089
2090         # host to server
2091         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2092              IP(src=host.ip4, dst=server_nat_ip) /
2093              GRE() /
2094              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2095              TCP(sport=1234, dport=1234))
2096         self.pg0.add_stream(p)
2097         self.pg_enable_capture(self.pg_interfaces)
2098         self.pg_start()
2099         p = self.pg0.get_capture(1)
2100         packet = p[0]
2101         try:
2102             self.assertEqual(packet[IP].src, host_nat_ip)
2103             self.assertEqual(packet[IP].dst, server.ip4)
2104             self.assertTrue(packet.haslayer(GRE))
2105             self.check_ip_checksum(packet)
2106         except:
2107             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2108             raise
2109
2110         # server to host
2111         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2112              IP(src=server.ip4, dst=host_nat_ip) /
2113              GRE() /
2114              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2115              TCP(sport=1234, dport=1234))
2116         self.pg0.add_stream(p)
2117         self.pg_enable_capture(self.pg_interfaces)
2118         self.pg_start()
2119         p = self.pg0.get_capture(1)
2120         packet = p[0]
2121         try:
2122             self.assertEqual(packet[IP].src, server_nat_ip)
2123             self.assertEqual(packet[IP].dst, host.ip4)
2124             self.assertTrue(packet.haslayer(GRE))
2125             self.check_ip_checksum(packet)
2126         except:
2127             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2128             raise
2129
2130     def test_unknown_proto(self):
2131         """ NAT44 translate packet with unknown protocol """
2132         self.nat44_add_address(self.nat_addr)
2133         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2134         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2135                                                   is_inside=0)
2136
2137         # in2out
2138         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2139              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2140              TCP(sport=self.tcp_port_in, dport=20))
2141         self.pg0.add_stream(p)
2142         self.pg_enable_capture(self.pg_interfaces)
2143         self.pg_start()
2144         p = self.pg1.get_capture(1)
2145
2146         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2147              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2148              GRE() /
2149              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2150              TCP(sport=1234, dport=1234))
2151         self.pg0.add_stream(p)
2152         self.pg_enable_capture(self.pg_interfaces)
2153         self.pg_start()
2154         p = self.pg1.get_capture(1)
2155         packet = p[0]
2156         try:
2157             self.assertEqual(packet[IP].src, self.nat_addr)
2158             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2159             self.assertTrue(packet.haslayer(GRE))
2160             self.check_ip_checksum(packet)
2161         except:
2162             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2163             raise
2164
2165         # out2in
2166         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2167              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2168              GRE() /
2169              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2170              TCP(sport=1234, dport=1234))
2171         self.pg1.add_stream(p)
2172         self.pg_enable_capture(self.pg_interfaces)
2173         self.pg_start()
2174         p = self.pg0.get_capture(1)
2175         packet = p[0]
2176         try:
2177             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2178             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2179             self.assertTrue(packet.haslayer(GRE))
2180             self.check_ip_checksum(packet)
2181         except:
2182             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2183             raise
2184
2185     def test_hairpinning_unknown_proto(self):
2186         """ NAT44 translate packet with unknown protocol - hairpinning """
2187         host = self.pg0.remote_hosts[0]
2188         server = self.pg0.remote_hosts[1]
2189         host_in_port = 1234
2190         host_out_port = 0
2191         server_in_port = 5678
2192         server_out_port = 8765
2193         server_nat_ip = "10.0.0.11"
2194
2195         self.nat44_add_address(self.nat_addr)
2196         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2197         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2198                                                   is_inside=0)
2199
2200         # add static mapping for server
2201         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2202
2203         # host to server
2204         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2205              IP(src=host.ip4, dst=server_nat_ip) /
2206              TCP(sport=host_in_port, dport=server_out_port))
2207         self.pg0.add_stream(p)
2208         self.pg_enable_capture(self.pg_interfaces)
2209         self.pg_start()
2210         capture = self.pg0.get_capture(1)
2211
2212         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2213              IP(src=host.ip4, dst=server_nat_ip) /
2214              GRE() /
2215              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2216              TCP(sport=1234, dport=1234))
2217         self.pg0.add_stream(p)
2218         self.pg_enable_capture(self.pg_interfaces)
2219         self.pg_start()
2220         p = self.pg0.get_capture(1)
2221         packet = p[0]
2222         try:
2223             self.assertEqual(packet[IP].src, self.nat_addr)
2224             self.assertEqual(packet[IP].dst, server.ip4)
2225             self.assertTrue(packet.haslayer(GRE))
2226             self.check_ip_checksum(packet)
2227         except:
2228             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2229             raise
2230
2231         # server to host
2232         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2233              IP(src=server.ip4, dst=self.nat_addr) /
2234              GRE() /
2235              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2236              TCP(sport=1234, dport=1234))
2237         self.pg0.add_stream(p)
2238         self.pg_enable_capture(self.pg_interfaces)
2239         self.pg_start()
2240         p = self.pg0.get_capture(1)
2241         packet = p[0]
2242         try:
2243             self.assertEqual(packet[IP].src, server_nat_ip)
2244             self.assertEqual(packet[IP].dst, host.ip4)
2245             self.assertTrue(packet.haslayer(GRE))
2246             self.check_ip_checksum(packet)
2247         except:
2248             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2249             raise
2250
2251     def test_output_feature(self):
2252         """ NAT44 interface output feature (in2out postrouting) """
2253         self.nat44_add_address(self.nat_addr)
2254         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2255         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2256         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2257                                                          is_inside=0)
2258
2259         # in2out
2260         pkts = self.create_stream_in(self.pg0, self.pg3)
2261         self.pg0.add_stream(pkts)
2262         self.pg_enable_capture(self.pg_interfaces)
2263         self.pg_start()
2264         capture = self.pg3.get_capture(len(pkts))
2265         self.verify_capture_out(capture)
2266
2267         # out2in
2268         pkts = self.create_stream_out(self.pg3)
2269         self.pg3.add_stream(pkts)
2270         self.pg_enable_capture(self.pg_interfaces)
2271         self.pg_start()
2272         capture = self.pg0.get_capture(len(pkts))
2273         self.verify_capture_in(capture, self.pg0)
2274
2275         # from non-NAT interface to NAT inside interface
2276         pkts = self.create_stream_in(self.pg2, self.pg0)
2277         self.pg2.add_stream(pkts)
2278         self.pg_enable_capture(self.pg_interfaces)
2279         self.pg_start()
2280         capture = self.pg0.get_capture(len(pkts))
2281         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2282
2283     def test_output_feature_vrf_aware(self):
2284         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2285         nat_ip_vrf10 = "10.0.0.10"
2286         nat_ip_vrf20 = "10.0.0.20"
2287
2288         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2289                                    dst_address_length=32,
2290                                    next_hop_address=self.pg3.remote_ip4n,
2291                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2292                                    table_id=10)
2293         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2294                                    dst_address_length=32,
2295                                    next_hop_address=self.pg3.remote_ip4n,
2296                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2297                                    table_id=20)
2298
2299         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2300         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2301         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2302         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2303         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2304                                                          is_inside=0)
2305
2306         # in2out VRF 10
2307         pkts = self.create_stream_in(self.pg4, self.pg3)
2308         self.pg4.add_stream(pkts)
2309         self.pg_enable_capture(self.pg_interfaces)
2310         self.pg_start()
2311         capture = self.pg3.get_capture(len(pkts))
2312         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2313
2314         # out2in VRF 10
2315         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2316         self.pg3.add_stream(pkts)
2317         self.pg_enable_capture(self.pg_interfaces)
2318         self.pg_start()
2319         capture = self.pg4.get_capture(len(pkts))
2320         self.verify_capture_in(capture, self.pg4)
2321
2322         # in2out VRF 20
2323         pkts = self.create_stream_in(self.pg6, self.pg3)
2324         self.pg6.add_stream(pkts)
2325         self.pg_enable_capture(self.pg_interfaces)
2326         self.pg_start()
2327         capture = self.pg3.get_capture(len(pkts))
2328         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2329
2330         # out2in VRF 20
2331         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2332         self.pg3.add_stream(pkts)
2333         self.pg_enable_capture(self.pg_interfaces)
2334         self.pg_start()
2335         capture = self.pg6.get_capture(len(pkts))
2336         self.verify_capture_in(capture, self.pg6)
2337
2338     def test_output_feature_hairpinning(self):
2339         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2340         host = self.pg0.remote_hosts[0]
2341         server = self.pg0.remote_hosts[1]
2342         host_in_port = 1234
2343         host_out_port = 0
2344         server_in_port = 5678
2345         server_out_port = 8765
2346
2347         self.nat44_add_address(self.nat_addr)
2348         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2349         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2350                                                          is_inside=0)
2351
2352         # add static mapping for server
2353         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2354                                       server_in_port, server_out_port,
2355                                       proto=IP_PROTOS.tcp)
2356
2357         # send packet from host to server
2358         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2359              IP(src=host.ip4, dst=self.nat_addr) /
2360              TCP(sport=host_in_port, dport=server_out_port))
2361         self.pg0.add_stream(p)
2362         self.pg_enable_capture(self.pg_interfaces)
2363         self.pg_start()
2364         capture = self.pg0.get_capture(1)
2365         p = capture[0]
2366         try:
2367             ip = p[IP]
2368             tcp = p[TCP]
2369             self.assertEqual(ip.src, self.nat_addr)
2370             self.assertEqual(ip.dst, server.ip4)
2371             self.assertNotEqual(tcp.sport, host_in_port)
2372             self.assertEqual(tcp.dport, server_in_port)
2373             self.check_tcp_checksum(p)
2374             host_out_port = tcp.sport
2375         except:
2376             self.logger.error(ppp("Unexpected or invalid packet:", p))
2377             raise
2378
2379         # send reply from server to host
2380         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2381              IP(src=server.ip4, dst=self.nat_addr) /
2382              TCP(sport=server_in_port, dport=host_out_port))
2383         self.pg0.add_stream(p)
2384         self.pg_enable_capture(self.pg_interfaces)
2385         self.pg_start()
2386         capture = self.pg0.get_capture(1)
2387         p = capture[0]
2388         try:
2389             ip = p[IP]
2390             tcp = p[TCP]
2391             self.assertEqual(ip.src, self.nat_addr)
2392             self.assertEqual(ip.dst, host.ip4)
2393             self.assertEqual(tcp.sport, server_out_port)
2394             self.assertEqual(tcp.dport, host_in_port)
2395             self.check_tcp_checksum(p)
2396         except:
2397             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2398             raise
2399
2400     def test_one_armed_nat44(self):
2401         """ One armed NAT44 """
2402         remote_host = self.pg9.remote_hosts[0]
2403         local_host = self.pg9.remote_hosts[1]
2404         external_port = 0
2405
2406         self.nat44_add_address(self.nat_addr)
2407         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2408         self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2409                                                   is_inside=0)
2410
2411         # in2out
2412         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2413              IP(src=local_host.ip4, dst=remote_host.ip4) /
2414              TCP(sport=12345, dport=80))
2415         self.pg9.add_stream(p)
2416         self.pg_enable_capture(self.pg_interfaces)
2417         self.pg_start()
2418         capture = self.pg9.get_capture(1)
2419         p = capture[0]
2420         try:
2421             ip = p[IP]
2422             tcp = p[TCP]
2423             self.assertEqual(ip.src, self.nat_addr)
2424             self.assertEqual(ip.dst, remote_host.ip4)
2425             self.assertNotEqual(tcp.sport, 12345)
2426             external_port = tcp.sport
2427             self.assertEqual(tcp.dport, 80)
2428             self.check_tcp_checksum(p)
2429             self.check_ip_checksum(p)
2430         except:
2431             self.logger.error(ppp("Unexpected or invalid packet:", p))
2432             raise
2433
2434         # out2in
2435         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2436              IP(src=remote_host.ip4, dst=self.nat_addr) /
2437              TCP(sport=80, dport=external_port))
2438         self.pg9.add_stream(p)
2439         self.pg_enable_capture(self.pg_interfaces)
2440         self.pg_start()
2441         capture = self.pg9.get_capture(1)
2442         p = capture[0]
2443         try:
2444             ip = p[IP]
2445             tcp = p[TCP]
2446             self.assertEqual(ip.src, remote_host.ip4)
2447             self.assertEqual(ip.dst, local_host.ip4)
2448             self.assertEqual(tcp.sport, 80)
2449             self.assertEqual(tcp.dport, 12345)
2450             self.check_tcp_checksum(p)
2451             self.check_ip_checksum(p)
2452         except:
2453             self.logger.error(ppp("Unexpected or invalid packet:", p))
2454             raise
2455
2456     def test_del_session(self):
2457         """ Delete NAT44 session """
2458         self.nat44_add_address(self.nat_addr)
2459         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2460         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2461                                                   is_inside=0)
2462
2463         pkts = self.create_stream_in(self.pg0, self.pg1)
2464         self.pg0.add_stream(pkts)
2465         self.pg_enable_capture(self.pg_interfaces)
2466         self.pg_start()
2467         capture = self.pg1.get_capture(len(pkts))
2468
2469         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2470         nsessions = len(sessions)
2471
2472         self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2473                                     sessions[0].inside_port,
2474                                     sessions[0].protocol)
2475         self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2476                                     sessions[1].outside_port,
2477                                     sessions[1].protocol,
2478                                     is_in=0)
2479
2480         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2481         self.assertEqual(nsessions - len(sessions), 2)
2482
2483     def tearDown(self):
2484         super(TestNAT44, self).tearDown()
2485         if not self.vpp_dead:
2486             self.logger.info(self.vapi.cli("show nat44 verbose"))
2487             self.clear_nat44()
2488
2489
2490 class TestDeterministicNAT(MethodHolder):
2491     """ Deterministic NAT Test Cases """
2492
2493     @classmethod
2494     def setUpConstants(cls):
2495         super(TestDeterministicNAT, cls).setUpConstants()
2496         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2497
2498     @classmethod
2499     def setUpClass(cls):
2500         super(TestDeterministicNAT, cls).setUpClass()
2501
2502         try:
2503             cls.tcp_port_in = 6303
2504             cls.tcp_external_port = 6303
2505             cls.udp_port_in = 6304
2506             cls.udp_external_port = 6304
2507             cls.icmp_id_in = 6305
2508             cls.nat_addr = '10.0.0.3'
2509
2510             cls.create_pg_interfaces(range(3))
2511             cls.interfaces = list(cls.pg_interfaces)
2512
2513             for i in cls.interfaces:
2514                 i.admin_up()
2515                 i.config_ip4()
2516                 i.resolve_arp()
2517
2518             cls.pg0.generate_remote_hosts(2)
2519             cls.pg0.configure_ipv4_neighbors()
2520
2521         except Exception:
2522             super(TestDeterministicNAT, cls).tearDownClass()
2523             raise
2524
2525     def create_stream_in(self, in_if, out_if, ttl=64):
2526         """
2527         Create packet stream for inside network
2528
2529         :param in_if: Inside interface
2530         :param out_if: Outside interface
2531         :param ttl: TTL of generated packets
2532         """
2533         pkts = []
2534         # TCP
2535         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2536              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2537              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2538         pkts.append(p)
2539
2540         # UDP
2541         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2542              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2543              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2544         pkts.append(p)
2545
2546         # ICMP
2547         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2548              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2549              ICMP(id=self.icmp_id_in, type='echo-request'))
2550         pkts.append(p)
2551
2552         return pkts
2553
2554     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2555         """
2556         Create packet stream for outside network
2557
2558         :param out_if: Outside interface
2559         :param dst_ip: Destination IP address (Default use global NAT address)
2560         :param ttl: TTL of generated packets
2561         """
2562         if dst_ip is None:
2563             dst_ip = self.nat_addr
2564         pkts = []
2565         # TCP
2566         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2567              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2568              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2569         pkts.append(p)
2570
2571         # UDP
2572         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2573              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2574              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2575         pkts.append(p)
2576
2577         # ICMP
2578         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2579              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2580              ICMP(id=self.icmp_external_id, type='echo-reply'))
2581         pkts.append(p)
2582
2583         return pkts
2584
2585     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2586         """
2587         Verify captured packets on outside network
2588
2589         :param capture: Captured packets
2590         :param nat_ip: Translated IP address (Default use global NAT address)
2591         :param same_port: Sorce port number is not translated (Default False)
2592         :param packet_num: Expected number of packets (Default 3)
2593         """
2594         if nat_ip is None:
2595             nat_ip = self.nat_addr
2596         self.assertEqual(packet_num, len(capture))
2597         for packet in capture:
2598             try:
2599                 self.assertEqual(packet[IP].src, nat_ip)
2600                 if packet.haslayer(TCP):
2601                     self.tcp_port_out = packet[TCP].sport
2602                 elif packet.haslayer(UDP):
2603                     self.udp_port_out = packet[UDP].sport
2604                 else:
2605                     self.icmp_external_id = packet[ICMP].id
2606             except:
2607                 self.logger.error(ppp("Unexpected or invalid packet "
2608                                       "(outside network):", packet))
2609                 raise
2610
2611     def initiate_tcp_session(self, in_if, out_if):
2612         """
2613         Initiates TCP session
2614
2615         :param in_if: Inside interface
2616         :param out_if: Outside interface
2617         """
2618         try:
2619             # SYN packet in->out
2620             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2621                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2622                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2623                      flags="S"))
2624             in_if.add_stream(p)
2625             self.pg_enable_capture(self.pg_interfaces)
2626             self.pg_start()
2627             capture = out_if.get_capture(1)
2628             p = capture[0]
2629             self.tcp_port_out = p[TCP].sport
2630
2631             # SYN + ACK packet out->in
2632             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2633                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2634                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2635                      flags="SA"))
2636             out_if.add_stream(p)
2637             self.pg_enable_capture(self.pg_interfaces)
2638             self.pg_start()
2639             in_if.get_capture(1)
2640
2641             # ACK packet in->out
2642             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2643                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2644                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2645                      flags="A"))
2646             in_if.add_stream(p)
2647             self.pg_enable_capture(self.pg_interfaces)
2648             self.pg_start()
2649             out_if.get_capture(1)
2650
2651         except:
2652             self.logger.error("TCP 3 way handshake failed")
2653             raise
2654
2655     def verify_ipfix_max_entries_per_user(self, data):
2656         """
2657         Verify IPFIX maximum entries per user exceeded event
2658
2659         :param data: Decoded IPFIX data records
2660         """
2661         self.assertEqual(1, len(data))
2662         record = data[0]
2663         # natEvent
2664         self.assertEqual(ord(record[230]), 13)
2665         # natQuotaExceededEvent
2666         self.assertEqual('\x03\x00\x00\x00', record[466])
2667         # sourceIPv4Address
2668         self.assertEqual(self.pg0.remote_ip4n, record[8])
2669
2670     def test_deterministic_mode(self):
2671         """ NAT plugin run deterministic mode """
2672         in_addr = '172.16.255.0'
2673         out_addr = '172.17.255.50'
2674         in_addr_t = '172.16.255.20'
2675         in_addr_n = socket.inet_aton(in_addr)
2676         out_addr_n = socket.inet_aton(out_addr)
2677         in_addr_t_n = socket.inet_aton(in_addr_t)
2678         in_plen = 24
2679         out_plen = 32
2680
2681         nat_config = self.vapi.nat_show_config()
2682         self.assertEqual(1, nat_config.deterministic)
2683
2684         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2685
2686         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2687         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2688         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2689         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2690
2691         deterministic_mappings = self.vapi.nat_det_map_dump()
2692         self.assertEqual(len(deterministic_mappings), 1)
2693         dsm = deterministic_mappings[0]
2694         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2695         self.assertEqual(in_plen, dsm.in_plen)
2696         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2697         self.assertEqual(out_plen, dsm.out_plen)
2698
2699         self.clear_nat_det()
2700         deterministic_mappings = self.vapi.nat_det_map_dump()
2701         self.assertEqual(len(deterministic_mappings), 0)
2702
2703     def test_set_timeouts(self):
2704         """ Set deterministic NAT timeouts """
2705         timeouts_before = self.vapi.nat_det_get_timeouts()
2706
2707         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2708                                        timeouts_before.tcp_established + 10,
2709                                        timeouts_before.tcp_transitory + 10,
2710                                        timeouts_before.icmp + 10)
2711
2712         timeouts_after = self.vapi.nat_det_get_timeouts()
2713
2714         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2715         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2716         self.assertNotEqual(timeouts_before.tcp_established,
2717                             timeouts_after.tcp_established)
2718         self.assertNotEqual(timeouts_before.tcp_transitory,
2719                             timeouts_after.tcp_transitory)
2720
2721     def test_det_in(self):
2722         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2723
2724         nat_ip = "10.0.0.10"
2725
2726         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2727                                       32,
2728                                       socket.inet_aton(nat_ip),
2729                                       32)
2730         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2731         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2732                                                   is_inside=0)
2733
2734         # in2out
2735         pkts = self.create_stream_in(self.pg0, self.pg1)
2736         self.pg0.add_stream(pkts)
2737         self.pg_enable_capture(self.pg_interfaces)
2738         self.pg_start()
2739         capture = self.pg1.get_capture(len(pkts))
2740         self.verify_capture_out(capture, nat_ip)
2741
2742         # out2in
2743         pkts = self.create_stream_out(self.pg1, nat_ip)
2744         self.pg1.add_stream(pkts)
2745         self.pg_enable_capture(self.pg_interfaces)
2746         self.pg_start()
2747         capture = self.pg0.get_capture(len(pkts))
2748         self.verify_capture_in(capture, self.pg0)
2749
2750         # session dump test
2751         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2752         self.assertEqual(len(sessions), 3)
2753
2754         # TCP session
2755         s = sessions[0]
2756         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2757         self.assertEqual(s.in_port, self.tcp_port_in)
2758         self.assertEqual(s.out_port, self.tcp_port_out)
2759         self.assertEqual(s.ext_port, self.tcp_external_port)
2760
2761         # UDP session
2762         s = sessions[1]
2763         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2764         self.assertEqual(s.in_port, self.udp_port_in)
2765         self.assertEqual(s.out_port, self.udp_port_out)
2766         self.assertEqual(s.ext_port, self.udp_external_port)
2767
2768         # ICMP session
2769         s = sessions[2]
2770         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2771         self.assertEqual(s.in_port, self.icmp_id_in)
2772         self.assertEqual(s.out_port, self.icmp_external_id)
2773
2774     def test_multiple_users(self):
2775         """ Deterministic NAT multiple users """
2776
2777         nat_ip = "10.0.0.10"
2778         port_in = 80
2779         external_port = 6303
2780
2781         host0 = self.pg0.remote_hosts[0]
2782         host1 = self.pg0.remote_hosts[1]
2783
2784         self.vapi.nat_det_add_del_map(host0.ip4n,
2785                                       24,
2786                                       socket.inet_aton(nat_ip),
2787                                       32)
2788         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2789         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2790                                                   is_inside=0)
2791
2792         # host0 to out
2793         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2794              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2795              TCP(sport=port_in, dport=external_port))
2796         self.pg0.add_stream(p)
2797         self.pg_enable_capture(self.pg_interfaces)
2798         self.pg_start()
2799         capture = self.pg1.get_capture(1)
2800         p = capture[0]
2801         try:
2802             ip = p[IP]
2803             tcp = p[TCP]
2804             self.assertEqual(ip.src, nat_ip)
2805             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2806             self.assertEqual(tcp.dport, external_port)
2807             port_out0 = tcp.sport
2808         except:
2809             self.logger.error(ppp("Unexpected or invalid packet:", p))
2810             raise
2811
2812         # host1 to out
2813         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2814              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2815              TCP(sport=port_in, dport=external_port))
2816         self.pg0.add_stream(p)
2817         self.pg_enable_capture(self.pg_interfaces)
2818         self.pg_start()
2819         capture = self.pg1.get_capture(1)
2820         p = capture[0]
2821         try:
2822             ip = p[IP]
2823             tcp = p[TCP]
2824             self.assertEqual(ip.src, nat_ip)
2825             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2826             self.assertEqual(tcp.dport, external_port)
2827             port_out1 = tcp.sport
2828         except:
2829             self.logger.error(ppp("Unexpected or invalid packet:", p))
2830             raise
2831
2832         dms = self.vapi.nat_det_map_dump()
2833         self.assertEqual(1, len(dms))
2834         self.assertEqual(2, dms[0].ses_num)
2835
2836         # out to host0
2837         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2838              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2839              TCP(sport=external_port, dport=port_out0))
2840         self.pg1.add_stream(p)
2841         self.pg_enable_capture(self.pg_interfaces)
2842         self.pg_start()
2843         capture = self.pg0.get_capture(1)
2844         p = capture[0]
2845         try:
2846             ip = p[IP]
2847             tcp = p[TCP]
2848             self.assertEqual(ip.src, self.pg1.remote_ip4)
2849             self.assertEqual(ip.dst, host0.ip4)
2850             self.assertEqual(tcp.dport, port_in)
2851             self.assertEqual(tcp.sport, external_port)
2852         except:
2853             self.logger.error(ppp("Unexpected or invalid packet:", p))
2854             raise
2855
2856         # out to host1
2857         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2858              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2859              TCP(sport=external_port, dport=port_out1))
2860         self.pg1.add_stream(p)
2861         self.pg_enable_capture(self.pg_interfaces)
2862         self.pg_start()
2863         capture = self.pg0.get_capture(1)
2864         p = capture[0]
2865         try:
2866             ip = p[IP]
2867             tcp = p[TCP]
2868             self.assertEqual(ip.src, self.pg1.remote_ip4)
2869             self.assertEqual(ip.dst, host1.ip4)
2870             self.assertEqual(tcp.dport, port_in)
2871             self.assertEqual(tcp.sport, external_port)
2872         except:
2873             self.logger.error(ppp("Unexpected or invalid packet", p))
2874             raise
2875
2876         # session close api test
2877         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2878                                             port_out1,
2879                                             self.pg1.remote_ip4n,
2880                                             external_port)
2881         dms = self.vapi.nat_det_map_dump()
2882         self.assertEqual(dms[0].ses_num, 1)
2883
2884         self.vapi.nat_det_close_session_in(host0.ip4n,
2885                                            port_in,
2886                                            self.pg1.remote_ip4n,
2887                                            external_port)
2888         dms = self.vapi.nat_det_map_dump()
2889         self.assertEqual(dms[0].ses_num, 0)
2890
2891     def test_tcp_session_close_detection_in(self):
2892         """ Deterministic NAT TCP session close from inside network """
2893         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2894                                       32,
2895                                       socket.inet_aton(self.nat_addr),
2896                                       32)
2897         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2898         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2899                                                   is_inside=0)
2900
2901         self.initiate_tcp_session(self.pg0, self.pg1)
2902
2903         # close the session from inside
2904         try:
2905             # FIN packet in -> out
2906             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2907                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2908                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2909                      flags="F"))
2910             self.pg0.add_stream(p)
2911             self.pg_enable_capture(self.pg_interfaces)
2912             self.pg_start()
2913             self.pg1.get_capture(1)
2914
2915             pkts = []
2916
2917             # ACK packet out -> in
2918             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2919                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2920                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2921                      flags="A"))
2922             pkts.append(p)
2923
2924             # FIN packet out -> in
2925             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2926                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2927                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2928                      flags="F"))
2929             pkts.append(p)
2930
2931             self.pg1.add_stream(pkts)
2932             self.pg_enable_capture(self.pg_interfaces)
2933             self.pg_start()
2934             self.pg0.get_capture(2)
2935
2936             # ACK packet in -> out
2937             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2938                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2939                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2940                      flags="A"))
2941             self.pg0.add_stream(p)
2942             self.pg_enable_capture(self.pg_interfaces)
2943             self.pg_start()
2944             self.pg1.get_capture(1)
2945
2946             # Check if deterministic NAT44 closed the session
2947             dms = self.vapi.nat_det_map_dump()
2948             self.assertEqual(0, dms[0].ses_num)
2949         except:
2950             self.logger.error("TCP session termination failed")
2951             raise
2952
2953     def test_tcp_session_close_detection_out(self):
2954         """ Deterministic NAT TCP session close from outside network """
2955         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2956                                       32,
2957                                       socket.inet_aton(self.nat_addr),
2958                                       32)
2959         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2960         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2961                                                   is_inside=0)
2962
2963         self.initiate_tcp_session(self.pg0, self.pg1)
2964
2965         # close the session from outside
2966         try:
2967             # FIN packet out -> in
2968             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2969                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2970                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2971                      flags="F"))
2972             self.pg1.add_stream(p)
2973             self.pg_enable_capture(self.pg_interfaces)
2974             self.pg_start()
2975             self.pg0.get_capture(1)
2976
2977             pkts = []
2978
2979             # ACK packet in -> out
2980             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2981                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2982                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2983                      flags="A"))
2984             pkts.append(p)
2985
2986             # ACK packet in -> out
2987             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2988                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2989                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2990                      flags="F"))
2991             pkts.append(p)
2992
2993             self.pg0.add_stream(pkts)
2994             self.pg_enable_capture(self.pg_interfaces)
2995             self.pg_start()
2996             self.pg1.get_capture(2)
2997
2998             # ACK packet out -> in
2999             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3000                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3001                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3002                      flags="A"))
3003             self.pg1.add_stream(p)
3004             self.pg_enable_capture(self.pg_interfaces)
3005             self.pg_start()
3006             self.pg0.get_capture(1)
3007
3008             # Check if deterministic NAT44 closed the session
3009             dms = self.vapi.nat_det_map_dump()
3010             self.assertEqual(0, dms[0].ses_num)
3011         except:
3012             self.logger.error("TCP session termination failed")
3013             raise
3014
3015     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3016     def test_session_timeout(self):
3017         """ Deterministic NAT session timeouts """
3018         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3019                                       32,
3020                                       socket.inet_aton(self.nat_addr),
3021                                       32)
3022         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3023         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3024                                                   is_inside=0)
3025
3026         self.initiate_tcp_session(self.pg0, self.pg1)
3027         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
3028         pkts = self.create_stream_in(self.pg0, self.pg1)
3029         self.pg0.add_stream(pkts)
3030         self.pg_enable_capture(self.pg_interfaces)
3031         self.pg_start()
3032         capture = self.pg1.get_capture(len(pkts))
3033         sleep(15)
3034
3035         dms = self.vapi.nat_det_map_dump()
3036         self.assertEqual(0, dms[0].ses_num)
3037
3038     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3039     def test_session_limit_per_user(self):
3040         """ Deterministic NAT maximum sessions per user limit """
3041         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3042                                       32,
3043                                       socket.inet_aton(self.nat_addr),
3044                                       32)
3045         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3046         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3047                                                   is_inside=0)
3048         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
3049                                      src_address=self.pg2.local_ip4n,
3050                                      path_mtu=512,
3051                                      template_interval=10)
3052         self.vapi.nat_ipfix()
3053
3054         pkts = []
3055         for port in range(1025, 2025):
3056             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3057                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3058                  UDP(sport=port, dport=port))
3059             pkts.append(p)
3060
3061         self.pg0.add_stream(pkts)
3062         self.pg_enable_capture(self.pg_interfaces)
3063         self.pg_start()
3064         capture = self.pg1.get_capture(len(pkts))
3065
3066         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3067              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3068              UDP(sport=3001, dport=3002))
3069         self.pg0.add_stream(p)
3070         self.pg_enable_capture(self.pg_interfaces)
3071         self.pg_start()
3072         capture = self.pg1.assert_nothing_captured()
3073
3074         # verify ICMP error packet
3075         capture = self.pg0.get_capture(1)
3076         p = capture[0]
3077         self.assertTrue(p.haslayer(ICMP))
3078         icmp = p[ICMP]
3079         self.assertEqual(icmp.type, 3)
3080         self.assertEqual(icmp.code, 1)
3081         self.assertTrue(icmp.haslayer(IPerror))
3082         inner_ip = icmp[IPerror]
3083         self.assertEqual(inner_ip[UDPerror].sport, 3001)
3084         self.assertEqual(inner_ip[UDPerror].dport, 3002)
3085
3086         dms = self.vapi.nat_det_map_dump()
3087
3088         self.assertEqual(1000, dms[0].ses_num)
3089
3090         # verify IPFIX logging
3091         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3092         sleep(1)
3093         capture = self.pg2.get_capture(2)
3094         ipfix = IPFIXDecoder()
3095         # first load template
3096         for p in capture:
3097             self.assertTrue(p.haslayer(IPFIX))
3098             if p.haslayer(Template):
3099                 ipfix.add_template(p.getlayer(Template))
3100         # verify events in data set
3101         for p in capture:
3102             if p.haslayer(Data):
3103                 data = ipfix.decode_data_set(p.getlayer(Set))
3104                 self.verify_ipfix_max_entries_per_user(data)
3105
3106     def clear_nat_det(self):
3107         """
3108         Clear deterministic NAT configuration.
3109         """
3110         self.vapi.nat_ipfix(enable=0)
3111         self.vapi.nat_det_set_timeouts()
3112         deterministic_mappings = self.vapi.nat_det_map_dump()
3113         for dsm in deterministic_mappings:
3114             self.vapi.nat_det_add_del_map(dsm.in_addr,
3115                                           dsm.in_plen,
3116                                           dsm.out_addr,
3117                                           dsm.out_plen,
3118                                           is_add=0)
3119
3120         interfaces = self.vapi.nat44_interface_dump()
3121         for intf in interfaces:
3122             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3123                                                       intf.is_inside,
3124                                                       is_add=0)
3125
3126     def tearDown(self):
3127         super(TestDeterministicNAT, self).tearDown()
3128         if not self.vpp_dead:
3129             self.logger.info(self.vapi.cli("show nat44 detail"))
3130             self.clear_nat_det()
3131
3132
3133 class TestNAT64(MethodHolder):
3134     """ NAT64 Test Cases """
3135
3136     @classmethod
3137     def setUpClass(cls):
3138         super(TestNAT64, cls).setUpClass()
3139
3140         try:
3141             cls.tcp_port_in = 6303
3142             cls.tcp_port_out = 6303
3143             cls.udp_port_in = 6304
3144             cls.udp_port_out = 6304
3145             cls.icmp_id_in = 6305
3146             cls.icmp_id_out = 6305
3147             cls.nat_addr = '10.0.0.3'
3148             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3149             cls.vrf1_id = 10
3150             cls.vrf1_nat_addr = '10.0.10.3'
3151             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3152                                                    cls.vrf1_nat_addr)
3153
3154             cls.create_pg_interfaces(range(4))
3155             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3156             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3157             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3158
3159             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3160
3161             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3162
3163             cls.pg0.generate_remote_hosts(2)
3164
3165             for i in cls.ip6_interfaces:
3166                 i.admin_up()
3167                 i.config_ip6()
3168                 i.configure_ipv6_neighbors()
3169
3170             for i in cls.ip4_interfaces:
3171                 i.admin_up()
3172                 i.config_ip4()
3173                 i.resolve_arp()
3174
3175             cls.pg3.admin_up()
3176             cls.pg3.config_ip4()
3177             cls.pg3.resolve_arp()
3178             cls.pg3.config_ip6()
3179             cls.pg3.configure_ipv6_neighbors()
3180
3181         except Exception:
3182             super(TestNAT64, cls).tearDownClass()
3183             raise
3184
3185     def test_pool(self):
3186         """ Add/delete address to NAT64 pool """
3187         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3188
3189         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3190
3191         addresses = self.vapi.nat64_pool_addr_dump()
3192         self.assertEqual(len(addresses), 1)
3193         self.assertEqual(addresses[0].address, nat_addr)
3194
3195         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3196
3197         addresses = self.vapi.nat64_pool_addr_dump()
3198         self.assertEqual(len(addresses), 0)
3199
3200     def test_interface(self):
3201         """ Enable/disable NAT64 feature on the interface """
3202         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3203         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3204
3205         interfaces = self.vapi.nat64_interface_dump()
3206         self.assertEqual(len(interfaces), 2)
3207         pg0_found = False
3208         pg1_found = False
3209         for intf in interfaces:
3210             if intf.sw_if_index == self.pg0.sw_if_index:
3211                 self.assertEqual(intf.is_inside, 1)
3212                 pg0_found = True
3213             elif intf.sw_if_index == self.pg1.sw_if_index:
3214                 self.assertEqual(intf.is_inside, 0)
3215                 pg1_found = True
3216         self.assertTrue(pg0_found)
3217         self.assertTrue(pg1_found)
3218
3219         features = self.vapi.cli("show interface features pg0")
3220         self.assertNotEqual(features.find('nat64-in2out'), -1)
3221         features = self.vapi.cli("show interface features pg1")
3222         self.assertNotEqual(features.find('nat64-out2in'), -1)
3223
3224         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3225         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3226
3227         interfaces = self.vapi.nat64_interface_dump()
3228         self.assertEqual(len(interfaces), 0)
3229
3230     def test_static_bib(self):
3231         """ Add/delete static BIB entry """
3232         in_addr = socket.inet_pton(socket.AF_INET6,
3233                                    '2001:db8:85a3::8a2e:370:7334')
3234         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3235         in_port = 1234
3236         out_port = 5678
3237         proto = IP_PROTOS.tcp
3238
3239         self.vapi.nat64_add_del_static_bib(in_addr,
3240                                            out_addr,
3241                                            in_port,
3242                                            out_port,
3243                                            proto)
3244         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3245         static_bib_num = 0
3246         for bibe in bib:
3247             if bibe.is_static:
3248                 static_bib_num += 1
3249                 self.assertEqual(bibe.i_addr, in_addr)
3250                 self.assertEqual(bibe.o_addr, out_addr)
3251                 self.assertEqual(bibe.i_port, in_port)
3252                 self.assertEqual(bibe.o_port, out_port)
3253         self.assertEqual(static_bib_num, 1)
3254
3255         self.vapi.nat64_add_del_static_bib(in_addr,
3256                                            out_addr,
3257                                            in_port,
3258                                            out_port,
3259                                            proto,
3260                                            is_add=0)
3261         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3262         static_bib_num = 0
3263         for bibe in bib:
3264             if bibe.is_static:
3265                 static_bib_num += 1
3266         self.assertEqual(static_bib_num, 0)
3267
3268     def test_set_timeouts(self):
3269         """ Set NAT64 timeouts """
3270         # verify default values
3271         timeouts = self.vapi.nat64_get_timeouts()
3272         self.assertEqual(timeouts.udp, 300)
3273         self.assertEqual(timeouts.icmp, 60)
3274         self.assertEqual(timeouts.tcp_trans, 240)
3275         self.assertEqual(timeouts.tcp_est, 7440)
3276         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3277
3278         # set and verify custom values
3279         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3280                                      tcp_est=7450, tcp_incoming_syn=10)
3281         timeouts = self.vapi.nat64_get_timeouts()
3282         self.assertEqual(timeouts.udp, 200)
3283         self.assertEqual(timeouts.icmp, 30)
3284         self.assertEqual(timeouts.tcp_trans, 250)
3285         self.assertEqual(timeouts.tcp_est, 7450)
3286         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3287
3288     def test_dynamic(self):
3289         """ NAT64 dynamic translation test """
3290         self.tcp_port_in = 6303
3291         self.udp_port_in = 6304
3292         self.icmp_id_in = 6305
3293
3294         ses_num_start = self.nat64_get_ses_num()
3295
3296         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3297                                                 self.nat_addr_n)
3298         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3299         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3300
3301         # in2out
3302         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3303         self.pg0.add_stream(pkts)
3304         self.pg_enable_capture(self.pg_interfaces)
3305         self.pg_start()
3306         capture = self.pg1.get_capture(len(pkts))
3307         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3308                                 dst_ip=self.pg1.remote_ip4)
3309
3310         # out2in
3311         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3312         self.pg1.add_stream(pkts)
3313         self.pg_enable_capture(self.pg_interfaces)
3314         self.pg_start()
3315         capture = self.pg0.get_capture(len(pkts))
3316         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3317         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3318
3319         # in2out
3320         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3321         self.pg0.add_stream(pkts)
3322         self.pg_enable_capture(self.pg_interfaces)
3323         self.pg_start()
3324         capture = self.pg1.get_capture(len(pkts))
3325         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3326                                 dst_ip=self.pg1.remote_ip4)
3327
3328         # out2in
3329         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3330         self.pg1.add_stream(pkts)
3331         self.pg_enable_capture(self.pg_interfaces)
3332         self.pg_start()
3333         capture = self.pg0.get_capture(len(pkts))
3334         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3335
3336         ses_num_end = self.nat64_get_ses_num()
3337
3338         self.assertEqual(ses_num_end - ses_num_start, 3)
3339
3340         # tenant with specific VRF
3341         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3342                                                 self.vrf1_nat_addr_n,
3343                                                 vrf_id=self.vrf1_id)
3344         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3345
3346         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3347         self.pg2.add_stream(pkts)
3348         self.pg_enable_capture(self.pg_interfaces)
3349         self.pg_start()
3350         capture = self.pg1.get_capture(len(pkts))
3351         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3352                                 dst_ip=self.pg1.remote_ip4)
3353
3354         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3355         self.pg1.add_stream(pkts)
3356         self.pg_enable_capture(self.pg_interfaces)
3357         self.pg_start()
3358         capture = self.pg2.get_capture(len(pkts))
3359         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3360
3361     def test_static(self):
3362         """ NAT64 static translation test """
3363         self.tcp_port_in = 60303
3364         self.udp_port_in = 60304
3365         self.icmp_id_in = 60305
3366         self.tcp_port_out = 60303
3367         self.udp_port_out = 60304
3368         self.icmp_id_out = 60305
3369
3370         ses_num_start = self.nat64_get_ses_num()
3371
3372         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3373                                                 self.nat_addr_n)
3374         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3375         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3376
3377         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3378                                            self.nat_addr_n,
3379                                            self.tcp_port_in,
3380                                            self.tcp_port_out,
3381                                            IP_PROTOS.tcp)
3382         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3383                                            self.nat_addr_n,
3384                                            self.udp_port_in,
3385                                            self.udp_port_out,
3386                                            IP_PROTOS.udp)
3387         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3388                                            self.nat_addr_n,
3389                                            self.icmp_id_in,
3390                                            self.icmp_id_out,
3391                                            IP_PROTOS.icmp)
3392
3393         # in2out
3394         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3395         self.pg0.add_stream(pkts)
3396         self.pg_enable_capture(self.pg_interfaces)
3397         self.pg_start()
3398         capture = self.pg1.get_capture(len(pkts))
3399         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3400                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3401
3402         # out2in
3403         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3404         self.pg1.add_stream(pkts)
3405         self.pg_enable_capture(self.pg_interfaces)
3406         self.pg_start()
3407         capture = self.pg0.get_capture(len(pkts))
3408         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3409         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3410
3411         ses_num_end = self.nat64_get_ses_num()
3412
3413         self.assertEqual(ses_num_end - ses_num_start, 3)
3414
3415     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3416     def test_session_timeout(self):
3417         """ NAT64 session timeout """
3418         self.icmp_id_in = 1234
3419         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3420                                                 self.nat_addr_n)
3421         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3422         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3423         self.vapi.nat64_set_timeouts(icmp=5)
3424
3425         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3426         self.pg0.add_stream(pkts)
3427         self.pg_enable_capture(self.pg_interfaces)
3428         self.pg_start()
3429         capture = self.pg1.get_capture(len(pkts))
3430
3431         ses_num_before_timeout = self.nat64_get_ses_num()
3432
3433         sleep(15)
3434
3435         # ICMP session after timeout
3436         ses_num_after_timeout = self.nat64_get_ses_num()
3437         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3438
3439     def test_icmp_error(self):
3440         """ NAT64 ICMP Error message translation """
3441         self.tcp_port_in = 6303
3442         self.udp_port_in = 6304
3443         self.icmp_id_in = 6305
3444
3445         ses_num_start = self.nat64_get_ses_num()
3446
3447         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3448                                                 self.nat_addr_n)
3449         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3450         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3451
3452         # send some packets to create sessions
3453         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3454         self.pg0.add_stream(pkts)
3455         self.pg_enable_capture(self.pg_interfaces)
3456         self.pg_start()
3457         capture_ip4 = self.pg1.get_capture(len(pkts))
3458         self.verify_capture_out(capture_ip4,
3459                                 nat_ip=self.nat_addr,
3460                                 dst_ip=self.pg1.remote_ip4)
3461
3462         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3463         self.pg1.add_stream(pkts)
3464         self.pg_enable_capture(self.pg_interfaces)
3465         self.pg_start()
3466         capture_ip6 = self.pg0.get_capture(len(pkts))
3467         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3468         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3469                                    self.pg0.remote_ip6)
3470
3471         # in2out
3472         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3473                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3474                 ICMPv6DestUnreach(code=1) /
3475                 packet[IPv6] for packet in capture_ip6]
3476         self.pg0.add_stream(pkts)
3477         self.pg_enable_capture(self.pg_interfaces)
3478         self.pg_start()
3479         capture = self.pg1.get_capture(len(pkts))
3480         for packet in capture:
3481             try:
3482                 self.assertEqual(packet[IP].src, self.nat_addr)
3483                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3484                 self.assertEqual(packet[ICMP].type, 3)
3485                 self.assertEqual(packet[ICMP].code, 13)
3486                 inner = packet[IPerror]
3487                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3488                 self.assertEqual(inner.dst, self.nat_addr)
3489                 self.check_icmp_checksum(packet)
3490                 if inner.haslayer(TCPerror):
3491                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3492                 elif inner.haslayer(UDPerror):
3493                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3494                 else:
3495                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3496             except:
3497                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3498                 raise
3499
3500         # out2in
3501         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3502                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3503                 ICMP(type=3, code=13) /
3504                 packet[IP] for packet in capture_ip4]
3505         self.pg1.add_stream(pkts)
3506         self.pg_enable_capture(self.pg_interfaces)
3507         self.pg_start()
3508         capture = self.pg0.get_capture(len(pkts))
3509         for packet in capture:
3510             try:
3511                 self.assertEqual(packet[IPv6].src, ip.src)
3512                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3513                 icmp = packet[ICMPv6DestUnreach]
3514                 self.assertEqual(icmp.code, 1)
3515                 inner = icmp[IPerror6]
3516                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3517                 self.assertEqual(inner.dst, ip.src)
3518                 self.check_icmpv6_checksum(packet)
3519                 if inner.haslayer(TCPerror):
3520                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3521                 elif inner.haslayer(UDPerror):
3522                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3523                 else:
3524                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3525                                      self.icmp_id_in)
3526             except:
3527                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3528                 raise
3529
3530     def test_hairpinning(self):
3531         """ NAT64 hairpinning """
3532
3533         client = self.pg0.remote_hosts[0]
3534         server = self.pg0.remote_hosts[1]
3535         server_tcp_in_port = 22
3536         server_tcp_out_port = 4022
3537         server_udp_in_port = 23
3538         server_udp_out_port = 4023
3539         client_tcp_in_port = 1234
3540         client_udp_in_port = 1235
3541         client_tcp_out_port = 0
3542         client_udp_out_port = 0
3543         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3544         nat_addr_ip6 = ip.src
3545
3546         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3547                                                 self.nat_addr_n)
3548         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3549         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3550
3551         self.vapi.nat64_add_del_static_bib(server.ip6n,
3552                                            self.nat_addr_n,
3553                                            server_tcp_in_port,
3554                                            server_tcp_out_port,
3555                                            IP_PROTOS.tcp)
3556         self.vapi.nat64_add_del_static_bib(server.ip6n,
3557                                            self.nat_addr_n,
3558                                            server_udp_in_port,
3559                                            server_udp_out_port,
3560                                            IP_PROTOS.udp)
3561
3562         # client to server
3563         pkts = []
3564         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3565              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3566              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3567         pkts.append(p)
3568         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3569              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3570              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3571         pkts.append(p)
3572         self.pg0.add_stream(pkts)
3573         self.pg_enable_capture(self.pg_interfaces)
3574         self.pg_start()
3575         capture = self.pg0.get_capture(len(pkts))
3576         for packet in capture:
3577             try:
3578                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3579                 self.assertEqual(packet[IPv6].dst, server.ip6)
3580                 if packet.haslayer(TCP):
3581                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3582                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3583                     self.check_tcp_checksum(packet)
3584                     client_tcp_out_port = packet[TCP].sport
3585                 else:
3586                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3587                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3588                     self.check_udp_checksum(packet)
3589                     client_udp_out_port = packet[UDP].sport
3590             except:
3591                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3592                 raise
3593
3594         # server to client
3595         pkts = []
3596         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3597              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3598              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3599         pkts.append(p)
3600         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3601              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3602              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3603         pkts.append(p)
3604         self.pg0.add_stream(pkts)
3605         self.pg_enable_capture(self.pg_interfaces)
3606         self.pg_start()
3607         capture = self.pg0.get_capture(len(pkts))
3608         for packet in capture:
3609             try:
3610                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3611                 self.assertEqual(packet[IPv6].dst, client.ip6)
3612                 if packet.haslayer(TCP):
3613                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3614                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3615                     self.check_tcp_checksum(packet)
3616                 else:
3617                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3618                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3619                     self.check_udp_checksum(packet)
3620             except:
3621                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3622                 raise
3623
3624         # ICMP error
3625         pkts = []
3626         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3627                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3628                 ICMPv6DestUnreach(code=1) /
3629                 packet[IPv6] for packet in capture]
3630         self.pg0.add_stream(pkts)
3631         self.pg_enable_capture(self.pg_interfaces)
3632         self.pg_start()
3633         capture = self.pg0.get_capture(len(pkts))
3634         for packet in capture:
3635             try:
3636                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3637                 self.assertEqual(packet[IPv6].dst, server.ip6)
3638                 icmp = packet[ICMPv6DestUnreach]
3639                 self.assertEqual(icmp.code, 1)
3640                 inner = icmp[IPerror6]
3641                 self.assertEqual(inner.src, server.ip6)
3642                 self.assertEqual(inner.dst, nat_addr_ip6)
3643                 self.check_icmpv6_checksum(packet)
3644                 if inner.haslayer(TCPerror):
3645                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3646                     self.assertEqual(inner[TCPerror].dport,
3647                                      client_tcp_out_port)
3648                 else:
3649                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3650                     self.assertEqual(inner[UDPerror].dport,
3651                                      client_udp_out_port)
3652             except:
3653                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3654                 raise
3655
3656     def test_prefix(self):
3657         """ NAT64 Network-Specific Prefix """
3658
3659         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3660                                                 self.nat_addr_n)
3661         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3662         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3663         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3664                                                 self.vrf1_nat_addr_n,
3665                                                 vrf_id=self.vrf1_id)
3666         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3667
3668         # Add global prefix
3669         global_pref64 = "2001:db8::"
3670         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3671         global_pref64_len = 32
3672         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3673
3674         prefix = self.vapi.nat64_prefix_dump()
3675         self.assertEqual(len(prefix), 1)
3676         self.assertEqual(prefix[0].prefix, global_pref64_n)
3677         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3678         self.assertEqual(prefix[0].vrf_id, 0)
3679
3680         # Add tenant specific prefix
3681         vrf1_pref64 = "2001:db8:122:300::"
3682         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3683         vrf1_pref64_len = 56
3684         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3685                                        vrf1_pref64_len,
3686                                        vrf_id=self.vrf1_id)
3687         prefix = self.vapi.nat64_prefix_dump()
3688         self.assertEqual(len(prefix), 2)
3689
3690         # Global prefix
3691         pkts = self.create_stream_in_ip6(self.pg0,
3692                                          self.pg1,
3693                                          pref=global_pref64,
3694                                          plen=global_pref64_len)
3695         self.pg0.add_stream(pkts)
3696         self.pg_enable_capture(self.pg_interfaces)
3697         self.pg_start()
3698         capture = self.pg1.get_capture(len(pkts))
3699         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3700                                 dst_ip=self.pg1.remote_ip4)
3701
3702         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3703         self.pg1.add_stream(pkts)
3704         self.pg_enable_capture(self.pg_interfaces)
3705         self.pg_start()
3706         capture = self.pg0.get_capture(len(pkts))
3707         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3708                                   global_pref64,
3709                                   global_pref64_len)
3710         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3711
3712         # Tenant specific prefix
3713         pkts = self.create_stream_in_ip6(self.pg2,
3714                                          self.pg1,
3715                                          pref=vrf1_pref64,
3716                                          plen=vrf1_pref64_len)
3717         self.pg2.add_stream(pkts)
3718         self.pg_enable_capture(self.pg_interfaces)
3719         self.pg_start()
3720         capture = self.pg1.get_capture(len(pkts))
3721         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3722                                 dst_ip=self.pg1.remote_ip4)
3723
3724         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3725         self.pg1.add_stream(pkts)
3726         self.pg_enable_capture(self.pg_interfaces)
3727         self.pg_start()
3728         capture = self.pg2.get_capture(len(pkts))
3729         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3730                                   vrf1_pref64,
3731                                   vrf1_pref64_len)
3732         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3733
3734     def test_unknown_proto(self):
3735         """ NAT64 translate packet with unknown protocol """
3736
3737         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3738                                                 self.nat_addr_n)
3739         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3740         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3741         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3742
3743         # in2out
3744         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3745              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3746              TCP(sport=self.tcp_port_in, dport=20))
3747         self.pg0.add_stream(p)
3748         self.pg_enable_capture(self.pg_interfaces)
3749         self.pg_start()
3750         p = self.pg1.get_capture(1)
3751
3752         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3753              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3754              GRE() /
3755              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3756              TCP(sport=1234, dport=1234))
3757         self.pg0.add_stream(p)
3758         self.pg_enable_capture(self.pg_interfaces)
3759         self.pg_start()
3760         p = self.pg1.get_capture(1)
3761         packet = p[0]
3762         try:
3763             self.assertEqual(packet[IP].src, self.nat_addr)
3764             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3765             self.assertTrue(packet.haslayer(GRE))
3766             self.check_ip_checksum(packet)
3767         except:
3768             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3769             raise
3770
3771         # out2in
3772         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3773              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3774              GRE() /
3775              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3776              TCP(sport=1234, dport=1234))
3777         self.pg1.add_stream(p)
3778         self.pg_enable_capture(self.pg_interfaces)
3779         self.pg_start()
3780         p = self.pg0.get_capture(1)
3781         packet = p[0]
3782         try:
3783             self.assertEqual(packet[IPv6].src, remote_ip6)
3784             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3785             self.assertEqual(packet[IPv6].nh, 47)
3786         except:
3787             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3788             raise
3789
3790     def test_hairpinning_unknown_proto(self):
3791         """ NAT64 translate packet with unknown protocol - hairpinning """
3792
3793         client = self.pg0.remote_hosts[0]
3794         server = self.pg0.remote_hosts[1]
3795         server_tcp_in_port = 22
3796         server_tcp_out_port = 4022
3797         client_tcp_in_port = 1234
3798         client_tcp_out_port = 1235
3799         server_nat_ip = "10.0.0.100"
3800         client_nat_ip = "10.0.0.110"
3801         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3802         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3803         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3804         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3805
3806         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3807                                                 client_nat_ip_n)
3808         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3809         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3810
3811         self.vapi.nat64_add_del_static_bib(server.ip6n,
3812                                            server_nat_ip_n,
3813                                            server_tcp_in_port,
3814                                            server_tcp_out_port,
3815                                            IP_PROTOS.tcp)
3816
3817         self.vapi.nat64_add_del_static_bib(server.ip6n,
3818                                            server_nat_ip_n,
3819                                            0,
3820                                            0,
3821                                            IP_PROTOS.gre)
3822
3823         self.vapi.nat64_add_del_static_bib(client.ip6n,
3824                                            client_nat_ip_n,
3825                                            client_tcp_in_port,
3826                                            client_tcp_out_port,
3827                                            IP_PROTOS.tcp)
3828
3829         # client to server
3830         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3831              IPv6(src=client.ip6, dst=server_nat_ip6) /
3832              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3833         self.pg0.add_stream(p)
3834         self.pg_enable_capture(self.pg_interfaces)
3835         self.pg_start()
3836         p = self.pg0.get_capture(1)
3837
3838         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3839              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3840              GRE() /
3841              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3842              TCP(sport=1234, dport=1234))
3843         self.pg0.add_stream(p)
3844         self.pg_enable_capture(self.pg_interfaces)
3845         self.pg_start()
3846         p = self.pg0.get_capture(1)
3847         packet = p[0]
3848         try:
3849             self.assertEqual(packet[IPv6].src, client_nat_ip6)
3850             self.assertEqual(packet[IPv6].dst, server.ip6)
3851             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3852         except:
3853             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3854             raise
3855
3856         # server to client
3857         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3858              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3859              GRE() /
3860              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3861              TCP(sport=1234, dport=1234))
3862         self.pg0.add_stream(p)
3863         self.pg_enable_capture(self.pg_interfaces)
3864         self.pg_start()
3865         p = self.pg0.get_capture(1)
3866         packet = p[0]
3867         try:
3868             self.assertEqual(packet[IPv6].src, server_nat_ip6)
3869             self.assertEqual(packet[IPv6].dst, client.ip6)
3870             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3871         except:
3872             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3873             raise
3874
3875     def test_one_armed_nat64(self):
3876         """ One armed NAT64 """
3877         external_port = 0
3878         remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
3879                                            '64:ff9b::',
3880                                            96)
3881
3882         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3883                                                 self.nat_addr_n)
3884         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
3885         self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
3886
3887         # in2out
3888         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3889              IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
3890              TCP(sport=12345, dport=80))
3891         self.pg3.add_stream(p)
3892         self.pg_enable_capture(self.pg_interfaces)
3893         self.pg_start()
3894         capture = self.pg3.get_capture(1)
3895         p = capture[0]
3896         try:
3897             ip = p[IP]
3898             tcp = p[TCP]
3899             self.assertEqual(ip.src, self.nat_addr)
3900             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3901             self.assertNotEqual(tcp.sport, 12345)
3902             external_port = tcp.sport
3903             self.assertEqual(tcp.dport, 80)
3904             self.check_tcp_checksum(p)
3905             self.check_ip_checksum(p)
3906         except:
3907             self.logger.error(ppp("Unexpected or invalid packet:", p))
3908             raise
3909
3910         # out2in
3911         p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
3912              IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
3913              TCP(sport=80, dport=external_port))
3914         self.pg3.add_stream(p)
3915         self.pg_enable_capture(self.pg_interfaces)
3916         self.pg_start()
3917         capture = self.pg3.get_capture(1)
3918         p = capture[0]
3919         try:
3920             ip = p[IPv6]
3921             tcp = p[TCP]
3922             self.assertEqual(ip.src, remote_host_ip6)
3923             self.assertEqual(ip.dst, self.pg3.remote_ip6)
3924             self.assertEqual(tcp.sport, 80)
3925             self.assertEqual(tcp.dport, 12345)
3926             self.check_tcp_checksum(p)
3927         except:
3928             self.logger.error(ppp("Unexpected or invalid packet:", p))
3929             raise
3930
3931     def nat64_get_ses_num(self):
3932         """
3933         Return number of active NAT64 sessions.
3934         """
3935         st = self.vapi.nat64_st_dump()
3936         return len(st)
3937
3938     def clear_nat64(self):
3939         """
3940         Clear NAT64 configuration.
3941         """
3942         self.vapi.nat64_set_timeouts()
3943
3944         interfaces = self.vapi.nat64_interface_dump()
3945         for intf in interfaces:
3946             if intf.is_inside > 1:
3947                 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3948                                                   0,
3949                                                   is_add=0)
3950             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3951                                               intf.is_inside,
3952                                               is_add=0)
3953
3954         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3955         for bibe in bib:
3956             if bibe.is_static:
3957                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3958                                                    bibe.o_addr,
3959                                                    bibe.i_port,
3960                                                    bibe.o_port,
3961                                                    bibe.proto,
3962                                                    bibe.vrf_id,
3963                                                    is_add=0)
3964
3965         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3966         for bibe in bib:
3967             if bibe.is_static:
3968                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3969                                                    bibe.o_addr,
3970                                                    bibe.i_port,
3971                                                    bibe.o_port,
3972                                                    bibe.proto,
3973                                                    bibe.vrf_id,
3974                                                    is_add=0)
3975
3976         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3977         for bibe in bib:
3978             if bibe.is_static:
3979                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3980                                                    bibe.o_addr,
3981                                                    bibe.i_port,
3982                                                    bibe.o_port,
3983                                                    bibe.proto,
3984                                                    bibe.vrf_id,
3985                                                    is_add=0)
3986
3987         adresses = self.vapi.nat64_pool_addr_dump()
3988         for addr in adresses:
3989             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3990                                                     addr.address,
3991                                                     vrf_id=addr.vrf_id,
3992                                                     is_add=0)
3993
3994         prefixes = self.vapi.nat64_prefix_dump()
3995         for prefix in prefixes:
3996             self.vapi.nat64_add_del_prefix(prefix.prefix,
3997                                            prefix.prefix_len,
3998                                            vrf_id=prefix.vrf_id,
3999                                            is_add=0)
4000
4001     def tearDown(self):
4002         super(TestNAT64, self).tearDown()
4003         if not self.vpp_dead:
4004             self.logger.info(self.vapi.cli("show nat64 pool"))
4005             self.logger.info(self.vapi.cli("show nat64 interfaces"))
4006             self.logger.info(self.vapi.cli("show nat64 prefix"))
4007             self.logger.info(self.vapi.cli("show nat64 bib all"))
4008             self.logger.info(self.vapi.cli("show nat64 session table all"))
4009             self.clear_nat64()
4010
4011
4012 class TestDSlite(MethodHolder):
4013     """ DS-Lite Test Cases """
4014
4015     @classmethod
4016     def setUpClass(cls):
4017         super(TestDSlite, cls).setUpClass()
4018
4019         try:
4020             cls.nat_addr = '10.0.0.3'
4021             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4022
4023             cls.create_pg_interfaces(range(2))
4024             cls.pg0.admin_up()
4025             cls.pg0.config_ip4()
4026             cls.pg0.resolve_arp()
4027             cls.pg1.admin_up()
4028             cls.pg1.config_ip6()
4029             cls.pg1.generate_remote_hosts(2)
4030             cls.pg1.configure_ipv6_neighbors()
4031
4032         except Exception:
4033             super(TestDSlite, cls).tearDownClass()
4034             raise
4035
4036     def test_dslite(self):
4037         """ Test DS-Lite """
4038         self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
4039                                                  self.nat_addr_n)
4040         aftr_ip4 = '192.0.0.1'
4041         aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
4042         aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
4043         aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
4044         self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
4045
4046         # UDP
4047         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4048              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
4049              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4050              UDP(sport=20000, dport=10000))
4051         self.pg1.add_stream(p)
4052         self.pg_enable_capture(self.pg_interfaces)
4053         self.pg_start()
4054         capture = self.pg0.get_capture(1)
4055         capture = capture[0]
4056         self.assertFalse(capture.haslayer(IPv6))
4057         self.assertEqual(capture[IP].src, self.nat_addr)
4058         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4059         self.assertNotEqual(capture[UDP].sport, 20000)
4060         self.assertEqual(capture[UDP].dport, 10000)
4061         self.check_ip_checksum(capture)
4062         out_port = capture[UDP].sport
4063
4064         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4065              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4066              UDP(sport=10000, dport=out_port))
4067         self.pg0.add_stream(p)
4068         self.pg_enable_capture(self.pg_interfaces)
4069         self.pg_start()
4070         capture = self.pg1.get_capture(1)
4071         capture = capture[0]
4072         self.assertEqual(capture[IPv6].src, aftr_ip6)
4073         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
4074         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4075         self.assertEqual(capture[IP].dst, '192.168.1.1')
4076         self.assertEqual(capture[UDP].sport, 10000)
4077         self.assertEqual(capture[UDP].dport, 20000)
4078         self.check_ip_checksum(capture)
4079
4080         # TCP
4081         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4082              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4083              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4084              TCP(sport=20001, dport=10001))
4085         self.pg1.add_stream(p)
4086         self.pg_enable_capture(self.pg_interfaces)
4087         self.pg_start()
4088         capture = self.pg0.get_capture(1)
4089         capture = capture[0]
4090         self.assertFalse(capture.haslayer(IPv6))
4091         self.assertEqual(capture[IP].src, self.nat_addr)
4092         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4093         self.assertNotEqual(capture[TCP].sport, 20001)
4094         self.assertEqual(capture[TCP].dport, 10001)
4095         self.check_ip_checksum(capture)
4096         self.check_tcp_checksum(capture)
4097         out_port = capture[TCP].sport
4098
4099         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4100              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4101              TCP(sport=10001, dport=out_port))
4102         self.pg0.add_stream(p)
4103         self.pg_enable_capture(self.pg_interfaces)
4104         self.pg_start()
4105         capture = self.pg1.get_capture(1)
4106         capture = capture[0]
4107         self.assertEqual(capture[IPv6].src, aftr_ip6)
4108         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4109         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4110         self.assertEqual(capture[IP].dst, '192.168.1.1')
4111         self.assertEqual(capture[TCP].sport, 10001)
4112         self.assertEqual(capture[TCP].dport, 20001)
4113         self.check_ip_checksum(capture)
4114         self.check_tcp_checksum(capture)
4115
4116         # ICMP
4117         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4118              IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
4119              IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
4120              ICMP(id=4000, type='echo-request'))
4121         self.pg1.add_stream(p)
4122         self.pg_enable_capture(self.pg_interfaces)
4123         self.pg_start()
4124         capture = self.pg0.get_capture(1)
4125         capture = capture[0]
4126         self.assertFalse(capture.haslayer(IPv6))
4127         self.assertEqual(capture[IP].src, self.nat_addr)
4128         self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
4129         self.assertNotEqual(capture[ICMP].id, 4000)
4130         self.check_ip_checksum(capture)
4131         self.check_icmp_checksum(capture)
4132         out_id = capture[ICMP].id
4133
4134         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4135              IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
4136              ICMP(id=out_id, type='echo-reply'))
4137         self.pg0.add_stream(p)
4138         self.pg_enable_capture(self.pg_interfaces)
4139         self.pg_start()
4140         capture = self.pg1.get_capture(1)
4141         capture = capture[0]
4142         self.assertEqual(capture[IPv6].src, aftr_ip6)
4143         self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
4144         self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
4145         self.assertEqual(capture[IP].dst, '192.168.1.1')
4146         self.assertEqual(capture[ICMP].id, 4000)
4147         self.check_ip_checksum(capture)
4148         self.check_icmp_checksum(capture)
4149
4150     def tearDown(self):
4151         super(TestDSlite, self).tearDown()
4152         if not self.vpp_dead:
4153             self.logger.info(self.vapi.cli("show dslite pool"))
4154             self.logger.info(
4155                 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
4156             self.logger.info(self.vapi.cli("show dslite sessions"))
4157
4158 if __name__ == '__main__':
4159     unittest.main(testRunner=VppTestRunner)