NAT: Rename snat plugin to nat (VPP-955)
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18
19
20 class MethodHolder(VppTestCase):
21     """ NAT create capture and verify method holder """
22
    @classmethod
    def setUpClass(cls):
        """ Class-level set-up; only delegates to the parent class """
        super(MethodHolder, cls).setUpClass()
26
    def tearDown(self):
        """ Per-test tear-down; only delegates to the parent class """
        super(MethodHolder, self).tearDown()
29
30     def check_ip_checksum(self, pkt):
31         """
32         Check IP checksum of the packet
33
34         :param pkt: Packet to check IP checksum
35         """
36         new = pkt.__class__(str(pkt))
37         del new['IP'].chksum
38         new = new.__class__(str(new))
39         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
40
41     def check_tcp_checksum(self, pkt):
42         """
43         Check TCP checksum in IP packet
44
45         :param pkt: Packet to check TCP checksum
46         """
47         new = pkt.__class__(str(pkt))
48         del new['TCP'].chksum
49         new = new.__class__(str(new))
50         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
51
52     def check_udp_checksum(self, pkt):
53         """
54         Check UDP checksum in IP packet
55
56         :param pkt: Packet to check UDP checksum
57         """
58         new = pkt.__class__(str(pkt))
59         del new['UDP'].chksum
60         new = new.__class__(str(new))
61         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
62
63     def check_icmp_errror_embedded(self, pkt):
64         """
65         Check ICMP error embeded packet checksum
66
67         :param pkt: Packet to check ICMP error embeded packet checksum
68         """
69         if pkt.haslayer(IPerror):
70             new = pkt.__class__(str(pkt))
71             del new['IPerror'].chksum
72             new = new.__class__(str(new))
73             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
74
75         if pkt.haslayer(TCPerror):
76             new = pkt.__class__(str(pkt))
77             del new['TCPerror'].chksum
78             new = new.__class__(str(new))
79             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
80
81         if pkt.haslayer(UDPerror):
82             if pkt['UDPerror'].chksum != 0:
83                 new = pkt.__class__(str(pkt))
84                 del new['UDPerror'].chksum
85                 new = new.__class__(str(new))
86                 self.assertEqual(new['UDPerror'].chksum,
87                                  pkt['UDPerror'].chksum)
88
89         if pkt.haslayer(ICMPerror):
90             del new['ICMPerror'].chksum
91             new = new.__class__(str(new))
92             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
93
94     def check_icmp_checksum(self, pkt):
95         """
96         Check ICMP checksum in IPv4 packet
97
98         :param pkt: Packet to check ICMP checksum
99         """
100         new = pkt.__class__(str(pkt))
101         del new['ICMP'].chksum
102         new = new.__class__(str(new))
103         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
104         if pkt.haslayer(IPerror):
105             self.check_icmp_errror_embedded(pkt)
106
107     def check_icmpv6_checksum(self, pkt):
108         """
109         Check ICMPv6 checksum in IPv4 packet
110
111         :param pkt: Packet to check ICMPv6 checksum
112         """
113         new = pkt.__class__(str(pkt))
114         if pkt.haslayer(ICMPv6DestUnreach):
115             del new['ICMPv6DestUnreach'].cksum
116             new = new.__class__(str(new))
117             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
118                              pkt['ICMPv6DestUnreach'].cksum)
119             self.check_icmp_errror_embedded(pkt)
120         if pkt.haslayer(ICMPv6EchoRequest):
121             del new['ICMPv6EchoRequest'].cksum
122             new = new.__class__(str(new))
123             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
124                              pkt['ICMPv6EchoRequest'].cksum)
125         if pkt.haslayer(ICMPv6EchoReply):
126             del new['ICMPv6EchoReply'].cksum
127             new = new.__class__(str(new))
128             self.assertEqual(new['ICMPv6EchoReply'].cksum,
129                              pkt['ICMPv6EchoReply'].cksum)
130
131     def create_stream_in(self, in_if, out_if, ttl=64):
132         """
133         Create packet stream for inside network
134
135         :param in_if: Inside interface
136         :param out_if: Outside interface
137         :param ttl: TTL of generated packets
138         """
139         pkts = []
140         # TCP
141         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
142              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
143              TCP(sport=self.tcp_port_in, dport=20))
144         pkts.append(p)
145
146         # UDP
147         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
148              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
149              UDP(sport=self.udp_port_in, dport=20))
150         pkts.append(p)
151
152         # ICMP
153         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
154              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
155              ICMP(id=self.icmp_id_in, type='echo-request'))
156         pkts.append(p)
157
158         return pkts
159
160     def compose_ip6(self, ip4, pref, plen):
161         """
162         Compose IPv4-embedded IPv6 addresses
163
164         :param ip4: IPv4 address
165         :param pref: IPv6 prefix
166         :param plen: IPv6 prefix length
167         :returns: IPv4-embedded IPv6 addresses
168         """
169         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
170         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
171         if plen == 32:
172             pref_n[4] = ip4_n[0]
173             pref_n[5] = ip4_n[1]
174             pref_n[6] = ip4_n[2]
175             pref_n[7] = ip4_n[3]
176         elif plen == 40:
177             pref_n[5] = ip4_n[0]
178             pref_n[6] = ip4_n[1]
179             pref_n[7] = ip4_n[2]
180             pref_n[9] = ip4_n[3]
181         elif plen == 48:
182             pref_n[6] = ip4_n[0]
183             pref_n[7] = ip4_n[1]
184             pref_n[9] = ip4_n[2]
185             pref_n[10] = ip4_n[3]
186         elif plen == 56:
187             pref_n[7] = ip4_n[0]
188             pref_n[9] = ip4_n[1]
189             pref_n[10] = ip4_n[2]
190             pref_n[11] = ip4_n[3]
191         elif plen == 64:
192             pref_n[9] = ip4_n[0]
193             pref_n[10] = ip4_n[1]
194             pref_n[11] = ip4_n[2]
195             pref_n[12] = ip4_n[3]
196         elif plen == 96:
197             pref_n[12] = ip4_n[0]
198             pref_n[13] = ip4_n[1]
199             pref_n[14] = ip4_n[2]
200             pref_n[15] = ip4_n[3]
201         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
202
203     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
204         """
205         Create IPv6 packet stream for inside network
206
207         :param in_if: Inside interface
208         :param out_if: Outside interface
209         :param ttl: Hop Limit of generated packets
210         :param pref: NAT64 prefix
211         :param plen: NAT64 prefix length
212         """
213         pkts = []
214         if pref is None:
215             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
216         else:
217             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
218
219         # TCP
220         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
221              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
222              TCP(sport=self.tcp_port_in, dport=20))
223         pkts.append(p)
224
225         # UDP
226         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
228              UDP(sport=self.udp_port_in, dport=20))
229         pkts.append(p)
230
231         # ICMP
232         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
233              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
234              ICMPv6EchoRequest(id=self.icmp_id_in))
235         pkts.append(p)
236
237         return pkts
238
239     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
240         """
241         Create packet stream for outside network
242
243         :param out_if: Outside interface
244         :param dst_ip: Destination IP address (Default use global NAT address)
245         :param ttl: TTL of generated packets
246         """
247         if dst_ip is None:
248             dst_ip = self.nat_addr
249         pkts = []
250         # TCP
251         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
252              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
253              TCP(dport=self.tcp_port_out, sport=20))
254         pkts.append(p)
255
256         # UDP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              UDP(dport=self.udp_port_out, sport=20))
260         pkts.append(p)
261
262         # ICMP
263         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
264              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
265              ICMP(id=self.icmp_id_out, type='echo-reply'))
266         pkts.append(p)
267
268         return pkts
269
270     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
271                            packet_num=3, dst_ip=None):
272         """
273         Verify captured packets on outside network
274
275         :param capture: Captured packets
276         :param nat_ip: Translated IP address (Default use global NAT address)
277         :param same_port: Sorce port number is not translated (Default False)
278         :param packet_num: Expected number of packets (Default 3)
279         :param dst_ip: Destination IP address (Default do not verify)
280         """
281         if nat_ip is None:
282             nat_ip = self.nat_addr
283         self.assertEqual(packet_num, len(capture))
284         for packet in capture:
285             try:
286                 self.check_ip_checksum(packet)
287                 self.assertEqual(packet[IP].src, nat_ip)
288                 if dst_ip is not None:
289                     self.assertEqual(packet[IP].dst, dst_ip)
290                 if packet.haslayer(TCP):
291                     if same_port:
292                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
293                     else:
294                         self.assertNotEqual(
295                             packet[TCP].sport, self.tcp_port_in)
296                     self.tcp_port_out = packet[TCP].sport
297                     self.check_tcp_checksum(packet)
298                 elif packet.haslayer(UDP):
299                     if same_port:
300                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
301                     else:
302                         self.assertNotEqual(
303                             packet[UDP].sport, self.udp_port_in)
304                     self.udp_port_out = packet[UDP].sport
305                 else:
306                     if same_port:
307                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
308                     else:
309                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
310                     self.icmp_id_out = packet[ICMP].id
311                     self.check_icmp_checksum(packet)
312             except:
313                 self.logger.error(ppp("Unexpected or invalid packet "
314                                       "(outside network):", packet))
315                 raise
316
317     def verify_capture_in(self, capture, in_if, packet_num=3):
318         """
319         Verify captured packets on inside network
320
321         :param capture: Captured packets
322         :param in_if: Inside interface
323         :param packet_num: Expected number of packets (Default 3)
324         """
325         self.assertEqual(packet_num, len(capture))
326         for packet in capture:
327             try:
328                 self.check_ip_checksum(packet)
329                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
330                 if packet.haslayer(TCP):
331                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
332                     self.check_tcp_checksum(packet)
333                 elif packet.haslayer(UDP):
334                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
335                 else:
336                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
337                     self.check_icmp_checksum(packet)
338             except:
339                 self.logger.error(ppp("Unexpected or invalid packet "
340                                       "(inside network):", packet))
341                 raise
342
343     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
344         """
345         Verify captured IPv6 packets on inside network
346
347         :param capture: Captured packets
348         :param src_ip: Source IP
349         :param dst_ip: Destination IP address
350         :param packet_num: Expected number of packets (Default 3)
351         """
352         self.assertEqual(packet_num, len(capture))
353         for packet in capture:
354             try:
355                 self.assertEqual(packet[IPv6].src, src_ip)
356                 self.assertEqual(packet[IPv6].dst, dst_ip)
357                 if packet.haslayer(TCP):
358                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
359                     self.check_tcp_checksum(packet)
360                 elif packet.haslayer(UDP):
361                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
362                     self.check_udp_checksum(packet)
363                 else:
364                     self.assertEqual(packet[ICMPv6EchoReply].id,
365                                      self.icmp_id_in)
366                     self.check_icmpv6_checksum(packet)
367             except:
368                 self.logger.error(ppp("Unexpected or invalid packet "
369                                       "(inside network):", packet))
370                 raise
371
372     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
373         """
374         Verify captured packet that don't have to be translated
375
376         :param capture: Captured packets
377         :param ingress_if: Ingress interface
378         :param egress_if: Egress interface
379         """
380         for packet in capture:
381             try:
382                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
383                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
384                 if packet.haslayer(TCP):
385                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
386                 elif packet.haslayer(UDP):
387                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
388                 else:
389                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
390             except:
391                 self.logger.error(ppp("Unexpected or invalid packet "
392                                       "(inside network):", packet))
393                 raise
394
395     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
396                                             packet_num=3, icmp_type=11):
397         """
398         Verify captured packets with ICMP errors on outside network
399
400         :param capture: Captured packets
401         :param src_ip: Translated IP address or IP address of VPP
402                        (Default use global NAT address)
403         :param packet_num: Expected number of packets (Default 3)
404         :param icmp_type: Type of error ICMP packet
405                           we are expecting (Default 11)
406         """
407         if src_ip is None:
408             src_ip = self.nat_addr
409         self.assertEqual(packet_num, len(capture))
410         for packet in capture:
411             try:
412                 self.assertEqual(packet[IP].src, src_ip)
413                 self.assertTrue(packet.haslayer(ICMP))
414                 icmp = packet[ICMP]
415                 self.assertEqual(icmp.type, icmp_type)
416                 self.assertTrue(icmp.haslayer(IPerror))
417                 inner_ip = icmp[IPerror]
418                 if inner_ip.haslayer(TCPerror):
419                     self.assertEqual(inner_ip[TCPerror].dport,
420                                      self.tcp_port_out)
421                 elif inner_ip.haslayer(UDPerror):
422                     self.assertEqual(inner_ip[UDPerror].dport,
423                                      self.udp_port_out)
424                 else:
425                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
426             except:
427                 self.logger.error(ppp("Unexpected or invalid packet "
428                                       "(outside network):", packet))
429                 raise
430
431     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
432                                            icmp_type=11):
433         """
434         Verify captured packets with ICMP errors on inside network
435
436         :param capture: Captured packets
437         :param in_if: Inside interface
438         :param packet_num: Expected number of packets (Default 3)
439         :param icmp_type: Type of error ICMP packet
440                           we are expecting (Default 11)
441         """
442         self.assertEqual(packet_num, len(capture))
443         for packet in capture:
444             try:
445                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
446                 self.assertTrue(packet.haslayer(ICMP))
447                 icmp = packet[ICMP]
448                 self.assertEqual(icmp.type, icmp_type)
449                 self.assertTrue(icmp.haslayer(IPerror))
450                 inner_ip = icmp[IPerror]
451                 if inner_ip.haslayer(TCPerror):
452                     self.assertEqual(inner_ip[TCPerror].sport,
453                                      self.tcp_port_in)
454                 elif inner_ip.haslayer(UDPerror):
455                     self.assertEqual(inner_ip[UDPerror].sport,
456                                      self.udp_port_in)
457                 else:
458                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
459             except:
460                 self.logger.error(ppp("Unexpected or invalid packet "
461                                       "(inside network):", packet))
462                 raise
463
464     def verify_ipfix_nat44_ses(self, data):
465         """
466         Verify IPFIX NAT44 session create/delete event
467
468         :param data: Decoded IPFIX data records
469         """
470         nat44_ses_create_num = 0
471         nat44_ses_delete_num = 0
472         self.assertEqual(6, len(data))
473         for record in data:
474             # natEvent
475             self.assertIn(ord(record[230]), [4, 5])
476             if ord(record[230]) == 4:
477                 nat44_ses_create_num += 1
478             else:
479                 nat44_ses_delete_num += 1
480             # sourceIPv4Address
481             self.assertEqual(self.pg0.remote_ip4n, record[8])
482             # postNATSourceIPv4Address
483             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
484                              record[225])
485             # ingressVRFID
486             self.assertEqual(struct.pack("!I", 0), record[234])
487             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
488             if IP_PROTOS.icmp == ord(record[4]):
489                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
490                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
491                                  record[227])
492             elif IP_PROTOS.tcp == ord(record[4]):
493                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
494                                  record[7])
495                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
496                                  record[227])
497             elif IP_PROTOS.udp == ord(record[4]):
498                 self.assertEqual(struct.pack("!H", self.udp_port_in),
499                                  record[7])
500                 self.assertEqual(struct.pack("!H", self.udp_port_out),
501                                  record[227])
502             else:
503                 self.fail("Invalid protocol")
504         self.assertEqual(3, nat44_ses_create_num)
505         self.assertEqual(3, nat44_ses_delete_num)
506
507     def verify_ipfix_addr_exhausted(self, data):
508         """
509         Verify IPFIX NAT addresses event
510
511         :param data: Decoded IPFIX data records
512         """
513         self.assertEqual(1, len(data))
514         record = data[0]
515         # natEvent
516         self.assertEqual(ord(record[230]), 3)
517         # natPoolID
518         self.assertEqual(struct.pack("!I", 0), record[283])
519
520
521 class TestNAT44(MethodHolder):
522     """ NAT44 Test Cases """
523
524     @classmethod
525     def setUpClass(cls):
526         super(TestNAT44, cls).setUpClass()
527
528         try:
529             cls.tcp_port_in = 6303
530             cls.tcp_port_out = 6303
531             cls.udp_port_in = 6304
532             cls.udp_port_out = 6304
533             cls.icmp_id_in = 6305
534             cls.icmp_id_out = 6305
535             cls.nat_addr = '10.0.0.3'
536             cls.ipfix_src_port = 4739
537             cls.ipfix_domain_id = 1
538
539             cls.create_pg_interfaces(range(9))
540             cls.interfaces = list(cls.pg_interfaces[0:4])
541
542             for i in cls.interfaces:
543                 i.admin_up()
544                 i.config_ip4()
545                 i.resolve_arp()
546
547             cls.pg0.generate_remote_hosts(3)
548             cls.pg0.configure_ipv4_neighbors()
549
550             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
551
552             cls.pg4._local_ip4 = "172.16.255.1"
553             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
554             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
555             cls.pg4.set_table_ip4(10)
556             cls.pg5._local_ip4 = "172.17.255.3"
557             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
558             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
559             cls.pg5.set_table_ip4(10)
560             cls.pg6._local_ip4 = "172.16.255.1"
561             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
562             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
563             cls.pg6.set_table_ip4(20)
564             for i in cls.overlapping_interfaces:
565                 i.config_ip4()
566                 i.admin_up()
567                 i.resolve_arp()
568
569             cls.pg7.admin_up()
570             cls.pg8.admin_up()
571
572         except Exception:
573             super(TestNAT44, cls).tearDownClass()
574             raise
575
576     def clear_nat44(self):
577         """
578         Clear NAT44 configuration.
579         """
580         # I found no elegant way to do this
581         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
582                                    dst_address_length=32,
583                                    next_hop_address=self.pg7.remote_ip4n,
584                                    next_hop_sw_if_index=self.pg7.sw_if_index,
585                                    is_add=0)
586         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
587                                    dst_address_length=32,
588                                    next_hop_address=self.pg8.remote_ip4n,
589                                    next_hop_sw_if_index=self.pg8.sw_if_index,
590                                    is_add=0)
591
592         for intf in [self.pg7, self.pg8]:
593             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
594             for n in neighbors:
595                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
596                                               n.mac_address,
597                                               n.ip_address,
598                                               is_add=0)
599
600         if self.pg7.has_ip4_config:
601             self.pg7.unconfig_ip4()
602
603         interfaces = self.vapi.nat44_interface_addr_dump()
604         for intf in interfaces:
605             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
606
607         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
608                             domain_id=self.ipfix_domain_id)
609         self.ipfix_src_port = 4739
610         self.ipfix_domain_id = 1
611
612         interfaces = self.vapi.nat44_interface_dump()
613         for intf in interfaces:
614             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
615                                                       intf.is_inside,
616                                                       is_add=0)
617
618         interfaces = self.vapi.nat44_interface_output_feature_dump()
619         for intf in interfaces:
620             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
621                                                              intf.is_inside,
622                                                              is_add=0)
623
624         static_mappings = self.vapi.nat44_static_mapping_dump()
625         for sm in static_mappings:
626             self.vapi.nat44_add_del_static_mapping(
627                 sm.local_ip_address,
628                 sm.external_ip_address,
629                 local_port=sm.local_port,
630                 external_port=sm.external_port,
631                 addr_only=sm.addr_only,
632                 vrf_id=sm.vrf_id,
633                 protocol=sm.protocol,
634                 is_add=0)
635
636         adresses = self.vapi.nat44_address_dump()
637         for addr in adresses:
638             self.vapi.nat44_add_del_address_range(addr.ip_address,
639                                                   addr.ip_address,
640                                                   is_add=0)
641
642     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
643                                  local_port=0, external_port=0, vrf_id=0,
644                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
645                                  proto=0):
646         """
647         Add/delete NAT44 static mapping
648
649         :param local_ip: Local IP address
650         :param external_ip: External IP address
651         :param local_port: Local port number (Optional)
652         :param external_port: External port number (Optional)
653         :param vrf_id: VRF ID (Default 0)
654         :param is_add: 1 if add, 0 if delete (Default add)
655         :param external_sw_if_index: External interface instead of IP address
656         :param proto: IP protocol (Mandatory if port specified)
657         """
658         addr_only = 1
659         if local_port and external_port:
660             addr_only = 0
661         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
662         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
663         self.vapi.nat44_add_del_static_mapping(
664             l_ip,
665             e_ip,
666             external_sw_if_index,
667             local_port,
668             external_port,
669             addr_only,
670             vrf_id,
671             proto,
672             is_add)
673
674     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
675         """
676         Add/delete NAT44 address
677
678         :param ip: IP address
679         :param is_add: 1 if add, 0 if delete (Default add)
680         """
681         nat_addr = socket.inet_pton(socket.AF_INET, ip)
682         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
683                                               vrf_id=vrf_id)
684
685     def test_dynamic(self):
686         """ NAT44 dynamic translation test """
687
688         self.nat44_add_address(self.nat_addr)
689         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
690         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
691                                                   is_inside=0)
692
693         # in2out
694         pkts = self.create_stream_in(self.pg0, self.pg1)
695         self.pg0.add_stream(pkts)
696         self.pg_enable_capture(self.pg_interfaces)
697         self.pg_start()
698         capture = self.pg1.get_capture(len(pkts))
699         self.verify_capture_out(capture)
700
701         # out2in
702         pkts = self.create_stream_out(self.pg1)
703         self.pg1.add_stream(pkts)
704         self.pg_enable_capture(self.pg_interfaces)
705         self.pg_start()
706         capture = self.pg0.get_capture(len(pkts))
707         self.verify_capture_in(capture, self.pg0)
708
709     def test_dynamic_icmp_errors_in2out_ttl_1(self):
710         """ NAT44 handling of client packets with TTL=1 """
711
712         self.nat44_add_address(self.nat_addr)
713         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
714         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
715                                                   is_inside=0)
716
717         # Client side - generate traffic
718         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
719         self.pg0.add_stream(pkts)
720         self.pg_enable_capture(self.pg_interfaces)
721         self.pg_start()
722
723         # Client side - verify ICMP type 11 packets
724         capture = self.pg0.get_capture(len(pkts))
725         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
726
727     def test_dynamic_icmp_errors_out2in_ttl_1(self):
728         """ NAT44 handling of server packets with TTL=1 """
729
730         self.nat44_add_address(self.nat_addr)
731         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
732         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
733                                                   is_inside=0)
734
735         # Client side - create sessions
736         pkts = self.create_stream_in(self.pg0, self.pg1)
737         self.pg0.add_stream(pkts)
738         self.pg_enable_capture(self.pg_interfaces)
739         self.pg_start()
740
741         # Server side - generate traffic
742         capture = self.pg1.get_capture(len(pkts))
743         self.verify_capture_out(capture)
744         pkts = self.create_stream_out(self.pg1, ttl=1)
745         self.pg1.add_stream(pkts)
746         self.pg_enable_capture(self.pg_interfaces)
747         self.pg_start()
748
749         # Server side - verify ICMP type 11 packets
750         capture = self.pg1.get_capture(len(pkts))
751         self.verify_capture_out_with_icmp_errors(capture,
752                                                  src_ip=self.pg1.local_ip4)
753
754     def test_dynamic_icmp_errors_in2out_ttl_2(self):
755         """ NAT44 handling of error responses to client packets with TTL=2 """
756
757         self.nat44_add_address(self.nat_addr)
758         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
759         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
760                                                   is_inside=0)
761
762         # Client side - generate traffic
763         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
764         self.pg0.add_stream(pkts)
765         self.pg_enable_capture(self.pg_interfaces)
766         self.pg_start()
767
768         # Server side - simulate ICMP type 11 response
769         capture = self.pg1.get_capture(len(pkts))
770         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
771                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
772                 ICMP(type=11) / packet[IP] for packet in capture]
773         self.pg1.add_stream(pkts)
774         self.pg_enable_capture(self.pg_interfaces)
775         self.pg_start()
776
777         # Client side - verify ICMP type 11 packets
778         capture = self.pg0.get_capture(len(pkts))
779         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
780
781     def test_dynamic_icmp_errors_out2in_ttl_2(self):
782         """ NAT44 handling of error responses to server packets with TTL=2 """
783
784         self.nat44_add_address(self.nat_addr)
785         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
786         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
787                                                   is_inside=0)
788
789         # Client side - create sessions
790         pkts = self.create_stream_in(self.pg0, self.pg1)
791         self.pg0.add_stream(pkts)
792         self.pg_enable_capture(self.pg_interfaces)
793         self.pg_start()
794
795         # Server side - generate traffic
796         capture = self.pg1.get_capture(len(pkts))
797         self.verify_capture_out(capture)
798         pkts = self.create_stream_out(self.pg1, ttl=2)
799         self.pg1.add_stream(pkts)
800         self.pg_enable_capture(self.pg_interfaces)
801         self.pg_start()
802
803         # Client side - simulate ICMP type 11 response
804         capture = self.pg0.get_capture(len(pkts))
805         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
806                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
807                 ICMP(type=11) / packet[IP] for packet in capture]
808         self.pg0.add_stream(pkts)
809         self.pg_enable_capture(self.pg_interfaces)
810         self.pg_start()
811
812         # Server side - verify ICMP type 11 packets
813         capture = self.pg1.get_capture(len(pkts))
814         self.verify_capture_out_with_icmp_errors(capture)
815
816     def test_ping_out_interface_from_outside(self):
817         """ Ping NAT44 out interface from outside network """
818
819         self.nat44_add_address(self.nat_addr)
820         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
821         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
822                                                   is_inside=0)
823
824         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
825              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
826              ICMP(id=self.icmp_id_out, type='echo-request'))
827         pkts = [p]
828         self.pg1.add_stream(pkts)
829         self.pg_enable_capture(self.pg_interfaces)
830         self.pg_start()
831         capture = self.pg1.get_capture(len(pkts))
832         self.assertEqual(1, len(capture))
833         packet = capture[0]
834         try:
835             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
836             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
837             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
838             self.assertEqual(packet[ICMP].type, 0)  # echo reply
839         except:
840             self.logger.error(ppp("Unexpected or invalid packet "
841                                   "(outside network):", packet))
842             raise
843
844     def test_ping_internal_host_from_outside(self):
845         """ Ping internal host from outside network """
846
847         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
848         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
849         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
850                                                   is_inside=0)
851
852         # out2in
853         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
854                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
855                ICMP(id=self.icmp_id_out, type='echo-request'))
856         self.pg1.add_stream(pkt)
857         self.pg_enable_capture(self.pg_interfaces)
858         self.pg_start()
859         capture = self.pg0.get_capture(1)
860         self.verify_capture_in(capture, self.pg0, packet_num=1)
861         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
862
863         # in2out
864         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
865                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
866                ICMP(id=self.icmp_id_in, type='echo-reply'))
867         self.pg0.add_stream(pkt)
868         self.pg_enable_capture(self.pg_interfaces)
869         self.pg_start()
870         capture = self.pg1.get_capture(1)
871         self.verify_capture_out(capture, same_port=True, packet_num=1)
872         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
873
874     def test_static_in(self):
875         """ 1:1 NAT initialized from inside network """
876
877         nat_ip = "10.0.0.10"
878         self.tcp_port_out = 6303
879         self.udp_port_out = 6304
880         self.icmp_id_out = 6305
881
882         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
883         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
884         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
885                                                   is_inside=0)
886
887         # in2out
888         pkts = self.create_stream_in(self.pg0, self.pg1)
889         self.pg0.add_stream(pkts)
890         self.pg_enable_capture(self.pg_interfaces)
891         self.pg_start()
892         capture = self.pg1.get_capture(len(pkts))
893         self.verify_capture_out(capture, nat_ip, True)
894
895         # out2in
896         pkts = self.create_stream_out(self.pg1, nat_ip)
897         self.pg1.add_stream(pkts)
898         self.pg_enable_capture(self.pg_interfaces)
899         self.pg_start()
900         capture = self.pg0.get_capture(len(pkts))
901         self.verify_capture_in(capture, self.pg0)
902
903     def test_static_out(self):
904         """ 1:1 NAT initialized from outside network """
905
906         nat_ip = "10.0.0.20"
907         self.tcp_port_out = 6303
908         self.udp_port_out = 6304
909         self.icmp_id_out = 6305
910
911         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
912         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
913         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
914                                                   is_inside=0)
915
916         # out2in
917         pkts = self.create_stream_out(self.pg1, nat_ip)
918         self.pg1.add_stream(pkts)
919         self.pg_enable_capture(self.pg_interfaces)
920         self.pg_start()
921         capture = self.pg0.get_capture(len(pkts))
922         self.verify_capture_in(capture, self.pg0)
923
924         # in2out
925         pkts = self.create_stream_in(self.pg0, self.pg1)
926         self.pg0.add_stream(pkts)
927         self.pg_enable_capture(self.pg_interfaces)
928         self.pg_start()
929         capture = self.pg1.get_capture(len(pkts))
930         self.verify_capture_out(capture, nat_ip, True)
931
932     def test_static_with_port_in(self):
933         """ 1:1 NAPT initialized from inside network """
934
935         self.tcp_port_out = 3606
936         self.udp_port_out = 3607
937         self.icmp_id_out = 3608
938
939         self.nat44_add_address(self.nat_addr)
940         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
941                                       self.tcp_port_in, self.tcp_port_out,
942                                       proto=IP_PROTOS.tcp)
943         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
944                                       self.udp_port_in, self.udp_port_out,
945                                       proto=IP_PROTOS.udp)
946         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
947                                       self.icmp_id_in, self.icmp_id_out,
948                                       proto=IP_PROTOS.icmp)
949         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
950         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
951                                                   is_inside=0)
952
953         # in2out
954         pkts = self.create_stream_in(self.pg0, self.pg1)
955         self.pg0.add_stream(pkts)
956         self.pg_enable_capture(self.pg_interfaces)
957         self.pg_start()
958         capture = self.pg1.get_capture(len(pkts))
959         self.verify_capture_out(capture)
960
961         # out2in
962         pkts = self.create_stream_out(self.pg1)
963         self.pg1.add_stream(pkts)
964         self.pg_enable_capture(self.pg_interfaces)
965         self.pg_start()
966         capture = self.pg0.get_capture(len(pkts))
967         self.verify_capture_in(capture, self.pg0)
968
969     def test_static_with_port_out(self):
970         """ 1:1 NAPT initialized from outside network """
971
972         self.tcp_port_out = 30606
973         self.udp_port_out = 30607
974         self.icmp_id_out = 30608
975
976         self.nat44_add_address(self.nat_addr)
977         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
978                                       self.tcp_port_in, self.tcp_port_out,
979                                       proto=IP_PROTOS.tcp)
980         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
981                                       self.udp_port_in, self.udp_port_out,
982                                       proto=IP_PROTOS.udp)
983         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
984                                       self.icmp_id_in, self.icmp_id_out,
985                                       proto=IP_PROTOS.icmp)
986         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
987         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
988                                                   is_inside=0)
989
990         # out2in
991         pkts = self.create_stream_out(self.pg1)
992         self.pg1.add_stream(pkts)
993         self.pg_enable_capture(self.pg_interfaces)
994         self.pg_start()
995         capture = self.pg0.get_capture(len(pkts))
996         self.verify_capture_in(capture, self.pg0)
997
998         # in2out
999         pkts = self.create_stream_in(self.pg0, self.pg1)
1000         self.pg0.add_stream(pkts)
1001         self.pg_enable_capture(self.pg_interfaces)
1002         self.pg_start()
1003         capture = self.pg1.get_capture(len(pkts))
1004         self.verify_capture_out(capture)
1005
1006     def test_static_vrf_aware(self):
1007         """ 1:1 NAT VRF awareness """
1008
1009         nat_ip1 = "10.0.0.30"
1010         nat_ip2 = "10.0.0.40"
1011         self.tcp_port_out = 6303
1012         self.udp_port_out = 6304
1013         self.icmp_id_out = 6305
1014
1015         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1016                                       vrf_id=10)
1017         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1018                                       vrf_id=10)
1019         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1020                                                   is_inside=0)
1021         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1022         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1023
1024         # inside interface VRF match NAT44 static mapping VRF
1025         pkts = self.create_stream_in(self.pg4, self.pg3)
1026         self.pg4.add_stream(pkts)
1027         self.pg_enable_capture(self.pg_interfaces)
1028         self.pg_start()
1029         capture = self.pg3.get_capture(len(pkts))
1030         self.verify_capture_out(capture, nat_ip1, True)
1031
1032         # inside interface VRF don't match NAT44 static mapping VRF (packets
1033         # are dropped)
1034         pkts = self.create_stream_in(self.pg0, self.pg3)
1035         self.pg0.add_stream(pkts)
1036         self.pg_enable_capture(self.pg_interfaces)
1037         self.pg_start()
1038         self.pg3.assert_nothing_captured()
1039
1040     def test_multiple_inside_interfaces(self):
1041         """ NAT44 multiple non-overlapping address space inside interfaces """
1042
1043         self.nat44_add_address(self.nat_addr)
1044         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1045         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1046         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1047                                                   is_inside=0)
1048
1049         # between two NAT44 inside interfaces (no translation)
1050         pkts = self.create_stream_in(self.pg0, self.pg1)
1051         self.pg0.add_stream(pkts)
1052         self.pg_enable_capture(self.pg_interfaces)
1053         self.pg_start()
1054         capture = self.pg1.get_capture(len(pkts))
1055         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1056
1057         # from NAT44 inside to interface without NAT44 feature (no translation)
1058         pkts = self.create_stream_in(self.pg0, self.pg2)
1059         self.pg0.add_stream(pkts)
1060         self.pg_enable_capture(self.pg_interfaces)
1061         self.pg_start()
1062         capture = self.pg2.get_capture(len(pkts))
1063         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1064
1065         # in2out 1st interface
1066         pkts = self.create_stream_in(self.pg0, self.pg3)
1067         self.pg0.add_stream(pkts)
1068         self.pg_enable_capture(self.pg_interfaces)
1069         self.pg_start()
1070         capture = self.pg3.get_capture(len(pkts))
1071         self.verify_capture_out(capture)
1072
1073         # out2in 1st interface
1074         pkts = self.create_stream_out(self.pg3)
1075         self.pg3.add_stream(pkts)
1076         self.pg_enable_capture(self.pg_interfaces)
1077         self.pg_start()
1078         capture = self.pg0.get_capture(len(pkts))
1079         self.verify_capture_in(capture, self.pg0)
1080
1081         # in2out 2nd interface
1082         pkts = self.create_stream_in(self.pg1, self.pg3)
1083         self.pg1.add_stream(pkts)
1084         self.pg_enable_capture(self.pg_interfaces)
1085         self.pg_start()
1086         capture = self.pg3.get_capture(len(pkts))
1087         self.verify_capture_out(capture)
1088
1089         # out2in 2nd interface
1090         pkts = self.create_stream_out(self.pg3)
1091         self.pg3.add_stream(pkts)
1092         self.pg_enable_capture(self.pg_interfaces)
1093         self.pg_start()
1094         capture = self.pg1.get_capture(len(pkts))
1095         self.verify_capture_in(capture, self.pg1)
1096
1097     def test_inside_overlapping_interfaces(self):
1098         """ NAT44 multiple inside interfaces with overlapping address space """
1099
1100         static_nat_ip = "10.0.0.10"
1101         self.nat44_add_address(self.nat_addr)
1102         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1103                                                   is_inside=0)
1104         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1105         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1106         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1107         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1108                                       vrf_id=20)
1109
1110         # between NAT44 inside interfaces with same VRF (no translation)
1111         pkts = self.create_stream_in(self.pg4, self.pg5)
1112         self.pg4.add_stream(pkts)
1113         self.pg_enable_capture(self.pg_interfaces)
1114         self.pg_start()
1115         capture = self.pg5.get_capture(len(pkts))
1116         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1117
1118         # between NAT44 inside interfaces with different VRF (hairpinning)
1119         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1120              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1121              TCP(sport=1234, dport=5678))
1122         self.pg4.add_stream(p)
1123         self.pg_enable_capture(self.pg_interfaces)
1124         self.pg_start()
1125         capture = self.pg6.get_capture(1)
1126         p = capture[0]
1127         try:
1128             ip = p[IP]
1129             tcp = p[TCP]
1130             self.assertEqual(ip.src, self.nat_addr)
1131             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1132             self.assertNotEqual(tcp.sport, 1234)
1133             self.assertEqual(tcp.dport, 5678)
1134         except:
1135             self.logger.error(ppp("Unexpected or invalid packet:", p))
1136             raise
1137
1138         # in2out 1st interface
1139         pkts = self.create_stream_in(self.pg4, self.pg3)
1140         self.pg4.add_stream(pkts)
1141         self.pg_enable_capture(self.pg_interfaces)
1142         self.pg_start()
1143         capture = self.pg3.get_capture(len(pkts))
1144         self.verify_capture_out(capture)
1145
1146         # out2in 1st interface
1147         pkts = self.create_stream_out(self.pg3)
1148         self.pg3.add_stream(pkts)
1149         self.pg_enable_capture(self.pg_interfaces)
1150         self.pg_start()
1151         capture = self.pg4.get_capture(len(pkts))
1152         self.verify_capture_in(capture, self.pg4)
1153
1154         # in2out 2nd interface
1155         pkts = self.create_stream_in(self.pg5, self.pg3)
1156         self.pg5.add_stream(pkts)
1157         self.pg_enable_capture(self.pg_interfaces)
1158         self.pg_start()
1159         capture = self.pg3.get_capture(len(pkts))
1160         self.verify_capture_out(capture)
1161
1162         # out2in 2nd interface
1163         pkts = self.create_stream_out(self.pg3)
1164         self.pg3.add_stream(pkts)
1165         self.pg_enable_capture(self.pg_interfaces)
1166         self.pg_start()
1167         capture = self.pg5.get_capture(len(pkts))
1168         self.verify_capture_in(capture, self.pg5)
1169
1170         # pg5 session dump
1171         addresses = self.vapi.nat44_address_dump()
1172         self.assertEqual(len(addresses), 1)
1173         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1174         self.assertEqual(len(sessions), 3)
1175         for session in sessions:
1176             self.assertFalse(session.is_static)
1177             self.assertEqual(session.inside_ip_address[0:4],
1178                              self.pg5.remote_ip4n)
1179             self.assertEqual(session.outside_ip_address,
1180                              addresses[0].ip_address)
1181         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1182         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1183         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1184         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1185         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1186         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1187         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1188         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1189         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1190
1191         # in2out 3rd interface
1192         pkts = self.create_stream_in(self.pg6, self.pg3)
1193         self.pg6.add_stream(pkts)
1194         self.pg_enable_capture(self.pg_interfaces)
1195         self.pg_start()
1196         capture = self.pg3.get_capture(len(pkts))
1197         self.verify_capture_out(capture, static_nat_ip, True)
1198
1199         # out2in 3rd interface
1200         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1201         self.pg3.add_stream(pkts)
1202         self.pg_enable_capture(self.pg_interfaces)
1203         self.pg_start()
1204         capture = self.pg6.get_capture(len(pkts))
1205         self.verify_capture_in(capture, self.pg6)
1206
1207         # general user and session dump verifications
1208         users = self.vapi.nat44_user_dump()
1209         self.assertTrue(len(users) >= 3)
1210         addresses = self.vapi.nat44_address_dump()
1211         self.assertEqual(len(addresses), 1)
1212         for user in users:
1213             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1214                                                          user.vrf_id)
1215             for session in sessions:
1216                 self.assertEqual(user.ip_address, session.inside_ip_address)
1217                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1218                 self.assertTrue(session.protocol in
1219                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1220                                  IP_PROTOS.icmp])
1221
1222         # pg4 session dump
1223         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1224         self.assertTrue(len(sessions) >= 4)
1225         for session in sessions:
1226             self.assertFalse(session.is_static)
1227             self.assertEqual(session.inside_ip_address[0:4],
1228                              self.pg4.remote_ip4n)
1229             self.assertEqual(session.outside_ip_address,
1230                              addresses[0].ip_address)
1231
1232         # pg6 session dump
1233         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1234         self.assertTrue(len(sessions) >= 3)
1235         for session in sessions:
1236             self.assertTrue(session.is_static)
1237             self.assertEqual(session.inside_ip_address[0:4],
1238                              self.pg6.remote_ip4n)
1239             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1240                              map(int, static_nat_ip.split('.')))
1241             self.assertTrue(session.inside_port in
1242                             [self.tcp_port_in, self.udp_port_in,
1243                              self.icmp_id_in])
1244
1245     def test_hairpinning(self):
1246         """ NAT44 hairpinning - 1:1 NAPT """
1247
1248         host = self.pg0.remote_hosts[0]
1249         server = self.pg0.remote_hosts[1]
1250         host_in_port = 1234
1251         host_out_port = 0
1252         server_in_port = 5678
1253         server_out_port = 8765
1254
1255         self.nat44_add_address(self.nat_addr)
1256         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1257         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1258                                                   is_inside=0)
1259         # add static mapping for server
1260         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1261                                       server_in_port, server_out_port,
1262                                       proto=IP_PROTOS.tcp)
1263
1264         # send packet from host to server
1265         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1266              IP(src=host.ip4, dst=self.nat_addr) /
1267              TCP(sport=host_in_port, dport=server_out_port))
1268         self.pg0.add_stream(p)
1269         self.pg_enable_capture(self.pg_interfaces)
1270         self.pg_start()
1271         capture = self.pg0.get_capture(1)
1272         p = capture[0]
1273         try:
1274             ip = p[IP]
1275             tcp = p[TCP]
1276             self.assertEqual(ip.src, self.nat_addr)
1277             self.assertEqual(ip.dst, server.ip4)
1278             self.assertNotEqual(tcp.sport, host_in_port)
1279             self.assertEqual(tcp.dport, server_in_port)
1280             self.check_tcp_checksum(p)
1281             host_out_port = tcp.sport
1282         except:
1283             self.logger.error(ppp("Unexpected or invalid packet:", p))
1284             raise
1285
1286         # send reply from server to host
1287         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1288              IP(src=server.ip4, dst=self.nat_addr) /
1289              TCP(sport=server_in_port, dport=host_out_port))
1290         self.pg0.add_stream(p)
1291         self.pg_enable_capture(self.pg_interfaces)
1292         self.pg_start()
1293         capture = self.pg0.get_capture(1)
1294         p = capture[0]
1295         try:
1296             ip = p[IP]
1297             tcp = p[TCP]
1298             self.assertEqual(ip.src, self.nat_addr)
1299             self.assertEqual(ip.dst, host.ip4)
1300             self.assertEqual(tcp.sport, server_out_port)
1301             self.assertEqual(tcp.dport, host_in_port)
1302             self.check_tcp_checksum(p)
1303         except:
1304             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1305             raise
1306
1307     def test_hairpinning2(self):
1308         """ NAT44 hairpinning - 1:1 NAT"""
1309
1310         server1_nat_ip = "10.0.0.10"
1311         server2_nat_ip = "10.0.0.11"
1312         host = self.pg0.remote_hosts[0]
1313         server1 = self.pg0.remote_hosts[1]
1314         server2 = self.pg0.remote_hosts[2]
1315         server_tcp_port = 22
1316         server_udp_port = 20
1317
1318         self.nat44_add_address(self.nat_addr)
1319         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1320         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1321                                                   is_inside=0)
1322
1323         # add static mapping for servers
1324         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1325         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1326
1327         # host to server1
1328         pkts = []
1329         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1330              IP(src=host.ip4, dst=server1_nat_ip) /
1331              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1332         pkts.append(p)
1333         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1334              IP(src=host.ip4, dst=server1_nat_ip) /
1335              UDP(sport=self.udp_port_in, dport=server_udp_port))
1336         pkts.append(p)
1337         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1338              IP(src=host.ip4, dst=server1_nat_ip) /
1339              ICMP(id=self.icmp_id_in, type='echo-request'))
1340         pkts.append(p)
1341         self.pg0.add_stream(pkts)
1342         self.pg_enable_capture(self.pg_interfaces)
1343         self.pg_start()
1344         capture = self.pg0.get_capture(len(pkts))
1345         for packet in capture:
1346             try:
1347                 self.assertEqual(packet[IP].src, self.nat_addr)
1348                 self.assertEqual(packet[IP].dst, server1.ip4)
1349                 if packet.haslayer(TCP):
1350                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1351                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1352                     self.tcp_port_out = packet[TCP].sport
1353                     self.check_tcp_checksum(packet)
1354                 elif packet.haslayer(UDP):
1355                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1356                     self.assertEqual(packet[UDP].dport, server_udp_port)
1357                     self.udp_port_out = packet[UDP].sport
1358                 else:
1359                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1360                     self.icmp_id_out = packet[ICMP].id
1361             except:
1362                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1363                 raise
1364
1365         # server1 to host
1366         pkts = []
1367         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1368              IP(src=server1.ip4, dst=self.nat_addr) /
1369              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1370         pkts.append(p)
1371         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1372              IP(src=server1.ip4, dst=self.nat_addr) /
1373              UDP(sport=server_udp_port, dport=self.udp_port_out))
1374         pkts.append(p)
1375         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1376              IP(src=server1.ip4, dst=self.nat_addr) /
1377              ICMP(id=self.icmp_id_out, type='echo-reply'))
1378         pkts.append(p)
1379         self.pg0.add_stream(pkts)
1380         self.pg_enable_capture(self.pg_interfaces)
1381         self.pg_start()
1382         capture = self.pg0.get_capture(len(pkts))
1383         for packet in capture:
1384             try:
1385                 self.assertEqual(packet[IP].src, server1_nat_ip)
1386                 self.assertEqual(packet[IP].dst, host.ip4)
1387                 if packet.haslayer(TCP):
1388                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1389                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1390                     self.check_tcp_checksum(packet)
1391                 elif packet.haslayer(UDP):
1392                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1393                     self.assertEqual(packet[UDP].sport, server_udp_port)
1394                 else:
1395                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1396             except:
1397                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1398                 raise
1399
1400         # server2 to server1
1401         pkts = []
1402         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1403              IP(src=server2.ip4, dst=server1_nat_ip) /
1404              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1405         pkts.append(p)
1406         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1407              IP(src=server2.ip4, dst=server1_nat_ip) /
1408              UDP(sport=self.udp_port_in, dport=server_udp_port))
1409         pkts.append(p)
1410         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1411              IP(src=server2.ip4, dst=server1_nat_ip) /
1412              ICMP(id=self.icmp_id_in, type='echo-request'))
1413         pkts.append(p)
1414         self.pg0.add_stream(pkts)
1415         self.pg_enable_capture(self.pg_interfaces)
1416         self.pg_start()
1417         capture = self.pg0.get_capture(len(pkts))
1418         for packet in capture:
1419             try:
1420                 self.assertEqual(packet[IP].src, server2_nat_ip)
1421                 self.assertEqual(packet[IP].dst, server1.ip4)
1422                 if packet.haslayer(TCP):
1423                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1424                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1425                     self.tcp_port_out = packet[TCP].sport
1426                     self.check_tcp_checksum(packet)
1427                 elif packet.haslayer(UDP):
1428                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1429                     self.assertEqual(packet[UDP].dport, server_udp_port)
1430                     self.udp_port_out = packet[UDP].sport
1431                 else:
1432                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1433                     self.icmp_id_out = packet[ICMP].id
1434             except:
1435                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1436                 raise
1437
1438         # server1 to server2
1439         pkts = []
1440         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1441              IP(src=server1.ip4, dst=server2_nat_ip) /
1442              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1443         pkts.append(p)
1444         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1445              IP(src=server1.ip4, dst=server2_nat_ip) /
1446              UDP(sport=server_udp_port, dport=self.udp_port_out))
1447         pkts.append(p)
1448         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1449              IP(src=server1.ip4, dst=server2_nat_ip) /
1450              ICMP(id=self.icmp_id_out, type='echo-reply'))
1451         pkts.append(p)
1452         self.pg0.add_stream(pkts)
1453         self.pg_enable_capture(self.pg_interfaces)
1454         self.pg_start()
1455         capture = self.pg0.get_capture(len(pkts))
1456         for packet in capture:
1457             try:
1458                 self.assertEqual(packet[IP].src, server1_nat_ip)
1459                 self.assertEqual(packet[IP].dst, server2.ip4)
1460                 if packet.haslayer(TCP):
1461                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1462                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1463                     self.check_tcp_checksum(packet)
1464                 elif packet.haslayer(UDP):
1465                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1466                     self.assertEqual(packet[UDP].sport, server_udp_port)
1467                 else:
1468                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1469             except:
1470                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1471                 raise
1472
1473     def test_max_translations_per_user(self):
1474         """ MAX translations per user - recycle the least recently used """
1475
1476         self.nat44_add_address(self.nat_addr)
1477         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1478         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1479                                                   is_inside=0)
1480
1481         # get maximum number of translations per user
1482         nat44_config = self.vapi.nat_show_config()
1483
1484         # send more than maximum number of translations per user packets
1485         pkts_num = nat44_config.max_translations_per_user + 5
1486         pkts = []
1487         for port in range(0, pkts_num):
1488             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1489                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1490                  TCP(sport=1025 + port))
1491             pkts.append(p)
1492         self.pg0.add_stream(pkts)
1493         self.pg_enable_capture(self.pg_interfaces)
1494         self.pg_start()
1495
1496         # verify number of translated packet
1497         self.pg1.get_capture(pkts_num)
1498
1499     def test_interface_addr(self):
1500         """ Acquire NAT44 addresses from interface """
1501         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1502
1503         # no address in NAT pool
1504         adresses = self.vapi.nat44_address_dump()
1505         self.assertEqual(0, len(adresses))
1506
1507         # configure interface address and check NAT address pool
1508         self.pg7.config_ip4()
1509         adresses = self.vapi.nat44_address_dump()
1510         self.assertEqual(1, len(adresses))
1511         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1512
1513         # remove interface address and check NAT address pool
1514         self.pg7.unconfig_ip4()
1515         adresses = self.vapi.nat44_address_dump()
1516         self.assertEqual(0, len(adresses))
1517
1518     def test_interface_addr_static_mapping(self):
1519         """ Static mapping with addresses from interface """
1520         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1521         self.nat44_add_static_mapping(
1522             '1.2.3.4',
1523             external_sw_if_index=self.pg7.sw_if_index)
1524
1525         # static mappings with external interface
1526         static_mappings = self.vapi.nat44_static_mapping_dump()
1527         self.assertEqual(1, len(static_mappings))
1528         self.assertEqual(self.pg7.sw_if_index,
1529                          static_mappings[0].external_sw_if_index)
1530
1531         # configure interface address and check static mappings
1532         self.pg7.config_ip4()
1533         static_mappings = self.vapi.nat44_static_mapping_dump()
1534         self.assertEqual(1, len(static_mappings))
1535         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1536                          self.pg7.local_ip4n)
1537         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1538
1539         # remove interface address and check static mappings
1540         self.pg7.unconfig_ip4()
1541         static_mappings = self.vapi.nat44_static_mapping_dump()
1542         self.assertEqual(0, len(static_mappings))
1543
1544     def test_ipfix_nat44_sess(self):
1545         """ IPFIX logging NAT44 session created/delted """
1546         self.ipfix_domain_id = 10
1547         self.ipfix_src_port = 20202
1548         colector_port = 30303
1549         bind_layers(UDP, IPFIX, dport=30303)
1550         self.nat44_add_address(self.nat_addr)
1551         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1552         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1553                                                   is_inside=0)
1554         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1555                                      src_address=self.pg3.local_ip4n,
1556                                      path_mtu=512,
1557                                      template_interval=10,
1558                                      collector_port=colector_port)
1559         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1560                             src_port=self.ipfix_src_port)
1561
1562         pkts = self.create_stream_in(self.pg0, self.pg1)
1563         self.pg0.add_stream(pkts)
1564         self.pg_enable_capture(self.pg_interfaces)
1565         self.pg_start()
1566         capture = self.pg1.get_capture(len(pkts))
1567         self.verify_capture_out(capture)
1568         self.nat44_add_address(self.nat_addr, is_add=0)
1569         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1570         capture = self.pg3.get_capture(3)
1571         ipfix = IPFIXDecoder()
1572         # first load template
1573         for p in capture:
1574             self.assertTrue(p.haslayer(IPFIX))
1575             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1576             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1577             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1578             self.assertEqual(p[UDP].dport, colector_port)
1579             self.assertEqual(p[IPFIX].observationDomainID,
1580                              self.ipfix_domain_id)
1581             if p.haslayer(Template):
1582                 ipfix.add_template(p.getlayer(Template))
1583         # verify events in data set
1584         for p in capture:
1585             if p.haslayer(Data):
1586                 data = ipfix.decode_data_set(p.getlayer(Set))
1587                 self.verify_ipfix_nat44_ses(data)
1588
1589     def test_ipfix_addr_exhausted(self):
1590         """ IPFIX logging NAT addresses exhausted """
1591         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1592         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1593                                                   is_inside=0)
1594         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1595                                      src_address=self.pg3.local_ip4n,
1596                                      path_mtu=512,
1597                                      template_interval=10)
1598         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1599                             src_port=self.ipfix_src_port)
1600
1601         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1602              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1603              TCP(sport=3025))
1604         self.pg0.add_stream(p)
1605         self.pg_enable_capture(self.pg_interfaces)
1606         self.pg_start()
1607         capture = self.pg1.get_capture(0)
1608         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1609         capture = self.pg3.get_capture(3)
1610         ipfix = IPFIXDecoder()
1611         # first load template
1612         for p in capture:
1613             self.assertTrue(p.haslayer(IPFIX))
1614             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1615             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1616             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1617             self.assertEqual(p[UDP].dport, 4739)
1618             self.assertEqual(p[IPFIX].observationDomainID,
1619                              self.ipfix_domain_id)
1620             if p.haslayer(Template):
1621                 ipfix.add_template(p.getlayer(Template))
1622         # verify events in data set
1623         for p in capture:
1624             if p.haslayer(Data):
1625                 data = ipfix.decode_data_set(p.getlayer(Set))
1626                 self.verify_ipfix_addr_exhausted(data)
1627
1628     def test_pool_addr_fib(self):
1629         """ NAT44 add pool addresses to FIB """
1630         static_addr = '10.0.0.10'
1631         self.nat44_add_address(self.nat_addr)
1632         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1633         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1634                                                   is_inside=0)
1635         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1636
1637         # NAT44 address
1638         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1639              ARP(op=ARP.who_has, pdst=self.nat_addr,
1640                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1641         self.pg1.add_stream(p)
1642         self.pg_enable_capture(self.pg_interfaces)
1643         self.pg_start()
1644         capture = self.pg1.get_capture(1)
1645         self.assertTrue(capture[0].haslayer(ARP))
1646         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1647
1648         # 1:1 NAT address
1649         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1650              ARP(op=ARP.who_has, pdst=static_addr,
1651                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1652         self.pg1.add_stream(p)
1653         self.pg_enable_capture(self.pg_interfaces)
1654         self.pg_start()
1655         capture = self.pg1.get_capture(1)
1656         self.assertTrue(capture[0].haslayer(ARP))
1657         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1658
1659         # send ARP to non-NAT44 interface
1660         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1661              ARP(op=ARP.who_has, pdst=self.nat_addr,
1662                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1663         self.pg2.add_stream(p)
1664         self.pg_enable_capture(self.pg_interfaces)
1665         self.pg_start()
1666         capture = self.pg1.get_capture(0)
1667
1668         # remove addresses and verify
1669         self.nat44_add_address(self.nat_addr, is_add=0)
1670         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1671                                       is_add=0)
1672
1673         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1674              ARP(op=ARP.who_has, pdst=self.nat_addr,
1675                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1676         self.pg1.add_stream(p)
1677         self.pg_enable_capture(self.pg_interfaces)
1678         self.pg_start()
1679         capture = self.pg1.get_capture(0)
1680
1681         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1682              ARP(op=ARP.who_has, pdst=static_addr,
1683                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1684         self.pg1.add_stream(p)
1685         self.pg_enable_capture(self.pg_interfaces)
1686         self.pg_start()
1687         capture = self.pg1.get_capture(0)
1688
1689     def test_vrf_mode(self):
1690         """ NAT44 tenant VRF aware address pool mode """
1691
1692         vrf_id1 = 1
1693         vrf_id2 = 2
1694         nat_ip1 = "10.0.0.10"
1695         nat_ip2 = "10.0.0.11"
1696
1697         self.pg0.unconfig_ip4()
1698         self.pg1.unconfig_ip4()
1699         self.pg0.set_table_ip4(vrf_id1)
1700         self.pg1.set_table_ip4(vrf_id2)
1701         self.pg0.config_ip4()
1702         self.pg1.config_ip4()
1703
1704         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1705         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1706         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1707         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1708         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1709                                                   is_inside=0)
1710
1711         # first VRF
1712         pkts = self.create_stream_in(self.pg0, self.pg2)
1713         self.pg0.add_stream(pkts)
1714         self.pg_enable_capture(self.pg_interfaces)
1715         self.pg_start()
1716         capture = self.pg2.get_capture(len(pkts))
1717         self.verify_capture_out(capture, nat_ip1)
1718
1719         # second VRF
1720         pkts = self.create_stream_in(self.pg1, self.pg2)
1721         self.pg1.add_stream(pkts)
1722         self.pg_enable_capture(self.pg_interfaces)
1723         self.pg_start()
1724         capture = self.pg2.get_capture(len(pkts))
1725         self.verify_capture_out(capture, nat_ip2)
1726
1727     def test_vrf_feature_independent(self):
1728         """ NAT44 tenant VRF independent address pool mode """
1729
1730         nat_ip1 = "10.0.0.10"
1731         nat_ip2 = "10.0.0.11"
1732
1733         self.nat44_add_address(nat_ip1)
1734         self.nat44_add_address(nat_ip2)
1735         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1736         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1737         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1738                                                   is_inside=0)
1739
1740         # first VRF
1741         pkts = self.create_stream_in(self.pg0, self.pg2)
1742         self.pg0.add_stream(pkts)
1743         self.pg_enable_capture(self.pg_interfaces)
1744         self.pg_start()
1745         capture = self.pg2.get_capture(len(pkts))
1746         self.verify_capture_out(capture, nat_ip1)
1747
1748         # second VRF
1749         pkts = self.create_stream_in(self.pg1, self.pg2)
1750         self.pg1.add_stream(pkts)
1751         self.pg_enable_capture(self.pg_interfaces)
1752         self.pg_start()
1753         capture = self.pg2.get_capture(len(pkts))
1754         self.verify_capture_out(capture, nat_ip1)
1755
1756     def test_dynamic_ipless_interfaces(self):
1757         """ NAT44 interfaces without configured IP address """
1758
1759         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1760                                       self.pg7.remote_mac,
1761                                       self.pg7.remote_ip4n,
1762                                       is_static=1)
1763         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1764                                       self.pg8.remote_mac,
1765                                       self.pg8.remote_ip4n,
1766                                       is_static=1)
1767
1768         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1769                                    dst_address_length=32,
1770                                    next_hop_address=self.pg7.remote_ip4n,
1771                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1772         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1773                                    dst_address_length=32,
1774                                    next_hop_address=self.pg8.remote_ip4n,
1775                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1776
1777         self.nat44_add_address(self.nat_addr)
1778         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1779         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1780                                                   is_inside=0)
1781
1782         # in2out
1783         pkts = self.create_stream_in(self.pg7, self.pg8)
1784         self.pg7.add_stream(pkts)
1785         self.pg_enable_capture(self.pg_interfaces)
1786         self.pg_start()
1787         capture = self.pg8.get_capture(len(pkts))
1788         self.verify_capture_out(capture)
1789
1790         # out2in
1791         pkts = self.create_stream_out(self.pg8, self.nat_addr)
1792         self.pg8.add_stream(pkts)
1793         self.pg_enable_capture(self.pg_interfaces)
1794         self.pg_start()
1795         capture = self.pg7.get_capture(len(pkts))
1796         self.verify_capture_in(capture, self.pg7)
1797
1798     def test_static_ipless_interfaces(self):
1799         """ NAT44 interfaces without configured IP address - 1:1 NAT """
1800
1801         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1802                                       self.pg7.remote_mac,
1803                                       self.pg7.remote_ip4n,
1804                                       is_static=1)
1805         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1806                                       self.pg8.remote_mac,
1807                                       self.pg8.remote_ip4n,
1808                                       is_static=1)
1809
1810         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1811                                    dst_address_length=32,
1812                                    next_hop_address=self.pg7.remote_ip4n,
1813                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1814         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1815                                    dst_address_length=32,
1816                                    next_hop_address=self.pg8.remote_ip4n,
1817                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1818
1819         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1820         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1821         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1822                                                   is_inside=0)
1823
1824         # out2in
1825         pkts = self.create_stream_out(self.pg8)
1826         self.pg8.add_stream(pkts)
1827         self.pg_enable_capture(self.pg_interfaces)
1828         self.pg_start()
1829         capture = self.pg7.get_capture(len(pkts))
1830         self.verify_capture_in(capture, self.pg7)
1831
1832         # in2out
1833         pkts = self.create_stream_in(self.pg7, self.pg8)
1834         self.pg7.add_stream(pkts)
1835         self.pg_enable_capture(self.pg_interfaces)
1836         self.pg_start()
1837         capture = self.pg8.get_capture(len(pkts))
1838         self.verify_capture_out(capture, self.nat_addr, True)
1839
1840     def test_static_with_port_ipless_interfaces(self):
1841         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1842
1843         self.tcp_port_out = 30606
1844         self.udp_port_out = 30607
1845         self.icmp_id_out = 30608
1846
1847         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1848                                       self.pg7.remote_mac,
1849                                       self.pg7.remote_ip4n,
1850                                       is_static=1)
1851         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1852                                       self.pg8.remote_mac,
1853                                       self.pg8.remote_ip4n,
1854                                       is_static=1)
1855
1856         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1857                                    dst_address_length=32,
1858                                    next_hop_address=self.pg7.remote_ip4n,
1859                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1860         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1861                                    dst_address_length=32,
1862                                    next_hop_address=self.pg8.remote_ip4n,
1863                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1864
1865         self.nat44_add_address(self.nat_addr)
1866         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1867                                       self.tcp_port_in, self.tcp_port_out,
1868                                       proto=IP_PROTOS.tcp)
1869         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1870                                       self.udp_port_in, self.udp_port_out,
1871                                       proto=IP_PROTOS.udp)
1872         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1873                                       self.icmp_id_in, self.icmp_id_out,
1874                                       proto=IP_PROTOS.icmp)
1875         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1876         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1877                                                   is_inside=0)
1878
1879         # out2in
1880         pkts = self.create_stream_out(self.pg8)
1881         self.pg8.add_stream(pkts)
1882         self.pg_enable_capture(self.pg_interfaces)
1883         self.pg_start()
1884         capture = self.pg7.get_capture(len(pkts))
1885         self.verify_capture_in(capture, self.pg7)
1886
1887         # in2out
1888         pkts = self.create_stream_in(self.pg7, self.pg8)
1889         self.pg7.add_stream(pkts)
1890         self.pg_enable_capture(self.pg_interfaces)
1891         self.pg_start()
1892         capture = self.pg8.get_capture(len(pkts))
1893         self.verify_capture_out(capture)
1894
1895     def test_static_unknown_proto(self):
1896         """ 1:1 NAT translate packet with unknown protocol """
1897         nat_ip = "10.0.0.10"
1898         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1899         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1900         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1901                                                   is_inside=0)
1902
1903         # in2out
1904         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1905              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1906              GRE() /
1907              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1908              TCP(sport=1234, dport=1234))
1909         self.pg0.add_stream(p)
1910         self.pg_enable_capture(self.pg_interfaces)
1911         self.pg_start()
1912         p = self.pg1.get_capture(1)
1913         packet = p[0]
1914         try:
1915             self.assertEqual(packet[IP].src, nat_ip)
1916             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1917             self.assertTrue(packet.haslayer(GRE))
1918             self.check_ip_checksum(packet)
1919         except:
1920             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1921             raise
1922
1923         # out2in
1924         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1925              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
1926              GRE() /
1927              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1928              TCP(sport=1234, dport=1234))
1929         self.pg1.add_stream(p)
1930         self.pg_enable_capture(self.pg_interfaces)
1931         self.pg_start()
1932         p = self.pg0.get_capture(1)
1933         packet = p[0]
1934         try:
1935             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
1936             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
1937             self.assertTrue(packet.haslayer(GRE))
1938             self.check_ip_checksum(packet)
1939         except:
1940             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1941             raise
1942
1943     def test_hairpinning_static_unknown_proto(self):
1944         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
1945
1946         host = self.pg0.remote_hosts[0]
1947         server = self.pg0.remote_hosts[1]
1948
1949         host_nat_ip = "10.0.0.10"
1950         server_nat_ip = "10.0.0.11"
1951
1952         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
1953         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
1954         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1955         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1956                                                   is_inside=0)
1957
1958         # host to server
1959         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
1960              IP(src=host.ip4, dst=server_nat_ip) /
1961              GRE() /
1962              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
1963              TCP(sport=1234, dport=1234))
1964         self.pg0.add_stream(p)
1965         self.pg_enable_capture(self.pg_interfaces)
1966         self.pg_start()
1967         p = self.pg0.get_capture(1)
1968         packet = p[0]
1969         try:
1970             self.assertEqual(packet[IP].src, host_nat_ip)
1971             self.assertEqual(packet[IP].dst, server.ip4)
1972             self.assertTrue(packet.haslayer(GRE))
1973             self.check_ip_checksum(packet)
1974         except:
1975             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1976             raise
1977
1978         # server to host
1979         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
1980              IP(src=server.ip4, dst=host_nat_ip) /
1981              GRE() /
1982              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
1983              TCP(sport=1234, dport=1234))
1984         self.pg0.add_stream(p)
1985         self.pg_enable_capture(self.pg_interfaces)
1986         self.pg_start()
1987         p = self.pg0.get_capture(1)
1988         packet = p[0]
1989         try:
1990             self.assertEqual(packet[IP].src, server_nat_ip)
1991             self.assertEqual(packet[IP].dst, host.ip4)
1992             self.assertTrue(packet.haslayer(GRE))
1993             self.check_ip_checksum(packet)
1994         except:
1995             self.logger.error(ppp("Unexpected or invalid packet:", packet))
1996             raise
1997
1998     def test_unknown_proto(self):
1999         """ NAT44 translate packet with unknown protocol """
2000         self.nat44_add_address(self.nat_addr)
2001         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2002         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2003                                                   is_inside=0)
2004
2005         # in2out
2006         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2007              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2008              TCP(sport=self.tcp_port_in, dport=20))
2009         self.pg0.add_stream(p)
2010         self.pg_enable_capture(self.pg_interfaces)
2011         self.pg_start()
2012         p = self.pg1.get_capture(1)
2013
2014         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2015              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2016              GRE() /
2017              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2018              TCP(sport=1234, dport=1234))
2019         self.pg0.add_stream(p)
2020         self.pg_enable_capture(self.pg_interfaces)
2021         self.pg_start()
2022         p = self.pg1.get_capture(1)
2023         packet = p[0]
2024         try:
2025             self.assertEqual(packet[IP].src, self.nat_addr)
2026             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2027             self.assertTrue(packet.haslayer(GRE))
2028             self.check_ip_checksum(packet)
2029         except:
2030             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2031             raise
2032
2033         # out2in
2034         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2035              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2036              GRE() /
2037              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2038              TCP(sport=1234, dport=1234))
2039         self.pg1.add_stream(p)
2040         self.pg_enable_capture(self.pg_interfaces)
2041         self.pg_start()
2042         p = self.pg0.get_capture(1)
2043         packet = p[0]
2044         try:
2045             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2046             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2047             self.assertTrue(packet.haslayer(GRE))
2048             self.check_ip_checksum(packet)
2049         except:
2050             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2051             raise
2052
2053     def test_hairpinning_unknown_proto(self):
2054         """ NAT44 translate packet with unknown protocol - hairpinning """
2055         host = self.pg0.remote_hosts[0]
2056         server = self.pg0.remote_hosts[1]
2057         host_in_port = 1234
2058         host_out_port = 0
2059         server_in_port = 5678
2060         server_out_port = 8765
2061         server_nat_ip = "10.0.0.11"
2062
2063         self.nat44_add_address(self.nat_addr)
2064         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2065         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2066                                                   is_inside=0)
2067
2068         # add static mapping for server
2069         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2070
2071         # host to server
2072         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2073              IP(src=host.ip4, dst=server_nat_ip) /
2074              TCP(sport=host_in_port, dport=server_out_port))
2075         self.pg0.add_stream(p)
2076         self.pg_enable_capture(self.pg_interfaces)
2077         self.pg_start()
2078         capture = self.pg0.get_capture(1)
2079
2080         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2081              IP(src=host.ip4, dst=server_nat_ip) /
2082              GRE() /
2083              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2084              TCP(sport=1234, dport=1234))
2085         self.pg0.add_stream(p)
2086         self.pg_enable_capture(self.pg_interfaces)
2087         self.pg_start()
2088         p = self.pg0.get_capture(1)
2089         packet = p[0]
2090         try:
2091             self.assertEqual(packet[IP].src, self.nat_addr)
2092             self.assertEqual(packet[IP].dst, server.ip4)
2093             self.assertTrue(packet.haslayer(GRE))
2094             self.check_ip_checksum(packet)
2095         except:
2096             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2097             raise
2098
2099         # server to host
2100         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2101              IP(src=server.ip4, dst=self.nat_addr) /
2102              GRE() /
2103              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2104              TCP(sport=1234, dport=1234))
2105         self.pg0.add_stream(p)
2106         self.pg_enable_capture(self.pg_interfaces)
2107         self.pg_start()
2108         p = self.pg0.get_capture(1)
2109         packet = p[0]
2110         try:
2111             self.assertEqual(packet[IP].src, server_nat_ip)
2112             self.assertEqual(packet[IP].dst, host.ip4)
2113             self.assertTrue(packet.haslayer(GRE))
2114             self.check_ip_checksum(packet)
2115         except:
2116             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2117             raise
2118
2119     def test_output_feature(self):
2120         """ NAT44 interface output feature (in2out postrouting) """
2121         self.nat44_add_address(self.nat_addr)
2122         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2123         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2124                                                          is_inside=0)
2125
2126         # in2out
2127         pkts = self.create_stream_in(self.pg0, self.pg1)
2128         self.pg0.add_stream(pkts)
2129         self.pg_enable_capture(self.pg_interfaces)
2130         self.pg_start()
2131         capture = self.pg1.get_capture(len(pkts))
2132         self.verify_capture_out(capture)
2133
2134         # out2in
2135         pkts = self.create_stream_out(self.pg1)
2136         self.pg1.add_stream(pkts)
2137         self.pg_enable_capture(self.pg_interfaces)
2138         self.pg_start()
2139         capture = self.pg0.get_capture(len(pkts))
2140         self.verify_capture_in(capture, self.pg0)
2141
2142     def test_output_feature_vrf_aware(self):
2143         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2144         nat_ip_vrf10 = "10.0.0.10"
2145         nat_ip_vrf20 = "10.0.0.20"
2146
2147         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2148                                    dst_address_length=32,
2149                                    next_hop_address=self.pg3.remote_ip4n,
2150                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2151                                    table_id=10)
2152         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2153                                    dst_address_length=32,
2154                                    next_hop_address=self.pg3.remote_ip4n,
2155                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2156                                    table_id=20)
2157
2158         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2159         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2160         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2161         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2162         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2163                                                          is_inside=0)
2164
2165         # in2out VRF 10
2166         pkts = self.create_stream_in(self.pg4, self.pg3)
2167         self.pg4.add_stream(pkts)
2168         self.pg_enable_capture(self.pg_interfaces)
2169         self.pg_start()
2170         capture = self.pg3.get_capture(len(pkts))
2171         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2172
2173         # out2in VRF 10
2174         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2175         self.pg3.add_stream(pkts)
2176         self.pg_enable_capture(self.pg_interfaces)
2177         self.pg_start()
2178         capture = self.pg4.get_capture(len(pkts))
2179         self.verify_capture_in(capture, self.pg4)
2180
2181         # in2out VRF 20
2182         pkts = self.create_stream_in(self.pg6, self.pg3)
2183         self.pg6.add_stream(pkts)
2184         self.pg_enable_capture(self.pg_interfaces)
2185         self.pg_start()
2186         capture = self.pg3.get_capture(len(pkts))
2187         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2188
2189         # out2in VRF 20
2190         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2191         self.pg3.add_stream(pkts)
2192         self.pg_enable_capture(self.pg_interfaces)
2193         self.pg_start()
2194         capture = self.pg6.get_capture(len(pkts))
2195         self.verify_capture_in(capture, self.pg6)
2196
2197     def test_output_feature_hairpinning(self):
2198         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2199         host = self.pg0.remote_hosts[0]
2200         server = self.pg0.remote_hosts[1]
2201         host_in_port = 1234
2202         host_out_port = 0
2203         server_in_port = 5678
2204         server_out_port = 8765
2205
2206         self.nat44_add_address(self.nat_addr)
2207         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2208         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2209                                                          is_inside=0)
2210
2211         # add static mapping for server
2212         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2213                                       server_in_port, server_out_port,
2214                                       proto=IP_PROTOS.tcp)
2215
2216         # send packet from host to server
2217         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2218              IP(src=host.ip4, dst=self.nat_addr) /
2219              TCP(sport=host_in_port, dport=server_out_port))
2220         self.pg0.add_stream(p)
2221         self.pg_enable_capture(self.pg_interfaces)
2222         self.pg_start()
2223         capture = self.pg0.get_capture(1)
2224         p = capture[0]
2225         try:
2226             ip = p[IP]
2227             tcp = p[TCP]
2228             self.assertEqual(ip.src, self.nat_addr)
2229             self.assertEqual(ip.dst, server.ip4)
2230             self.assertNotEqual(tcp.sport, host_in_port)
2231             self.assertEqual(tcp.dport, server_in_port)
2232             self.check_tcp_checksum(p)
2233             host_out_port = tcp.sport
2234         except:
2235             self.logger.error(ppp("Unexpected or invalid packet:", p))
2236             raise
2237
2238         # send reply from server to host
2239         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2240              IP(src=server.ip4, dst=self.nat_addr) /
2241              TCP(sport=server_in_port, dport=host_out_port))
2242         self.pg0.add_stream(p)
2243         self.pg_enable_capture(self.pg_interfaces)
2244         self.pg_start()
2245         capture = self.pg0.get_capture(1)
2246         p = capture[0]
2247         try:
2248             ip = p[IP]
2249             tcp = p[TCP]
2250             self.assertEqual(ip.src, self.nat_addr)
2251             self.assertEqual(ip.dst, host.ip4)
2252             self.assertEqual(tcp.sport, server_out_port)
2253             self.assertEqual(tcp.dport, host_in_port)
2254             self.check_tcp_checksum(p)
2255         except:
2256             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2257             raise
2258
2259     def tearDown(self):
2260         super(TestNAT44, self).tearDown()
2261         if not self.vpp_dead:
2262             self.logger.info(self.vapi.cli("show nat44 verbose"))
2263             self.clear_nat44()
2264
2265
2266 class TestDeterministicNAT(MethodHolder):
2267     """ Deterministic NAT Test Cases """
2268
2269     @classmethod
2270     def setUpConstants(cls):
2271         super(TestDeterministicNAT, cls).setUpConstants()
2272         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2273
2274     @classmethod
2275     def setUpClass(cls):
2276         super(TestDeterministicNAT, cls).setUpClass()
2277
2278         try:
2279             cls.tcp_port_in = 6303
2280             cls.tcp_external_port = 6303
2281             cls.udp_port_in = 6304
2282             cls.udp_external_port = 6304
2283             cls.icmp_id_in = 6305
2284             cls.nat_addr = '10.0.0.3'
2285
2286             cls.create_pg_interfaces(range(3))
2287             cls.interfaces = list(cls.pg_interfaces)
2288
2289             for i in cls.interfaces:
2290                 i.admin_up()
2291                 i.config_ip4()
2292                 i.resolve_arp()
2293
2294             cls.pg0.generate_remote_hosts(2)
2295             cls.pg0.configure_ipv4_neighbors()
2296
2297         except Exception:
2298             super(TestDeterministicNAT, cls).tearDownClass()
2299             raise
2300
2301     def create_stream_in(self, in_if, out_if, ttl=64):
2302         """
2303         Create packet stream for inside network
2304
2305         :param in_if: Inside interface
2306         :param out_if: Outside interface
2307         :param ttl: TTL of generated packets
2308         """
2309         pkts = []
2310         # TCP
2311         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2312              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2313              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2314         pkts.append(p)
2315
2316         # UDP
2317         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2318              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2319              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2320         pkts.append(p)
2321
2322         # ICMP
2323         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2324              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2325              ICMP(id=self.icmp_id_in, type='echo-request'))
2326         pkts.append(p)
2327
2328         return pkts
2329
2330     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2331         """
2332         Create packet stream for outside network
2333
2334         :param out_if: Outside interface
2335         :param dst_ip: Destination IP address (Default use global NAT address)
2336         :param ttl: TTL of generated packets
2337         """
2338         if dst_ip is None:
2339             dst_ip = self.nat_addr
2340         pkts = []
2341         # TCP
2342         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2343              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2344              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2345         pkts.append(p)
2346
2347         # UDP
2348         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2349              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2350              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2351         pkts.append(p)
2352
2353         # ICMP
2354         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2355              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2356              ICMP(id=self.icmp_external_id, type='echo-reply'))
2357         pkts.append(p)
2358
2359         return pkts
2360
2361     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2362         """
2363         Verify captured packets on outside network
2364
2365         :param capture: Captured packets
2366         :param nat_ip: Translated IP address (Default use global NAT address)
2367         :param same_port: Sorce port number is not translated (Default False)
2368         :param packet_num: Expected number of packets (Default 3)
2369         """
2370         if nat_ip is None:
2371             nat_ip = self.nat_addr
2372         self.assertEqual(packet_num, len(capture))
2373         for packet in capture:
2374             try:
2375                 self.assertEqual(packet[IP].src, nat_ip)
2376                 if packet.haslayer(TCP):
2377                     self.tcp_port_out = packet[TCP].sport
2378                 elif packet.haslayer(UDP):
2379                     self.udp_port_out = packet[UDP].sport
2380                 else:
2381                     self.icmp_external_id = packet[ICMP].id
2382             except:
2383                 self.logger.error(ppp("Unexpected or invalid packet "
2384                                       "(outside network):", packet))
2385                 raise
2386
2387     def initiate_tcp_session(self, in_if, out_if):
2388         """
2389         Initiates TCP session
2390
2391         :param in_if: Inside interface
2392         :param out_if: Outside interface
2393         """
2394         try:
2395             # SYN packet in->out
2396             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2397                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2398                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2399                      flags="S"))
2400             in_if.add_stream(p)
2401             self.pg_enable_capture(self.pg_interfaces)
2402             self.pg_start()
2403             capture = out_if.get_capture(1)
2404             p = capture[0]
2405             self.tcp_port_out = p[TCP].sport
2406
2407             # SYN + ACK packet out->in
2408             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2409                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2410                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2411                      flags="SA"))
2412             out_if.add_stream(p)
2413             self.pg_enable_capture(self.pg_interfaces)
2414             self.pg_start()
2415             in_if.get_capture(1)
2416
2417             # ACK packet in->out
2418             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2419                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2420                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2421                      flags="A"))
2422             in_if.add_stream(p)
2423             self.pg_enable_capture(self.pg_interfaces)
2424             self.pg_start()
2425             out_if.get_capture(1)
2426
2427         except:
2428             self.logger.error("TCP 3 way handshake failed")
2429             raise
2430
2431     def verify_ipfix_max_entries_per_user(self, data):
2432         """
2433         Verify IPFIX maximum entries per user exceeded event
2434
2435         :param data: Decoded IPFIX data records
2436         """
2437         self.assertEqual(1, len(data))
2438         record = data[0]
2439         # natEvent
2440         self.assertEqual(ord(record[230]), 13)
2441         # natQuotaExceededEvent
2442         self.assertEqual('\x03\x00\x00\x00', record[466])
2443         # sourceIPv4Address
2444         self.assertEqual(self.pg0.remote_ip4n, record[8])
2445
2446     def test_deterministic_mode(self):
2447         """ NAT plugin run deterministic mode """
2448         in_addr = '172.16.255.0'
2449         out_addr = '172.17.255.50'
2450         in_addr_t = '172.16.255.20'
2451         in_addr_n = socket.inet_aton(in_addr)
2452         out_addr_n = socket.inet_aton(out_addr)
2453         in_addr_t_n = socket.inet_aton(in_addr_t)
2454         in_plen = 24
2455         out_plen = 32
2456
2457         nat_config = self.vapi.nat_show_config()
2458         self.assertEqual(1, nat_config.deterministic)
2459
2460         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2461
2462         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2463         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2464         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2465         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2466
2467         deterministic_mappings = self.vapi.nat_det_map_dump()
2468         self.assertEqual(len(deterministic_mappings), 1)
2469         dsm = deterministic_mappings[0]
2470         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2471         self.assertEqual(in_plen, dsm.in_plen)
2472         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2473         self.assertEqual(out_plen, dsm.out_plen)
2474
2475         self.clear_nat_det()
2476         deterministic_mappings = self.vapi.nat_det_map_dump()
2477         self.assertEqual(len(deterministic_mappings), 0)
2478
2479     def test_set_timeouts(self):
2480         """ Set deterministic NAT timeouts """
2481         timeouts_before = self.vapi.nat_det_get_timeouts()
2482
2483         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2484                                        timeouts_before.tcp_established + 10,
2485                                        timeouts_before.tcp_transitory + 10,
2486                                        timeouts_before.icmp + 10)
2487
2488         timeouts_after = self.vapi.nat_det_get_timeouts()
2489
2490         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2491         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2492         self.assertNotEqual(timeouts_before.tcp_established,
2493                             timeouts_after.tcp_established)
2494         self.assertNotEqual(timeouts_before.tcp_transitory,
2495                             timeouts_after.tcp_transitory)
2496
2497     def test_det_in(self):
2498         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2499
2500         nat_ip = "10.0.0.10"
2501
2502         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2503                                       32,
2504                                       socket.inet_aton(nat_ip),
2505                                       32)
2506         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2507         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2508                                                   is_inside=0)
2509
2510         # in2out
2511         pkts = self.create_stream_in(self.pg0, self.pg1)
2512         self.pg0.add_stream(pkts)
2513         self.pg_enable_capture(self.pg_interfaces)
2514         self.pg_start()
2515         capture = self.pg1.get_capture(len(pkts))
2516         self.verify_capture_out(capture, nat_ip)
2517
2518         # out2in
2519         pkts = self.create_stream_out(self.pg1, nat_ip)
2520         self.pg1.add_stream(pkts)
2521         self.pg_enable_capture(self.pg_interfaces)
2522         self.pg_start()
2523         capture = self.pg0.get_capture(len(pkts))
2524         self.verify_capture_in(capture, self.pg0)
2525
2526         # session dump test
2527         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2528         self.assertEqual(len(sessions), 3)
2529
2530         # TCP session
2531         s = sessions[0]
2532         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2533         self.assertEqual(s.in_port, self.tcp_port_in)
2534         self.assertEqual(s.out_port, self.tcp_port_out)
2535         self.assertEqual(s.ext_port, self.tcp_external_port)
2536
2537         # UDP session
2538         s = sessions[1]
2539         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2540         self.assertEqual(s.in_port, self.udp_port_in)
2541         self.assertEqual(s.out_port, self.udp_port_out)
2542         self.assertEqual(s.ext_port, self.udp_external_port)
2543
2544         # ICMP session
2545         s = sessions[2]
2546         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2547         self.assertEqual(s.in_port, self.icmp_id_in)
2548         self.assertEqual(s.out_port, self.icmp_external_id)
2549
2550     def test_multiple_users(self):
2551         """ Deterministic NAT multiple users """
2552
2553         nat_ip = "10.0.0.10"
2554         port_in = 80
2555         external_port = 6303
2556
2557         host0 = self.pg0.remote_hosts[0]
2558         host1 = self.pg0.remote_hosts[1]
2559
2560         self.vapi.nat_det_add_del_map(host0.ip4n,
2561                                       24,
2562                                       socket.inet_aton(nat_ip),
2563                                       32)
2564         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2565         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2566                                                   is_inside=0)
2567
2568         # host0 to out
2569         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2570              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2571              TCP(sport=port_in, dport=external_port))
2572         self.pg0.add_stream(p)
2573         self.pg_enable_capture(self.pg_interfaces)
2574         self.pg_start()
2575         capture = self.pg1.get_capture(1)
2576         p = capture[0]
2577         try:
2578             ip = p[IP]
2579             tcp = p[TCP]
2580             self.assertEqual(ip.src, nat_ip)
2581             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2582             self.assertEqual(tcp.dport, external_port)
2583             port_out0 = tcp.sport
2584         except:
2585             self.logger.error(ppp("Unexpected or invalid packet:", p))
2586             raise
2587
2588         # host1 to out
2589         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2590              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2591              TCP(sport=port_in, dport=external_port))
2592         self.pg0.add_stream(p)
2593         self.pg_enable_capture(self.pg_interfaces)
2594         self.pg_start()
2595         capture = self.pg1.get_capture(1)
2596         p = capture[0]
2597         try:
2598             ip = p[IP]
2599             tcp = p[TCP]
2600             self.assertEqual(ip.src, nat_ip)
2601             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2602             self.assertEqual(tcp.dport, external_port)
2603             port_out1 = tcp.sport
2604         except:
2605             self.logger.error(ppp("Unexpected or invalid packet:", p))
2606             raise
2607
2608         dms = self.vapi.nat_det_map_dump()
2609         self.assertEqual(1, len(dms))
2610         self.assertEqual(2, dms[0].ses_num)
2611
2612         # out to host0
2613         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2614              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2615              TCP(sport=external_port, dport=port_out0))
2616         self.pg1.add_stream(p)
2617         self.pg_enable_capture(self.pg_interfaces)
2618         self.pg_start()
2619         capture = self.pg0.get_capture(1)
2620         p = capture[0]
2621         try:
2622             ip = p[IP]
2623             tcp = p[TCP]
2624             self.assertEqual(ip.src, self.pg1.remote_ip4)
2625             self.assertEqual(ip.dst, host0.ip4)
2626             self.assertEqual(tcp.dport, port_in)
2627             self.assertEqual(tcp.sport, external_port)
2628         except:
2629             self.logger.error(ppp("Unexpected or invalid packet:", p))
2630             raise
2631
2632         # out to host1
2633         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2634              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2635              TCP(sport=external_port, dport=port_out1))
2636         self.pg1.add_stream(p)
2637         self.pg_enable_capture(self.pg_interfaces)
2638         self.pg_start()
2639         capture = self.pg0.get_capture(1)
2640         p = capture[0]
2641         try:
2642             ip = p[IP]
2643             tcp = p[TCP]
2644             self.assertEqual(ip.src, self.pg1.remote_ip4)
2645             self.assertEqual(ip.dst, host1.ip4)
2646             self.assertEqual(tcp.dport, port_in)
2647             self.assertEqual(tcp.sport, external_port)
2648         except:
2649             self.logger.error(ppp("Unexpected or invalid packet", p))
2650             raise
2651
2652         # session close api test
2653         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2654                                             port_out1,
2655                                             self.pg1.remote_ip4n,
2656                                             external_port)
2657         dms = self.vapi.nat_det_map_dump()
2658         self.assertEqual(dms[0].ses_num, 1)
2659
2660         self.vapi.nat_det_close_session_in(host0.ip4n,
2661                                            port_in,
2662                                            self.pg1.remote_ip4n,
2663                                            external_port)
2664         dms = self.vapi.nat_det_map_dump()
2665         self.assertEqual(dms[0].ses_num, 0)
2666
2667     def test_tcp_session_close_detection_in(self):
2668         """ Deterministic NAT TCP session close from inside network """
2669         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2670                                       32,
2671                                       socket.inet_aton(self.nat_addr),
2672                                       32)
2673         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2674         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2675                                                   is_inside=0)
2676
2677         self.initiate_tcp_session(self.pg0, self.pg1)
2678
2679         # close the session from inside
2680         try:
2681             # FIN packet in -> out
2682             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2683                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2684                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2685                      flags="F"))
2686             self.pg0.add_stream(p)
2687             self.pg_enable_capture(self.pg_interfaces)
2688             self.pg_start()
2689             self.pg1.get_capture(1)
2690
2691             pkts = []
2692
2693             # ACK packet out -> in
2694             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2695                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2696                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2697                      flags="A"))
2698             pkts.append(p)
2699
2700             # FIN packet out -> in
2701             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2702                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2703                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2704                      flags="F"))
2705             pkts.append(p)
2706
2707             self.pg1.add_stream(pkts)
2708             self.pg_enable_capture(self.pg_interfaces)
2709             self.pg_start()
2710             self.pg0.get_capture(2)
2711
2712             # ACK packet in -> out
2713             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2714                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2715                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2716                      flags="A"))
2717             self.pg0.add_stream(p)
2718             self.pg_enable_capture(self.pg_interfaces)
2719             self.pg_start()
2720             self.pg1.get_capture(1)
2721
2722             # Check if deterministic NAT44 closed the session
2723             dms = self.vapi.nat_det_map_dump()
2724             self.assertEqual(0, dms[0].ses_num)
2725         except:
2726             self.logger.error("TCP session termination failed")
2727             raise
2728
2729     def test_tcp_session_close_detection_out(self):
2730         """ Deterministic NAT TCP session close from outside network """
2731         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2732                                       32,
2733                                       socket.inet_aton(self.nat_addr),
2734                                       32)
2735         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2736         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2737                                                   is_inside=0)
2738
2739         self.initiate_tcp_session(self.pg0, self.pg1)
2740
2741         # close the session from outside
2742         try:
2743             # FIN packet out -> in
2744             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2745                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2746                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2747                      flags="F"))
2748             self.pg1.add_stream(p)
2749             self.pg_enable_capture(self.pg_interfaces)
2750             self.pg_start()
2751             self.pg0.get_capture(1)
2752
2753             pkts = []
2754
2755             # ACK packet in -> out
2756             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2757                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2758                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2759                      flags="A"))
2760             pkts.append(p)
2761
2762             # ACK packet in -> out
2763             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2764                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2765                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2766                      flags="F"))
2767             pkts.append(p)
2768
2769             self.pg0.add_stream(pkts)
2770             self.pg_enable_capture(self.pg_interfaces)
2771             self.pg_start()
2772             self.pg1.get_capture(2)
2773
2774             # ACK packet out -> in
2775             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2776                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2777                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2778                      flags="A"))
2779             self.pg1.add_stream(p)
2780             self.pg_enable_capture(self.pg_interfaces)
2781             self.pg_start()
2782             self.pg0.get_capture(1)
2783
2784             # Check if deterministic NAT44 closed the session
2785             dms = self.vapi.nat_det_map_dump()
2786             self.assertEqual(0, dms[0].ses_num)
2787         except:
2788             self.logger.error("TCP session termination failed")
2789             raise
2790
2791     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2792     def test_session_timeout(self):
2793         """ Deterministic NAT session timeouts """
2794         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2795                                       32,
2796                                       socket.inet_aton(self.nat_addr),
2797                                       32)
2798         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2799         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2800                                                   is_inside=0)
2801
2802         self.initiate_tcp_session(self.pg0, self.pg1)
2803         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
2804         pkts = self.create_stream_in(self.pg0, self.pg1)
2805         self.pg0.add_stream(pkts)
2806         self.pg_enable_capture(self.pg_interfaces)
2807         self.pg_start()
2808         capture = self.pg1.get_capture(len(pkts))
2809         sleep(15)
2810
2811         dms = self.vapi.nat_det_map_dump()
2812         self.assertEqual(0, dms[0].ses_num)
2813
2814     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2815     def test_session_limit_per_user(self):
2816         """ Deterministic NAT maximum sessions per user limit """
2817         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2818                                       32,
2819                                       socket.inet_aton(self.nat_addr),
2820                                       32)
2821         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2822         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2823                                                   is_inside=0)
2824         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2825                                      src_address=self.pg2.local_ip4n,
2826                                      path_mtu=512,
2827                                      template_interval=10)
2828         self.vapi.nat_ipfix()
2829
2830         pkts = []
2831         for port in range(1025, 2025):
2832             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2833                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2834                  UDP(sport=port, dport=port))
2835             pkts.append(p)
2836
2837         self.pg0.add_stream(pkts)
2838         self.pg_enable_capture(self.pg_interfaces)
2839         self.pg_start()
2840         capture = self.pg1.get_capture(len(pkts))
2841
2842         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2843              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2844              UDP(sport=3001, dport=3002))
2845         self.pg0.add_stream(p)
2846         self.pg_enable_capture(self.pg_interfaces)
2847         self.pg_start()
2848         capture = self.pg1.assert_nothing_captured()
2849
2850         # verify ICMP error packet
2851         capture = self.pg0.get_capture(1)
2852         p = capture[0]
2853         self.assertTrue(p.haslayer(ICMP))
2854         icmp = p[ICMP]
2855         self.assertEqual(icmp.type, 3)
2856         self.assertEqual(icmp.code, 1)
2857         self.assertTrue(icmp.haslayer(IPerror))
2858         inner_ip = icmp[IPerror]
2859         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2860         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2861
2862         dms = self.vapi.nat_det_map_dump()
2863
2864         self.assertEqual(1000, dms[0].ses_num)
2865
2866         # verify IPFIX logging
2867         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2868         sleep(1)
2869         capture = self.pg2.get_capture(2)
2870         ipfix = IPFIXDecoder()
2871         # first load template
2872         for p in capture:
2873             self.assertTrue(p.haslayer(IPFIX))
2874             if p.haslayer(Template):
2875                 ipfix.add_template(p.getlayer(Template))
2876         # verify events in data set
2877         for p in capture:
2878             if p.haslayer(Data):
2879                 data = ipfix.decode_data_set(p.getlayer(Set))
2880                 self.verify_ipfix_max_entries_per_user(data)
2881
2882     def clear_nat_det(self):
2883         """
2884         Clear deterministic NAT configuration.
2885         """
2886         self.vapi.nat_ipfix(enable=0)
2887         self.vapi.nat_det_set_timeouts()
2888         deterministic_mappings = self.vapi.nat_det_map_dump()
2889         for dsm in deterministic_mappings:
2890             self.vapi.nat_det_add_del_map(dsm.in_addr,
2891                                           dsm.in_plen,
2892                                           dsm.out_addr,
2893                                           dsm.out_plen,
2894                                           is_add=0)
2895
2896         interfaces = self.vapi.nat44_interface_dump()
2897         for intf in interfaces:
2898             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
2899                                                       intf.is_inside,
2900                                                       is_add=0)
2901
2902     def tearDown(self):
2903         super(TestDeterministicNAT, self).tearDown()
2904         if not self.vpp_dead:
2905             self.logger.info(self.vapi.cli("show nat44 detail"))
2906             self.clear_nat_det()
2907
2908
2909 class TestNAT64(MethodHolder):
2910     """ NAT64 Test Cases """
2911
2912     @classmethod
2913     def setUpClass(cls):
2914         super(TestNAT64, cls).setUpClass()
2915
2916         try:
2917             cls.tcp_port_in = 6303
2918             cls.tcp_port_out = 6303
2919             cls.udp_port_in = 6304
2920             cls.udp_port_out = 6304
2921             cls.icmp_id_in = 6305
2922             cls.icmp_id_out = 6305
2923             cls.nat_addr = '10.0.0.3'
2924             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
2925             cls.vrf1_id = 10
2926             cls.vrf1_nat_addr = '10.0.10.3'
2927             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
2928                                                    cls.vrf1_nat_addr)
2929
2930             cls.create_pg_interfaces(range(3))
2931             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
2932             cls.ip6_interfaces.append(cls.pg_interfaces[2])
2933             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
2934
2935             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
2936
2937             cls.pg0.generate_remote_hosts(2)
2938
2939             for i in cls.ip6_interfaces:
2940                 i.admin_up()
2941                 i.config_ip6()
2942                 i.configure_ipv6_neighbors()
2943
2944             for i in cls.ip4_interfaces:
2945                 i.admin_up()
2946                 i.config_ip4()
2947                 i.resolve_arp()
2948
2949         except Exception:
2950             super(TestNAT64, cls).tearDownClass()
2951             raise
2952
2953     def test_pool(self):
2954         """ Add/delete address to NAT64 pool """
2955         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
2956
2957         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
2958
2959         addresses = self.vapi.nat64_pool_addr_dump()
2960         self.assertEqual(len(addresses), 1)
2961         self.assertEqual(addresses[0].address, nat_addr)
2962
2963         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
2964
2965         addresses = self.vapi.nat64_pool_addr_dump()
2966         self.assertEqual(len(addresses), 0)
2967
2968     def test_interface(self):
2969         """ Enable/disable NAT64 feature on the interface """
2970         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
2971         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
2972
2973         interfaces = self.vapi.nat64_interface_dump()
2974         self.assertEqual(len(interfaces), 2)
2975         pg0_found = False
2976         pg1_found = False
2977         for intf in interfaces:
2978             if intf.sw_if_index == self.pg0.sw_if_index:
2979                 self.assertEqual(intf.is_inside, 1)
2980                 pg0_found = True
2981             elif intf.sw_if_index == self.pg1.sw_if_index:
2982                 self.assertEqual(intf.is_inside, 0)
2983                 pg1_found = True
2984         self.assertTrue(pg0_found)
2985         self.assertTrue(pg1_found)
2986
2987         features = self.vapi.cli("show interface features pg0")
2988         self.assertNotEqual(features.find('nat64-in2out'), -1)
2989         features = self.vapi.cli("show interface features pg1")
2990         self.assertNotEqual(features.find('nat64-out2in'), -1)
2991
2992         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
2993         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
2994
2995         interfaces = self.vapi.nat64_interface_dump()
2996         self.assertEqual(len(interfaces), 0)
2997
2998     def test_static_bib(self):
2999         """ Add/delete static BIB entry """
3000         in_addr = socket.inet_pton(socket.AF_INET6,
3001                                    '2001:db8:85a3::8a2e:370:7334')
3002         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3003         in_port = 1234
3004         out_port = 5678
3005         proto = IP_PROTOS.tcp
3006
3007         self.vapi.nat64_add_del_static_bib(in_addr,
3008                                            out_addr,
3009                                            in_port,
3010                                            out_port,
3011                                            proto)
3012         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3013         static_bib_num = 0
3014         for bibe in bib:
3015             if bibe.is_static:
3016                 static_bib_num += 1
3017                 self.assertEqual(bibe.i_addr, in_addr)
3018                 self.assertEqual(bibe.o_addr, out_addr)
3019                 self.assertEqual(bibe.i_port, in_port)
3020                 self.assertEqual(bibe.o_port, out_port)
3021         self.assertEqual(static_bib_num, 1)
3022
3023         self.vapi.nat64_add_del_static_bib(in_addr,
3024                                            out_addr,
3025                                            in_port,
3026                                            out_port,
3027                                            proto,
3028                                            is_add=0)
3029         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3030         static_bib_num = 0
3031         for bibe in bib:
3032             if bibe.is_static:
3033                 static_bib_num += 1
3034         self.assertEqual(static_bib_num, 0)
3035
3036     def test_set_timeouts(self):
3037         """ Set NAT64 timeouts """
3038         # verify default values
3039         timeouts = self.vapi.nat64_get_timeouts()
3040         self.assertEqual(timeouts.udp, 300)
3041         self.assertEqual(timeouts.icmp, 60)
3042         self.assertEqual(timeouts.tcp_trans, 240)
3043         self.assertEqual(timeouts.tcp_est, 7440)
3044         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3045
3046         # set and verify custom values
3047         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3048                                      tcp_est=7450, tcp_incoming_syn=10)
3049         timeouts = self.vapi.nat64_get_timeouts()
3050         self.assertEqual(timeouts.udp, 200)
3051         self.assertEqual(timeouts.icmp, 30)
3052         self.assertEqual(timeouts.tcp_trans, 250)
3053         self.assertEqual(timeouts.tcp_est, 7450)
3054         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3055
3056     def test_dynamic(self):
3057         """ NAT64 dynamic translation test """
3058         self.tcp_port_in = 6303
3059         self.udp_port_in = 6304
3060         self.icmp_id_in = 6305
3061
3062         ses_num_start = self.nat64_get_ses_num()
3063
3064         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3065                                                 self.nat_addr_n)
3066         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3067         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3068
3069         # in2out
3070         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3071         self.pg0.add_stream(pkts)
3072         self.pg_enable_capture(self.pg_interfaces)
3073         self.pg_start()
3074         capture = self.pg1.get_capture(len(pkts))
3075         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3076                                 dst_ip=self.pg1.remote_ip4)
3077
3078         # out2in
3079         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3080         self.pg1.add_stream(pkts)
3081         self.pg_enable_capture(self.pg_interfaces)
3082         self.pg_start()
3083         capture = self.pg0.get_capture(len(pkts))
3084         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3085         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3086
3087         # in2out
3088         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3089         self.pg0.add_stream(pkts)
3090         self.pg_enable_capture(self.pg_interfaces)
3091         self.pg_start()
3092         capture = self.pg1.get_capture(len(pkts))
3093         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3094                                 dst_ip=self.pg1.remote_ip4)
3095
3096         # out2in
3097         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3098         self.pg1.add_stream(pkts)
3099         self.pg_enable_capture(self.pg_interfaces)
3100         self.pg_start()
3101         capture = self.pg0.get_capture(len(pkts))
3102         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3103
3104         ses_num_end = self.nat64_get_ses_num()
3105
3106         self.assertEqual(ses_num_end - ses_num_start, 3)
3107
3108         # tenant with specific VRF
3109         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3110                                                 self.vrf1_nat_addr_n,
3111                                                 vrf_id=self.vrf1_id)
3112         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3113
3114         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3115         self.pg2.add_stream(pkts)
3116         self.pg_enable_capture(self.pg_interfaces)
3117         self.pg_start()
3118         capture = self.pg1.get_capture(len(pkts))
3119         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3120                                 dst_ip=self.pg1.remote_ip4)
3121
3122         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3123         self.pg1.add_stream(pkts)
3124         self.pg_enable_capture(self.pg_interfaces)
3125         self.pg_start()
3126         capture = self.pg2.get_capture(len(pkts))
3127         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3128
3129     def test_static(self):
3130         """ NAT64 static translation test """
3131         self.tcp_port_in = 60303
3132         self.udp_port_in = 60304
3133         self.icmp_id_in = 60305
3134         self.tcp_port_out = 60303
3135         self.udp_port_out = 60304
3136         self.icmp_id_out = 60305
3137
3138         ses_num_start = self.nat64_get_ses_num()
3139
3140         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3141                                                 self.nat_addr_n)
3142         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3143         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3144
3145         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3146                                            self.nat_addr_n,
3147                                            self.tcp_port_in,
3148                                            self.tcp_port_out,
3149                                            IP_PROTOS.tcp)
3150         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3151                                            self.nat_addr_n,
3152                                            self.udp_port_in,
3153                                            self.udp_port_out,
3154                                            IP_PROTOS.udp)
3155         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3156                                            self.nat_addr_n,
3157                                            self.icmp_id_in,
3158                                            self.icmp_id_out,
3159                                            IP_PROTOS.icmp)
3160
3161         # in2out
3162         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3163         self.pg0.add_stream(pkts)
3164         self.pg_enable_capture(self.pg_interfaces)
3165         self.pg_start()
3166         capture = self.pg1.get_capture(len(pkts))
3167         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3168                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3169
3170         # out2in
3171         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3172         self.pg1.add_stream(pkts)
3173         self.pg_enable_capture(self.pg_interfaces)
3174         self.pg_start()
3175         capture = self.pg0.get_capture(len(pkts))
3176         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3177         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3178
3179         ses_num_end = self.nat64_get_ses_num()
3180
3181         self.assertEqual(ses_num_end - ses_num_start, 3)
3182
3183     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3184     def test_session_timeout(self):
3185         """ NAT64 session timeout """
3186         self.icmp_id_in = 1234
3187         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3188                                                 self.nat_addr_n)
3189         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3190         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3191         self.vapi.nat64_set_timeouts(icmp=5)
3192
3193         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3194         self.pg0.add_stream(pkts)
3195         self.pg_enable_capture(self.pg_interfaces)
3196         self.pg_start()
3197         capture = self.pg1.get_capture(len(pkts))
3198
3199         ses_num_before_timeout = self.nat64_get_ses_num()
3200
3201         sleep(15)
3202
3203         # ICMP session after timeout
3204         ses_num_after_timeout = self.nat64_get_ses_num()
3205         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3206
3207     def test_icmp_error(self):
3208         """ NAT64 ICMP Error message translation """
3209         self.tcp_port_in = 6303
3210         self.udp_port_in = 6304
3211         self.icmp_id_in = 6305
3212
3213         ses_num_start = self.nat64_get_ses_num()
3214
3215         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3216                                                 self.nat_addr_n)
3217         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3218         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3219
3220         # send some packets to create sessions
3221         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3222         self.pg0.add_stream(pkts)
3223         self.pg_enable_capture(self.pg_interfaces)
3224         self.pg_start()
3225         capture_ip4 = self.pg1.get_capture(len(pkts))
3226         self.verify_capture_out(capture_ip4,
3227                                 nat_ip=self.nat_addr,
3228                                 dst_ip=self.pg1.remote_ip4)
3229
3230         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3231         self.pg1.add_stream(pkts)
3232         self.pg_enable_capture(self.pg_interfaces)
3233         self.pg_start()
3234         capture_ip6 = self.pg0.get_capture(len(pkts))
3235         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3236         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3237                                    self.pg0.remote_ip6)
3238
3239         # in2out
3240         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3241                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3242                 ICMPv6DestUnreach(code=1) /
3243                 packet[IPv6] for packet in capture_ip6]
3244         self.pg0.add_stream(pkts)
3245         self.pg_enable_capture(self.pg_interfaces)
3246         self.pg_start()
3247         capture = self.pg1.get_capture(len(pkts))
3248         for packet in capture:
3249             try:
3250                 self.assertEqual(packet[IP].src, self.nat_addr)
3251                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3252                 self.assertEqual(packet[ICMP].type, 3)
3253                 self.assertEqual(packet[ICMP].code, 13)
3254                 inner = packet[IPerror]
3255                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3256                 self.assertEqual(inner.dst, self.nat_addr)
3257                 self.check_icmp_checksum(packet)
3258                 if inner.haslayer(TCPerror):
3259                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3260                 elif inner.haslayer(UDPerror):
3261                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3262                 else:
3263                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3264             except:
3265                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3266                 raise
3267
3268         # out2in
3269         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3270                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3271                 ICMP(type=3, code=13) /
3272                 packet[IP] for packet in capture_ip4]
3273         self.pg1.add_stream(pkts)
3274         self.pg_enable_capture(self.pg_interfaces)
3275         self.pg_start()
3276         capture = self.pg0.get_capture(len(pkts))
3277         for packet in capture:
3278             try:
3279                 self.assertEqual(packet[IPv6].src, ip.src)
3280                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3281                 icmp = packet[ICMPv6DestUnreach]
3282                 self.assertEqual(icmp.code, 1)
3283                 inner = icmp[IPerror6]
3284                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3285                 self.assertEqual(inner.dst, ip.src)
3286                 self.check_icmpv6_checksum(packet)
3287                 if inner.haslayer(TCPerror):
3288                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3289                 elif inner.haslayer(UDPerror):
3290                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3291                 else:
3292                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3293                                      self.icmp_id_in)
3294             except:
3295                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3296                 raise
3297
3298     def test_hairpinning(self):
3299         """ NAT64 hairpinning """
3300
3301         client = self.pg0.remote_hosts[0]
3302         server = self.pg0.remote_hosts[1]
3303         server_tcp_in_port = 22
3304         server_tcp_out_port = 4022
3305         server_udp_in_port = 23
3306         server_udp_out_port = 4023
3307         client_tcp_in_port = 1234
3308         client_udp_in_port = 1235
3309         client_tcp_out_port = 0
3310         client_udp_out_port = 0
3311         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3312         nat_addr_ip6 = ip.src
3313
3314         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3315                                                 self.nat_addr_n)
3316         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3317         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3318
3319         self.vapi.nat64_add_del_static_bib(server.ip6n,
3320                                            self.nat_addr_n,
3321                                            server_tcp_in_port,
3322                                            server_tcp_out_port,
3323                                            IP_PROTOS.tcp)
3324         self.vapi.nat64_add_del_static_bib(server.ip6n,
3325                                            self.nat_addr_n,
3326                                            server_udp_in_port,
3327                                            server_udp_out_port,
3328                                            IP_PROTOS.udp)
3329
3330         # client to server
3331         pkts = []
3332         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3333              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3334              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3335         pkts.append(p)
3336         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3337              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3338              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3339         pkts.append(p)
3340         self.pg0.add_stream(pkts)
3341         self.pg_enable_capture(self.pg_interfaces)
3342         self.pg_start()
3343         capture = self.pg0.get_capture(len(pkts))
3344         for packet in capture:
3345             try:
3346                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3347                 self.assertEqual(packet[IPv6].dst, server.ip6)
3348                 if packet.haslayer(TCP):
3349                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3350                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3351                     self.check_tcp_checksum(packet)
3352                     client_tcp_out_port = packet[TCP].sport
3353                 else:
3354                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3355                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3356                     self.check_udp_checksum(packet)
3357                     client_udp_out_port = packet[UDP].sport
3358             except:
3359                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3360                 raise
3361
3362         # server to client
3363         pkts = []
3364         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3365              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3366              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3367         pkts.append(p)
3368         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3369              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3370              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3371         pkts.append(p)
3372         self.pg0.add_stream(pkts)
3373         self.pg_enable_capture(self.pg_interfaces)
3374         self.pg_start()
3375         capture = self.pg0.get_capture(len(pkts))
3376         for packet in capture:
3377             try:
3378                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3379                 self.assertEqual(packet[IPv6].dst, client.ip6)
3380                 if packet.haslayer(TCP):
3381                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3382                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3383                     self.check_tcp_checksum(packet)
3384                 else:
3385                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3386                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3387                     self.check_udp_checksum(packet)
3388             except:
3389                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3390                 raise
3391
3392         # ICMP error
3393         pkts = []
3394         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3395                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3396                 ICMPv6DestUnreach(code=1) /
3397                 packet[IPv6] for packet in capture]
3398         self.pg0.add_stream(pkts)
3399         self.pg_enable_capture(self.pg_interfaces)
3400         self.pg_start()
3401         capture = self.pg0.get_capture(len(pkts))
3402         for packet in capture:
3403             try:
3404                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3405                 self.assertEqual(packet[IPv6].dst, server.ip6)
3406                 icmp = packet[ICMPv6DestUnreach]
3407                 self.assertEqual(icmp.code, 1)
3408                 inner = icmp[IPerror6]
3409                 self.assertEqual(inner.src, server.ip6)
3410                 self.assertEqual(inner.dst, nat_addr_ip6)
3411                 self.check_icmpv6_checksum(packet)
3412                 if inner.haslayer(TCPerror):
3413                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3414                     self.assertEqual(inner[TCPerror].dport,
3415                                      client_tcp_out_port)
3416                 else:
3417                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3418                     self.assertEqual(inner[UDPerror].dport,
3419                                      client_udp_out_port)
3420             except:
3421                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3422                 raise
3423
3424     def test_prefix(self):
3425         """ NAT64 Network-Specific Prefix """
3426
3427         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3428                                                 self.nat_addr_n)
3429         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3430         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3431         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3432                                                 self.vrf1_nat_addr_n,
3433                                                 vrf_id=self.vrf1_id)
3434         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3435
3436         # Add global prefix
3437         global_pref64 = "2001:db8::"
3438         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3439         global_pref64_len = 32
3440         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3441
3442         prefix = self.vapi.nat64_prefix_dump()
3443         self.assertEqual(len(prefix), 1)
3444         self.assertEqual(prefix[0].prefix, global_pref64_n)
3445         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3446         self.assertEqual(prefix[0].vrf_id, 0)
3447
3448         # Add tenant specific prefix
3449         vrf1_pref64 = "2001:db8:122:300::"
3450         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3451         vrf1_pref64_len = 56
3452         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3453                                        vrf1_pref64_len,
3454                                        vrf_id=self.vrf1_id)
3455         prefix = self.vapi.nat64_prefix_dump()
3456         self.assertEqual(len(prefix), 2)
3457
3458         # Global prefix
3459         pkts = self.create_stream_in_ip6(self.pg0,
3460                                          self.pg1,
3461                                          pref=global_pref64,
3462                                          plen=global_pref64_len)
3463         self.pg0.add_stream(pkts)
3464         self.pg_enable_capture(self.pg_interfaces)
3465         self.pg_start()
3466         capture = self.pg1.get_capture(len(pkts))
3467         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3468                                 dst_ip=self.pg1.remote_ip4)
3469
3470         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3471         self.pg1.add_stream(pkts)
3472         self.pg_enable_capture(self.pg_interfaces)
3473         self.pg_start()
3474         capture = self.pg0.get_capture(len(pkts))
3475         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3476                                   global_pref64,
3477                                   global_pref64_len)
3478         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3479
3480         # Tenant specific prefix
3481         pkts = self.create_stream_in_ip6(self.pg2,
3482                                          self.pg1,
3483                                          pref=vrf1_pref64,
3484                                          plen=vrf1_pref64_len)
3485         self.pg2.add_stream(pkts)
3486         self.pg_enable_capture(self.pg_interfaces)
3487         self.pg_start()
3488         capture = self.pg1.get_capture(len(pkts))
3489         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3490                                 dst_ip=self.pg1.remote_ip4)
3491
3492         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3493         self.pg1.add_stream(pkts)
3494         self.pg_enable_capture(self.pg_interfaces)
3495         self.pg_start()
3496         capture = self.pg2.get_capture(len(pkts))
3497         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3498                                   vrf1_pref64,
3499                                   vrf1_pref64_len)
3500         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3501
3502     def test_unknown_proto(self):
3503         """ NAT64 translate packet with unknown protocol """
3504
3505         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3506                                                 self.nat_addr_n)
3507         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3508         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3509         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3510
3511         # in2out
3512         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3513              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3514              TCP(sport=self.tcp_port_in, dport=20))
3515         self.pg0.add_stream(p)
3516         self.pg_enable_capture(self.pg_interfaces)
3517         self.pg_start()
3518         p = self.pg1.get_capture(1)
3519
3520         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3521              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3522              GRE() /
3523              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3524              TCP(sport=1234, dport=1234))
3525         self.pg0.add_stream(p)
3526         self.pg_enable_capture(self.pg_interfaces)
3527         self.pg_start()
3528         p = self.pg1.get_capture(1)
3529         packet = p[0]
3530         try:
3531             self.assertEqual(packet[IP].src, self.nat_addr)
3532             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3533             self.assertTrue(packet.haslayer(GRE))
3534             self.check_ip_checksum(packet)
3535         except:
3536             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3537             raise
3538
3539         # out2in
3540         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3541              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3542              GRE() /
3543              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3544              TCP(sport=1234, dport=1234))
3545         self.pg1.add_stream(p)
3546         self.pg_enable_capture(self.pg_interfaces)
3547         self.pg_start()
3548         p = self.pg0.get_capture(1)
3549         packet = p[0]
3550         try:
3551             self.assertEqual(packet[IPv6].src, remote_ip6)
3552             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3553             self.assertEqual(packet[IPv6].nh, 47)
3554         except:
3555             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3556             raise
3557
3558     def test_hairpinning_unknown_proto(self):
3559         """ NAT64 translate packet with unknown protocol - hairpinning """
3560
3561         client = self.pg0.remote_hosts[0]
3562         server = self.pg0.remote_hosts[1]
3563         server_tcp_in_port = 22
3564         server_tcp_out_port = 4022
3565         client_tcp_in_port = 1234
3566         client_tcp_out_port = 1235
3567         server_nat_ip = "10.0.0.100"
3568         client_nat_ip = "10.0.0.110"
3569         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3570         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3571         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3572         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3573
3574         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3575                                                 client_nat_ip_n)
3576         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3577         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3578
3579         self.vapi.nat64_add_del_static_bib(server.ip6n,
3580                                            server_nat_ip_n,
3581                                            server_tcp_in_port,
3582                                            server_tcp_out_port,
3583                                            IP_PROTOS.tcp)
3584
3585         self.vapi.nat64_add_del_static_bib(server.ip6n,
3586                                            server_nat_ip_n,
3587                                            0,
3588                                            0,
3589                                            IP_PROTOS.gre)
3590
3591         self.vapi.nat64_add_del_static_bib(client.ip6n,
3592                                            client_nat_ip_n,
3593                                            client_tcp_in_port,
3594                                            client_tcp_out_port,
3595                                            IP_PROTOS.tcp)
3596
3597         # client to server
3598         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3599              IPv6(src=client.ip6, dst=server_nat_ip6) /
3600              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3601         self.pg0.add_stream(p)
3602         self.pg_enable_capture(self.pg_interfaces)
3603         self.pg_start()
3604         p = self.pg0.get_capture(1)
3605
3606         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3607              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3608              GRE() /
3609              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3610              TCP(sport=1234, dport=1234))
3611         self.pg0.add_stream(p)
3612         self.pg_enable_capture(self.pg_interfaces)
3613         self.pg_start()
3614         p = self.pg0.get_capture(1)
3615         packet = p[0]
3616         try:
3617             self.assertEqual(packet[IPv6].src, client_nat_ip6)
3618             self.assertEqual(packet[IPv6].dst, server.ip6)
3619             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3620         except:
3621             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3622             raise
3623
3624         # server to client
3625         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3626              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3627              GRE() /
3628              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3629              TCP(sport=1234, dport=1234))
3630         self.pg0.add_stream(p)
3631         self.pg_enable_capture(self.pg_interfaces)
3632         self.pg_start()
3633         p = self.pg0.get_capture(1)
3634         packet = p[0]
3635         try:
3636             self.assertEqual(packet[IPv6].src, server_nat_ip6)
3637             self.assertEqual(packet[IPv6].dst, client.ip6)
3638             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3639         except:
3640             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3641             raise
3642
3643     def nat64_get_ses_num(self):
3644         """
3645         Return number of active NAT64 sessions.
3646         """
3647         st = self.vapi.nat64_st_dump()
3648         return len(st)
3649
3650     def clear_nat64(self):
3651         """
3652         Clear NAT64 configuration.
3653         """
3654         self.vapi.nat64_set_timeouts()
3655
3656         interfaces = self.vapi.nat64_interface_dump()
3657         for intf in interfaces:
3658             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3659                                               intf.is_inside,
3660                                               is_add=0)
3661
3662         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3663         for bibe in bib:
3664             if bibe.is_static:
3665                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3666                                                    bibe.o_addr,
3667                                                    bibe.i_port,
3668                                                    bibe.o_port,
3669                                                    bibe.proto,
3670                                                    bibe.vrf_id,
3671                                                    is_add=0)
3672
3673         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3674         for bibe in bib:
3675             if bibe.is_static:
3676                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3677                                                    bibe.o_addr,
3678                                                    bibe.i_port,
3679                                                    bibe.o_port,
3680                                                    bibe.proto,
3681                                                    bibe.vrf_id,
3682                                                    is_add=0)
3683
3684         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3685         for bibe in bib:
3686             if bibe.is_static:
3687                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3688                                                    bibe.o_addr,
3689                                                    bibe.i_port,
3690                                                    bibe.o_port,
3691                                                    bibe.proto,
3692                                                    bibe.vrf_id,
3693                                                    is_add=0)
3694
3695         adresses = self.vapi.nat64_pool_addr_dump()
3696         for addr in adresses:
3697             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3698                                                     addr.address,
3699                                                     vrf_id=addr.vrf_id,
3700                                                     is_add=0)
3701
3702         prefixes = self.vapi.nat64_prefix_dump()
3703         for prefix in prefixes:
3704             self.vapi.nat64_add_del_prefix(prefix.prefix,
3705                                            prefix.prefix_len,
3706                                            vrf_id=prefix.vrf_id,
3707                                            is_add=0)
3708
3709     def tearDown(self):
3710         super(TestNAT64, self).tearDown()
3711         if not self.vpp_dead:
3712             self.logger.info(self.vapi.cli("show nat64 pool"))
3713             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3714             self.logger.info(self.vapi.cli("show nat64 prefix"))
3715             self.logger.info(self.vapi.cli("show nat64 bib all"))
3716             self.logger.info(self.vapi.cli("show nat64 session table all"))
3717             self.clear_nat64()
3718
# When executed as a script, run the tests in this module with
# VppTestRunner as the unittest runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)