7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
21 class MethodHolder(VppTestCase):
22 """ NAT create capture and verify method holder """
26 super(MethodHolder, cls).setUpClass()
29 super(MethodHolder, self).tearDown()
31 def check_ip_checksum(self, pkt):
33 Check IP checksum of the packet
35 :param pkt: Packet to check IP checksum
37 new = pkt.__class__(str(pkt))
39 new = new.__class__(str(new))
40 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
42 def check_tcp_checksum(self, pkt):
44 Check TCP checksum in IP packet
46 :param pkt: Packet to check TCP checksum
48 new = pkt.__class__(str(pkt))
50 new = new.__class__(str(new))
51 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
53 def check_udp_checksum(self, pkt):
55 Check UDP checksum in IP packet
57 :param pkt: Packet to check UDP checksum
59 new = pkt.__class__(str(pkt))
61 new = new.__class__(str(new))
62 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
64 def check_icmp_errror_embedded(self, pkt):
66 Check ICMP error embeded packet checksum
68 :param pkt: Packet to check ICMP error embeded packet checksum
70 if pkt.haslayer(IPerror):
71 new = pkt.__class__(str(pkt))
72 del new['IPerror'].chksum
73 new = new.__class__(str(new))
74 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
76 if pkt.haslayer(TCPerror):
77 new = pkt.__class__(str(pkt))
78 del new['TCPerror'].chksum
79 new = new.__class__(str(new))
80 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
82 if pkt.haslayer(UDPerror):
83 if pkt['UDPerror'].chksum != 0:
84 new = pkt.__class__(str(pkt))
85 del new['UDPerror'].chksum
86 new = new.__class__(str(new))
87 self.assertEqual(new['UDPerror'].chksum,
88 pkt['UDPerror'].chksum)
90 if pkt.haslayer(ICMPerror):
91 del new['ICMPerror'].chksum
92 new = new.__class__(str(new))
93 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
95 def check_icmp_checksum(self, pkt):
97 Check ICMP checksum in IPv4 packet
99 :param pkt: Packet to check ICMP checksum
101 new = pkt.__class__(str(pkt))
102 del new['ICMP'].chksum
103 new = new.__class__(str(new))
104 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
105 if pkt.haslayer(IPerror):
106 self.check_icmp_errror_embedded(pkt)
108 def check_icmpv6_checksum(self, pkt):
110 Check ICMPv6 checksum in IPv4 packet
112 :param pkt: Packet to check ICMPv6 checksum
114 new = pkt.__class__(str(pkt))
115 if pkt.haslayer(ICMPv6DestUnreach):
116 del new['ICMPv6DestUnreach'].cksum
117 new = new.__class__(str(new))
118 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
119 pkt['ICMPv6DestUnreach'].cksum)
120 self.check_icmp_errror_embedded(pkt)
121 if pkt.haslayer(ICMPv6EchoRequest):
122 del new['ICMPv6EchoRequest'].cksum
123 new = new.__class__(str(new))
124 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
125 pkt['ICMPv6EchoRequest'].cksum)
126 if pkt.haslayer(ICMPv6EchoReply):
127 del new['ICMPv6EchoReply'].cksum
128 new = new.__class__(str(new))
129 self.assertEqual(new['ICMPv6EchoReply'].cksum,
130 pkt['ICMPv6EchoReply'].cksum)
132 def create_stream_in(self, in_if, out_if, ttl=64):
134 Create packet stream for inside network
136 :param in_if: Inside interface
137 :param out_if: Outside interface
138 :param ttl: TTL of generated packets
142 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
143 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
144 TCP(sport=self.tcp_port_in, dport=20))
148 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
149 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
150 UDP(sport=self.udp_port_in, dport=20))
154 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
155 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
156 ICMP(id=self.icmp_id_in, type='echo-request'))
161 def compose_ip6(self, ip4, pref, plen):
163 Compose IPv4-embedded IPv6 addresses
165 :param ip4: IPv4 address
166 :param pref: IPv6 prefix
167 :param plen: IPv6 prefix length
168 :returns: IPv4-embedded IPv6 addresses
170 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
171 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
186 pref_n[10] = ip4_n[3]
190 pref_n[10] = ip4_n[2]
191 pref_n[11] = ip4_n[3]
194 pref_n[10] = ip4_n[1]
195 pref_n[11] = ip4_n[2]
196 pref_n[12] = ip4_n[3]
198 pref_n[12] = ip4_n[0]
199 pref_n[13] = ip4_n[1]
200 pref_n[14] = ip4_n[2]
201 pref_n[15] = ip4_n[3]
202 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
204 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
206 Create IPv6 packet stream for inside network
208 :param in_if: Inside interface
209 :param out_if: Outside interface
210 :param ttl: Hop Limit of generated packets
211 :param pref: NAT64 prefix
212 :param plen: NAT64 prefix length
216 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
218 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
221 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
222 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
223 TCP(sport=self.tcp_port_in, dport=20))
227 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
228 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
229 UDP(sport=self.udp_port_in, dport=20))
233 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
234 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
235 ICMPv6EchoRequest(id=self.icmp_id_in))
240 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
242 Create packet stream for outside network
244 :param out_if: Outside interface
245 :param dst_ip: Destination IP address (Default use global NAT address)
246 :param ttl: TTL of generated packets
249 dst_ip = self.nat_addr
252 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
253 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
254 TCP(dport=self.tcp_port_out, sport=20))
258 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
259 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
260 UDP(dport=self.udp_port_out, sport=20))
264 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
265 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
266 ICMP(id=self.icmp_id_out, type='echo-reply'))
271 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
272 packet_num=3, dst_ip=None):
274 Verify captured packets on outside network
276 :param capture: Captured packets
277 :param nat_ip: Translated IP address (Default use global NAT address)
278 :param same_port: Sorce port number is not translated (Default False)
279 :param packet_num: Expected number of packets (Default 3)
280 :param dst_ip: Destination IP address (Default do not verify)
283 nat_ip = self.nat_addr
284 self.assertEqual(packet_num, len(capture))
285 for packet in capture:
287 self.check_ip_checksum(packet)
288 self.assertEqual(packet[IP].src, nat_ip)
289 if dst_ip is not None:
290 self.assertEqual(packet[IP].dst, dst_ip)
291 if packet.haslayer(TCP):
293 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
296 packet[TCP].sport, self.tcp_port_in)
297 self.tcp_port_out = packet[TCP].sport
298 self.check_tcp_checksum(packet)
299 elif packet.haslayer(UDP):
301 self.assertEqual(packet[UDP].sport, self.udp_port_in)
304 packet[UDP].sport, self.udp_port_in)
305 self.udp_port_out = packet[UDP].sport
308 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
310 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
311 self.icmp_id_out = packet[ICMP].id
312 self.check_icmp_checksum(packet)
314 self.logger.error(ppp("Unexpected or invalid packet "
315 "(outside network):", packet))
318 def verify_capture_in(self, capture, in_if, packet_num=3):
320 Verify captured packets on inside network
322 :param capture: Captured packets
323 :param in_if: Inside interface
324 :param packet_num: Expected number of packets (Default 3)
326 self.assertEqual(packet_num, len(capture))
327 for packet in capture:
329 self.check_ip_checksum(packet)
330 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
331 if packet.haslayer(TCP):
332 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
333 self.check_tcp_checksum(packet)
334 elif packet.haslayer(UDP):
335 self.assertEqual(packet[UDP].dport, self.udp_port_in)
337 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
338 self.check_icmp_checksum(packet)
340 self.logger.error(ppp("Unexpected or invalid packet "
341 "(inside network):", packet))
344 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
346 Verify captured IPv6 packets on inside network
348 :param capture: Captured packets
349 :param src_ip: Source IP
350 :param dst_ip: Destination IP address
351 :param packet_num: Expected number of packets (Default 3)
353 self.assertEqual(packet_num, len(capture))
354 for packet in capture:
356 self.assertEqual(packet[IPv6].src, src_ip)
357 self.assertEqual(packet[IPv6].dst, dst_ip)
358 if packet.haslayer(TCP):
359 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
360 self.check_tcp_checksum(packet)
361 elif packet.haslayer(UDP):
362 self.assertEqual(packet[UDP].dport, self.udp_port_in)
363 self.check_udp_checksum(packet)
365 self.assertEqual(packet[ICMPv6EchoReply].id,
367 self.check_icmpv6_checksum(packet)
369 self.logger.error(ppp("Unexpected or invalid packet "
370 "(inside network):", packet))
373 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
375 Verify captured packet that don't have to be translated
377 :param capture: Captured packets
378 :param ingress_if: Ingress interface
379 :param egress_if: Egress interface
381 for packet in capture:
383 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
384 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
385 if packet.haslayer(TCP):
386 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
387 elif packet.haslayer(UDP):
388 self.assertEqual(packet[UDP].sport, self.udp_port_in)
390 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
392 self.logger.error(ppp("Unexpected or invalid packet "
393 "(inside network):", packet))
396 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
397 packet_num=3, icmp_type=11):
399 Verify captured packets with ICMP errors on outside network
401 :param capture: Captured packets
402 :param src_ip: Translated IP address or IP address of VPP
403 (Default use global NAT address)
404 :param packet_num: Expected number of packets (Default 3)
405 :param icmp_type: Type of error ICMP packet
406 we are expecting (Default 11)
409 src_ip = self.nat_addr
410 self.assertEqual(packet_num, len(capture))
411 for packet in capture:
413 self.assertEqual(packet[IP].src, src_ip)
414 self.assertTrue(packet.haslayer(ICMP))
416 self.assertEqual(icmp.type, icmp_type)
417 self.assertTrue(icmp.haslayer(IPerror))
418 inner_ip = icmp[IPerror]
419 if inner_ip.haslayer(TCPerror):
420 self.assertEqual(inner_ip[TCPerror].dport,
422 elif inner_ip.haslayer(UDPerror):
423 self.assertEqual(inner_ip[UDPerror].dport,
426 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
428 self.logger.error(ppp("Unexpected or invalid packet "
429 "(outside network):", packet))
432 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
435 Verify captured packets with ICMP errors on inside network
437 :param capture: Captured packets
438 :param in_if: Inside interface
439 :param packet_num: Expected number of packets (Default 3)
440 :param icmp_type: Type of error ICMP packet
441 we are expecting (Default 11)
443 self.assertEqual(packet_num, len(capture))
444 for packet in capture:
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 self.assertTrue(packet.haslayer(ICMP))
449 self.assertEqual(icmp.type, icmp_type)
450 self.assertTrue(icmp.haslayer(IPerror))
451 inner_ip = icmp[IPerror]
452 if inner_ip.haslayer(TCPerror):
453 self.assertEqual(inner_ip[TCPerror].sport,
455 elif inner_ip.haslayer(UDPerror):
456 self.assertEqual(inner_ip[UDPerror].sport,
459 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
461 self.logger.error(ppp("Unexpected or invalid packet "
462 "(inside network):", packet))
465 def verify_ipfix_nat44_ses(self, data):
467 Verify IPFIX NAT44 session create/delete event
469 :param data: Decoded IPFIX data records
471 nat44_ses_create_num = 0
472 nat44_ses_delete_num = 0
473 self.assertEqual(6, len(data))
476 self.assertIn(ord(record[230]), [4, 5])
477 if ord(record[230]) == 4:
478 nat44_ses_create_num += 1
480 nat44_ses_delete_num += 1
482 self.assertEqual(self.pg0.remote_ip4n, record[8])
483 # postNATSourceIPv4Address
484 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
487 self.assertEqual(struct.pack("!I", 0), record[234])
488 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
489 if IP_PROTOS.icmp == ord(record[4]):
490 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
491 self.assertEqual(struct.pack("!H", self.icmp_id_out),
493 elif IP_PROTOS.tcp == ord(record[4]):
494 self.assertEqual(struct.pack("!H", self.tcp_port_in),
496 self.assertEqual(struct.pack("!H", self.tcp_port_out),
498 elif IP_PROTOS.udp == ord(record[4]):
499 self.assertEqual(struct.pack("!H", self.udp_port_in),
501 self.assertEqual(struct.pack("!H", self.udp_port_out),
504 self.fail("Invalid protocol")
505 self.assertEqual(3, nat44_ses_create_num)
506 self.assertEqual(3, nat44_ses_delete_num)
508 def verify_ipfix_addr_exhausted(self, data):
510 Verify IPFIX NAT addresses event
512 :param data: Decoded IPFIX data records
514 self.assertEqual(1, len(data))
517 self.assertEqual(ord(record[230]), 3)
519 self.assertEqual(struct.pack("!I", 0), record[283])
522 class TestNAT44(MethodHolder):
523 """ NAT44 Test Cases """
527 super(TestNAT44, cls).setUpClass()
530 cls.tcp_port_in = 6303
531 cls.tcp_port_out = 6303
532 cls.udp_port_in = 6304
533 cls.udp_port_out = 6304
534 cls.icmp_id_in = 6305
535 cls.icmp_id_out = 6305
536 cls.nat_addr = '10.0.0.3'
537 cls.ipfix_src_port = 4739
538 cls.ipfix_domain_id = 1
540 cls.create_pg_interfaces(range(9))
541 cls.interfaces = list(cls.pg_interfaces[0:4])
543 for i in cls.interfaces:
548 cls.pg0.generate_remote_hosts(3)
549 cls.pg0.configure_ipv4_neighbors()
551 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
552 cls.vapi.ip_table_add_del(10, is_add=1)
553 cls.vapi.ip_table_add_del(20, is_add=1)
555 cls.pg4._local_ip4 = "172.16.255.1"
556 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
557 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
558 cls.pg4.set_table_ip4(10)
559 cls.pg5._local_ip4 = "172.17.255.3"
560 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
561 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
562 cls.pg5.set_table_ip4(10)
563 cls.pg6._local_ip4 = "172.16.255.1"
564 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
565 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
566 cls.pg6.set_table_ip4(20)
567 for i in cls.overlapping_interfaces:
576 super(TestNAT44, cls).tearDownClass()
579 def clear_nat44(self):
581 Clear NAT44 configuration.
583 # I found no elegant way to do this
584 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
585 dst_address_length=32,
586 next_hop_address=self.pg7.remote_ip4n,
587 next_hop_sw_if_index=self.pg7.sw_if_index,
589 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
590 dst_address_length=32,
591 next_hop_address=self.pg8.remote_ip4n,
592 next_hop_sw_if_index=self.pg8.sw_if_index,
595 for intf in [self.pg7, self.pg8]:
596 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
598 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
603 if self.pg7.has_ip4_config:
604 self.pg7.unconfig_ip4()
606 interfaces = self.vapi.nat44_interface_addr_dump()
607 for intf in interfaces:
608 self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
610 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
611 domain_id=self.ipfix_domain_id)
612 self.ipfix_src_port = 4739
613 self.ipfix_domain_id = 1
615 interfaces = self.vapi.nat44_interface_dump()
616 for intf in interfaces:
617 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
621 interfaces = self.vapi.nat44_interface_output_feature_dump()
622 for intf in interfaces:
623 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
627 static_mappings = self.vapi.nat44_static_mapping_dump()
628 for sm in static_mappings:
629 self.vapi.nat44_add_del_static_mapping(
631 sm.external_ip_address,
632 local_port=sm.local_port,
633 external_port=sm.external_port,
634 addr_only=sm.addr_only,
636 protocol=sm.protocol,
639 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
640 for lb_sm in lb_static_mappings:
641 self.vapi.nat44_add_del_lb_static_mapping(
648 adresses = self.vapi.nat44_address_dump()
649 for addr in adresses:
650 self.vapi.nat44_add_del_address_range(addr.ip_address,
654 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
655 local_port=0, external_port=0, vrf_id=0,
656 is_add=1, external_sw_if_index=0xFFFFFFFF,
659 Add/delete NAT44 static mapping
661 :param local_ip: Local IP address
662 :param external_ip: External IP address
663 :param local_port: Local port number (Optional)
664 :param external_port: External port number (Optional)
665 :param vrf_id: VRF ID (Default 0)
666 :param is_add: 1 if add, 0 if delete (Default add)
667 :param external_sw_if_index: External interface instead of IP address
668 :param proto: IP protocol (Mandatory if port specified)
671 if local_port and external_port:
673 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
674 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
675 self.vapi.nat44_add_del_static_mapping(
678 external_sw_if_index,
686 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
688 Add/delete NAT44 address
690 :param ip: IP address
691 :param is_add: 1 if add, 0 if delete (Default add)
693 nat_addr = socket.inet_pton(socket.AF_INET, ip)
694 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
697 def test_dynamic(self):
698 """ NAT44 dynamic translation test """
700 self.nat44_add_address(self.nat_addr)
701 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
702 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
706 pkts = self.create_stream_in(self.pg0, self.pg1)
707 self.pg0.add_stream(pkts)
708 self.pg_enable_capture(self.pg_interfaces)
710 capture = self.pg1.get_capture(len(pkts))
711 self.verify_capture_out(capture)
714 pkts = self.create_stream_out(self.pg1)
715 self.pg1.add_stream(pkts)
716 self.pg_enable_capture(self.pg_interfaces)
718 capture = self.pg0.get_capture(len(pkts))
719 self.verify_capture_in(capture, self.pg0)
721 def test_dynamic_icmp_errors_in2out_ttl_1(self):
722 """ NAT44 handling of client packets with TTL=1 """
724 self.nat44_add_address(self.nat_addr)
725 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
726 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
729 # Client side - generate traffic
730 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
731 self.pg0.add_stream(pkts)
732 self.pg_enable_capture(self.pg_interfaces)
735 # Client side - verify ICMP type 11 packets
736 capture = self.pg0.get_capture(len(pkts))
737 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
739 def test_dynamic_icmp_errors_out2in_ttl_1(self):
740 """ NAT44 handling of server packets with TTL=1 """
742 self.nat44_add_address(self.nat_addr)
743 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
744 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
747 # Client side - create sessions
748 pkts = self.create_stream_in(self.pg0, self.pg1)
749 self.pg0.add_stream(pkts)
750 self.pg_enable_capture(self.pg_interfaces)
753 # Server side - generate traffic
754 capture = self.pg1.get_capture(len(pkts))
755 self.verify_capture_out(capture)
756 pkts = self.create_stream_out(self.pg1, ttl=1)
757 self.pg1.add_stream(pkts)
758 self.pg_enable_capture(self.pg_interfaces)
761 # Server side - verify ICMP type 11 packets
762 capture = self.pg1.get_capture(len(pkts))
763 self.verify_capture_out_with_icmp_errors(capture,
764 src_ip=self.pg1.local_ip4)
766 def test_dynamic_icmp_errors_in2out_ttl_2(self):
767 """ NAT44 handling of error responses to client packets with TTL=2 """
769 self.nat44_add_address(self.nat_addr)
770 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
771 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
774 # Client side - generate traffic
775 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
776 self.pg0.add_stream(pkts)
777 self.pg_enable_capture(self.pg_interfaces)
780 # Server side - simulate ICMP type 11 response
781 capture = self.pg1.get_capture(len(pkts))
782 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
783 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
784 ICMP(type=11) / packet[IP] for packet in capture]
785 self.pg1.add_stream(pkts)
786 self.pg_enable_capture(self.pg_interfaces)
789 # Client side - verify ICMP type 11 packets
790 capture = self.pg0.get_capture(len(pkts))
791 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
793 def test_dynamic_icmp_errors_out2in_ttl_2(self):
794 """ NAT44 handling of error responses to server packets with TTL=2 """
796 self.nat44_add_address(self.nat_addr)
797 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
798 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
801 # Client side - create sessions
802 pkts = self.create_stream_in(self.pg0, self.pg1)
803 self.pg0.add_stream(pkts)
804 self.pg_enable_capture(self.pg_interfaces)
807 # Server side - generate traffic
808 capture = self.pg1.get_capture(len(pkts))
809 self.verify_capture_out(capture)
810 pkts = self.create_stream_out(self.pg1, ttl=2)
811 self.pg1.add_stream(pkts)
812 self.pg_enable_capture(self.pg_interfaces)
815 # Client side - simulate ICMP type 11 response
816 capture = self.pg0.get_capture(len(pkts))
817 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
818 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
819 ICMP(type=11) / packet[IP] for packet in capture]
820 self.pg0.add_stream(pkts)
821 self.pg_enable_capture(self.pg_interfaces)
824 # Server side - verify ICMP type 11 packets
825 capture = self.pg1.get_capture(len(pkts))
826 self.verify_capture_out_with_icmp_errors(capture)
828 def test_ping_out_interface_from_outside(self):
829 """ Ping NAT44 out interface from outside network """
831 self.nat44_add_address(self.nat_addr)
832 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
833 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
836 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
837 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
838 ICMP(id=self.icmp_id_out, type='echo-request'))
840 self.pg1.add_stream(pkts)
841 self.pg_enable_capture(self.pg_interfaces)
843 capture = self.pg1.get_capture(len(pkts))
844 self.assertEqual(1, len(capture))
847 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
848 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
849 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
850 self.assertEqual(packet[ICMP].type, 0) # echo reply
852 self.logger.error(ppp("Unexpected or invalid packet "
853 "(outside network):", packet))
856 def test_ping_internal_host_from_outside(self):
857 """ Ping internal host from outside network """
859 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
860 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
861 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
865 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
866 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
867 ICMP(id=self.icmp_id_out, type='echo-request'))
868 self.pg1.add_stream(pkt)
869 self.pg_enable_capture(self.pg_interfaces)
871 capture = self.pg0.get_capture(1)
872 self.verify_capture_in(capture, self.pg0, packet_num=1)
873 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
876 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
877 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
878 ICMP(id=self.icmp_id_in, type='echo-reply'))
879 self.pg0.add_stream(pkt)
880 self.pg_enable_capture(self.pg_interfaces)
882 capture = self.pg1.get_capture(1)
883 self.verify_capture_out(capture, same_port=True, packet_num=1)
884 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
886 def test_static_in(self):
887 """ 1:1 NAT initialized from inside network """
890 self.tcp_port_out = 6303
891 self.udp_port_out = 6304
892 self.icmp_id_out = 6305
894 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
895 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
896 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
900 pkts = self.create_stream_in(self.pg0, self.pg1)
901 self.pg0.add_stream(pkts)
902 self.pg_enable_capture(self.pg_interfaces)
904 capture = self.pg1.get_capture(len(pkts))
905 self.verify_capture_out(capture, nat_ip, True)
908 pkts = self.create_stream_out(self.pg1, nat_ip)
909 self.pg1.add_stream(pkts)
910 self.pg_enable_capture(self.pg_interfaces)
912 capture = self.pg0.get_capture(len(pkts))
913 self.verify_capture_in(capture, self.pg0)
915 def test_static_out(self):
916 """ 1:1 NAT initialized from outside network """
919 self.tcp_port_out = 6303
920 self.udp_port_out = 6304
921 self.icmp_id_out = 6305
923 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
924 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
925 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
929 pkts = self.create_stream_out(self.pg1, nat_ip)
930 self.pg1.add_stream(pkts)
931 self.pg_enable_capture(self.pg_interfaces)
933 capture = self.pg0.get_capture(len(pkts))
934 self.verify_capture_in(capture, self.pg0)
937 pkts = self.create_stream_in(self.pg0, self.pg1)
938 self.pg0.add_stream(pkts)
939 self.pg_enable_capture(self.pg_interfaces)
941 capture = self.pg1.get_capture(len(pkts))
942 self.verify_capture_out(capture, nat_ip, True)
944 def test_static_with_port_in(self):
945 """ 1:1 NAPT initialized from inside network """
947 self.tcp_port_out = 3606
948 self.udp_port_out = 3607
949 self.icmp_id_out = 3608
951 self.nat44_add_address(self.nat_addr)
952 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
953 self.tcp_port_in, self.tcp_port_out,
955 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
956 self.udp_port_in, self.udp_port_out,
958 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
959 self.icmp_id_in, self.icmp_id_out,
960 proto=IP_PROTOS.icmp)
961 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
962 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
966 pkts = self.create_stream_in(self.pg0, self.pg1)
967 self.pg0.add_stream(pkts)
968 self.pg_enable_capture(self.pg_interfaces)
970 capture = self.pg1.get_capture(len(pkts))
971 self.verify_capture_out(capture)
974 pkts = self.create_stream_out(self.pg1)
975 self.pg1.add_stream(pkts)
976 self.pg_enable_capture(self.pg_interfaces)
978 capture = self.pg0.get_capture(len(pkts))
979 self.verify_capture_in(capture, self.pg0)
981 def test_static_with_port_out(self):
982 """ 1:1 NAPT initialized from outside network """
984 self.tcp_port_out = 30606
985 self.udp_port_out = 30607
986 self.icmp_id_out = 30608
988 self.nat44_add_address(self.nat_addr)
989 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
990 self.tcp_port_in, self.tcp_port_out,
992 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
993 self.udp_port_in, self.udp_port_out,
995 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
996 self.icmp_id_in, self.icmp_id_out,
997 proto=IP_PROTOS.icmp)
998 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
999 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1003 pkts = self.create_stream_out(self.pg1)
1004 self.pg1.add_stream(pkts)
1005 self.pg_enable_capture(self.pg_interfaces)
1007 capture = self.pg0.get_capture(len(pkts))
1008 self.verify_capture_in(capture, self.pg0)
1011 pkts = self.create_stream_in(self.pg0, self.pg1)
1012 self.pg0.add_stream(pkts)
1013 self.pg_enable_capture(self.pg_interfaces)
1015 capture = self.pg1.get_capture(len(pkts))
1016 self.verify_capture_out(capture)
1018 def test_static_vrf_aware(self):
1019 """ 1:1 NAT VRF awareness """
1021 nat_ip1 = "10.0.0.30"
1022 nat_ip2 = "10.0.0.40"
1023 self.tcp_port_out = 6303
1024 self.udp_port_out = 6304
1025 self.icmp_id_out = 6305
1027 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1029 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1031 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1033 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1034 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1036 # inside interface VRF match NAT44 static mapping VRF
1037 pkts = self.create_stream_in(self.pg4, self.pg3)
1038 self.pg4.add_stream(pkts)
1039 self.pg_enable_capture(self.pg_interfaces)
1041 capture = self.pg3.get_capture(len(pkts))
1042 self.verify_capture_out(capture, nat_ip1, True)
1044 # inside interface VRF don't match NAT44 static mapping VRF (packets
1046 pkts = self.create_stream_in(self.pg0, self.pg3)
1047 self.pg0.add_stream(pkts)
1048 self.pg_enable_capture(self.pg_interfaces)
1050 self.pg3.assert_nothing_captured()
1052 def test_static_lb(self):
1053 """ NAT44 local service load balancing """
1054 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1057 server1 = self.pg0.remote_hosts[0]
1058 server2 = self.pg0.remote_hosts[1]
1060 locals = [{'addr': server1.ip4n,
1063 {'addr': server2.ip4n,
1067 self.nat44_add_address(self.nat_addr)
1068 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1071 local_num=len(locals),
1073 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1074 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1077 # from client to service
1078 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1079 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1080 TCP(sport=12345, dport=external_port))
1081 self.pg1.add_stream(p)
1082 self.pg_enable_capture(self.pg_interfaces)
1084 capture = self.pg0.get_capture(1)
1090 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1091 if ip.dst == server1.ip4:
1095 self.assertEqual(tcp.dport, local_port)
1096 self.check_tcp_checksum(p)
1097 self.check_ip_checksum(p)
1099 self.logger.error(ppp("Unexpected or invalid packet:", p))
1102 # from service back to client
1103 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1104 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1105 TCP(sport=local_port, dport=12345))
1106 self.pg0.add_stream(p)
1107 self.pg_enable_capture(self.pg_interfaces)
1109 capture = self.pg1.get_capture(1)
1114 self.assertEqual(ip.src, self.nat_addr)
1115 self.assertEqual(tcp.sport, external_port)
1116 self.check_tcp_checksum(p)
1117 self.check_ip_checksum(p)
1119 self.logger.error(ppp("Unexpected or invalid packet:", p))
1125 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1127 for client in clients:
1128 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1129 IP(src=client, dst=self.nat_addr) /
1130 TCP(sport=12345, dport=external_port))
1132 self.pg1.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1135 capture = self.pg0.get_capture(len(pkts))
1137 if p[IP].dst == server1.ip4:
1141 self.assertTrue(server1_n > server2_n)
1143 def test_multiple_inside_interfaces(self):
1144 """ NAT44 multiple non-overlapping address space inside interfaces """
1146 self.nat44_add_address(self.nat_addr)
1147 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1148 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1149 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1152 # between two NAT44 inside interfaces (no translation)
1153 pkts = self.create_stream_in(self.pg0, self.pg1)
1154 self.pg0.add_stream(pkts)
1155 self.pg_enable_capture(self.pg_interfaces)
1157 capture = self.pg1.get_capture(len(pkts))
1158 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1160 # from NAT44 inside to interface without NAT44 feature (no translation)
1161 pkts = self.create_stream_in(self.pg0, self.pg2)
1162 self.pg0.add_stream(pkts)
1163 self.pg_enable_capture(self.pg_interfaces)
1165 capture = self.pg2.get_capture(len(pkts))
1166 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1168 # in2out 1st interface
1169 pkts = self.create_stream_in(self.pg0, self.pg3)
1170 self.pg0.add_stream(pkts)
1171 self.pg_enable_capture(self.pg_interfaces)
1173 capture = self.pg3.get_capture(len(pkts))
1174 self.verify_capture_out(capture)
1176 # out2in 1st interface
1177 pkts = self.create_stream_out(self.pg3)
1178 self.pg3.add_stream(pkts)
1179 self.pg_enable_capture(self.pg_interfaces)
1181 capture = self.pg0.get_capture(len(pkts))
1182 self.verify_capture_in(capture, self.pg0)
1184 # in2out 2nd interface
1185 pkts = self.create_stream_in(self.pg1, self.pg3)
1186 self.pg1.add_stream(pkts)
1187 self.pg_enable_capture(self.pg_interfaces)
1189 capture = self.pg3.get_capture(len(pkts))
1190 self.verify_capture_out(capture)
1192 # out2in 2nd interface
1193 pkts = self.create_stream_out(self.pg3)
1194 self.pg3.add_stream(pkts)
1195 self.pg_enable_capture(self.pg_interfaces)
1197 capture = self.pg1.get_capture(len(pkts))
1198 self.verify_capture_in(capture, self.pg1)
1200 def test_inside_overlapping_interfaces(self):
1201 """ NAT44 multiple inside interfaces with overlapping address space """
1203 static_nat_ip = "10.0.0.10"
1204 self.nat44_add_address(self.nat_addr)
1205 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1207 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1208 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1209 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1210 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1213 # between NAT44 inside interfaces with same VRF (no translation)
1214 pkts = self.create_stream_in(self.pg4, self.pg5)
1215 self.pg4.add_stream(pkts)
1216 self.pg_enable_capture(self.pg_interfaces)
1218 capture = self.pg5.get_capture(len(pkts))
1219 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1221 # between NAT44 inside interfaces with different VRF (hairpinning)
1222 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1223 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1224 TCP(sport=1234, dport=5678))
1225 self.pg4.add_stream(p)
1226 self.pg_enable_capture(self.pg_interfaces)
1228 capture = self.pg6.get_capture(1)
1233 self.assertEqual(ip.src, self.nat_addr)
1234 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1235 self.assertNotEqual(tcp.sport, 1234)
1236 self.assertEqual(tcp.dport, 5678)
1238 self.logger.error(ppp("Unexpected or invalid packet:", p))
1241 # in2out 1st interface
1242 pkts = self.create_stream_in(self.pg4, self.pg3)
1243 self.pg4.add_stream(pkts)
1244 self.pg_enable_capture(self.pg_interfaces)
1246 capture = self.pg3.get_capture(len(pkts))
1247 self.verify_capture_out(capture)
1249 # out2in 1st interface
1250 pkts = self.create_stream_out(self.pg3)
1251 self.pg3.add_stream(pkts)
1252 self.pg_enable_capture(self.pg_interfaces)
1254 capture = self.pg4.get_capture(len(pkts))
1255 self.verify_capture_in(capture, self.pg4)
1257 # in2out 2nd interface
1258 pkts = self.create_stream_in(self.pg5, self.pg3)
1259 self.pg5.add_stream(pkts)
1260 self.pg_enable_capture(self.pg_interfaces)
1262 capture = self.pg3.get_capture(len(pkts))
1263 self.verify_capture_out(capture)
1265 # out2in 2nd interface
1266 pkts = self.create_stream_out(self.pg3)
1267 self.pg3.add_stream(pkts)
1268 self.pg_enable_capture(self.pg_interfaces)
1270 capture = self.pg5.get_capture(len(pkts))
1271 self.verify_capture_in(capture, self.pg5)
1274 addresses = self.vapi.nat44_address_dump()
1275 self.assertEqual(len(addresses), 1)
1276 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1277 self.assertEqual(len(sessions), 3)
1278 for session in sessions:
1279 self.assertFalse(session.is_static)
1280 self.assertEqual(session.inside_ip_address[0:4],
1281 self.pg5.remote_ip4n)
1282 self.assertEqual(session.outside_ip_address,
1283 addresses[0].ip_address)
1284 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1285 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1286 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1287 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1288 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1289 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1290 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1291 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1292 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1294 # in2out 3rd interface
1295 pkts = self.create_stream_in(self.pg6, self.pg3)
1296 self.pg6.add_stream(pkts)
1297 self.pg_enable_capture(self.pg_interfaces)
1299 capture = self.pg3.get_capture(len(pkts))
1300 self.verify_capture_out(capture, static_nat_ip, True)
1302 # out2in 3rd interface
1303 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1304 self.pg3.add_stream(pkts)
1305 self.pg_enable_capture(self.pg_interfaces)
1307 capture = self.pg6.get_capture(len(pkts))
1308 self.verify_capture_in(capture, self.pg6)
1310 # general user and session dump verifications
1311 users = self.vapi.nat44_user_dump()
1312 self.assertTrue(len(users) >= 3)
1313 addresses = self.vapi.nat44_address_dump()
1314 self.assertEqual(len(addresses), 1)
1316 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1318 for session in sessions:
1319 self.assertEqual(user.ip_address, session.inside_ip_address)
1320 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1321 self.assertTrue(session.protocol in
1322 [IP_PROTOS.tcp, IP_PROTOS.udp,
1326 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1327 self.assertTrue(len(sessions) >= 4)
1328 for session in sessions:
1329 self.assertFalse(session.is_static)
1330 self.assertEqual(session.inside_ip_address[0:4],
1331 self.pg4.remote_ip4n)
1332 self.assertEqual(session.outside_ip_address,
1333 addresses[0].ip_address)
1336 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1337 self.assertTrue(len(sessions) >= 3)
1338 for session in sessions:
1339 self.assertTrue(session.is_static)
1340 self.assertEqual(session.inside_ip_address[0:4],
1341 self.pg6.remote_ip4n)
1342 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1343 map(int, static_nat_ip.split('.')))
1344 self.assertTrue(session.inside_port in
1345 [self.tcp_port_in, self.udp_port_in,
1348 def test_hairpinning(self):
1349 """ NAT44 hairpinning - 1:1 NAPT """
1351 host = self.pg0.remote_hosts[0]
1352 server = self.pg0.remote_hosts[1]
1355 server_in_port = 5678
1356 server_out_port = 8765
1358 self.nat44_add_address(self.nat_addr)
1359 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1360 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1362 # add static mapping for server
1363 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1364 server_in_port, server_out_port,
1365 proto=IP_PROTOS.tcp)
1367 # send packet from host to server
1368 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1369 IP(src=host.ip4, dst=self.nat_addr) /
1370 TCP(sport=host_in_port, dport=server_out_port))
1371 self.pg0.add_stream(p)
1372 self.pg_enable_capture(self.pg_interfaces)
1374 capture = self.pg0.get_capture(1)
1379 self.assertEqual(ip.src, self.nat_addr)
1380 self.assertEqual(ip.dst, server.ip4)
1381 self.assertNotEqual(tcp.sport, host_in_port)
1382 self.assertEqual(tcp.dport, server_in_port)
1383 self.check_tcp_checksum(p)
1384 host_out_port = tcp.sport
1386 self.logger.error(ppp("Unexpected or invalid packet:", p))
1389 # send reply from server to host
1390 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1391 IP(src=server.ip4, dst=self.nat_addr) /
1392 TCP(sport=server_in_port, dport=host_out_port))
1393 self.pg0.add_stream(p)
1394 self.pg_enable_capture(self.pg_interfaces)
1396 capture = self.pg0.get_capture(1)
1401 self.assertEqual(ip.src, self.nat_addr)
1402 self.assertEqual(ip.dst, host.ip4)
1403 self.assertEqual(tcp.sport, server_out_port)
1404 self.assertEqual(tcp.dport, host_in_port)
1405 self.check_tcp_checksum(p)
1407 self.logger.error(ppp("Unexpected or invalid packet:"), p)
1410 def test_hairpinning2(self):
1411 """ NAT44 hairpinning - 1:1 NAT"""
1413 server1_nat_ip = "10.0.0.10"
1414 server2_nat_ip = "10.0.0.11"
1415 host = self.pg0.remote_hosts[0]
1416 server1 = self.pg0.remote_hosts[1]
1417 server2 = self.pg0.remote_hosts[2]
1418 server_tcp_port = 22
1419 server_udp_port = 20
1421 self.nat44_add_address(self.nat_addr)
1422 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1423 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1426 # add static mapping for servers
1427 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1428 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1432 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1433 IP(src=host.ip4, dst=server1_nat_ip) /
1434 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1436 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1437 IP(src=host.ip4, dst=server1_nat_ip) /
1438 UDP(sport=self.udp_port_in, dport=server_udp_port))
1440 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1441 IP(src=host.ip4, dst=server1_nat_ip) /
1442 ICMP(id=self.icmp_id_in, type='echo-request'))
1444 self.pg0.add_stream(pkts)
1445 self.pg_enable_capture(self.pg_interfaces)
1447 capture = self.pg0.get_capture(len(pkts))
1448 for packet in capture:
1450 self.assertEqual(packet[IP].src, self.nat_addr)
1451 self.assertEqual(packet[IP].dst, server1.ip4)
1452 if packet.haslayer(TCP):
1453 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1454 self.assertEqual(packet[TCP].dport, server_tcp_port)
1455 self.tcp_port_out = packet[TCP].sport
1456 self.check_tcp_checksum(packet)
1457 elif packet.haslayer(UDP):
1458 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1459 self.assertEqual(packet[UDP].dport, server_udp_port)
1460 self.udp_port_out = packet[UDP].sport
1462 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1463 self.icmp_id_out = packet[ICMP].id
1465 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1470 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1471 IP(src=server1.ip4, dst=self.nat_addr) /
1472 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1474 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1475 IP(src=server1.ip4, dst=self.nat_addr) /
1476 UDP(sport=server_udp_port, dport=self.udp_port_out))
1478 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1479 IP(src=server1.ip4, dst=self.nat_addr) /
1480 ICMP(id=self.icmp_id_out, type='echo-reply'))
1482 self.pg0.add_stream(pkts)
1483 self.pg_enable_capture(self.pg_interfaces)
1485 capture = self.pg0.get_capture(len(pkts))
1486 for packet in capture:
1488 self.assertEqual(packet[IP].src, server1_nat_ip)
1489 self.assertEqual(packet[IP].dst, host.ip4)
1490 if packet.haslayer(TCP):
1491 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1492 self.assertEqual(packet[TCP].sport, server_tcp_port)
1493 self.check_tcp_checksum(packet)
1494 elif packet.haslayer(UDP):
1495 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1496 self.assertEqual(packet[UDP].sport, server_udp_port)
1498 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1500 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1503 # server2 to server1
1505 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1506 IP(src=server2.ip4, dst=server1_nat_ip) /
1507 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1509 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1510 IP(src=server2.ip4, dst=server1_nat_ip) /
1511 UDP(sport=self.udp_port_in, dport=server_udp_port))
1513 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1514 IP(src=server2.ip4, dst=server1_nat_ip) /
1515 ICMP(id=self.icmp_id_in, type='echo-request'))
1517 self.pg0.add_stream(pkts)
1518 self.pg_enable_capture(self.pg_interfaces)
1520 capture = self.pg0.get_capture(len(pkts))
1521 for packet in capture:
1523 self.assertEqual(packet[IP].src, server2_nat_ip)
1524 self.assertEqual(packet[IP].dst, server1.ip4)
1525 if packet.haslayer(TCP):
1526 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1527 self.assertEqual(packet[TCP].dport, server_tcp_port)
1528 self.tcp_port_out = packet[TCP].sport
1529 self.check_tcp_checksum(packet)
1530 elif packet.haslayer(UDP):
1531 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1532 self.assertEqual(packet[UDP].dport, server_udp_port)
1533 self.udp_port_out = packet[UDP].sport
1535 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1536 self.icmp_id_out = packet[ICMP].id
1538 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1541 # server1 to server2
1543 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1544 IP(src=server1.ip4, dst=server2_nat_ip) /
1545 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1547 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1548 IP(src=server1.ip4, dst=server2_nat_ip) /
1549 UDP(sport=server_udp_port, dport=self.udp_port_out))
1551 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1552 IP(src=server1.ip4, dst=server2_nat_ip) /
1553 ICMP(id=self.icmp_id_out, type='echo-reply'))
1555 self.pg0.add_stream(pkts)
1556 self.pg_enable_capture(self.pg_interfaces)
1558 capture = self.pg0.get_capture(len(pkts))
1559 for packet in capture:
1561 self.assertEqual(packet[IP].src, server1_nat_ip)
1562 self.assertEqual(packet[IP].dst, server2.ip4)
1563 if packet.haslayer(TCP):
1564 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1565 self.assertEqual(packet[TCP].sport, server_tcp_port)
1566 self.check_tcp_checksum(packet)
1567 elif packet.haslayer(UDP):
1568 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1569 self.assertEqual(packet[UDP].sport, server_udp_port)
1571 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1573 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1576 def test_max_translations_per_user(self):
1577 """ MAX translations per user - recycle the least recently used """
1579 self.nat44_add_address(self.nat_addr)
1580 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1581 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1584 # get maximum number of translations per user
1585 nat44_config = self.vapi.nat_show_config()
1587 # send more than maximum number of translations per user packets
1588 pkts_num = nat44_config.max_translations_per_user + 5
1590 for port in range(0, pkts_num):
1591 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1592 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1593 TCP(sport=1025 + port))
1595 self.pg0.add_stream(pkts)
1596 self.pg_enable_capture(self.pg_interfaces)
1599 # verify number of translated packet
1600 self.pg1.get_capture(pkts_num)
1602 def test_interface_addr(self):
1603 """ Acquire NAT44 addresses from interface """
1604 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1606 # no address in NAT pool
1607 adresses = self.vapi.nat44_address_dump()
1608 self.assertEqual(0, len(adresses))
1610 # configure interface address and check NAT address pool
1611 self.pg7.config_ip4()
1612 adresses = self.vapi.nat44_address_dump()
1613 self.assertEqual(1, len(adresses))
1614 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1616 # remove interface address and check NAT address pool
1617 self.pg7.unconfig_ip4()
1618 adresses = self.vapi.nat44_address_dump()
1619 self.assertEqual(0, len(adresses))
1621 def test_interface_addr_static_mapping(self):
1622 """ Static mapping with addresses from interface """
1623 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1624 self.nat44_add_static_mapping(
1626 external_sw_if_index=self.pg7.sw_if_index)
1628 # static mappings with external interface
1629 static_mappings = self.vapi.nat44_static_mapping_dump()
1630 self.assertEqual(1, len(static_mappings))
1631 self.assertEqual(self.pg7.sw_if_index,
1632 static_mappings[0].external_sw_if_index)
1634 # configure interface address and check static mappings
1635 self.pg7.config_ip4()
1636 static_mappings = self.vapi.nat44_static_mapping_dump()
1637 self.assertEqual(1, len(static_mappings))
1638 self.assertEqual(static_mappings[0].external_ip_address[0:4],
1639 self.pg7.local_ip4n)
1640 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1642 # remove interface address and check static mappings
1643 self.pg7.unconfig_ip4()
1644 static_mappings = self.vapi.nat44_static_mapping_dump()
1645 self.assertEqual(0, len(static_mappings))
1647 def test_ipfix_nat44_sess(self):
1648 """ IPFIX logging NAT44 session created/delted """
1649 self.ipfix_domain_id = 10
1650 self.ipfix_src_port = 20202
1651 colector_port = 30303
1652 bind_layers(UDP, IPFIX, dport=30303)
1653 self.nat44_add_address(self.nat_addr)
1654 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1655 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1657 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1658 src_address=self.pg3.local_ip4n,
1660 template_interval=10,
1661 collector_port=colector_port)
1662 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1663 src_port=self.ipfix_src_port)
1665 pkts = self.create_stream_in(self.pg0, self.pg1)
1666 self.pg0.add_stream(pkts)
1667 self.pg_enable_capture(self.pg_interfaces)
1669 capture = self.pg1.get_capture(len(pkts))
1670 self.verify_capture_out(capture)
1671 self.nat44_add_address(self.nat_addr, is_add=0)
1672 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1673 capture = self.pg3.get_capture(3)
1674 ipfix = IPFIXDecoder()
1675 # first load template
1677 self.assertTrue(p.haslayer(IPFIX))
1678 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1679 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1680 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1681 self.assertEqual(p[UDP].dport, colector_port)
1682 self.assertEqual(p[IPFIX].observationDomainID,
1683 self.ipfix_domain_id)
1684 if p.haslayer(Template):
1685 ipfix.add_template(p.getlayer(Template))
1686 # verify events in data set
1688 if p.haslayer(Data):
1689 data = ipfix.decode_data_set(p.getlayer(Set))
1690 self.verify_ipfix_nat44_ses(data)
1692 def test_ipfix_addr_exhausted(self):
1693 """ IPFIX logging NAT addresses exhausted """
1694 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1695 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1697 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1698 src_address=self.pg3.local_ip4n,
1700 template_interval=10)
1701 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1702 src_port=self.ipfix_src_port)
1704 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1705 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1707 self.pg0.add_stream(p)
1708 self.pg_enable_capture(self.pg_interfaces)
1710 capture = self.pg1.get_capture(0)
1711 self.vapi.cli("ipfix flush") # FIXME this should be an API call
1712 capture = self.pg3.get_capture(3)
1713 ipfix = IPFIXDecoder()
1714 # first load template
1716 self.assertTrue(p.haslayer(IPFIX))
1717 self.assertEqual(p[IP].src, self.pg3.local_ip4)
1718 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1719 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1720 self.assertEqual(p[UDP].dport, 4739)
1721 self.assertEqual(p[IPFIX].observationDomainID,
1722 self.ipfix_domain_id)
1723 if p.haslayer(Template):
1724 ipfix.add_template(p.getlayer(Template))
1725 # verify events in data set
1727 if p.haslayer(Data):
1728 data = ipfix.decode_data_set(p.getlayer(Set))
1729 self.verify_ipfix_addr_exhausted(data)
1731 def test_pool_addr_fib(self):
1732 """ NAT44 add pool addresses to FIB """
1733 static_addr = '10.0.0.10'
1734 self.nat44_add_address(self.nat_addr)
1735 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1736 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1738 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1741 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1742 ARP(op=ARP.who_has, pdst=self.nat_addr,
1743 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1744 self.pg1.add_stream(p)
1745 self.pg_enable_capture(self.pg_interfaces)
1747 capture = self.pg1.get_capture(1)
1748 self.assertTrue(capture[0].haslayer(ARP))
1749 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1752 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1753 ARP(op=ARP.who_has, pdst=static_addr,
1754 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1755 self.pg1.add_stream(p)
1756 self.pg_enable_capture(self.pg_interfaces)
1758 capture = self.pg1.get_capture(1)
1759 self.assertTrue(capture[0].haslayer(ARP))
1760 self.assertTrue(capture[0][ARP].op, ARP.is_at)
1762 # send ARP to non-NAT44 interface
1763 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1764 ARP(op=ARP.who_has, pdst=self.nat_addr,
1765 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1766 self.pg2.add_stream(p)
1767 self.pg_enable_capture(self.pg_interfaces)
1769 capture = self.pg1.get_capture(0)
1771 # remove addresses and verify
1772 self.nat44_add_address(self.nat_addr, is_add=0)
1773 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1776 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1777 ARP(op=ARP.who_has, pdst=self.nat_addr,
1778 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1779 self.pg1.add_stream(p)
1780 self.pg_enable_capture(self.pg_interfaces)
1782 capture = self.pg1.get_capture(0)
1784 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1785 ARP(op=ARP.who_has, pdst=static_addr,
1786 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1787 self.pg1.add_stream(p)
1788 self.pg_enable_capture(self.pg_interfaces)
1790 capture = self.pg1.get_capture(0)
1792 def test_vrf_mode(self):
1793 """ NAT44 tenant VRF aware address pool mode """
1797 nat_ip1 = "10.0.0.10"
1798 nat_ip2 = "10.0.0.11"
1800 self.pg0.unconfig_ip4()
1801 self.pg1.unconfig_ip4()
1802 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1803 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1804 self.pg0.set_table_ip4(vrf_id1)
1805 self.pg1.set_table_ip4(vrf_id2)
1806 self.pg0.config_ip4()
1807 self.pg1.config_ip4()
1809 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1810 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1811 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1812 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1813 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1817 pkts = self.create_stream_in(self.pg0, self.pg2)
1818 self.pg0.add_stream(pkts)
1819 self.pg_enable_capture(self.pg_interfaces)
1821 capture = self.pg2.get_capture(len(pkts))
1822 self.verify_capture_out(capture, nat_ip1)
1825 pkts = self.create_stream_in(self.pg1, self.pg2)
1826 self.pg1.add_stream(pkts)
1827 self.pg_enable_capture(self.pg_interfaces)
1829 capture = self.pg2.get_capture(len(pkts))
1830 self.verify_capture_out(capture, nat_ip2)
1832 self.pg0.unconfig_ip4()
1833 self.pg1.unconfig_ip4()
1834 self.pg0.set_table_ip4(0)
1835 self.pg1.set_table_ip4(0)
1836 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1837 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1839 def test_vrf_feature_independent(self):
1840 """ NAT44 tenant VRF independent address pool mode """
1842 nat_ip1 = "10.0.0.10"
1843 nat_ip2 = "10.0.0.11"
1845 self.nat44_add_address(nat_ip1)
1846 self.nat44_add_address(nat_ip2)
1847 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1848 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1849 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1853 pkts = self.create_stream_in(self.pg0, self.pg2)
1854 self.pg0.add_stream(pkts)
1855 self.pg_enable_capture(self.pg_interfaces)
1857 capture = self.pg2.get_capture(len(pkts))
1858 self.verify_capture_out(capture, nat_ip1)
1861 pkts = self.create_stream_in(self.pg1, self.pg2)
1862 self.pg1.add_stream(pkts)
1863 self.pg_enable_capture(self.pg_interfaces)
1865 capture = self.pg2.get_capture(len(pkts))
1866 self.verify_capture_out(capture, nat_ip1)
1868 def test_dynamic_ipless_interfaces(self):
1869 """ NAT44 interfaces without configured IP address """
1871 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1872 self.pg7.remote_mac,
1873 self.pg7.remote_ip4n,
1875 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1876 self.pg8.remote_mac,
1877 self.pg8.remote_ip4n,
1880 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1881 dst_address_length=32,
1882 next_hop_address=self.pg7.remote_ip4n,
1883 next_hop_sw_if_index=self.pg7.sw_if_index)
1884 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1885 dst_address_length=32,
1886 next_hop_address=self.pg8.remote_ip4n,
1887 next_hop_sw_if_index=self.pg8.sw_if_index)
1889 self.nat44_add_address(self.nat_addr)
1890 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1891 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1895 pkts = self.create_stream_in(self.pg7, self.pg8)
1896 self.pg7.add_stream(pkts)
1897 self.pg_enable_capture(self.pg_interfaces)
1899 capture = self.pg8.get_capture(len(pkts))
1900 self.verify_capture_out(capture)
1903 pkts = self.create_stream_out(self.pg8, self.nat_addr)
1904 self.pg8.add_stream(pkts)
1905 self.pg_enable_capture(self.pg_interfaces)
1907 capture = self.pg7.get_capture(len(pkts))
1908 self.verify_capture_in(capture, self.pg7)
1910 def test_static_ipless_interfaces(self):
1911 """ NAT44 interfaces without configured IP address - 1:1 NAT """
1913 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1914 self.pg7.remote_mac,
1915 self.pg7.remote_ip4n,
1917 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1918 self.pg8.remote_mac,
1919 self.pg8.remote_ip4n,
1922 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1923 dst_address_length=32,
1924 next_hop_address=self.pg7.remote_ip4n,
1925 next_hop_sw_if_index=self.pg7.sw_if_index)
1926 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1927 dst_address_length=32,
1928 next_hop_address=self.pg8.remote_ip4n,
1929 next_hop_sw_if_index=self.pg8.sw_if_index)
1931 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1932 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1933 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1937 pkts = self.create_stream_out(self.pg8)
1938 self.pg8.add_stream(pkts)
1939 self.pg_enable_capture(self.pg_interfaces)
1941 capture = self.pg7.get_capture(len(pkts))
1942 self.verify_capture_in(capture, self.pg7)
1945 pkts = self.create_stream_in(self.pg7, self.pg8)
1946 self.pg7.add_stream(pkts)
1947 self.pg_enable_capture(self.pg_interfaces)
1949 capture = self.pg8.get_capture(len(pkts))
1950 self.verify_capture_out(capture, self.nat_addr, True)
1952 def test_static_with_port_ipless_interfaces(self):
1953 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1955 self.tcp_port_out = 30606
1956 self.udp_port_out = 30607
1957 self.icmp_id_out = 30608
1959 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1960 self.pg7.remote_mac,
1961 self.pg7.remote_ip4n,
1963 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1964 self.pg8.remote_mac,
1965 self.pg8.remote_ip4n,
1968 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1969 dst_address_length=32,
1970 next_hop_address=self.pg7.remote_ip4n,
1971 next_hop_sw_if_index=self.pg7.sw_if_index)
1972 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1973 dst_address_length=32,
1974 next_hop_address=self.pg8.remote_ip4n,
1975 next_hop_sw_if_index=self.pg8.sw_if_index)
1977 self.nat44_add_address(self.nat_addr)
1978 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1979 self.tcp_port_in, self.tcp_port_out,
1980 proto=IP_PROTOS.tcp)
1981 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1982 self.udp_port_in, self.udp_port_out,
1983 proto=IP_PROTOS.udp)
1984 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1985 self.icmp_id_in, self.icmp_id_out,
1986 proto=IP_PROTOS.icmp)
1987 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1988 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1992 pkts = self.create_stream_out(self.pg8)
1993 self.pg8.add_stream(pkts)
1994 self.pg_enable_capture(self.pg_interfaces)
1996 capture = self.pg7.get_capture(len(pkts))
1997 self.verify_capture_in(capture, self.pg7)
2000 pkts = self.create_stream_in(self.pg7, self.pg8)
2001 self.pg7.add_stream(pkts)
2002 self.pg_enable_capture(self.pg_interfaces)
2004 capture = self.pg8.get_capture(len(pkts))
2005 self.verify_capture_out(capture)
2007 def test_static_unknown_proto(self):
2008 """ 1:1 NAT translate packet with unknown protocol """
2009 nat_ip = "10.0.0.10"
2010 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2011 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2012 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2016 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2017 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2019 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2020 TCP(sport=1234, dport=1234))
2021 self.pg0.add_stream(p)
2022 self.pg_enable_capture(self.pg_interfaces)
2024 p = self.pg1.get_capture(1)
2027 self.assertEqual(packet[IP].src, nat_ip)
2028 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2029 self.assertTrue(packet.haslayer(GRE))
2030 self.check_ip_checksum(packet)
2032 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2036 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2037 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2039 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2040 TCP(sport=1234, dport=1234))
2041 self.pg1.add_stream(p)
2042 self.pg_enable_capture(self.pg_interfaces)
2044 p = self.pg0.get_capture(1)
2047 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2048 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2049 self.assertTrue(packet.haslayer(GRE))
2050 self.check_ip_checksum(packet)
2052 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2055 def test_hairpinning_static_unknown_proto(self):
2056 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2058 host = self.pg0.remote_hosts[0]
2059 server = self.pg0.remote_hosts[1]
2061 host_nat_ip = "10.0.0.10"
2062 server_nat_ip = "10.0.0.11"
2064 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2065 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2066 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2067 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2071 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2072 IP(src=host.ip4, dst=server_nat_ip) /
2074 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2075 TCP(sport=1234, dport=1234))
2076 self.pg0.add_stream(p)
2077 self.pg_enable_capture(self.pg_interfaces)
2079 p = self.pg0.get_capture(1)
2082 self.assertEqual(packet[IP].src, host_nat_ip)
2083 self.assertEqual(packet[IP].dst, server.ip4)
2084 self.assertTrue(packet.haslayer(GRE))
2085 self.check_ip_checksum(packet)
2087 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2091 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2092 IP(src=server.ip4, dst=host_nat_ip) /
2094 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2095 TCP(sport=1234, dport=1234))
2096 self.pg0.add_stream(p)
2097 self.pg_enable_capture(self.pg_interfaces)
2099 p = self.pg0.get_capture(1)
2102 self.assertEqual(packet[IP].src, server_nat_ip)
2103 self.assertEqual(packet[IP].dst, host.ip4)
2104 self.assertTrue(packet.haslayer(GRE))
2105 self.check_ip_checksum(packet)
2107 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2110 def test_unknown_proto(self):
2111 """ NAT44 translate packet with unknown protocol """
2112 self.nat44_add_address(self.nat_addr)
2113 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2114 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2118 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2119 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2120 TCP(sport=self.tcp_port_in, dport=20))
2121 self.pg0.add_stream(p)
2122 self.pg_enable_capture(self.pg_interfaces)
2124 p = self.pg1.get_capture(1)
2126 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2127 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2129 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2130 TCP(sport=1234, dport=1234))
2131 self.pg0.add_stream(p)
2132 self.pg_enable_capture(self.pg_interfaces)
2134 p = self.pg1.get_capture(1)
2137 self.assertEqual(packet[IP].src, self.nat_addr)
2138 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2139 self.assertTrue(packet.haslayer(GRE))
2140 self.check_ip_checksum(packet)
2142 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2146 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2147 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2149 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2150 TCP(sport=1234, dport=1234))
2151 self.pg1.add_stream(p)
2152 self.pg_enable_capture(self.pg_interfaces)
2154 p = self.pg0.get_capture(1)
2157 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2158 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2159 self.assertTrue(packet.haslayer(GRE))
2160 self.check_ip_checksum(packet)
2162 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2165 def test_hairpinning_unknown_proto(self):
2166 """ NAT44 translate packet with unknown protocol - hairpinning """
2167 host = self.pg0.remote_hosts[0]
2168 server = self.pg0.remote_hosts[1]
2171 server_in_port = 5678
2172 server_out_port = 8765
2173 server_nat_ip = "10.0.0.11"
2175 self.nat44_add_address(self.nat_addr)
2176 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2177 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2180 # add static mapping for server
2181 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2184 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2185 IP(src=host.ip4, dst=server_nat_ip) /
2186 TCP(sport=host_in_port, dport=server_out_port))
2187 self.pg0.add_stream(p)
2188 self.pg_enable_capture(self.pg_interfaces)
2190 capture = self.pg0.get_capture(1)
2192 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2193 IP(src=host.ip4, dst=server_nat_ip) /
2195 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2196 TCP(sport=1234, dport=1234))
2197 self.pg0.add_stream(p)
2198 self.pg_enable_capture(self.pg_interfaces)
2200 p = self.pg0.get_capture(1)
2203 self.assertEqual(packet[IP].src, self.nat_addr)
2204 self.assertEqual(packet[IP].dst, server.ip4)
2205 self.assertTrue(packet.haslayer(GRE))
2206 self.check_ip_checksum(packet)
2208 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2212 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2213 IP(src=server.ip4, dst=self.nat_addr) /
2215 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2216 TCP(sport=1234, dport=1234))
2217 self.pg0.add_stream(p)
2218 self.pg_enable_capture(self.pg_interfaces)
2220 p = self.pg0.get_capture(1)
2223 self.assertEqual(packet[IP].src, server_nat_ip)
2224 self.assertEqual(packet[IP].dst, host.ip4)
2225 self.assertTrue(packet.haslayer(GRE))
2226 self.check_ip_checksum(packet)
2228 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2231 def test_output_feature(self):
2232 """ NAT44 interface output feature (in2out postrouting) """
2233 self.nat44_add_address(self.nat_addr)
2234 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2235 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2236 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2240 pkts = self.create_stream_in(self.pg0, self.pg3)
2241 self.pg0.add_stream(pkts)
2242 self.pg_enable_capture(self.pg_interfaces)
2244 capture = self.pg3.get_capture(len(pkts))
2245 self.verify_capture_out(capture)
2248 pkts = self.create_stream_out(self.pg3)
2249 self.pg3.add_stream(pkts)
2250 self.pg_enable_capture(self.pg_interfaces)
2252 capture = self.pg0.get_capture(len(pkts))
2253 self.verify_capture_in(capture, self.pg0)
2255 # from non-NAT interface to NAT inside interface
2256 pkts = self.create_stream_in(self.pg2, self.pg0)
2257 self.pg2.add_stream(pkts)
2258 self.pg_enable_capture(self.pg_interfaces)
2260 capture = self.pg0.get_capture(len(pkts))
2261 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2263 def test_output_feature_vrf_aware(self):
2264 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2265 nat_ip_vrf10 = "10.0.0.10"
2266 nat_ip_vrf20 = "10.0.0.20"
2268 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2269 dst_address_length=32,
2270 next_hop_address=self.pg3.remote_ip4n,
2271 next_hop_sw_if_index=self.pg3.sw_if_index,
2273 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2274 dst_address_length=32,
2275 next_hop_address=self.pg3.remote_ip4n,
2276 next_hop_sw_if_index=self.pg3.sw_if_index,
2279 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2280 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2281 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2282 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2283 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2287 pkts = self.create_stream_in(self.pg4, self.pg3)
2288 self.pg4.add_stream(pkts)
2289 self.pg_enable_capture(self.pg_interfaces)
2291 capture = self.pg3.get_capture(len(pkts))
2292 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2295 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2296 self.pg3.add_stream(pkts)
2297 self.pg_enable_capture(self.pg_interfaces)
2299 capture = self.pg4.get_capture(len(pkts))
2300 self.verify_capture_in(capture, self.pg4)
2303 pkts = self.create_stream_in(self.pg6, self.pg3)
2304 self.pg6.add_stream(pkts)
2305 self.pg_enable_capture(self.pg_interfaces)
2307 capture = self.pg3.get_capture(len(pkts))
2308 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2311 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2312 self.pg3.add_stream(pkts)
2313 self.pg_enable_capture(self.pg_interfaces)
2315 capture = self.pg6.get_capture(len(pkts))
2316 self.verify_capture_in(capture, self.pg6)
2318 def test_output_feature_hairpinning(self):
2319 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2320 host = self.pg0.remote_hosts[0]
2321 server = self.pg0.remote_hosts[1]
2324 server_in_port = 5678
2325 server_out_port = 8765
2327 self.nat44_add_address(self.nat_addr)
2328 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2329 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2332 # add static mapping for server
2333 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2334 server_in_port, server_out_port,
2335 proto=IP_PROTOS.tcp)
2337 # send packet from host to server
2338 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2339 IP(src=host.ip4, dst=self.nat_addr) /
2340 TCP(sport=host_in_port, dport=server_out_port))
2341 self.pg0.add_stream(p)
2342 self.pg_enable_capture(self.pg_interfaces)
2344 capture = self.pg0.get_capture(1)
2349 self.assertEqual(ip.src, self.nat_addr)
2350 self.assertEqual(ip.dst, server.ip4)
2351 self.assertNotEqual(tcp.sport, host_in_port)
2352 self.assertEqual(tcp.dport, server_in_port)
2353 self.check_tcp_checksum(p)
2354 host_out_port = tcp.sport
2356 self.logger.error(ppp("Unexpected or invalid packet:", p))
2359 # send reply from server to host
2360 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2361 IP(src=server.ip4, dst=self.nat_addr) /
2362 TCP(sport=server_in_port, dport=host_out_port))
2363 self.pg0.add_stream(p)
2364 self.pg_enable_capture(self.pg_interfaces)
2366 capture = self.pg0.get_capture(1)
2371 self.assertEqual(ip.src, self.nat_addr)
2372 self.assertEqual(ip.dst, host.ip4)
2373 self.assertEqual(tcp.sport, server_out_port)
2374 self.assertEqual(tcp.dport, host_in_port)
2375 self.check_tcp_checksum(p)
2377 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2381 super(TestNAT44, self).tearDown()
2382 if not self.vpp_dead:
2383 self.logger.info(self.vapi.cli("show nat44 verbose"))
2387 class TestDeterministicNAT(MethodHolder):
2388 """ Deterministic NAT Test Cases """
2391 def setUpConstants(cls):
2392 super(TestDeterministicNAT, cls).setUpConstants()
2393 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2396 def setUpClass(cls):
2397 super(TestDeterministicNAT, cls).setUpClass()
2400 cls.tcp_port_in = 6303
2401 cls.tcp_external_port = 6303
2402 cls.udp_port_in = 6304
2403 cls.udp_external_port = 6304
2404 cls.icmp_id_in = 6305
2405 cls.nat_addr = '10.0.0.3'
2407 cls.create_pg_interfaces(range(3))
2408 cls.interfaces = list(cls.pg_interfaces)
2410 for i in cls.interfaces:
2415 cls.pg0.generate_remote_hosts(2)
2416 cls.pg0.configure_ipv4_neighbors()
2419 super(TestDeterministicNAT, cls).tearDownClass()
2422 def create_stream_in(self, in_if, out_if, ttl=64):
2424 Create packet stream for inside network
2426 :param in_if: Inside interface
2427 :param out_if: Outside interface
2428 :param ttl: TTL of generated packets
2432 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2433 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2434 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2438 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2439 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2440 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2444 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2445 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2446 ICMP(id=self.icmp_id_in, type='echo-request'))
2451 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2453 Create packet stream for outside network
2455 :param out_if: Outside interface
2456 :param dst_ip: Destination IP address (Default use global NAT address)
2457 :param ttl: TTL of generated packets
2460 dst_ip = self.nat_addr
2463 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2464 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2465 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2469 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2470 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2471 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2475 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2476 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2477 ICMP(id=self.icmp_external_id, type='echo-reply'))
2482 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2484 Verify captured packets on outside network
2486 :param capture: Captured packets
2487 :param nat_ip: Translated IP address (Default use global NAT address)
2488 :param same_port: Sorce port number is not translated (Default False)
2489 :param packet_num: Expected number of packets (Default 3)
2492 nat_ip = self.nat_addr
2493 self.assertEqual(packet_num, len(capture))
2494 for packet in capture:
2496 self.assertEqual(packet[IP].src, nat_ip)
2497 if packet.haslayer(TCP):
2498 self.tcp_port_out = packet[TCP].sport
2499 elif packet.haslayer(UDP):
2500 self.udp_port_out = packet[UDP].sport
2502 self.icmp_external_id = packet[ICMP].id
2504 self.logger.error(ppp("Unexpected or invalid packet "
2505 "(outside network):", packet))
2508 def initiate_tcp_session(self, in_if, out_if):
2510 Initiates TCP session
2512 :param in_if: Inside interface
2513 :param out_if: Outside interface
2516 # SYN packet in->out
2517 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2518 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2519 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2522 self.pg_enable_capture(self.pg_interfaces)
2524 capture = out_if.get_capture(1)
2526 self.tcp_port_out = p[TCP].sport
2528 # SYN + ACK packet out->in
2529 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2530 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2531 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2533 out_if.add_stream(p)
2534 self.pg_enable_capture(self.pg_interfaces)
2536 in_if.get_capture(1)
2538 # ACK packet in->out
2539 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2540 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2541 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2544 self.pg_enable_capture(self.pg_interfaces)
2546 out_if.get_capture(1)
2549 self.logger.error("TCP 3 way handshake failed")
2552 def verify_ipfix_max_entries_per_user(self, data):
2554 Verify IPFIX maximum entries per user exceeded event
2556 :param data: Decoded IPFIX data records
2558 self.assertEqual(1, len(data))
2561 self.assertEqual(ord(record[230]), 13)
2562 # natQuotaExceededEvent
2563 self.assertEqual('\x03\x00\x00\x00', record[466])
2565 self.assertEqual(self.pg0.remote_ip4n, record[8])
2567 def test_deterministic_mode(self):
2568 """ NAT plugin run deterministic mode """
2569 in_addr = '172.16.255.0'
2570 out_addr = '172.17.255.50'
2571 in_addr_t = '172.16.255.20'
2572 in_addr_n = socket.inet_aton(in_addr)
2573 out_addr_n = socket.inet_aton(out_addr)
2574 in_addr_t_n = socket.inet_aton(in_addr_t)
2578 nat_config = self.vapi.nat_show_config()
2579 self.assertEqual(1, nat_config.deterministic)
2581 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2583 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2584 self.assertEqual(rep1.out_addr[:4], out_addr_n)
2585 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2586 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2588 deterministic_mappings = self.vapi.nat_det_map_dump()
2589 self.assertEqual(len(deterministic_mappings), 1)
2590 dsm = deterministic_mappings[0]
2591 self.assertEqual(in_addr_n, dsm.in_addr[:4])
2592 self.assertEqual(in_plen, dsm.in_plen)
2593 self.assertEqual(out_addr_n, dsm.out_addr[:4])
2594 self.assertEqual(out_plen, dsm.out_plen)
2596 self.clear_nat_det()
2597 deterministic_mappings = self.vapi.nat_det_map_dump()
2598 self.assertEqual(len(deterministic_mappings), 0)
2600 def test_set_timeouts(self):
2601 """ Set deterministic NAT timeouts """
2602 timeouts_before = self.vapi.nat_det_get_timeouts()
2604 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2605 timeouts_before.tcp_established + 10,
2606 timeouts_before.tcp_transitory + 10,
2607 timeouts_before.icmp + 10)
2609 timeouts_after = self.vapi.nat_det_get_timeouts()
2611 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2612 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2613 self.assertNotEqual(timeouts_before.tcp_established,
2614 timeouts_after.tcp_established)
2615 self.assertNotEqual(timeouts_before.tcp_transitory,
2616 timeouts_after.tcp_transitory)
2618 def test_det_in(self):
2619 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2621 nat_ip = "10.0.0.10"
2623 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2625 socket.inet_aton(nat_ip),
2627 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2628 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2632 pkts = self.create_stream_in(self.pg0, self.pg1)
2633 self.pg0.add_stream(pkts)
2634 self.pg_enable_capture(self.pg_interfaces)
2636 capture = self.pg1.get_capture(len(pkts))
2637 self.verify_capture_out(capture, nat_ip)
2640 pkts = self.create_stream_out(self.pg1, nat_ip)
2641 self.pg1.add_stream(pkts)
2642 self.pg_enable_capture(self.pg_interfaces)
2644 capture = self.pg0.get_capture(len(pkts))
2645 self.verify_capture_in(capture, self.pg0)
2648 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2649 self.assertEqual(len(sessions), 3)
2653 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2654 self.assertEqual(s.in_port, self.tcp_port_in)
2655 self.assertEqual(s.out_port, self.tcp_port_out)
2656 self.assertEqual(s.ext_port, self.tcp_external_port)
2660 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2661 self.assertEqual(s.in_port, self.udp_port_in)
2662 self.assertEqual(s.out_port, self.udp_port_out)
2663 self.assertEqual(s.ext_port, self.udp_external_port)
2667 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2668 self.assertEqual(s.in_port, self.icmp_id_in)
2669 self.assertEqual(s.out_port, self.icmp_external_id)
2671 def test_multiple_users(self):
2672 """ Deterministic NAT multiple users """
2674 nat_ip = "10.0.0.10"
2676 external_port = 6303
2678 host0 = self.pg0.remote_hosts[0]
2679 host1 = self.pg0.remote_hosts[1]
2681 self.vapi.nat_det_add_del_map(host0.ip4n,
2683 socket.inet_aton(nat_ip),
2685 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2686 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2690 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2691 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2692 TCP(sport=port_in, dport=external_port))
2693 self.pg0.add_stream(p)
2694 self.pg_enable_capture(self.pg_interfaces)
2696 capture = self.pg1.get_capture(1)
2701 self.assertEqual(ip.src, nat_ip)
2702 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2703 self.assertEqual(tcp.dport, external_port)
2704 port_out0 = tcp.sport
2706 self.logger.error(ppp("Unexpected or invalid packet:", p))
2710 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2711 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2712 TCP(sport=port_in, dport=external_port))
2713 self.pg0.add_stream(p)
2714 self.pg_enable_capture(self.pg_interfaces)
2716 capture = self.pg1.get_capture(1)
2721 self.assertEqual(ip.src, nat_ip)
2722 self.assertEqual(ip.dst, self.pg1.remote_ip4)
2723 self.assertEqual(tcp.dport, external_port)
2724 port_out1 = tcp.sport
2726 self.logger.error(ppp("Unexpected or invalid packet:", p))
2729 dms = self.vapi.nat_det_map_dump()
2730 self.assertEqual(1, len(dms))
2731 self.assertEqual(2, dms[0].ses_num)
2734 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2735 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2736 TCP(sport=external_port, dport=port_out0))
2737 self.pg1.add_stream(p)
2738 self.pg_enable_capture(self.pg_interfaces)
2740 capture = self.pg0.get_capture(1)
2745 self.assertEqual(ip.src, self.pg1.remote_ip4)
2746 self.assertEqual(ip.dst, host0.ip4)
2747 self.assertEqual(tcp.dport, port_in)
2748 self.assertEqual(tcp.sport, external_port)
2750 self.logger.error(ppp("Unexpected or invalid packet:", p))
2754 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2755 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2756 TCP(sport=external_port, dport=port_out1))
2757 self.pg1.add_stream(p)
2758 self.pg_enable_capture(self.pg_interfaces)
2760 capture = self.pg0.get_capture(1)
2765 self.assertEqual(ip.src, self.pg1.remote_ip4)
2766 self.assertEqual(ip.dst, host1.ip4)
2767 self.assertEqual(tcp.dport, port_in)
2768 self.assertEqual(tcp.sport, external_port)
2770 self.logger.error(ppp("Unexpected or invalid packet", p))
2773 # session close api test
2774 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2776 self.pg1.remote_ip4n,
2778 dms = self.vapi.nat_det_map_dump()
2779 self.assertEqual(dms[0].ses_num, 1)
2781 self.vapi.nat_det_close_session_in(host0.ip4n,
2783 self.pg1.remote_ip4n,
2785 dms = self.vapi.nat_det_map_dump()
2786 self.assertEqual(dms[0].ses_num, 0)
2788 def test_tcp_session_close_detection_in(self):
2789 """ Deterministic NAT TCP session close from inside network """
2790 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2792 socket.inet_aton(self.nat_addr),
2794 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2795 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2798 self.initiate_tcp_session(self.pg0, self.pg1)
2800 # close the session from inside
2802 # FIN packet in -> out
2803 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2804 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2805 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2807 self.pg0.add_stream(p)
2808 self.pg_enable_capture(self.pg_interfaces)
2810 self.pg1.get_capture(1)
2814 # ACK packet out -> in
2815 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2816 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2817 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2821 # FIN packet out -> in
2822 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2823 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2824 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2828 self.pg1.add_stream(pkts)
2829 self.pg_enable_capture(self.pg_interfaces)
2831 self.pg0.get_capture(2)
2833 # ACK packet in -> out
2834 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2835 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2836 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2838 self.pg0.add_stream(p)
2839 self.pg_enable_capture(self.pg_interfaces)
2841 self.pg1.get_capture(1)
2843 # Check if deterministic NAT44 closed the session
2844 dms = self.vapi.nat_det_map_dump()
2845 self.assertEqual(0, dms[0].ses_num)
2847 self.logger.error("TCP session termination failed")
2850 def test_tcp_session_close_detection_out(self):
2851 """ Deterministic NAT TCP session close from outside network """
2852 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2854 socket.inet_aton(self.nat_addr),
2856 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2857 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2860 self.initiate_tcp_session(self.pg0, self.pg1)
2862 # close the session from outside
2864 # FIN packet out -> in
2865 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2866 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2867 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2869 self.pg1.add_stream(p)
2870 self.pg_enable_capture(self.pg_interfaces)
2872 self.pg0.get_capture(1)
2876 # ACK packet in -> out
2877 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2878 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2879 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2883 # ACK packet in -> out
2884 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2885 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2886 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2890 self.pg0.add_stream(pkts)
2891 self.pg_enable_capture(self.pg_interfaces)
2893 self.pg1.get_capture(2)
2895 # ACK packet out -> in
2896 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2897 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2898 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2900 self.pg1.add_stream(p)
2901 self.pg_enable_capture(self.pg_interfaces)
2903 self.pg0.get_capture(1)
2905 # Check if deterministic NAT44 closed the session
2906 dms = self.vapi.nat_det_map_dump()
2907 self.assertEqual(0, dms[0].ses_num)
2909 self.logger.error("TCP session termination failed")
2912 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2913 def test_session_timeout(self):
2914 """ Deterministic NAT session timeouts """
2915 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2917 socket.inet_aton(self.nat_addr),
2919 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2920 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2923 self.initiate_tcp_session(self.pg0, self.pg1)
2924 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
2925 pkts = self.create_stream_in(self.pg0, self.pg1)
2926 self.pg0.add_stream(pkts)
2927 self.pg_enable_capture(self.pg_interfaces)
2929 capture = self.pg1.get_capture(len(pkts))
2932 dms = self.vapi.nat_det_map_dump()
2933 self.assertEqual(0, dms[0].ses_num)
2935 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2936 def test_session_limit_per_user(self):
2937 """ Deterministic NAT maximum sessions per user limit """
2938 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2940 socket.inet_aton(self.nat_addr),
2942 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2943 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2945 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2946 src_address=self.pg2.local_ip4n,
2948 template_interval=10)
2949 self.vapi.nat_ipfix()
2952 for port in range(1025, 2025):
2953 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2954 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2955 UDP(sport=port, dport=port))
2958 self.pg0.add_stream(pkts)
2959 self.pg_enable_capture(self.pg_interfaces)
2961 capture = self.pg1.get_capture(len(pkts))
2963 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2964 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2965 UDP(sport=3001, dport=3002))
2966 self.pg0.add_stream(p)
2967 self.pg_enable_capture(self.pg_interfaces)
2969 capture = self.pg1.assert_nothing_captured()
2971 # verify ICMP error packet
2972 capture = self.pg0.get_capture(1)
2974 self.assertTrue(p.haslayer(ICMP))
2976 self.assertEqual(icmp.type, 3)
2977 self.assertEqual(icmp.code, 1)
2978 self.assertTrue(icmp.haslayer(IPerror))
2979 inner_ip = icmp[IPerror]
2980 self.assertEqual(inner_ip[UDPerror].sport, 3001)
2981 self.assertEqual(inner_ip[UDPerror].dport, 3002)
2983 dms = self.vapi.nat_det_map_dump()
2985 self.assertEqual(1000, dms[0].ses_num)
2987 # verify IPFIX logging
2988 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2990 capture = self.pg2.get_capture(2)
2991 ipfix = IPFIXDecoder()
2992 # first load template
2994 self.assertTrue(p.haslayer(IPFIX))
2995 if p.haslayer(Template):
2996 ipfix.add_template(p.getlayer(Template))
2997 # verify events in data set
2999 if p.haslayer(Data):
3000 data = ipfix.decode_data_set(p.getlayer(Set))
3001 self.verify_ipfix_max_entries_per_user(data)
3003 def clear_nat_det(self):
3005 Clear deterministic NAT configuration.
3007 self.vapi.nat_ipfix(enable=0)
3008 self.vapi.nat_det_set_timeouts()
3009 deterministic_mappings = self.vapi.nat_det_map_dump()
3010 for dsm in deterministic_mappings:
3011 self.vapi.nat_det_add_del_map(dsm.in_addr,
3017 interfaces = self.vapi.nat44_interface_dump()
3018 for intf in interfaces:
3019 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3024 super(TestDeterministicNAT, self).tearDown()
3025 if not self.vpp_dead:
3026 self.logger.info(self.vapi.cli("show nat44 detail"))
3027 self.clear_nat_det()
3030 class TestNAT64(MethodHolder):
3031 """ NAT64 Test Cases """
3034 def setUpClass(cls):
3035 super(TestNAT64, cls).setUpClass()
3038 cls.tcp_port_in = 6303
3039 cls.tcp_port_out = 6303
3040 cls.udp_port_in = 6304
3041 cls.udp_port_out = 6304
3042 cls.icmp_id_in = 6305
3043 cls.icmp_id_out = 6305
3044 cls.nat_addr = '10.0.0.3'
3045 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3047 cls.vrf1_nat_addr = '10.0.10.3'
3048 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3051 cls.create_pg_interfaces(range(3))
3052 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3053 cls.ip6_interfaces.append(cls.pg_interfaces[2])
3054 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3056 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3058 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3060 cls.pg0.generate_remote_hosts(2)
3062 for i in cls.ip6_interfaces:
3065 i.configure_ipv6_neighbors()
3067 for i in cls.ip4_interfaces:
3073 super(TestNAT64, cls).tearDownClass()
3076 def test_pool(self):
3077 """ Add/delete address to NAT64 pool """
3078 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3080 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3082 addresses = self.vapi.nat64_pool_addr_dump()
3083 self.assertEqual(len(addresses), 1)
3084 self.assertEqual(addresses[0].address, nat_addr)
3086 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3088 addresses = self.vapi.nat64_pool_addr_dump()
3089 self.assertEqual(len(addresses), 0)
3091 def test_interface(self):
3092 """ Enable/disable NAT64 feature on the interface """
3093 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3094 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3096 interfaces = self.vapi.nat64_interface_dump()
3097 self.assertEqual(len(interfaces), 2)
3100 for intf in interfaces:
3101 if intf.sw_if_index == self.pg0.sw_if_index:
3102 self.assertEqual(intf.is_inside, 1)
3104 elif intf.sw_if_index == self.pg1.sw_if_index:
3105 self.assertEqual(intf.is_inside, 0)
3107 self.assertTrue(pg0_found)
3108 self.assertTrue(pg1_found)
3110 features = self.vapi.cli("show interface features pg0")
3111 self.assertNotEqual(features.find('nat64-in2out'), -1)
3112 features = self.vapi.cli("show interface features pg1")
3113 self.assertNotEqual(features.find('nat64-out2in'), -1)
3115 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3116 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3118 interfaces = self.vapi.nat64_interface_dump()
3119 self.assertEqual(len(interfaces), 0)
3121 def test_static_bib(self):
3122 """ Add/delete static BIB entry """
3123 in_addr = socket.inet_pton(socket.AF_INET6,
3124 '2001:db8:85a3::8a2e:370:7334')
3125 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3128 proto = IP_PROTOS.tcp
3130 self.vapi.nat64_add_del_static_bib(in_addr,
3135 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3140 self.assertEqual(bibe.i_addr, in_addr)
3141 self.assertEqual(bibe.o_addr, out_addr)
3142 self.assertEqual(bibe.i_port, in_port)
3143 self.assertEqual(bibe.o_port, out_port)
3144 self.assertEqual(static_bib_num, 1)
3146 self.vapi.nat64_add_del_static_bib(in_addr,
3152 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3157 self.assertEqual(static_bib_num, 0)
3159 def test_set_timeouts(self):
3160 """ Set NAT64 timeouts """
3161 # verify default values
3162 timeouts = self.vapi.nat64_get_timeouts()
3163 self.assertEqual(timeouts.udp, 300)
3164 self.assertEqual(timeouts.icmp, 60)
3165 self.assertEqual(timeouts.tcp_trans, 240)
3166 self.assertEqual(timeouts.tcp_est, 7440)
3167 self.assertEqual(timeouts.tcp_incoming_syn, 6)
3169 # set and verify custom values
3170 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3171 tcp_est=7450, tcp_incoming_syn=10)
3172 timeouts = self.vapi.nat64_get_timeouts()
3173 self.assertEqual(timeouts.udp, 200)
3174 self.assertEqual(timeouts.icmp, 30)
3175 self.assertEqual(timeouts.tcp_trans, 250)
3176 self.assertEqual(timeouts.tcp_est, 7450)
3177 self.assertEqual(timeouts.tcp_incoming_syn, 10)
3179 def test_dynamic(self):
3180 """ NAT64 dynamic translation test """
3181 self.tcp_port_in = 6303
3182 self.udp_port_in = 6304
3183 self.icmp_id_in = 6305
3185 ses_num_start = self.nat64_get_ses_num()
3187 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3189 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3190 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3193 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3194 self.pg0.add_stream(pkts)
3195 self.pg_enable_capture(self.pg_interfaces)
3197 capture = self.pg1.get_capture(len(pkts))
3198 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3199 dst_ip=self.pg1.remote_ip4)
3202 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3203 self.pg1.add_stream(pkts)
3204 self.pg_enable_capture(self.pg_interfaces)
3206 capture = self.pg0.get_capture(len(pkts))
3207 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3208 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3211 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3212 self.pg0.add_stream(pkts)
3213 self.pg_enable_capture(self.pg_interfaces)
3215 capture = self.pg1.get_capture(len(pkts))
3216 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3217 dst_ip=self.pg1.remote_ip4)
3220 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3221 self.pg1.add_stream(pkts)
3222 self.pg_enable_capture(self.pg_interfaces)
3224 capture = self.pg0.get_capture(len(pkts))
3225 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3227 ses_num_end = self.nat64_get_ses_num()
3229 self.assertEqual(ses_num_end - ses_num_start, 3)
3231 # tenant with specific VRF
3232 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3233 self.vrf1_nat_addr_n,
3234 vrf_id=self.vrf1_id)
3235 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3237 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3238 self.pg2.add_stream(pkts)
3239 self.pg_enable_capture(self.pg_interfaces)
3241 capture = self.pg1.get_capture(len(pkts))
3242 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3243 dst_ip=self.pg1.remote_ip4)
3245 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3246 self.pg1.add_stream(pkts)
3247 self.pg_enable_capture(self.pg_interfaces)
3249 capture = self.pg2.get_capture(len(pkts))
3250 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3252 def test_static(self):
3253 """ NAT64 static translation test """
3254 self.tcp_port_in = 60303
3255 self.udp_port_in = 60304
3256 self.icmp_id_in = 60305
3257 self.tcp_port_out = 60303
3258 self.udp_port_out = 60304
3259 self.icmp_id_out = 60305
3261 ses_num_start = self.nat64_get_ses_num()
3263 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3265 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3266 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3268 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3273 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3278 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3285 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3286 self.pg0.add_stream(pkts)
3287 self.pg_enable_capture(self.pg_interfaces)
3289 capture = self.pg1.get_capture(len(pkts))
3290 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3291 dst_ip=self.pg1.remote_ip4, same_port=True)
3294 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3295 self.pg1.add_stream(pkts)
3296 self.pg_enable_capture(self.pg_interfaces)
3298 capture = self.pg0.get_capture(len(pkts))
3299 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3300 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3302 ses_num_end = self.nat64_get_ses_num()
3304 self.assertEqual(ses_num_end - ses_num_start, 3)
3306 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3307 def test_session_timeout(self):
3308 """ NAT64 session timeout """
3309 self.icmp_id_in = 1234
3310 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3312 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3313 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3314 self.vapi.nat64_set_timeouts(icmp=5)
3316 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3317 self.pg0.add_stream(pkts)
3318 self.pg_enable_capture(self.pg_interfaces)
3320 capture = self.pg1.get_capture(len(pkts))
3322 ses_num_before_timeout = self.nat64_get_ses_num()
3326 # ICMP session after timeout
3327 ses_num_after_timeout = self.nat64_get_ses_num()
3328 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3330 def test_icmp_error(self):
3331 """ NAT64 ICMP Error message translation """
3332 self.tcp_port_in = 6303
3333 self.udp_port_in = 6304
3334 self.icmp_id_in = 6305
3336 ses_num_start = self.nat64_get_ses_num()
3338 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3340 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3341 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3343 # send some packets to create sessions
3344 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3345 self.pg0.add_stream(pkts)
3346 self.pg_enable_capture(self.pg_interfaces)
3348 capture_ip4 = self.pg1.get_capture(len(pkts))
3349 self.verify_capture_out(capture_ip4,
3350 nat_ip=self.nat_addr,
3351 dst_ip=self.pg1.remote_ip4)
3353 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3354 self.pg1.add_stream(pkts)
3355 self.pg_enable_capture(self.pg_interfaces)
3357 capture_ip6 = self.pg0.get_capture(len(pkts))
3358 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3359 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3360 self.pg0.remote_ip6)
3363 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3364 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3365 ICMPv6DestUnreach(code=1) /
3366 packet[IPv6] for packet in capture_ip6]
3367 self.pg0.add_stream(pkts)
3368 self.pg_enable_capture(self.pg_interfaces)
3370 capture = self.pg1.get_capture(len(pkts))
3371 for packet in capture:
3373 self.assertEqual(packet[IP].src, self.nat_addr)
3374 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3375 self.assertEqual(packet[ICMP].type, 3)
3376 self.assertEqual(packet[ICMP].code, 13)
3377 inner = packet[IPerror]
3378 self.assertEqual(inner.src, self.pg1.remote_ip4)
3379 self.assertEqual(inner.dst, self.nat_addr)
3380 self.check_icmp_checksum(packet)
3381 if inner.haslayer(TCPerror):
3382 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3383 elif inner.haslayer(UDPerror):
3384 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3386 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3388 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3392 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3393 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3394 ICMP(type=3, code=13) /
3395 packet[IP] for packet in capture_ip4]
3396 self.pg1.add_stream(pkts)
3397 self.pg_enable_capture(self.pg_interfaces)
3399 capture = self.pg0.get_capture(len(pkts))
3400 for packet in capture:
3402 self.assertEqual(packet[IPv6].src, ip.src)
3403 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3404 icmp = packet[ICMPv6DestUnreach]
3405 self.assertEqual(icmp.code, 1)
3406 inner = icmp[IPerror6]
3407 self.assertEqual(inner.src, self.pg0.remote_ip6)
3408 self.assertEqual(inner.dst, ip.src)
3409 self.check_icmpv6_checksum(packet)
3410 if inner.haslayer(TCPerror):
3411 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3412 elif inner.haslayer(UDPerror):
3413 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3415 self.assertEqual(inner[ICMPv6EchoRequest].id,
3418 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3421 def test_hairpinning(self):
3422 """ NAT64 hairpinning """
3424 client = self.pg0.remote_hosts[0]
3425 server = self.pg0.remote_hosts[1]
3426 server_tcp_in_port = 22
3427 server_tcp_out_port = 4022
3428 server_udp_in_port = 23
3429 server_udp_out_port = 4023
3430 client_tcp_in_port = 1234
3431 client_udp_in_port = 1235
3432 client_tcp_out_port = 0
3433 client_udp_out_port = 0
3434 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3435 nat_addr_ip6 = ip.src
3437 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3439 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3440 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3442 self.vapi.nat64_add_del_static_bib(server.ip6n,
3445 server_tcp_out_port,
3447 self.vapi.nat64_add_del_static_bib(server.ip6n,
3450 server_udp_out_port,
3455 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3456 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3457 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3459 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3460 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3461 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3463 self.pg0.add_stream(pkts)
3464 self.pg_enable_capture(self.pg_interfaces)
3466 capture = self.pg0.get_capture(len(pkts))
3467 for packet in capture:
3469 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3470 self.assertEqual(packet[IPv6].dst, server.ip6)
3471 if packet.haslayer(TCP):
3472 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3473 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3474 self.check_tcp_checksum(packet)
3475 client_tcp_out_port = packet[TCP].sport
3477 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3478 self.assertEqual(packet[UDP].dport, server_udp_in_port)
3479 self.check_udp_checksum(packet)
3480 client_udp_out_port = packet[UDP].sport
3482 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3487 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3488 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3489 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3491 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3492 IPv6(src=server.ip6, dst=nat_addr_ip6) /
3493 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3495 self.pg0.add_stream(pkts)
3496 self.pg_enable_capture(self.pg_interfaces)
3498 capture = self.pg0.get_capture(len(pkts))
3499 for packet in capture:
3501 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3502 self.assertEqual(packet[IPv6].dst, client.ip6)
3503 if packet.haslayer(TCP):
3504 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3505 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3506 self.check_tcp_checksum(packet)
3508 self.assertEqual(packet[UDP].sport, server_udp_out_port)
3509 self.assertEqual(packet[UDP].dport, client_udp_in_port)
3510 self.check_udp_checksum(packet)
3512 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3517 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3518 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3519 ICMPv6DestUnreach(code=1) /
3520 packet[IPv6] for packet in capture]
3521 self.pg0.add_stream(pkts)
3522 self.pg_enable_capture(self.pg_interfaces)
3524 capture = self.pg0.get_capture(len(pkts))
3525 for packet in capture:
3527 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3528 self.assertEqual(packet[IPv6].dst, server.ip6)
3529 icmp = packet[ICMPv6DestUnreach]
3530 self.assertEqual(icmp.code, 1)
3531 inner = icmp[IPerror6]
3532 self.assertEqual(inner.src, server.ip6)
3533 self.assertEqual(inner.dst, nat_addr_ip6)
3534 self.check_icmpv6_checksum(packet)
3535 if inner.haslayer(TCPerror):
3536 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3537 self.assertEqual(inner[TCPerror].dport,
3538 client_tcp_out_port)
3540 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3541 self.assertEqual(inner[UDPerror].dport,
3542 client_udp_out_port)
3544 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3547 def test_prefix(self):
3548 """ NAT64 Network-Specific Prefix """
3550 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3552 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3553 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3554 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3555 self.vrf1_nat_addr_n,
3556 vrf_id=self.vrf1_id)
3557 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3560 global_pref64 = "2001:db8::"
3561 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3562 global_pref64_len = 32
3563 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3565 prefix = self.vapi.nat64_prefix_dump()
3566 self.assertEqual(len(prefix), 1)
3567 self.assertEqual(prefix[0].prefix, global_pref64_n)
3568 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3569 self.assertEqual(prefix[0].vrf_id, 0)
3571 # Add tenant specific prefix
3572 vrf1_pref64 = "2001:db8:122:300::"
3573 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3574 vrf1_pref64_len = 56
3575 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3577 vrf_id=self.vrf1_id)
3578 prefix = self.vapi.nat64_prefix_dump()
3579 self.assertEqual(len(prefix), 2)
3582 pkts = self.create_stream_in_ip6(self.pg0,
3585 plen=global_pref64_len)
3586 self.pg0.add_stream(pkts)
3587 self.pg_enable_capture(self.pg_interfaces)
3589 capture = self.pg1.get_capture(len(pkts))
3590 self.verify_capture_out(capture, nat_ip=self.nat_addr,
3591 dst_ip=self.pg1.remote_ip4)
3593 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3594 self.pg1.add_stream(pkts)
3595 self.pg_enable_capture(self.pg_interfaces)
3597 capture = self.pg0.get_capture(len(pkts))
3598 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3601 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3603 # Tenant specific prefix
3604 pkts = self.create_stream_in_ip6(self.pg2,
3607 plen=vrf1_pref64_len)
3608 self.pg2.add_stream(pkts)
3609 self.pg_enable_capture(self.pg_interfaces)
3611 capture = self.pg1.get_capture(len(pkts))
3612 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3613 dst_ip=self.pg1.remote_ip4)
3615 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3616 self.pg1.add_stream(pkts)
3617 self.pg_enable_capture(self.pg_interfaces)
3619 capture = self.pg2.get_capture(len(pkts))
3620 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3623 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3625 def test_unknown_proto(self):
3626 """ NAT64 translate packet with unknown protocol """
3628 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3630 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3631 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3632 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3635 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3636 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3637 TCP(sport=self.tcp_port_in, dport=20))
3638 self.pg0.add_stream(p)
3639 self.pg_enable_capture(self.pg_interfaces)
3641 p = self.pg1.get_capture(1)
3643 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3644 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3646 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3647 TCP(sport=1234, dport=1234))
3648 self.pg0.add_stream(p)
3649 self.pg_enable_capture(self.pg_interfaces)
3651 p = self.pg1.get_capture(1)
3654 self.assertEqual(packet[IP].src, self.nat_addr)
3655 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3656 self.assertTrue(packet.haslayer(GRE))
3657 self.check_ip_checksum(packet)
3659 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3663 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3664 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3666 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3667 TCP(sport=1234, dport=1234))
3668 self.pg1.add_stream(p)
3669 self.pg_enable_capture(self.pg_interfaces)
3671 p = self.pg0.get_capture(1)
3674 self.assertEqual(packet[IPv6].src, remote_ip6)
3675 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3676 self.assertEqual(packet[IPv6].nh, 47)
3678 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3681 def test_hairpinning_unknown_proto(self):
3682 """ NAT64 translate packet with unknown protocol - hairpinning """
3684 client = self.pg0.remote_hosts[0]
3685 server = self.pg0.remote_hosts[1]
3686 server_tcp_in_port = 22
3687 server_tcp_out_port = 4022
3688 client_tcp_in_port = 1234
3689 client_tcp_out_port = 1235
3690 server_nat_ip = "10.0.0.100"
3691 client_nat_ip = "10.0.0.110"
3692 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3693 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3694 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3695 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3697 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3699 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3700 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3702 self.vapi.nat64_add_del_static_bib(server.ip6n,
3705 server_tcp_out_port,
3708 self.vapi.nat64_add_del_static_bib(server.ip6n,
3714 self.vapi.nat64_add_del_static_bib(client.ip6n,
3717 client_tcp_out_port,
3721 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3722 IPv6(src=client.ip6, dst=server_nat_ip6) /
3723 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3724 self.pg0.add_stream(p)
3725 self.pg_enable_capture(self.pg_interfaces)
3727 p = self.pg0.get_capture(1)
3729 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3730 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3732 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3733 TCP(sport=1234, dport=1234))
3734 self.pg0.add_stream(p)
3735 self.pg_enable_capture(self.pg_interfaces)
3737 p = self.pg0.get_capture(1)
3740 self.assertEqual(packet[IPv6].src, client_nat_ip6)
3741 self.assertEqual(packet[IPv6].dst, server.ip6)
3742 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3744 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3748 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3749 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3751 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3752 TCP(sport=1234, dport=1234))
3753 self.pg0.add_stream(p)
3754 self.pg_enable_capture(self.pg_interfaces)
3756 p = self.pg0.get_capture(1)
3759 self.assertEqual(packet[IPv6].src, server_nat_ip6)
3760 self.assertEqual(packet[IPv6].dst, client.ip6)
3761 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3763 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3766 def nat64_get_ses_num(self):
3768 Return number of active NAT64 sessions.
3770 st = self.vapi.nat64_st_dump()
3773 def clear_nat64(self):
3775 Clear NAT64 configuration.
3777 self.vapi.nat64_set_timeouts()
3779 interfaces = self.vapi.nat64_interface_dump()
3780 for intf in interfaces:
3781 self.vapi.nat64_add_del_interface(intf.sw_if_index,
3785 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3788 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3796 bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3799 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3807 bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3810 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3818 adresses = self.vapi.nat64_pool_addr_dump()
3819 for addr in adresses:
3820 self.vapi.nat64_add_del_pool_addr_range(addr.address,
3825 prefixes = self.vapi.nat64_prefix_dump()
3826 for prefix in prefixes:
3827 self.vapi.nat64_add_del_prefix(prefix.prefix,
3829 vrf_id=prefix.vrf_id,
3833 super(TestNAT64, self).tearDown()
3834 if not self.vpp_dead:
3835 self.logger.info(self.vapi.cli("show nat64 pool"))
3836 self.logger.info(self.vapi.cli("show nat64 interfaces"))
3837 self.logger.info(self.vapi.cli("show nat64 prefix"))
3838 self.logger.info(self.vapi.cli("show nat64 bib all"))
3839 self.logger.info(self.vapi.cli("show nat64 session table all"))
3842 if __name__ == '__main__':
3843 unittest.main(testRunner=VppTestRunner)