9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
def clear_nat44(self):
    """
    Clear NAT44 configuration.

    Removes the pg7/pg8 host routes and neighbors (when both interfaces
    exist), disables forwarding and IPFIX logging, deletes everything the
    NAT44 dump calls report (interface addresses, interface features,
    output features, static / load-balancing / identity mappings, pool
    addresses) and finally resets the reassembly settings.

    NOTE(review): the listing this method was restored from had lost the
    trailing arguments of several calls (``is_add=0`` and the positional
    fields of the neighbor / static / lb / identity mapping deletes).
    They were rebuilt from the corresponding dump fields; confirm them
    against the vpp_papi_provider signatures.
    """
    if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
        # I found no elegant way to do this
        for intf in (self.pg7, self.pg8):
            self.vapi.ip_add_del_route(
                dst_address=intf.remote_ip4n,
                dst_address_length=32,
                next_hop_address=intf.remote_ip4n,
                next_hop_sw_if_index=intf.sw_if_index,
                is_add=0)

        for intf in (self.pg7, self.pg8):
            neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
            for n in neighbors:
                self.vapi.ip_neighbor_add_del(intf.sw_if_index,
                                              n.mac_address,
                                              n.ip_address,
                                              is_add=0)

        if self.pg7.has_ip4_config:
            self.pg7.unconfig_ip4()

    self.vapi.nat44_forwarding_enable_disable(0)

    for intf in self.vapi.nat44_interface_addr_dump():
        self.vapi.nat44_add_interface_addr(intf.sw_if_index,
                                           twice_nat=intf.twice_nat,
                                           is_add=0)

    # Disable IPFIX logging with the currently configured values, then
    # restore the defaults for the next test.
    self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
                        domain_id=self.ipfix_domain_id)
    self.ipfix_src_port = 4739
    self.ipfix_domain_id = 1

    for intf in self.vapi.nat44_interface_dump():
        # is_inside > 1 gets an extra removal with is_inside=0 before the
        # regular one below.
        if intf.is_inside > 1:
            self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
                                                      0,
                                                      is_add=0)
        self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
                                                  intf.is_inside,
                                                  is_add=0)

    for intf in self.vapi.nat44_interface_output_feature_dump():
        self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
                                                         intf.is_inside,
                                                         is_add=0)

    for sm in self.vapi.nat44_static_mapping_dump():
        self.vapi.nat44_add_del_static_mapping(
            sm.local_ip_address,
            sm.external_ip_address,
            local_port=sm.local_port,
            external_port=sm.external_port,
            addr_only=sm.addr_only,
            vrf_id=sm.vrf_id,
            protocol=sm.protocol,
            twice_nat=sm.twice_nat,
            self_twice_nat=sm.self_twice_nat,
            out2in_only=sm.out2in_only,
            tag=sm.tag,
            external_sw_if_index=sm.external_sw_if_index,
            is_add=0)

    for lb_sm in self.vapi.nat44_lb_static_mapping_dump():
        self.vapi.nat44_add_del_lb_static_mapping(
            lb_sm.external_addr,
            lb_sm.external_port,
            lb_sm.protocol,
            vrf_id=lb_sm.vrf_id,
            twice_nat=lb_sm.twice_nat,
            self_twice_nat=lb_sm.self_twice_nat,
            out2in_only=lb_sm.out2in_only,
            tag=lb_sm.tag,
            is_add=0,
            local_num=0,
            locals=[])

    for id_m in self.vapi.nat44_identity_mapping_dump():
        self.vapi.nat44_add_del_identity_mapping(
            addr_only=id_m.addr_only,
            ip=id_m.ip_address,
            port=id_m.port,
            sw_if_index=id_m.sw_if_index,
            vrf_id=id_m.vrf_id,
            protocol=id_m.protocol,
            is_add=0)

    for addr in self.vapi.nat44_address_dump():
        self.vapi.nat44_add_del_address_range(addr.ip_address,
                                              addr.ip_address,
                                              twice_nat=addr.twice_nat,
                                              is_add=0)

    self.vapi.nat_set_reass()
    self.vapi.nat_set_reass(is_ip6=1)
def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
                             local_port=0, external_port=0, vrf_id=0,
                             is_add=1, external_sw_if_index=0xFFFFFFFF,
                             proto=0, twice_nat=0, self_twice_nat=0,
                             out2in_only=0, tag=""):
    """
    Add/delete NAT44 static mapping

    :param local_ip: Local IP address
    :param external_ip: External IP address
    :param local_port: Local port number (Optional)
    :param external_port: External port number (Optional)
    :param vrf_id: VRF ID (Default 0)
    :param is_add: 1 if add, 0 if delete (Default add)
    :param external_sw_if_index: External interface instead of IP address
    :param proto: IP protocol (Mandatory if port specified)
    :param twice_nat: 1 if translate external host address and port
    :param self_twice_nat: 1 if translate external host address and port
                           whenever external host address equals
                           local address of internal host
    :param out2in_only: if 1 rule is matching only out2in direction
    :param tag: Opaque string tag
    """
    # The mapping is address-only unless both ports are given.
    addr_only = 0 if (local_port and external_port) else 1
    l_ip = socket.inet_pton(socket.AF_INET, local_ip)
    e_ip = socket.inet_pton(socket.AF_INET, external_ip)
    # NOTE(review): only the first three positional arguments were
    # visible in the truncated listing; the remaining order is
    # reconstructed and should be checked against the provider API.
    self.vapi.nat44_add_del_static_mapping(
        l_ip,
        e_ip,
        external_sw_if_index,
        local_port,
        external_port,
        addr_only,
        vrf_id,
        proto,
        twice_nat,
        self_twice_nat,
        out2in_only,
        tag,
        is_add)
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed through to the address-range call
                   (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    nat_addr = socket.inet_pton(socket.AF_INET, ip)
    # A single address is expressed as a range whose first and last
    # address are the same.
    self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
                                          vrf_id=vrf_id,
                                          twice_nat=twice_nat)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default out_if.remote_ip4)
    :param ttl: TTL of generated packets
    :returns: list with one TCP, one UDP and one ICMP echo-request packet
    """
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    # One packet per protocol, in the order TCP, UDP, ICMP.
    l4_layers = (
        TCP(sport=self.tcp_port_in, dport=20),
        UDP(sport=self.udp_port_in, dport=20),
        ICMP(id=self.icmp_id_in, type='echo-request'),
    )
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the byte
    positions used for the given prefix length; byte 8 of the IPv6
    address is never written.  For any other prefix length the prefix is
    returned unchanged (as before).

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: IPv4-embedded IPv6 addresses
    """
    # Target byte index in the 16-byte IPv6 address for IPv4 octets 0..3.
    offsets_by_plen = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray keeps this working on both Python 2 (str) and Python 3
    # (bytes), where joining the individual elements with '' fails.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for octet, offset in zip(ip4_n, offsets_by_plen.get(plen, ())):
        pref_n[offset] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    Reads the four IPv4 octets from the byte positions that
    compose_ip6() writes for the same prefix length.

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: extracted IPv4 address
    :raises ValueError: if plen is not one of the supported lengths
    """
    # Source byte index in the 16-byte IPv6 address for IPv4 octets 0..3.
    offsets_by_plen = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in offsets_by_plen:
        raise ValueError("unsupported IPv6 prefix length: %r" % (plen,))
    # bytearray indexing yields ints on both Python 2 and Python 3.
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[offset] for offset in offsets_by_plen[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default: '64:ff9b::' is prepended to the
                 IPv4 address text)
    :param plen: NAT64 prefix length
    :returns: list with one TCP, one UDP and one ICMPv6 echo-request packet
    """
    if pref is None:
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    # One packet per protocol, in the order TCP, UDP, ICMPv6.
    l4_layers = (
        TCP(sport=self.tcp_port_in, dport=20),
        UDP(sport=self.udp_port_in, dport=20),
        ICMPv6EchoRequest(id=self.icmp_id_in),
    )
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
            l4
            for l4 in l4_layers]
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: list with one TCP, one UDP and one ICMP echo-reply packet
    """
    if dst_ip is None:
        dst_ip = self.nat_addr
    if use_inside_ports:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in
    else:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out

    # One packet per protocol, in the order TCP, UDP, ICMP.
    l4_layers = (
        TCP(dport=tcp_port, sport=20),
        UDP(dport=udp_port, sport=20),
        ICMP(id=icmp_id, type='echo-reply'),
    )
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: list with one TCP, one UDP and one ICMPv6 echo-reply packet
    """
    # One packet per protocol, in the order TCP, UDP, ICMPv6.
    l4_layers = (
        TCP(dport=self.tcp_port_out, sport=20),
        UDP(dport=self.udp_port_out, sport=20),
        ICMPv6EchoReply(id=self.icmp_id_out),
    )
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
            l4
            for l4 in l4_layers]
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None, is_ip6=False):
    """
    Verify captured packets on outside network

    Besides verifying, this records the translated TCP/UDP source port
    and ICMP id seen in the capture in self.tcp_port_out,
    self.udp_port_out and self.icmp_id_out.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    :param is_ip6: If L3 protocol is IPv6 (Default False)
    """
    if is_ip6:
        IP46 = IPv6
        ICMP46 = ICMPv6EchoRequest
    else:
        IP46 = IP
        ICMP46 = ICMP
    if nat_ip is None:
        nat_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            if not is_ip6:
                self.assert_packet_checksums_valid(packet)
            self.assertEqual(packet[IP46].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP46].dst, dst_ip)
            if packet.haslayer(TCP):
                if same_port:
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                else:
                    self.assertNotEqual(
                        packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                if same_port:
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                else:
                    self.assertNotEqual(
                        packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                if same_port:
                    self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
                else:
                    self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP46].id
                self.assert_packet_checksums_valid(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    Thin wrapper around verify_capture_out() with is_ip6 set.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
                                   dst_ip, True)
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assert_packet_checksums_valid(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            self.assert_packet_checksums_valid(packet)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # The quoted (inner) packet must carry the outside ports/id.
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # The quoted (inner) packet must carry the inside ports/id.
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data):
    """
    Create fragmented packet stream

    The TCP segment is split into three IPv4 fragments: header plus the
    first 4 payload bytes, the next 16 payload bytes, and the rest.

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :returns: Fragments
    """
    frag_id = random.randint(0, 65535)
    # Build and re-parse the unfragmented packet so scapy computes the
    # TCP checksum over the whole payload; the first fragment reuses it.
    # bytes() equals str() on Python 2 and is the right call on Python 3.
    whole = (IP(src=src_if.remote_ip4, dst=dst) /
             TCP(sport=sport, dport=dport) /
             Raw(data))
    whole = whole.__class__(bytes(whole))
    chksum = whole['TCP'].chksum

    first = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
             IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0,
                id=frag_id) /
             TCP(sport=sport, dport=dport, chksum=chksum) /
             Raw(data[0:4]))
    # Fragment offsets are given in 8-byte units.
    middle = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3,
                 id=frag_id, proto=IP_PROTOS.tcp) /
              Raw(data[4:20]))
    last = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
            IP(src=src_if.remote_ip4, dst=dst, frag=5,
               proto=IP_PROTOS.tcp, id=frag_id) /
            Raw(data[20:]))
    return [first, middle, last]
def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
                           pref=None, plen=0, frag_size=128):
    """
    Create fragmented packet stream

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :param pref: NAT64 prefix (Default: '64:ff9b::' is prepended to the
                 IPv4 address text)
    :param plen: NAT64 prefix length
    :param frag_size: size of fragments
    :returns: Fragments
    """
    if pref is None:
        dst_ip6 = ''.join(['64:ff9b::', dst])
    else:
        dst_ip6 = self.compose_ip6(dst, pref, plen)

    p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
         IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
         IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
         TCP(sport=sport, dport=dport) /
         Raw(data))

    return fragment6(p, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    payload_buf = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.assert_ip_checksum_valid(p)
        # Fragment offsets are given in 8-byte units.
        payload_buf.seek(p[IP].frag * 8)
        payload_buf.write(p[IP].payload)
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
            proto=frags[0][IP].proto)
    # For any other protocol p is left as the last fragment inspected.
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(payload_buf.getvalue()))
        self.assert_tcp_checksum_valid(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(payload_buf.getvalue()))
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet
    """
    payload_buf = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IPv6].src, src)
        self.assertEqual(p[IPv6].dst, dst)
        # Fragment offsets are given in 8-byte units.
        payload_buf.seek(p[IPv6ExtHdrFragment].offset * 8)
        payload_buf.write(p[IPv6ExtHdrFragment].payload)
    ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
              nh=frags[0][IPv6ExtHdrFragment].nh)
    # For any other next header p is left as the last fragment inspected.
    if ip.nh == IP_PROTOS.tcp:
        p = (ip / TCP(payload_buf.getvalue()))
    elif ip.nh == IP_PROTOS.udp:
        p = (ip / UDP(payload_buf.getvalue()))
    self.assert_packet_checksums_valid(p)
    return p
def initiate_tcp_session(self, in_if, out_if):
    """
    Initiates TCP session

    Sends SYN (in->out), SYN+ACK (out->in) and ACK (in->out), and stores
    the translated source port of the SYN in self.tcp_port_out.

    :param in_if: Inside interface
    :param out_if: Outside interface
    """
    try:
        # SYN packet in->out
        p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="S"))
        in_if.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = out_if.get_capture(1)
        p = capture[0]
        self.tcp_port_out = p[TCP].sport

        # SYN + ACK packet out->in
        p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
             IP(src=out_if.remote_ip4, dst=self.nat_addr) /
             TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                 flags="SA"))
        out_if.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        in_if.get_capture(1)

        # ACK packet in->out
        p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                 flags="A"))
        in_if.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        out_if.get_capture(1)

    except Exception:
        self.logger.error("TCP 3 way handshake failed")
        raise
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Expects six records: one create and one delete event for each of
    ICMP, TCP and UDP.

    :param data: Decoded IPFIX data records
    """
    nat44_ses_create_num = 0
    nat44_ses_delete_num = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        self.assertIn(ord(record[230]), [4, 5])
        if ord(record[230]) == 4:
            nat44_ses_create_num += 1
        else:
            nat44_ses_delete_num += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        if IP_PROTOS.icmp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
            self.assertEqual(struct.pack("!H", self.icmp_id_out),
                             record[227])
        elif IP_PROTOS.tcp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.tcp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.tcp_port_out),
                             record[227])
        elif IP_PROTOS.udp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.udp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.udp_port_out),
                             record[227])
        else:
            self.fail("Invalid protocol")
    self.assertEqual(3, nat44_ses_create_num)
    self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): native byte order ("I") is kept as-is here although
    # other fields in this file are compared in network order ("!I").
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
865 def verify_ipfix_max_bibs(self, data, limit):
867 Verify IPFIX maximum BIB entries exceeded event
869 :param data: Decoded IPFIX data records
870 :param limit: Number of maximum BIB entries that can be created.
872 self.assertEqual(1, len(data))
875 self.assertEqual(ord(record[230]), 13)
876 # natQuotaExceededEvent
877 self.assertEqual(struct.pack("I", 2), record[466])
879 self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): native byte order ("I") is kept as-is here although
    # other fields in this file are compared in network order ("!I").
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): native byte order ("I") is kept as-is here although
    # other fields in this file are compared in network order ("!I").
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    if is_create:
        self.assertEqual(ord(record[230]), 10)
    else:
        self.assertEqual(ord(record[230]), 11)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
                           dst_port):
    """
    Verify IPFIX NAT64 session create and delete events

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    :param dst_addr: IPv4 destination address
    :param dst_port: destination TCP port
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    if is_create:
        self.assertEqual(ord(record[230]), 6)
    else:
        self.assertEqual(ord(record[230]), 7)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # destinationIPv6Address
    # NOTE(review): the prefix arguments and the record keys 28 / 226
    # were missing from the truncated listing; they are reconstructed
    # from the default '64:ff9b::' prefix used elsewhere in this class
    # and from the element names in the comments. Confirm.
    self.assertEqual(socket.inet_pton(socket.AF_INET6,
                                      self.compose_ip6(dst_addr,
                                                       '64:ff9b::',
                                                       96)),
                     record[28])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # postNATDestinationIPv4Address
    self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
                     record[226])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
    # destinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[11])
    # postNAPTDestinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[228])
992 class TestNAT44(MethodHolder):
993 """ NAT44 Test Cases """
@classmethod
def setUpClass(cls):
    """
    Create and configure the packet-generator interfaces shared by all
    NAT44 tests; on any failure the class is torn down and the error
    re-raised.

    NOTE(review): the listing this method was restored from had lost its
    short lines (decorator, def, try/except, loop bodies, admin_up calls
    and the trailing arguments of sw_interface_add_del_address); they are
    reconstructed and should be confirmed.
    """
    super(TestNAT44, cls).setUpClass()
    cls.vapi.cli("set log class nat level debug")

    try:
        cls.tcp_port_in = 6303
        cls.tcp_port_out = 6303
        cls.udp_port_in = 6304
        cls.udp_port_out = 6304
        cls.icmp_id_in = 6305
        cls.icmp_id_out = 6305
        cls.nat_addr = '10.0.0.3'
        cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
        cls.ipfix_src_port = 4739
        cls.ipfix_domain_id = 1
        cls.tcp_external_port = 80

        cls.create_pg_interfaces(range(10))
        cls.interfaces = list(cls.pg_interfaces[0:4])

        for i in cls.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        cls.pg0.generate_remote_hosts(3)
        cls.pg0.configure_ipv4_neighbors()

        cls.pg1.generate_remote_hosts(1)
        cls.pg1.configure_ipv4_neighbors()

        cls.overlapping_interfaces = list(cls.pg_interfaces[4:7])
        cls.vapi.ip_table_add_del(10, is_add=1)
        cls.vapi.ip_table_add_del(20, is_add=1)

        # NOTE(review): `i` below is the leftover loop variable from the
        # loop over cls.interfaces (its last element), so the three
        # _local_ip4n values are derived from that interface and not
        # from the _local_ip4 string assigned on the preceding line.
        # This looks unintended; kept unchanged pending confirmation.
        cls.pg4._local_ip4 = "172.16.255.1"
        cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
        cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
        cls.pg4.set_table_ip4(10)
        cls.pg5._local_ip4 = "172.17.255.3"
        cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
        cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
        cls.pg5.set_table_ip4(10)
        cls.pg6._local_ip4 = "172.16.255.1"
        cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
        cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
        cls.pg6.set_table_ip4(20)
        for i in cls.overlapping_interfaces:
            i.config_ip4()
            i.admin_up()
            i.resolve_arp()

        cls.pg7.admin_up()
        cls.pg8.admin_up()

        cls.pg9.generate_remote_hosts(2)
        cls.pg9.config_ip4()
        ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
        cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
                                              ip_addr_n,
                                              24)
        cls.pg9.admin_up()
        cls.pg9.resolve_arp()
        cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
        cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
        cls.pg9.resolve_arp()

    except Exception:
        super(TestNAT44, cls).tearDownClass()
        raise
def test_dynamic(self):
    """ NAT44 dynamic translation test """

    # pg0 is the inside interface, pg1 the outside one.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """ NAT44 handling of client packets with TTL=1 """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
    """ NAT44 handling of server packets with TTL=1 """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture,
                                             src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
    """ NAT44 handling of error responses to client packets with TTL=2 """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - simulate ICMP type 11 response
    # Each error quotes the translated packet that was captured on pg1.
    capture = self.pg1.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
    """ NAT44 handling of error responses to server packets with TTL=2 """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - simulate ICMP type 11 response
    # Each error quotes the translated packet that was captured on pg0.
    capture = self.pg0.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
    """ Ping NAT44 out interface from outside network """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Echo request addressed to the outside interface's own address.
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
         ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [p]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.local_ip4)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        self.assertEqual(packet[ICMP].type, 0)  # echo reply
    except Exception:
        # Log the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", packet))
        raise
def test_ping_internal_host_from_outside(self):
    """ Ping internal host from outside network """

    # 1:1 static mapping exposes the inside host as self.nat_addr.
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in
    pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
           IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
           ICMP(id=self.icmp_id_out, type='echo-request'))
    self.pg1.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_capture_in(capture, self.pg0, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    # in2out
    pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
           IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
           ICMP(id=self.icmp_id_in, type='echo-reply'))
    self.pg0.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.verify_capture_out(capture, same_port=True, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def _test_forwarding(self):
    """ NAT44 forwarding test """

    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_forwarding_enable_disable(1)

    real_ip = self.pg0.remote_ip4n
    alias_ip = self.nat_addr_n
    self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                           external_ip=alias_ip)

    try:
        # in2out - static mapping match

        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, same_port=True)

        # in2out - no static mapping match

        # Temporarily make remote_hosts[1] the interface's primary host
        # so the stream helpers use an address without a static mapping;
        # the original entry is put back in the finally clause.
        host0 = self.pg0.remote_hosts[0]
        self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
        try:
            pkts = self.create_stream_out(self.pg1,
                                          dst_ip=self.pg0.remote_ip4,
                                          use_inside_ports=True)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(len(pkts))
            self.verify_capture_in(capture, self.pg0)

            pkts = self.create_stream_in(self.pg0, self.pg1)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(len(pkts))
            self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
                                    same_port=True)
        finally:
            self.pg0.remote_hosts[0] = host0

        # Deleting one session by its full key (including the external
        # host) must leave the user's other two sessions in place.
        user = self.pg0.remote_hosts[1]
        sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
        self.assertEqual(len(sessions), 3)
        self.assertTrue(sessions[0].ext_host_valid)
        self.vapi.nat44_del_session(
            sessions[0].inside_ip_address,
            sessions[0].inside_port,
            sessions[0].protocol,
            ext_host_address=sessions[0].ext_host_address,
            ext_host_port=sessions[0].ext_host_port)
        sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
        self.assertEqual(len(sessions), 2)

    finally:
        # Always undo forwarding and the static mapping, even on failure.
        self.vapi.nat44_forwarding_enable_disable(0)
        self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                               external_ip=alias_ip,
                                               is_add=0)
def test_static_in(self):
    """ 1:1 NAT initialized from inside network """

    nat_ip = "10.0.0.10"
    # Expected outside ports/id used by the capture verification helpers.
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # An address-only mapping is dumped with empty tag and zeroed
    # protocol/ports; the tag field is NUL padded, hence the split.
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
    self.assertEqual(sm[0].protocol, 0)
    self.assertEqual(sm[0].local_port, 0)
    self.assertEqual(sm[0].external_port, 0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
    """ 1:1 NAT initialized from outside network """

    nat_ip = "10.0.0.20"
    # Expected outside ports/id used by the capture verification helpers.
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    # NOTE(review): the tag literal is not visible in the damaged
    # listing; any value works as long as the dump below echoes it.
    tag = "testTAG"
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # The dumped tag field is NUL padded, hence the split.
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
    """ 1:1 NAPT initialized from inside network """

    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    self.nat44_add_address(self.nat_addr)
    # One port-level static mapping per protocol, all on self.nat_addr.
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
    """ 1:1 NAPT initialized from outside network """

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.nat44_add_address(self.nat_addr)
    # One port-level static mapping per protocol, all on self.nat_addr.
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_vrf_aware(self):
    """ 1:1 NAT VRF awareness """

    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    # NOTE(review): the vrf_id arguments are not visible in the damaged
    # listing; 10 is assumed to be pg4's VRF (and not pg0's), which is
    # what the two scenarios below require - confirm against setUpClass.
    self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                  vrf_id=10)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                  vrf_id=10)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)

    # inside interface VRF match NAT44 static mapping VRF
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1, True)

    # inside interface VRF don't match NAT44 static mapping VRF (packets
    # are dropped)
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg3.assert_nothing_captured()
def test_dynamic_to_static(self):
    """ Switch from dynamic translation to 1:1NAT """
    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # dynamic
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # 1:1NAT
    # Adding the static mapping is expected to leave no sessions behind
    # for this host; traffic must then use nat_ip with unchanged ports.
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
    self.assertEqual(len(sessions), 0)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_identity_nat(self):
    """ Identity NAT """

    self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12345, dport=56789))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    # Use a distinct name for the received packet so it is obvious that
    # the checks (and the checksum validation) run on what came out of
    # the NAT, not on the packet that was sent.
    rx = capture[0]
    try:
        ip = rx[IP]
        tcp = rx[TCP]
        # Identity mapping: addresses and ports must pass unchanged.
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, 56789)
        self.assertEqual(tcp.sport, 12345)
        self.assert_packet_checksums_valid(rx)
    except Exception:
        # Log the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", rx))
        raise
def test_multiple_inside_interfaces(self):
    """ NAT44 multiple non-overlapping address space inside interfaces """

    # pg0 and pg1 are inside, pg3 (is_inside=0) is outside; pg2 has no
    # NAT44 feature configured by this test.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)

    # between two NAT44 inside interfaces (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg1)

    # from NAT44 inside to interface without NAT44 feature (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg2)

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg1, self.pg3)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ NAT44 multiple inside interfaces with overlapping address space """

    static_nat_ip = "10.0.0.10"
    self.nat44_add_address(self.nat_addr)
    # pg3 is outside; pg4, pg5 and pg6 are inside.
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
    # NOTE(review): vrf_id is not visible in the damaged listing; 20 is
    # taken from the pg6 session dump further down - confirm.
    self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                  vrf_id=20)

    # between NAT44 inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between NAT44 inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        # Log the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    # The dump is expected in TCP, UDP, ICMP order.
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.nat44_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        sessions = self.vapi.nat44_user_session_dump(user.ip_address,
                                                     user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertIn(session.protocol,
                          [IP_PROTOS.tcp, IP_PROTOS.udp,
                           IP_PROTOS.icmp])
            self.assertFalse(session.ext_host_valid)

    # pg4 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertGreaterEqual(len(sessions), 3)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        # Compare the raw address bytes with the dotted-quad literal.
        self.assertEqual(map(ord, session.outside_ip_address[0:4]),
                         map(int, static_nat_ip.split('.')))
        self.assertIn(session.inside_port,
                      [self.tcp_port_in, self.udp_port_in,
                       self.icmp_id_in])
def test_hairpinning(self):
    """ NAT44 hairpinning - 1:1 NAPT """

    # Host and server both sit behind the same inside interface (pg0).
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the two host port initialisers are not visible in
    # the damaged listing; host_out_port is overwritten from the first
    # capture, host_in_port only has to differ from the NAT's choice.
    host_in_port = 1234
    host_out_port = 0
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.assert_packet_checksums_valid(p)
        # Remember the translated source port for the reply below.
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ NAT44 hairpinning - 1:1 NAT"""

    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    # All three endpoints sit behind the same inside interface (pg0).
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # add static mapping for servers
    self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
    self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            # The host has no static mapping, so it is translated
            # dynamically: pool address and a new port/id, which are
            # recorded for the replies below.
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to host
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, host.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server2 to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            # server2 has an address-only static mapping, so its ports
            # and ICMP id are expected to stay unchanged.
            self.assertEqual(packet[IP].src, server2_nat_ip)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to server2
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, server2.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # get maximum number of translations per user
    nat44_config = self.vapi.nat_show_config()

    # send more than maximum number of translations per user packets
    pkts_num = nat44_config.max_translations_per_user + 5
    pkts = []
    for port in range(0, pkts_num):
        # A distinct source port per packet, i.e. one session each.
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=1025 + port))
        pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)

    # The session count must be capped at the configured maximum.
    users = self.vapi.nat44_user_dump()
    for user in users:
        if user.ip_address == self.pg0.remote_ip4n:
            self.assertEqual(user.nsessions,
                             nat44_config.max_translations_per_user)
            self.assertEqual(user.nstaticsessions, 0)

    # A session through a static mapping must replace one dynamic
    # session rather than exceed the cap.
    # NOTE(review): the port literal and the two port arguments are not
    # visible in the damaged listing - confirm.
    tcp_port = 22222
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  tcp_port, tcp_port,
                                  proto=IP_PROTOS.tcp)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=tcp_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)
    users = self.vapi.nat44_user_dump()
    for user in users:
        if user.ip_address == self.pg0.remote_ip4n:
            self.assertEqual(user.nsessions,
                             nat44_config.max_translations_per_user - 1)
            self.assertEqual(user.nstaticsessions, 1)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface """
    nat_if = self.pg7
    self.vapi.nat44_add_interface_addr(nat_if.sw_if_index)

    # the NAT pool starts out empty
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # once the interface has an IPv4 address it must appear in the pool
    nat_if.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], nat_if.local_ip4n)

    # dropping the interface address must empty the pool again
    nat_if.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface """
    # NOTE(review): the tag literal and the local address passed to
    # nat44_add_static_mapping are not visible in the damaged listing;
    # the values below are placeholders to confirm against history.
    tag = "testTAG"

    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    self.nat44_add_static_mapping(
        '1.2.3.4',
        external_sw_if_index=self.pg7.sw_if_index,
        tag=tag)

    # static mappings with external interface
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(1, len(static_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     static_mappings[0].external_sw_if_index)
    self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)

    # configure interface address and check static mappings
    self.pg7.config_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(2, len(static_mappings))
    resolved = False
    for sm in static_mappings:
        # The resolved entry carries no interface index (all ones) and
        # the interface's address instead.
        if sm.external_sw_if_index == 0xFFFFFFFF:
            self.assertEqual(sm.external_ip_address[0:4],
                             self.pg7.local_ip4n)
            self.assertEqual((sm.tag).split('\0', 1)[0], tag)
            resolved = True
    self.assertTrue(resolved)

    # remove interface address and check static mappings
    self.pg7.unconfig_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(1, len(static_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     static_mappings[0].external_sw_if_index)
    self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)

    # configure interface address again and check static mappings
    self.pg7.config_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(2, len(static_mappings))
    resolved = False
    for sm in static_mappings:
        if sm.external_sw_if_index == 0xFFFFFFFF:
            self.assertEqual(sm.external_ip_address[0:4],
                             self.pg7.local_ip4n)
            self.assertEqual((sm.tag).split('\0', 1)[0], tag)
            resolved = True
    self.assertTrue(resolved)

    # remove static mapping
    self.nat44_add_static_mapping(
        '1.2.3.4',
        external_sw_if_index=self.pg7.sw_if_index,
        tag=tag,
        is_add=0)
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(0, len(static_mappings))
def test_interface_addr_identity_nat(self):
    """ Identity NAT with addresses from interface """

    # NOTE(review): the port literal and the remaining keyword arguments
    # of the identity-mapping call are not visible in the damaged
    # listing - confirm against history.
    port = 53053
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    self.vapi.nat44_add_del_identity_mapping(
        sw_if_index=self.pg7.sw_if_index,
        port=port,
        protocol=IP_PROTOS.tcp,
        addr_only=0)

    # identity mappings with external interface
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)

    # configure interface address and check identity mappings
    self.pg7.config_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    resolved = False
    self.assertEqual(2, len(identity_mappings))
    for sm in identity_mappings:
        if sm.sw_if_index == 0xFFFFFFFF:
            # Check the entry that matched, not identity_mappings[0]:
            # indexing [0] only verified the resolved mapping when it
            # happened to be dumped first.
            self.assertEqual(sm.ip_address,
                             self.pg7.local_ip4n)
            self.assertEqual(port, sm.port)
            self.assertEqual(IP_PROTOS.tcp, sm.protocol)
            resolved = True
    self.assertTrue(resolved)

    # remove interface address and check identity mappings
    self.pg7.unconfig_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)
2166 def test_ipfix_nat44_sess(self):
2167 """ IPFIX logging NAT44 session created/delted """
2168 self.ipfix_domain_id = 10
2169 self.ipfix_src_port = 20202
2170 colector_port = 30303
2171 bind_layers(UDP, IPFIX, dport=30303)
2172 self.nat44_add_address(self.nat_addr)
2173 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2174 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2176 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2177 src_address=self.pg3.local_ip4n,
2179 template_interval=10,
2180 collector_port=colector_port)
2181 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2182 src_port=self.ipfix_src_port)
2184 pkts = self.create_stream_in(self.pg0, self.pg1)
2185 self.pg0.add_stream(pkts)
2186 self.pg_enable_capture(self.pg_interfaces)
2188 capture = self.pg1.get_capture(len(pkts))
2189 self.verify_capture_out(capture)
2190 self.nat44_add_address(self.nat_addr, is_add=0)
2191 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2192 capture = self.pg3.get_capture(9)
2193 ipfix = IPFIXDecoder()
2194 # first load template
2196 self.assertTrue(p.haslayer(IPFIX))
2197 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2198 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2199 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2200 self.assertEqual(p[UDP].dport, colector_port)
2201 self.assertEqual(p[IPFIX].observationDomainID,
2202 self.ipfix_domain_id)
2203 if p.haslayer(Template):
2204 ipfix.add_template(p.getlayer(Template))
2205 # verify events in data set
2207 if p.haslayer(Data):
2208 data = ipfix.decode_data_set(p.getlayer(Set))
2209 self.verify_ipfix_nat44_ses(data)
# Test: IPFIX export of the "NAT addresses exhausted" event.  No
# nat44_add_address call is visible in this method, and the packet sent
# from pg0 is expected not to reach pg1; the IPFIX packets exported towards
# pg3 are then validated.
2211 def test_ipfix_addr_exhausted(self):
2212 """ IPFIX logging NAT addresses exhausted """
2213 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2214 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2216 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2217 src_address=self.pg3.local_ip4n,
2219 template_interval=10)
2220 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2221 src_port=self.ipfix_src_port)
2223 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2224 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2226 self.pg0.add_stream(p)
2227 self.pg_enable_capture(self.pg_interfaces)
# The packet must not be forwarded to pg1.
2229 self.pg1.assert_nothing_captured()
2231 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2232 capture = self.pg3.get_capture(9)
2233 ipfix = IPFIXDecoder()
2234 # first load template
# Exported packets are checked against pg3's addresses, the configured
# source port / domain id and destination port 4739.
2236 self.assertTrue(p.haslayer(IPFIX))
2237 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2238 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2239 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2240 self.assertEqual(p[UDP].dport, 4739)
2241 self.assertEqual(p[IPFIX].observationDomainID,
2242 self.ipfix_domain_id)
2243 if p.haslayer(Template):
2244 ipfix.add_template(p.getlayer(Template))
2245 # verify events in data set
2247 if p.haslayer(Data):
2248 data = ipfix.decode_data_set(p.getlayer(Set))
2249 self.verify_ipfix_addr_exhausted(data)
# Test (extended runs only): IPFIX export of the "maximum session entries
# exceeded" event.  First creates max_sessions sessions from distinct
# 10.10.x.y sources, then enables IPFIX and sends one more packet that is
# expected to be dropped.
2251 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2252 def test_ipfix_max_sessions(self):
2253 """ IPFIX logging maximum session entries exceeded """
2254 self.nat44_add_address(self.nat_addr)
2255 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2256 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Session limit used by this test: ten times the configured number of
# translation buckets.
2259 nat44_config = self.vapi.nat_show_config()
2260 max_sessions = 10 * nat44_config.translation_buckets
2263 for i in range(0, max_sessions):
# Spread i over the last two octets so each packet has its own source IP.
2264 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266 IP(src=src, dst=self.pg1.remote_ip4) /
2269 self.pg0.add_stream(pkts)
2270 self.pg_enable_capture(self.pg_interfaces)
2273 self.pg1.get_capture(max_sessions)
2274 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2275 src_address=self.pg3.local_ip4n,
2277 template_interval=10)
2278 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2279 src_port=self.ipfix_src_port)
2281 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2282 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2284 self.pg0.add_stream(p)
2285 self.pg_enable_capture(self.pg_interfaces)
# The extra packet must not be forwarded to pg1.
2287 self.pg1.assert_nothing_captured()
2289 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2290 capture = self.pg3.get_capture(9)
2291 ipfix = IPFIXDecoder()
2292 # first load template
2294 self.assertTrue(p.haslayer(IPFIX))
2295 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2296 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2297 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2298 self.assertEqual(p[UDP].dport, 4739)
2299 self.assertEqual(p[IPFIX].observationDomainID,
2300 self.ipfix_domain_id)
2301 if p.haslayer(Template):
2302 ipfix.add_template(p.getlayer(Template))
2303 # verify events in data set
2305 if p.haslayer(Data):
2306 data = ipfix.decode_data_set(p.getlayer(Set))
2307 self.verify_ipfix_max_sessions(data, max_sessions)
2309 def test_pool_addr_fib(self):
2310 """ NAT44 add pool addresses to FIB """
2311 static_addr = '10.0.0.10'
2312 self.nat44_add_address(self.nat_addr)
2313 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2314 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2316 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2319 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2320 ARP(op=ARP.who_has, pdst=self.nat_addr,
2321 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2322 self.pg1.add_stream(p)
2323 self.pg_enable_capture(self.pg_interfaces)
2325 capture = self.pg1.get_capture(1)
2326 self.assertTrue(capture[0].haslayer(ARP))
2327 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2330 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2331 ARP(op=ARP.who_has, pdst=static_addr,
2332 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2333 self.pg1.add_stream(p)
2334 self.pg_enable_capture(self.pg_interfaces)
2336 capture = self.pg1.get_capture(1)
2337 self.assertTrue(capture[0].haslayer(ARP))
2338 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2340 # send ARP to non-NAT44 interface
2341 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2342 ARP(op=ARP.who_has, pdst=self.nat_addr,
2343 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2344 self.pg2.add_stream(p)
2345 self.pg_enable_capture(self.pg_interfaces)
2347 self.pg1.assert_nothing_captured()
2349 # remove addresses and verify
2350 self.nat44_add_address(self.nat_addr, is_add=0)
2351 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2354 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2355 ARP(op=ARP.who_has, pdst=self.nat_addr,
2356 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2357 self.pg1.add_stream(p)
2358 self.pg_enable_capture(self.pg_interfaces)
2360 self.pg1.assert_nothing_captured()
2362 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2363 ARP(op=ARP.who_has, pdst=static_addr,
2364 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2365 self.pg1.add_stream(p)
2366 self.pg_enable_capture(self.pg_interfaces)
2368 self.pg1.assert_nothing_captured()
# Test: per-VRF NAT44 address pools.  pg0 and pg1 are moved into two
# separate IP tables, each table gets its own NAT address, and traffic from
# each interface towards pg2 must be translated to the address of its VRF.
2370 def test_vrf_mode(self):
2371 """ NAT44 tenant VRF aware address pool mode """
2375 nat_ip1 = "10.0.0.10"
2376 nat_ip2 = "10.0.0.11"
# Re-home pg0/pg1: drop their IPv4 config, create the tables, bind the
# interfaces to them and configure IPv4 again.
2378 self.pg0.unconfig_ip4()
2379 self.pg1.unconfig_ip4()
2380 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2381 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2382 self.pg0.set_table_ip4(vrf_id1)
2383 self.pg1.set_table_ip4(vrf_id2)
2384 self.pg0.config_ip4()
2385 self.pg1.config_ip4()
2387 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2388 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2389 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2390 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2391 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2 must be translated to nat_ip1.
2395 pkts = self.create_stream_in(self.pg0, self.pg2)
2396 self.pg0.add_stream(pkts)
2397 self.pg_enable_capture(self.pg_interfaces)
2399 capture = self.pg2.get_capture(len(pkts))
2400 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2 must be translated to nat_ip2.
2403 pkts = self.create_stream_in(self.pg1, self.pg2)
2404 self.pg1.add_stream(pkts)
2405 self.pg_enable_capture(self.pg_interfaces)
2407 capture = self.pg2.get_capture(len(pkts))
2408 self.verify_capture_out(capture, nat_ip2)
# Undo the VRF setup: move both interfaces back to table 0 and delete the
# tables created above.
2410 self.pg0.unconfig_ip4()
2411 self.pg1.unconfig_ip4()
2412 self.pg0.set_table_ip4(0)
2413 self.pg1.set_table_ip4(0)
2414 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2415 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
# Test: NAT44 address pool that is not tied to the tenant VRF.  nat_ip1 is
# added without a VRF and nat_ip2 with vrf_id=99; traffic from both pg0 and
# pg1 towards pg2 is expected to be translated to nat_ip1.
2417 def test_vrf_feature_independent(self):
2418 """ NAT44 tenant VRF independent address pool mode """
2420 nat_ip1 = "10.0.0.10"
2421 nat_ip2 = "10.0.0.11"
2423 self.nat44_add_address(nat_ip1)
2424 self.nat44_add_address(nat_ip2, vrf_id=99)
2425 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2426 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2427 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2431 pkts = self.create_stream_in(self.pg0, self.pg2)
2432 self.pg0.add_stream(pkts)
2433 self.pg_enable_capture(self.pg_interfaces)
2435 capture = self.pg2.get_capture(len(pkts))
2436 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2: verified against nat_ip1 as well, not nat_ip2.
2439 pkts = self.create_stream_in(self.pg1, self.pg2)
2440 self.pg1.add_stream(pkts)
2441 self.pg_enable_capture(self.pg_interfaces)
2443 capture = self.pg2.get_capture(len(pkts))
2444 self.verify_capture_out(capture, nat_ip1)
# Test: dynamic NAT44 between pg7 and pg8, interfaces that (per the
# docstring) have no IP address configured.  Reachability is provided by
# explicit neighbor entries and /32 routes for each interface's remote
# host.
2446 def test_dynamic_ipless_interfaces(self):
2447 """ NAT44 interfaces without configured IP address """
# Neighbor entries for the remote hosts behind pg7 and pg8.
2449 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2450 mactobinary(self.pg7.remote_mac),
2451 self.pg7.remote_ip4n,
2453 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2454 mactobinary(self.pg8.remote_mac),
2455 self.pg8.remote_ip4n,
# Host routes (/32) pointing at each remote host via its own interface.
2458 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2459 dst_address_length=32,
2460 next_hop_address=self.pg7.remote_ip4n,
2461 next_hop_sw_if_index=self.pg7.sw_if_index)
2462 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2463 dst_address_length=32,
2464 next_hop_address=self.pg8.remote_ip4n,
2465 next_hop_sw_if_index=self.pg8.sw_if_index)
2467 self.nat44_add_address(self.nat_addr)
2468 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2469 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8: packets must leave translated.
2473 pkts = self.create_stream_in(self.pg7, self.pg8)
2474 self.pg7.add_stream(pkts)
2475 self.pg_enable_capture(self.pg_interfaces)
2477 capture = self.pg8.get_capture(len(pkts))
2478 self.verify_capture_out(capture)
# pg8 -> NAT address: packets must arrive on pg7 translated back.
2481 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2482 self.pg8.add_stream(pkts)
2483 self.pg_enable_capture(self.pg_interfaces)
2485 capture = self.pg7.get_capture(len(pkts))
2486 self.verify_capture_in(capture, self.pg7)
# Test: 1:1 static NAT44 between pg7 and pg8, interfaces that (per the
# docstring) have no IP address configured.  pg7's remote host is mapped to
# self.nat_addr; the outside-to-inside direction is exercised first.
2488 def test_static_ipless_interfaces(self):
2489 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Neighbor entries and /32 routes give reachability to both remote hosts.
2491 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2492 mactobinary(self.pg7.remote_mac),
2493 self.pg7.remote_ip4n,
2495 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2496 mactobinary(self.pg8.remote_mac),
2497 self.pg8.remote_ip4n,
2500 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2501 dst_address_length=32,
2502 next_hop_address=self.pg7.remote_ip4n,
2503 next_hop_sw_if_index=self.pg7.sw_if_index)
2504 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2505 dst_address_length=32,
2506 next_hop_address=self.pg8.remote_ip4n,
2507 next_hop_sw_if_index=self.pg8.sw_if_index)
2509 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2510 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2511 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 through the static mapping.
2515 pkts = self.create_stream_out(self.pg8)
2516 self.pg8.add_stream(pkts)
2517 self.pg_enable_capture(self.pg_interfaces)
2519 capture = self.pg7.get_capture(len(pkts))
2520 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8; the trailing True is an extra flag for verify_capture_out
# (NOTE(review): presumably "same port" for address-only mappings - confirm).
2523 pkts = self.create_stream_in(self.pg7, self.pg8)
2524 self.pg7.add_stream(pkts)
2525 self.pg_enable_capture(self.pg_interfaces)
2527 capture = self.pg8.get_capture(len(pkts))
2528 self.verify_capture_out(capture, self.nat_addr, True)
# Test: 1:1 NAPT (static mappings with ports) between pg7 and pg8,
# interfaces that (per the docstring) have no IP address configured.
# Separate TCP, UDP and ICMP mappings translate pg7's remote host to
# self.nat_addr with fixed outside ports / ICMP id.
2530 def test_static_with_port_ipless_interfaces(self):
2531 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Outside port / id values used by the three static mappings below.
2533 self.tcp_port_out = 30606
2534 self.udp_port_out = 30607
2535 self.icmp_id_out = 30608
2537 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2538 mactobinary(self.pg7.remote_mac),
2539 self.pg7.remote_ip4n,
2541 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2542 mactobinary(self.pg8.remote_mac),
2543 self.pg8.remote_ip4n,
2546 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2547 dst_address_length=32,
2548 next_hop_address=self.pg7.remote_ip4n,
2549 next_hop_sw_if_index=self.pg7.sw_if_index)
2550 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2551 dst_address_length=32,
2552 next_hop_address=self.pg8.remote_ip4n,
2553 next_hop_sw_if_index=self.pg8.sw_if_index)
2555 self.nat44_add_address(self.nat_addr)
# One static mapping per protocol: inside port/id -> outside port/id.
2556 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2557 self.tcp_port_in, self.tcp_port_out,
2558 proto=IP_PROTOS.tcp)
2559 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2560 self.udp_port_in, self.udp_port_out,
2561 proto=IP_PROTOS.udp)
2562 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2563 self.icmp_id_in, self.icmp_id_out,
2564 proto=IP_PROTOS.icmp)
2565 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2566 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 through the static mappings.
2570 pkts = self.create_stream_out(self.pg8)
2571 self.pg8.add_stream(pkts)
2572 self.pg_enable_capture(self.pg_interfaces)
2574 capture = self.pg7.get_capture(len(pkts))
2575 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2578 pkts = self.create_stream_in(self.pg7, self.pg8)
2579 self.pg7.add_stream(pkts)
2580 self.pg_enable_capture(self.pg_interfaces)
2582 capture = self.pg8.get_capture(len(pkts))
2583 self.verify_capture_out(capture)
# Test: 1:1 static NAT for a packet whose protocol NAT treats as "unknown".
# The captured packets are asserted to carry a GRE layer; the line adding
# that layer to the sent packets is not visible in this listing.  Only the
# outer IP addresses are expected to be rewritten.
2585 def test_static_unknown_proto(self):
2586 """ 1:1 NAT translate packet with unknown protocol """
2587 nat_ip = "10.0.0.10"
2588 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2589 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2590 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside: outer source pg0 host, inner IP/TCP payload.
2594 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2595 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2597 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2598 TCP(sport=1234, dport=1234))
2599 self.pg0.add_stream(p)
2600 self.pg_enable_capture(self.pg_interfaces)
2602 p = self.pg1.get_capture(1)
# Outer source must have become nat_ip; destination stays unchanged.
2605 self.assertEqual(packet[IP].src, nat_ip)
2606 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2607 self.assertTrue(packet.haslayer(GRE))
2608 self.assert_packet_checksums_valid(packet)
2610 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside: addressed to nat_ip, must arrive at the pg0 host.
2614 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2615 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2617 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2618 TCP(sport=1234, dport=1234))
2619 self.pg1.add_stream(p)
2620 self.pg_enable_capture(self.pg_interfaces)
2622 p = self.pg0.get_capture(1)
2625 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2626 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2627 self.assertTrue(packet.haslayer(GRE))
2628 self.assert_packet_checksums_valid(packet)
2630 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: hairpinning of "unknown protocol" packets with 1:1 static NAT.
# Host and server both sit behind pg0 and each has its own static mapping;
# packets between them are sent to the peer's NAT address and must come
# back out of pg0 with both addresses rewritten.
2633 def test_hairpinning_static_unknown_proto(self):
2634 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2636 host = self.pg0.remote_hosts[0]
2637 server = self.pg0.remote_hosts[1]
2639 host_nat_ip = "10.0.0.10"
2640 server_nat_ip = "10.0.0.11"
2642 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2643 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2644 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2645 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server's NAT address
2649 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2650 IP(src=host.ip4, dst=server_nat_ip) /
2652 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2653 TCP(sport=1234, dport=1234))
2654 self.pg0.add_stream(p)
2655 self.pg_enable_capture(self.pg_interfaces)
2657 p = self.pg0.get_capture(1)
# Source becomes the host's NAT address, destination the server's real one.
2660 self.assertEqual(packet[IP].src, host_nat_ip)
2661 self.assertEqual(packet[IP].dst, server.ip4)
2662 self.assertTrue(packet.haslayer(GRE))
2663 self.assert_packet_checksums_valid(packet)
2665 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host's NAT address
2669 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2670 IP(src=server.ip4, dst=host_nat_ip) /
2672 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2673 TCP(sport=1234, dport=1234))
2674 self.pg0.add_stream(p)
2675 self.pg_enable_capture(self.pg_interfaces)
2677 p = self.pg0.get_capture(1)
2680 self.assertEqual(packet[IP].src, server_nat_ip)
2681 self.assertEqual(packet[IP].dst, host.ip4)
2682 self.assertTrue(packet.haslayer(GRE))
2683 self.assert_packet_checksums_valid(packet)
2685 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: NAT44 configured through the interface *output* feature
# (nat44_interface_add_del_output_feature) on pg0, pg1 and pg3.  Checks
# pg0 -> pg3 translation, the return direction, and that traffic from pg2
# (no NAT feature) to pg0 is left untranslated.
2688 def test_output_feature(self):
2689 """ NAT44 interface output feature (in2out postrouting) """
2690 self.nat44_add_address(self.nat_addr)
2691 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2692 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2693 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3
2697 pkts = self.create_stream_in(self.pg0, self.pg3)
2698 self.pg0.add_stream(pkts)
2699 self.pg_enable_capture(self.pg_interfaces)
2701 capture = self.pg3.get_capture(len(pkts))
2702 self.verify_capture_out(capture)
# pg3 -> pg0
2705 pkts = self.create_stream_out(self.pg3)
2706 self.pg3.add_stream(pkts)
2707 self.pg_enable_capture(self.pg_interfaces)
2709 capture = self.pg0.get_capture(len(pkts))
2710 self.verify_capture_in(capture, self.pg0)
2712 # from non-NAT interface to NAT inside interface
2713 pkts = self.create_stream_in(self.pg2, self.pg0)
2714 self.pg2.add_stream(pkts)
2715 self.pg_enable_capture(self.pg_interfaces)
2717 capture = self.pg0.get_capture(len(pkts))
2718 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
# Test: NAT44 output feature with per-VRF address pools.  One NAT address
# is added for VRF 10 and one for VRF 20; traffic from pg4 must be
# translated to the VRF 10 address and traffic from pg6 to the VRF 20
# address, in both directions via pg3.
2720 def test_output_feature_vrf_aware(self):
2721 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2722 nat_ip_vrf10 = "10.0.0.10"
2723 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3's remote host; the trailing argument of each call
# is not visible in this listing
# (NOTE(review): presumably the table id, 10 and 20 - confirm).
2725 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2726 dst_address_length=32,
2727 next_hop_address=self.pg3.remote_ip4n,
2728 next_hop_sw_if_index=self.pg3.sw_if_index,
2730 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2731 dst_address_length=32,
2732 next_hop_address=self.pg3.remote_ip4n,
2733 next_hop_sw_if_index=self.pg3.sw_if_index,
2736 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2737 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2738 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2739 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2740 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3, expected source: nat_ip_vrf10
2744 pkts = self.create_stream_in(self.pg4, self.pg3)
2745 self.pg4.add_stream(pkts)
2746 self.pg_enable_capture(self.pg_interfaces)
2748 capture = self.pg3.get_capture(len(pkts))
2749 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> nat_ip_vrf10, expected to arrive on pg4
2752 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2753 self.pg3.add_stream(pkts)
2754 self.pg_enable_capture(self.pg_interfaces)
2756 capture = self.pg4.get_capture(len(pkts))
2757 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected source: nat_ip_vrf20
2760 pkts = self.create_stream_in(self.pg6, self.pg3)
2761 self.pg6.add_stream(pkts)
2762 self.pg_enable_capture(self.pg_interfaces)
2764 capture = self.pg3.get_capture(len(pkts))
2765 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> nat_ip_vrf20, expected to arrive on pg6
2768 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2769 self.pg3.add_stream(pkts)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 capture = self.pg6.get_capture(len(pkts))
2773 self.verify_capture_in(capture, self.pg6)
# Test: hairpinning with the NAT44 output feature.  Host and server are
# both behind pg0; the server is reachable through a static TCP mapping on
# self.nat_addr.  A host -> server packet and the server's reply must both
# be looped back out of pg0 with addresses and ports rewritten.
2775 def test_output_feature_hairpinning(self):
2776 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2777 host = self.pg0.remote_hosts[0]
2778 server = self.pg0.remote_hosts[1]
2781 server_in_port = 5678
2782 server_out_port = 8765
2784 self.nat44_add_address(self.nat_addr)
2785 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2786 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2789 # add static mapping for server
2790 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2791 server_in_port, server_out_port,
2792 proto=IP_PROTOS.tcp)
2794 # send packet from host to server
2795 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2796 IP(src=host.ip4, dst=self.nat_addr) /
2797 TCP(sport=host_in_port, dport=server_out_port))
2798 self.pg0.add_stream(p)
2799 self.pg_enable_capture(self.pg_interfaces)
2801 capture = self.pg0.get_capture(1)
# Source is rewritten to the NAT address with a new port; destination is
# rewritten to the server's real address and inside port.
2806 self.assertEqual(ip.src, self.nat_addr)
2807 self.assertEqual(ip.dst, server.ip4)
2808 self.assertNotEqual(tcp.sport, host_in_port)
2809 self.assertEqual(tcp.dport, server_in_port)
2810 self.assert_packet_checksums_valid(p)
# Remember the port NAT allocated for the host; the reply targets it.
2811 host_out_port = tcp.sport
2813 self.logger.error(ppp("Unexpected or invalid packet:", p))
2816 # send reply from server to host
2817 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2818 IP(src=server.ip4, dst=self.nat_addr) /
2819 TCP(sport=server_in_port, dport=host_out_port))
2820 self.pg0.add_stream(p)
2821 self.pg_enable_capture(self.pg_interfaces)
2823 capture = self.pg0.get_capture(1)
2828 self.assertEqual(ip.src, self.nat_addr)
2829 self.assertEqual(ip.dst, host.ip4)
2830 self.assertEqual(tcp.sport, server_out_port)
2831 self.assertEqual(tcp.dport, host_in_port)
2832 self.assert_packet_checksums_valid(p)
2834 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: "one armed" NAT44 - the NAT feature is enabled twice on the same
# interface (pg9), and both the local and the remote host are behind pg9.
# Traffic enters and leaves through pg9 in both directions.
2837 def test_one_armed_nat44(self):
2838 """ One armed NAT44 """
2839 remote_host = self.pg9.remote_hosts[0]
2840 local_host = self.pg9.remote_hosts[1]
2843 self.nat44_add_address(self.nat_addr)
2844 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2845 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host
2849 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2850 IP(src=local_host.ip4, dst=remote_host.ip4) /
2851 TCP(sport=12345, dport=80))
2852 self.pg9.add_stream(p)
2853 self.pg_enable_capture(self.pg_interfaces)
2855 capture = self.pg9.get_capture(1)
# Source must be the NAT address with a port other than the original one.
2860 self.assertEqual(ip.src, self.nat_addr)
2861 self.assertEqual(ip.dst, remote_host.ip4)
2862 self.assertNotEqual(tcp.sport, 12345)
# Keep the allocated outside port for the reply below.
2863 external_port = tcp.sport
2864 self.assertEqual(tcp.dport, 80)
2865 self.assert_packet_checksums_valid(p)
2867 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> NAT address: must be translated back to local_host:12345.
2871 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2872 IP(src=remote_host.ip4, dst=self.nat_addr) /
2873 TCP(sport=80, dport=external_port))
2874 self.pg9.add_stream(p)
2875 self.pg_enable_capture(self.pg_interfaces)
2877 capture = self.pg9.get_capture(1)
2882 self.assertEqual(ip.src, remote_host.ip4)
2883 self.assertEqual(ip.dst, local_host.ip4)
2884 self.assertEqual(tcp.sport, 80)
2885 self.assertEqual(tcp.dport, 12345)
2886 self.assert_packet_checksums_valid(p)
2888 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: explicit deletion of NAT44 sessions.  Creates sessions with
# pg0 -> pg1 traffic, deletes one session by its inside tuple and one by
# its outside tuple, then checks that the user's session count dropped by
# exactly two.
2891 def test_del_session(self):
2892 """ Delete NAT44 session """
2893 self.nat44_add_address(self.nat_addr)
2894 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2895 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2898 pkts = self.create_stream_in(self.pg0, self.pg1)
2899 self.pg0.add_stream(pkts)
2900 self.pg_enable_capture(self.pg_interfaces)
2902 self.pg1.get_capture(len(pkts))
# Sessions of the pg0 remote host before deletion.
2904 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2905 nsessions = len(sessions)
# First deletion is keyed on inside address/port.
2907 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2908 sessions[0].inside_port,
2909 sessions[0].protocol)
# Second deletion is keyed on outside address/port; its last argument is
# not visible here
# (NOTE(review): presumably a flag selecting the outside key - confirm).
2910 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2911 sessions[1].outside_port,
2912 sessions[1].protocol,
2915 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2916 self.assertEqual(nsessions - len(sessions), 2)
# Test: round trip of the NAT virtual fragmentation reassembly settings.
# Reads the current config, writes modified timeout / max_reass / max_frag
# values, reads them back, then enables drop_frag and checks the flag.
2918 def test_set_get_reass(self):
2919 """ NAT44 set/get virtual fragmentation reassembly """
2920 reas_cfg1 = self.vapi.nat_get_reass()
# New values are derived from the current ones so they are guaranteed to
# differ: timeout + 5, limits doubled.
2922 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2923 max_reass=reas_cfg1.ip4_max_reass * 2,
2924 max_frag=reas_cfg1.ip4_max_frag * 2)
2926 reas_cfg2 = self.vapi.nat_get_reass()
2928 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2929 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2930 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2932 self.vapi.nat_set_reass(drop_frag=1)
2933 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
# Test: NAT44 translation of fragmented TCP packets that arrive in order.
# Fragments are sent pg0 -> pg1 and back, reassembled by the test and
# checked for ports and payload; finally the number of reassembly entries
# must have grown by two.
2935 def test_frag_in_order(self):
2936 """ NAT44 translate fragments arriving in order """
2937 self.nat44_add_address(self.nat_addr)
2938 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2939 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload and a random inside source port for this run.
2942 data = "A" * 4 + "B" * 16 + "C" * 3
2943 self.tcp_port_in = random.randint(1025, 65535)
# Baseline count of reassembly entries.
2945 reass = self.vapi.nat_reass_dump()
2946 reass_n_start = len(reass)
2949 pkts = self.create_stream_frag(self.pg0,
2950 self.pg1.remote_ip4,
2954 self.pg0.add_stream(pkts)
2955 self.pg_enable_capture(self.pg_interfaces)
2957 frags = self.pg1.get_capture(len(pkts))
2958 p = self.reass_frags_and_verify(frags,
2960 self.pg1.remote_ip4)
2961 self.assertEqual(p[TCP].dport, 20)
2962 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port (taken from the reassembled packet).
2963 self.tcp_port_out = p[TCP].sport
2964 self.assertEqual(data, p[Raw].load)
# Return direction: pg1 -> pg0.
2967 pkts = self.create_stream_frag(self.pg1,
2972 self.pg1.add_stream(pkts)
2973 self.pg_enable_capture(self.pg_interfaces)
2975 frags = self.pg0.get_capture(len(pkts))
2976 p = self.reass_frags_and_verify(frags,
2977 self.pg1.remote_ip4,
2978 self.pg0.remote_ip4)
2979 self.assertEqual(p[TCP].sport, 20)
2980 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2981 self.assertEqual(data, p[Raw].load)
2983 reass = self.vapi.nat_reass_dump()
2984 reass_n_end = len(reass)
# One new reassembly entry per direction is expected.
2986 self.assertEqual(reass_n_end - reass_n_start, 2)
# Test: hairpinning of fragmented TCP packets.  The server behind pg0 is
# published through a static TCP mapping on self.nat_addr; fragments sent
# into pg0 must come back out of pg0, and the reassembled packet must carry
# a translated source port, the server's inside port and the payload.
2988 def test_reass_hairpinning(self):
2989 """ NAT44 fragments hairpinning """
2990 server = self.pg0.remote_hosts[1]
# Random ports in the non-privileged range for this run.
2991 host_in_port = random.randint(1025, 65535)
2992 server_in_port = random.randint(1025, 65535)
2993 server_out_port = random.randint(1025, 65535)
2994 data = "A" * 4 + "B" * 16 + "C" * 3
2996 self.nat44_add_address(self.nat_addr)
2997 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2998 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3000 # add static mapping for server
3001 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3002 server_in_port, server_out_port,
3003 proto=IP_PROTOS.tcp)
3005 # send packet from host to server
3006 pkts = self.create_stream_frag(self.pg0,
3011 self.pg0.add_stream(pkts)
3012 self.pg_enable_capture(self.pg_interfaces)
3014 frags = self.pg0.get_capture(len(pkts))
3015 p = self.reass_frags_and_verify(frags,
3018 self.assertNotEqual(p[TCP].sport, host_in_port)
3019 self.assertEqual(p[TCP].dport, server_in_port)
3020 self.assertEqual(data, p[Raw].load)
# Test: NAT44 translation of fragmented TCP packets that (per the
# docstring) arrive out of order.  Fragments are sent pg0 -> pg1 and back,
# reassembled by the test and checked for ports and payload.
3022 def test_frag_out_of_order(self):
3023 """ NAT44 translate fragments arriving out of order """
3024 self.nat44_add_address(self.nat_addr)
3025 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3026 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3029 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of this call is discarded.  The in-order test
# assigns the same expression to self.tcp_port_in, so an assignment was
# probably intended here - confirm before changing.
3030 random.randint(1025, 65535)
3033 pkts = self.create_stream_frag(self.pg0,
3034 self.pg1.remote_ip4,
3039 self.pg0.add_stream(pkts)
3040 self.pg_enable_capture(self.pg_interfaces)
3042 frags = self.pg1.get_capture(len(pkts))
3043 p = self.reass_frags_and_verify(frags,
3045 self.pg1.remote_ip4)
3046 self.assertEqual(p[TCP].dport, 20)
3047 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port (taken from the reassembled packet).
3048 self.tcp_port_out = p[TCP].sport
3049 self.assertEqual(data, p[Raw].load)
# Return direction: pg1 -> pg0.
3052 pkts = self.create_stream_frag(self.pg1,
3058 self.pg1.add_stream(pkts)
3059 self.pg_enable_capture(self.pg_interfaces)
3061 frags = self.pg0.get_capture(len(pkts))
3062 p = self.reass_frags_and_verify(frags,
3063 self.pg1.remote_ip4,
3064 self.pg0.remote_ip4)
3065 self.assertEqual(p[TCP].sport, 20)
3066 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3067 self.assertEqual(data, p[Raw].load)
# Test: port-restricted NAT44 using the MAP-E address/port assignment
# algorithm, configured through the CLI with psid 10, psid-offset 6 and
# psid-len 6.  The translated source port must encode the configured PSID.
3069 def test_port_restricted(self):
3070 """ Port restricted NAT44 (MAP-E CE) """
3071 self.nat44_add_address(self.nat_addr)
3072 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3073 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3075 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3076 "psid-offset 6 psid-len 6")
3078 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3079 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3080 TCP(sport=4567, dport=22))
3081 self.pg0.add_stream(p)
3082 self.pg_enable_capture(self.pg_interfaces)
3084 capture = self.pg1.get_capture(1)
3089 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3090 self.assertEqual(ip.src, self.nat_addr)
3091 self.assertEqual(tcp.dport, 22)
3092 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the allocated port (shift by 6, mask 63) must equal the
# PSID value 10 set on the CLI above.
3093 self.assertEqual((tcp.sport >> 6) & 63, 10)
3094 self.assert_packet_checksums_valid(p)
3096 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: IPFIX export of the "maximum fragments pending reassembly exceeded"
# event.  max_frag is set to 0, only the last fragment of a fragmented
# stream is sent, nothing may reach pg1, and the IPFIX packets exported
# towards pg3 are validated.
3099 def test_ipfix_max_frags(self):
3100 """ IPFIX logging maximum fragments pending reassembly exceeded """
3101 self.nat44_add_address(self.nat_addr)
3102 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3103 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3105 self.vapi.nat_set_reass(max_frag=0)
3106 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3107 src_address=self.pg3.local_ip4n,
3109 template_interval=10)
3110 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3111 src_port=self.ipfix_src_port)
3113 data = "A" * 4 + "B" * 16 + "C" * 3
3114 self.tcp_port_in = random.randint(1025, 65535)
3115 pkts = self.create_stream_frag(self.pg0,
3116 self.pg1.remote_ip4,
# Only the final fragment (pkts[-1]) is injected.
3120 self.pg0.add_stream(pkts[-1])
3121 self.pg_enable_capture(self.pg_interfaces)
3123 self.pg1.assert_nothing_captured()
3125 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3126 capture = self.pg3.get_capture(9)
3127 ipfix = IPFIXDecoder()
3128 # first load template
3130 self.assertTrue(p.haslayer(IPFIX))
3131 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3132 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3133 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3134 self.assertEqual(p[UDP].dport, 4739)
3135 self.assertEqual(p[IPFIX].observationDomainID,
3136 self.ipfix_domain_id)
3137 if p.haslayer(Template):
3138 ipfix.add_template(p.getlayer(Template))
3139 # verify events in data set
3141 if p.haslayer(Data):
# Note: from here on `data` holds the decoded IPFIX records, replacing the
# payload string assigned earlier in this method.
3142 data = ipfix.decode_data_set(p.getlayer(Set))
3143 self.verify_ipfix_max_fragments_ip4(data, 0,
3144 self.pg0.remote_ip4n)
# Teardown for the TestNAT44 cases: run the parent teardown first, then log
# NAT44 state from the CLI when VPP has not died.
# NOTE(review): which of the statements below sit inside the vpp_dead guard
# cannot be told from this text - confirm against the original indentation.
3147 super(TestNAT44, self).tearDown()
3148 if not self.vpp_dead:
3149 self.logger.info(self.vapi.cli("show nat44 addresses"))
3150 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3151 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3152 self.logger.info(self.vapi.cli("show nat44 interface address"))
3153 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3154 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3155 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
# Switch the address/port assignment algorithm back to "default" (one test
# in this class selects map-e) and clear the log buffer.
3156 self.vapi.cli("nat addr-port-assignment-alg default")
3158 self.vapi.cli("clear logging")
3161 class TestNAT44EndpointDependent(MethodHolder):
3162 """ Endpoint-Dependent mapping and filtering test cases """
3165 def setUpConstants(cls):
# After the parent has prepared its constants, append a
# `nat { endpoint-dependent }` stanza to the VPP command line.
3166 super(TestNAT44EndpointDependent, cls).setUpConstants()
3167 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
3170 def setUpClass(cls):
# Class-level fixture: parent setup, NAT debug logging, then the shared
# defaults and packet-generator interfaces used by the tests of this class.
3171 super(TestNAT44EndpointDependent, cls).setUpClass()
3172 cls.vapi.cli("set log class nat level debug")
# Class-wide defaults: TCP/UDP ports and ICMP ids (inside == outside), the
# NAT pool address in text and packed form, IPFIX settings, external port.
3174 cls.tcp_port_in = 6303
3175 cls.tcp_port_out = 6303
3176 cls.udp_port_in = 6304
3177 cls.udp_port_out = 6304
3178 cls.icmp_id_in = 6305
3179 cls.icmp_id_out = 6305
3180 cls.nat_addr = '10.0.0.3'
3181 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3182 cls.ipfix_src_port = 4739
3183 cls.ipfix_domain_id = 1
3184 cls.tcp_external_port = 80
# Five pg interfaces are created; the first three are kept in cls.interfaces.
3186 cls.create_pg_interfaces(range(5))
3187 cls.interfaces = list(cls.pg_interfaces[0:3])
3189 for i in cls.interfaces:
# pg0 gets three remote hosts with IPv4 neighbor entries.
3194 cls.pg0.generate_remote_hosts(3)
3195 cls.pg0.configure_ipv4_neighbors()
# pg4 gets two remote hosts and its IPv4 config; 10.0.0.1 is then passed to
# sw_interface_add_del_address for pg4 (presumably added as an extra
# address - TODO confirm, the remaining arguments are not shown here).
3199 cls.pg4.generate_remote_hosts(2)
3200 cls.pg4.config_ip4()
3201 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3202 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
# ARP is resolved, remote host 1 is given host 0's IPv4 address, and ARP is
# resolved a second time.
3206 cls.pg4.resolve_arp()
3207 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3208 cls.pg4.resolve_arp()
# NOTE(review): presumably the error path - tears the class down when the
# setup above fails; confirm against the enclosing handler.
3211 super(TestNAT44EndpointDependent, cls).tearDownClass()
3214 def test_dynamic(self):
3215 """ NAT44 dynamic translation test """
# nat_addr goes into the pool; the NAT44 feature is enabled on pg0 and pg1.
3217 self.nat44_add_address(self.nat_addr)
3218 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3219 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: every generated packet must arrive on pg1 and pass the
# outbound verification helper.
3223 pkts = self.create_stream_in(self.pg0, self.pg1)
3224 self.pg0.add_stream(pkts)
3225 self.pg_enable_capture(self.pg_interfaces)
3227 capture = self.pg1.get_capture(len(pkts))
3228 self.verify_capture_out(capture)
# pg1 -> pg0: the return stream must arrive on pg0 and pass the inbound
# verification helper.
3231 pkts = self.create_stream_out(self.pg1)
3232 self.pg1.add_stream(pkts)
3233 self.pg_enable_capture(self.pg_interfaces)
3235 capture = self.pg0.get_capture(len(pkts))
3236 self.verify_capture_in(capture, self.pg0)
3238 def test_forwarding(self):
3239 """ NAT44 forwarding test """
# No address is added to the pool here: NAT44 is enabled on pg0/pg1 with
# forwarding switched on.
3241 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3242 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3244 self.vapi.nat44_forwarding_enable_disable(1)
# Static mapping between pg0's remote address (local) and nat_addr (external).
3246 real_ip = self.pg0.remote_ip4n
3247 alias_ip = self.nat_addr_n
3248 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3249 external_ip=alias_ip)
3252 # in2out - static mapping match
# The pg1 -> pg0 stream is sent first, then pg0 -> pg1 with same_port=True.
3254 pkts = self.create_stream_out(self.pg1)
3255 self.pg1.add_stream(pkts)
3256 self.pg_enable_capture(self.pg_interfaces)
3258 capture = self.pg0.get_capture(len(pkts))
3259 self.verify_capture_in(capture, self.pg0)
3261 pkts = self.create_stream_in(self.pg0, self.pg1)
3262 self.pg0.add_stream(pkts)
3263 self.pg_enable_capture(self.pg_interfaces)
3265 capture = self.pg1.get_capture(len(pkts))
3266 self.verify_capture_out(capture, same_port=True)
3268 # in2out - no static mapping match
# remote_hosts[0] is temporarily replaced by remote_hosts[1] (saved in host0
# and restored below), so pg0's address-derived values refer to the other
# host for this phase - presumably one without a static mapping, per the
# comment above.
3270 host0 = self.pg0.remote_hosts[0]
3271 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
# Outside traffic is addressed directly to pg0's remote address using the
# inside ports.
3273 pkts = self.create_stream_out(self.pg1,
3274 dst_ip=self.pg0.remote_ip4,
3275 use_inside_ports=True)
3276 self.pg1.add_stream(pkts)
3277 self.pg_enable_capture(self.pg_interfaces)
3279 capture = self.pg0.get_capture(len(pkts))
3280 self.verify_capture_in(capture, self.pg0)
3282 pkts = self.create_stream_in(self.pg0, self.pg1)
3283 self.pg0.add_stream(pkts)
3284 self.pg_enable_capture(self.pg_interfaces)
# The expected source on pg1 is pg0's own remote address (nat_ip argument).
3286 capture = self.pg1.get_capture(len(pkts))
3287 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Put the original first remote host back.
3290 self.pg0.remote_hosts[0] = host0
# remote_hosts[1] must own 3 sessions; deleting the first one by its
# external-host address/port must leave 2.
3292 user = self.pg0.remote_hosts[1]
3293 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3294 self.assertEqual(len(sessions), 3)
3295 self.assertTrue(sessions[0].ext_host_valid)
3296 self.vapi.nat44_del_session(
3297 sessions[0].inside_ip_address,
3298 sessions[0].inside_port,
3299 sessions[0].protocol,
3300 ext_host_address=sessions[0].ext_host_address,
3301 ext_host_port=sessions[0].ext_host_port)
3302 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3303 self.assertEqual(len(sessions), 2)
# Cleanup: forwarding is disabled and the static-mapping API is called again
# for the same address pair (presumably as a delete - TODO confirm the
# trailing argument).
3306 self.vapi.nat44_forwarding_enable_disable(0)
3307 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3308 external_ip=alias_ip,
3311 def test_static_lb(self):
3312 """ NAT44 local service load balancing """
3313 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
# The first two pg0 remote hosts are the local back-ends of the mapping.
3316 server1 = self.pg0.remote_hosts[0]
3317 server2 = self.pg0.remote_hosts[1]
3319 locals = [{'addr': server1.ip4n,
3322 {'addr': server2.ip4n,
# nat_addr is added to the pool and registered as the external address of a
# load-balanced static mapping over `locals`; NAT44 is enabled on pg0/pg1.
3326 self.nat44_add_address(self.nat_addr)
3327 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3330 local_num=len(locals),
3332 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3333 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3336 # from client to service
3337 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3338 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3339 TCP(sport=12345, dport=external_port))
3340 self.pg1.add_stream(p)
3341 self.pg_enable_capture(self.pg_interfaces)
3343 capture = self.pg0.get_capture(1)
# The packet may land on either back-end; the destination port must be the
# local port.
3349 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3350 if ip.dst == server1.ip4:
3354 self.assertEqual(tcp.dport, local_port)
3355 self.assert_packet_checksums_valid(p)
3357 self.logger.error(ppp("Unexpected or invalid packet:", p))
3360 # from service back to client
3361 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3362 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3363 TCP(sport=local_port, dport=12345))
3364 self.pg0.add_stream(p)
3365 self.pg_enable_capture(self.pg_interfaces)
3367 capture = self.pg1.get_capture(1)
# The reply must appear to come from the external address and port.
3372 self.assertEqual(ip.src, self.nat_addr)
3373 self.assertEqual(tcp.sport, external_port)
3374 self.assert_packet_checksums_valid(p)
3376 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Exactly one session must exist for the chosen server; deleting it by its
# external-host address/port must leave none.
3379 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3380 self.assertEqual(len(sessions), 1)
3381 self.assertTrue(sessions[0].ext_host_valid)
3382 self.vapi.nat44_del_session(
3383 sessions[0].inside_ip_address,
3384 sessions[0].inside_port,
3385 sessions[0].protocol,
3386 ext_host_address=sessions[0].ext_host_address,
3387 ext_host_port=sessions[0].ext_host_port)
3388 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3389 self.assertEqual(len(sessions), 0)
# Runs only when the extended test set is enabled.
3391 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3392 def test_static_lb_multi_clients(self):
3393 """ NAT44 local service load balancing - multiple clients"""
3395 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
# The first two pg0 remote hosts are the local back-ends of the mapping.
3398 server1 = self.pg0.remote_hosts[0]
3399 server2 = self.pg0.remote_hosts[1]
3401 locals = [{'addr': server1.ip4n,
3404 {'addr': server2.ip4n,
3408 self.nat44_add_address(self.nat_addr)
3409 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3412 local_num=len(locals),
3414 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3415 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Client source addresses come from ip4_range over pg1's remote address with
# arguments 10 and 50; one TCP packet per client targets the external
# address and port.
3420 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3422 for client in clients:
3423 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3424 IP(src=client, dst=self.nat_addr) /
3425 TCP(sport=12345, dport=external_port))
3427 self.pg1.add_stream(pkts)
3428 self.pg_enable_capture(self.pg_interfaces)
# All packets must reach pg0; the final assertion requires
# server1_n > server2_n (presumably per-server packet counts - TODO confirm).
3430 capture = self.pg0.get_capture(len(pkts))
3432 if p[IP].dst == server1.ip4:
3436 self.assertTrue(server1_n > server2_n)
3438 def test_static_lb_2(self):
3439 """ NAT44 local service load balancing (asymmetrical rule) """
3440 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
# The first two pg0 remote hosts are the local back-ends of the mapping.
3443 server1 = self.pg0.remote_hosts[0]
3444 server2 = self.pg0.remote_hosts[1]
3446 locals = [{'addr': server1.ip4n,
3449 {'addr': server2.ip4n,
# Unlike test_static_lb, no pool address is added here; forwarding is
# enabled instead, so the last two phases can check untranslated traffic.
3453 self.vapi.nat44_forwarding_enable_disable(1)
3454 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3458 local_num=len(locals),
3460 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3461 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3464 # from client to service
3465 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3466 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3467 TCP(sport=12345, dport=external_port))
3468 self.pg1.add_stream(p)
3469 self.pg_enable_capture(self.pg_interfaces)
3471 capture = self.pg0.get_capture(1)
# Either back-end may be selected; the destination port must be the local
# port.
3477 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3478 if ip.dst == server1.ip4:
3482 self.assertEqual(tcp.dport, local_port)
3483 self.assert_packet_checksums_valid(p)
3485 self.logger.error(ppp("Unexpected or invalid packet:", p))
3488 # from service back to client
3489 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3490 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3491 TCP(sport=local_port, dport=12345))
3492 self.pg0.add_stream(p)
3493 self.pg_enable_capture(self.pg_interfaces)
3495 capture = self.pg1.get_capture(1)
# The reply must carry the external address and port as its source.
3500 self.assertEqual(ip.src, self.nat_addr)
3501 self.assertEqual(tcp.sport, external_port)
3502 self.assert_packet_checksums_valid(p)
3504 self.logger.error(ppp("Unexpected or invalid packet:", p))
3507 # from client to server (no translation)
# A different client port (12346) addresses server1 directly.
3508 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3509 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3510 TCP(sport=12346, dport=local_port))
3511 self.pg1.add_stream(p)
3512 self.pg_enable_capture(self.pg_interfaces)
3514 capture = self.pg0.get_capture(1)
# Destination address and port must be unchanged.
3520 self.assertEqual(ip.dst, server1.ip4)
3521 self.assertEqual(tcp.dport, local_port)
3522 self.assert_packet_checksums_valid(p)
3524 self.logger.error(ppp("Unexpected or invalid packet:", p))
3527 # from service back to client (no translation)
3528 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3529 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3530 TCP(sport=local_port, dport=12346))
3531 self.pg0.add_stream(p)
3532 self.pg_enable_capture(self.pg_interfaces)
3534 capture = self.pg1.get_capture(1)
# Source address and port must be server1's own, i.e. untranslated.
3539 self.assertEqual(ip.src, server1.ip4)
3540 self.assertEqual(tcp.sport, local_port)
3541 self.assert_packet_checksums_valid(p)
3543 self.logger.error(ppp("Unexpected or invalid packet:", p))
3546 def test_unknown_proto(self):
3547 """ NAT44 translate packet with unknown protocol """
3548 self.nat44_add_address(self.nat_addr)
3549 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3550 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# A plain TCP packet pg0 -> pg1 is sent first and must be forwarded.
3554 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3555 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3556 TCP(sport=self.tcp_port_in, dport=20))
3557 self.pg0.add_stream(p)
3558 self.pg_enable_capture(self.pg_interfaces)
3560 p = self.pg1.get_capture(1)
# Next packet pg0 -> pg1 wraps an inner IP/TCP packet; the assertions below
# expect a GRE layer on the translated result.
3562 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3563 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3565 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3566 TCP(sport=1234, dport=1234))
3567 self.pg0.add_stream(p)
3568 self.pg_enable_capture(self.pg_interfaces)
3570 p = self.pg1.get_capture(1)
# Outer source must be rewritten to nat_addr; destination stays pg1's peer.
3573 self.assertEqual(packet[IP].src, self.nat_addr)
3574 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3575 self.assertTrue(packet.haslayer(GRE))
3576 self.assert_packet_checksums_valid(packet)
3578 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: pg1 -> nat_addr with the same inner packet.
3582 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3583 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3585 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3586 TCP(sport=1234, dport=1234))
3587 self.pg1.add_stream(p)
3588 self.pg_enable_capture(self.pg_interfaces)
3590 p = self.pg0.get_capture(1)
# Outer destination must be rewritten back to pg0's remote address.
3593 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3594 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3595 self.assertTrue(packet.haslayer(GRE))
3596 self.assert_packet_checksums_valid(packet)
3598 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3601 def test_hairpinning_unknown_proto(self):
3602 """ NAT44 translate packet with unknown protocol - hairpinning """
# Both endpoints are remote hosts on pg0, so every capture below is taken
# on pg0 as well.
3603 host = self.pg0.remote_hosts[0]
3604 server = self.pg0.remote_hosts[1]
3606 server_out_port = 8765
3607 server_nat_ip = "10.0.0.11"
3609 self.nat44_add_address(self.nat_addr)
3610 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3611 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3614 # add static mapping for server
3615 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# A TCP packet from host to the server's mapped address is sent first.
3618 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3619 IP(src=host.ip4, dst=server_nat_ip) /
3620 TCP(sport=host_in_port, dport=server_out_port))
3621 self.pg0.add_stream(p)
3622 self.pg_enable_capture(self.pg_interfaces)
3624 self.pg0.get_capture(1)
# host -> server's mapped address with an inner IP/TCP packet; the
# assertions below expect a GRE layer on the result.
3626 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3627 IP(src=host.ip4, dst=server_nat_ip) /
3629 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3630 TCP(sport=1234, dport=1234))
3631 self.pg0.add_stream(p)
3632 self.pg_enable_capture(self.pg_interfaces)
3634 p = self.pg0.get_capture(1)
# Source becomes nat_addr and destination becomes the server's real address.
3637 self.assertEqual(packet[IP].src, self.nat_addr)
3638 self.assertEqual(packet[IP].dst, server.ip4)
3639 self.assertTrue(packet.haslayer(GRE))
3640 self.assert_packet_checksums_valid(packet)
3642 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: server -> nat_addr.
3646 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3647 IP(src=server.ip4, dst=self.nat_addr) /
3649 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3650 TCP(sport=1234, dport=1234))
3651 self.pg0.add_stream(p)
3652 self.pg_enable_capture(self.pg_interfaces)
3654 p = self.pg0.get_capture(1)
# The host must see the server's mapped address as source and its own
# address as destination.
3657 self.assertEqual(packet[IP].src, server_nat_ip)
3658 self.assertEqual(packet[IP].dst, host.ip4)
3659 self.assertTrue(packet.haslayer(GRE))
3660 self.assert_packet_checksums_valid(packet)
3662 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3665 def test_output_feature_and_service(self):
3666 """ NAT44 interface output feature and services """
3667 external_addr = '1.2.3.4'
# Configuration: forwarding on, nat_addr in the pool, an identity mapping
# for pg1's remote address, and an out2in-only TCP static mapping from
# external_addr:external_port to pg0's remote address:local_port.
3671 self.vapi.nat44_forwarding_enable_disable(1)
3672 self.nat44_add_address(self.nat_addr)
3673 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3674 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3675 local_port, external_port,
3676 proto=IP_PROTOS.tcp, out2in_only=1)
# Both plain feature calls target pg0 (the second takes a further argument);
# pg1 gets the output feature.
3677 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3678 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3680 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3683 # from client to service
3684 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3685 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3686 TCP(sport=12345, dport=external_port))
3687 self.pg1.add_stream(p)
3688 self.pg_enable_capture(self.pg_interfaces)
3690 capture = self.pg0.get_capture(1)
# Destination must be rewritten to the service's local address and port.
3695 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3696 self.assertEqual(tcp.dport, local_port)
3697 self.assert_packet_checksums_valid(p)
3699 self.logger.error(ppp("Unexpected or invalid packet:", p))
3702 # from service back to client
3703 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3704 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3705 TCP(sport=local_port, dport=12345))
3706 self.pg0.add_stream(p)
3707 self.pg_enable_capture(self.pg_interfaces)
3709 capture = self.pg1.get_capture(1)
# Source must be rewritten to the external address and port.
3714 self.assertEqual(ip.src, external_addr)
3715 self.assertEqual(tcp.sport, external_port)
3716 self.assert_packet_checksums_valid(p)
3718 self.logger.error(ppp("Unexpected or invalid packet:", p))
3721 # from local network host to external network
# The same inside stream is sent and verified twice in a row.
3722 pkts = self.create_stream_in(self.pg0, self.pg1)
3723 self.pg0.add_stream(pkts)
3724 self.pg_enable_capture(self.pg_interfaces)
3726 capture = self.pg1.get_capture(len(pkts))
3727 self.verify_capture_out(capture)
3728 pkts = self.create_stream_in(self.pg0, self.pg1)
3729 self.pg0.add_stream(pkts)
3730 self.pg_enable_capture(self.pg_interfaces)
3732 capture = self.pg1.get_capture(len(pkts))
3733 self.verify_capture_out(capture)
3735 # from external network back to local network host
3736 pkts = self.create_stream_out(self.pg1)
3737 self.pg1.add_stream(pkts)
3738 self.pg_enable_capture(self.pg_interfaces)
3740 capture = self.pg0.get_capture(len(pkts))
3741 self.verify_capture_in(capture, self.pg0)
3743 def test_output_feature_and_service2(self):
3744 """ NAT44 interface output feature and service host direct access """
# Only the output feature on pg1 is configured here (no plain NAT44 feature
# calls), together with forwarding and nat_addr in the pool.
3745 self.vapi.nat44_forwarding_enable_disable(1)
3746 self.nat44_add_address(self.nat_addr)
3747 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3750 # session initiated from service host - translate
3751 pkts = self.create_stream_in(self.pg0, self.pg1)
3752 self.pg0.add_stream(pkts)
3753 self.pg_enable_capture(self.pg_interfaces)
3755 capture = self.pg1.get_capture(len(pkts))
3756 self.verify_capture_out(capture)
3758 pkts = self.create_stream_out(self.pg1)
3759 self.pg1.add_stream(pkts)
3760 self.pg_enable_capture(self.pg_interfaces)
3762 capture = self.pg0.get_capture(len(pkts))
3763 self.verify_capture_in(capture, self.pg0)
3765 # session initiated from remote host - do not translate
# Different inside ports / ICMP id are set on the instance for this phase
# (the class defaults are 6303-6305).
3766 self.tcp_port_in = 60303
3767 self.udp_port_in = 60304
3768 self.icmp_id_in = 60305
# The outside stream is addressed straight to pg0's remote address using the
# inside ports.
3769 pkts = self.create_stream_out(self.pg1,
3770 self.pg0.remote_ip4,
3771 use_inside_ports=True)
3772 self.pg1.add_stream(pkts)
3773 self.pg_enable_capture(self.pg_interfaces)
3775 capture = self.pg0.get_capture(len(pkts))
3776 self.verify_capture_in(capture, self.pg0)
3778 pkts = self.create_stream_in(self.pg0, self.pg1)
3779 self.pg0.add_stream(pkts)
3780 self.pg_enable_capture(self.pg_interfaces)
# The expected source on pg1 is pg0's own remote address (nat_ip argument).
3782 capture = self.pg1.get_capture(len(pkts))
3783 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3786 def test_output_feature_and_service3(self):
3787 """ NAT44 interface output feature and DST NAT """
3788 external_addr = '1.2.3.4'
# Here the static mapping's local side is pg1's remote address, i.e.
# external_addr:external_port maps to the host behind pg1.
3792 self.vapi.nat44_forwarding_enable_disable(1)
3793 self.nat44_add_address(self.nat_addr)
3794 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3795 local_port, external_port,
3796 proto=IP_PROTOS.tcp, out2in_only=1)
# Both plain feature calls target pg0; pg1 gets the output feature.
3797 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3798 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3800 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0's host talks to external_addr:external_port.
3803 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3804 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3805 TCP(sport=12345, dport=external_port))
3806 self.pg0.add_stream(p)
3807 self.pg_enable_capture(self.pg_interfaces)
3809 capture = self.pg1.get_capture(1)
# Only the destination is rewritten (to pg1's host and local_port); source
# address and port stay as sent.
3814 self.assertEqual(ip.src, self.pg0.remote_ip4)
3815 self.assertEqual(tcp.sport, 12345)
3816 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3817 self.assertEqual(tcp.dport, local_port)
3818 self.assert_packet_checksums_valid(p)
3820 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply from pg1's host towards pg0's host.
3823 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3824 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3825 TCP(sport=local_port, dport=12345))
3826 self.pg1.add_stream(p)
3827 self.pg_enable_capture(self.pg_interfaces)
3829 capture = self.pg0.get_capture(1)
# The source must be rewritten back to external_addr:external_port; the
# destination is unchanged.
3834 self.assertEqual(ip.src, external_addr)
3835 self.assertEqual(tcp.sport, external_port)
3836 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3837 self.assertEqual(tcp.dport, 12345)
3838 self.assert_packet_checksums_valid(p)
3840 self.logger.error(ppp("Unexpected or invalid packet:", p))
3843 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
# Shared scenario behind the twice-NAT tests in this class. The flags select
# self-twice-NAT vs. twice-NAT, whether a load-balanced mapping with two
# back-ends is used, and (presumably, via same_pg) whether the client sits
# on the same interface as the servers - TODO confirm against the elided
# setup lines.
3845 twice_nat_addr = '10.0.1.3'
3853 port_in1 = port_in+1
3854 port_in2 = port_in+2
3859 server1 = self.pg0.remote_hosts[0]
3860 server2 = self.pg0.remote_hosts[1]
# eh_translate decides below whether the external host's source address and
# port are expected to be rewritten.
3872 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
# Two pool addresses: the regular one and a dedicated twice-NAT one.
3875 self.nat44_add_address(self.nat_addr)
3876 self.nat44_add_address(twice_nat_addr, twice_nat=1)
# The mapping is flagged either twice_nat or self_twice_nat, never both
# (the two flags are complementary).
3878 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3880 proto=IP_PROTOS.tcp,
3881 twice_nat=int(not self_twice_nat),
3882 self_twice_nat=int(self_twice_nat))
3884 locals = [{'addr': server1.ip4n,
3887 {'addr': server2.ip4n,
3890 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3891 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
3895 not self_twice_nat),
3898 local_num=len(locals),
3900 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
3901 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
# The client is picked from pg0's remote hosts by client_id in some
# configurations, otherwise it is pg1's first remote host.
3908 assert client_id is not None
3910 client = self.pg0.remote_hosts[0]
3911 elif client_id == 2:
3912 client = self.pg0.remote_hosts[1]
3914 client = pg1.remote_hosts[0]
# Client -> external address/port.
3915 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
3916 IP(src=client.ip4, dst=self.nat_addr) /
3917 TCP(sport=eh_port_out, dport=port_out))
3919 self.pg_enable_capture(self.pg_interfaces)
3921 capture = pg0.get_capture(1)
# Destination checks: one of the two back-end ports in one branch, port_in
# in the other.
3927 if ip.dst == server1.ip4:
3933 self.assertEqual(ip.dst, server.ip4)
3935 self.assertIn(tcp.dport, [port_in1, port_in2])
3937 self.assertEqual(tcp.dport, port_in)
# Source checks: rewritten to the twice-NAT address with a new port in one
# branch, left as the client's own address/port in the other.
3939 self.assertEqual(ip.src, twice_nat_addr)
3940 self.assertNotEqual(tcp.sport, eh_port_out)
3942 self.assertEqual(ip.src, client.ip4)
3943 self.assertEqual(tcp.sport, eh_port_out)
# Remember the ports actually seen by the server for building the reply.
3945 eh_port_in = tcp.sport
3946 saved_port_in = tcp.dport
3947 self.assert_packet_checksums_valid(p)
3949 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Server -> client reply, built from the tuple captured above.
3952 p = (Ether(src=server.mac, dst=pg0.local_mac) /
3953 IP(src=server.ip4, dst=eh_addr_in) /
3954 TCP(sport=saved_port_in, dport=eh_port_in))
3956 self.pg_enable_capture(self.pg_interfaces)
3958 capture = pg1.get_capture(1)
# The client must see the original external address/port pair again.
3963 self.assertEqual(ip.dst, client.ip4)
3964 self.assertEqual(ip.src, self.nat_addr)
3965 self.assertEqual(tcp.dport, eh_port_out)
3966 self.assertEqual(tcp.sport, port_out)
3967 self.assert_packet_checksums_valid(p)
3969 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One twice-NAT session must exist for the server; it is deleted using the
# translated external-host address/port (ext_host_nat_*), after which none
# may remain.
3973 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3974 self.assertEqual(len(sessions), 1)
3975 self.assertTrue(sessions[0].ext_host_valid)
3976 self.assertTrue(sessions[0].is_twicenat)
3977 self.vapi.nat44_del_session(
3978 sessions[0].inside_ip_address,
3979 sessions[0].inside_port,
3980 sessions[0].protocol,
3981 ext_host_address=sessions[0].ext_host_nat_address,
3982 ext_host_port=sessions[0].ext_host_nat_port)
3983 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3984 self.assertEqual(len(sessions), 0)
3986 def test_twice_nat(self):
# Runs the shared twice-NAT scenario with all of its default arguments.
3988 self.twice_nat_common()
3990 def test_self_twice_nat_positive(self):
3991 """ Self Twice NAT44 (positive test) """
# Shared scenario with the self-twice-NAT flag and same_pg=True.
3992 self.twice_nat_common(self_twice_nat=True, same_pg=True)
3994 def test_self_twice_nat_negative(self):
3995 """ Self Twice NAT44 (negative test) """
# Self-twice-NAT flag set while same_pg keeps its default of False.
3996 self.twice_nat_common(self_twice_nat=True)
3998 def test_twice_nat_lb(self):
3999 """ Twice NAT44 local service load balancing """
# Shared scenario using the load-balanced mapping (lb=True), twice-NAT mode.
4000 self.twice_nat_common(lb=True)
4002 def test_self_twice_nat_lb_positive(self):
4003 """ Self Twice NAT44 local service load balancing (positive test) """
# Load-balanced self-twice-NAT on the same interface; the call takes one
# more argument after same_pg (presumably the client selector - TODO confirm).
4004 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4007 def test_self_twice_nat_lb_negative(self):
4008 """ Self Twice NAT44 local service load balancing (negative test) """
# Same flags as the positive variant; the difference lies in the trailing
# argument of this call (presumably the client selector - TODO confirm).
4009 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4012 def test_twice_nat_interface_addr(self):
4013 """ Acquire twice NAT44 addresses from interface """
# pg3 is registered as a twice-NAT address source before it has an IPv4
# address configured.
4014 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4016 # no address in NAT pool
4017 adresses = self.vapi.nat44_address_dump()
4018 self.assertEqual(0, len(adresses))
4020 # configure interface address and check NAT address pool
4021 self.pg3.config_ip4()
4022 adresses = self.vapi.nat44_address_dump()
4023 self.assertEqual(1, len(adresses))
# Only the first 4 bytes of the dumped address field are compared with
# pg3's packed IPv4 address; the entry must carry the twice_nat flag.
4024 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4025 self.assertEqual(adresses[0].twice_nat, 1)
4027 # remove interface address and check NAT address pool
4028 self.pg3.unconfig_ip4()
4029 adresses = self.vapi.nat44_address_dump()
4030 self.assertEqual(0, len(adresses))
4032 def test_tcp_session_close_in(self):
4033 """ Close TCP session from inside network """
# A fixed outside port is used for the static TCP mapping of pg0's host.
4034 self.tcp_port_out = 10505
4035 self.nat44_add_address(self.nat_addr)
4036 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4040 proto=IP_PROTOS.tcp,
4042 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4043 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for the inside host; the final assertion requires
# the count to be back at this value.
4046 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4047 start_sessnum = len(sessions)
4049 self.initiate_tcp_session(self.pg0, self.pg1)
4051 # FIN packet in -> out
4052 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4053 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4054 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4055 flags="FA", seq=100, ack=300))
4056 self.pg0.add_stream(p)
4057 self.pg_enable_capture(self.pg_interfaces)
4059 self.pg1.get_capture(1)
# The outside ACK and FIN are injected together as one stream (`pkts`), and
# two packets are expected on pg0.
4063 # ACK packet out -> in
4064 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4065 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4066 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4067 flags="A", seq=300, ack=101))
4070 # FIN packet out -> in
4071 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4072 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4073 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4074 flags="FA", seq=300, ack=101))
4077 self.pg1.add_stream(pkts)
4078 self.pg_enable_capture(self.pg_interfaces)
4080 self.pg0.get_capture(2)
4082 # ACK packet in -> out
4083 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4084 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4085 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4086 flags="A", seq=101, ack=301))
4087 self.pg0.add_stream(p)
4088 self.pg_enable_capture(self.pg_interfaces)
4090 self.pg1.get_capture(1)
# After the full close handshake no session may remain beyond the baseline.
4092 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4094 self.assertEqual(len(sessions) - start_sessnum, 0)
4096 def test_tcp_session_close_out(self):
4097 """ Close TCP session from outside network """
# A fixed outside port is used for the static TCP mapping of pg0's host.
4098 self.tcp_port_out = 10505
4099 self.nat44_add_address(self.nat_addr)
4100 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4104 proto=IP_PROTOS.tcp,
4106 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4107 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for the inside host; the final assertion requires
# the count to be back at this value.
4110 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4111 start_sessnum = len(sessions)
4113 self.initiate_tcp_session(self.pg0, self.pg1)
# Here the outside peer starts the close; each step is sent on its own and
# exactly one forwarded packet is expected per step.
4115 # FIN packet out -> in
4116 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4117 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4118 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4119 flags="FA", seq=100, ack=300))
4120 self.pg1.add_stream(p)
4121 self.pg_enable_capture(self.pg_interfaces)
4123 self.pg0.get_capture(1)
4125 # FIN+ACK packet in -> out
4126 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4127 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4128 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4129 flags="FA", seq=300, ack=101))
4131 self.pg0.add_stream(p)
4132 self.pg_enable_capture(self.pg_interfaces)
4134 self.pg1.get_capture(1)
4136 # ACK packet out -> in
4137 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4138 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4139 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4140 flags="A", seq=101, ack=301))
4141 self.pg1.add_stream(p)
4142 self.pg_enable_capture(self.pg_interfaces)
4144 self.pg0.get_capture(1)
# After the full close handshake no session may remain beyond the baseline.
4146 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4148 self.assertEqual(len(sessions) - start_sessnum, 0)
4150 def test_tcp_session_close_simultaneous(self):
4151 """ Close TCP session from inside network """
# NOTE(review): the docstring above repeats the "from inside network"
# wording, while this method (per its name and packet order) exercises a
# simultaneous close: FINs cross in both directions before either ACK.
4152 self.tcp_port_out = 10505
4153 self.nat44_add_address(self.nat_addr)
4154 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4158 proto=IP_PROTOS.tcp,
4160 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4161 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for the inside host; the final assertion requires
# the count to be back at this value.
4164 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4165 start_sessnum = len(sessions)
4167 self.initiate_tcp_session(self.pg0, self.pg1)
4169 # FIN packet in -> out
4170 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4171 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4172 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4173 flags="FA", seq=100, ack=300))
4174 self.pg0.add_stream(p)
4175 self.pg_enable_capture(self.pg_interfaces)
4177 self.pg1.get_capture(1)
# The outside FIN acknowledges 100, i.e. it does not yet acknowledge the
# inside FIN sent above.
4179 # FIN packet out -> in
4180 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4181 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4182 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4183 flags="FA", seq=300, ack=100))
4184 self.pg1.add_stream(p)
4185 self.pg_enable_capture(self.pg_interfaces)
4187 self.pg0.get_capture(1)
4189 # ACK packet in -> out
4190 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4191 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4192 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4193 flags="A", seq=101, ack=301))
4194 self.pg0.add_stream(p)
4195 self.pg_enable_capture(self.pg_interfaces)
4197 self.pg1.get_capture(1)
4199 # ACK packet out -> in
4200 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4201 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4202 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4203 flags="A", seq=301, ack=101))
4204 self.pg1.add_stream(p)
4205 self.pg_enable_capture(self.pg_interfaces)
4207 self.pg0.get_capture(1)
# After both FINs are acknowledged no session may remain beyond the baseline.
4209 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4211 self.assertEqual(len(sessions) - start_sessnum, 0)
4213 def test_one_armed_nat44_static(self):
4214 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
# Both endpoints are remote hosts on pg4; all traffic enters and leaves on
# that single interface.
4215 remote_host = self.pg4.remote_hosts[0]
4216 local_host = self.pg4.remote_hosts[1]
# nat_addr is added as a twice-NAT pool address and is also the external
# address of the out2in-only TCP static mapping to local_host.
4221 self.vapi.nat44_forwarding_enable_disable(1)
4222 self.nat44_add_address(self.nat_addr, twice_nat=1)
4223 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4224 local_port, external_port,
4225 proto=IP_PROTOS.tcp, out2in_only=1,
# Both feature calls target pg4 (the second takes a further argument).
4227 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4228 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4231 # from client to service
4232 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4233 IP(src=remote_host.ip4, dst=self.nat_addr) /
4234 TCP(sport=12345, dport=external_port))
4235 self.pg4.add_stream(p)
4236 self.pg_enable_capture(self.pg_interfaces)
4238 capture = self.pg4.get_capture(1)
# Both ends are rewritten: destination to the local host/port, source to
# nat_addr with a port different from the client's 12345. That translated
# source port is kept for the reply.
4243 self.assertEqual(ip.dst, local_host.ip4)
4244 self.assertEqual(ip.src, self.nat_addr)
4245 self.assertEqual(tcp.dport, local_port)
4246 self.assertNotEqual(tcp.sport, 12345)
4247 eh_port_in = tcp.sport
4248 self.assert_packet_checksums_valid(p)
4250 self.logger.error(ppp("Unexpected or invalid packet:", p))
4253 # from service back to client
# The service replies to nat_addr on the translated port learned above.
4254 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4255 IP(src=local_host.ip4, dst=self.nat_addr) /
4256 TCP(sport=local_port, dport=eh_port_in))
4257 self.pg4.add_stream(p)
4258 self.pg_enable_capture(self.pg_interfaces)
4260 capture = self.pg4.get_capture(1)
# The client must see the external address/port as source and its own
# address and original port 12345 as destination.
4265 self.assertEqual(ip.src, self.nat_addr)
4266 self.assertEqual(ip.dst, remote_host.ip4)
4267 self.assertEqual(tcp.sport, external_port)
4268 self.assertEqual(tcp.dport, 12345)
4269 self.assert_packet_checksums_valid(p)
4271 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: out2in-only TCP static mapping between the pg0 host (service) and
# the NAT address, exercised with TCP in both directions, with ICMP type 11
# messages quoting the translated packets, and with traffic that bypasses
# the mapping.
# NOTE(review): the embedded line numbers skip 4276-4279 and several lines
# around each capture; local_port, external_port, ip, tcp and inner are
# presumably assigned on those lines - confirm against the full source.
4274 def test_static_with_port_out2(self):
4275 """ 1:1 NAPT asymmetrical rule """
4280 self.vapi.nat44_forwarding_enable_disable(1)
4281 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4282 local_port, external_port,
4283 proto=IP_PROTOS.tcp, out2in_only=1)
# pg0 gets the NAT44 feature with default arguments; the extra arguments of
# the pg1 call are cut off in this extract.
4284 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4285 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4288 # from client to service
4289 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4290 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4291 TCP(sport=12345, dport=external_port))
4292 self.pg1.add_stream(p)
4293 self.pg_enable_capture(self.pg_interfaces)
# Destination must be rewritten to the pg0 host and local_port.
4295 capture = self.pg0.get_capture(1)
4300 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4301 self.assertEqual(tcp.dport, local_port)
4302 self.assert_packet_checksums_valid(p)
4304 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Send an ICMP type 11 message from the pg0 side that embeds the IP layer of
# the packet just captured. On pg1 both the outer source and the embedded
# destination must show the NAT address and external_port again.
4308 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4309 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4310 ICMP(type=11) / capture[0][IP])
4311 self.pg0.add_stream(p)
4312 self.pg_enable_capture(self.pg_interfaces)
4314 capture = self.pg1.get_capture(1)
4317 self.assertEqual(p[IP].src, self.nat_addr)
4319 self.assertEqual(inner.dst, self.nat_addr)
4320 self.assertEqual(inner[TCPerror].dport, external_port)
4322 self.logger.error(ppp("Unexpected or invalid packet:", p))
4325 # from service back to client
4326 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4327 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4328 TCP(sport=local_port, dport=12345))
4329 self.pg0.add_stream(p)
4330 self.pg_enable_capture(self.pg_interfaces)
# Source must be rewritten to the NAT address and external_port.
4332 capture = self.pg1.get_capture(1)
4337 self.assertEqual(ip.src, self.nat_addr)
4338 self.assertEqual(tcp.sport, external_port)
4339 self.assert_packet_checksums_valid(p)
4341 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Same ICMP type 11 check in the opposite direction: the message enters on
# pg1 quoting the translated reply, and on pg0 the outer destination and the
# embedded source must be the pg0 host with local_port.
4345 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4346 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4347 ICMP(type=11) / capture[0][IP])
4348 self.pg1.add_stream(p)
4349 self.pg_enable_capture(self.pg_interfaces)
4351 capture = self.pg0.get_capture(1)
4354 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4356 self.assertEqual(inner.src, self.pg0.remote_ip4)
4357 self.assertEqual(inner[TCPerror].sport, local_port)
4359 self.logger.error(ppp("Unexpected or invalid packet:", p))
# The last two exchanges address the pg0 host directly instead of the NAT
# address, so the asserted addresses and ports are the untranslated ones.
4362 # from client to server (no translation)
4363 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4364 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4365 TCP(sport=12346, dport=local_port))
4366 self.pg1.add_stream(p)
4367 self.pg_enable_capture(self.pg_interfaces)
4369 capture = self.pg0.get_capture(1)
4374 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4375 self.assertEqual(tcp.dport, local_port)
4376 self.assert_packet_checksums_valid(p)
4378 self.logger.error(ppp("Unexpected or invalid packet:", p))
4381 # from service back to client (no translation)
4382 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4383 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4384 TCP(sport=local_port, dport=12346))
4385 self.pg0.add_stream(p)
4386 self.pg_enable_capture(self.pg_interfaces)
4388 capture = self.pg1.get_capture(1)
4393 self.assertEqual(ip.src, self.pg0.remote_ip4)
4394 self.assertEqual(tcp.sport, local_port)
4395 self.assert_packet_checksums_valid(p)
4397 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Test: NAT44 configured as an output feature on pg1, checked with one
# stream in each direction between pg0 and pg1.
# NOTE(review): the argument lists of both feature calls are cut off in this
# extract (embedded lines 4405 and 4407-4409 are missing).
4400 def test_output_feature(self):
4401 """ NAT44 interface output feature (in2out postrouting) """
4402 self.vapi.nat44_forwarding_enable_disable(1)
4403 self.nat44_add_address(self.nat_addr)
4404 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4406 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0 -> pg1: every generated packet must be captured on pg1 and pass the
# outside-network verification.
4410 pkts = self.create_stream_in(self.pg0, self.pg1)
4411 self.pg0.add_stream(pkts)
4412 self.pg_enable_capture(self.pg_interfaces)
4414 capture = self.pg1.get_capture(len(pkts))
4415 self.verify_capture_out(capture)
# pg1 -> pg0: the return stream must be captured on pg0 and pass the
# inside-network verification.
4418 pkts = self.create_stream_out(self.pg1)
4419 self.pg1.add_stream(pkts)
4420 self.pg_enable_capture(self.pg_interfaces)
4422 capture = self.pg0.get_capture(len(pkts))
4423 self.verify_capture_in(capture, self.pg0)
# Per-test cleanup for TestNAT44EndpointDependent.
# NOTE(review): the "def" line (embedded line 4425) is not visible in this
# extract; the method name is inferred from the super() call below.
4426 super(TestNAT44EndpointDependent, self).tearDown()
# Only query VPP when it is still alive; the CLI output is written to the
# test log.
4427 if not self.vpp_dead:
4428 self.logger.info(self.vapi.cli("show nat44 addresses"))
4429 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4430 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4431 self.logger.info(self.vapi.cli("show nat44 interface address"))
4432 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4433 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
4435 self.vapi.cli("clear logging")
# Test class for NAT44 running with the "out2in dpo" startup option.
4438 class TestNAT44Out2InDPO(MethodHolder):
4439 """ NAT44 Test Cases using out2in DPO """
# Extends the inherited VPP command line with a "nat { out2in dpo }" section.
4442 def setUpConstants(cls):
4443 super(TestNAT44Out2InDPO, cls).setUpConstants()
4444 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# Class-level fixture: sets the ports/addresses used by the tests, creates
# two packet-generator interfaces (pg0 with IPv4, pg1 with IPv6) and adds an
# IPv6 route through pg1.
4447 def setUpClass(cls):
4448 super(TestNAT44Out2InDPO, cls).setUpClass()
4449 cls.vapi.cli("set log class nat level debug")
# In/out values start out identical for each protocol.
4452 cls.tcp_port_in = 6303
4453 cls.tcp_port_out = 6303
4454 cls.udp_port_in = 6304
4455 cls.udp_port_out = 6304
4456 cls.icmp_id_in = 6305
4457 cls.icmp_id_out = 6305
4458 cls.nat_addr = '10.0.0.3'
# Packed (binary) form of nat_addr.
4459 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4460 cls.dst_ip4 = '192.168.70.1'
4462 cls.create_pg_interfaces(range(2))
4465 cls.pg0.config_ip4()
4466 cls.pg0.resolve_arp()
4469 cls.pg1.config_ip6()
4470 cls.pg1.resolve_ndp()
# All-zero destination with prefix length 0, i.e. an IPv6 default route,
# using the pg1 remote host as next hop.
4472 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4473 dst_address_length=0,
4474 next_hop_address=cls.pg1.remote_ip6n,
4475 next_hop_sw_if_index=cls.pg1.sw_if_index)
# NOTE(review): embedded lines 4476-4477 are missing; this tearDownClass()
# call presumably belongs to an error handler around the setup above.
4478 super(TestNAT44Out2InDPO, cls).tearDownClass()
# Stores the destination/source IPv6 prefixes (both /96) on the test
# instance and creates a MAP domain with translation enabled from them.
# NOTE(review): the second argument of both inet_pton() calls and the end of
# the map_add_domain() call are cut off in this extract.
4481 def configure_xlat(self):
4482 self.dst_ip6_pfx = '1:2:3::'
4483 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4485 self.dst_ip6_pfx_len = 96
4486 self.src_ip6_pfx = '4:5:6::'
4487 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4489 self.src_ip6_pfx_len = 96
4490 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4491 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4492 '\x00\x00\x00\x00', 0, is_translation=1,
# Test: 464XLAT customer edge with NAT44 in front of the translation. IPv4
# traffic from pg0 must leave pg1 as IPv6, and the IPv6 replies must arrive
# on pg0 as IPv4.
4495 def test_464xlat_ce(self):
4496 """ Test 464XLAT CE with NAT44 """
4498 self.configure_xlat()
# NAT44 on pg0 with a pool consisting of the single NAT address.
4500 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4501 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# IPv6 addresses composed from the IPv4 destination and the NAT address
# using the configured prefixes.
4503 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4504 self.dst_ip6_pfx_len)
4505 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4506 self.src_ip6_pfx_len)
4509 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4510 self.pg0.add_stream(pkts)
4511 self.pg_enable_capture(self.pg_interfaces)
4513 capture = self.pg1.get_capture(len(pkts))
# NOTE(review): the remaining arguments of the next two calls are cut off
# in this extract.
4514 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4517 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4519 self.pg1.add_stream(pkts)
4520 self.pg_enable_capture(self.pg_interfaces)
4522 capture = self.pg0.get_capture(len(pkts))
4523 self.verify_capture_in(capture, self.pg0)
# Undo the NAT44 configuration made at the start of the test.
4525 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4527 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4528 self.nat_addr_n, is_add=0)
# Test: 464XLAT customer edge without any NAT44 configuration. The expected
# IPv6 address on the translated side is composed from the pg0 host address
# itself, and ports are expected to be unchanged (same_port=True).
4530 def test_464xlat_ce_no_nat(self):
4531 """ Test 464XLAT CE without NAT44 """
4533 self.configure_xlat()
4535 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4536 self.dst_ip6_pfx_len)
4537 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4538 self.src_ip6_pfx_len)
# IPv4 in on pg0, IPv6 out on pg1.
4540 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4541 self.pg0.add_stream(pkts)
4542 self.pg_enable_capture(self.pg_interfaces)
4544 capture = self.pg1.get_capture(len(pkts))
4545 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4546 nat_ip=out_dst_ip6, same_port=True)
# IPv6 in on pg1, IPv4 out on pg0.
4548 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4549 self.pg1.add_stream(pkts)
4550 self.pg_enable_capture(self.pg_interfaces)
4552 capture = self.pg0.get_capture(len(pkts))
4553 self.verify_capture_in(capture, self.pg0)
# Test class for the NAT plugin started with the "deterministic" option.
4556 class TestDeterministicNAT(MethodHolder):
4557 """ Deterministic NAT Test Cases """
# Extends the inherited VPP command line with a "nat { deterministic }"
# section.
4560 def setUpConstants(cls):
4561 super(TestDeterministicNAT, cls).setUpConstants()
4562 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class-level fixture: sets the ports/addresses used by the tests, creates
# three packet-generator interfaces and generates two remote hosts on pg0.
4565 def setUpClass(cls):
4566 super(TestDeterministicNAT, cls).setUpClass()
4567 cls.vapi.cli("set log class nat level debug")
4570 cls.tcp_port_in = 6303
4571 cls.tcp_external_port = 6303
4572 cls.udp_port_in = 6304
4573 cls.udp_external_port = 6304
4574 cls.icmp_id_in = 6305
4575 cls.nat_addr = '10.0.0.3'
4577 cls.create_pg_interfaces(range(3))
4578 cls.interfaces = list(cls.pg_interfaces)
# NOTE(review): the body of this loop (embedded lines 4581-4583) is not
# visible in this extract.
4580 for i in cls.interfaces:
4585 cls.pg0.generate_remote_hosts(2)
4586 cls.pg0.configure_ipv4_neighbors()
# NOTE(review): embedded lines 4587-4588 are missing; this tearDownClass()
# call presumably belongs to an error handler around the setup above.
4589 super(TestDeterministicNAT, cls).tearDownClass()
# Builds one TCP, one UDP and one ICMP echo-request packet from the inside
# host (in_if) to the outside host (out_if), all with the given TTL.
# NOTE(review): the docstring delimiters and the lines that collect and
# return the packets are not visible in this extract.
4592 def create_stream_in(self, in_if, out_if, ttl=64):
4594 Create packet stream for inside network
4596 :param in_if: Inside interface
4597 :param out_if: Outside interface
4598 :param ttl: TTL of generated packets
# TCP: inside port -> external port.
4602 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4603 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4604 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
# UDP: inside port -> external port.
4608 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4609 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4610 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
# ICMP echo request carrying the inside ICMP id.
4614 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4615 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4616 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds one TCP, one UDP and one ICMP echo-reply packet from the outside
# host (out_if) towards dst_ip, all with the given TTL.
# The packets use self.tcp_port_out, self.udp_port_out and
# self.icmp_external_id, which verify_capture_out() assigns from captured
# packets.
# NOTE(review): the docstring delimiters, the condition guarding the dst_ip
# default and the lines that collect and return the packets are not visible
# in this extract.
4621 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4623 Create packet stream for outside network
4625 :param out_if: Outside interface
4626 :param dst_ip: Destination IP address (Default use global NAT address)
4627 :param ttl: TTL of generated packets
4630 dst_ip = self.nat_addr
# TCP: external port -> translated outside port.
4633 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4634 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4635 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
# UDP: external port -> translated outside port.
4639 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4640 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4641 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
# ICMP echo reply carrying the external ICMP id.
4645 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4646 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4647 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks packets captured on the outside network: the capture must contain
# packet_num packets and each must be sourced from nat_ip. As a side effect
# the translated TCP/UDP source ports and the ICMP id are stored on self
# (tcp_port_out, udp_port_out, icmp_external_id).
# NOTE(review): the docstring below documents a "same_port" parameter that
# is not part of this signature, and "Sorce" is a typo for "Source". The
# docstring delimiters and the condition guarding the nat_ip default are not
# visible in this extract.
4652 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4654 Verify captured packets on outside network
4656 :param capture: Captured packets
4657 :param nat_ip: Translated IP address (Default use global NAT address)
4658 :param same_port: Sorce port number is not translated (Default False)
4659 :param packet_num: Expected number of packets (Default 3)
4662 nat_ip = self.nat_addr
4663 self.assertEqual(packet_num, len(capture))
4664 for packet in capture:
4666 self.assertEqual(packet[IP].src, nat_ip)
# Record the translated identifier for whichever protocol this packet
# carries.
4667 if packet.haslayer(TCP):
4668 self.tcp_port_out = packet[TCP].sport
4669 elif packet.haslayer(UDP):
4670 self.udp_port_out = packet[UDP].sport
4672 self.icmp_external_id = packet[ICMP].id
4674 self.logger.error(ppp("Unexpected or invalid packet "
4675 "(outside network):", packet))
# Checks a decoded IPFIX data set for a single "maximum entries per user
# exceeded" record. The record is indexed by numeric field id.
# NOTE(review): the docstring delimiters, the assignment of "record" and
# some per-field comments are not visible in this extract.
4678 def verify_ipfix_max_entries_per_user(self, data):
4680 Verify IPFIX maximum entries per user exceeded event
4682 :param data: Decoded IPFIX data records
4684 self.assertEqual(1, len(data))
# Field 230 must be 13 (presumably the NAT event type - confirm).
4687 self.assertEqual(ord(record[230]), 13)
4688 # natQuotaExceededEvent
4689 self.assertEqual('\x03\x00\x00\x00', record[466])
# Field 473 must be the bytes e8 03 00 00, i.e. 1000 when read as a
# little-endian integer.
4691 self.assertEqual('\xe8\x03\x00\x00', record[473])
# Field 8 must be the packed IPv4 address of the pg0 host.
4693 self.assertEqual(self.pg0.remote_ip4n, record[8])
# Test: VPP reports deterministic mode, a deterministic mapping can be
# added, queried in both directions, dumped and finally cleared.
# NOTE(review): embedded lines 4703-4705 are missing; in_plen and out_plen
# are presumably assigned there - confirm against the full source.
4695 def test_deterministic_mode(self):
4696 """ NAT plugin run deterministic mode """
4697 in_addr = '172.16.255.0'
4698 out_addr = '172.17.255.50'
4699 in_addr_t = '172.16.255.20'
# Packed (binary) forms of the three addresses above.
4700 in_addr_n = socket.inet_aton(in_addr)
4701 out_addr_n = socket.inet_aton(out_addr)
4702 in_addr_t_n = socket.inet_aton(in_addr_t)
4706 nat_config = self.vapi.nat_show_config()
4707 self.assertEqual(1, nat_config.deterministic)
4709 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup of the test address must give the outside address; the
# reverse lookup of that address with the returned high port must give the
# test address back.
4711 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4712 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4713 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4714 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# The dump must contain exactly the mapping that was added. Only the first
# four bytes of the dumped addresses are compared.
4716 deterministic_mappings = self.vapi.nat_det_map_dump()
4717 self.assertEqual(len(deterministic_mappings), 1)
4718 dsm = deterministic_mappings[0]
4719 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4720 self.assertEqual(in_plen, dsm.in_plen)
4721 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4722 self.assertEqual(out_plen, dsm.out_plen)
# After clearing, no mapping may remain.
4724 self.clear_nat_det()
4725 deterministic_mappings = self.vapi.nat_det_map_dump()
4726 self.assertEqual(len(deterministic_mappings), 0)
# Test: reads the current deterministic NAT timeouts, sets each one to its
# previous value plus 10 and verifies that every value read back differs
# from the one read before.
4728 def test_set_timeouts(self):
4729 """ Set deterministic NAT timeouts """
4730 timeouts_before = self.vapi.nat_det_get_timeouts()
# Positional order used here: udp, tcp_established, tcp_transitory, icmp.
4732 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4733 timeouts_before.tcp_established + 10,
4734 timeouts_before.tcp_transitory + 10,
4735 timeouts_before.icmp + 10)
4737 timeouts_after = self.vapi.nat_det_get_timeouts()
4739 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4740 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4741 self.assertNotEqual(timeouts_before.tcp_established,
4742 timeouts_after.tcp_established)
4743 self.assertNotEqual(timeouts_before.tcp_transitory,
4744 timeouts_after.tcp_transitory)
# Test: deterministic NAT translates TCP, UDP and ICMP between pg0 (inside)
# and pg1, and the session dump for the pg0 host then contains the three
# sessions with the expected ports.
# NOTE(review): the prefix-length arguments of nat_det_add_del_map(), the
# end of the second feature call and the lines that pick each session "s"
# out of the dump are not visible in this extract.
4746 def test_det_in(self):
4747 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4749 nat_ip = "10.0.0.10"
4751 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4753 socket.inet_aton(nat_ip),
4755 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4756 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: packets must leave pg1 sourced from nat_ip.
4760 pkts = self.create_stream_in(self.pg0, self.pg1)
4761 self.pg0.add_stream(pkts)
4762 self.pg_enable_capture(self.pg_interfaces)
4764 capture = self.pg1.get_capture(len(pkts))
4765 self.verify_capture_out(capture, nat_ip)
# pg1 -> pg0: replies addressed to nat_ip must be captured on pg0.
4768 pkts = self.create_stream_out(self.pg1, nat_ip)
4769 self.pg1.add_stream(pkts)
4770 self.pg_enable_capture(self.pg_interfaces)
4772 capture = self.pg0.get_capture(len(pkts))
4773 self.verify_capture_in(capture, self.pg0)
# One session per protocol is expected for the pg0 host.
4776 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4777 self.assertEqual(len(sessions), 3)
# Session checked against the TCP ports.
4781 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4782 self.assertEqual(s.in_port, self.tcp_port_in)
4783 self.assertEqual(s.out_port, self.tcp_port_out)
4784 self.assertEqual(s.ext_port, self.tcp_external_port)
# Session checked against the UDP ports.
4788 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4789 self.assertEqual(s.in_port, self.udp_port_in)
4790 self.assertEqual(s.out_port, self.udp_port_out)
4791 self.assertEqual(s.ext_port, self.udp_external_port)
# Session checked against the ICMP ids.
4795 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4796 self.assertEqual(s.in_port, self.icmp_id_in)
4797 self.assertEqual(s.out_port, self.icmp_external_id)
# Test: two inside hosts on pg0 share one deterministic mapping. Each host
# opens a TCP flow from the same inside port, return traffic must reach the
# right host, and the sessions are then closed through the API.
# NOTE(review): embedded line 4803 is missing (port_in is presumably
# assigned there), as are the prefix-length arguments of
# nat_det_add_del_map(), the lines assigning ip/tcp after each capture and
# part of the arguments of both close-session calls.
4799 def test_multiple_users(self):
4800 """ Deterministic NAT multiple users """
4802 nat_ip = "10.0.0.10"
4804 external_port = 6303
4806 host0 = self.pg0.remote_hosts[0]
4807 host1 = self.pg0.remote_hosts[1]
4809 self.vapi.nat_det_add_del_map(host0.ip4n,
4811 socket.inet_aton(nat_ip),
4813 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4814 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside: must be translated to nat_ip; remember the outside port.
4818 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4819 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4820 TCP(sport=port_in, dport=external_port))
4821 self.pg0.add_stream(p)
4822 self.pg_enable_capture(self.pg_interfaces)
4824 capture = self.pg1.get_capture(1)
4829 self.assertEqual(ip.src, nat_ip)
4830 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4831 self.assertEqual(tcp.dport, external_port)
4832 port_out0 = tcp.sport
4834 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside: same checks; remember its outside port separately.
4838 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4839 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4840 TCP(sport=port_in, dport=external_port))
4841 self.pg0.add_stream(p)
4842 self.pg_enable_capture(self.pg_interfaces)
4844 capture = self.pg1.get_capture(1)
4849 self.assertEqual(ip.src, nat_ip)
4850 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4851 self.assertEqual(tcp.dport, external_port)
4852 port_out1 = tcp.sport
4854 self.logger.error(ppp("Unexpected or invalid packet:", p))
# A single mapping must now report two sessions.
4857 dms = self.vapi.nat_det_map_dump()
4858 self.assertEqual(1, len(dms))
4859 self.assertEqual(2, dms[0].ses_num)
# outside -> host0: addressed to host0's outside port, must arrive at host0.
4862 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4863 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4864 TCP(sport=external_port, dport=port_out0))
4865 self.pg1.add_stream(p)
4866 self.pg_enable_capture(self.pg_interfaces)
4868 capture = self.pg0.get_capture(1)
4873 self.assertEqual(ip.src, self.pg1.remote_ip4)
4874 self.assertEqual(ip.dst, host0.ip4)
4875 self.assertEqual(tcp.dport, port_in)
4876 self.assertEqual(tcp.sport, external_port)
4878 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> host1: addressed to host1's outside port, must arrive at host1.
4882 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4883 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4884 TCP(sport=external_port, dport=port_out1))
4885 self.pg1.add_stream(p)
4886 self.pg_enable_capture(self.pg_interfaces)
4888 capture = self.pg0.get_capture(1)
4893 self.assertEqual(ip.src, self.pg1.remote_ip4)
4894 self.assertEqual(ip.dst, host1.ip4)
4895 self.assertEqual(tcp.dport, port_in)
4896 self.assertEqual(tcp.sport, external_port)
4898 self.logger.error(ppp("Unexpected or invalid packet", p))
# Closing one session by its outside tuple must leave one session; closing
# the other by its inside tuple must leave none.
4901 # session close api test
4902 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4904 self.pg1.remote_ip4n,
4906 dms = self.vapi.nat_det_map_dump()
4907 self.assertEqual(dms[0].ses_num, 1)
4909 self.vapi.nat_det_close_session_in(host0.ip4n,
4911 self.pg1.remote_ip4n,
4913 dms = self.vapi.nat_det_map_dump()
4914 self.assertEqual(dms[0].ses_num, 0)
# Test: after a TCP session is set up, a close sequence started from the
# inside host (FIN in->out, ACK and FIN out->in, ACK in->out) must leave the
# deterministic mapping with zero sessions.
# NOTE(review): the TCP flags/seq/ack arguments of every packet, the
# prefix-length arguments of nat_det_add_del_map() and the lines that build
# the "pkts" list are cut off in this extract.
4916 def test_tcp_session_close_detection_in(self):
4917 """ Deterministic NAT TCP session close from inside network """
4918 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4920 socket.inet_aton(self.nat_addr),
4922 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4923 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4926 self.initiate_tcp_session(self.pg0, self.pg1)
4928 # close the session from inside
4930 # FIN packet in -> out
4931 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4932 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4933 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4935 self.pg0.add_stream(p)
4936 self.pg_enable_capture(self.pg_interfaces)
4938 self.pg1.get_capture(1)
4942 # ACK packet out -> in
4943 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4944 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4945 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4949 # FIN packet out -> in
4950 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4951 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4952 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# The two out -> in packets are sent together and both must reach pg0.
4956 self.pg1.add_stream(pkts)
4957 self.pg_enable_capture(self.pg_interfaces)
4959 self.pg0.get_capture(2)
4961 # ACK packet in -> out
4962 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4963 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4964 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4966 self.pg0.add_stream(p)
4967 self.pg_enable_capture(self.pg_interfaces)
4969 self.pg1.get_capture(1)
4971 # Check if deterministic NAT44 closed the session
4972 dms = self.vapi.nat_det_map_dump()
4973 self.assertEqual(0, dms[0].ses_num)
4975 self.logger.error("TCP session termination failed")
# Test: mirror image of the inside-initiated close test. The close sequence
# starts from the outside host (FIN out->in, two packets in->out, ACK
# out->in) and the deterministic mapping must end up with zero sessions.
# NOTE(review): the TCP flags/seq/ack arguments of every packet, the
# prefix-length arguments of nat_det_add_del_map() and the lines that build
# the "pkts" list are cut off in this extract. Both in -> out packets of the
# middle step carry the same "ACK packet" comment, so the flags of the
# second one cannot be confirmed from here.
4978 def test_tcp_session_close_detection_out(self):
4979 """ Deterministic NAT TCP session close from outside network """
4980 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4982 socket.inet_aton(self.nat_addr),
4984 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4985 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4988 self.initiate_tcp_session(self.pg0, self.pg1)
4990 # close the session from outside
4992 # FIN packet out -> in
4993 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4994 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4995 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4997 self.pg1.add_stream(p)
4998 self.pg_enable_capture(self.pg_interfaces)
5000 self.pg0.get_capture(1)
5004 # ACK packet in -> out
5005 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5006 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5007 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5011 # ACK packet in -> out
5012 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5013 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5014 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# The two in -> out packets are sent together and both must reach pg1.
5018 self.pg0.add_stream(pkts)
5019 self.pg_enable_capture(self.pg_interfaces)
5021 self.pg1.get_capture(2)
5023 # ACK packet out -> in
5024 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5025 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5026 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5028 self.pg1.add_stream(p)
5029 self.pg_enable_capture(self.pg_interfaces)
5031 self.pg0.get_capture(1)
5033 # Check if deterministic NAT44 closed the session
5034 dms = self.vapi.nat_det_map_dump()
5035 self.assertEqual(0, dms[0].ses_num)
5037 self.logger.error("TCP session termination failed")
# Test (extended runs only): with all four deterministic NAT timeouts set to
# 5, the mapping must report zero sessions at the end of the test.
# NOTE(review): embedded lines 5058-5059 are missing; the wait that lets the
# sessions expire is presumably there - confirm against the full source.
5040 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5041 def test_session_timeout(self):
5042 """ Deterministic NAT session timeouts """
5043 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5045 socket.inet_aton(self.nat_addr),
5047 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5048 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Create sessions: one TCP session via the handshake helper, then the
# TCP/UDP/ICMP stream from the inside.
5051 self.initiate_tcp_session(self.pg0, self.pg1)
5052 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
5053 pkts = self.create_stream_in(self.pg0, self.pg1)
5054 self.pg0.add_stream(pkts)
5055 self.pg_enable_capture(self.pg_interfaces)
5057 capture = self.pg1.get_capture(len(pkts))
5060 dms = self.vapi.nat_det_map_dump()
5061 self.assertEqual(0, dms[0].ses_num)
# Test (extended runs only): one inside host opens 1000 UDP flows; the next
# flow must be dropped and answered with an ICMP error, the mapping must
# report exactly 1000 sessions, and the event must be exported via IPFIX to
# the collector on pg2.
# NOTE(review): the prefix-length arguments of nat_det_add_del_map(), part
# of the set_ipfix_exporter() arguments, the lines building "pkts", and the
# loops/assignments around "p" and "icmp" are not visible in this extract.
5063 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5064 def test_session_limit_per_user(self):
5065 """ Deterministic NAT maximum sessions per user limit """
5066 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5068 socket.inet_aton(self.nat_addr),
5070 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5071 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5073 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5074 src_address=self.pg2.local_ip4n,
5076 template_interval=10)
5077 self.vapi.nat_ipfix()
# range(1025, 2025) yields 1000 distinct ports, one UDP packet per port.
5080 for port in range(1025, 2025):
5081 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5082 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5083 UDP(sport=port, dport=port))
5086 self.pg0.add_stream(pkts)
5087 self.pg_enable_capture(self.pg_interfaces)
5089 capture = self.pg1.get_capture(len(pkts))
# One more flow on ports outside the range used above; it must not be
# forwarded to pg1.
5091 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5092 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5093 UDP(sport=3001, dport=3002))
5094 self.pg0.add_stream(p)
5095 self.pg_enable_capture(self.pg_interfaces)
# The value assigned here is overwritten by the pg0 capture below.
5097 capture = self.pg1.assert_nothing_captured()
# The sender must receive ICMP type 3 code 1 (destination unreachable, host
# unreachable) quoting the rejected UDP packet.
5099 # verify ICMP error packet
5100 capture = self.pg0.get_capture(1)
5102 self.assertTrue(p.haslayer(ICMP))
5104 self.assertEqual(icmp.type, 3)
5105 self.assertEqual(icmp.code, 1)
5106 self.assertTrue(icmp.haslayer(IPerror))
5107 inner_ip = icmp[IPerror]
5108 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5109 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5111 dms = self.vapi.nat_det_map_dump()
5113 self.assertEqual(1000, dms[0].ses_num)
# Two IPFIX packets are expected on pg2. Templates are fed to the decoder
# first, then data sets are decoded and checked.
5115 # verify IPFIX logging
5116 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5118 capture = self.pg2.get_capture(2)
5119 ipfix = IPFIXDecoder()
5120 # first load template
5122 self.assertTrue(p.haslayer(IPFIX))
5123 if p.haslayer(Template):
5124 ipfix.add_template(p.getlayer(Template))
5125 # verify events in data set
5127 if p.haslayer(Data):
5128 data = ipfix.decode_data_set(p.getlayer(Set))
5129 self.verify_ipfix_max_entries_per_user(data)
# Resets deterministic NAT state: disables NAT IPFIX logging, calls
# nat_det_set_timeouts() without arguments, then walks the dumped
# deterministic mappings and NAT44 interfaces.
# NOTE(review): the docstring delimiters and the remaining arguments of the
# two calls inside the loops are cut off in this extract; they presumably
# request deletion - confirm against the full source.
5131 def clear_nat_det(self):
5133 Clear deterministic NAT configuration.
5135 self.vapi.nat_ipfix(enable=0)
5136 self.vapi.nat_det_set_timeouts()
5137 deterministic_mappings = self.vapi.nat_det_map_dump()
5138 for dsm in deterministic_mappings:
5139 self.vapi.nat_det_add_del_map(dsm.in_addr,
5145 interfaces = self.vapi.nat44_interface_dump()
5146 for intf in interfaces:
5147 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# Per-test cleanup for TestDeterministicNAT.
# NOTE(review): the "def" line and the logger calls wrapping the three
# "deterministic" CLI commands are not visible in this extract; the method
# name is inferred from the super() call below.
5152 super(TestDeterministicNAT, self).tearDown()
# Only query and clean up VPP when it is still alive.
5153 if not self.vpp_dead:
5154 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5156 self.vapi.cli("show nat44 deterministic mappings"))
5158 self.vapi.cli("show nat44 deterministic timeouts"))
5160 self.vapi.cli("show nat44 deterministic sessions"))
5161 self.clear_nat_det()
# Test class for NAT64.
5164 class TestNAT64(MethodHolder):
5165 """ NAT64 Test Cases """
# Extends the inherited VPP command line with a "nat { ... }" section that
# sets the NAT64 BIB and session-table hash bucket counts (128 and 256).
5168 def setUpConstants(cls):
5169 super(TestNAT64, cls).setUpConstants()
5170 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5171 "nat64 st hash buckets 256", "}"])
# Class-level fixture: sets ports, NAT addresses and IPFIX defaults, creates
# six packet-generator interfaces and splits them into IPv6 (pg0, pg2) and
# IPv4 (pg1) groups. pg2 is placed in the IPv6 table vrf1_id.
# NOTE(review): the assignment of cls.vrf1_id, the bodies of both interface
# loops and several single-statement lines (embedded lines 5176-5177,
# 5197, 5205-5206, 5210-5214, 5219-5220) are not visible in this extract.
5174 def setUpClass(cls):
5175 super(TestNAT64, cls).setUpClass()
5178 cls.tcp_port_in = 6303
5179 cls.tcp_port_out = 6303
5180 cls.udp_port_in = 6304
5181 cls.udp_port_out = 6304
5182 cls.icmp_id_in = 6305
5183 cls.icmp_id_out = 6305
5184 cls.nat_addr = '10.0.0.3'
5185 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5187 cls.vrf1_nat_addr = '10.0.10.3'
5188 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5190 cls.ipfix_src_port = 4739
5191 cls.ipfix_domain_id = 1
5193 cls.create_pg_interfaces(range(6))
# ip6_interfaces = [pg0, pg2]; ip4_interfaces = [pg1].
5194 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5195 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5196 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5198 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5200 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5202 cls.pg0.generate_remote_hosts(2)
5204 for i in cls.ip6_interfaces:
5207 i.configure_ipv6_neighbors()
5209 for i in cls.ip4_interfaces:
# pg3 is configured for both address families.
5215 cls.pg3.config_ip4()
5216 cls.pg3.resolve_arp()
5217 cls.pg3.config_ip6()
5218 cls.pg3.configure_ipv6_neighbors()
# pg5 gets an IPv6 address only; no neighbor configuration call for it is
# visible here.
5221 cls.pg5.config_ip6()
# NOTE(review): embedded lines 5222-5223 are missing; this tearDownClass()
# call presumably belongs to an error handler around the setup above.
5224 super(TestNAT64, cls).tearDownClass()
# Test: with NAT64 enabled on pg5, an ICMPv6 echo request to VPP's own
# address first triggers a Neighbor Solicitation; after the test answers
# with a Neighbor Advertisement, a second echo request must be answered with
# an echo reply.
# NOTE(review): the lines that build "pkts" and that assign "packet" from
# each capture are not visible in this extract.
5227 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5228 """ NAT64 inside interface handles Neighbor Advertisement """
5230 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
5233 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5234 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5235 ICMPv6EchoRequest())
5237 self.pg5.add_stream(pkts)
5238 self.pg_enable_capture(self.pg_interfaces)
5241 # Wait for Neighbor Solicitation
5242 capture = self.pg5.get_capture(len(pkts))
5243 self.assertEqual(1, len(capture))
5246 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5247 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
# Keep the solicited target; the advertisement below answers for it.
5248 tgt = packet[ICMPv6ND_NS].tgt
5250 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5253 # Send Neighbor Advertisement
5254 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5255 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5256 ICMPv6ND_NA(tgt=tgt) /
5257 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
5259 self.pg5.add_stream(pkts)
5260 self.pg_enable_capture(self.pg_interfaces)
5263 # Try to send ping again
5265 self.pg5.add_stream(pkts)
5266 self.pg_enable_capture(self.pg_interfaces)
5269 # Wait for ping reply
5270 capture = self.pg5.get_capture(len(pkts))
5271 self.assertEqual(1, len(capture))
5274 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5275 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
5276 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
5278 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Test: a single address (1.2.3.4) can be added to the NAT64 pool, shows up
# in the pool dump, and is gone again after deletion.
5281 def test_pool(self):
5282 """ Add/delete address to NAT64 pool """
# Packed (binary) form of the address; the range call receives it as both
# start and end.
5283 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
5285 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
5287 addresses = self.vapi.nat64_pool_addr_dump()
5288 self.assertEqual(len(addresses), 1)
5289 self.assertEqual(addresses[0].address, nat_addr)
5291 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
5293 addresses = self.vapi.nat64_pool_addr_dump()
5294 self.assertEqual(len(addresses), 0)
# Test: NAT64 can be enabled on pg0 (inside) and pg1 (outside), both show up
# in the interface dump and in the per-interface feature list, and both are
# gone after being removed.
# NOTE(review): the lines that initialise and set pg0_found / pg1_found
# (embedded lines 5303-5304, 5308, 5311) are not visible in this extract.
5296 def test_interface(self):
5297 """ Enable/disable NAT64 feature on the interface """
5298 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5299 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5301 interfaces = self.vapi.nat64_interface_dump()
5302 self.assertEqual(len(interfaces), 2)
# Match each dumped entry by sw_if_index and check its inside/outside flag.
5305 for intf in interfaces:
5306 if intf.sw_if_index == self.pg0.sw_if_index:
5307 self.assertEqual(intf.is_inside, 1)
5309 elif intf.sw_if_index == self.pg1.sw_if_index:
5310 self.assertEqual(intf.is_inside, 0)
5312 self.assertTrue(pg0_found)
5313 self.assertTrue(pg1_found)
# The CLI feature listing must mention the in2out node on pg0 and the
# out2in node on pg1.
5315 features = self.vapi.cli("show interface features pg0")
5316 self.assertNotEqual(features.find('nat64-in2out'), -1)
5317 features = self.vapi.cli("show interface features pg1")
5318 self.assertNotEqual(features.find('nat64-out2in'), -1)
5320 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5321 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5323 interfaces = self.vapi.nat64_interface_dump()
5324 self.assertEqual(len(interfaces), 0)
# Test: a static TCP BIB entry can be added, appears once in the TCP BIB
# dump with the configured addresses and ports, and no longer counts after
# the second nat64_add_del_static_bib() call.
# NOTE(review): the assignments of in_port/out_port, most arguments of both
# nat64_add_del_static_bib() calls and the loops that count static entries
# (static_bib_num, bibe) are not visible in this extract.
5326 def test_static_bib(self):
5327 """ Add/delete static BIB entry """
# Packed (binary) inside IPv6 address and outside IPv4 address.
5328 in_addr = socket.inet_pton(socket.AF_INET6,
5329 '2001:db8:85a3::8a2e:370:7334')
5330 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5333 proto = IP_PROTOS.tcp
5335 self.vapi.nat64_add_del_static_bib(in_addr,
5340 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5345 self.assertEqual(bibe.i_addr, in_addr)
5346 self.assertEqual(bibe.o_addr, out_addr)
5347 self.assertEqual(bibe.i_port, in_port)
5348 self.assertEqual(bibe.o_port, out_port)
5349 self.assertEqual(static_bib_num, 1)
5351 self.vapi.nat64_add_del_static_bib(in_addr,
5357 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5362 self.assertEqual(static_bib_num, 0)
# Test: the NAT64 timeouts start at the expected default values and, after
# being set to custom values, are read back unchanged.
5364 def test_set_timeouts(self):
5365 """ Set NAT64 timeouts """
5366 # verify default values
5367 timeouts = self.vapi.nat64_get_timeouts()
5368 self.assertEqual(timeouts.udp, 300)
5369 self.assertEqual(timeouts.icmp, 60)
5370 self.assertEqual(timeouts.tcp_trans, 240)
5371 self.assertEqual(timeouts.tcp_est, 7440)
5372 self.assertEqual(timeouts.tcp_incoming_syn, 6)
# Every custom value differs from its default, so a setter that did nothing
# would be detected.
5374 # set and verify custom values
5375 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
5376 tcp_est=7450, tcp_incoming_syn=10)
5377 timeouts = self.vapi.nat64_get_timeouts()
5378 self.assertEqual(timeouts.udp, 200)
5379 self.assertEqual(timeouts.icmp, 30)
5380 self.assertEqual(timeouts.tcp_trans, 250)
5381 self.assertEqual(timeouts.tcp_est, 7450)
5382 self.assertEqual(timeouts.tcp_incoming_syn, 10)
5384 def test_dynamic(self):
5385 """ NAT64 dynamic translation test """
5386 self.tcp_port_in = 6303
5387 self.udp_port_in = 6304
5388 self.icmp_id_in = 6305
5390 ses_num_start = self.nat64_get_ses_num()
5392 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5394 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5395 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5398 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5399 self.pg0.add_stream(pkts)
5400 self.pg_enable_capture(self.pg_interfaces)
5402 capture = self.pg1.get_capture(len(pkts))
5403 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5404 dst_ip=self.pg1.remote_ip4)
5407 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5408 self.pg1.add_stream(pkts)
5409 self.pg_enable_capture(self.pg_interfaces)
5411 capture = self.pg0.get_capture(len(pkts))
5412 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5413 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5416 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5417 self.pg0.add_stream(pkts)
5418 self.pg_enable_capture(self.pg_interfaces)
5420 capture = self.pg1.get_capture(len(pkts))
5421 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5422 dst_ip=self.pg1.remote_ip4)
5425 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5426 self.pg1.add_stream(pkts)
5427 self.pg_enable_capture(self.pg_interfaces)
5429 capture = self.pg0.get_capture(len(pkts))
5430 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5432 ses_num_end = self.nat64_get_ses_num()
5434 self.assertEqual(ses_num_end - ses_num_start, 3)
5436 # tenant with specific VRF
5437 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5438 self.vrf1_nat_addr_n,
5439 vrf_id=self.vrf1_id)
5440 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5442 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5443 self.pg2.add_stream(pkts)
5444 self.pg_enable_capture(self.pg_interfaces)
5446 capture = self.pg1.get_capture(len(pkts))
5447 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5448 dst_ip=self.pg1.remote_ip4)
5450 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5451 self.pg1.add_stream(pkts)
5452 self.pg_enable_capture(self.pg_interfaces)
5454 capture = self.pg2.get_capture(len(pkts))
5455 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5457 def test_static(self):
5458 """ NAT64 static translation test """
5459 self.tcp_port_in = 60303
5460 self.udp_port_in = 60304
5461 self.icmp_id_in = 60305
5462 self.tcp_port_out = 60303
5463 self.udp_port_out = 60304
5464 self.icmp_id_out = 60305
5466 ses_num_start = self.nat64_get_ses_num()
5468 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5470 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5471 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5473 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5478 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5483 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5490 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5491 self.pg0.add_stream(pkts)
5492 self.pg_enable_capture(self.pg_interfaces)
5494 capture = self.pg1.get_capture(len(pkts))
5495 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5496 dst_ip=self.pg1.remote_ip4, same_port=True)
5499 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5500 self.pg1.add_stream(pkts)
5501 self.pg_enable_capture(self.pg_interfaces)
5503 capture = self.pg0.get_capture(len(pkts))
5504 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5505 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5507 ses_num_end = self.nat64_get_ses_num()
5509 self.assertEqual(ses_num_end - ses_num_start, 3)
5511 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5512 def test_session_timeout(self):
5513 """ NAT64 session timeout """
5514 self.icmp_id_in = 1234
5515 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5517 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5518 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5519 self.vapi.nat64_set_timeouts(icmp=5)
5521 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5522 self.pg0.add_stream(pkts)
5523 self.pg_enable_capture(self.pg_interfaces)
5525 capture = self.pg1.get_capture(len(pkts))
5527 ses_num_before_timeout = self.nat64_get_ses_num()
5531 # ICMP session after timeout
5532 ses_num_after_timeout = self.nat64_get_ses_num()
5533 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5535 def test_icmp_error(self):
5536 """ NAT64 ICMP Error message translation """
5537 self.tcp_port_in = 6303
5538 self.udp_port_in = 6304
5539 self.icmp_id_in = 6305
5541 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5543 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5544 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5546 # send some packets to create sessions
5547 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5548 self.pg0.add_stream(pkts)
5549 self.pg_enable_capture(self.pg_interfaces)
5551 capture_ip4 = self.pg1.get_capture(len(pkts))
5552 self.verify_capture_out(capture_ip4,
5553 nat_ip=self.nat_addr,
5554 dst_ip=self.pg1.remote_ip4)
5556 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5557 self.pg1.add_stream(pkts)
5558 self.pg_enable_capture(self.pg_interfaces)
5560 capture_ip6 = self.pg0.get_capture(len(pkts))
5561 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5562 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5563 self.pg0.remote_ip6)
5566 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5567 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5568 ICMPv6DestUnreach(code=1) /
5569 packet[IPv6] for packet in capture_ip6]
5570 self.pg0.add_stream(pkts)
5571 self.pg_enable_capture(self.pg_interfaces)
5573 capture = self.pg1.get_capture(len(pkts))
5574 for packet in capture:
5576 self.assertEqual(packet[IP].src, self.nat_addr)
5577 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5578 self.assertEqual(packet[ICMP].type, 3)
5579 self.assertEqual(packet[ICMP].code, 13)
5580 inner = packet[IPerror]
5581 self.assertEqual(inner.src, self.pg1.remote_ip4)
5582 self.assertEqual(inner.dst, self.nat_addr)
5583 self.assert_packet_checksums_valid(packet)
5584 if inner.haslayer(TCPerror):
5585 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5586 elif inner.haslayer(UDPerror):
5587 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5589 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5591 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5595 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5596 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5597 ICMP(type=3, code=13) /
5598 packet[IP] for packet in capture_ip4]
5599 self.pg1.add_stream(pkts)
5600 self.pg_enable_capture(self.pg_interfaces)
5602 capture = self.pg0.get_capture(len(pkts))
5603 for packet in capture:
5605 self.assertEqual(packet[IPv6].src, ip.src)
5606 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5607 icmp = packet[ICMPv6DestUnreach]
5608 self.assertEqual(icmp.code, 1)
5609 inner = icmp[IPerror6]
5610 self.assertEqual(inner.src, self.pg0.remote_ip6)
5611 self.assertEqual(inner.dst, ip.src)
5612 self.assert_icmpv6_checksum_valid(packet)
5613 if inner.haslayer(TCPerror):
5614 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5615 elif inner.haslayer(UDPerror):
5616 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5618 self.assertEqual(inner[ICMPv6EchoRequest].id,
5621 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5624 def test_hairpinning(self):
5625 """ NAT64 hairpinning """
5627 client = self.pg0.remote_hosts[0]
5628 server = self.pg0.remote_hosts[1]
5629 server_tcp_in_port = 22
5630 server_tcp_out_port = 4022
5631 server_udp_in_port = 23
5632 server_udp_out_port = 4023
5633 client_tcp_in_port = 1234
5634 client_udp_in_port = 1235
5635 client_tcp_out_port = 0
5636 client_udp_out_port = 0
5637 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5638 nat_addr_ip6 = ip.src
5640 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5642 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5643 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5645 self.vapi.nat64_add_del_static_bib(server.ip6n,
5648 server_tcp_out_port,
5650 self.vapi.nat64_add_del_static_bib(server.ip6n,
5653 server_udp_out_port,
5658 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5659 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5660 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5662 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5663 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5664 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5666 self.pg0.add_stream(pkts)
5667 self.pg_enable_capture(self.pg_interfaces)
5669 capture = self.pg0.get_capture(len(pkts))
5670 for packet in capture:
5672 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5673 self.assertEqual(packet[IPv6].dst, server.ip6)
5674 self.assert_packet_checksums_valid(packet)
5675 if packet.haslayer(TCP):
5676 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5677 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5678 client_tcp_out_port = packet[TCP].sport
5680 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5681 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5682 client_udp_out_port = packet[UDP].sport
5684 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5689 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5690 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5691 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5693 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5694 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5695 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5697 self.pg0.add_stream(pkts)
5698 self.pg_enable_capture(self.pg_interfaces)
5700 capture = self.pg0.get_capture(len(pkts))
5701 for packet in capture:
5703 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5704 self.assertEqual(packet[IPv6].dst, client.ip6)
5705 self.assert_packet_checksums_valid(packet)
5706 if packet.haslayer(TCP):
5707 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5708 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5710 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5711 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5713 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5718 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5719 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5720 ICMPv6DestUnreach(code=1) /
5721 packet[IPv6] for packet in capture]
5722 self.pg0.add_stream(pkts)
5723 self.pg_enable_capture(self.pg_interfaces)
5725 capture = self.pg0.get_capture(len(pkts))
5726 for packet in capture:
5728 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5729 self.assertEqual(packet[IPv6].dst, server.ip6)
5730 icmp = packet[ICMPv6DestUnreach]
5731 self.assertEqual(icmp.code, 1)
5732 inner = icmp[IPerror6]
5733 self.assertEqual(inner.src, server.ip6)
5734 self.assertEqual(inner.dst, nat_addr_ip6)
5735 self.assert_packet_checksums_valid(packet)
5736 if inner.haslayer(TCPerror):
5737 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5738 self.assertEqual(inner[TCPerror].dport,
5739 client_tcp_out_port)
5741 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5742 self.assertEqual(inner[UDPerror].dport,
5743 client_udp_out_port)
5745 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5748 def test_prefix(self):
5749 """ NAT64 Network-Specific Prefix """
5751 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5753 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5754 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5755 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5756 self.vrf1_nat_addr_n,
5757 vrf_id=self.vrf1_id)
5758 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5761 global_pref64 = "2001:db8::"
5762 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5763 global_pref64_len = 32
5764 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5766 prefix = self.vapi.nat64_prefix_dump()
5767 self.assertEqual(len(prefix), 1)
5768 self.assertEqual(prefix[0].prefix, global_pref64_n)
5769 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5770 self.assertEqual(prefix[0].vrf_id, 0)
5772 # Add tenant specific prefix
5773 vrf1_pref64 = "2001:db8:122:300::"
5774 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5775 vrf1_pref64_len = 56
5776 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5778 vrf_id=self.vrf1_id)
5779 prefix = self.vapi.nat64_prefix_dump()
5780 self.assertEqual(len(prefix), 2)
5783 pkts = self.create_stream_in_ip6(self.pg0,
5786 plen=global_pref64_len)
5787 self.pg0.add_stream(pkts)
5788 self.pg_enable_capture(self.pg_interfaces)
5790 capture = self.pg1.get_capture(len(pkts))
5791 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5792 dst_ip=self.pg1.remote_ip4)
5794 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5795 self.pg1.add_stream(pkts)
5796 self.pg_enable_capture(self.pg_interfaces)
5798 capture = self.pg0.get_capture(len(pkts))
5799 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5802 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5804 # Tenant specific prefix
5805 pkts = self.create_stream_in_ip6(self.pg2,
5808 plen=vrf1_pref64_len)
5809 self.pg2.add_stream(pkts)
5810 self.pg_enable_capture(self.pg_interfaces)
5812 capture = self.pg1.get_capture(len(pkts))
5813 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5814 dst_ip=self.pg1.remote_ip4)
5816 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5817 self.pg1.add_stream(pkts)
5818 self.pg_enable_capture(self.pg_interfaces)
5820 capture = self.pg2.get_capture(len(pkts))
5821 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5824 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5826 def test_unknown_proto(self):
5827 """ NAT64 translate packet with unknown protocol """
5829 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5831 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5832 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5833 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5836 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5837 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5838 TCP(sport=self.tcp_port_in, dport=20))
5839 self.pg0.add_stream(p)
5840 self.pg_enable_capture(self.pg_interfaces)
5842 p = self.pg1.get_capture(1)
5844 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5845 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5847 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5848 TCP(sport=1234, dport=1234))
5849 self.pg0.add_stream(p)
5850 self.pg_enable_capture(self.pg_interfaces)
5852 p = self.pg1.get_capture(1)
5855 self.assertEqual(packet[IP].src, self.nat_addr)
5856 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5857 self.assertTrue(packet.haslayer(GRE))
5858 self.assert_packet_checksums_valid(packet)
5860 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5864 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5865 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5867 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5868 TCP(sport=1234, dport=1234))
5869 self.pg1.add_stream(p)
5870 self.pg_enable_capture(self.pg_interfaces)
5872 p = self.pg0.get_capture(1)
5875 self.assertEqual(packet[IPv6].src, remote_ip6)
5876 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5877 self.assertEqual(packet[IPv6].nh, 47)
5879 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5882 def test_hairpinning_unknown_proto(self):
5883 """ NAT64 translate packet with unknown protocol - hairpinning """
5885 client = self.pg0.remote_hosts[0]
5886 server = self.pg0.remote_hosts[1]
5887 server_tcp_in_port = 22
5888 server_tcp_out_port = 4022
5889 client_tcp_in_port = 1234
5890 client_tcp_out_port = 1235
5891 server_nat_ip = "10.0.0.100"
5892 client_nat_ip = "10.0.0.110"
5893 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5894 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5895 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5896 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5898 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5900 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5901 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5903 self.vapi.nat64_add_del_static_bib(server.ip6n,
5906 server_tcp_out_port,
5909 self.vapi.nat64_add_del_static_bib(server.ip6n,
5915 self.vapi.nat64_add_del_static_bib(client.ip6n,
5918 client_tcp_out_port,
5922 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5923 IPv6(src=client.ip6, dst=server_nat_ip6) /
5924 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5925 self.pg0.add_stream(p)
5926 self.pg_enable_capture(self.pg_interfaces)
5928 p = self.pg0.get_capture(1)
5930 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5931 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5933 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5934 TCP(sport=1234, dport=1234))
5935 self.pg0.add_stream(p)
5936 self.pg_enable_capture(self.pg_interfaces)
5938 p = self.pg0.get_capture(1)
5941 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5942 self.assertEqual(packet[IPv6].dst, server.ip6)
5943 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5945 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5949 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5950 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5952 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5953 TCP(sport=1234, dport=1234))
5954 self.pg0.add_stream(p)
5955 self.pg_enable_capture(self.pg_interfaces)
5957 p = self.pg0.get_capture(1)
5960 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5961 self.assertEqual(packet[IPv6].dst, client.ip6)
5962 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5964 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5967 def test_one_armed_nat64(self):
5968 """ One armed NAT64 """
5970 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5974 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5976 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5977 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5980 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5981 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5982 TCP(sport=12345, dport=80))
5983 self.pg3.add_stream(p)
5984 self.pg_enable_capture(self.pg_interfaces)
5986 capture = self.pg3.get_capture(1)
5991 self.assertEqual(ip.src, self.nat_addr)
5992 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5993 self.assertNotEqual(tcp.sport, 12345)
5994 external_port = tcp.sport
5995 self.assertEqual(tcp.dport, 80)
5996 self.assert_packet_checksums_valid(p)
5998 self.logger.error(ppp("Unexpected or invalid packet:", p))
6002 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6003 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6004 TCP(sport=80, dport=external_port))
6005 self.pg3.add_stream(p)
6006 self.pg_enable_capture(self.pg_interfaces)
6008 capture = self.pg3.get_capture(1)
6013 self.assertEqual(ip.src, remote_host_ip6)
6014 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6015 self.assertEqual(tcp.sport, 80)
6016 self.assertEqual(tcp.dport, 12345)
6017 self.assert_packet_checksums_valid(p)
6019 self.logger.error(ppp("Unexpected or invalid packet:", p))
6022 def test_frag_in_order(self):
6023 """ NAT64 translate fragments arriving in order """
6024 self.tcp_port_in = random.randint(1025, 65535)
6026 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6028 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6029 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6031 reass = self.vapi.nat_reass_dump()
6032 reass_n_start = len(reass)
6036 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6037 self.tcp_port_in, 20, data)
6038 self.pg0.add_stream(pkts)
6039 self.pg_enable_capture(self.pg_interfaces)
6041 frags = self.pg1.get_capture(len(pkts))
6042 p = self.reass_frags_and_verify(frags,
6044 self.pg1.remote_ip4)
6045 self.assertEqual(p[TCP].dport, 20)
6046 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6047 self.tcp_port_out = p[TCP].sport
6048 self.assertEqual(data, p[Raw].load)
6051 data = "A" * 4 + "b" * 16 + "C" * 3
6052 pkts = self.create_stream_frag(self.pg1,
6057 self.pg1.add_stream(pkts)
6058 self.pg_enable_capture(self.pg_interfaces)
6060 frags = self.pg0.get_capture(len(pkts))
6061 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6062 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6063 self.assertEqual(p[TCP].sport, 20)
6064 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6065 self.assertEqual(data, p[Raw].load)
6067 reass = self.vapi.nat_reass_dump()
6068 reass_n_end = len(reass)
6070 self.assertEqual(reass_n_end - reass_n_start, 2)
6072 def test_reass_hairpinning(self):
6073 """ NAT64 fragments hairpinning """
6075 server = self.pg0.remote_hosts[1]
6076 server_in_port = random.randint(1025, 65535)
6077 server_out_port = random.randint(1025, 65535)
6078 client_in_port = random.randint(1025, 65535)
6079 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6080 nat_addr_ip6 = ip.src
6082 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6084 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6085 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6087 # add static BIB entry for server
6088 self.vapi.nat64_add_del_static_bib(server.ip6n,
6094 # send packet from host to server
6095 pkts = self.create_stream_frag_ip6(self.pg0,
6100 self.pg0.add_stream(pkts)
6101 self.pg_enable_capture(self.pg_interfaces)
6103 frags = self.pg0.get_capture(len(pkts))
6104 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6105 self.assertNotEqual(p[TCP].sport, client_in_port)
6106 self.assertEqual(p[TCP].dport, server_in_port)
6107 self.assertEqual(data, p[Raw].load)
6109 def test_frag_out_of_order(self):
6110 """ NAT64 translate fragments arriving out of order """
6111 self.tcp_port_in = random.randint(1025, 65535)
6113 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6115 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6116 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6120 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6121 self.tcp_port_in, 20, data)
6123 self.pg0.add_stream(pkts)
6124 self.pg_enable_capture(self.pg_interfaces)
6126 frags = self.pg1.get_capture(len(pkts))
6127 p = self.reass_frags_and_verify(frags,
6129 self.pg1.remote_ip4)
6130 self.assertEqual(p[TCP].dport, 20)
6131 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6132 self.tcp_port_out = p[TCP].sport
6133 self.assertEqual(data, p[Raw].load)
6136 data = "A" * 4 + "B" * 16 + "C" * 3
6137 pkts = self.create_stream_frag(self.pg1,
6143 self.pg1.add_stream(pkts)
6144 self.pg_enable_capture(self.pg_interfaces)
6146 frags = self.pg0.get_capture(len(pkts))
6147 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6148 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6149 self.assertEqual(p[TCP].sport, 20)
6150 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6151 self.assertEqual(data, p[Raw].load)
6153 def test_interface_addr(self):
6154 """ Acquire NAT64 pool addresses from interface """
6155 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
6157 # no address in NAT64 pool
6158 adresses = self.vapi.nat44_address_dump()
6159 self.assertEqual(0, len(adresses))
6161 # configure interface address and check NAT64 address pool
6162 self.pg4.config_ip4()
6163 addresses = self.vapi.nat64_pool_addr_dump()
6164 self.assertEqual(len(addresses), 1)
6165 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
6167 # remove interface address and check NAT64 address pool
6168 self.pg4.unconfig_ip4()
6169 addresses = self.vapi.nat64_pool_addr_dump()
6170 self.assertEqual(0, len(adresses))
6172 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6173 def test_ipfix_max_bibs_sessions(self):
6174 """ IPFIX logging maximum session and BIB entries exceeded """
6177 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6181 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6183 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6184 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6188 for i in range(0, max_bibs):
6189 src = "fd01:aa::%x" % (i)
6190 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6191 IPv6(src=src, dst=remote_host_ip6) /
6192 TCP(sport=12345, dport=80))
6194 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6195 IPv6(src=src, dst=remote_host_ip6) /
6196 TCP(sport=12345, dport=22))
6198 self.pg0.add_stream(pkts)
6199 self.pg_enable_capture(self.pg_interfaces)
6201 self.pg1.get_capture(max_sessions)
6203 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6204 src_address=self.pg3.local_ip4n,
6206 template_interval=10)
6207 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6208 src_port=self.ipfix_src_port)
6210 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6211 IPv6(src=src, dst=remote_host_ip6) /
6212 TCP(sport=12345, dport=25))
6213 self.pg0.add_stream(p)
6214 self.pg_enable_capture(self.pg_interfaces)
6216 self.pg1.assert_nothing_captured()
6218 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6219 capture = self.pg3.get_capture(9)
6220 ipfix = IPFIXDecoder()
6221 # first load template
6223 self.assertTrue(p.haslayer(IPFIX))
6224 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6225 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6226 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6227 self.assertEqual(p[UDP].dport, 4739)
6228 self.assertEqual(p[IPFIX].observationDomainID,
6229 self.ipfix_domain_id)
6230 if p.haslayer(Template):
6231 ipfix.add_template(p.getlayer(Template))
6232 # verify events in data set
6234 if p.haslayer(Data):
6235 data = ipfix.decode_data_set(p.getlayer(Set))
6236 self.verify_ipfix_max_sessions(data, max_sessions)
6238 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6239 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6240 TCP(sport=12345, dport=80))
6241 self.pg0.add_stream(p)
6242 self.pg_enable_capture(self.pg_interfaces)
6244 self.pg1.assert_nothing_captured()
6246 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6247 capture = self.pg3.get_capture(1)
6248 # verify events in data set
6250 self.assertTrue(p.haslayer(IPFIX))
6251 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6252 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6253 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6254 self.assertEqual(p[UDP].dport, 4739)
6255 self.assertEqual(p[IPFIX].observationDomainID,
6256 self.ipfix_domain_id)
6257 if p.haslayer(Data):
6258 data = ipfix.decode_data_set(p.getlayer(Set))
6259 self.verify_ipfix_max_bibs(data, max_bibs)
6261 def test_ipfix_max_frags(self):
6262 """ IPFIX logging maximum fragments pending reassembly exceeded """
6263 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6265 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6266 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6267 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6268 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6269 src_address=self.pg3.local_ip4n,
6271 template_interval=10)
6272 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6273 src_port=self.ipfix_src_port)
6276 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6277 self.tcp_port_in, 20, data)
6278 self.pg0.add_stream(pkts[-1])
6279 self.pg_enable_capture(self.pg_interfaces)
6281 self.pg1.assert_nothing_captured()
6283 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6284 capture = self.pg3.get_capture(9)
6285 ipfix = IPFIXDecoder()
6286 # first load template
6288 self.assertTrue(p.haslayer(IPFIX))
6289 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6290 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6291 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6292 self.assertEqual(p[UDP].dport, 4739)
6293 self.assertEqual(p[IPFIX].observationDomainID,
6294 self.ipfix_domain_id)
6295 if p.haslayer(Template):
6296 ipfix.add_template(p.getlayer(Template))
6297 # verify events in data set
6299 if p.haslayer(Data):
6300 data = ipfix.decode_data_set(p.getlayer(Set))
6301 self.verify_ipfix_max_fragments_ip6(data, 0,
6302 self.pg0.remote_ip6n)
6304 def test_ipfix_bib_ses(self):
6305 """ IPFIX logging NAT64 BIB/session create and delete events """
6306 self.tcp_port_in = random.randint(1025, 65535)
6307 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6311 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6313 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6314 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6315 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6316 src_address=self.pg3.local_ip4n,
6318 template_interval=10)
6319 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6320 src_port=self.ipfix_src_port)
6323 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6324 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6325 TCP(sport=self.tcp_port_in, dport=25))
6326 self.pg0.add_stream(p)
6327 self.pg_enable_capture(self.pg_interfaces)
6329 p = self.pg1.get_capture(1)
6330 self.tcp_port_out = p[0][TCP].sport
6331 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6332 capture = self.pg3.get_capture(10)
6333 ipfix = IPFIXDecoder()
6334 # first load template
6336 self.assertTrue(p.haslayer(IPFIX))
6337 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6338 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6339 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6340 self.assertEqual(p[UDP].dport, 4739)
6341 self.assertEqual(p[IPFIX].observationDomainID,
6342 self.ipfix_domain_id)
6343 if p.haslayer(Template):
6344 ipfix.add_template(p.getlayer(Template))
6345 # verify events in data set
6347 if p.haslayer(Data):
6348 data = ipfix.decode_data_set(p.getlayer(Set))
6349 if ord(data[0][230]) == 10:
6350 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6351 elif ord(data[0][230]) == 6:
6352 self.verify_ipfix_nat64_ses(data,
6354 self.pg0.remote_ip6n,
6355 self.pg1.remote_ip4,
6358 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6361 self.pg_enable_capture(self.pg_interfaces)
6362 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6365 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6366 capture = self.pg3.get_capture(2)
6367 # verify events in data set
6369 self.assertTrue(p.haslayer(IPFIX))
6370 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6371 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6372 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6373 self.assertEqual(p[UDP].dport, 4739)
6374 self.assertEqual(p[IPFIX].observationDomainID,
6375 self.ipfix_domain_id)
6376 if p.haslayer(Data):
6377 data = ipfix.decode_data_set(p.getlayer(Set))
6378 if ord(data[0][230]) == 11:
6379 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6380 elif ord(data[0][230]) == 7:
6381 self.verify_ipfix_nat64_ses(data,
6383 self.pg0.remote_ip6n,
6384 self.pg1.remote_ip4,
6387 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Helper used by the NAT64 tests: asks VPP for the NAT64 session table
# through self.vapi.nat64_st_dump().
# NOTE(review): the docstring delimiters and whatever statement follows the
# dump (presumably the one that turns it into the returned count) are not
# visible in this extract - confirm against the full file.
6389 def nat64_get_ses_num(self):
6391 Return number of active NAT64 sessions.
6393 st = self.vapi.nat64_st_dump()
# Tear down the NAT64 state a test may have configured: IPFIX logging,
# timeouts, NAT64 interfaces, static BIB entries, pool addresses and prefixes.
# NOTE(review): several continuation lines of the API calls below (and at
# least one loop header) are missing from this extract, so the exact
# arguments - presumably ending in is_add=0 - must be checked in the full file.
6396 def clear_nat64(self):
6398 Clear NAT64 configuration.
# Disable NAT IPFIX logging using the port/domain currently stored on the
# test case, then put both attributes back to 4739 and 1.
6400 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6401 domain_id=self.ipfix_domain_id)
6402 self.ipfix_src_port = 4739
6403 self.ipfix_domain_id = 1
# Called without arguments - presumably restores the API wrapper's default
# timeouts; confirm in the vapi wrapper.
6405 self.vapi.nat64_set_timeouts()
# Walk every interface VPP reports as NAT64-enabled. Entries whose
# is_inside is greater than 1 get an additional nat64_add_del_interface call
# before the regular one (presumably such a value marks an interface that is
# both inside and outside - TODO confirm).
6407 interfaces = self.vapi.nat64_interface_dump()
6408 for intf in interfaces:
6409 if intf.is_inside > 1:
6410 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6413 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# Dump the BIB (argument 255) and issue nat64_add_del_static_bib for entries;
# the loop header and any filtering condition are not visible here.
6417 bib = self.vapi.nat64_bib_dump(255)
6420 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# Issue nat64_add_del_pool_addr_range for every address in the pool dump.
6428 adresses = self.vapi.nat64_pool_addr_dump()
6429 for addr in adresses:
6430 self.vapi.nat64_add_del_pool_addr_range(addr.address,
# Issue nat64_add_del_prefix for every dumped prefix, keyed by its VRF id.
6435 prefixes = self.vapi.nat64_prefix_dump()
6436 for prefix in prefixes:
6437 self.vapi.nat64_add_del_prefix(prefix.prefix,
6439 vrf_id=prefix.vrf_id,
# Per-test teardown of TestNAT64 (the `def` header of this method is not
# visible in this extract). Runs the parent teardown, then - only while VPP
# is still alive - writes the output of several NAT64 "show" CLI commands to
# the test log.
# NOTE(review): anything following the last logging call (e.g. a
# configuration cleanup call) is not visible here.
6443 super(TestNAT64, self).tearDown()
6444 if not self.vpp_dead:
6445 self.logger.info(self.vapi.cli("show nat64 pool"))
6446 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6447 self.logger.info(self.vapi.cli("show nat64 prefix"))
6448 self.logger.info(self.vapi.cli("show nat64 bib all"))
6449 self.logger.info(self.vapi.cli("show nat64 session table all"))
6450 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite test case: pg0 carries plain IPv4, pg1 carries IPv6 with two
# generated remote hosts. The single test pushes IPv4-in-IPv6 traffic at the
# configured AFTR address and checks both the translated IPv4 output and the
# re-encapsulated return traffic.
# NOTE(review): this extract has lost decorators, try/except scaffolding, the
# tearDown header and some continuation lines; comments describe only what is
# visible.
6454 class TestDSlite(MethodHolder):
6455 """ DS-Lite Test Cases """
# Class-level fixture: records the NAT pool address (text and packed form),
# creates two packet-generator interfaces, configures IPv4 + ARP on pg0 and
# IPv6 + two neighbours on pg1.
6458 def setUpClass(cls):
6459 super(TestDSlite, cls).setUpClass()
6462 cls.nat_addr = '10.0.0.3'
6463 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6465 cls.create_pg_interfaces(range(2))
6467 cls.pg0.config_ip4()
6468 cls.pg0.resolve_arp()
6470 cls.pg1.config_ip6()
6471 cls.pg1.generate_remote_hosts(2)
6472 cls.pg1.configure_ipv6_neighbors()
# Presumably runs inside an exception handler that cleans up and re-raises
# when the setup above fails - the handler lines are not visible; confirm.
6475 super(TestDSlite, cls).tearDownClass()
6478 def test_dslite(self):
6479 """ Test DS-Lite """
# Add the NAT pool address (remaining call arguments are not visible here).
6480 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# Configure the AFTR endpoint from the packed IPv6 and IPv4 addresses.
6482 aftr_ip4 = '192.0.0.1'
6483 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6484 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6485 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6486 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, softwire -> IPv4: remote host 0 sends IPv4/UDP wrapped in IPv6 to the
# AFTR address; pg0 must see a bare IPv4 packet sourced from the NAT address
# with a source port different from the original 20000.
6489 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6490 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6491 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6492 UDP(sport=20000, dport=10000))
6493 self.pg1.add_stream(p)
6494 self.pg_enable_capture(self.pg_interfaces)
6496 capture = self.pg0.get_capture(1)
6497 capture = capture[0]
6498 self.assertFalse(capture.haslayer(IPv6))
6499 self.assertEqual(capture[IP].src, self.nat_addr)
6500 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6501 self.assertNotEqual(capture[UDP].sport, 20000)
6502 self.assertEqual(capture[UDP].dport, 10000)
6503 self.assert_packet_checksums_valid(capture)
# Remember the translated port so the reply can target it.
6504 out_port = capture[UDP].sport
# UDP, IPv4 -> softwire: the reply to the NAT address/translated port must
# come out of pg1 wrapped in IPv6 from the AFTR to host 0, with the original
# inner address and ports restored.
6506 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6507 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6508 UDP(sport=10000, dport=out_port))
6509 self.pg0.add_stream(p)
6510 self.pg_enable_capture(self.pg_interfaces)
6512 capture = self.pg1.get_capture(1)
6513 capture = capture[0]
6514 self.assertEqual(capture[IPv6].src, aftr_ip6)
6515 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6516 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6517 self.assertEqual(capture[IP].dst, '192.168.1.1')
6518 self.assertEqual(capture[UDP].sport, 10000)
6519 self.assertEqual(capture[UDP].dport, 20000)
6520 self.assert_packet_checksums_valid(capture)
# TCP, softwire -> IPv4: same check from remote host 1, which reuses the
# inner source address 192.168.1.1 used by host 0 above.
6523 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6524 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6525 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6526 TCP(sport=20001, dport=10001))
6527 self.pg1.add_stream(p)
6528 self.pg_enable_capture(self.pg_interfaces)
6530 capture = self.pg0.get_capture(1)
6531 capture = capture[0]
6532 self.assertFalse(capture.haslayer(IPv6))
6533 self.assertEqual(capture[IP].src, self.nat_addr)
6534 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6535 self.assertNotEqual(capture[TCP].sport, 20001)
6536 self.assertEqual(capture[TCP].dport, 10001)
6537 self.assert_packet_checksums_valid(capture)
6538 out_port = capture[TCP].sport
# TCP, IPv4 -> softwire: the reply must be tunnelled back to host 1.
6540 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6541 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6542 TCP(sport=10001, dport=out_port))
6543 self.pg0.add_stream(p)
6544 self.pg_enable_capture(self.pg_interfaces)
6546 capture = self.pg1.get_capture(1)
6547 capture = capture[0]
6548 self.assertEqual(capture[IPv6].src, aftr_ip6)
6549 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6550 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6551 self.assertEqual(capture[IP].dst, '192.168.1.1')
6552 self.assertEqual(capture[TCP].sport, 10001)
6553 self.assertEqual(capture[TCP].dport, 20001)
6554 self.assert_packet_checksums_valid(capture)
# ICMP, softwire -> IPv4: an echo request with id 4000 must leave pg0 with
# the NAT source address and a different ICMP id.
6557 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6558 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6559 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6560 ICMP(id=4000, type='echo-request'))
6561 self.pg1.add_stream(p)
6562 self.pg_enable_capture(self.pg_interfaces)
6564 capture = self.pg0.get_capture(1)
6565 capture = capture[0]
6566 self.assertFalse(capture.haslayer(IPv6))
6567 self.assertEqual(capture[IP].src, self.nat_addr)
6568 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6569 self.assertNotEqual(capture[ICMP].id, 4000)
6570 self.assert_packet_checksums_valid(capture)
6571 out_id = capture[ICMP].id
# ICMP, IPv4 -> softwire: the echo reply carrying the translated id must be
# tunnelled back to host 1 with the id restored to 4000.
6573 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6574 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6575 ICMP(id=out_id, type='echo-reply'))
6576 self.pg0.add_stream(p)
6577 self.pg_enable_capture(self.pg_interfaces)
6579 capture = self.pg1.get_capture(1)
6580 capture = capture[0]
6581 self.assertEqual(capture[IPv6].src, aftr_ip6)
6582 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6583 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6584 self.assertEqual(capture[IP].dst, '192.168.1.1')
6585 self.assertEqual(capture[ICMP].id, 4000)
6586 self.assert_packet_checksums_valid(capture)
# The AFTR address itself must answer ICMPv6 echo requests on pg1.
6588 # ping DS-Lite AFTR tunnel endpoint address
6589 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6590 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6591 ICMPv6EchoRequest())
6592 self.pg1.add_stream(p)
6593 self.pg_enable_capture(self.pg_interfaces)
6595 capture = self.pg1.get_capture(1)
6596 self.assertEqual(1, len(capture))
6597 capture = capture[0]
6598 self.assertEqual(capture[IPv6].src, aftr_ip6)
6599 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6600 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown (its `def` header is not visible in this extract): after
# the parent teardown, DS-Lite "show" CLI output is logged while VPP is alive.
# The opening of the call wrapping the aftr-tunnel-endpoint-address command
# is also missing from the extract.
6603 super(TestDSlite, self).tearDown()
6604 if not self.vpp_dead:
6605 self.logger.info(self.vapi.cli("show dslite pool"))
6607 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6608 self.logger.info(self.vapi.cli("show dslite sessions"))
# DS-Lite customer-edge (B4) test case: VPP is started with the
# "nat { dslite ce }" startup section; pg0 is the IPv4 side and pg1 the IPv6
# side with one generated remote host.
# NOTE(review): decorators, try/except scaffolding, the tearDown header and
# some continuation lines are missing from this extract; comments describe
# only what is visible.
6611 class TestDSliteCE(MethodHolder):
6612 """ DS-Lite CE Test Cases """
# Extend the VPP command line inherited from the parent with the DS-Lite CE
# startup configuration.
6615 def setUpConstants(cls):
6616 super(TestDSliteCE, cls).setUpConstants()
6617 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
# Class-level fixture: two packet-generator interfaces, IPv4 + ARP on pg0,
# IPv6 + one neighbour on pg1.
6620 def setUpClass(cls):
6621 super(TestDSliteCE, cls).setUpClass()
6624 cls.create_pg_interfaces(range(2))
6626 cls.pg0.config_ip4()
6627 cls.pg0.resolve_arp()
6629 cls.pg1.config_ip6()
6630 cls.pg1.generate_remote_hosts(1)
6631 cls.pg1.configure_ipv6_neighbors()
# Presumably the body of an exception handler guarding the setup above - the
# handler lines are not visible; confirm.
6634 super(TestDSliteCE, cls).tearDownClass()
6637 def test_dslite_ce(self):
6638 """ Test DS-Lite CE """
# Configure the local B4 endpoint from its packed IPv6/IPv4 addresses.
6640 b4_ip4 = '192.0.0.2'
6641 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6642 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6643 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6644 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
# Configure the remote AFTR endpoint the same way.
6646 aftr_ip4 = '192.0.0.1'
6647 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6648 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6649 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6650 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# Host route (/128) to the AFTR via pg1's remote peer; the closing arguments
# of this call are not visible in the extract.
6652 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6653 dst_address_length=128,
6654 next_hop_address=self.pg1.remote_ip6n,
6655 next_hop_sw_if_index=self.pg1.sw_if_index,
# Encapsulation: plain IPv4/UDP entering pg0 must leave pg1 wrapped in IPv6
# from the B4 address to the AFTR address, with inner addresses and ports
# unchanged.
6659 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6660 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6661 UDP(sport=10000, dport=20000))
6662 self.pg0.add_stream(p)
6663 self.pg_enable_capture(self.pg_interfaces)
6665 capture = self.pg1.get_capture(1)
6666 capture = capture[0]
6667 self.assertEqual(capture[IPv6].src, b4_ip6)
6668 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6669 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6670 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6671 self.assertEqual(capture[UDP].sport, 10000)
6672 self.assertEqual(capture[UDP].dport, 20000)
6673 self.assert_packet_checksums_valid(capture)
# Decapsulation: IPv4-in-IPv6 from the AFTR to the B4 address must leave pg0
# as bare IPv4 with addresses and ports unchanged.
6676 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6677 IPv6(dst=b4_ip6, src=aftr_ip6) /
6678 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6679 UDP(sport=20000, dport=10000))
6680 self.pg1.add_stream(p)
6681 self.pg_enable_capture(self.pg_interfaces)
6683 capture = self.pg0.get_capture(1)
6684 capture = capture[0]
6685 self.assertFalse(capture.haslayer(IPv6))
6686 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6687 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6688 self.assertEqual(capture[UDP].sport, 20000)
6689 self.assertEqual(capture[UDP].dport, 10000)
6690 self.assert_packet_checksums_valid(capture)
# The B4 address itself must answer ICMPv6 echo requests on pg1.
6692 # ping DS-Lite B4 tunnel endpoint address
6693 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6694 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6695 ICMPv6EchoRequest())
6696 self.pg1.add_stream(p)
6697 self.pg_enable_capture(self.pg_interfaces)
6699 capture = self.pg1.get_capture(1)
6700 self.assertEqual(1, len(capture))
6701 capture = capture[0]
6702 self.assertEqual(capture[IPv6].src, b4_ip6)
6703 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6704 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown (its `def` header and the openings of the wrapping calls
# are not visible in this extract): runs the parent teardown and, while VPP
# is alive, queries the AFTR and B4 endpoint addresses via the CLI.
6707 super(TestDSliteCE, self).tearDown()
6708 if not self.vpp_dead:
6710 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6712 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# NAT66 test case: two packet-generator interfaces with IPv6 neighbours; the
# tests exercise a static IPv6-to-IPv6 mapping onto the address fd01:ff::2.
# NOTE(review): this extract has lost decorators, try/except scaffolding,
# packet-list construction, layer arguments and call continuation lines;
# comments describe only what is visible.
6715 class TestNAT66(MethodHolder):
6716 """ NAT66 Test Cases """
# Class-level fixture: records the NAT address (text and packed form),
# creates two pg interfaces and configures each one (only the IPv6 neighbour
# step of the per-interface loop is visible here).
6719 def setUpClass(cls):
6720 super(TestNAT66, cls).setUpClass()
6723 cls.nat_addr = 'fd01:ff::2'
6724 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6726 cls.create_pg_interfaces(range(2))
6727 cls.interfaces = list(cls.pg_interfaces)
6729 for i in cls.interfaces:
6732 i.configure_ipv6_neighbors()
# Presumably the body of an exception handler guarding the setup above - the
# handler lines are not visible; confirm.
6735 super(TestNAT66, cls).tearDownClass()
6738 def test_static(self):
6739 """ 1:1 NAT66 test """
# pg0 is added with the wrapper's default arguments, pg1 explicitly with
# is_inside=0; a static mapping is then created for pg0's remote address
# (remaining arguments not visible).
6740 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6741 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6742 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# pg0 -> pg1 direction: four packets are started here (upper layers of the
# first two and the code collecting them into `pkts` are not visible).
6747 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6748 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6751 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6752 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6755 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6756 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6757 ICMPv6EchoRequest())
6759 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6760 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6761 GRE() / IP() / TCP())
6763 self.pg0.add_stream(pkts)
6764 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg1 must carry the NAT address as IPv6 source and
# have valid checksums; offending packets are logged.
6766 capture = self.pg1.get_capture(len(pkts))
6767 for packet in capture:
6769 self.assertEqual(packet[IPv6].src, self.nat_addr)
6770 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6771 self.assert_packet_checksums_valid(packet)
6773 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0 direction: four packets addressed to the NAT address (upper
# layers of the first three are not visible).
6778 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6779 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6782 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6783 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6786 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6787 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6790 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6791 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6792 GRE() / IP() / TCP())
6794 self.pg1.add_stream(pkts)
6795 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg0 must have its destination rewritten to pg0's
# remote address while the source stays pg1's remote address.
6797 capture = self.pg0.get_capture(len(pkts))
6798 for packet in capture:
6800 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6801 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6802 self.assert_packet_checksums_valid(packet)
6804 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exactly one static mapping must exist and its packet counter must read 8
# (presumably the four packets sent in each direction - confirm).
6807 sm = self.vapi.nat66_static_mapping_dump()
6808 self.assertEqual(len(sm), 1)
6809 self.assertEqual(sm[0].total_pkts, 8)
6811 def test_check_no_translate(self):
6812 """ NAT66 translate only when egress interface is outside interface """
# Both interfaces are added with the wrapper's default arguments, i.e. pg1 is
# not passed is_inside=0 as it is in test_static.
6813 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6814 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
6815 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# One packet pg0 -> pg1 (its upper layer is not visible); it must arrive with
# its original, untranslated source address.
6819 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6820 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6822 self.pg0.add_stream([p])
6823 self.pg_enable_capture(self.pg_interfaces)
6825 capture = self.pg1.get_capture(1)
6828 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
6829 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6831 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Cleanup helper: iterates the dumped NAT66 interfaces and static mappings
# and calls the matching add/del API for each (trailing arguments, presumably
# including is_add=0, are not visible in this extract).
6834 def clear_nat66(self):
6836 Clear NAT66 configuration.
6838 interfaces = self.vapi.nat66_interface_dump()
6839 for intf in interfaces:
6840 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6844 static_mappings = self.vapi.nat66_static_mapping_dump()
6845 for sm in static_mappings:
6846 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6847 sm.external_ip_address,
# Per-test teardown (its `def` header is not visible in this extract): after
# the parent teardown, NAT66 "show" CLI output is logged while VPP is alive.
6852 super(TestNAT66, self).tearDown()
6853 if not self.vpp_dead:
6854 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6855 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: when the module is executed directly, run its test
# cases through unittest using VppTestRunner as the test runner.
6859 if __name__ == '__main__':
6860 unittest.main(testRunner=VppTestRunner)