9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
13 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
14 from scapy.layers.l2 import Ether, ARP, GRE
15 from scapy.data import IP_PROTOS
16 from scapy.packet import bind_layers, Raw
17 from scapy.all import fragment6
19 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
20 from time import sleep
21 from util import ip4_range
22 from util import mactobinary
# Base class derived from VppTestCase that collects the NAT helper methods
# (configuration, packet-stream creation, capture and IPFIX verification).
# TestNAT44 further down in this file derives from it.
25 class MethodHolder(VppTestCase):
26 """ NAT create capture and verify method holder """
# Remove NAT44 configuration through self.vapi. For each kind of object the
# matching *_dump() call is made and an add/del API call is issued for every
# returned entry (the is_add arguments are not visible in this listing;
# presumably 0 - confirm).
# It also turns NAT44 forwarding off, disables NAT IPFIX logging, resets
# ipfix_src_port / ipfix_domain_id to 4739 / 1, and finishes by calling
# nat_set_reass() with its defaults and then again with is_ip6=1.
# The pg7/pg8 route and neighbor cleanup at the top only runs when the
# instance has both attributes.
28 def clear_nat44(self):
30 Clear NAT44 configuration.
32 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
33 # I found no elegant way to do this
34 self.vapi.ip_add_del_route(
35 dst_address=self.pg7.remote_ip4n,
36 dst_address_length=32,
37 next_hop_address=self.pg7.remote_ip4n,
38 next_hop_sw_if_index=self.pg7.sw_if_index,
40 self.vapi.ip_add_del_route(
41 dst_address=self.pg8.remote_ip4n,
42 dst_address_length=32,
43 next_hop_address=self.pg8.remote_ip4n,
44 next_hop_sw_if_index=self.pg8.sw_if_index,
47 for intf in [self.pg7, self.pg8]:
48 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
50 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
55 if self.pg7.has_ip4_config:
56 self.pg7.unconfig_ip4()
58 self.vapi.nat44_forwarding_enable_disable(0)
60 interfaces = self.vapi.nat44_interface_addr_dump()
61 for intf in interfaces:
62 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
63 twice_nat=intf.twice_nat,
# IPFIX is disabled using the currently stored port/domain before both
# attributes are reset to 4739 and 1.
66 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
67 domain_id=self.ipfix_domain_id)
68 self.ipfix_src_port = 4739
69 self.ipfix_domain_id = 1
71 interfaces = self.vapi.nat44_interface_dump()
72 for intf in interfaces:
# NOTE(review): is_inside > 1 presumably marks an interface configured as
# both inside and outside, which would need two feature removals - confirm.
73 if intf.is_inside > 1:
74 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
77 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
81 interfaces = self.vapi.nat44_interface_output_feature_dump()
82 for intf in interfaces:
83 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
87 static_mappings = self.vapi.nat44_static_mapping_dump()
88 for sm in static_mappings:
89 self.vapi.nat44_add_del_static_mapping(
91 sm.external_ip_address,
92 local_port=sm.local_port,
93 external_port=sm.external_port,
94 addr_only=sm.addr_only,
97 twice_nat=sm.twice_nat,
98 self_twice_nat=sm.self_twice_nat,
99 out2in_only=sm.out2in_only,
101 external_sw_if_index=sm.external_sw_if_index,
104 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
105 for lb_sm in lb_static_mappings:
106 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
# NOTE(review): "adresses" is a misspelling of "addresses" (local name only).
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
# Reassembly settings are re-applied with the API defaults, first for the
# default address family and then with is_ip6=1.
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
# Wrapper around self.vapi.nat44_add_del_static_mapping that accepts the
# local and external addresses as dotted-quad strings and converts them to
# packed binary form with socket.inet_pton first.
# NOTE(review): the statement guarded by "if local_port and external_port"
# and most of the API call's arguments are not visible in this listing.
140 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
141 local_port=0, external_port=0, vrf_id=0,
142 is_add=1, external_sw_if_index=0xFFFFFFFF,
143 proto=0, twice_nat=0, self_twice_nat=0,
144 out2in_only=0, tag=""):
146 Add/delete NAT44 static mapping
148 :param local_ip: Local IP address
149 :param external_ip: External IP address
150 :param local_port: Local port number (Optional)
151 :param external_port: External port number (Optional)
152 :param vrf_id: VRF ID (Default 0)
153 :param is_add: 1 if add, 0 if delete (Default add)
154 :param external_sw_if_index: External interface instead of IP address
155 :param proto: IP protocol (Mandatory if port specified)
156 :param twice_nat: 1 if translate external host address and port
157 :param self_twice_nat: 1 if translate external host address and port
158 whenever external host address equals
159 local address of internal host
160 :param out2in_only: if 1 rule is matching only out2in direction
161 :param tag: Opaque string tag
164 if local_port and external_port:
166 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
167 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
168 self.vapi.nat44_add_del_static_mapping(
171 external_sw_if_index,
# Add or delete a single NAT44 pool address given as a dotted-quad string.
# The packed address is passed twice to nat44_add_del_address_range,
# presumably as both the first and last address of a one-address range.
# NOTE(review): the docstring does not describe vrf_id (default 0xFFFFFFFF)
# and spells "external" as "extenal".
183 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
185 Add/delete NAT44 address
187 :param ip: IP address
188 :param is_add: 1 if add, 0 if delete (Default add)
189 :param twice_nat: twice NAT address for extenal hosts
191 nat_addr = socket.inet_pton(socket.AF_INET, ip)
192 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
# Build three IPv4 packets sourced from in_if's remote host and addressed to
# dst_ip: a TCP and a UDP packet with destination port 20 and source ports
# self.tcp_port_in / self.udp_port_in, plus an ICMP echo-request carrying
# self.icmp_id_in. dst_ip falls back to out_if.remote_ip4 (the guarding
# condition is not visible here; presumably "dst_ip is None").
# The packets are presumably collected in a list and returned - those lines
# are not visible in this listing.
196 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
198 Create packet stream for inside network
200 :param in_if: Inside interface
201 :param out_if: Outside interface
202 :param dst_ip: Destination address
203 :param ttl: TTL of generated packets
206 dst_ip = out_if.remote_ip4
210 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
211 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
212 TCP(sport=self.tcp_port_in, dport=20))
216 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
217 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
218 UDP(sport=self.udp_port_in, dport=20))
222 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
223 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
224 ICMP(id=self.icmp_id_in, type='echo-request'))
# Embed an IPv4 address into an IPv6 prefix: both are converted to packed
# form and split into per-byte lists, then the four IPv4 bytes are written
# into positions of the prefix. Which positions are used depends on plen;
# the plen conditions themselves are not visible in this listing.
# NOTE(review): ''.join() over list(inet_pton(...)) only works where the
# packed value is a str of 1-char strings (Python 2). On Python 3 the list
# holds ints and the join raises TypeError.
229 def compose_ip6(self, ip4, pref, plen):
231 Compose IPv4-embedded IPv6 addresses
233 :param ip4: IPv4 address
234 :param pref: IPv6 prefix
235 :param plen: IPv6 prefix length
236 :returns: IPv4-embedded IPv6 addresses
238 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
239 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
254 pref_n[10] = ip4_n[3]
258 pref_n[10] = ip4_n[2]
259 pref_n[11] = ip4_n[3]
262 pref_n[10] = ip4_n[1]
263 pref_n[11] = ip4_n[2]
264 pref_n[12] = ip4_n[3]
266 pref_n[12] = ip4_n[0]
267 pref_n[13] = ip4_n[1]
268 pref_n[14] = ip4_n[2]
269 pref_n[15] = ip4_n[3]
270 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
# Inverse helper to compose_ip6: splits the packed IPv6 address into a
# per-byte list and returns the dotted-quad form of ip4_n. How ip4_n is
# filled from ip6_n for each plen is not visible in this listing.
# NOTE(review): as in compose_ip6, ''.join() over the byte list relies on
# Python 2 str semantics.
272 def extract_ip4(self, ip6, plen):
274 Extract IPv4 address embedded in IPv6 addresses
276 :param ip6: IPv6 address
277 :param plen: IPv6 prefix length
278 :returns: extracted IPv4 address
280 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
312 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
# IPv6 counterpart of create_stream_in: builds TCP, UDP (destination port 20)
# and ICMPv6 echo-request packets from in_if's remote IPv6 address.
# The destination is either '64:ff9b::' concatenated with out_if.remote_ip4,
# or compose_ip6(out_if.remote_ip4, pref, plen); the condition selecting
# between them is not visible here (presumably "pref is None").
# NOTE(review): the docstring documents ":param ttl:" but the parameter is
# named hlim.
314 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
316 Create IPv6 packet stream for inside network
318 :param in_if: Inside interface
319 :param out_if: Outside interface
320 :param ttl: Hop Limit of generated packets
321 :param pref: NAT64 prefix
322 :param plen: NAT64 prefix length
326 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
328 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
331 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
332 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
333 TCP(sport=self.tcp_port_in, dport=20))
337 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
338 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
339 UDP(sport=self.udp_port_in, dport=20))
343 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
344 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
345 ICMPv6EchoRequest(id=self.icmp_id_in))
# Build TCP, UDP and ICMP echo-reply packets from out_if's remote host towards
# dst_ip, with source port 20 for TCP/UDP. The destination ports and ICMP id
# are the *_out values unless use_inside_ports is true, in which case the
# *_in values are used. dst_ip falls back to self.nat_addr (the guarding
# condition is not visible here; presumably "dst_ip is None").
350 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
351 use_inside_ports=False):
353 Create packet stream for outside network
355 :param out_if: Outside interface
356 :param dst_ip: Destination IP address (Default use global NAT address)
357 :param ttl: TTL of generated packets
358 :param use_inside_ports: Use inside NAT ports as destination ports
359 instead of outside ports
362 dst_ip = self.nat_addr
363 if not use_inside_ports:
364 tcp_port = self.tcp_port_out
365 udp_port = self.udp_port_out
366 icmp_id = self.icmp_id_out
368 tcp_port = self.tcp_port_in
369 udp_port = self.udp_port_in
370 icmp_id = self.icmp_id_in
373 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
374 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
375 TCP(dport=tcp_port, sport=20))
379 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
380 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
381 UDP(dport=udp_port, sport=20))
385 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
386 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
387 ICMP(id=icmp_id, type='echo-reply'))
# IPv6 counterpart of create_stream_out: builds TCP, UDP (source port 20,
# destination self.tcp_port_out / self.udp_port_out) and ICMPv6 echo-reply
# packets from src_ip to dst_ip with hop limit hl.
# NOTE(review): the docstring says dst_ip defaults to the global NAT address,
# but dst_ip is a required positional parameter here; src_ip is not
# documented at all.
392 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
394 Create packet stream for outside network
396 :param out_if: Outside interface
397 :param dst_ip: Destination IP address (Default use global NAT address)
398 :param hl: HL of generated packets
402 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
403 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
404 TCP(dport=self.tcp_port_out, sport=20))
408 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
409 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
410 UDP(dport=self.udp_port_out, sport=20))
414 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
415 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
416 ICMPv6EchoReply(id=self.icmp_id_out))
# Check packets captured on the outside interface: the capture must contain
# packet_num packets, each with source address nat_ip (self.nat_addr when not
# given) and, if dst_ip is supplied, that destination address.
# Side effect: the source port / ICMP id seen on each packet is stored in
# self.tcp_port_out, self.udp_port_out and self.icmp_id_out, which the
# create_stream_out* helpers read later.
# IP46 / ICMP46 select the IPv4 or IPv6 layer classes; only the IPv6 ICMP
# assignment is visible in this listing.
# NOTE(review): assert_packet_checksums_valid(packet) is called at the top of
# the loop and again after the TCP port and ICMP id updates; the later calls
# look redundant - confirm.
# NOTE(review): the docstring spells "Source" as "Sorce".
421 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
422 packet_num=3, dst_ip=None, is_ip6=False):
424 Verify captured packets on outside network
426 :param capture: Captured packets
427 :param nat_ip: Translated IP address (Default use global NAT address)
428 :param same_port: Sorce port number is not translated (Default False)
429 :param packet_num: Expected number of packets (Default 3)
430 :param dst_ip: Destination IP address (Default do not verify)
431 :param is_ip6: If L3 protocol is IPv6 (Default False)
435 ICMP46 = ICMPv6EchoRequest
440 nat_ip = self.nat_addr
441 self.assertEqual(packet_num, len(capture))
442 for packet in capture:
445 self.assert_packet_checksums_valid(packet)
446 self.assertEqual(packet[IP46].src, nat_ip)
447 if dst_ip is not None:
448 self.assertEqual(packet[IP46].dst, dst_ip)
449 if packet.haslayer(TCP):
451 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
454 packet[TCP].sport, self.tcp_port_in)
455 self.tcp_port_out = packet[TCP].sport
456 self.assert_packet_checksums_valid(packet)
457 elif packet.haslayer(UDP):
459 self.assertEqual(packet[UDP].sport, self.udp_port_in)
462 packet[UDP].sport, self.udp_port_in)
463 self.udp_port_out = packet[UDP].sport
466 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
468 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
469 self.icmp_id_out = packet[ICMP46].id
470 self.assert_packet_checksums_valid(packet)
472 self.logger.error(ppp("Unexpected or invalid packet "
473 "(outside network):", packet))
# Thin wrapper that forwards to verify_capture_out and returns its result.
# The trailing arguments of the call are not visible in this listing
# (presumably dst_ip and is_ip6=True - confirm).
# NOTE(review): the docstring spells "Source" as "Sorce".
476 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
477 packet_num=3, dst_ip=None):
479 Verify captured packets on outside network
481 :param capture: Captured packets
482 :param nat_ip: Translated IP address
483 :param same_port: Sorce port number is not translated (Default False)
484 :param packet_num: Expected number of packets (Default 3)
485 :param dst_ip: Destination IP address (Default do not verify)
487 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
# Check packets captured on the inside interface: the capture must contain
# packet_num packets with valid checksums, each destined to in_if.remote_ip4,
# with the TCP/UDP destination port or ICMP id equal to the stored *_in
# value. A failing packet is logged via ppp() (presumably from an except
# branch that re-raises; those lines are not visible here).
490 def verify_capture_in(self, capture, in_if, packet_num=3):
492 Verify captured packets on inside network
494 :param capture: Captured packets
495 :param in_if: Inside interface
496 :param packet_num: Expected number of packets (Default 3)
498 self.assertEqual(packet_num, len(capture))
499 for packet in capture:
501 self.assert_packet_checksums_valid(packet)
502 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
503 if packet.haslayer(TCP):
504 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
505 elif packet.haslayer(UDP):
506 self.assertEqual(packet[UDP].dport, self.udp_port_in)
508 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
510 self.logger.error(ppp("Unexpected or invalid packet "
511 "(inside network):", packet))
# IPv6 counterpart of verify_capture_in: every packet must have the given
# IPv6 source and destination, valid checksums, and a TCP/UDP destination
# port equal to the stored *_in value; otherwise the ICMPv6EchoReply id is
# compared (the expected value is on a line not visible here, presumably
# self.icmp_id_in).
514 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
516 Verify captured IPv6 packets on inside network
518 :param capture: Captured packets
519 :param src_ip: Source IP
520 :param dst_ip: Destination IP address
521 :param packet_num: Expected number of packets (Default 3)
523 self.assertEqual(packet_num, len(capture))
524 for packet in capture:
526 self.assertEqual(packet[IPv6].src, src_ip)
527 self.assertEqual(packet[IPv6].dst, dst_ip)
528 self.assert_packet_checksums_valid(packet)
529 if packet.haslayer(TCP):
530 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
531 elif packet.haslayer(UDP):
532 self.assertEqual(packet[UDP].dport, self.udp_port_in)
534 self.assertEqual(packet[ICMPv6EchoReply].id,
537 self.logger.error(ppp("Unexpected or invalid packet "
538 "(inside network):", packet))
# Check that captured packets passed through untranslated: source must be
# ingress_if.remote_ip4, destination egress_if.remote_ip4, and the TCP/UDP
# source port or ICMP id must still equal the stored *_in value.
# NOTE(review): unlike the other verify_capture_* helpers, no assertion on
# the number of captured packets appears here, so an empty capture passes.
# NOTE(review): the docstring reads "packet that don't"; "packets that
# don't" was presumably intended.
541 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
543 Verify captured packet that don't have to be translated
545 :param capture: Captured packets
546 :param ingress_if: Ingress interface
547 :param egress_if: Egress interface
549 for packet in capture:
551 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
552 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
553 if packet.haslayer(TCP):
554 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
555 elif packet.haslayer(UDP):
556 self.assertEqual(packet[UDP].sport, self.udp_port_in)
558 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
560 self.logger.error(ppp("Unexpected or invalid packet "
561 "(inside network):", packet))
# Check ICMP error packets captured on the outside interface: each must come
# from src_ip (self.nat_addr when not given), carry an ICMP layer of type
# icmp_type, and embed the offending packet as an IPerror layer. The inner
# TCP/UDP destination port or inner ICMP id is then compared; the expected
# port values are on lines not visible here (presumably the *_out ports),
# while the inner ICMP id is compared with self.icmp_id_out.
564 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
565 packet_num=3, icmp_type=11):
567 Verify captured packets with ICMP errors on outside network
569 :param capture: Captured packets
570 :param src_ip: Translated IP address or IP address of VPP
571 (Default use global NAT address)
572 :param packet_num: Expected number of packets (Default 3)
573 :param icmp_type: Type of error ICMP packet
574 we are expecting (Default 11)
577 src_ip = self.nat_addr
578 self.assertEqual(packet_num, len(capture))
579 for packet in capture:
581 self.assertEqual(packet[IP].src, src_ip)
582 self.assertTrue(packet.haslayer(ICMP))
584 self.assertEqual(icmp.type, icmp_type)
585 self.assertTrue(icmp.haslayer(IPerror))
586 inner_ip = icmp[IPerror]
587 if inner_ip.haslayer(TCPerror):
588 self.assertEqual(inner_ip[TCPerror].dport,
590 elif inner_ip.haslayer(UDPerror):
591 self.assertEqual(inner_ip[UDPerror].dport,
594 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
596 self.logger.error(ppp("Unexpected or invalid packet "
597 "(outside network):", packet))
# Inside-network counterpart of verify_capture_out_with_icmp_errors: each
# packet must be destined to in_if.remote_ip4, carry an ICMP layer of type
# icmp_type and an embedded IPerror layer. Here the inner TCP/UDP *source*
# port is compared (expected values not visible here, presumably the *_in
# ports) and the inner ICMP id is compared with self.icmp_id_in.
# The rest of the signature (the icmp_type parameter the docstring
# describes) is on a line not visible in this listing.
600 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
603 Verify captured packets with ICMP errors on inside network
605 :param capture: Captured packets
606 :param in_if: Inside interface
607 :param packet_num: Expected number of packets (Default 3)
608 :param icmp_type: Type of error ICMP packet
609 we are expecting (Default 11)
611 self.assertEqual(packet_num, len(capture))
612 for packet in capture:
614 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
615 self.assertTrue(packet.haslayer(ICMP))
617 self.assertEqual(icmp.type, icmp_type)
618 self.assertTrue(icmp.haslayer(IPerror))
619 inner_ip = icmp[IPerror]
620 if inner_ip.haslayer(TCPerror):
621 self.assertEqual(inner_ip[TCPerror].sport,
623 elif inner_ip.haslayer(UDPerror):
624 self.assertEqual(inner_ip[UDPerror].sport,
627 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
629 self.logger.error(ppp("Unexpected or invalid packet "
630 "(inside network):", packet))
# Hand-build an IPv4 TCP packet split into three fragments sharing one random
# IP id: the first at fragment offset 0 with the TCP header, the second at
# offset 3 and the last at offset 5 (offsets as given to scapy's frag field).
# The first two carry the "MF" flag. The payload slices attached to each
# fragment are on lines not visible in this listing.
# The unfragmented packet is first rebuilt from its serialized form
# (p.__class__(str(p))), presumably so scapy fills in the TCP checksum that
# is then copied into the first fragment.
# NOTE(review): the local name "id" shadows the builtin id(); str(p) as a
# packet serializer is Python 2 usage.
633 def create_stream_frag(self, src_if, dst, sport, dport, data):
635 Create fragmented packet stream
637 :param src_if: Source interface
638 :param dst: Destination IPv4 address
639 :param sport: Source TCP port
640 :param dport: Destination TCP port
641 :param data: Payload data
644 id = random.randint(0, 65535)
645 p = (IP(src=src_if.remote_ip4, dst=dst) /
646 TCP(sport=sport, dport=dport) /
648 p = p.__class__(str(p))
649 chksum = p['TCP'].chksum
651 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
652 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
653 TCP(sport=sport, dport=dport, chksum=chksum) /
656 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
657 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
658 proto=IP_PROTOS.tcp) /
661 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
662 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
# Build one IPv6 TCP packet with a fragment extension header (random id) and
# return the result of scapy's fragment6(p, frag_size).
# The IPv6 destination is either '64:ff9b::' concatenated with dst, or
# compose_ip6(dst, pref, plen); the selecting condition is not visible here
# (presumably "pref is None").
# NOTE(review): the docstring documents ":param fragsize:" but the parameter
# is named frag_size.
668 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
669 pref=None, plen=0, frag_size=128):
671 Create fragmented packet stream
673 :param src_if: Source interface
674 :param dst: Destination IPv4 address
675 :param sport: Source TCP port
676 :param dport: Destination TCP port
677 :param data: Payload data
678 :param pref: NAT64 prefix
679 :param plen: NAT64 prefix length
680 :param fragsize: size of fragments
684 dst_ip6 = ''.join(['64:ff9b::', dst])
686 dst_ip6 = self.compose_ip6(dst, pref, plen)
688 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
689 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
690 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
691 TCP(sport=sport, dport=dport) /
694 return fragment6(p, frag_size)
# Reassemble IPv4 fragments in memory: each fragment's src/dst and IP
# checksum are checked, then its payload is written into a StringIO buffer at
# byte offset frag * 8. A fresh IP header is built from the first fragment's
# addresses and protocol, and the buffer is parsed as TCP or UDP on top of it.
# The TCP branch validates the TCP checksum; no equivalent check is visible
# for the UDP branch.
# NOTE(review): the "ip = frags[0].getlayer(IP)" assignment is immediately
# overwritten by the next statement and has no effect.
# NOTE(review): StringIO.StringIO is the Python 2 module spelling.
696 def reass_frags_and_verify(self, frags, src, dst):
698 Reassemble and verify fragmented packet
700 :param frags: Captured fragments
701 :param src: Source IPv4 address to verify
702 :param dst: Destination IPv4 address to verify
704 :returns: Reassembled IPv4 packet
706 buffer = StringIO.StringIO()
708 self.assertEqual(p[IP].src, src)
709 self.assertEqual(p[IP].dst, dst)
710 self.assert_ip_checksum_valid(p)
711 buffer.seek(p[IP].frag * 8)
712 buffer.write(p[IP].payload)
713 ip = frags[0].getlayer(IP)
714 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
715 proto=frags[0][IP].proto)
716 if ip.proto == IP_PROTOS.tcp:
717 p = (ip / TCP(buffer.getvalue()))
718 self.assert_tcp_checksum_valid(p)
719 elif ip.proto == IP_PROTOS.udp:
720 p = (ip / UDP(buffer.getvalue()))
# IPv6 counterpart of reass_frags_and_verify: each fragment's src/dst are
# checked and the payload behind its IPv6ExtHdrFragment header is written
# into a StringIO buffer at byte offset offset * 8. A fresh IPv6 header is
# built using the first fragment's addresses and the fragment header's nh
# value; the buffer is parsed as TCP or UDP and all checksums of the result
# are validated.
# NOTE(review): StringIO.StringIO is the Python 2 module spelling.
723 def reass_frags_and_verify_ip6(self, frags, src, dst):
725 Reassemble and verify fragmented packet
727 :param frags: Captured fragments
728 :param src: Source IPv6 address to verify
729 :param dst: Destination IPv6 address to verify
731 :returns: Reassembled IPv6 packet
733 buffer = StringIO.StringIO()
735 self.assertEqual(p[IPv6].src, src)
736 self.assertEqual(p[IPv6].dst, dst)
737 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
738 buffer.write(p[IPv6ExtHdrFragment].payload)
739 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
740 nh=frags[0][IPv6ExtHdrFragment].nh)
741 if ip.nh == IP_PROTOS.tcp:
742 p = (ip / TCP(buffer.getvalue()))
743 elif ip.nh == IP_PROTOS.udp:
744 p = (ip / UDP(buffer.getvalue()))
745 self.assert_packet_checksums_valid(p)
# Drive a TCP handshake through the NAT using three packets: in->out from
# in_if's remote host to out_if's remote host, out->in addressed to
# self.nat_addr (commented below as SYN + ACK), and a final in->out packet.
# The TCP flags for each packet are on lines not visible in this listing.
# Side effect: after the first packet is captured on out_if, the observed
# source port is stored in self.tcp_port_out and used as the destination
# port of the out->in packet.
# On failure "TCP 3 way handshake failed" is logged (presumably from an
# except branch that re-raises - not visible here).
748 def initiate_tcp_session(self, in_if, out_if):
750 Initiates TCP session
752 :param in_if: Inside interface
753 :param out_if: Outside interface
757 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
758 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
759 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
762 self.pg_enable_capture(self.pg_interfaces)
764 capture = out_if.get_capture(1)
766 self.tcp_port_out = p[TCP].sport
768 # SYN + ACK packet out->in
769 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
770 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
771 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
774 self.pg_enable_capture(self.pg_interfaces)
779 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
780 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
781 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
784 self.pg_enable_capture(self.pg_interfaces)
786 out_if.get_capture(1)
789 self.logger.error("TCP 3 way handshake failed")
# Expect exactly six decoded IPFIX records: three with record[230] == 4 and
# three with record[230] == 5 (counted as session create and delete events).
# For each record, record[8] must equal pg0's packed remote IPv4 address,
# record[234] must be a network-order 32-bit zero, and depending on the
# protocol in record[4] the ICMP id or TCP/UDP port in record[7] must match
# the stored *_in value (the keys checked against the *_out values are on
# lines not visible here).
# Record keys are presumably IPFIX information element IDs - confirm against
# the decoder.
792 def verify_ipfix_nat44_ses(self, data):
794 Verify IPFIX NAT44 session create/delete event
796 :param data: Decoded IPFIX data records
798 nat44_ses_create_num = 0
799 nat44_ses_delete_num = 0
800 self.assertEqual(6, len(data))
803 self.assertIn(ord(record[230]), [4, 5])
804 if ord(record[230]) == 4:
805 nat44_ses_create_num += 1
807 nat44_ses_delete_num += 1
809 self.assertEqual(self.pg0.remote_ip4n, record[8])
810 # postNATSourceIPv4Address
811 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
814 self.assertEqual(struct.pack("!I", 0), record[234])
815 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
816 if IP_PROTOS.icmp == ord(record[4]):
817 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
818 self.assertEqual(struct.pack("!H", self.icmp_id_out),
820 elif IP_PROTOS.tcp == ord(record[4]):
821 self.assertEqual(struct.pack("!H", self.tcp_port_in),
823 self.assertEqual(struct.pack("!H", self.tcp_port_out),
825 elif IP_PROTOS.udp == ord(record[4]):
826 self.assertEqual(struct.pack("!H", self.udp_port_in),
828 self.assertEqual(struct.pack("!H", self.udp_port_out),
831 self.fail("Invalid protocol")
832 self.assertEqual(3, nat44_ses_create_num)
833 self.assertEqual(3, nat44_ses_delete_num)
# Expect exactly one decoded IPFIX record whose record[230] byte equals 3 and
# whose record[283] is a network-order 32-bit zero. How "record" is obtained
# from data is on a line not visible here (presumably data[0]).
835 def verify_ipfix_addr_exhausted(self, data):
837 Verify IPFIX NAT addresses event
839 :param data: Decoded IPFIX data records
841 self.assertEqual(1, len(data))
844 self.assertEqual(ord(record[230]), 3)
846 self.assertEqual(struct.pack("!I", 0), record[283])
# Expect exactly one decoded IPFIX record with record[230] == 13, a quota
# event value of 1 in record[466], and the session limit in record[471].
# NOTE(review): the two struct.pack("I", ...) comparisons use native byte
# order, while other helpers in this class pack with "!I" (network order);
# confirm which byte order the decoder yields for these fields.
848 def verify_ipfix_max_sessions(self, data, limit):
850 Verify IPFIX maximum session entries exceeded event
852 :param data: Decoded IPFIX data records
853 :param limit: Number of maximum session entries that can be created.
855 self.assertEqual(1, len(data))
858 self.assertEqual(ord(record[230]), 13)
859 # natQuotaExceededEvent
860 self.assertEqual(struct.pack("I", 1), record[466])
862 self.assertEqual(struct.pack("I", limit), record[471])
# Expect exactly one decoded IPFIX record with record[230] == 13, a quota
# event value of 2 in record[466], and the BIB limit in record[472].
# NOTE(review): struct.pack("I", ...) uses native byte order here, unlike the
# "!I" packing used by other helpers in this class - confirm.
864 def verify_ipfix_max_bibs(self, data, limit):
866 Verify IPFIX maximum BIB entries exceeded event
868 :param data: Decoded IPFIX data records
869 :param limit: Number of maximum BIB entries that can be created.
871 self.assertEqual(1, len(data))
874 self.assertEqual(ord(record[230]), 13)
875 # natQuotaExceededEvent
876 self.assertEqual(struct.pack("I", 2), record[466])
878 self.assertEqual(struct.pack("I", limit), record[472])
# Expect exactly one decoded IPFIX record with record[230] == 13, a quota
# event value of 5 in record[466], the fragment limit in record[475], and
# src_addr in record[27]. src_addr is compared directly, so it must already
# be in the same (presumably packed binary) form as the record value.
# NOTE(review): struct.pack("I", ...) uses native byte order here, unlike the
# "!I" packing used by other helpers in this class - confirm.
880 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
882 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
884 :param data: Decoded IPFIX data records
885 :param limit: Number of maximum fragments pending reassembly
886 :param src_addr: IPv6 source address
888 self.assertEqual(1, len(data))
891 self.assertEqual(ord(record[230]), 13)
892 # natQuotaExceededEvent
893 self.assertEqual(struct.pack("I", 5), record[466])
894 # maxFragmentsPendingReassembly
895 self.assertEqual(struct.pack("I", limit), record[475])
897 self.assertEqual(src_addr, record[27])
# IPv4 variant of verify_ipfix_max_fragments_ip6: identical checks on
# record[230], record[466] and record[475], but the source address is
# compared with record[8] instead of record[27].
# NOTE(review): struct.pack("I", ...) uses native byte order here, unlike the
# "!I" packing used by other helpers in this class - confirm.
899 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
901 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
903 :param data: Decoded IPFIX data records
904 :param limit: Number of maximum fragments pending reassembly
905 :param src_addr: IPv4 source address
907 self.assertEqual(1, len(data))
910 self.assertEqual(ord(record[230]), 13)
911 # natQuotaExceededEvent
912 self.assertEqual(struct.pack("I", 5), record[466])
913 # maxFragmentsPendingReassembly
914 self.assertEqual(struct.pack("I", limit), record[475])
916 self.assertEqual(src_addr, record[8])
# Expect exactly one decoded IPFIX record describing a NAT64 BIB event:
# record[230] is 10 or 11 (the is_create test choosing between them is not
# visible here; per the docstring 10 is presumably create and 11 delete).
# Also checked: src_addr in record[27], self.nat_addr_n in record[225], TCP
# as the protocol in record[4], a network-order zero in record[234], and
# self.tcp_port_in / self.tcp_port_out in record[7] / record[227].
918 def verify_ipfix_bib(self, data, is_create, src_addr):
920 Verify IPFIX NAT64 BIB create and delete events
922 :param data: Decoded IPFIX data records
923 :param is_create: Create event if nonzero value otherwise delete event
924 :param src_addr: IPv6 source address
926 self.assertEqual(1, len(data))
930 self.assertEqual(ord(record[230]), 10)
932 self.assertEqual(ord(record[230]), 11)
934 self.assertEqual(src_addr, record[27])
935 # postNATSourceIPv4Address
936 self.assertEqual(self.nat_addr_n, record[225])
938 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
940 self.assertEqual(struct.pack("!I", 0), record[234])
941 # sourceTransportPort
942 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
943 # postNAPTSourceTransportPort
944 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
# Expect exactly one decoded IPFIX record describing a NAT64 session event:
# record[230] is 6 or 7 (the is_create test choosing between them is not
# visible here; per the docstring 6 is presumably create and 7 delete).
# Beyond the fields verify_ipfix_bib checks, the destination side is
# verified too: the IPv6 destination is rebuilt with compose_ip6 from
# dst_addr, and dst_port must appear unchanged in both record[11] and
# record[228].
# The rest of the signature (the dst_port parameter the docstring describes)
# is on a line not visible in this listing.
946 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
949 Verify IPFIX NAT64 session create and delete events
951 :param data: Decoded IPFIX data records
952 :param is_create: Create event if nonzero value otherwise delete event
953 :param src_addr: IPv6 source address
954 :param dst_addr: IPv4 destination address
955 :param dst_port: destination TCP port
957 self.assertEqual(1, len(data))
961 self.assertEqual(ord(record[230]), 6)
963 self.assertEqual(ord(record[230]), 7)
965 self.assertEqual(src_addr, record[27])
966 # destinationIPv6Address
967 self.assertEqual(socket.inet_pton(socket.AF_INET6,
968 self.compose_ip6(dst_addr,
972 # postNATSourceIPv4Address
973 self.assertEqual(self.nat_addr_n, record[225])
974 # postNATDestinationIPv4Address
975 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
978 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
980 self.assertEqual(struct.pack("!I", 0), record[234])
981 # sourceTransportPort
982 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
983 # postNAPTSourceTransportPort
984 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
985 # destinationTransportPort
986 self.assertEqual(struct.pack("!H", dst_port), record[11])
987 # postNAPTDestinationTransportPort
988 self.assertEqual(struct.pack("!H", dst_port), record[228])
# NAT44 test class; inherits the configuration, stream-creation and
# verification helpers from MethodHolder.
991 class TestNAT44(MethodHolder):
992 """ NAT44 Test Cases """
# Class-level setup (the "def" line is not visible in this listing).
# Enables NAT debug logging, stores the default ports / ICMP id / NAT address
# / IPFIX settings as class attributes, creates ten pg interfaces, and
# configures three groups: pg0-pg3 (cls.interfaces), pg4-pg6
# (cls.overlapping_interfaces, placed in IP tables 10 and 20) and pg9.
996 super(TestNAT44, cls).setUpClass()
997 cls.vapi.cli("set log class nat level debug")
1000 cls.tcp_port_in = 6303
1001 cls.tcp_port_out = 6303
1002 cls.udp_port_in = 6304
1003 cls.udp_port_out = 6304
1004 cls.icmp_id_in = 6305
1005 cls.icmp_id_out = 6305
1006 cls.nat_addr = '10.0.0.3'
1007 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1008 cls.ipfix_src_port = 4739
1009 cls.ipfix_domain_id = 1
1010 cls.tcp_external_port = 80
1012 cls.create_pg_interfaces(range(10))
1013 cls.interfaces = list(cls.pg_interfaces[0:4])
1015 for i in cls.interfaces:
1020 cls.pg0.generate_remote_hosts(3)
1021 cls.pg0.configure_ipv4_neighbors()
1023 cls.pg1.generate_remote_hosts(1)
1024 cls.pg1.configure_ipv4_neighbors()
# NOTE(review): the nested list(list(...)) is redundant; one list() suffices.
1026 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1027 cls.vapi.ip_table_add_del(10, is_add=1)
1028 cls.vapi.ip_table_add_del(20, is_add=1)
# pg4 and pg6 are given the same local/remote addresses but live in different
# tables (10 and 20); pg5 shares table 10 with pg4 under other addresses.
# NOTE(review): likely bug - each _local_ip4n below is computed from
# i.local_ip4, where "i" is the variable left over from the earlier
# "for i in cls.interfaces" loop (its last element), not from the
# pg4/pg5/pg6 address assigned on the preceding line. The packed value
# therefore does not correspond to the new _local_ip4 string. Presumably
# cls.pgN.local_ip4 was intended - confirm before changing.
1030 cls.pg4._local_ip4 = "172.16.255.1"
1031 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1032 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1033 cls.pg4.set_table_ip4(10)
1034 cls.pg5._local_ip4 = "172.17.255.3"
1035 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1036 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1037 cls.pg5.set_table_ip4(10)
1038 cls.pg6._local_ip4 = "172.16.255.1"
1039 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1040 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1041 cls.pg6.set_table_ip4(20)
1042 for i in cls.overlapping_interfaces:
# pg9 gets two remote hosts and an additional interface address built from
# "10.0.0.1"; the hosts' private _ip4 fields are then rewritten between two
# resolve_arp() calls.
1050 cls.pg9.generate_remote_hosts(2)
1051 cls.pg9.config_ip4()
1052 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1053 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1057 cls.pg9.resolve_arp()
1058 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1059 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1060 cls.pg9.resolve_arp()
# Delegates to the parent class's tearDownClass. The surrounding "def" line
# and any guarding try/except are not visible in this listing.
1063 super(TestNAT44, cls).tearDownClass()
# Adds self.nat_addr to the NAT44 pool and enables the NAT44 feature on pg0
# and pg1 (pg1's remaining arguments are not visible here; presumably
# is_inside=0). Sends the inside stream into pg0 and verifies the translated
# packets captured on pg1, then sends the outside stream into pg1 and
# verifies the packets captured on pg0.
1066 def test_dynamic(self):
1067 """ NAT44 dynamic translation test """
1069 self.nat44_add_address(self.nat_addr)
1070 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1071 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1075 pkts = self.create_stream_in(self.pg0, self.pg1)
1076 self.pg0.add_stream(pkts)
1077 self.pg_enable_capture(self.pg_interfaces)
1079 capture = self.pg1.get_capture(len(pkts))
1080 self.verify_capture_out(capture)
1083 pkts = self.create_stream_out(self.pg1)
1084 self.pg1.add_stream(pkts)
1085 self.pg_enable_capture(self.pg_interfaces)
1087 capture = self.pg0.get_capture(len(pkts))
1088 self.verify_capture_in(capture, self.pg0)
# Sends the inside stream with TTL=1 into pg0 and expects one ICMP error
# (type 11 by verify_capture_in_with_icmp_errors' default) back on pg0 for
# every packet sent.
1090 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1091 """ NAT44 handling of client packets with TTL=1 """
1093 self.nat44_add_address(self.nat_addr)
1094 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1095 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1098 # Client side - generate traffic
1099 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1100 self.pg0.add_stream(pkts)
1101 self.pg_enable_capture(self.pg_interfaces)
1104 # Client side - verify ICMP type 11 packets
1105 capture = self.pg0.get_capture(len(pkts))
1106 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# First creates NAT sessions with a normal inside stream (verify_capture_out
# records the translated ports), then sends the outside stream with TTL=1
# into pg1 and expects ICMP errors back on pg1 whose source address is
# pg1.local_ip4 rather than the NAT pool address.
1108 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1109 """ NAT44 handling of server packets with TTL=1 """
1111 self.nat44_add_address(self.nat_addr)
1112 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1113 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1116 # Client side - create sessions
1117 pkts = self.create_stream_in(self.pg0, self.pg1)
1118 self.pg0.add_stream(pkts)
1119 self.pg_enable_capture(self.pg_interfaces)
1122 # Server side - generate traffic
1123 capture = self.pg1.get_capture(len(pkts))
1124 self.verify_capture_out(capture)
1125 pkts = self.create_stream_out(self.pg1, ttl=1)
1126 self.pg1.add_stream(pkts)
1127 self.pg_enable_capture(self.pg_interfaces)
1130 # Server side - verify ICMP type 11 packets
1131 capture = self.pg1.get_capture(len(pkts))
1132 self.verify_capture_out_with_icmp_errors(capture,
1133 src_ip=self.pg1.local_ip4)
# Sends the inside stream with TTL=2, captures the translated packets on pg1,
# and for each one builds an ICMP type 11 reply from pg1's remote host to
# self.nat_addr that embeds the captured IP packet. Those replies are sent
# into pg1 and the resulting ICMP errors are verified on pg0.
1135 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1136 """ NAT44 handling of error responses to client packets with TTL=2 """
1138 self.nat44_add_address(self.nat_addr)
1139 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1140 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1143 # Client side - generate traffic
1144 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1145 self.pg0.add_stream(pkts)
1146 self.pg_enable_capture(self.pg_interfaces)
1149 # Server side - simulate ICMP type 11 response
1150 capture = self.pg1.get_capture(len(pkts))
1151 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1152 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1153 ICMP(type=11) / packet[IP] for packet in capture]
1154 self.pg1.add_stream(pkts)
1155 self.pg_enable_capture(self.pg_interfaces)
1158 # Client side - verify ICMP type 11 packets
1159 capture = self.pg0.get_capture(len(pkts))
1160 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Mirror image of the in2out TTL=2 test: sessions are created with a normal
# inside stream, the outside stream is sent with TTL=2 and captured on pg0,
# and for each captured packet an ICMP type 11 reply from pg0's remote host
# to pg1's remote host is built embedding that packet. The replies are sent
# into pg0 and the translated ICMP errors are verified on pg1 using the
# default expected source address.
1162 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1163 """ NAT44 handling of error responses to server packets with TTL=2 """
1165 self.nat44_add_address(self.nat_addr)
1166 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1167 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1170 # Client side - create sessions
1171 pkts = self.create_stream_in(self.pg0, self.pg1)
1172 self.pg0.add_stream(pkts)
1173 self.pg_enable_capture(self.pg_interfaces)
1176 # Server side - generate traffic
1177 capture = self.pg1.get_capture(len(pkts))
1178 self.verify_capture_out(capture)
1179 pkts = self.create_stream_out(self.pg1, ttl=2)
1180 self.pg1.add_stream(pkts)
1181 self.pg_enable_capture(self.pg_interfaces)
1184 # Client side - simulate ICMP type 11 response
1185 capture = self.pg0.get_capture(len(pkts))
1186 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1187 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1188 ICMP(type=11) / packet[IP] for packet in capture]
1189 self.pg0.add_stream(pkts)
1190 self.pg_enable_capture(self.pg_interfaces)
1193 # Server side - verify ICMP type 11 packets
1194 capture = self.pg1.get_capture(len(pkts))
1195 self.verify_capture_out_with_icmp_errors(capture)
1197 def test_ping_out_interface_from_outside(self):
1198 """ Ping NAT44 out interface from outside network """
# Sends one ICMP echo request from pg1's remote host to pg1's own local
# address and expects exactly one echo reply on pg1 with the addresses
# swapped.
1200 self.nat44_add_address(self.nat_addr)
1201 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1202 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1205 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1206 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1207 ICMP(id=self.icmp_id_out, type='echo-request'))
# NOTE(review): `pkts` is not assigned in the visible lines - presumably a
# one-element list wrapping `p`; confirm.
1209 self.pg1.add_stream(pkts)
1210 self.pg_enable_capture(self.pg_interfaces)
1212 capture = self.pg1.get_capture(len(pkts))
1213 self.assertEqual(1, len(capture))
# NOTE(review): `packet` is presumably capture[0]; its assignment is not
# visible here.
1216 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1217 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
# NOTE(review): the request above is built with icmp_id_out while the
# reply is compared with icmp_id_in - this only holds while both values
# are equal; confirm that is intended.
1218 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1219 self.assertEqual(packet[ICMP].type, 0) # echo reply
1221 self.logger.error(ppp("Unexpected or invalid packet "
1222 "(outside network):", packet))
1225 def test_ping_internal_host_from_outside(self):
1226 """ Ping internal host from outside network """
# A static mapping ties pg0's remote host to self.nat_addr. An echo request
# sent from pg1 to self.nat_addr is expected on pg0, and the echo reply sent
# from pg0 is expected on pg1.
1228 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1229 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1230 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside: echo request addressed to the mapped external address.
1234 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1235 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1236 ICMP(id=self.icmp_id_out, type='echo-request'))
1237 self.pg1.add_stream(pkt)
1238 self.pg_enable_capture(self.pg_interfaces)
1240 capture = self.pg0.get_capture(1)
1241 self.verify_capture_in(capture, self.pg0, packet_num=1)
1242 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Inside -> outside: echo reply from the internal host.
1245 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1246 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1247 ICMP(id=self.icmp_id_in, type='echo-reply'))
1248 self.pg0.add_stream(pkt)
1249 self.pg_enable_capture(self.pg_interfaces)
1251 capture = self.pg1.get_capture(1)
1252 self.verify_capture_out(capture, same_port=True, packet_num=1)
1253 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# NOTE(review): the leading underscore keeps this method out of unittest's
# default "test*" collection - presumably disabled on purpose; confirm.
1255 def _test_forwarding(self):
1256 """ NAT44 forwarding test """
# With NAT44 forwarding enabled and one static mapping (pg0 remote host ->
# self.nat_addr_n), traffic is exchanged in both directions twice: once for
# the mapped host and once for a host without a mapping. Afterwards one
# session of the unmapped host is deleted and the session count re-checked.
1258 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1259 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1261 self.vapi.nat44_forwarding_enable_disable(1)
1263 real_ip = self.pg0.remote_ip4n
1264 alias_ip = self.nat_addr_n
1265 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1266 external_ip=alias_ip)
1269 # in2out - static mapping match
1271 pkts = self.create_stream_out(self.pg1)
1272 self.pg1.add_stream(pkts)
1273 self.pg_enable_capture(self.pg_interfaces)
1275 capture = self.pg0.get_capture(len(pkts))
1276 self.verify_capture_in(capture, self.pg0)
1278 pkts = self.create_stream_in(self.pg0, self.pg1)
1279 self.pg0.add_stream(pkts)
1280 self.pg_enable_capture(self.pg_interfaces)
1282 capture = self.pg1.get_capture(len(pkts))
1283 self.verify_capture_out(capture, same_port=True)
1285 # in2out - no static mapping match
# remote_hosts[0] is temporarily replaced by remote_hosts[1] (and put back
# further down via host0) - presumably so that pg0.remote_ip4 resolves to a
# host that has no static mapping; confirm against the interface class.
1287 host0 = self.pg0.remote_hosts[0]
1288 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1290 pkts = self.create_stream_out(self.pg1,
1291 dst_ip=self.pg0.remote_ip4,
1292 use_inside_ports=True)
1293 self.pg1.add_stream(pkts)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 capture = self.pg0.get_capture(len(pkts))
1297 self.verify_capture_in(capture, self.pg0)
1299 pkts = self.create_stream_in(self.pg0, self.pg1)
1300 self.pg0.add_stream(pkts)
1301 self.pg_enable_capture(self.pg_interfaces)
1303 capture = self.pg1.get_capture(len(pkts))
1304 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1307 self.pg0.remote_hosts[0] = host0
1309 user = self.pg0.remote_hosts[1]
# Three sessions are expected for the unmapped host; deleting the first one
# (addressed by its inside and external-host endpoints) must leave two.
1310 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
1311 self.assertEqual(len(sessions), 3)
1312 self.assertTrue(sessions[0].ext_host_valid)
1313 self.vapi.nat44_del_session(
1314 sessions[0].inside_ip_address,
1315 sessions[0].inside_port,
1316 sessions[0].protocol,
1317 ext_host_address=sessions[0].ext_host_address,
1318 ext_host_port=sessions[0].ext_host_port)
1319 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
1320 self.assertEqual(len(sessions), 2)
# Cleanup: forwarding is switched off again.
# NOTE(review): the last argument of the static mapping call below is not
# visible here - presumably is_add=0 to remove the mapping; confirm.
1323 self.vapi.nat44_forwarding_enable_disable(0)
1324 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1325 external_ip=alias_ip,
1328 def test_static_in(self):
1329 """ 1:1 NAT initialized from inside network """
# Address-only static mapping pg0 remote host -> 10.0.0.10. The first
# stream goes inside -> outside, the return stream outside -> inside.
1331 nat_ip = "10.0.0.10"
1332 self.tcp_port_out = 6303
1333 self.udp_port_out = 6304
1334 self.icmp_id_out = 6305
1336 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1337 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1338 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# The dump must list exactly one mapping with an empty tag (text before the
# first NUL byte) and zero protocol / local port / external port.
1340 sm = self.vapi.nat44_static_mapping_dump()
1341 self.assertEqual(len(sm), 1)
1342 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1343 self.assertEqual(sm[0].protocol, 0)
1344 self.assertEqual(sm[0].local_port, 0)
1345 self.assertEqual(sm[0].external_port, 0)
# Inside -> outside.
1348 pkts = self.create_stream_in(self.pg0, self.pg1)
1349 self.pg0.add_stream(pkts)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg1.get_capture(len(pkts))
1353 self.verify_capture_out(capture, nat_ip, True)
# Outside -> inside, addressed to the mapped external address.
1356 pkts = self.create_stream_out(self.pg1, nat_ip)
1357 self.pg1.add_stream(pkts)
1358 self.pg_enable_capture(self.pg_interfaces)
1360 capture = self.pg0.get_capture(len(pkts))
1361 self.verify_capture_in(capture, self.pg0)
1363 def test_static_out(self):
1364 """ 1:1 NAT initialized from outside network """
# Same address-only mapping scenario as a plain 1:1 NAT, but the mapping is
# created with a tag and the first stream comes from the outside (pg1).
1366 nat_ip = "10.0.0.20"
1367 self.tcp_port_out = 6303
1368 self.udp_port_out = 6304
1369 self.icmp_id_out = 6305
# NOTE(review): `tag` is not assigned in the visible lines - presumably a
# short string literal defined just above; confirm.
1372 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1373 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1374 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# The dumped tag is compared up to its first NUL byte.
1376 sm = self.vapi.nat44_static_mapping_dump()
1377 self.assertEqual(len(sm), 1)
1378 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
# Outside -> inside.
1381 pkts = self.create_stream_out(self.pg1, nat_ip)
1382 self.pg1.add_stream(pkts)
1383 self.pg_enable_capture(self.pg_interfaces)
1385 capture = self.pg0.get_capture(len(pkts))
1386 self.verify_capture_in(capture, self.pg0)
# Inside -> outside.
1389 pkts = self.create_stream_in(self.pg0, self.pg1)
1390 self.pg0.add_stream(pkts)
1391 self.pg_enable_capture(self.pg_interfaces)
1393 capture = self.pg1.get_capture(len(pkts))
1394 self.verify_capture_out(capture, nat_ip, True)
1396 def test_static_with_port_in(self):
1397 """ 1:1 NAPT initialized from inside network """
# Three per-protocol static mappings (TCP, UDP, ICMP) translate the inside
# port/id of pg0's remote host to the fixed outside values set below, all on
# self.nat_addr. Traffic is sent inside -> outside first, then back.
1399 self.tcp_port_out = 3606
1400 self.udp_port_out = 3607
1401 self.icmp_id_out = 3608
1403 self.nat44_add_address(self.nat_addr)
1404 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1405 self.tcp_port_in, self.tcp_port_out,
1406 proto=IP_PROTOS.tcp)
1407 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1408 self.udp_port_in, self.udp_port_out,
1409 proto=IP_PROTOS.udp)
1410 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1411 self.icmp_id_in, self.icmp_id_out,
1412 proto=IP_PROTOS.icmp)
1413 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1414 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside.
1418 pkts = self.create_stream_in(self.pg0, self.pg1)
1419 self.pg0.add_stream(pkts)
1420 self.pg_enable_capture(self.pg_interfaces)
1422 capture = self.pg1.get_capture(len(pkts))
1423 self.verify_capture_out(capture)
# Outside -> inside.
1426 pkts = self.create_stream_out(self.pg1)
1427 self.pg1.add_stream(pkts)
1428 self.pg_enable_capture(self.pg_interfaces)
1430 capture = self.pg0.get_capture(len(pkts))
1431 self.verify_capture_in(capture, self.pg0)
1433 def test_static_with_port_out(self):
1434 """ 1:1 NAPT initialized from outside network """
# Three per-protocol static mappings (TCP, UDP, ICMP) on self.nat_addr map
# the inside port/id of pg0's remote host to the outside values set below.
# Here the first stream comes from the outside (pg1), then the reverse.
1436 self.tcp_port_out = 30606
1437 self.udp_port_out = 30607
1438 self.icmp_id_out = 30608
1440 self.nat44_add_address(self.nat_addr)
1441 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1442 self.tcp_port_in, self.tcp_port_out,
1443 proto=IP_PROTOS.tcp)
1444 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1445 self.udp_port_in, self.udp_port_out,
1446 proto=IP_PROTOS.udp)
1447 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448 self.icmp_id_in, self.icmp_id_out,
1449 proto=IP_PROTOS.icmp)
1450 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1451 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside.
1455 pkts = self.create_stream_out(self.pg1)
1456 self.pg1.add_stream(pkts)
1457 self.pg_enable_capture(self.pg_interfaces)
1459 capture = self.pg0.get_capture(len(pkts))
1460 self.verify_capture_in(capture, self.pg0)
# Inside -> outside.
1463 pkts = self.create_stream_in(self.pg0, self.pg1)
1464 self.pg0.add_stream(pkts)
1465 self.pg_enable_capture(self.pg_interfaces)
1467 capture = self.pg1.get_capture(len(pkts))
1468 self.verify_capture_out(capture)
1470 def test_static_vrf_aware(self):
1471 """ 1:1 NAT VRF awareness """
# Two static mappings are created (pg4 host -> 10.0.0.30, pg0 host ->
# 10.0.0.40). Traffic from pg4 must reach pg3 translated to nat_ip1, while
# traffic from pg0 must not show up on pg3 at all.
1473 nat_ip1 = "10.0.0.30"
1474 nat_ip2 = "10.0.0.40"
1475 self.tcp_port_out = 6303
1476 self.udp_port_out = 6304
1477 self.icmp_id_out = 6305
# NOTE(review): the trailing arguments of the next three calls are not
# visible here - presumably a vrf_id for each mapping and is_inside=0 for
# pg3; confirm.
1479 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1481 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1483 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1485 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1486 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1488 # inside interface VRF match NAT44 static mapping VRF
1489 pkts = self.create_stream_in(self.pg4, self.pg3)
1490 self.pg4.add_stream(pkts)
1491 self.pg_enable_capture(self.pg_interfaces)
1493 capture = self.pg3.get_capture(len(pkts))
1494 self.verify_capture_out(capture, nat_ip1, True)
1496 # inside interface VRF don't match NAT44 static mapping VRF (packets
1498 pkts = self.create_stream_in(self.pg0, self.pg3)
1499 self.pg0.add_stream(pkts)
1500 self.pg_enable_capture(self.pg_interfaces)
1502 self.pg3.assert_nothing_captured()
1504 def test_dynamic_to_static(self):
1505 """ Switch from dynamic translation to 1:1NAT """
# Phase 1 sends traffic with only the address pool configured. Phase 2 adds
# an address-only static mapping for the same host, expects the user's
# session dump to be empty, and expects new traffic translated to nat_ip.
1506 nat_ip = "10.0.0.10"
1507 self.tcp_port_out = 6303
1508 self.udp_port_out = 6304
1509 self.icmp_id_out = 6305
1511 self.nat44_add_address(self.nat_addr)
1512 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1513 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Phase 1: dynamic translation.
1517 pkts = self.create_stream_in(self.pg0, self.pg1)
1518 self.pg0.add_stream(pkts)
1519 self.pg_enable_capture(self.pg_interfaces)
1521 capture = self.pg1.get_capture(len(pkts))
1522 self.verify_capture_out(capture)
# Phase 2: 1:1 NAT; the sessions from phase 1 must be gone.
1525 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1526 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1527 self.assertEqual(len(sessions), 0)
1528 pkts = self.create_stream_in(self.pg0, self.pg1)
1529 self.pg0.add_stream(pkts)
1530 self.pg_enable_capture(self.pg_interfaces)
1532 capture = self.pg1.get_capture(len(pkts))
1533 self.verify_capture_out(capture, nat_ip, True)
1535 def test_identity_nat(self):
1536 """ Identity NAT """
# With an identity mapping for pg0's remote host, a TCP packet sent from
# pg1 to that host must arrive on pg0 with addresses and ports unchanged.
1538 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1539 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1540 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1543 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1544 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1545 TCP(sport=12345, dport=56789))
1546 self.pg1.add_stream(p)
1547 self.pg_enable_capture(self.pg_interfaces)
1549 capture = self.pg0.get_capture(1)
# NOTE(review): `ip` and `tcp` are not assigned in the visible lines -
# presumably taken from the captured packet, with `p` rebound to it;
# confirm.
1554 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1555 self.assertEqual(ip.src, self.pg1.remote_ip4)
1556 self.assertEqual(tcp.dport, 56789)
1557 self.assertEqual(tcp.sport, 12345)
1558 self.assert_packet_checksums_valid(p)
1560 self.logger.error(ppp("Unexpected or invalid packet:", p))
1563 def test_multiple_inside_interfaces(self):
1564 """ NAT44 multiple non-overlapping address space inside interfaces """
# pg0 and pg1 get the NAT44 feature with default arguments, pg3 with an
# extra argument, pg2 gets none. The test checks that pg0->pg1 and pg0->pg2
# are not translated, then runs in2out/out2in for pg0 and pg1 against pg3.
1566 self.nat44_add_address(self.nat_addr)
1567 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1568 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg3 as the outside interface); confirm.
1569 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1572 # between two NAT44 inside interfaces (no translation)
1573 pkts = self.create_stream_in(self.pg0, self.pg1)
1574 self.pg0.add_stream(pkts)
1575 self.pg_enable_capture(self.pg_interfaces)
1577 capture = self.pg1.get_capture(len(pkts))
1578 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1580 # from NAT44 inside to interface without NAT44 feature (no translation)
1581 pkts = self.create_stream_in(self.pg0, self.pg2)
1582 self.pg0.add_stream(pkts)
1583 self.pg_enable_capture(self.pg_interfaces)
1585 capture = self.pg2.get_capture(len(pkts))
1586 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1588 # in2out 1st interface
1589 pkts = self.create_stream_in(self.pg0, self.pg3)
1590 self.pg0.add_stream(pkts)
1591 self.pg_enable_capture(self.pg_interfaces)
1593 capture = self.pg3.get_capture(len(pkts))
1594 self.verify_capture_out(capture)
1596 # out2in 1st interface
1597 pkts = self.create_stream_out(self.pg3)
1598 self.pg3.add_stream(pkts)
1599 self.pg_enable_capture(self.pg_interfaces)
1601 capture = self.pg0.get_capture(len(pkts))
1602 self.verify_capture_in(capture, self.pg0)
1604 # in2out 2nd interface
1605 pkts = self.create_stream_in(self.pg1, self.pg3)
1606 self.pg1.add_stream(pkts)
1607 self.pg_enable_capture(self.pg_interfaces)
1609 capture = self.pg3.get_capture(len(pkts))
1610 self.verify_capture_out(capture)
1612 # out2in 2nd interface
1613 pkts = self.create_stream_out(self.pg3)
1614 self.pg3.add_stream(pkts)
1615 self.pg_enable_capture(self.pg_interfaces)
1617 capture = self.pg1.get_capture(len(pkts))
1618 self.verify_capture_in(capture, self.pg1)
1620 def test_inside_overlapping_interfaces(self):
1621 """ NAT44 multiple inside interfaces with overlapping address space """
# pg4, pg5 and pg6 get the NAT44 feature with default arguments and pg3 with
# an extra argument; pg6's remote host additionally has a static mapping to
# 10.0.0.10. The test covers: no translation pg4->pg5, hairpinning pg4->pg6,
# in2out/out2in for all three interfaces against pg3, and a series of
# user/session dump checks.
1623 static_nat_ip = "10.0.0.10"
1624 self.nat44_add_address(self.nat_addr)
# NOTE(review): the trailing argument of the pg3 call, and of the static
# mapping call a few lines below, are not visible here - presumably
# is_inside=0 and a vrf_id respectively; confirm.
1625 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1627 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1628 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1629 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1630 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1633 # between NAT44 inside interfaces with same VRF (no translation)
1634 pkts = self.create_stream_in(self.pg4, self.pg5)
1635 self.pg4.add_stream(pkts)
1636 self.pg_enable_capture(self.pg_interfaces)
1638 capture = self.pg5.get_capture(len(pkts))
1639 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1641 # between NAT44 inside interfaces with different VRF (hairpinning)
1642 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1643 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1644 TCP(sport=1234, dport=5678))
1645 self.pg4.add_stream(p)
1646 self.pg_enable_capture(self.pg_interfaces)
1648 capture = self.pg6.get_capture(1)
# NOTE(review): `ip` and `tcp` are not assigned in the visible lines -
# presumably taken from the captured packet; confirm. The hairpinned packet
# must carry self.nat_addr as source and a source port other than 1234.
1653 self.assertEqual(ip.src, self.nat_addr)
1654 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1655 self.assertNotEqual(tcp.sport, 1234)
1656 self.assertEqual(tcp.dport, 5678)
1658 self.logger.error(ppp("Unexpected or invalid packet:", p))
1661 # in2out 1st interface
1662 pkts = self.create_stream_in(self.pg4, self.pg3)
1663 self.pg4.add_stream(pkts)
1664 self.pg_enable_capture(self.pg_interfaces)
1666 capture = self.pg3.get_capture(len(pkts))
1667 self.verify_capture_out(capture)
1669 # out2in 1st interface
1670 pkts = self.create_stream_out(self.pg3)
1671 self.pg3.add_stream(pkts)
1672 self.pg_enable_capture(self.pg_interfaces)
1674 capture = self.pg4.get_capture(len(pkts))
1675 self.verify_capture_in(capture, self.pg4)
1677 # in2out 2nd interface
1678 pkts = self.create_stream_in(self.pg5, self.pg3)
1679 self.pg5.add_stream(pkts)
1680 self.pg_enable_capture(self.pg_interfaces)
1682 capture = self.pg3.get_capture(len(pkts))
1683 self.verify_capture_out(capture)
1685 # out2in 2nd interface
1686 pkts = self.create_stream_out(self.pg3)
1687 self.pg3.add_stream(pkts)
1688 self.pg_enable_capture(self.pg_interfaces)
1690 capture = self.pg5.get_capture(len(pkts))
1691 self.verify_capture_in(capture, self.pg5)
# pg5 user (second dump argument 10 - presumably the VRF id; confirm):
# exactly three non-static sessions are expected, in TCP, UDP, ICMP order,
# all translated to the single pool address.
1694 addresses = self.vapi.nat44_address_dump()
1695 self.assertEqual(len(addresses), 1)
1696 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1697 self.assertEqual(len(sessions), 3)
1698 for session in sessions:
1699 self.assertFalse(session.is_static)
1700 self.assertEqual(session.inside_ip_address[0:4],
1701 self.pg5.remote_ip4n)
1702 self.assertEqual(session.outside_ip_address,
1703 addresses[0].ip_address)
1704 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1705 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1706 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1707 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1708 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1709 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1710 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1711 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1712 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1714 # in2out 3rd interface
1715 pkts = self.create_stream_in(self.pg6, self.pg3)
1716 self.pg6.add_stream(pkts)
1717 self.pg_enable_capture(self.pg_interfaces)
1719 capture = self.pg3.get_capture(len(pkts))
1720 self.verify_capture_out(capture, static_nat_ip, True)
1722 # out2in 3rd interface
1723 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1724 self.pg3.add_stream(pkts)
1725 self.pg_enable_capture(self.pg_interfaces)
1727 capture = self.pg6.get_capture(len(pkts))
1728 self.verify_capture_in(capture, self.pg6)
1730 # general user and session dump verifications
1731 users = self.vapi.nat44_user_dump()
1732 self.assertTrue(len(users) >= 3)
1733 addresses = self.vapi.nat44_address_dump()
1734 self.assertEqual(len(addresses), 1)
# NOTE(review): `user` is not bound in the visible lines - presumably this
# part runs inside a loop over `users`; the second argument of the dump call
# and the tail of the protocol list are not visible either; confirm.
1736 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1738 for session in sessions:
1739 self.assertEqual(user.ip_address, session.inside_ip_address)
1740 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1741 self.assertTrue(session.protocol in
1742 [IP_PROTOS.tcp, IP_PROTOS.udp,
1744 self.assertFalse(session.ext_host_valid)
# pg4 user: at least four non-static sessions on the pool address.
1747 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1748 self.assertTrue(len(sessions) >= 4)
1749 for session in sessions:
1750 self.assertFalse(session.is_static)
1751 self.assertEqual(session.inside_ip_address[0:4],
1752 self.pg4.remote_ip4n)
1753 self.assertEqual(session.outside_ip_address,
1754 addresses[0].ip_address)
# pg6 user (second dump argument 20): at least three static sessions whose
# outside address equals static_nat_ip.
# NOTE(review): comparing two map(...) results with assertEqual relies on
# Python 2, where map returns lists; under Python 3 these would be two
# distinct iterator objects and never compare equal.
1757 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1758 self.assertTrue(len(sessions) >= 3)
1759 for session in sessions:
1760 self.assertTrue(session.is_static)
1761 self.assertEqual(session.inside_ip_address[0:4],
1762 self.pg6.remote_ip4n)
1763 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1764 map(int, static_nat_ip.split('.')))
1765 self.assertTrue(session.inside_port in
1766 [self.tcp_port_in, self.udp_port_in,
1769 def test_hairpinning(self):
1770 """ NAT44 hairpinning - 1:1 NAPT """
# Host and server are both remote hosts on pg0. The host reaches the server
# through self.nat_addr:server_out_port (TCP static mapping); both the
# request and the server's reply are sent into pg0 and captured on pg0.
1772 host = self.pg0.remote_hosts[0]
1773 server = self.pg0.remote_hosts[1]
# NOTE(review): `host_in_port`, used below, is not assigned in the visible
# lines - presumably an integer constant defined next to these ports;
# confirm.
1776 server_in_port = 5678
1777 server_out_port = 8765
1779 self.nat44_add_address(self.nat_addr)
1780 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1781 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1783 # add static mapping for server
1784 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1785 server_in_port, server_out_port,
1786 proto=IP_PROTOS.tcp)
1788 # send packet from host to server
1789 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1790 IP(src=host.ip4, dst=self.nat_addr) /
1791 TCP(sport=host_in_port, dport=server_out_port))
1792 self.pg0.add_stream(p)
1793 self.pg_enable_capture(self.pg_interfaces)
1795 capture = self.pg0.get_capture(1)
# Expected on the way to the server: source rewritten to self.nat_addr with
# a new source port, destination rewritten to the server's real address and
# port. The translated source port is kept in host_out_port for the reply.
# NOTE(review): `ip` and `tcp` are presumably taken from the captured
# packet; their assignments are not visible here.
1800 self.assertEqual(ip.src, self.nat_addr)
1801 self.assertEqual(ip.dst, server.ip4)
1802 self.assertNotEqual(tcp.sport, host_in_port)
1803 self.assertEqual(tcp.dport, server_in_port)
1804 self.assert_packet_checksums_valid(p)
1805 host_out_port = tcp.sport
1807 self.logger.error(ppp("Unexpected or invalid packet:", p))
1810 # send reply from server to host
1811 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1812 IP(src=server.ip4, dst=self.nat_addr) /
1813 TCP(sport=server_in_port, dport=host_out_port))
1814 self.pg0.add_stream(p)
1815 self.pg_enable_capture(self.pg_interfaces)
1817 capture = self.pg0.get_capture(1)
# Expected on the way back: source self.nat_addr:server_out_port,
# destination the host's original address and port.
1822 self.assertEqual(ip.src, self.nat_addr)
1823 self.assertEqual(ip.dst, host.ip4)
1824 self.assertEqual(tcp.sport, server_out_port)
1825 self.assertEqual(tcp.dport, host_in_port)
1826 self.assert_packet_checksums_valid(p)
1828 self.logger.error(ppp("Unexpected or invalid packet:", p))
1831 def test_hairpinning2(self):
1832 """ NAT44 hairpinning - 1:1 NAT"""
# Three remote hosts on pg0: a plain host plus two servers that each have
# an address-only static mapping. Four exchanges are sent into pg0 and
# captured on pg0, each with a TCP, a UDP and an ICMP packet:
# host -> server1, server1 -> host, server2 -> server1, server1 -> server2.
# NOTE(review): `pkts` is never assigned in the visible lines - presumably
# each `p` built below is appended to a fresh list per exchange; confirm.
1834 server1_nat_ip = "10.0.0.10"
1835 server2_nat_ip = "10.0.0.11"
1836 host = self.pg0.remote_hosts[0]
1837 server1 = self.pg0.remote_hosts[1]
1838 server2 = self.pg0.remote_hosts[2]
1839 server_tcp_port = 22
1840 server_udp_port = 20
1842 self.nat44_add_address(self.nat_addr)
1843 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
1844 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1847 # add static mapping for servers
1848 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1849 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# Exchange 1: host -> server1, addressed to server1's mapped address.
1853 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1854 IP(src=host.ip4, dst=server1_nat_ip) /
1855 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1857 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1858 IP(src=host.ip4, dst=server1_nat_ip) /
1859 UDP(sport=self.udp_port_in, dport=server_udp_port))
1861 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1862 IP(src=host.ip4, dst=server1_nat_ip) /
1863 ICMP(id=self.icmp_id_in, type='echo-request'))
1865 self.pg0.add_stream(pkts)
1866 self.pg_enable_capture(self.pg_interfaces)
1868 capture = self.pg0.get_capture(len(pkts))
# The host has no static mapping, so its source becomes self.nat_addr with
# a changed port/id; the translated values are stored on self for the
# reply direction.
1869 for packet in capture:
1871 self.assertEqual(packet[IP].src, self.nat_addr)
1872 self.assertEqual(packet[IP].dst, server1.ip4)
1873 if packet.haslayer(TCP):
1874 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1875 self.assertEqual(packet[TCP].dport, server_tcp_port)
1876 self.tcp_port_out = packet[TCP].sport
1877 self.assert_packet_checksums_valid(packet)
1878 elif packet.haslayer(UDP):
1879 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1880 self.assertEqual(packet[UDP].dport, server_udp_port)
1881 self.udp_port_out = packet[UDP].sport
1883 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1884 self.icmp_id_out = packet[ICMP].id
1886 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exchange 2: server1 -> host, addressed to self.nat_addr and the
# translated port/id recorded above.
1891 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1892 IP(src=server1.ip4, dst=self.nat_addr) /
1893 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1895 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1896 IP(src=server1.ip4, dst=self.nat_addr) /
1897 UDP(sport=server_udp_port, dport=self.udp_port_out))
1899 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1900 IP(src=server1.ip4, dst=self.nat_addr) /
1901 ICMP(id=self.icmp_id_out, type='echo-reply'))
1903 self.pg0.add_stream(pkts)
1904 self.pg_enable_capture(self.pg_interfaces)
1906 capture = self.pg0.get_capture(len(pkts))
1907 for packet in capture:
1909 self.assertEqual(packet[IP].src, server1_nat_ip)
1910 self.assertEqual(packet[IP].dst, host.ip4)
1911 if packet.haslayer(TCP):
1912 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1913 self.assertEqual(packet[TCP].sport, server_tcp_port)
1914 self.assert_packet_checksums_valid(packet)
1915 elif packet.haslayer(UDP):
1916 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1917 self.assertEqual(packet[UDP].sport, server_udp_port)
1919 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1921 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1924 # server2 to server1
# Both ends have address-only mappings here, so only the addresses are
# expected to change; ports and the ICMP id must stay as sent.
1926 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1927 IP(src=server2.ip4, dst=server1_nat_ip) /
1928 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1930 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1931 IP(src=server2.ip4, dst=server1_nat_ip) /
1932 UDP(sport=self.udp_port_in, dport=server_udp_port))
1934 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1935 IP(src=server2.ip4, dst=server1_nat_ip) /
1936 ICMP(id=self.icmp_id_in, type='echo-request'))
1938 self.pg0.add_stream(pkts)
1939 self.pg_enable_capture(self.pg_interfaces)
1941 capture = self.pg0.get_capture(len(pkts))
1942 for packet in capture:
1944 self.assertEqual(packet[IP].src, server2_nat_ip)
1945 self.assertEqual(packet[IP].dst, server1.ip4)
1946 if packet.haslayer(TCP):
1947 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1948 self.assertEqual(packet[TCP].dport, server_tcp_port)
1949 self.tcp_port_out = packet[TCP].sport
1950 self.assert_packet_checksums_valid(packet)
1951 elif packet.haslayer(UDP):
1952 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1953 self.assertEqual(packet[UDP].dport, server_udp_port)
1954 self.udp_port_out = packet[UDP].sport
1956 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1957 self.icmp_id_out = packet[ICMP].id
1959 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1962 # server1 to server2
1964 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1965 IP(src=server1.ip4, dst=server2_nat_ip) /
1966 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1968 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1969 IP(src=server1.ip4, dst=server2_nat_ip) /
1970 UDP(sport=server_udp_port, dport=self.udp_port_out))
1972 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1973 IP(src=server1.ip4, dst=server2_nat_ip) /
1974 ICMP(id=self.icmp_id_out, type='echo-reply'))
1976 self.pg0.add_stream(pkts)
1977 self.pg_enable_capture(self.pg_interfaces)
1979 capture = self.pg0.get_capture(len(pkts))
1980 for packet in capture:
1982 self.assertEqual(packet[IP].src, server1_nat_ip)
1983 self.assertEqual(packet[IP].dst, server2.ip4)
1984 if packet.haslayer(TCP):
1985 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1986 self.assertEqual(packet[TCP].sport, server_tcp_port)
1987 self.assert_packet_checksums_valid(packet)
1988 elif packet.haslayer(UDP):
1989 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1990 self.assertEqual(packet[UDP].sport, server_udp_port)
1992 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1994 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1997 def test_max_translations_per_user(self):
1998 """ MAX translations per user - recycle the least recently used """
# Sends five more TCP flows than the configured per-user limit from a single
# inside host and checks that the user's session count equals the limit.
# Then adds one TCP static mapping, sends one matching packet, and expects
# the counts to become (limit - 1) dynamic sessions plus 1 static session.
2000 self.nat44_add_address(self.nat_addr)
2001 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# NOTE(review): the argument following sw_if_index is not visible here -
# presumably is_inside=0 (pg1 as the outside interface); confirm.
2002 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2005 # get maximum number of translations per user
2006 nat44_config = self.vapi.nat_show_config()
2008 # send more than maximum number of translations per user packets
2009 pkts_num = nat44_config.max_translations_per_user + 5
# One packet per distinct source port, starting at 1025.
# NOTE(review): `pkts` is not assigned in the visible lines - presumably a
# list each `p` is appended to; confirm.
2011 for port in range(0, pkts_num):
2012 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2013 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2014 TCP(sport=1025 + port))
2016 self.pg0.add_stream(pkts)
2017 self.pg_enable_capture(self.pg_interfaces)
2020 # verify number of translated packet
2021 self.pg1.get_capture(pkts_num)
# NOTE(review): `user` is not bound in the visible lines - presumably both
# checks below run inside a loop over `users`; confirm.
2023 users = self.vapi.nat44_user_dump()
2025 if user.ip_address == self.pg0.remote_ip4n:
2026 self.assertEqual(user.nsessions,
2027 nat44_config.max_translations_per_user)
2028 self.assertEqual(user.nstaticsessions, 0)
# NOTE(review): `tcp_port` and the port arguments of the mapping call are
# not visible here - presumably the same port is used as local and external
# port; confirm.
2031 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2033 proto=IP_PROTOS.tcp)
2034 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2035 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2036 TCP(sport=tcp_port))
2037 self.pg0.add_stream(p)
2038 self.pg_enable_capture(self.pg_interfaces)
2040 self.pg1.get_capture(1)
2041 users = self.vapi.nat44_user_dump()
2043 if user.ip_address == self.pg0.remote_ip4n:
2044 self.assertEqual(user.nsessions,
2045 nat44_config.max_translations_per_user - 1)
2046 self.assertEqual(user.nstaticsessions, 1)
2048 def test_interface_addr(self):
2049 """ Acquire NAT44 addresses from interface """
# pg7 is registered as the source of NAT pool addresses. The pool must be
# empty while pg7 has no IPv4 address, contain exactly pg7's local address
# once one is configured, and be empty again after it is removed.
# NOTE(review): the local name `adresses` is a misspelling of `addresses`;
# it is used consistently, so behaviour is unaffected.
2050 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2052 # no address in NAT pool
2053 adresses = self.vapi.nat44_address_dump()
2054 self.assertEqual(0, len(adresses))
2056 # configure interface address and check NAT address pool
2057 self.pg7.config_ip4()
2058 adresses = self.vapi.nat44_address_dump()
2059 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address are compared.
2060 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2062 # remove interface address and check NAT address pool
2063 self.pg7.unconfig_ip4()
2064 adresses = self.vapi.nat44_address_dump()
2065 self.assertEqual(0, len(adresses))
2067 def test_interface_addr_static_mapping(self):
2068 """ Static mapping with addresses from interface """
# A static mapping is bound to interface pg7 instead of a fixed external
# address. While pg7 has no IPv4 address only that one entry is dumped;
# once an address is configured a second entry appears with
# external_sw_if_index 0xFFFFFFFF and pg7's local address. This is checked
# across configure / unconfigure / configure, then the mapping is removed.
# NOTE(review): `tag`, `resolved` and several arguments of the two
# nat44_add_static_mapping calls are not visible here - presumably
# `resolved` starts False before each loop and is set True inside the `if`;
# confirm.
2071 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2072 self.nat44_add_static_mapping(
2074 external_sw_if_index=self.pg7.sw_if_index,
2077 # static mappings with external interface
2078 static_mappings = self.vapi.nat44_static_mapping_dump()
2079 self.assertEqual(1, len(static_mappings))
2080 self.assertEqual(self.pg7.sw_if_index,
2081 static_mappings[0].external_sw_if_index)
2082 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2084 # configure interface address and check static mappings
2085 self.pg7.config_ip4()
2086 static_mappings = self.vapi.nat44_static_mapping_dump()
2087 self.assertEqual(2, len(static_mappings))
2089 for sm in static_mappings:
2090 if sm.external_sw_if_index == 0xFFFFFFFF:
2091 self.assertEqual(sm.external_ip_address[0:4],
2092 self.pg7.local_ip4n)
2093 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2095 self.assertTrue(resolved)
2097 # remove interface address and check static mappings
2098 self.pg7.unconfig_ip4()
2099 static_mappings = self.vapi.nat44_static_mapping_dump()
2100 self.assertEqual(1, len(static_mappings))
2101 self.assertEqual(self.pg7.sw_if_index,
2102 static_mappings[0].external_sw_if_index)
2103 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2105 # configure interface address again and check static mappings
2106 self.pg7.config_ip4()
2107 static_mappings = self.vapi.nat44_static_mapping_dump()
2108 self.assertEqual(2, len(static_mappings))
2110 for sm in static_mappings:
2111 if sm.external_sw_if_index == 0xFFFFFFFF:
2112 self.assertEqual(sm.external_ip_address[0:4],
2113 self.pg7.local_ip4n)
2114 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2116 self.assertTrue(resolved)
2118 # remove static mapping
2119 self.nat44_add_static_mapping(
2121 external_sw_if_index=self.pg7.sw_if_index,
2124 static_mappings = self.vapi.nat44_static_mapping_dump()
2125 self.assertEqual(0, len(static_mappings))
2127 def test_interface_addr_identity_nat(self):
2128 """ Identity NAT with addresses from interface """
# A TCP identity mapping is bound to interface pg7. While pg7 has no IPv4
# address only that one entry is dumped; once an address is configured a
# second entry with sw_if_index 0xFFFFFFFF is expected, and it disappears
# again after the address is removed.
# NOTE(review): `port`, `resolved` and some arguments of the identity
# mapping call are not visible here; confirm their definitions.
2131 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2132 self.vapi.nat44_add_del_identity_mapping(
2133 sw_if_index=self.pg7.sw_if_index,
2135 protocol=IP_PROTOS.tcp,
2138 # identity mappings with external interface
2139 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2140 self.assertEqual(1, len(identity_mappings))
2141 self.assertEqual(self.pg7.sw_if_index,
2142 identity_mappings[0].sw_if_index)
2144 # configure interface address and check identity mappings
2145 self.pg7.config_ip4()
2146 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2148 self.assertEqual(2, len(identity_mappings))
# NOTE(review): the loop selects the resolved entry via `sm`, but the three
# assertions inspect identity_mappings[0] rather than `sm`. They therefore
# only check the resolved entry when it happens to be first in the dump -
# presumably `sm` was intended; confirm and fix.
2149 for sm in identity_mappings:
2150 if sm.sw_if_index == 0xFFFFFFFF:
2151 self.assertEqual(identity_mappings[0].ip_address,
2152 self.pg7.local_ip4n)
2153 self.assertEqual(port, identity_mappings[0].port)
2154 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2156 self.assertTrue(resolved)
2158 # remove interface address and check identity mappings
2159 self.pg7.unconfig_ip4()
2160 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2161 self.assertEqual(1, len(identity_mappings))
2162 self.assertEqual(self.pg7.sw_if_index,
2163 identity_mappings[0].sw_if_index)
2165 def test_ipfix_nat44_sess(self):
2166 """ IPFIX logging NAT44 session created/delted """
# NOTE(review): "delted" in the docstring above looks like a typo for
# "deleted".
# Uses non-default IPFIX settings (domain id 10, source port 20202,
# collector port 30303), pushes a stream from pg0 towards pg1, removes the
# NAT address, flushes IPFIX and verifies the records captured on pg3.
2167 self.ipfix_domain_id = 10
2168 self.ipfix_src_port = 20202
2169 colector_port = 30303
# Teach scapy to dissect UDP traffic to the custom collector port as IPFIX.
2170 bind_layers(UDP, IPFIX, dport=30303)
2171 self.nat44_add_address(self.nat_addr)
2172 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2173 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2175 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2176 src_address=self.pg3.local_ip4n,
2178 template_interval=10,
2179 collector_port=colector_port)
2180 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2181 src_port=self.ipfix_src_port)
2183 pkts = self.create_stream_in(self.pg0, self.pg1)
2184 self.pg0.add_stream(pkts)
2185 self.pg_enable_capture(self.pg_interfaces)
2187 capture = self.pg1.get_capture(len(pkts))
2188 self.verify_capture_out(capture)
2189 self.nat44_add_address(self.nat_addr, is_add=0)
2190 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Exactly 9 IPFIX packets are expected on the collector-facing interface.
2191 capture = self.pg3.get_capture(9)
2192 ipfix = IPFIXDecoder()
2193 # first load template
# Every checked packet must be IPFIX from pg3's local address to its remote
# address, with the configured source port, collector port and domain id.
2195 self.assertTrue(p.haslayer(IPFIX))
2196 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2197 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2198 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2199 self.assertEqual(p[UDP].dport, colector_port)
2200 self.assertEqual(p[IPFIX].observationDomainID,
2201 self.ipfix_domain_id)
2202 if p.haslayer(Template):
2203 ipfix.add_template(p.getlayer(Template))
2204 # verify events in data set
2206 if p.haslayer(Data):
2207 data = ipfix.decode_data_set(p.getlayer(Set))
2208 self.verify_ipfix_nat44_ses(data)
2210 def test_ipfix_addr_exhausted(self):
2211 """ IPFIX logging NAT addresses exhausted """
# Enables NAT44 on pg0/pg1 and IPFIX export towards pg3, sends one packet
# from pg0, expects nothing on pg1, then verifies the IPFIX records captured
# on pg3 with verify_ipfix_addr_exhausted().
# No nat44_add_address() call is visible in this test.
2212 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2213 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2215 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2216 src_address=self.pg3.local_ip4n,
2218 template_interval=10)
2219 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2220 src_port=self.ipfix_src_port)
2222 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2223 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2225 self.pg0.add_stream(p)
2226 self.pg_enable_capture(self.pg_interfaces)
# The packet must not be forwarded to pg1.
2228 self.pg1.assert_nothing_captured()
2230 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2231 capture = self.pg3.get_capture(9)
2232 ipfix = IPFIXDecoder()
2233 # first load template
# Checked packets must be IPFIX sent from pg3 to its peer, from the
# configured source port to UDP port 4739, with the configured domain id.
2235 self.assertTrue(p.haslayer(IPFIX))
2236 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2237 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2238 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2239 self.assertEqual(p[UDP].dport, 4739)
2240 self.assertEqual(p[IPFIX].observationDomainID,
2241 self.ipfix_domain_id)
2242 if p.haslayer(Template):
2243 ipfix.add_template(p.getlayer(Template))
2244 # verify events in data set
2246 if p.haslayer(Data):
2247 data = ipfix.decode_data_set(p.getlayer(Set))
2248 self.verify_ipfix_addr_exhausted(data)
2250 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2251 def test_ipfix_max_sessions(self):
2252 """ IPFIX logging maximum session entries exceeded """
# Runs only when running_extended_tests() is true. Sends max_sessions
# packets from distinct 10.10.x.y sources and expects all of them on pg1,
# then enables IPFIX export, sends one more packet that must not reach pg1,
# and verifies the exported records with verify_ipfix_max_sessions().
2253 self.nat44_add_address(self.nat_addr)
2254 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2255 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2258 nat44_config = self.vapi.nat_show_config()
# The session limit used here is 10 times the reported translation_buckets.
2259 max_sessions = 10 * nat44_config.translation_buckets
2262 for i in range(0, max_sessions):
# High byte of i goes into the third octet, low byte into the fourth.
2263 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2264 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2265 IP(src=src, dst=self.pg1.remote_ip4) /
2268 self.pg0.add_stream(pkts)
2269 self.pg_enable_capture(self.pg_interfaces)
2272 self.pg1.get_capture(max_sessions)
2273 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2274 src_address=self.pg3.local_ip4n,
2276 template_interval=10)
2277 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2278 src_port=self.ipfix_src_port)
2280 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2281 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2283 self.pg0.add_stream(p)
2284 self.pg_enable_capture(self.pg_interfaces)
# The extra packet must not be forwarded to pg1.
2286 self.pg1.assert_nothing_captured()
2288 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2289 capture = self.pg3.get_capture(9)
2290 ipfix = IPFIXDecoder()
2291 # first load template
2293 self.assertTrue(p.haslayer(IPFIX))
2294 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2295 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2296 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2297 self.assertEqual(p[UDP].dport, 4739)
2298 self.assertEqual(p[IPFIX].observationDomainID,
2299 self.ipfix_domain_id)
2300 if p.haslayer(Template):
2301 ipfix.add_template(p.getlayer(Template))
2302 # verify events in data set
2304 if p.haslayer(Data):
2305 data = ipfix.decode_data_set(p.getlayer(Set))
2306 self.verify_ipfix_max_sessions(data, max_sessions)
2308 def test_pool_addr_fib(self):
2309 """ NAT44 add pool addresses to FIB """
# Sends ARP who-has requests for the NAT pool address and for a static
# mapping's external address. While the addresses are configured, one
# packet is expected back on pg1 for requests arriving on pg1; a request
# arriving on pg2 must produce nothing on pg1. After both addresses are
# removed, requests on pg1 must produce nothing.
2310 static_addr = '10.0.0.10'
2311 self.nat44_add_address(self.nat_addr)
2312 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2313 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2315 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2318 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2319 ARP(op=ARP.who_has, pdst=self.nat_addr,
2320 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2321 self.pg1.add_stream(p)
2322 self.pg_enable_capture(self.pg_interfaces)
2324 capture = self.pg1.get_capture(1)
2325 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): assertTrue() treats its second argument as the failure
# message, so this only checks that op is truthy; assertEqual(op, ARP.is_at)
# is probably what was meant.
2326 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2329 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2330 ARP(op=ARP.who_has, pdst=static_addr,
2331 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2332 self.pg1.add_stream(p)
2333 self.pg_enable_capture(self.pg_interfaces)
2335 capture = self.pg1.get_capture(1)
2336 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): same as above - the second argument is only a message.
2337 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2339 # send ARP to non-NAT44 interface
2340 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2341 ARP(op=ARP.who_has, pdst=self.nat_addr,
2342 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2343 self.pg2.add_stream(p)
2344 self.pg_enable_capture(self.pg_interfaces)
2346 self.pg1.assert_nothing_captured()
2348 # remove addresses and verify
2349 self.nat44_add_address(self.nat_addr, is_add=0)
2350 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2353 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2354 ARP(op=ARP.who_has, pdst=self.nat_addr,
2355 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2356 self.pg1.add_stream(p)
2357 self.pg_enable_capture(self.pg_interfaces)
2359 self.pg1.assert_nothing_captured()
2361 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2362 ARP(op=ARP.who_has, pdst=static_addr,
2363 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2364 self.pg1.add_stream(p)
2365 self.pg_enable_capture(self.pg_interfaces)
2367 self.pg1.assert_nothing_captured()
2369 def test_vrf_mode(self):
2370 """ NAT44 tenant VRF aware address pool mode """
# Moves pg0 and pg1 into two separate IP tables (vrf_id1 / vrf_id2), adds
# one NAT address per VRF, and checks that traffic from pg0 to pg2 is
# translated to nat_ip1 while traffic from pg1 to pg2 is translated to
# nat_ip2. The interfaces are returned to table 0 and the tables deleted at
# the end.
2374 nat_ip1 = "10.0.0.10"
2375 nat_ip2 = "10.0.0.11"
# Addresses are removed before the table change and re-applied afterwards.
2377 self.pg0.unconfig_ip4()
2378 self.pg1.unconfig_ip4()
2379 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2380 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2381 self.pg0.set_table_ip4(vrf_id1)
2382 self.pg1.set_table_ip4(vrf_id2)
2383 self.pg0.config_ip4()
2384 self.pg1.config_ip4()
2386 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2387 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2388 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2389 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2390 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2394 pkts = self.create_stream_in(self.pg0, self.pg2)
2395 self.pg0.add_stream(pkts)
2396 self.pg_enable_capture(self.pg_interfaces)
2398 capture = self.pg2.get_capture(len(pkts))
2399 self.verify_capture_out(capture, nat_ip1)
2402 pkts = self.create_stream_in(self.pg1, self.pg2)
2403 self.pg1.add_stream(pkts)
2404 self.pg_enable_capture(self.pg_interfaces)
2406 capture = self.pg2.get_capture(len(pkts))
2407 self.verify_capture_out(capture, nat_ip2)
# Cleanup: put both interfaces back into table 0 and delete the test tables.
2409 self.pg0.unconfig_ip4()
2410 self.pg1.unconfig_ip4()
2411 self.pg0.set_table_ip4(0)
2412 self.pg1.set_table_ip4(0)
2413 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2414 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2416 def test_vrf_feature_independent(self):
2417 """ NAT44 tenant VRF independent address pool mode """
# Adds nat_ip1 without a VRF and nat_ip2 bound to VRF 99, then checks that
# traffic from both pg0 and pg1 towards pg2 is translated to nat_ip1.
2419 nat_ip1 = "10.0.0.10"
2420 nat_ip2 = "10.0.0.11"
2422 self.nat44_add_address(nat_ip1)
2423 self.nat44_add_address(nat_ip2, vrf_id=99)
2424 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2425 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2426 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2430 pkts = self.create_stream_in(self.pg0, self.pg2)
2431 self.pg0.add_stream(pkts)
2432 self.pg_enable_capture(self.pg_interfaces)
2434 capture = self.pg2.get_capture(len(pkts))
2435 self.verify_capture_out(capture, nat_ip1)
2438 pkts = self.create_stream_in(self.pg1, self.pg2)
2439 self.pg1.add_stream(pkts)
2440 self.pg_enable_capture(self.pg_interfaces)
2442 capture = self.pg2.get_capture(len(pkts))
# The second stream is also expected to use nat_ip1, not nat_ip2.
2443 self.verify_capture_out(capture, nat_ip1)
2445 def test_dynamic_ipless_interfaces(self):
2446 """ NAT44 interfaces without configured IP address """
# Installs neighbor entries and /32 routes for the pg7 and pg8 peers, adds
# the NAT pool address, then verifies translation of a pg7 -> pg8 stream and
# of the return stream addressed to nat_addr.
2448 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2449 mactobinary(self.pg7.remote_mac),
2450 self.pg7.remote_ip4n,
2452 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2453 mactobinary(self.pg8.remote_mac),
2454 self.pg8.remote_ip4n,
# Host routes pointing at each peer through its own interface.
2457 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2458 dst_address_length=32,
2459 next_hop_address=self.pg7.remote_ip4n,
2460 next_hop_sw_if_index=self.pg7.sw_if_index)
2461 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2462 dst_address_length=32,
2463 next_hop_address=self.pg8.remote_ip4n,
2464 next_hop_sw_if_index=self.pg8.sw_if_index)
2466 self.nat44_add_address(self.nat_addr)
2467 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2468 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2472 pkts = self.create_stream_in(self.pg7, self.pg8)
2473 self.pg7.add_stream(pkts)
2474 self.pg_enable_capture(self.pg_interfaces)
2476 capture = self.pg8.get_capture(len(pkts))
2477 self.verify_capture_out(capture)
2480 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2481 self.pg8.add_stream(pkts)
2482 self.pg_enable_capture(self.pg_interfaces)
2484 capture = self.pg7.get_capture(len(pkts))
2485 self.verify_capture_in(capture, self.pg7)
2487 def test_static_ipless_interfaces(self):
2488 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same neighbor/route setup as the dynamic variant, but uses a static
# mapping between pg7's peer address and nat_addr. The pg8 -> pg7 direction
# is exercised first, then pg7 -> pg8.
2490 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2491 mactobinary(self.pg7.remote_mac),
2492 self.pg7.remote_ip4n,
2494 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2495 mactobinary(self.pg8.remote_mac),
2496 self.pg8.remote_ip4n,
2499 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2500 dst_address_length=32,
2501 next_hop_address=self.pg7.remote_ip4n,
2502 next_hop_sw_if_index=self.pg7.sw_if_index)
2503 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2504 dst_address_length=32,
2505 next_hop_address=self.pg8.remote_ip4n,
2506 next_hop_sw_if_index=self.pg8.sw_if_index)
2508 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2509 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2510 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2514 pkts = self.create_stream_out(self.pg8)
2515 self.pg8.add_stream(pkts)
2516 self.pg_enable_capture(self.pg_interfaces)
2518 capture = self.pg7.get_capture(len(pkts))
2519 self.verify_capture_in(capture, self.pg7)
2522 pkts = self.create_stream_in(self.pg7, self.pg8)
2523 self.pg7.add_stream(pkts)
2524 self.pg_enable_capture(self.pg_interfaces)
2526 capture = self.pg8.get_capture(len(pkts))
# NOTE(review): the meaning of the third positional argument (True) is
# defined by verify_capture_out, which is outside this view.
2527 self.verify_capture_out(capture, self.nat_addr, True)
2529 def test_static_with_port_ipless_interfaces(self):
2530 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Overrides the expected outside TCP/UDP ports and ICMP id, installs
# neighbor entries and /32 routes for the pg7/pg8 peers, and adds one static
# mapping with ports per protocol (TCP, UDP, ICMP) before checking both
# traffic directions.
2532 self.tcp_port_out = 30606
2533 self.udp_port_out = 30607
2534 self.icmp_id_out = 30608
2536 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2537 mactobinary(self.pg7.remote_mac),
2538 self.pg7.remote_ip4n,
2540 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2541 mactobinary(self.pg8.remote_mac),
2542 self.pg8.remote_ip4n,
2545 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2546 dst_address_length=32,
2547 next_hop_address=self.pg7.remote_ip4n,
2548 next_hop_sw_if_index=self.pg7.sw_if_index)
2549 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2550 dst_address_length=32,
2551 next_hop_address=self.pg8.remote_ip4n,
2552 next_hop_sw_if_index=self.pg8.sw_if_index)
2554 self.nat44_add_address(self.nat_addr)
# One mapping per protocol, each pairing the inside port/id with the
# outside value set at the top of this test.
2555 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2556 self.tcp_port_in, self.tcp_port_out,
2557 proto=IP_PROTOS.tcp)
2558 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2559 self.udp_port_in, self.udp_port_out,
2560 proto=IP_PROTOS.udp)
2561 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2562 self.icmp_id_in, self.icmp_id_out,
2563 proto=IP_PROTOS.icmp)
2564 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2565 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2569 pkts = self.create_stream_out(self.pg8)
2570 self.pg8.add_stream(pkts)
2571 self.pg_enable_capture(self.pg_interfaces)
2573 capture = self.pg7.get_capture(len(pkts))
2574 self.verify_capture_in(capture, self.pg7)
2577 pkts = self.create_stream_in(self.pg7, self.pg8)
2578 self.pg7.add_stream(pkts)
2579 self.pg_enable_capture(self.pg_interfaces)
2581 capture = self.pg8.get_capture(len(pkts))
2582 self.verify_capture_out(capture)
2584 def test_static_unknown_proto(self):
2585 """ 1:1 NAT translate packet with unknown protocol """
# With a static mapping pg0 peer <-> nat_ip, sends an encapsulated packet in
# each direction and checks that the outer IP addresses are rewritten, a GRE
# layer is still present and the checksums are valid.
2586 nat_ip = "10.0.0.10"
2587 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2588 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2589 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2593 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2594 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2596 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2597 TCP(sport=1234, dport=1234))
2598 self.pg0.add_stream(p)
2599 self.pg_enable_capture(self.pg_interfaces)
2601 p = self.pg1.get_capture(1)
# pg0 -> pg1: the source must become nat_ip, the destination is unchanged.
2604 self.assertEqual(packet[IP].src, nat_ip)
2605 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2606 self.assertTrue(packet.haslayer(GRE))
2607 self.assert_packet_checksums_valid(packet)
2609 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2613 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2614 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2616 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2617 TCP(sport=1234, dport=1234))
2618 self.pg1.add_stream(p)
2619 self.pg_enable_capture(self.pg_interfaces)
2621 p = self.pg0.get_capture(1)
# pg1 -> pg0: the destination nat_ip must be mapped back to pg0's peer.
2624 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2625 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2626 self.assertTrue(packet.haslayer(GRE))
2627 self.assert_packet_checksums_valid(packet)
2629 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2632 def test_hairpinning_static_unknown_proto(self):
2633 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server are both peers on pg0, each with its own static mapping.
# A packet sent by one of them to the other's NAT address must come back out
# of pg0 with the source rewritten to the sender's NAT address and the
# destination rewritten to the receiver's real address.
2635 host = self.pg0.remote_hosts[0]
2636 server = self.pg0.remote_hosts[1]
2638 host_nat_ip = "10.0.0.10"
2639 server_nat_ip = "10.0.0.11"
2641 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2642 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2643 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2644 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2648 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2649 IP(src=host.ip4, dst=server_nat_ip) /
2651 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2652 TCP(sport=1234, dport=1234))
2653 self.pg0.add_stream(p)
2654 self.pg_enable_capture(self.pg_interfaces)
2656 p = self.pg0.get_capture(1)
# host -> server direction.
2659 self.assertEqual(packet[IP].src, host_nat_ip)
2660 self.assertEqual(packet[IP].dst, server.ip4)
2661 self.assertTrue(packet.haslayer(GRE))
2662 self.assert_packet_checksums_valid(packet)
2664 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2668 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2669 IP(src=server.ip4, dst=host_nat_ip) /
2671 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2672 TCP(sport=1234, dport=1234))
2673 self.pg0.add_stream(p)
2674 self.pg_enable_capture(self.pg_interfaces)
2676 p = self.pg0.get_capture(1)
# server -> host direction.
2679 self.assertEqual(packet[IP].src, server_nat_ip)
2680 self.assertEqual(packet[IP].dst, host.ip4)
2681 self.assertTrue(packet.haslayer(GRE))
2682 self.assert_packet_checksums_valid(packet)
2684 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2687 def test_output_feature(self):
2688 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3, then checks three
# flows: pg0 -> pg3 (translated), pg3 -> pg0 (translated back), and
# pg2 -> pg0, which must arrive without translation.
2689 self.nat44_add_address(self.nat_addr)
2690 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2691 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2692 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2696 pkts = self.create_stream_in(self.pg0, self.pg3)
2697 self.pg0.add_stream(pkts)
2698 self.pg_enable_capture(self.pg_interfaces)
2700 capture = self.pg3.get_capture(len(pkts))
2701 self.verify_capture_out(capture)
2704 pkts = self.create_stream_out(self.pg3)
2705 self.pg3.add_stream(pkts)
2706 self.pg_enable_capture(self.pg_interfaces)
2708 capture = self.pg0.get_capture(len(pkts))
2709 self.verify_capture_in(capture, self.pg0)
2711 # from non-NAT interface to NAT inside interface
2712 pkts = self.create_stream_in(self.pg2, self.pg0)
2713 self.pg2.add_stream(pkts)
2714 self.pg_enable_capture(self.pg_interfaces)
2716 capture = self.pg0.get_capture(len(pkts))
2717 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2719 def test_output_feature_vrf_aware(self):
2720 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Adds one NAT address for VRF 10 and one for VRF 20, enables the output
# feature on pg4, pg6 and pg3, and checks that pg4 <-> pg3 traffic uses
# nat_ip_vrf10 while pg6 <-> pg3 traffic uses nat_ip_vrf20.
2721 nat_ip_vrf10 = "10.0.0.10"
2722 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3's peer via pg3 are added; the argument that
# distinguishes them is not visible here.
2724 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2725 dst_address_length=32,
2726 next_hop_address=self.pg3.remote_ip4n,
2727 next_hop_sw_if_index=self.pg3.sw_if_index,
2729 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2730 dst_address_length=32,
2731 next_hop_address=self.pg3.remote_ip4n,
2732 next_hop_sw_if_index=self.pg3.sw_if_index,
2735 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2736 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2737 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2738 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2739 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2743 pkts = self.create_stream_in(self.pg4, self.pg3)
2744 self.pg4.add_stream(pkts)
2745 self.pg_enable_capture(self.pg_interfaces)
2747 capture = self.pg3.get_capture(len(pkts))
2748 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2751 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2752 self.pg3.add_stream(pkts)
2753 self.pg_enable_capture(self.pg_interfaces)
2755 capture = self.pg4.get_capture(len(pkts))
2756 self.verify_capture_in(capture, self.pg4)
2759 pkts = self.create_stream_in(self.pg6, self.pg3)
2760 self.pg6.add_stream(pkts)
2761 self.pg_enable_capture(self.pg_interfaces)
2763 capture = self.pg3.get_capture(len(pkts))
2764 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2767 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2768 self.pg3.add_stream(pkts)
2769 self.pg_enable_capture(self.pg_interfaces)
2771 capture = self.pg6.get_capture(len(pkts))
2772 self.verify_capture_in(capture, self.pg6)
2774 def test_output_feature_hairpinning(self):
2775 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server are both peers on pg0. The server is reachable through a
# static TCP mapping nat_addr:server_out_port -> server:server_in_port. The
# test sends host -> nat_addr and checks the packet re-emerges on pg0 towards
# the server with a translated source, then sends the server's reply and
# checks it is delivered back to the host's original port.
2776 host = self.pg0.remote_hosts[0]
2777 server = self.pg0.remote_hosts[1]
2780 server_in_port = 5678
2781 server_out_port = 8765
2783 self.nat44_add_address(self.nat_addr)
2784 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2785 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2788 # add static mapping for server
2789 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2790 server_in_port, server_out_port,
2791 proto=IP_PROTOS.tcp)
2793 # send packet from host to server
2794 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2795 IP(src=host.ip4, dst=self.nat_addr) /
2796 TCP(sport=host_in_port, dport=server_out_port))
2797 self.pg0.add_stream(p)
2798 self.pg_enable_capture(self.pg_interfaces)
2800 capture = self.pg0.get_capture(1)
2805 self.assertEqual(ip.src, self.nat_addr)
2806 self.assertEqual(ip.dst, server.ip4)
2807 self.assertNotEqual(tcp.sport, host_in_port)
2808 self.assertEqual(tcp.dport, server_in_port)
2809 self.assert_packet_checksums_valid(p)
# Remember the translated source port so the reply can be addressed to it.
2810 host_out_port = tcp.sport
2812 self.logger.error(ppp("Unexpected or invalid packet:", p))
2815 # send reply from server to host
2816 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2817 IP(src=server.ip4, dst=self.nat_addr) /
2818 TCP(sport=server_in_port, dport=host_out_port))
2819 self.pg0.add_stream(p)
2820 self.pg_enable_capture(self.pg_interfaces)
2822 capture = self.pg0.get_capture(1)
2827 self.assertEqual(ip.src, self.nat_addr)
2828 self.assertEqual(ip.dst, host.ip4)
2829 self.assertEqual(tcp.sport, server_out_port)
2830 self.assertEqual(tcp.dport, host_in_port)
2831 self.assert_packet_checksums_valid(p)
2833 self.logger.error(ppp("Unexpected or invalid packet:", p))
2836 def test_one_armed_nat44(self):
2837 """ One armed NAT44 """
# Both NAT44 feature calls target the same interface (pg9). A TCP packet
# from local_host to remote_host must leave pg9 with nat_addr as source and
# a changed source port; the reply sent to nat_addr on that port must be
# delivered to local_host on the original port 12345.
2838 remote_host = self.pg9.remote_hosts[0]
2839 local_host = self.pg9.remote_hosts[1]
2842 self.nat44_add_address(self.nat_addr)
2843 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2844 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2848 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2849 IP(src=local_host.ip4, dst=remote_host.ip4) /
2850 TCP(sport=12345, dport=80))
2851 self.pg9.add_stream(p)
2852 self.pg_enable_capture(self.pg_interfaces)
2854 capture = self.pg9.get_capture(1)
2859 self.assertEqual(ip.src, self.nat_addr)
2860 self.assertEqual(ip.dst, remote_host.ip4)
2861 self.assertNotEqual(tcp.sport, 12345)
# The translated port is reused as the destination port of the reply below.
2862 external_port = tcp.sport
2863 self.assertEqual(tcp.dport, 80)
2864 self.assert_packet_checksums_valid(p)
2866 self.logger.error(ppp("Unexpected or invalid packet:", p))
2870 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2871 IP(src=remote_host.ip4, dst=self.nat_addr) /
2872 TCP(sport=80, dport=external_port))
2873 self.pg9.add_stream(p)
2874 self.pg_enable_capture(self.pg_interfaces)
2876 capture = self.pg9.get_capture(1)
2881 self.assertEqual(ip.src, remote_host.ip4)
2882 self.assertEqual(ip.dst, local_host.ip4)
2883 self.assertEqual(tcp.sport, 80)
2884 self.assertEqual(tcp.dport, 12345)
2885 self.assert_packet_checksums_valid(p)
2887 self.logger.error(ppp("Unexpected or invalid packet:", p))
2890 def test_del_session(self):
2891 """ Delete NAT44 session """
# Creates sessions by sending a pg0 -> pg1 stream, deletes two of them (one
# addressed by its inside address/port, one by its outside address/port) and
# asserts that the user's session count dropped by exactly two.
2892 self.nat44_add_address(self.nat_addr)
2893 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2894 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2897 pkts = self.create_stream_in(self.pg0, self.pg1)
2898 self.pg0.add_stream(pkts)
2899 self.pg_enable_capture(self.pg_interfaces)
2901 self.pg1.get_capture(len(pkts))
# Session count before deletion, for the user identified by pg0's peer.
2903 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2904 nsessions = len(sessions)
2906 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2907 sessions[0].inside_port,
2908 sessions[0].protocol)
2909 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2910 sessions[1].outside_port,
2911 sessions[1].protocol,
2914 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2915 self.assertEqual(nsessions - len(sessions), 2)
2917 def test_set_get_reass(self):
2918 """ NAT44 set/get virtual fragmentation reassembly """
# Reads the current reassembly settings, writes modified values (timeout + 5,
# max_reass * 2, max_frag * 2), reads them back and compares. Finally sets
# drop_frag=1 and checks that ip4_drop_frag is reported as truthy.
2919 reas_cfg1 = self.vapi.nat_get_reass()
2921 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2922 max_reass=reas_cfg1.ip4_max_reass * 2,
2923 max_frag=reas_cfg1.ip4_max_frag * 2)
2925 reas_cfg2 = self.vapi.nat_get_reass()
2927 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2928 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2929 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2931 self.vapi.nat_set_reass(drop_frag=1)
2932 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2934 def test_frag_in_order(self):
2935 """ NAT44 translate fragments arriving in order """
# Sends a fragmented TCP payload from pg0 towards pg1 and back, reassembles
# the captured fragments and checks ports and payload in both directions.
# Also asserts that nat_reass_dump() reports two more entries at the end
# than at the start.
2936 self.nat44_add_address(self.nat_addr)
2937 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2938 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2941 data = "A" * 4 + "B" * 16 + "C" * 3
# Random inside source port for this run.
2942 self.tcp_port_in = random.randint(1025, 65535)
2944 reass = self.vapi.nat_reass_dump()
2945 reass_n_start = len(reass)
2948 pkts = self.create_stream_frag(self.pg0,
2949 self.pg1.remote_ip4,
2953 self.pg0.add_stream(pkts)
2954 self.pg_enable_capture(self.pg_interfaces)
2956 frags = self.pg1.get_capture(len(pkts))
2957 p = self.reass_frags_and_verify(frags,
2959 self.pg1.remote_ip4)
2960 self.assertEqual(p[TCP].dport, 20)
2961 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Keep the translated source port in tcp_port_out.
2962 self.tcp_port_out = p[TCP].sport
2963 self.assertEqual(data, p[Raw].load)
2966 pkts = self.create_stream_frag(self.pg1,
2971 self.pg1.add_stream(pkts)
2972 self.pg_enable_capture(self.pg_interfaces)
2974 frags = self.pg0.get_capture(len(pkts))
2975 p = self.reass_frags_and_verify(frags,
2976 self.pg1.remote_ip4,
2977 self.pg0.remote_ip4)
2978 self.assertEqual(p[TCP].sport, 20)
2979 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2980 self.assertEqual(data, p[Raw].load)
2982 reass = self.vapi.nat_reass_dump()
2983 reass_n_end = len(reass)
2985 self.assertEqual(reass_n_end - reass_n_start, 2)
2987 def test_reass_hairpinning(self):
2988 """ NAT44 fragments hairpinning """
# Uses random ports for the host and for the server's static TCP mapping,
# sends a fragmented stream on pg0 and expects the fragments back on pg0.
# The reassembled packet must have a changed source port, the server's
# inside port as destination, and the original payload.
2989 server = self.pg0.remote_hosts[1]
2990 host_in_port = random.randint(1025, 65535)
2991 server_in_port = random.randint(1025, 65535)
2992 server_out_port = random.randint(1025, 65535)
2993 data = "A" * 4 + "B" * 16 + "C" * 3
2995 self.nat44_add_address(self.nat_addr)
2996 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2997 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2999 # add static mapping for server
3000 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3001 server_in_port, server_out_port,
3002 proto=IP_PROTOS.tcp)
3004 # send packet from host to server
3005 pkts = self.create_stream_frag(self.pg0,
3010 self.pg0.add_stream(pkts)
3011 self.pg_enable_capture(self.pg_interfaces)
3013 frags = self.pg0.get_capture(len(pkts))
3014 p = self.reass_frags_and_verify(frags,
3017 self.assertNotEqual(p[TCP].sport, host_in_port)
3018 self.assertEqual(p[TCP].dport, server_in_port)
3019 self.assertEqual(data, p[Raw].load)
3021 def test_frag_out_of_order(self):
3022 """ NAT44 translate fragments arriving out of order """
# Same port and payload checks as the in-order fragment test, in both the
# pg0 -> pg1 and pg1 -> pg0 directions.
3023 self.nat44_add_address(self.nat_addr)
3024 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3025 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3028 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of this randint() call is discarded. The sibling
# fragment tests assign it to self.tcp_port_in; confirm whether the same
# assignment was intended here.
3029 random.randint(1025, 65535)
3032 pkts = self.create_stream_frag(self.pg0,
3033 self.pg1.remote_ip4,
3038 self.pg0.add_stream(pkts)
3039 self.pg_enable_capture(self.pg_interfaces)
3041 frags = self.pg1.get_capture(len(pkts))
3042 p = self.reass_frags_and_verify(frags,
3044 self.pg1.remote_ip4)
3045 self.assertEqual(p[TCP].dport, 20)
3046 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3047 self.tcp_port_out = p[TCP].sport
3048 self.assertEqual(data, p[Raw].load)
3051 pkts = self.create_stream_frag(self.pg1,
3057 self.pg1.add_stream(pkts)
3058 self.pg_enable_capture(self.pg_interfaces)
3060 frags = self.pg0.get_capture(len(pkts))
3061 p = self.reass_frags_and_verify(frags,
3062 self.pg1.remote_ip4,
3063 self.pg0.remote_ip4)
3064 self.assertEqual(p[TCP].sport, 20)
3065 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3066 self.assertEqual(data, p[Raw].load)
3068 def test_port_restricted(self):
3069 """ Port restricted NAT44 (MAP-E CE) """
# Switches the port assignment algorithm to map-e (psid 10, psid-offset 6,
# psid-len 6) through the CLI, sends one TCP packet from pg0 to pg1 and
# checks the translated packet's addresses and ports.
3070 self.nat44_add_address(self.nat_addr)
3071 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3072 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3074 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3075 "psid-offset 6 psid-len 6")
3077 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3078 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3079 TCP(sport=4567, dport=22))
3080 self.pg0.add_stream(p)
3081 self.pg_enable_capture(self.pg_interfaces)
3083 capture = self.pg1.get_capture(1)
3088 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3089 self.assertEqual(ip.src, self.nat_addr)
3090 self.assertEqual(tcp.dport, 22)
3091 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the translated source port must equal 10, the psid value
# given in the CLI command above.
3092 self.assertEqual((tcp.sport >> 6) & 63, 10)
3093 self.assert_packet_checksums_valid(p)
3095 self.logger.error(ppp("Unexpected or invalid packet:", p))
3098 def test_ipfix_max_frags(self):
3099 """ IPFIX logging maximum fragments pending reassembly exceeded """
# Sets max_frag to 0, enables IPFIX export towards pg3, then sends only the
# last fragment of a fragmented stream. Nothing may reach pg1, and the IPFIX
# records captured on pg3 are checked with verify_ipfix_max_fragments_ip4().
3100 self.nat44_add_address(self.nat_addr)
3101 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3102 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3104 self.vapi.nat_set_reass(max_frag=0)
3105 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3106 src_address=self.pg3.local_ip4n,
3108 template_interval=10)
3109 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3110 src_port=self.ipfix_src_port)
3112 data = "A" * 4 + "B" * 16 + "C" * 3
3113 self.tcp_port_in = random.randint(1025, 65535)
3114 pkts = self.create_stream_frag(self.pg0,
3115 self.pg1.remote_ip4,
# Only the final element of the fragment list is injected.
3119 self.pg0.add_stream(pkts[-1])
3120 self.pg_enable_capture(self.pg_interfaces)
3122 self.pg1.assert_nothing_captured()
3124 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3125 capture = self.pg3.get_capture(9)
3126 ipfix = IPFIXDecoder()
3127 # first load template
3129 self.assertTrue(p.haslayer(IPFIX))
3130 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3131 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3132 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3133 self.assertEqual(p[UDP].dport, 4739)
3134 self.assertEqual(p[IPFIX].observationDomainID,
3135 self.ipfix_domain_id)
3136 if p.haslayer(Template):
3137 ipfix.add_template(p.getlayer(Template))
3138 # verify events in data set
3140 if p.haslayer(Data):
# Note: `data` is rebound here from the payload string to the decoded set.
3141 data = ipfix.decode_data_set(p.getlayer(Set))
3142 self.verify_ipfix_max_fragments_ip4(data, 0,
3143 self.pg0.remote_ip4n)
# NOTE(review): the line introducing this method is not present in this
# listing; the super() call below indicates these statements are the body of
# TestNAT44.tearDown -- confirm against the full file.
3146 super(TestNAT44, self).tearDown()
# While VPP is still alive, log the NAT state reported by the CLI.
3147 if not self.vpp_dead:
3148 self.logger.info(self.vapi.cli("show nat44 addresses"))
3149 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3150 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3151 self.logger.info(self.vapi.cli("show nat44 interface address"))
3152 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3153 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3154 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
# Switch the address/port assignment algorithm back to "default" and clear
# the log buffer.
3155 self.vapi.cli("nat addr-port-assignment-alg default")
3157 self.vapi.cli("clear logging")
3160 class TestNAT44EndpointDependent(MethodHolder):
3161 """ Endpoint-Dependent mapping and filtering test cases """
3164 def setUpConstants(cls):
# Start from the inherited constants, then append a
# "nat { endpoint-dependent }" section to the VPP command line.
3165 super(TestNAT44EndpointDependent, cls).setUpConstants()
3166 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
3169 def setUpClass(cls):
# Class-level fixture: raises NAT log verbosity, defines the default
# ports/addresses used by the tests and prepares the pg interfaces.
3170 super(TestNAT44EndpointDependent, cls).setUpClass()
3171 cls.vapi.cli("set log class nat level debug")
# Default inside/outside ports and ICMP ids, NAT pool address (text and
# packed form), IPFIX parameters and the external TCP port.
3173 cls.tcp_port_in = 6303
3174 cls.tcp_port_out = 6303
3175 cls.udp_port_in = 6304
3176 cls.udp_port_out = 6304
3177 cls.icmp_id_in = 6305
3178 cls.icmp_id_out = 6305
3179 cls.nat_addr = '10.0.0.3'
3180 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3181 cls.ipfix_src_port = 4739
3182 cls.ipfix_domain_id = 1
3183 cls.tcp_external_port = 80
# Five pg interfaces are created; the first three are kept in
# cls.interfaces (the body of the loop over them is not shown here).
3185 cls.create_pg_interfaces(range(5))
3186 cls.interfaces = list(cls.pg_interfaces[0:3])
3188 for i in cls.interfaces:
# pg0 gets three remote hosts with IPv4 neighbor entries.
3193 cls.pg0.generate_remote_hosts(3)
3194 cls.pg0.configure_ipv4_neighbors()
# pg4 gets two remote hosts, its own IPv4 config and an address call fed
# from "10.0.0.1" (remaining arguments not shown in this listing).
3198 cls.pg4.generate_remote_hosts(2)
3199 cls.pg4.config_ip4()
3200 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3201 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
# ARP is resolved, then the second pg4 remote host is given the same IPv4
# address as the first one and ARP is resolved again.
3205 cls.pg4.resolve_arp()
3206 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3207 cls.pg4.resolve_arp()
# NOTE(review): presumably the error-path cleanup of a try/except whose
# surrounding lines are not visible in this listing -- confirm.
3210 super(TestNAT44EndpointDependent, cls).tearDownClass()
3213 def test_dynamic(self):
3214 """ NAT44 dynamic translation test """
# Add the NAT pool address and enable the NAT44 feature on pg0 and pg1
# (the remaining arguments of the pg1 call are not shown in this listing).
3216 self.nat44_add_address(self.nat_addr)
3217 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3218 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: every sent packet must show up on pg1 and pass
# verify_capture_out().
3222 pkts = self.create_stream_in(self.pg0, self.pg1)
3223 self.pg0.add_stream(pkts)
3224 self.pg_enable_capture(self.pg_interfaces)
3226 capture = self.pg1.get_capture(len(pkts))
3227 self.verify_capture_out(capture)
# pg1 -> pg0: every sent packet must show up on pg0 and pass
# verify_capture_in().
3230 pkts = self.create_stream_out(self.pg1)
3231 self.pg1.add_stream(pkts)
3232 self.pg_enable_capture(self.pg_interfaces)
3234 capture = self.pg0.get_capture(len(pkts))
3235 self.verify_capture_in(capture, self.pg0)
3237 def test_forwarding(self):
3238 """ NAT44 forwarding test """
# Enable the NAT44 feature on pg0/pg1 and switch NAT44 forwarding on.
3240 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3241 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3243 self.vapi.nat44_forwarding_enable_disable(1)
# Static mapping between pg0's remote host address and the NAT address.
3245 real_ip = self.pg0.remote_ip4n
3246 alias_ip = self.nat_addr_n
3247 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3248 external_ip=alias_ip)
3251 # in2out - static mapping match
# Traffic for the mapped host: pg1 -> pg0 first, then pg0 -> pg1, where the
# outgoing capture is verified with same_port=True.
3253 pkts = self.create_stream_out(self.pg1)
3254 self.pg1.add_stream(pkts)
3255 self.pg_enable_capture(self.pg_interfaces)
3257 capture = self.pg0.get_capture(len(pkts))
3258 self.verify_capture_in(capture, self.pg0)
3260 pkts = self.create_stream_in(self.pg0, self.pg1)
3261 self.pg0.add_stream(pkts)
3262 self.pg_enable_capture(self.pg_interfaces)
3264 capture = self.pg1.get_capture(len(pkts))
3265 self.verify_capture_out(capture, same_port=True)
3267 # in2out - no static mapping match
# Temporarily put remote_hosts[1] into slot 0 of pg0.remote_hosts; host0
# keeps the original entry so it can be put back afterwards.
3269 host0 = self.pg0.remote_hosts[0]
3270 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
# pg1 -> pg0 addressed directly to pg0.remote_ip4, using the inside ports.
3272 pkts = self.create_stream_out(self.pg1,
3273 dst_ip=self.pg0.remote_ip4,
3274 use_inside_ports=True)
3275 self.pg1.add_stream(pkts)
3276 self.pg_enable_capture(self.pg_interfaces)
3278 capture = self.pg0.get_capture(len(pkts))
3279 self.verify_capture_in(capture, self.pg0)
# pg0 -> pg1: the outgoing capture is verified against the host's own
# address (nat_ip=self.pg0.remote_ip4); further arguments are not shown.
3281 pkts = self.create_stream_in(self.pg0, self.pg1)
3282 self.pg0.add_stream(pkts)
3283 self.pg_enable_capture(self.pg_interfaces)
3285 capture = self.pg1.get_capture(len(pkts))
3286 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Put the original first remote host back.
3289 self.pg0.remote_hosts[0] = host0
# Session bookkeeping for remote_hosts[1]: 3 sessions are expected, and
# deleting the first one (identified by its inside and external-host
# endpoints) must leave 2.
3291 user = self.pg0.remote_hosts[1]
3292 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3293 self.assertEqual(len(sessions), 3)
3294 self.assertTrue(sessions[0].ext_host_valid)
3295 self.vapi.nat44_del_session(
3296 sessions[0].inside_ip_address,
3297 sessions[0].inside_port,
3298 sessions[0].protocol,
3299 ext_host_address=sessions[0].ext_host_address,
3300 ext_host_port=sessions[0].ext_host_port)
3301 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3302 self.assertEqual(len(sessions), 2)
# Forwarding is switched off again and the static-mapping API is called a
# second time for the same pair (its trailing argument is not shown;
# presumably the removal -- confirm).
3305 self.vapi.nat44_forwarding_enable_disable(0)
3306 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3307 external_ip=alias_ip,
3310 def test_static_lb(self):
3311 """ NAT44 local service load balancing """
# Two pg0 remote hosts serve as back-ends of one load-balanced static
# mapping on the NAT address. The port values and the remaining fields of
# each `locals` entry are not shown in this listing.
3312 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3315 server1 = self.pg0.remote_hosts[0]
3316 server2 = self.pg0.remote_hosts[1]
3318 locals = [{'addr': server1.ip4n,
3321 {'addr': server2.ip4n,
3325 self.nat44_add_address(self.nat_addr)
3326 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3329 local_num=len(locals),
3331 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3332 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3335 # from client to service
3336 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3337 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3338 TCP(sport=12345, dport=external_port))
3339 self.pg1.add_stream(p)
3340 self.pg_enable_capture(self.pg_interfaces)
3342 capture = self.pg0.get_capture(1)
# Either back-end may receive the packet; the destination port must be
# local_port. `server` (used below) is set in lines not shown here.
3348 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3349 if ip.dst == server1.ip4:
3353 self.assertEqual(tcp.dport, local_port)
3354 self.assert_packet_checksums_valid(p)
3356 self.logger.error(ppp("Unexpected or invalid packet:", p))
3359 # from service back to client
3360 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3361 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3362 TCP(sport=local_port, dport=12345))
3363 self.pg0.add_stream(p)
3364 self.pg_enable_capture(self.pg_interfaces)
3366 capture = self.pg1.get_capture(1)
# The reply must leave with the NAT address and the external port as source.
3371 self.assertEqual(ip.src, self.nat_addr)
3372 self.assertEqual(tcp.sport, external_port)
3373 self.assert_packet_checksums_valid(p)
3375 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Exactly one session must exist for the chosen server; deleting it by its
# inside and external-host endpoints must leave none.
3378 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3379 self.assertEqual(len(sessions), 1)
3380 self.assertTrue(sessions[0].ext_host_valid)
3381 self.vapi.nat44_del_session(
3382 sessions[0].inside_ip_address,
3383 sessions[0].inside_port,
3384 sessions[0].protocol,
3385 ext_host_address=sessions[0].ext_host_address,
3386 ext_host_port=sessions[0].ext_host_port)
3387 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3388 self.assertEqual(len(sessions), 0)
# Runs only when running_extended_tests() is true.
3390 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3391 def test_static_lb_multi_clients(self):
3392 """ NAT44 local service load balancing - multiple clients"""
# Same two-server load-balanced mapping as the single-client test; port
# values and the remaining `locals` fields are not shown in this listing.
3394 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3397 server1 = self.pg0.remote_hosts[0]
3398 server2 = self.pg0.remote_hosts[1]
3400 locals = [{'addr': server1.ip4n,
3403 {'addr': server2.ip4n,
3407 self.nat44_add_address(self.nat_addr)
3408 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3411 local_num=len(locals),
3413 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3414 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# One TCP packet towards the service is built per client address produced
# by ip4_range(self.pg1.remote_ip4, 10, 50).
3419 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3421 for client in clients:
3422 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3423 IP(src=client, dst=self.nat_addr) /
3424 TCP(sport=12345, dport=external_port))
3426 self.pg1.add_stream(pkts)
3427 self.pg_enable_capture(self.pg_interfaces)
3429 capture = self.pg0.get_capture(len(pkts))
# Captured packets are counted per destination server; server1 must have
# received more than server2.
# NOTE(review): presumably this reflects a higher weight for server1 in the
# `locals` fields not shown here -- confirm.
3431 if p[IP].dst == server1.ip4:
3435 self.assertTrue(server1_n > server2_n)
3437 def test_static_lb_2(self):
3438 """ NAT44 local service load balancing (asymmetrical rule) """
# Two pg0 remote hosts are back-ends of a load-balanced static mapping;
# NAT44 forwarding is enabled. Port values, the remaining `locals` fields
# and some mapping arguments are not shown in this listing.
3439 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3442 server1 = self.pg0.remote_hosts[0]
3443 server2 = self.pg0.remote_hosts[1]
3445 locals = [{'addr': server1.ip4n,
3448 {'addr': server2.ip4n,
3452 self.vapi.nat44_forwarding_enable_disable(1)
3453 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3457 local_num=len(locals),
3459 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3460 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3463 # from client to service
3464 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3465 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3466 TCP(sport=12345, dport=external_port))
3467 self.pg1.add_stream(p)
3468 self.pg_enable_capture(self.pg_interfaces)
3470 capture = self.pg0.get_capture(1)
# Either back-end may receive the packet; the destination port must be
# local_port. `server` (used below) is set in lines not shown here.
3476 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3477 if ip.dst == server1.ip4:
3481 self.assertEqual(tcp.dport, local_port)
3482 self.assert_packet_checksums_valid(p)
3484 self.logger.error(ppp("Unexpected or invalid packet:", p))
3487 # from service back to client
3488 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3489 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3490 TCP(sport=local_port, dport=12345))
3491 self.pg0.add_stream(p)
3492 self.pg_enable_capture(self.pg_interfaces)
3494 capture = self.pg1.get_capture(1)
# The reply must leave with the NAT address and the external port as source.
3499 self.assertEqual(ip.src, self.nat_addr)
3500 self.assertEqual(tcp.sport, external_port)
3501 self.assert_packet_checksums_valid(p)
3503 self.logger.error(ppp("Unexpected or invalid packet:", p))
3506 # from client to server (no translation)
# The client addresses server1 directly (source port 12346), bypassing the
# external address.
3507 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3508 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3509 TCP(sport=12346, dport=local_port))
3510 self.pg1.add_stream(p)
3511 self.pg_enable_capture(self.pg_interfaces)
3513 capture = self.pg0.get_capture(1)
# Destination address and port must arrive unchanged.
3519 self.assertEqual(ip.dst, server1.ip4)
3520 self.assertEqual(tcp.dport, local_port)
3521 self.assert_packet_checksums_valid(p)
3523 self.logger.error(ppp("Unexpected or invalid packet:", p))
3526 # from service back to client (no translation)
3527 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3528 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3529 TCP(sport=local_port, dport=12346))
3530 self.pg0.add_stream(p)
3531 self.pg_enable_capture(self.pg_interfaces)
3533 capture = self.pg1.get_capture(1)
# Source address and port must leave unchanged (server1's own address).
3538 self.assertEqual(ip.src, server1.ip4)
3539 self.assertEqual(tcp.sport, local_port)
3540 self.assert_packet_checksums_valid(p)
3542 self.logger.error(ppp("Unexpected or invalid packet:", p))
3545 def test_unknown_proto(self):
3546 """ NAT44 translate packet with unknown protocol """
# Add the NAT pool address and enable the NAT44 feature on pg0 and pg1
# (the remaining arguments of the pg1 call are not shown in this listing).
3547 self.nat44_add_address(self.nat_addr)
3548 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3549 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# A plain TCP packet pg0 -> pg1 is sent first and must be forwarded.
3553 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3554 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3555 TCP(sport=self.tcp_port_in, dport=20))
3556 self.pg0.add_stream(p)
3557 self.pg_enable_capture(self.pg_interfaces)
3559 p = self.pg1.get_capture(1)
# pg0 -> pg1 packet carrying an inner IP/TCP payload; the layer between the
# two IP headers is not shown here (the checks below expect GRE).
3561 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3562 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3564 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3565 TCP(sport=1234, dport=1234))
3566 self.pg0.add_stream(p)
3567 self.pg_enable_capture(self.pg_interfaces)
3569 p = self.pg1.get_capture(1)
# Outer source must be rewritten to the NAT address; GRE must be preserved.
3572 self.assertEqual(packet[IP].src, self.nat_addr)
3573 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3574 self.assertTrue(packet.haslayer(GRE))
3575 self.assert_packet_checksums_valid(packet)
3577 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: pg1 -> NAT address with the same inner payload.
3581 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3582 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3584 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3585 TCP(sport=1234, dport=1234))
3586 self.pg1.add_stream(p)
3587 self.pg_enable_capture(self.pg_interfaces)
3589 p = self.pg0.get_capture(1)
# Outer destination must be rewritten back to pg0's remote host.
3592 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3593 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3594 self.assertTrue(packet.haslayer(GRE))
3595 self.assert_packet_checksums_valid(packet)
3597 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3600 def test_hairpinning_unknown_proto(self):
3601 """ NAT44 translate packet with unknown protocol - hairpinning """
# Both endpoints are remote hosts behind pg0; the server is reachable via
# its own static NAT address. host_in_port is defined in a line not shown.
3602 host = self.pg0.remote_hosts[0]
3603 server = self.pg0.remote_hosts[1]
3605 server_out_port = 8765
3606 server_nat_ip = "10.0.0.11"
3608 self.nat44_add_address(self.nat_addr)
3609 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3610 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3613 # add static mapping for server
3614 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# A plain TCP packet host -> server NAT address is sent first; it must come
# back out of pg0.
3617 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3618 IP(src=host.ip4, dst=server_nat_ip) /
3619 TCP(sport=host_in_port, dport=server_out_port))
3620 self.pg0.add_stream(p)
3621 self.pg_enable_capture(self.pg_interfaces)
3623 self.pg0.get_capture(1)
# host -> server NAT address with an inner IP/TCP payload; the layer between
# the two IP headers is not shown here (the checks below expect GRE).
3625 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3626 IP(src=host.ip4, dst=server_nat_ip) /
3628 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3629 TCP(sport=1234, dport=1234))
3630 self.pg0.add_stream(p)
3631 self.pg_enable_capture(self.pg_interfaces)
3633 p = self.pg0.get_capture(1)
# The server must see the NAT address as source and its real address as
# destination.
3636 self.assertEqual(packet[IP].src, self.nat_addr)
3637 self.assertEqual(packet[IP].dst, server.ip4)
3638 self.assertTrue(packet.haslayer(GRE))
3639 self.assert_packet_checksums_valid(packet)
3641 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: server -> NAT address with the same inner payload.
3645 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3646 IP(src=server.ip4, dst=self.nat_addr) /
3648 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3649 TCP(sport=1234, dport=1234))
3650 self.pg0.add_stream(p)
3651 self.pg_enable_capture(self.pg_interfaces)
3653 p = self.pg0.get_capture(1)
# The host must see the server's NAT address as source and its own address
# as destination.
3656 self.assertEqual(packet[IP].src, server_nat_ip)
3657 self.assertEqual(packet[IP].dst, host.ip4)
3658 self.assertTrue(packet.haslayer(GRE))
3659 self.assert_packet_checksums_valid(packet)
3661 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3664 def test_output_feature_and_service(self):
3665 """ NAT44 interface output feature and services """
# external_port / local_port are defined in lines not shown in this listing.
3666 external_addr = '1.2.3.4'
# Forwarding on, NAT pool address added, identity mapping for pg1's remote
# host, and an out2in-only TCP static mapping from external_addr to pg0's
# remote host.
3670 self.vapi.nat44_forwarding_enable_disable(1)
3671 self.nat44_add_address(self.nat_addr)
3672 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3673 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3674 local_port, external_port,
3675 proto=IP_PROTOS.tcp, out2in_only=1)
# pg0 is passed to the interface feature API twice (the second call's
# trailing argument is not shown); pg1 gets the output feature.
3676 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3677 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3679 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3682 # from client to service
3683 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3684 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3685 TCP(sport=12345, dport=external_port))
3686 self.pg1.add_stream(p)
3687 self.pg_enable_capture(self.pg_interfaces)
3689 capture = self.pg0.get_capture(1)
# Destination must be rewritten to pg0's remote host and local_port.
3694 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3695 self.assertEqual(tcp.dport, local_port)
3696 self.assert_packet_checksums_valid(p)
3698 self.logger.error(ppp("Unexpected or invalid packet:", p))
3701 # from service back to client
3702 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3703 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3704 TCP(sport=local_port, dport=12345))
3705 self.pg0.add_stream(p)
3706 self.pg_enable_capture(self.pg_interfaces)
3708 capture = self.pg1.get_capture(1)
# Source must be rewritten to external_addr and external_port.
3713 self.assertEqual(ip.src, external_addr)
3714 self.assertEqual(tcp.sport, external_port)
3715 self.assert_packet_checksums_valid(p)
3717 self.logger.error(ppp("Unexpected or invalid packet:", p))
3720 # from local network host to external network
# The same in2out stream is sent and verified twice in a row.
3721 pkts = self.create_stream_in(self.pg0, self.pg1)
3722 self.pg0.add_stream(pkts)
3723 self.pg_enable_capture(self.pg_interfaces)
3725 capture = self.pg1.get_capture(len(pkts))
3726 self.verify_capture_out(capture)
3727 pkts = self.create_stream_in(self.pg0, self.pg1)
3728 self.pg0.add_stream(pkts)
3729 self.pg_enable_capture(self.pg_interfaces)
3731 capture = self.pg1.get_capture(len(pkts))
3732 self.verify_capture_out(capture)
3734 # from external network back to local network host
3735 pkts = self.create_stream_out(self.pg1)
3736 self.pg1.add_stream(pkts)
3737 self.pg_enable_capture(self.pg_interfaces)
3739 capture = self.pg0.get_capture(len(pkts))
3740 self.verify_capture_in(capture, self.pg0)
3742 def test_output_feature_and_service2(self):
3743 """ NAT44 interface output feature and service host direct access """
# Forwarding on, NAT pool address added, output feature on pg1 (the call's
# trailing argument is not shown in this listing).
3744 self.vapi.nat44_forwarding_enable_disable(1)
3745 self.nat44_add_address(self.nat_addr)
3746 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3749 # session initiated from service host - translate
3750 pkts = self.create_stream_in(self.pg0, self.pg1)
3751 self.pg0.add_stream(pkts)
3752 self.pg_enable_capture(self.pg_interfaces)
3754 capture = self.pg1.get_capture(len(pkts))
3755 self.verify_capture_out(capture)
3757 pkts = self.create_stream_out(self.pg1)
3758 self.pg1.add_stream(pkts)
3759 self.pg_enable_capture(self.pg_interfaces)
3761 capture = self.pg0.get_capture(len(pkts))
3762 self.verify_capture_in(capture, self.pg0)
3764 # session initiated from remote host - do not translate
# Different inside ports/ICMP id are used for this second set of flows,
# which address pg0's remote host directly.
3765 self.tcp_port_in = 60303
3766 self.udp_port_in = 60304
3767 self.icmp_id_in = 60305
3768 pkts = self.create_stream_out(self.pg1,
3769 self.pg0.remote_ip4,
3770 use_inside_ports=True)
3771 self.pg1.add_stream(pkts)
3772 self.pg_enable_capture(self.pg_interfaces)
3774 capture = self.pg0.get_capture(len(pkts))
3775 self.verify_capture_in(capture, self.pg0)
# Replies are verified against the host's own address
# (nat_ip=self.pg0.remote_ip4); further arguments are not shown.
3777 pkts = self.create_stream_in(self.pg0, self.pg1)
3778 self.pg0.add_stream(pkts)
3779 self.pg_enable_capture(self.pg_interfaces)
3781 capture = self.pg1.get_capture(len(pkts))
3782 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3785 def test_output_feature_and_service3(self):
3786 """ NAT44 interface output feature and DST NAT """
# external_port / local_port are defined in lines not shown in this listing.
3787 external_addr = '1.2.3.4'
# Forwarding on, NAT pool address added, and an out2in-only TCP static
# mapping from external_addr to pg1's remote host.
3791 self.vapi.nat44_forwarding_enable_disable(1)
3792 self.nat44_add_address(self.nat_addr)
3793 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3794 local_port, external_port,
3795 proto=IP_PROTOS.tcp, out2in_only=1)
# pg0 is passed to the interface feature API twice (the second call's
# trailing argument is not shown); pg1 gets the output feature.
3796 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3797 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3799 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0 host -> external_addr:external_port.
3802 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3803 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3804 TCP(sport=12345, dport=external_port))
3805 self.pg0.add_stream(p)
3806 self.pg_enable_capture(self.pg_interfaces)
3808 capture = self.pg1.get_capture(1)
# Source stays untouched; only the destination is rewritten to pg1's remote
# host and local_port.
3813 self.assertEqual(ip.src, self.pg0.remote_ip4)
3814 self.assertEqual(tcp.sport, 12345)
3815 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3816 self.assertEqual(tcp.dport, local_port)
3817 self.assert_packet_checksums_valid(p)
3819 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply from pg1's remote host back to the pg0 host.
3822 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3823 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3824 TCP(sport=local_port, dport=12345))
3825 self.pg1.add_stream(p)
3826 self.pg_enable_capture(self.pg_interfaces)
3828 capture = self.pg0.get_capture(1)
# Source must be rewritten back to external_addr:external_port; the
# destination stays the pg0 host and port 12345.
3833 self.assertEqual(ip.src, external_addr)
3834 self.assertEqual(tcp.sport, external_port)
3835 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3836 self.assertEqual(tcp.dport, 12345)
3837 self.assert_packet_checksums_valid(p)
3839 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Shared scenario behind the twice-NAT tests of this class. The flags
# self_twice_nat, same_pg and lb choose the variant; client_id is asserted
# to be set on one path further down. Many lines of this helper (end of the
# signature, port/interface selection, branch headers) are not shown in this
# listing, so the notes below cover only what is visible.
3842 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3844 twice_nat_addr = '10.0.1.3'
3852 port_in1 = port_in+1
3853 port_in2 = port_in+2
3858 server1 = self.pg0.remote_hosts[0]
3859 server2 = self.pg0.remote_hosts[1]
# eh_translate gates the external-host checks below (rest of the expression
# not shown).
3871 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
# Two pool addresses: the regular NAT address and a twice-NAT address.
3874 self.nat44_add_address(self.nat_addr)
3875 self.nat44_add_address(twice_nat_addr, twice_nat=1)
# Single-server variant: static mapping flagged twice_nat or self_twice_nat,
# the two flags being mutually exclusive.
3877 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3879 proto=IP_PROTOS.tcp,
3880 twice_nat=int(not self_twice_nat),
3881 self_twice_nat=int(self_twice_nat))
# Load-balanced variant: both servers are registered behind the NAT address.
3883 locals = [{'addr': server1.ip4n,
3886 {'addr': server2.ip4n,
3889 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3890 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
3894 not self_twice_nat),
3897 local_num=len(locals),
3899 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
3900 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
# Client selection: a pg0 remote host chosen by client_id, or pg1's first
# remote host (the branch conditions are only partly visible).
3907 assert client_id is not None
3909 client = self.pg0.remote_hosts[0]
3910 elif client_id == 2:
3911 client = self.pg0.remote_hosts[1]
3913 client = pg1.remote_hosts[0]
# Client -> NAT address.
3914 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
3915 IP(src=client.ip4, dst=self.nat_addr) /
3916 TCP(sport=eh_port_out, dport=port_out))
3918 self.pg_enable_capture(self.pg_interfaces)
3920 capture = pg0.get_capture(1)
# Destination checks: the packet must reach `server`, on port_in1/port_in2
# in one branch or port_in in the other.
3926 if ip.dst == server1.ip4:
3932 self.assertEqual(ip.dst, server.ip4)
3934 self.assertIn(tcp.dport, [port_in1, port_in2])
3936 self.assertEqual(tcp.dport, port_in)
# Source checks: either rewritten to the twice-NAT address with a new port,
# or left as the client's own address and port.
3938 self.assertEqual(ip.src, twice_nat_addr)
3939 self.assertNotEqual(tcp.sport, eh_port_out)
3941 self.assertEqual(ip.src, client.ip4)
3942 self.assertEqual(tcp.sport, eh_port_out)
# Remember the ports the server saw so the reply can mirror them.
3944 eh_port_in = tcp.sport
3945 saved_port_in = tcp.dport
3946 self.assert_packet_checksums_valid(p)
3948 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Server reply towards the address/port it saw as the source.
3951 p = (Ether(src=server.mac, dst=pg0.local_mac) /
3952 IP(src=server.ip4, dst=eh_addr_in) /
3953 TCP(sport=saved_port_in, dport=eh_port_in))
3955 self.pg_enable_capture(self.pg_interfaces)
3957 capture = pg1.get_capture(1)
# The client must see the reply coming from the NAT address and port_out,
# addressed to its own address and original port.
3962 self.assertEqual(ip.dst, client.ip4)
3963 self.assertEqual(ip.src, self.nat_addr)
3964 self.assertEqual(tcp.dport, eh_port_out)
3965 self.assertEqual(tcp.sport, port_out)
3966 self.assert_packet_checksums_valid(p)
3968 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One twice-NAT session must exist for the server; it is deleted using the
# translated external-host address/port, after which none may remain.
3972 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3973 self.assertEqual(len(sessions), 1)
3974 self.assertTrue(sessions[0].ext_host_valid)
3975 self.assertTrue(sessions[0].is_twicenat)
3976 self.vapi.nat44_del_session(
3977 sessions[0].inside_ip_address,
3978 sessions[0].inside_port,
3979 sessions[0].protocol,
3980 ext_host_address=sessions[0].ext_host_nat_address,
3981 ext_host_port=sessions[0].ext_host_nat_port)
3982 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3983 self.assertEqual(len(sessions), 0)
3985 def test_twice_nat(self):
# Runs the shared twice-NAT scenario with every option left at its default.
3987 self.twice_nat_common()
3989 def test_self_twice_nat_positive(self):
3990 """ Self Twice NAT44 (positive test) """
# Shared scenario with self_twice_nat and same_pg both enabled.
3991 self.twice_nat_common(self_twice_nat=True, same_pg=True)
3993 def test_self_twice_nat_negative(self):
3994 """ Self Twice NAT44 (negative test) """
# Shared scenario with self_twice_nat enabled but same_pg left False.
3995 self.twice_nat_common(self_twice_nat=True)
3997 def test_twice_nat_lb(self):
3998 """ Twice NAT44 local service load balancing """
# Shared scenario in its load-balanced variant (lb=True).
3999 self.twice_nat_common(lb=True)
4001 def test_self_twice_nat_lb_positive(self):
4002 """ Self Twice NAT44 local service load balancing (positive test) """
# Load-balanced self-twice-NAT on the same interface; the final argument of
# this call is not shown in this listing.
4003 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4006 def test_self_twice_nat_lb_negative(self):
4007 """ Self Twice NAT44 local service load balancing (negative test) """
# Same visible arguments as the positive variant; the final argument (not
# shown in this listing) presumably differs -- confirm.
4008 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4011 def test_twice_nat_interface_addr(self):
4012 """ Acquire twice NAT44 addresses from interface """
# Register pg3 as a source of twice-NAT addresses before it has an address.
4013 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4015 # no address in NAT pool
4016 adresses = self.vapi.nat44_address_dump()
4017 self.assertEqual(0, len(adresses))
4019 # configure interface address and check NAT address pool
# The pool must now hold exactly pg3's local address, flagged twice_nat.
4020 self.pg3.config_ip4()
4021 adresses = self.vapi.nat44_address_dump()
4022 self.assertEqual(1, len(adresses))
4023 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4024 self.assertEqual(adresses[0].twice_nat, 1)
4026 # remove interface address and check NAT address pool
4027 self.pg3.unconfig_ip4()
4028 adresses = self.vapi.nat44_address_dump()
4029 self.assertEqual(0, len(adresses))
4031 def test_tcp_session_close_in(self):
4032 """ Close TCP session from inside network """
# Fixed outside port, NAT pool address, a TCP static mapping for pg0's
# remote host (most of its arguments are not shown in this listing) and the
# NAT44 feature on pg0/pg1.
4033 self.tcp_port_out = 10505
4034 self.nat44_add_address(self.nat_addr)
4035 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4039 proto=IP_PROTOS.tcp,
4041 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4042 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for the inside host, taken before the session is
# opened.
4045 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4046 start_sessnum = len(sessions)
4048 self.initiate_tcp_session(self.pg0, self.pg1)
4050 # FIN packet in -> out
4051 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4052 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4053 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4054 flags="FA", seq=100, ack=300))
4055 self.pg0.add_stream(p)
4056 self.pg_enable_capture(self.pg_interfaces)
4058 self.pg1.get_capture(1)
# The next two outside packets (ACK, then FIN) are sent together as `pkts`;
# the lines building that list are not shown. Two packets are expected on
# pg0.
4062 # ACK packet out -> in
4063 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4064 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4065 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4066 flags="A", seq=300, ack=101))
4069 # FIN packet out -> in
4070 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4071 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4072 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4073 flags="FA", seq=300, ack=101))
4076 self.pg1.add_stream(pkts)
4077 self.pg_enable_capture(self.pg_interfaces)
4079 self.pg0.get_capture(2)
4081 # ACK packet in -> out
4082 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4083 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4084 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4085 flags="A", seq=101, ack=301))
4086 self.pg0.add_stream(p)
4087 self.pg_enable_capture(self.pg_interfaces)
4089 self.pg1.get_capture(1)
# After the full close handshake the session count must be back at the
# baseline.
4091 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4093 self.assertEqual(len(sessions) - start_sessnum, 0)
4095 def test_tcp_session_close_out(self):
4096 """ Close TCP session from outside network """
# Fixed outside port, NAT pool address, a TCP static mapping for pg0's
# remote host (most of its arguments are not shown in this listing) and the
# NAT44 feature on pg0/pg1.
4097 self.tcp_port_out = 10505
4098 self.nat44_add_address(self.nat_addr)
4099 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4103 proto=IP_PROTOS.tcp,
4105 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4106 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for the inside host, taken before the session is
# opened.
4109 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4110 start_sessnum = len(sessions)
4112 self.initiate_tcp_session(self.pg0, self.pg1)
4114 # FIN packet out -> in
4115 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4116 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4117 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4118 flags="FA", seq=100, ack=300))
4119 self.pg1.add_stream(p)
4120 self.pg_enable_capture(self.pg_interfaces)
4122 self.pg0.get_capture(1)
4124 # FIN+ACK packet in -> out
4125 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4126 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4127 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4128 flags="FA", seq=300, ack=101))
4130 self.pg0.add_stream(p)
4131 self.pg_enable_capture(self.pg_interfaces)
4133 self.pg1.get_capture(1)
4135 # ACK packet out -> in
4136 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4137 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4138 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4139 flags="A", seq=101, ack=301))
4140 self.pg1.add_stream(p)
4141 self.pg_enable_capture(self.pg_interfaces)
4143 self.pg0.get_capture(1)
# After the close handshake the session count must be back at the baseline.
4145 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4147 self.assertEqual(len(sessions) - start_sessnum, 0)
4149 def test_tcp_session_close_simultaneous(self):
4150 """ Close TCP session from inside network """
# NOTE(review): the docstring above is identical to the one on
# test_tcp_session_close_in; this test sends a FIN from each side before
# either side ACKs (simultaneous close) -- consider rewording it.
# Fixed outside port, NAT pool address, a TCP static mapping for pg0's
# remote host (most of its arguments are not shown in this listing) and the
# NAT44 feature on pg0/pg1.
4151 self.tcp_port_out = 10505
4152 self.nat44_add_address(self.nat_addr)
4153 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4157 proto=IP_PROTOS.tcp,
4159 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4160 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for the inside host, taken before the session is
# opened.
4163 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4164 start_sessnum = len(sessions)
4166 self.initiate_tcp_session(self.pg0, self.pg1)
4168 # FIN packet in -> out
4169 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4170 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4171 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4172 flags="FA", seq=100, ack=300))
4173 self.pg0.add_stream(p)
4174 self.pg_enable_capture(self.pg_interfaces)
4176 self.pg1.get_capture(1)
4178 # FIN packet out -> in
4179 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4180 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4181 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4182 flags="FA", seq=300, ack=100))
4183 self.pg1.add_stream(p)
4184 self.pg_enable_capture(self.pg_interfaces)
4186 self.pg0.get_capture(1)
4188 # ACK packet in -> out
4189 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4190 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4191 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4192 flags="A", seq=101, ack=301))
4193 self.pg0.add_stream(p)
4194 self.pg_enable_capture(self.pg_interfaces)
4196 self.pg1.get_capture(1)
4198 # ACK packet out -> in
4199 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4200 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4201 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4202 flags="A", seq=301, ack=101))
4203 self.pg1.add_stream(p)
4204 self.pg_enable_capture(self.pg_interfaces)
4206 self.pg0.get_capture(1)
# After both FINs are acknowledged the session count must be back at the
# baseline.
4208 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4210 self.assertEqual(len(sessions) - start_sessnum, 0)
4212 def test_one_armed_nat44_static(self):
4213 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
# Both endpoints sit behind the single interface pg4. external_port and
# local_port are defined in lines not shown in this listing.
4214 remote_host = self.pg4.remote_hosts[0]
4215 local_host = self.pg4.remote_hosts[1]
# Forwarding on; the NAT address is added as a twice-NAT address and an
# out2in-only TCP static mapping points it at local_host (trailing mapping
# argument not shown).
4220 self.vapi.nat44_forwarding_enable_disable(1)
4221 self.nat44_add_address(self.nat_addr, twice_nat=1)
4222 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4223 local_port, external_port,
4224 proto=IP_PROTOS.tcp, out2in_only=1,
# pg4 is passed to the interface feature API twice (the second call's
# trailing argument is not shown).
4226 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4227 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4230 # from client to service
4231 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4232 IP(src=remote_host.ip4, dst=self.nat_addr) /
4233 TCP(sport=12345, dport=external_port))
4234 self.pg4.add_stream(p)
4235 self.pg_enable_capture(self.pg_interfaces)
4237 capture = self.pg4.get_capture(1)
# Both ends are rewritten: destination becomes local_host:local_port, source
# becomes the NAT address with a port other than 12345 (kept in eh_port_in).
4242 self.assertEqual(ip.dst, local_host.ip4)
4243 self.assertEqual(ip.src, self.nat_addr)
4244 self.assertEqual(tcp.dport, local_port)
4245 self.assertNotEqual(tcp.sport, 12345)
4246 eh_port_in = tcp.sport
4247 self.assert_packet_checksums_valid(p)
4249 self.logger.error(ppp("Unexpected or invalid packet:", p))
4252 # from service back to client
4253 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4254 IP(src=local_host.ip4, dst=self.nat_addr) /
4255 TCP(sport=local_port, dport=eh_port_in))
4256 self.pg4.add_stream(p)
4257 self.pg_enable_capture(self.pg_interfaces)
4259 capture = self.pg4.get_capture(1)
# The reply must reach remote_host:12345 with the NAT address and
# external_port as source.
4264 self.assertEqual(ip.src, self.nat_addr)
4265 self.assertEqual(ip.dst, remote_host.ip4)
4266 self.assertEqual(tcp.sport, external_port)
4267 self.assertEqual(tcp.dport, 12345)
4268 self.assert_packet_checksums_valid(p)
4270 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Exercises a TCP static mapping flagged out2in_only between pg0 (service
# side) and pg1 (client side): translated traffic in both directions, ICMP
# type 11 messages that embed a previously captured packet, and finally
# traffic addressed directly to the service that must pass untranslated.
# NOTE(review): local_port, external_port, ip, tcp and inner are used without
# a visible assignment, and some calls end without their last arguments; the
# embedded line numbers show that lines are missing from this listing.
4273 def test_static_with_port_out2(self):
4274 """ 1:1 NAPT asymmetrical rule """
4279 self.vapi.nat44_forwarding_enable_disable(1)
4280 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4281 local_port, external_port,
4282 proto=IP_PROTOS.tcp, out2in_only=1)
4283 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4284 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4287 # from client to service
4288 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4289 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4290 TCP(sport=12345, dport=external_port))
4291 self.pg1.add_stream(p)
4292 self.pg_enable_capture(self.pg_interfaces)
4294 capture = self.pg0.get_capture(1)
# Destination must have been rewritten to the service address and port.
4299 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4300 self.assertEqual(tcp.dport, local_port)
4301 self.assert_packet_checksums_valid(p)
4303 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 sent from the service side, carrying the IP packet captured
# above as its payload.
4307 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4308 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4309 ICMP(type=11) / capture[0][IP])
4310 self.pg0.add_stream(p)
4311 self.pg_enable_capture(self.pg_interfaces)
4313 capture = self.pg1.get_capture(1)
# Both the outer source and the embedded packet's destination/port are
# expected to show the external (NAT) address and port.
4316 self.assertEqual(p[IP].src, self.nat_addr)
4318 self.assertEqual(inner.dst, self.nat_addr)
4319 self.assertEqual(inner[TCPerror].dport, external_port)
4321 self.logger.error(ppp("Unexpected or invalid packet:", p))
4324 # from service back to client
4325 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4326 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4327 TCP(sport=local_port, dport=12345))
4328 self.pg0.add_stream(p)
4329 self.pg_enable_capture(self.pg_interfaces)
4331 capture = self.pg1.get_capture(1)
# Source must have been rewritten to the NAT address and external port.
4336 self.assertEqual(ip.src, self.nat_addr)
4337 self.assertEqual(tcp.sport, external_port)
4338 self.assert_packet_checksums_valid(p)
4340 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 sent from the client side, carrying the packet captured above.
4344 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4345 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4346 ICMP(type=11) / capture[0][IP])
4347 self.pg1.add_stream(p)
4348 self.pg_enable_capture(self.pg_interfaces)
4350 capture = self.pg0.get_capture(1)
# Outer destination and embedded source/port are expected to show the
# service's own address and local port.
4353 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4355 self.assertEqual(inner.src, self.pg0.remote_ip4)
4356 self.assertEqual(inner[TCPerror].sport, local_port)
4358 self.logger.error(ppp("Unexpected or invalid packet:", p))
4361 # from client to server (no translation)
4362 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4363 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4364 TCP(sport=12346, dport=local_port))
4365 self.pg1.add_stream(p)
4366 self.pg_enable_capture(self.pg_interfaces)
4368 capture = self.pg0.get_capture(1)
4373 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4374 self.assertEqual(tcp.dport, local_port)
4375 self.assert_packet_checksums_valid(p)
4377 self.logger.error(ppp("Unexpected or invalid packet:", p))
4380 # from service back to client (no translation)
4381 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4382 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4383 TCP(sport=local_port, dport=12346))
4384 self.pg0.add_stream(p)
4385 self.pg_enable_capture(self.pg_interfaces)
4387 capture = self.pg1.get_capture(1)
4392 self.assertEqual(ip.src, self.pg0.remote_ip4)
4393 self.assertEqual(tcp.sport, local_port)
4394 self.assert_packet_checksums_valid(p)
4396 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Enables forwarding, adds the NAT address, configures pg0 through the regular
# NAT44 feature call and pg1 through the output-feature call, then runs the
# shared stream helpers in both directions and verifies the captures.
# NOTE(review): both feature calls end without their trailing arguments in
# this listing, and no statement that starts the packet generator is visible
# between enabling capture and reading it - confirm against the full file.
4399 def test_output_feature(self):
4400 """ NAT44 interface output feature (in2out postrouting) """
4401 self.vapi.nat44_forwarding_enable_disable(1)
4402 self.nat44_add_address(self.nat_addr)
4403 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4405 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# Traffic from pg0 towards pg1, verified on the pg1 capture.
4409 pkts = self.create_stream_in(self.pg0, self.pg1)
4410 self.pg0.add_stream(pkts)
4411 self.pg_enable_capture(self.pg_interfaces)
4413 capture = self.pg1.get_capture(len(pkts))
4414 self.verify_capture_out(capture)
# Return traffic from pg1, verified on the pg0 capture.
4417 pkts = self.create_stream_out(self.pg1)
4418 self.pg1.add_stream(pkts)
4419 self.pg_enable_capture(self.pg_interfaces)
4421 capture = self.pg0.get_capture(len(pkts))
4422 self.verify_capture_in(capture, self.pg0)
# NOTE(review): the "def" line of this method is not visible in this listing;
# the super() call indicates it is the tearDown of TestNAT44EndpointDependent.
# Calls the parent tearDown, then checks self.vpp_dead and logs the output of
# several "show nat44 ..." CLI commands before clearing the log buffer.
# Which of the statements below are guarded by the vpp_dead check cannot be
# determined here because indentation is not preserved.
4425 super(TestNAT44EndpointDependent, self).tearDown()
4426 if not self.vpp_dead:
4427 self.logger.info(self.vapi.cli("show nat44 addresses"))
4428 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4429 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4430 self.logger.info(self.vapi.cli("show nat44 interface address"))
4431 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4432 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
4434 self.vapi.cli("clear logging")
# MethodHolder subclass whose setUpConstants adds the "out2in dpo" option to
# the "nat" section of the VPP command line; its tests combine NAT44 with a
# translation domain created through map_add_domain (see configure_xlat).
4437 class TestNAT44Out2InDPO(MethodHolder):
4438 """ NAT44 Test Cases using out2in DPO """
def setUpConstants(cls):
    """ Append the NAT "out2in dpo" startup section to the VPP command line """
    super(TestNAT44Out2InDPO, cls).setUpConstants()
    # Tokens of the startup section: nat { out2in dpo }
    nat_section = ["nat", "{", "out2in dpo", "}"]
    cls.vpp_cmdline.extend(nat_section)
# Class-level fixture: raises NAT logging to debug, stores the port/id and
# address constants used by the stream helpers, creates two pg interfaces,
# gives pg0 an IPv4 configuration and pg1 an IPv6 configuration, and installs
# an IPv6 route via pg1.
# NOTE(review): the embedded line numbers skip in several places; statements
# such as bringing the interfaces up are not visible here.
4446 def setUpClass(cls):
4447 super(TestNAT44Out2InDPO, cls).setUpClass()
4448 cls.vapi.cli("set log class nat level debug")
# Inside and outside values are identical for each protocol.
4451 cls.tcp_port_in = 6303
4452 cls.tcp_port_out = 6303
4453 cls.udp_port_in = 6304
4454 cls.udp_port_out = 6304
4455 cls.icmp_id_in = 6305
4456 cls.icmp_id_out = 6305
4457 cls.nat_addr = '10.0.0.3'
# Packed 4-byte form of nat_addr for API calls.
4458 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4459 cls.dst_ip4 = '192.168.70.1'
4461 cls.create_pg_interfaces(range(2))
4464 cls.pg0.config_ip4()
4465 cls.pg0.resolve_arp()
4468 cls.pg1.config_ip6()
4469 cls.pg1.resolve_ndp()
# All-zero 16-byte destination with prefix length 0: a catch-all IPv6 route
# whose next hop is pg1's remote address.
4471 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4472 dst_address_length=0,
4473 next_hop_address=cls.pg1.remote_ip6n,
4474 next_hop_sw_if_index=cls.pg1.sw_if_index)
# NOTE(review): presumably the cleanup branch of an error handler around the
# setup above; the try/except lines themselves are not visible - confirm.
4477 super(TestNAT44Out2InDPO, cls).tearDownClass()
# Stores destination and source IPv6 prefixes (text form, packed form and
# length 96) on the instance and creates a translation domain from them via
# map_add_domain with is_translation=1.
# NOTE(review): both inet_pton calls and the map_add_domain call end without
# their last arguments in this listing; the packed values are presumably
# derived from the matching text prefixes - confirm against the full file.
4480 def configure_xlat(self):
4481 self.dst_ip6_pfx = '1:2:3::'
4482 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4484 self.dst_ip6_pfx_len = 96
4485 self.src_ip6_pfx = '4:5:6::'
4486 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4488 self.src_ip6_pfx_len = 96
# The IPv4 prefix argument is four zero bytes with length 0.
4489 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4490 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4491 '\x00\x00\x00\x00', 0, is_translation=1,
# Configures the translation domain, enables NAT44 on pg0 with nat_addr as
# the only pool address, sends IPv4 traffic from pg0 and verifies the IPv6
# packets captured on pg1, then sends IPv6 traffic back and verifies the IPv4
# capture on pg0. The NAT44 feature call and address range are issued again
# at the end (the address range with is_add=0).
# NOTE(review): several calls end without their trailing arguments here and
# no statement starting the packet generator is visible - confirm.
4494 def test_464xlat_ce(self):
4495 """ Test 464XLAT CE with NAT44 """
4497 self.configure_xlat()
4499 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4500 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# IPv6 forms of the IPv4 destination and of the NAT address, built from the
# prefixes stored by configure_xlat.
4502 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4503 self.dst_ip6_pfx_len)
4504 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4505 self.src_ip6_pfx_len)
4508 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4509 self.pg0.add_stream(pkts)
4510 self.pg_enable_capture(self.pg_interfaces)
4512 capture = self.pg1.get_capture(len(pkts))
4513 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4516 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4518 self.pg1.add_stream(pkts)
4519 self.pg_enable_capture(self.pg_interfaces)
4521 capture = self.pg0.get_capture(len(pkts))
4522 self.verify_capture_in(capture, self.pg0)
4524 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4526 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4527 self.nat_addr_n, is_add=0)
# Same round trip as the NAT44 variant but without enabling NAT44: the
# expected IPv6 peer address is composed from pg0's own remote IPv4 address
# and the capture is verified with same_port=True.
# NOTE(review): no statement starting the packet generator is visible between
# enabling capture and reading it in this listing - confirm.
4529 def test_464xlat_ce_no_nat(self):
4530 """ Test 464XLAT CE without NAT44 """
4532 self.configure_xlat()
4534 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4535 self.dst_ip6_pfx_len)
4536 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4537 self.src_ip6_pfx_len)
# IPv4 in from pg0, IPv6 expected on pg1.
4539 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4540 self.pg0.add_stream(pkts)
4541 self.pg_enable_capture(self.pg_interfaces)
4543 capture = self.pg1.get_capture(len(pkts))
4544 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4545 nat_ip=out_dst_ip6, same_port=True)
# IPv6 in from pg1, IPv4 expected on pg0.
4547 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4548 self.pg1.add_stream(pkts)
4549 self.pg_enable_capture(self.pg_interfaces)
4551 capture = self.pg0.get_capture(len(pkts))
4552 self.verify_capture_in(capture, self.pg0)
# MethodHolder subclass whose setUpConstants adds the "deterministic" option
# to the "nat" section of the VPP command line; it defines its own stream
# builders and capture verifier and tests the nat_det_* API calls.
4555 class TestDeterministicNAT(MethodHolder):
4556 """ Deterministic NAT Test Cases """
def setUpConstants(cls):
    """ Append the NAT "deterministic" startup section to the VPP command line """
    super(TestDeterministicNAT, cls).setUpConstants()
    # Tokens of the startup section: nat { deterministic }
    nat_section = ["nat", "{", "deterministic", "}"]
    cls.vpp_cmdline.extend(nat_section)
# Class-level fixture: raises NAT logging to debug, stores the port/id and
# address constants, creates three pg interfaces and prepares two remote
# hosts with IPv4 neighbor entries on pg0.
# NOTE(review): the body of the "for i in cls.interfaces" loop is not visible
# in this listing, nor is any try/except around the setup - confirm.
4564 def setUpClass(cls):
4565 super(TestDeterministicNAT, cls).setUpClass()
4566 cls.vapi.cli("set log class nat level debug")
# Only the inside values and the external (peer) ports are defined here; the
# *_port_out / icmp_external_id attributes are assigned by verify_capture_out.
4569 cls.tcp_port_in = 6303
4570 cls.tcp_external_port = 6303
4571 cls.udp_port_in = 6304
4572 cls.udp_external_port = 6304
4573 cls.icmp_id_in = 6305
4574 cls.nat_addr = '10.0.0.3'
4576 cls.create_pg_interfaces(range(3))
4577 cls.interfaces = list(cls.pg_interfaces)
4579 for i in cls.interfaces:
4584 cls.pg0.generate_remote_hosts(2)
4585 cls.pg0.configure_ipv4_neighbors()
# NOTE(review): presumably the cleanup branch of an error handler - confirm.
4588 super(TestDeterministicNAT, cls).tearDownClass()
# Builds three packets sourced from in_if's remote host towards out_if's
# remote host: a TCP segment, a UDP datagram and an ICMP echo request, using
# the inside port/id constants as source and the external ports as destination.
# NOTE(review): the docstring delimiters, the list that collects the packets
# and the return statement are not visible in this listing - confirm.
4591 def create_stream_in(self, in_if, out_if, ttl=64):
4593 Create packet stream for inside network
4595 :param in_if: Inside interface
4596 :param out_if: Outside interface
4597 :param ttl: TTL of generated packets
# TCP
4601 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4602 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4603 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
# UDP
4607 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4608 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4609 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
# ICMP echo request
4613 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4614 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4615 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds three packets sourced from out_if's remote host towards dst_ip: a TCP
# segment, a UDP datagram and an ICMP echo reply. Destination ports/id come
# from self.tcp_port_out, self.udp_port_out and self.icmp_external_id, which
# in the visible part of this class are assigned by verify_capture_out, so an
# in->out capture has to be verified before this helper is used.
# NOTE(review): the condition guarding the dst_ip default, the packet list and
# the return statement are not visible in this listing - confirm.
4620 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4622 Create packet stream for outside network
4624 :param out_if: Outside interface
4625 :param dst_ip: Destination IP address (Default use global NAT address)
4626 :param ttl: TTL of generated packets
4629 dst_ip = self.nat_addr
# TCP
4632 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4633 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4634 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
# UDP
4638 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4639 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4640 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
# ICMP echo reply
4644 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4645 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4646 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks that the capture holds packet_num packets whose IPv4 source equals
# nat_ip, and records the translated TCP/UDP source port or ICMP id of each
# packet on the instance (tcp_port_out, udp_port_out, icmp_external_id) for
# later use when building return traffic.
# NOTE(review): the parameter text below lists "same_port", but the visible
# signature has no such parameter. The condition guarding the nat_ip default,
# the else branch before the ICMP assignment and the try/except around the
# per-packet checks are not visible in this listing - confirm.
4651 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4653 Verify captured packets on outside network
4655 :param capture: Captured packets
4656 :param nat_ip: Translated IP address (Default use global NAT address)
4657 :param same_port: Sorce port number is not translated (Default False)
4658 :param packet_num: Expected number of packets (Default 3)
4661 nat_ip = self.nat_addr
4662 self.assertEqual(packet_num, len(capture))
4663 for packet in capture:
4665 self.assertEqual(packet[IP].src, nat_ip)
4666 if packet.haslayer(TCP):
4667 self.tcp_port_out = packet[TCP].sport
4668 elif packet.haslayer(UDP):
4669 self.udp_port_out = packet[UDP].sport
4671 self.icmp_external_id = packet[ICMP].id
4673 self.logger.error(ppp("Unexpected or invalid packet "
4674 "(outside network):", packet))
# Expects exactly one decoded record and compares four of its fields, keyed by
# IPFIX information element id, with fixed values.
# NOTE(review): the assignment of "record" and the docstring delimiters are
# not visible in this listing. Element names in the comments below follow the
# IANA IPFIX registry; verify them against the exporter's template.
4677 def verify_ipfix_max_entries_per_user(self, data):
4679 Verify IPFIX maximum entries per user exceeded event
4681 :param data: Decoded IPFIX data records
4683 self.assertEqual(1, len(data))
# element 230 (natEvent in the IANA registry) must carry the value 13
4686 self.assertEqual(ord(record[230]), 13)
4687 # natQuotaExceededEvent
4688 self.assertEqual('\x03\x00\x00\x00', record[466])
# element 473 (maxEntriesPerUser in the IANA registry): bytes e8 03 00 00,
# i.e. 1000 when read least-significant byte first
4690 self.assertEqual('\xe8\x03\x00\x00', record[473])
# element 8 (sourceIPv4Address in the IANA registry) must be pg0's remote host
4692 self.assertEqual(self.pg0.remote_ip4n, record[8])
# Confirms the plugin reports deterministic mode, adds one mapping, checks the
# forward and reverse lookups against each other, checks that the mapping dump
# returns what was added, then clears the configuration and expects an empty
# dump.
# NOTE(review): in_plen and out_plen are used without a visible assignment in
# this listing - confirm their values against the full file.
4694 def test_deterministic_mode(self):
4695 """ NAT plugin run deterministic mode """
4696 in_addr = '172.16.255.0'
4697 out_addr = '172.17.255.50'
# an inside address used for the forward/reverse lookups
4698 in_addr_t = '172.16.255.20'
4699 in_addr_n = socket.inet_aton(in_addr)
4700 out_addr_n = socket.inet_aton(out_addr)
4701 in_addr_t_n = socket.inet_aton(in_addr_t)
4705 nat_config = self.vapi.nat_show_config()
4706 self.assertEqual(1, nat_config.deterministic)
4708 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup must give the outside address; the reverse lookup with the
# returned high port must give back the inside address. Only the first four
# bytes of the returned address fields are compared.
4710 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4711 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4712 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4713 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4715 deterministic_mappings = self.vapi.nat_det_map_dump()
4716 self.assertEqual(len(deterministic_mappings), 1)
4717 dsm = deterministic_mappings[0]
4718 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4719 self.assertEqual(in_plen, dsm.in_plen)
4720 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4721 self.assertEqual(out_plen, dsm.out_plen)
4723 self.clear_nat_det()
4724 deterministic_mappings = self.vapi.nat_det_map_dump()
4725 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    # Snapshot the current values so a change can be detected afterwards.
    before = self.vapi.nat_det_get_timeouts()

    # Raise every timeout by the same amount; argument order is
    # udp, tcp_established, tcp_transitory, icmp.
    bump = 10
    raised_udp = before.udp + bump
    raised_tcp_established = before.tcp_established + bump
    raised_tcp_transitory = before.tcp_transitory + bump
    raised_icmp = before.icmp + bump
    self.vapi.nat_det_set_timeouts(raised_udp,
                                   raised_tcp_established,
                                   raised_tcp_transitory,
                                   raised_icmp)

    after = self.vapi.nat_det_get_timeouts()

    # Each reported timeout must differ from its earlier value.
    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
# Adds a deterministic mapping from pg0's remote host to 10.0.0.10, enables
# NAT44 on pg0 and pg1, runs the TCP/UDP/ICMP streams in both directions and
# then checks that the session dump for the inside host has three entries
# matching the ports and ids seen in the traffic.
# NOTE(review): the prefix-length arguments of nat_det_add_del_map, the extra
# argument of the second feature call, the packet-generator start and the
# assignments of "s" before each group of session checks are not visible in
# this listing - confirm against the full file.
4745 def test_det_in(self):
4746 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4748 nat_ip = "10.0.0.10"
4750 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4752 socket.inet_aton(nat_ip),
4754 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4755 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# inside -> outside; verify_capture_out also records the translated ports/id
4759 pkts = self.create_stream_in(self.pg0, self.pg1)
4760 self.pg0.add_stream(pkts)
4761 self.pg_enable_capture(self.pg_interfaces)
4763 capture = self.pg1.get_capture(len(pkts))
4764 self.verify_capture_out(capture, nat_ip)
# outside -> inside
4767 pkts = self.create_stream_out(self.pg1, nat_ip)
4768 self.pg1.add_stream(pkts)
4769 self.pg_enable_capture(self.pg_interfaces)
4771 capture = self.pg0.get_capture(len(pkts))
4772 self.verify_capture_in(capture, self.pg0)
4775 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4776 self.assertEqual(len(sessions), 3)
# session checked against the TCP ports
4780 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4781 self.assertEqual(s.in_port, self.tcp_port_in)
4782 self.assertEqual(s.out_port, self.tcp_port_out)
4783 self.assertEqual(s.ext_port, self.tcp_external_port)
# session checked against the UDP ports
4787 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4788 self.assertEqual(s.in_port, self.udp_port_in)
4789 self.assertEqual(s.out_port, self.udp_port_out)
4790 self.assertEqual(s.ext_port, self.udp_external_port)
# session checked against the ICMP ids
4794 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4795 self.assertEqual(s.in_port, self.icmp_id_in)
4796 self.assertEqual(s.out_port, self.icmp_external_id)
# Two inside hosts on pg0 share one deterministic mapping to 10.0.0.10. Each
# host opens a TCP flow to the same external port; the test checks the
# translated packets, expects two sessions on the mapping, sends replies to
# each translated port and checks they reach the right host, then closes one
# session through the "out" API and one through the "in" API and expects the
# session count to drop to 1 and then 0.
# NOTE(review): port_in, ip and tcp are used without a visible assignment, and
# several calls end without their trailing arguments in this listing (the
# embedded line numbers skip ahead) - confirm against the full file.
4798 def test_multiple_users(self):
4799 """ Deterministic NAT multiple users """
4801 nat_ip = "10.0.0.10"
4803 external_port = 6303
4805 host0 = self.pg0.remote_hosts[0]
4806 host1 = self.pg0.remote_hosts[1]
4808 self.vapi.nat_det_add_del_map(host0.ip4n,
4810 socket.inet_aton(nat_ip),
4812 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4813 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0, inside -> outside
4817 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4818 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4819 TCP(sport=port_in, dport=external_port))
4820 self.pg0.add_stream(p)
4821 self.pg_enable_capture(self.pg_interfaces)
4823 capture = self.pg1.get_capture(1)
4828 self.assertEqual(ip.src, nat_ip)
4829 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4830 self.assertEqual(tcp.dport, external_port)
# translated source port of host0's flow, used for the reply below
4831 port_out0 = tcp.sport
4833 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1, inside -> outside (same inside port and external port as host0)
4837 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4838 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4839 TCP(sport=port_in, dport=external_port))
4840 self.pg0.add_stream(p)
4841 self.pg_enable_capture(self.pg_interfaces)
4843 capture = self.pg1.get_capture(1)
4848 self.assertEqual(ip.src, nat_ip)
4849 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4850 self.assertEqual(tcp.dport, external_port)
# translated source port of host1's flow, used for the reply below
4851 port_out1 = tcp.sport
4853 self.logger.error(ppp("Unexpected or invalid packet:", p))
# one mapping carrying both sessions
4856 dms = self.vapi.nat_det_map_dump()
4857 self.assertEqual(1, len(dms))
4858 self.assertEqual(2, dms[0].ses_num)
# reply to host0's translated port must be delivered to host0
4861 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4862 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4863 TCP(sport=external_port, dport=port_out0))
4864 self.pg1.add_stream(p)
4865 self.pg_enable_capture(self.pg_interfaces)
4867 capture = self.pg0.get_capture(1)
4872 self.assertEqual(ip.src, self.pg1.remote_ip4)
4873 self.assertEqual(ip.dst, host0.ip4)
4874 self.assertEqual(tcp.dport, port_in)
4875 self.assertEqual(tcp.sport, external_port)
4877 self.logger.error(ppp("Unexpected or invalid packet:", p))
# reply to host1's translated port must be delivered to host1
4881 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4882 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4883 TCP(sport=external_port, dport=port_out1))
4884 self.pg1.add_stream(p)
4885 self.pg_enable_capture(self.pg_interfaces)
4887 capture = self.pg0.get_capture(1)
4892 self.assertEqual(ip.src, self.pg1.remote_ip4)
4893 self.assertEqual(ip.dst, host1.ip4)
4894 self.assertEqual(tcp.dport, port_in)
4895 self.assertEqual(tcp.sport, external_port)
4897 self.logger.error(ppp("Unexpected or invalid packet", p))
4900 # session close api test
4901 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4903 self.pg1.remote_ip4n,
4905 dms = self.vapi.nat_det_map_dump()
4906 self.assertEqual(dms[0].ses_num, 1)
4908 self.vapi.nat_det_close_session_in(host0.ip4n,
4910 self.pg1.remote_ip4n,
4912 dms = self.vapi.nat_det_map_dump()
4913 self.assertEqual(dms[0].ses_num, 0)
# Opens a TCP session through a deterministic mapping and then plays the
# closing exchange started by the inside host: FIN from inside, ACK and FIN
# from outside (sent together; two packets are expected on pg0), final ACK
# from inside. Afterwards the first mapping must report zero sessions.
# NOTE(review): the TCP flag/seq arguments of every packet, the construction
# of "pkts", the prefix-length arguments of nat_det_add_del_map and the
# try/except around the sequence are not visible in this listing - confirm.
4915 def test_tcp_session_close_detection_in(self):
4916 """ Deterministic NAT TCP session close from inside network """
4917 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4919 socket.inet_aton(self.nat_addr),
4921 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4922 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4925 self.initiate_tcp_session(self.pg0, self.pg1)
4927 # close the session from inside
4929 # FIN packet in -> out
4930 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4931 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4932 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4934 self.pg0.add_stream(p)
4935 self.pg_enable_capture(self.pg_interfaces)
4937 self.pg1.get_capture(1)
4941 # ACK packet out -> in
4942 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4943 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4944 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4948 # FIN packet out -> in
4949 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4950 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4951 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4955 self.pg1.add_stream(pkts)
4956 self.pg_enable_capture(self.pg_interfaces)
4958 self.pg0.get_capture(2)
4960 # ACK packet in -> out
4961 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4962 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4963 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4965 self.pg0.add_stream(p)
4966 self.pg_enable_capture(self.pg_interfaces)
4968 self.pg1.get_capture(1)
4970 # Check if deterministic NAT44 closed the session
4971 dms = self.vapi.nat_det_map_dump()
4972 self.assertEqual(0, dms[0].ses_num)
4974 self.logger.error("TCP session termination failed")
# Mirror image of the inside-initiated close test: FIN from outside, two
# packets from inside sent together (two packets are expected on pg1), final
# ACK from outside. Afterwards the first mapping must report zero sessions.
# NOTE(review): the TCP flag/seq arguments of every packet, the construction
# of "pkts", the prefix-length arguments of nat_det_add_del_map and the
# try/except around the sequence are not visible in this listing. Both
# inside packets are labelled "ACK packet in -> out" in the existing
# comments; whether the second one carries FIN cannot be seen here - confirm.
4977 def test_tcp_session_close_detection_out(self):
4978 """ Deterministic NAT TCP session close from outside network """
4979 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4981 socket.inet_aton(self.nat_addr),
4983 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4984 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4987 self.initiate_tcp_session(self.pg0, self.pg1)
4989 # close the session from outside
4991 # FIN packet out -> in
4992 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4993 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4994 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4996 self.pg1.add_stream(p)
4997 self.pg_enable_capture(self.pg_interfaces)
4999 self.pg0.get_capture(1)
5003 # ACK packet in -> out
5004 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5005 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5006 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5010 # ACK packet in -> out
5011 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5012 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5013 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5017 self.pg0.add_stream(pkts)
5018 self.pg_enable_capture(self.pg_interfaces)
5020 self.pg1.get_capture(2)
5022 # ACK packet out -> in
5023 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5024 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5025 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5027 self.pg1.add_stream(p)
5028 self.pg_enable_capture(self.pg_interfaces)
5030 self.pg0.get_capture(1)
5032 # Check if deterministic NAT44 closed the session
5033 dms = self.vapi.nat_det_map_dump()
5034 self.assertEqual(0, dms[0].ses_num)
5036 self.logger.error("TCP session termination failed")
# Runs only as part of the extended tests. Opens a TCP session, sets all four
# deterministic-NAT timeouts to 5, sends the TCP/UDP/ICMP stream from inside
# and finally expects the first mapping to report zero sessions.
# NOTE(review): for the final assertion to hold the sessions must have expired,
# so a wait between the capture and the dump is presumably on a line not
# visible in this listing; the prefix-length arguments and the packet-generator
# start are not visible either - confirm against the full file.
5039 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5040 def test_session_timeout(self):
5041 """ Deterministic NAT session timeouts """
5042 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5044 socket.inet_aton(self.nat_addr),
5046 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5047 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5050 self.initiate_tcp_session(self.pg0, self.pg1)
5051 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
5052 pkts = self.create_stream_in(self.pg0, self.pg1)
5053 self.pg0.add_stream(pkts)
5054 self.pg_enable_capture(self.pg_interfaces)
# the capture is read only to consume the packets; its content is not checked
5056 capture = self.pg1.get_capture(len(pkts))
5059 dms = self.vapi.nat_det_map_dump()
5060 self.assertEqual(0, dms[0].ses_num)
# Runs only as part of the extended tests. Creates 1000 UDP flows (source
# ports 1025..2024) from one inside host, then sends one more flow and expects
# it to be dropped on the outside interface and answered with an ICMP type 3
# code 1 message embedding the offending UDP header. The mapping must report
# 1000 sessions, and the IPFIX export captured on pg2 must contain the
# quota-exceeded record checked by verify_ipfix_max_entries_per_user.
# NOTE(review): the packet list handling inside the port loop, the assignments
# of p/icmp after the captures, the loops over the IPFIX capture and several
# trailing call arguments are not visible in this listing - confirm.
5062 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5063 def test_session_limit_per_user(self):
5064 """ Deterministic NAT maximum sessions per user limit """
5065 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5067 socket.inet_aton(self.nat_addr),
5069 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5070 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# IPFIX is exported towards pg2's remote host
5072 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5073 src_address=self.pg2.local_ip4n,
5075 template_interval=10)
5076 self.vapi.nat_ipfix()
# one UDP flow per port value: 1000 flows in total
5079 for port in range(1025, 2025):
5080 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5081 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5082 UDP(sport=port, dport=port))
5085 self.pg0.add_stream(pkts)
5086 self.pg_enable_capture(self.pg_interfaces)
5088 capture = self.pg1.get_capture(len(pkts))
# one flow beyond the 1000 already created
5090 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5091 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5092 UDP(sport=3001, dport=3002))
5093 self.pg0.add_stream(p)
5094 self.pg_enable_capture(self.pg_interfaces)
5096 capture = self.pg1.assert_nothing_captured()
5098 # verify ICMP error packet
5099 capture = self.pg0.get_capture(1)
5101 self.assertTrue(p.haslayer(ICMP))
5103 self.assertEqual(icmp.type, 3)
5104 self.assertEqual(icmp.code, 1)
5105 self.assertTrue(icmp.haslayer(IPerror))
5106 inner_ip = icmp[IPerror]
5107 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5108 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5110 dms = self.vapi.nat_det_map_dump()
5112 self.assertEqual(1000, dms[0].ses_num)
5114 # verify IPFIX logging
5115 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5117 capture = self.pg2.get_capture(2)
5118 ipfix = IPFIXDecoder()
5119 # first load template
5121 self.assertTrue(p.haslayer(IPFIX))
5122 if p.haslayer(Template):
5123 ipfix.add_template(p.getlayer(Template))
5124 # verify events in data set
5126 if p.haslayer(Data):
5127 data = ipfix.decode_data_set(p.getlayer(Set))
5128 self.verify_ipfix_max_entries_per_user(data)
# Undoes the deterministic-NAT configuration: disables NAT IPFIX export, calls
# nat_det_set_timeouts with no arguments (presumably restoring the API's
# default values - confirm), issues nat_det_add_del_map for every dumped
# mapping and nat44_interface_add_del_feature for every dumped interface.
# NOTE(review): the remaining arguments of the two calls inside the loops
# (including whatever marks them as deletions) are not visible in this listing.
5130 def clear_nat_det(self):
5132 Clear deterministic NAT configuration.
5134 self.vapi.nat_ipfix(enable=0)
5135 self.vapi.nat_det_set_timeouts()
5136 deterministic_mappings = self.vapi.nat_det_map_dump()
5137 for dsm in deterministic_mappings:
5138 self.vapi.nat_det_add_del_map(dsm.in_addr,
5144 interfaces = self.vapi.nat44_interface_dump()
5145 for intf in interfaces:
5146 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# NOTE(review): the "def" line of this method is not visible in this listing;
# the super() call indicates it is the tearDown of TestDeterministicNAT.
# Calls the parent tearDown, checks self.vpp_dead, runs CLI commands that show
# NAT44 interfaces and the deterministic mappings, timeouts and sessions, and
# calls clear_nat_det(). The logger calls wrapping the last three CLI
# commands, and which statements are guarded by the vpp_dead check, cannot be
# seen here.
5151 super(TestDeterministicNAT, self).tearDown()
5152 if not self.vpp_dead:
5153 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5155 self.vapi.cli("show nat44 deterministic mappings"))
5157 self.vapi.cli("show nat44 deterministic timeouts"))
5159 self.vapi.cli("show nat44 deterministic sessions"))
5160 self.clear_nat_det()
# MethodHolder subclass whose setUpConstants sets the NAT64 BIB and session
# table hash bucket counts on the VPP command line; its tests use the nat64_*
# API calls.
5163 class TestNAT64(MethodHolder):
5164 """ NAT64 Test Cases """
def setUpConstants(cls):
    """ Append the NAT64 hash-bucket startup section to the VPP command line """
    super(TestNAT64, cls).setUpConstants()
    # Tokens of the startup section:
    # nat { nat64 bib hash buckets 128 nat64 st hash buckets 256 }
    nat_section = [
        "nat", "{",
        "nat64 bib hash buckets 128",
        "nat64 st hash buckets 256",
        "}",
    ]
    cls.vpp_cmdline.extend(nat_section)
# Class-level fixture: stores port/id constants, two NAT pool addresses (the
# second one for VRF 1) and IPFIX defaults, creates five pg interfaces, splits
# them into IPv6 (pg0, pg2) and IPv4 (pg1) groups, adds an IPv6 table for
# cls.vrf1_id and binds pg2 to it, and configures pg3 with both IPv4 and IPv6.
# NOTE(review): the assignment of cls.vrf1_id, the second argument of the
# vrf1 inet_pton call, most of the two per-interface loop bodies and any
# try/except around the setup are not visible in this listing - confirm.
5173 def setUpClass(cls):
5174 super(TestNAT64, cls).setUpClass()
5177 cls.tcp_port_in = 6303
5178 cls.tcp_port_out = 6303
5179 cls.udp_port_in = 6304
5180 cls.udp_port_out = 6304
5181 cls.icmp_id_in = 6305
5182 cls.icmp_id_out = 6305
5183 cls.nat_addr = '10.0.0.3'
5184 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5186 cls.vrf1_nat_addr = '10.0.10.3'
5187 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5189 cls.ipfix_src_port = 4739
5190 cls.ipfix_domain_id = 1
5192 cls.create_pg_interfaces(range(5))
# pg0 and pg2 are the IPv6 interfaces, pg1 is the IPv4 interface
5193 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5194 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5195 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5197 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5199 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5201 cls.pg0.generate_remote_hosts(2)
5203 for i in cls.ip6_interfaces:
5206 i.configure_ipv6_neighbors()
5208 for i in cls.ip4_interfaces:
# pg3 is dual-stack
5214 cls.pg3.config_ip4()
5215 cls.pg3.resolve_arp()
5216 cls.pg3.config_ip6()
5217 cls.pg3.configure_ipv6_neighbors()
# NOTE(review): presumably the cleanup branch of an error handler - confirm.
5220 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # Add a range that starts and ends at the same address, then expect the
    # dump to contain exactly that address.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Remove the same range and expect an empty dump.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
# Enables NAT64 on pg0 (default direction) and on pg1 with is_inside=0, checks
# that the interface dump lists both with the expected is_inside values and
# that the CLI feature listing of each interface names the matching NAT64
# node, then removes both and expects an empty dump.
# NOTE(review): pg0_found and pg1_found are asserted below but their
# initialisation and the assignments inside the loop are not visible in this
# listing - confirm against the full file.
5238 def test_interface(self):
5239 """ Enable/disable NAT64 feature on the interface """
5240 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5241 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5243 interfaces = self.vapi.nat64_interface_dump()
5244 self.assertEqual(len(interfaces), 2)
5247 for intf in interfaces:
5248 if intf.sw_if_index == self.pg0.sw_if_index:
5249 self.assertEqual(intf.is_inside, 1)
5251 elif intf.sw_if_index == self.pg1.sw_if_index:
5252 self.assertEqual(intf.is_inside, 0)
5254 self.assertTrue(pg0_found)
5255 self.assertTrue(pg1_found)
# str.find returns -1 when the node name is absent from the CLI output
5257 features = self.vapi.cli("show interface features pg0")
5258 self.assertNotEqual(features.find('nat64-in2out'), -1)
5259 features = self.vapi.cli("show interface features pg1")
5260 self.assertNotEqual(features.find('nat64-out2in'), -1)
5262 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5263 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5265 interfaces = self.vapi.nat64_interface_dump()
5266 self.assertEqual(len(interfaces), 0)
# Adds a static BIB entry from an IPv6 inside address to an IPv4 outside
# address for TCP, checks that the TCP BIB dump contains one static entry with
# the same addresses and ports, issues nat64_add_del_static_bib again and
# expects the count of static entries to be zero.
# NOTE(review): in_port, out_port, bibe and static_bib_num are used without a
# visible assignment, and both nat64_add_del_static_bib calls end after their
# first argument in this listing (so the flag that makes the second call a
# deletion is not visible) - confirm against the full file.
5268 def test_static_bib(self):
5269 """ Add/delete static BIB entry """
5270 in_addr = socket.inet_pton(socket.AF_INET6,
5271 '2001:db8:85a3::8a2e:370:7334')
5272 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5275 proto = IP_PROTOS.tcp
5277 self.vapi.nat64_add_del_static_bib(in_addr,
5282 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5287 self.assertEqual(bibe.i_addr, in_addr)
5288 self.assertEqual(bibe.o_addr, out_addr)
5289 self.assertEqual(bibe.i_port, in_port)
5290 self.assertEqual(bibe.o_port, out_port)
5291 self.assertEqual(static_bib_num, 1)
5293 self.vapi.nat64_add_del_static_bib(in_addr,
5299 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5304 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Order in which the reported fields are checked.
    checked_fields = ('udp', 'icmp', 'tcp_trans', 'tcp_est',
                      'tcp_incoming_syn')

    # verify default values
    expected_defaults = (300, 60, 240, 7440, 6)
    reported = self.vapi.nat64_get_timeouts()
    for field, expected in zip(checked_fields, expected_defaults):
        self.assertEqual(getattr(reported, field), expected)

    # set and verify custom values
    custom = {
        'udp': 200,
        'icmp': 30,
        'tcp_trans': 250,
        'tcp_est': 7450,
        'tcp_incoming_syn': 10,
    }
    self.vapi.nat64_set_timeouts(**custom)
    reported = self.vapi.nat64_get_timeouts()
    for field in checked_fields:
        self.assertEqual(getattr(reported, field), custom[field])
# NAT64 dynamic translation: adds the pool address, enables NAT64 on pg0 and
# on pg1 with is_inside=0, then runs the IPv6->IPv4 and IPv4->IPv6 streams
# twice and expects the session count to have grown by exactly 3 (the second
# round reuses the sessions of the first). A second pool address bound to
# vrf1_id and pg2 are then used to repeat the round trip for that VRF.
# NOTE(review): the second argument of the first pool call and the statements
# starting the packet generator are not visible in this listing - confirm.
5326 def test_dynamic(self):
5327 """ NAT64 dynamic translation test """
5328 self.tcp_port_in = 6303
5329 self.udp_port_in = 6304
5330 self.icmp_id_in = 6305
5332 ses_num_start = self.nat64_get_ses_num()
5334 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5336 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5337 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# first round, IPv6 side -> IPv4 side
5340 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5341 self.pg0.add_stream(pkts)
5342 self.pg_enable_capture(self.pg_interfaces)
5344 capture = self.pg1.get_capture(len(pkts))
5345 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5346 dst_ip=self.pg1.remote_ip4)
# first round, IPv4 side -> IPv6 side
5349 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5350 self.pg1.add_stream(pkts)
5351 self.pg_enable_capture(self.pg_interfaces)
5353 capture = self.pg0.get_capture(len(pkts))
# A scapy IPv6 object is built from "64:ff9b::" plus the IPv4 text; its src
# field is then used as the expected source address of the translated packets.
5354 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5355 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# second round, IPv6 side -> IPv4 side
5358 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5359 self.pg0.add_stream(pkts)
5360 self.pg_enable_capture(self.pg_interfaces)
5362 capture = self.pg1.get_capture(len(pkts))
5363 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5364 dst_ip=self.pg1.remote_ip4)
# second round, IPv4 side -> IPv6 side
5367 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5368 self.pg1.add_stream(pkts)
5369 self.pg_enable_capture(self.pg_interfaces)
5371 capture = self.pg0.get_capture(len(pkts))
5372 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5374 ses_num_end = self.nat64_get_ses_num()
# three sessions in total although the traffic was sent twice
5376 self.assertEqual(ses_num_end - ses_num_start, 3)
5378 # tenant with specific VRF
5379 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5380 self.vrf1_nat_addr_n,
5381 vrf_id=self.vrf1_id)
5382 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5384 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5385 self.pg2.add_stream(pkts)
5386 self.pg_enable_capture(self.pg_interfaces)
5388 capture = self.pg1.get_capture(len(pkts))
5389 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5390 dst_ip=self.pg1.remote_ip4)
5392 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5393 self.pg1.add_stream(pkts)
5394 self.pg_enable_capture(self.pg_interfaces)
5396 capture = self.pg2.get_capture(len(pkts))
5397 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5399 def test_static(self):
5400 """ NAT64 static translation test """
5401 self.tcp_port_in = 60303
5402 self.udp_port_in = 60304
5403 self.icmp_id_in = 60305
5404 self.tcp_port_out = 60303
5405 self.udp_port_out = 60304
5406 self.icmp_id_out = 60305
5408 ses_num_start = self.nat64_get_ses_num()
5410 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5412 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5413 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5415 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5420 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5425 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5432 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5433 self.pg0.add_stream(pkts)
5434 self.pg_enable_capture(self.pg_interfaces)
5436 capture = self.pg1.get_capture(len(pkts))
5437 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5438 dst_ip=self.pg1.remote_ip4, same_port=True)
5441 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5442 self.pg1.add_stream(pkts)
5443 self.pg_enable_capture(self.pg_interfaces)
5445 capture = self.pg0.get_capture(len(pkts))
5446 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5447 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5449 ses_num_end = self.nat64_get_ses_num()
5451 self.assertEqual(ses_num_end - ses_num_start, 3)
5453 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5454 def test_session_timeout(self):
5455 """ NAT64 session timeout """
5456 self.icmp_id_in = 1234
5457 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5459 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5460 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5461 self.vapi.nat64_set_timeouts(icmp=5)
5463 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5464 self.pg0.add_stream(pkts)
5465 self.pg_enable_capture(self.pg_interfaces)
5467 capture = self.pg1.get_capture(len(pkts))
5469 ses_num_before_timeout = self.nat64_get_ses_num()
5473 # ICMP session after timeout
5474 ses_num_after_timeout = self.nat64_get_ses_num()
5475 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5477 def test_icmp_error(self):
5478 """ NAT64 ICMP Error message translation """
5479 self.tcp_port_in = 6303
5480 self.udp_port_in = 6304
5481 self.icmp_id_in = 6305
5483 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5485 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5486 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5488 # send some packets to create sessions
5489 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5490 self.pg0.add_stream(pkts)
5491 self.pg_enable_capture(self.pg_interfaces)
5493 capture_ip4 = self.pg1.get_capture(len(pkts))
5494 self.verify_capture_out(capture_ip4,
5495 nat_ip=self.nat_addr,
5496 dst_ip=self.pg1.remote_ip4)
5498 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5499 self.pg1.add_stream(pkts)
5500 self.pg_enable_capture(self.pg_interfaces)
5502 capture_ip6 = self.pg0.get_capture(len(pkts))
5503 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5504 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5505 self.pg0.remote_ip6)
5508 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5509 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5510 ICMPv6DestUnreach(code=1) /
5511 packet[IPv6] for packet in capture_ip6]
5512 self.pg0.add_stream(pkts)
5513 self.pg_enable_capture(self.pg_interfaces)
5515 capture = self.pg1.get_capture(len(pkts))
5516 for packet in capture:
5518 self.assertEqual(packet[IP].src, self.nat_addr)
5519 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5520 self.assertEqual(packet[ICMP].type, 3)
5521 self.assertEqual(packet[ICMP].code, 13)
5522 inner = packet[IPerror]
5523 self.assertEqual(inner.src, self.pg1.remote_ip4)
5524 self.assertEqual(inner.dst, self.nat_addr)
5525 self.assert_packet_checksums_valid(packet)
5526 if inner.haslayer(TCPerror):
5527 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5528 elif inner.haslayer(UDPerror):
5529 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5531 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5533 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5537 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5538 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5539 ICMP(type=3, code=13) /
5540 packet[IP] for packet in capture_ip4]
5541 self.pg1.add_stream(pkts)
5542 self.pg_enable_capture(self.pg_interfaces)
5544 capture = self.pg0.get_capture(len(pkts))
5545 for packet in capture:
5547 self.assertEqual(packet[IPv6].src, ip.src)
5548 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5549 icmp = packet[ICMPv6DestUnreach]
5550 self.assertEqual(icmp.code, 1)
5551 inner = icmp[IPerror6]
5552 self.assertEqual(inner.src, self.pg0.remote_ip6)
5553 self.assertEqual(inner.dst, ip.src)
5554 self.assert_icmpv6_checksum_valid(packet)
5555 if inner.haslayer(TCPerror):
5556 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5557 elif inner.haslayer(UDPerror):
5558 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5560 self.assertEqual(inner[ICMPv6EchoRequest].id,
5563 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5566 def test_hairpinning(self):
5567 """ NAT64 hairpinning """
5569 client = self.pg0.remote_hosts[0]
5570 server = self.pg0.remote_hosts[1]
5571 server_tcp_in_port = 22
5572 server_tcp_out_port = 4022
5573 server_udp_in_port = 23
5574 server_udp_out_port = 4023
5575 client_tcp_in_port = 1234
5576 client_udp_in_port = 1235
5577 client_tcp_out_port = 0
5578 client_udp_out_port = 0
5579 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5580 nat_addr_ip6 = ip.src
5582 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5584 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5585 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5587 self.vapi.nat64_add_del_static_bib(server.ip6n,
5590 server_tcp_out_port,
5592 self.vapi.nat64_add_del_static_bib(server.ip6n,
5595 server_udp_out_port,
5600 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5601 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5602 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5604 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5605 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5606 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5608 self.pg0.add_stream(pkts)
5609 self.pg_enable_capture(self.pg_interfaces)
5611 capture = self.pg0.get_capture(len(pkts))
5612 for packet in capture:
5614 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5615 self.assertEqual(packet[IPv6].dst, server.ip6)
5616 self.assert_packet_checksums_valid(packet)
5617 if packet.haslayer(TCP):
5618 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5619 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5620 client_tcp_out_port = packet[TCP].sport
5622 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5623 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5624 client_udp_out_port = packet[UDP].sport
5626 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5631 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5632 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5633 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5635 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5636 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5637 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5639 self.pg0.add_stream(pkts)
5640 self.pg_enable_capture(self.pg_interfaces)
5642 capture = self.pg0.get_capture(len(pkts))
5643 for packet in capture:
5645 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5646 self.assertEqual(packet[IPv6].dst, client.ip6)
5647 self.assert_packet_checksums_valid(packet)
5648 if packet.haslayer(TCP):
5649 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5650 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5652 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5653 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5655 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5660 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5661 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5662 ICMPv6DestUnreach(code=1) /
5663 packet[IPv6] for packet in capture]
5664 self.pg0.add_stream(pkts)
5665 self.pg_enable_capture(self.pg_interfaces)
5667 capture = self.pg0.get_capture(len(pkts))
5668 for packet in capture:
5670 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5671 self.assertEqual(packet[IPv6].dst, server.ip6)
5672 icmp = packet[ICMPv6DestUnreach]
5673 self.assertEqual(icmp.code, 1)
5674 inner = icmp[IPerror6]
5675 self.assertEqual(inner.src, server.ip6)
5676 self.assertEqual(inner.dst, nat_addr_ip6)
5677 self.assert_packet_checksums_valid(packet)
5678 if inner.haslayer(TCPerror):
5679 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5680 self.assertEqual(inner[TCPerror].dport,
5681 client_tcp_out_port)
5683 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5684 self.assertEqual(inner[UDPerror].dport,
5685 client_udp_out_port)
5687 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5690 def test_prefix(self):
5691 """ NAT64 Network-Specific Prefix """
5693 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5695 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5696 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5697 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5698 self.vrf1_nat_addr_n,
5699 vrf_id=self.vrf1_id)
5700 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5703 global_pref64 = "2001:db8::"
5704 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5705 global_pref64_len = 32
5706 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5708 prefix = self.vapi.nat64_prefix_dump()
5709 self.assertEqual(len(prefix), 1)
5710 self.assertEqual(prefix[0].prefix, global_pref64_n)
5711 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5712 self.assertEqual(prefix[0].vrf_id, 0)
5714 # Add tenant specific prefix
5715 vrf1_pref64 = "2001:db8:122:300::"
5716 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5717 vrf1_pref64_len = 56
5718 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5720 vrf_id=self.vrf1_id)
5721 prefix = self.vapi.nat64_prefix_dump()
5722 self.assertEqual(len(prefix), 2)
5725 pkts = self.create_stream_in_ip6(self.pg0,
5728 plen=global_pref64_len)
5729 self.pg0.add_stream(pkts)
5730 self.pg_enable_capture(self.pg_interfaces)
5732 capture = self.pg1.get_capture(len(pkts))
5733 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5734 dst_ip=self.pg1.remote_ip4)
5736 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5737 self.pg1.add_stream(pkts)
5738 self.pg_enable_capture(self.pg_interfaces)
5740 capture = self.pg0.get_capture(len(pkts))
5741 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5744 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5746 # Tenant specific prefix
5747 pkts = self.create_stream_in_ip6(self.pg2,
5750 plen=vrf1_pref64_len)
5751 self.pg2.add_stream(pkts)
5752 self.pg_enable_capture(self.pg_interfaces)
5754 capture = self.pg1.get_capture(len(pkts))
5755 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5756 dst_ip=self.pg1.remote_ip4)
5758 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5759 self.pg1.add_stream(pkts)
5760 self.pg_enable_capture(self.pg_interfaces)
5762 capture = self.pg2.get_capture(len(pkts))
5763 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5766 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5768 def test_unknown_proto(self):
5769 """ NAT64 translate packet with unknown protocol """
5771 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5773 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5774 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5775 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5778 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5779 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5780 TCP(sport=self.tcp_port_in, dport=20))
5781 self.pg0.add_stream(p)
5782 self.pg_enable_capture(self.pg_interfaces)
5784 p = self.pg1.get_capture(1)
5786 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5787 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5789 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5790 TCP(sport=1234, dport=1234))
5791 self.pg0.add_stream(p)
5792 self.pg_enable_capture(self.pg_interfaces)
5794 p = self.pg1.get_capture(1)
5797 self.assertEqual(packet[IP].src, self.nat_addr)
5798 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5799 self.assertTrue(packet.haslayer(GRE))
5800 self.assert_packet_checksums_valid(packet)
5802 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5806 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5807 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5809 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5810 TCP(sport=1234, dport=1234))
5811 self.pg1.add_stream(p)
5812 self.pg_enable_capture(self.pg_interfaces)
5814 p = self.pg0.get_capture(1)
5817 self.assertEqual(packet[IPv6].src, remote_ip6)
5818 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5819 self.assertEqual(packet[IPv6].nh, 47)
5821 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5824 def test_hairpinning_unknown_proto(self):
5825 """ NAT64 translate packet with unknown protocol - hairpinning """
5827 client = self.pg0.remote_hosts[0]
5828 server = self.pg0.remote_hosts[1]
5829 server_tcp_in_port = 22
5830 server_tcp_out_port = 4022
5831 client_tcp_in_port = 1234
5832 client_tcp_out_port = 1235
5833 server_nat_ip = "10.0.0.100"
5834 client_nat_ip = "10.0.0.110"
5835 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5836 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5837 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5838 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5840 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5842 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5843 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5845 self.vapi.nat64_add_del_static_bib(server.ip6n,
5848 server_tcp_out_port,
5851 self.vapi.nat64_add_del_static_bib(server.ip6n,
5857 self.vapi.nat64_add_del_static_bib(client.ip6n,
5860 client_tcp_out_port,
5864 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5865 IPv6(src=client.ip6, dst=server_nat_ip6) /
5866 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5867 self.pg0.add_stream(p)
5868 self.pg_enable_capture(self.pg_interfaces)
5870 p = self.pg0.get_capture(1)
5872 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5873 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5875 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5876 TCP(sport=1234, dport=1234))
5877 self.pg0.add_stream(p)
5878 self.pg_enable_capture(self.pg_interfaces)
5880 p = self.pg0.get_capture(1)
5883 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5884 self.assertEqual(packet[IPv6].dst, server.ip6)
5885 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5887 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5891 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5892 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5894 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5895 TCP(sport=1234, dport=1234))
5896 self.pg0.add_stream(p)
5897 self.pg_enable_capture(self.pg_interfaces)
5899 p = self.pg0.get_capture(1)
5902 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5903 self.assertEqual(packet[IPv6].dst, client.ip6)
5904 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5906 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5909 def test_one_armed_nat64(self):
5910 """ One armed NAT64 """
5912 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5916 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5918 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5919 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5922 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5923 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5924 TCP(sport=12345, dport=80))
5925 self.pg3.add_stream(p)
5926 self.pg_enable_capture(self.pg_interfaces)
5928 capture = self.pg3.get_capture(1)
5933 self.assertEqual(ip.src, self.nat_addr)
5934 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5935 self.assertNotEqual(tcp.sport, 12345)
5936 external_port = tcp.sport
5937 self.assertEqual(tcp.dport, 80)
5938 self.assert_packet_checksums_valid(p)
5940 self.logger.error(ppp("Unexpected or invalid packet:", p))
5944 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5945 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5946 TCP(sport=80, dport=external_port))
5947 self.pg3.add_stream(p)
5948 self.pg_enable_capture(self.pg_interfaces)
5950 capture = self.pg3.get_capture(1)
5955 self.assertEqual(ip.src, remote_host_ip6)
5956 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5957 self.assertEqual(tcp.sport, 80)
5958 self.assertEqual(tcp.dport, 12345)
5959 self.assert_packet_checksums_valid(p)
5961 self.logger.error(ppp("Unexpected or invalid packet:", p))
5964 def test_frag_in_order(self):
5965 """ NAT64 translate fragments arriving in order """
5966 self.tcp_port_in = random.randint(1025, 65535)
5968 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5970 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5971 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5973 reass = self.vapi.nat_reass_dump()
5974 reass_n_start = len(reass)
5978 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5979 self.tcp_port_in, 20, data)
5980 self.pg0.add_stream(pkts)
5981 self.pg_enable_capture(self.pg_interfaces)
5983 frags = self.pg1.get_capture(len(pkts))
5984 p = self.reass_frags_and_verify(frags,
5986 self.pg1.remote_ip4)
5987 self.assertEqual(p[TCP].dport, 20)
5988 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5989 self.tcp_port_out = p[TCP].sport
5990 self.assertEqual(data, p[Raw].load)
5993 data = "A" * 4 + "b" * 16 + "C" * 3
5994 pkts = self.create_stream_frag(self.pg1,
5999 self.pg1.add_stream(pkts)
6000 self.pg_enable_capture(self.pg_interfaces)
6002 frags = self.pg0.get_capture(len(pkts))
6003 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6004 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6005 self.assertEqual(p[TCP].sport, 20)
6006 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6007 self.assertEqual(data, p[Raw].load)
6009 reass = self.vapi.nat_reass_dump()
6010 reass_n_end = len(reass)
6012 self.assertEqual(reass_n_end - reass_n_start, 2)
6014 def test_reass_hairpinning(self):
6015 """ NAT64 fragments hairpinning """
6017 server = self.pg0.remote_hosts[1]
6018 server_in_port = random.randint(1025, 65535)
6019 server_out_port = random.randint(1025, 65535)
6020 client_in_port = random.randint(1025, 65535)
6021 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6022 nat_addr_ip6 = ip.src
6024 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6026 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6027 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6029 # add static BIB entry for server
6030 self.vapi.nat64_add_del_static_bib(server.ip6n,
6036 # send packet from host to server
6037 pkts = self.create_stream_frag_ip6(self.pg0,
6042 self.pg0.add_stream(pkts)
6043 self.pg_enable_capture(self.pg_interfaces)
6045 frags = self.pg0.get_capture(len(pkts))
6046 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6047 self.assertNotEqual(p[TCP].sport, client_in_port)
6048 self.assertEqual(p[TCP].dport, server_in_port)
6049 self.assertEqual(data, p[Raw].load)
6051 def test_frag_out_of_order(self):
6052 """ NAT64 translate fragments arriving out of order """
6053 self.tcp_port_in = random.randint(1025, 65535)
6055 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6057 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6058 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6062 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6063 self.tcp_port_in, 20, data)
6065 self.pg0.add_stream(pkts)
6066 self.pg_enable_capture(self.pg_interfaces)
6068 frags = self.pg1.get_capture(len(pkts))
6069 p = self.reass_frags_and_verify(frags,
6071 self.pg1.remote_ip4)
6072 self.assertEqual(p[TCP].dport, 20)
6073 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6074 self.tcp_port_out = p[TCP].sport
6075 self.assertEqual(data, p[Raw].load)
6078 data = "A" * 4 + "B" * 16 + "C" * 3
6079 pkts = self.create_stream_frag(self.pg1,
6085 self.pg1.add_stream(pkts)
6086 self.pg_enable_capture(self.pg_interfaces)
6088 frags = self.pg0.get_capture(len(pkts))
6089 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6090 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6091 self.assertEqual(p[TCP].sport, 20)
6092 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6093 self.assertEqual(data, p[Raw].load)
6095 def test_interface_addr(self):
6096 """ Acquire NAT64 pool addresses from interface """
6097 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
6099 # no address in NAT64 pool
6100 adresses = self.vapi.nat44_address_dump()
6101 self.assertEqual(0, len(adresses))
6103 # configure interface address and check NAT64 address pool
6104 self.pg4.config_ip4()
6105 addresses = self.vapi.nat64_pool_addr_dump()
6106 self.assertEqual(len(addresses), 1)
6107 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
6109 # remove interface address and check NAT64 address pool
6110 self.pg4.unconfig_ip4()
6111 addresses = self.vapi.nat64_pool_addr_dump()
6112 self.assertEqual(0, len(adresses))
6114 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6115 def test_ipfix_max_bibs_sessions(self):
6116 """ IPFIX logging maximum session and BIB entries exceeded """
6119 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6123 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6125 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6126 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6130 for i in range(0, max_bibs):
6131 src = "fd01:aa::%x" % (i)
6132 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6133 IPv6(src=src, dst=remote_host_ip6) /
6134 TCP(sport=12345, dport=80))
6136 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6137 IPv6(src=src, dst=remote_host_ip6) /
6138 TCP(sport=12345, dport=22))
6140 self.pg0.add_stream(pkts)
6141 self.pg_enable_capture(self.pg_interfaces)
6143 self.pg1.get_capture(max_sessions)
6145 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6146 src_address=self.pg3.local_ip4n,
6148 template_interval=10)
6149 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6150 src_port=self.ipfix_src_port)
6152 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6153 IPv6(src=src, dst=remote_host_ip6) /
6154 TCP(sport=12345, dport=25))
6155 self.pg0.add_stream(p)
6156 self.pg_enable_capture(self.pg_interfaces)
6158 self.pg1.assert_nothing_captured()
6160 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6161 capture = self.pg3.get_capture(9)
6162 ipfix = IPFIXDecoder()
6163 # first load template
6165 self.assertTrue(p.haslayer(IPFIX))
6166 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6167 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6168 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6169 self.assertEqual(p[UDP].dport, 4739)
6170 self.assertEqual(p[IPFIX].observationDomainID,
6171 self.ipfix_domain_id)
6172 if p.haslayer(Template):
6173 ipfix.add_template(p.getlayer(Template))
6174 # verify events in data set
6176 if p.haslayer(Data):
6177 data = ipfix.decode_data_set(p.getlayer(Set))
6178 self.verify_ipfix_max_sessions(data, max_sessions)
6180 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6181 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6182 TCP(sport=12345, dport=80))
6183 self.pg0.add_stream(p)
6184 self.pg_enable_capture(self.pg_interfaces)
6186 self.pg1.assert_nothing_captured()
6188 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6189 capture = self.pg3.get_capture(1)
6190 # verify events in data set
6192 self.assertTrue(p.haslayer(IPFIX))
6193 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6194 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6195 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6196 self.assertEqual(p[UDP].dport, 4739)
6197 self.assertEqual(p[IPFIX].observationDomainID,
6198 self.ipfix_domain_id)
6199 if p.haslayer(Data):
6200 data = ipfix.decode_data_set(p.getlayer(Set))
6201 self.verify_ipfix_max_bibs(data, max_bibs)
6203 def test_ipfix_max_frags(self):
6204 """ IPFIX logging maximum fragments pending reassembly exceeded """
6205 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6207 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6208 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6209 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6210 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6211 src_address=self.pg3.local_ip4n,
6213 template_interval=10)
6214 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6215 src_port=self.ipfix_src_port)
6218 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6219 self.tcp_port_in, 20, data)
6220 self.pg0.add_stream(pkts[-1])
6221 self.pg_enable_capture(self.pg_interfaces)
6223 self.pg1.assert_nothing_captured()
6225 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6226 capture = self.pg3.get_capture(9)
6227 ipfix = IPFIXDecoder()
6228 # first load template
6230 self.assertTrue(p.haslayer(IPFIX))
6231 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6232 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6233 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6234 self.assertEqual(p[UDP].dport, 4739)
6235 self.assertEqual(p[IPFIX].observationDomainID,
6236 self.ipfix_domain_id)
6237 if p.haslayer(Template):
6238 ipfix.add_template(p.getlayer(Template))
6239 # verify events in data set
6241 if p.haslayer(Data):
6242 data = ipfix.decode_data_set(p.getlayer(Set))
6243 self.verify_ipfix_max_fragments_ip6(data, 0,
6244 self.pg0.remote_ip6n)
6246 def test_ipfix_bib_ses(self):
6247 """ IPFIX logging NAT64 BIB/session create and delete events """
6248 self.tcp_port_in = random.randint(1025, 65535)
6249 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6253 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6255 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6256 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6257 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6258 src_address=self.pg3.local_ip4n,
6260 template_interval=10)
6261 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6262 src_port=self.ipfix_src_port)
6265 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6266 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6267 TCP(sport=self.tcp_port_in, dport=25))
6268 self.pg0.add_stream(p)
6269 self.pg_enable_capture(self.pg_interfaces)
6271 p = self.pg1.get_capture(1)
6272 self.tcp_port_out = p[0][TCP].sport
6273 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6274 capture = self.pg3.get_capture(10)
6275 ipfix = IPFIXDecoder()
6276 # first load template
6278 self.assertTrue(p.haslayer(IPFIX))
6279 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6280 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6281 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6282 self.assertEqual(p[UDP].dport, 4739)
6283 self.assertEqual(p[IPFIX].observationDomainID,
6284 self.ipfix_domain_id)
6285 if p.haslayer(Template):
6286 ipfix.add_template(p.getlayer(Template))
6287 # verify events in data set
6289 if p.haslayer(Data):
6290 data = ipfix.decode_data_set(p.getlayer(Set))
6291 if ord(data[0][230]) == 10:
6292 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6293 elif ord(data[0][230]) == 6:
6294 self.verify_ipfix_nat64_ses(data,
6296 self.pg0.remote_ip6n,
6297 self.pg1.remote_ip4,
6300 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6303 self.pg_enable_capture(self.pg_interfaces)
6304 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6307 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6308 capture = self.pg3.get_capture(2)
6309 # verify events in data set
6311 self.assertTrue(p.haslayer(IPFIX))
6312 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6313 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6314 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6315 self.assertEqual(p[UDP].dport, 4739)
6316 self.assertEqual(p[IPFIX].observationDomainID,
6317 self.ipfix_domain_id)
6318 if p.haslayer(Data):
6319 data = ipfix.decode_data_set(p.getlayer(Set))
6320 if ord(data[0][230]) == 11:
6321 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6322 elif ord(data[0][230]) == 7:
6323 self.verify_ipfix_nat64_ses(data,
6325 self.pg0.remote_ip6n,
6326 self.pg1.remote_ip4,
6329 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6331 def nat64_get_ses_num(self):
6333 Return number of active NAT64 sessions.
6335 st = self.vapi.nat64_st_dump()
6338 def clear_nat64(self):
6340 Clear NAT64 configuration.
6342 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6343 domain_id=self.ipfix_domain_id)
6344 self.ipfix_src_port = 4739
6345 self.ipfix_domain_id = 1
6347 self.vapi.nat64_set_timeouts()
6349 interfaces = self.vapi.nat64_interface_dump()
6350 for intf in interfaces:
6351 if intf.is_inside > 1:
6352 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6355 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6359 bib = self.vapi.nat64_bib_dump(255)
6362 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6370 adresses = self.vapi.nat64_pool_addr_dump()
6371 for addr in adresses:
6372 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6377 prefixes = self.vapi.nat64_prefix_dump()
6378 for prefix in prefixes:
6379 self.vapi.nat64_add_del_prefix(prefix.prefix,
6381 vrf_id=prefix.vrf_id,
6385 super(TestNAT64, self).tearDown()
6386 if not self.vpp_dead:
6387 self.logger.info(self.vapi.cli("show nat64 pool"))
6388 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6389 self.logger.info(self.vapi.cli("show nat64 prefix"))
6390 self.logger.info(self.vapi.cli("show nat64 bib all"))
6391 self.logger.info(self.vapi.cli("show nat64 session table all"))
6392 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite test case: pg0 is the IPv4 side, pg1 the IPv6 side. The single test
# sends IPv4-in-IPv6 traffic addressed to the AFTR on pg1 and checks the
# translated IPv4 packet on pg0, then checks the reverse direction, for UDP,
# TCP and ICMP, and finally pings the AFTR IPv6 address itself.
# NOTE(review): the embedded numbering skips values throughout this class
# (e.g. 6401 -> 6404, 6422 -> 6424, 6436 -> 6438), so decorators, some call
# arguments and short statements are not visible in this extract; comments
# describe only the visible lines.
6396 class TestDSlite(MethodHolder):
6397 """ DS-Lite Test Cases """
# Class-level fixture: NAT pool address, two pg interfaces.
6400 def setUpClass(cls):
6401 super(TestDSlite, cls).setUpClass()
# NAT address in text form and in packed binary form for the API.
6404 cls.nat_addr = '10.0.0.3'
6405 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6407 cls.create_pg_interfaces(range(2))
# pg0: IPv4 configured and ARP resolved.
6409 cls.pg0.config_ip4()
6410 cls.pg0.resolve_arp()
# pg1: IPv6 configured, with 2 generated remote hosts and their neighbors.
6412 cls.pg1.config_ip6()
6413 cls.pg1.generate_remote_hosts(2)
6414 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): presumably the failure-path cleanup of setUpClass (the
# surrounding try/except is not visible) - confirm.
6417 super(TestDSlite, cls).tearDownClass()
6420 def test_dslite(self):
6421 """ Test DS-Lite """
# Add the packed NAT address to the DS-Lite pool (further arguments are not
# visible in this extract).
6422 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# Configure the AFTR tunnel endpoint: IPv6 and IPv4 addresses, both passed
# to the API in packed form.
6424 aftr_ip4 = '192.0.0.1'
6425 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6426 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6427 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6428 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, IPv6 -> IPv4: inner IPv4/UDP 192.168.1.1:20000 -> pg0 remote:10000,
# carried in IPv6 from remote host 0 to the AFTR address, injected on pg1.
6431 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6432 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6433 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6434 UDP(sport=20000, dport=10000))
6435 self.pg1.add_stream(p)
6436 self.pg_enable_capture(self.pg_interfaces)
# Expect exactly one packet on pg0: no IPv6 layer, source rewritten to the
# NAT address, source port changed, destination untouched, checksums valid.
6438 capture = self.pg0.get_capture(1)
6439 capture = capture[0]
6440 self.assertFalse(capture.haslayer(IPv6))
6441 self.assertEqual(capture[IP].src, self.nat_addr)
6442 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6443 self.assertNotEqual(capture[UDP].sport, 20000)
6444 self.assertEqual(capture[UDP].dport, 10000)
6445 self.assert_packet_checksums_valid(capture)
# Remember the translated source port for the return packet.
6446 out_port = capture[UDP].sport
# UDP, IPv4 -> IPv6: reply from pg0's remote host to the NAT address and
# translated port.
6448 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6449 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6450 UDP(sport=10000, dport=out_port))
6451 self.pg0.add_stream(p)
6452 self.pg_enable_capture(self.pg_interfaces)
# Expect one packet on pg1: outer IPv6 from the AFTR to remote host 0, inner
# IPv4 destination and port restored to 192.168.1.1:20000.
6454 capture = self.pg1.get_capture(1)
6455 capture = capture[0]
6456 self.assertEqual(capture[IPv6].src, aftr_ip6)
6457 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6458 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6459 self.assertEqual(capture[IP].dst, '192.168.1.1')
6460 self.assertEqual(capture[UDP].sport, 10000)
6461 self.assertEqual(capture[UDP].dport, 20000)
6462 self.assert_packet_checksums_valid(capture)
# TCP, IPv6 -> IPv4: same inner source address 192.168.1.1 but tunnelled
# from remote host 1 this time.
6465 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6466 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6467 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6468 TCP(sport=20001, dport=10001))
6469 self.pg1.add_stream(p)
6470 self.pg_enable_capture(self.pg_interfaces)
# Same expectations as for UDP: plain IPv4 on pg0 with a new source port.
6472 capture = self.pg0.get_capture(1)
6473 capture = capture[0]
6474 self.assertFalse(capture.haslayer(IPv6))
6475 self.assertEqual(capture[IP].src, self.nat_addr)
6476 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6477 self.assertNotEqual(capture[TCP].sport, 20001)
6478 self.assertEqual(capture[TCP].dport, 10001)
6479 self.assert_packet_checksums_valid(capture)
6480 out_port = capture[TCP].sport
# TCP, IPv4 -> IPv6: reply towards the translated port.
6482 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6483 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6484 TCP(sport=10001, dport=out_port))
6485 self.pg0.add_stream(p)
6486 self.pg_enable_capture(self.pg_interfaces)
# The reply must be tunnelled to remote host 1 (the host that opened this
# TCP session), with the original inner address and port restored.
6488 capture = self.pg1.get_capture(1)
6489 capture = capture[0]
6490 self.assertEqual(capture[IPv6].src, aftr_ip6)
6491 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6492 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6493 self.assertEqual(capture[IP].dst, '192.168.1.1')
6494 self.assertEqual(capture[TCP].sport, 10001)
6495 self.assertEqual(capture[TCP].dport, 20001)
6496 self.assert_packet_checksums_valid(capture)
# ICMP, IPv6 -> IPv4: echo request with identifier 4000 from remote host 1.
6499 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6500 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6501 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6502 ICMP(id=4000, type='echo-request'))
6503 self.pg1.add_stream(p)
6504 self.pg_enable_capture(self.pg_interfaces)
# For ICMP the identifier plays the role of the port: it must be rewritten.
6506 capture = self.pg0.get_capture(1)
6507 capture = capture[0]
6508 self.assertFalse(capture.haslayer(IPv6))
6509 self.assertEqual(capture[IP].src, self.nat_addr)
6510 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6511 self.assertNotEqual(capture[ICMP].id, 4000)
6512 self.assert_packet_checksums_valid(capture)
6513 out_id = capture[ICMP].id
# ICMP, IPv4 -> IPv6: echo reply carrying the translated identifier.
6515 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6516 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6517 ICMP(id=out_id, type='echo-reply'))
6518 self.pg0.add_stream(p)
6519 self.pg_enable_capture(self.pg_interfaces)
# Expect the reply tunnelled to remote host 1 with identifier 4000 restored.
6521 capture = self.pg1.get_capture(1)
6522 capture = capture[0]
6523 self.assertEqual(capture[IPv6].src, aftr_ip6)
6524 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6525 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6526 self.assertEqual(capture[IP].dst, '192.168.1.1')
6527 self.assertEqual(capture[ICMP].id, 4000)
6528 self.assert_packet_checksums_valid(capture)
6530 # ping DS-Lite AFTR tunnel endpoint address
# Plain ICMPv6 echo request (no inner IPv4) sent to the AFTR address on pg1.
6531 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6532 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6533 ICMPv6EchoRequest())
6534 self.pg1.add_stream(p)
6535 self.pg_enable_capture(self.pg_interfaces)
# The answer is captured on the same interface (pg1): one ICMPv6 echo reply
# sourced from the AFTR address and addressed to remote host 1.
6537 capture = self.pg1.get_capture(1)
6538 self.assertEqual(1, len(capture))
6539 capture = capture[0]
6540 self.assertEqual(capture[IPv6].src, aftr_ip6)
6541 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6542 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown (its `def` line is not visible in this extract): after
# the parent teardown, log DS-Lite CLI state if VPP is still alive.
6545 super(TestDSlite, self).tearDown()
6546 if not self.vpp_dead:
6547 self.logger.info(self.vapi.cli("show dslite pool"))
# NOTE(review): the opening of the statement wrapping this CLI call
# (numbered 6548) is not visible here.
6549 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6550 self.logger.info(self.vapi.cli("show dslite sessions"))
# DS-Lite CE test case: VPP is started with "nat { dslite ce }" on its
# command line. pg0 is the IPv4 side, pg1 the IPv6 side. The single test
# checks that IPv4 from pg0 leaves pg1 wrapped in IPv6 (B4 -> AFTR), that the
# reverse direction is unwrapped, and that the B4 IPv6 address answers ping.
# NOTE(review): the embedded numbering skips values throughout this class
# (e.g. 6559 -> 6562, 6597 -> 6601, 6605 -> 6607), so decorators, some call
# arguments and short statements are not visible in this extract; comments
# describe only the visible lines.
6553 class TestDSliteCE(MethodHolder):
6554 """ DS-Lite CE Test Cases """
# Append the DS-Lite CE NAT stanza to the VPP start-up command line.
6557 def setUpConstants(cls):
6558 super(TestDSliteCE, cls).setUpConstants()
6559 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
# Class-level fixture: two pg interfaces.
6562 def setUpClass(cls):
6563 super(TestDSliteCE, cls).setUpClass()
6566 cls.create_pg_interfaces(range(2))
# pg0: IPv4 configured and ARP resolved.
6568 cls.pg0.config_ip4()
6569 cls.pg0.resolve_arp()
# pg1: IPv6 configured, with 1 generated remote host and its neighbor entry.
6571 cls.pg1.config_ip6()
6572 cls.pg1.generate_remote_hosts(1)
6573 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): presumably the failure-path cleanup of setUpClass (the
# surrounding try/except is not visible) - confirm.
6576 super(TestDSliteCE, cls).tearDownClass()
6579 def test_dslite_ce(self):
6580 """ Test DS-Lite CE """
# Configure the B4 tunnel endpoint: IPv6 and IPv4 addresses in packed form.
6582 b4_ip4 = '192.0.0.2'
6583 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6584 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6585 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6586 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
# Configure the AFTR tunnel endpoint the same way.
6588 aftr_ip4 = '192.0.0.1'
6589 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6590 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6591 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6592 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# Host route (/128) for the AFTR IPv6 address via pg1's remote IPv6 address
# (any further arguments of this call are not visible in this extract).
6594 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6595 dst_address_length=128,
6596 next_hop_address=self.pg1.remote_ip6n,
6597 next_hop_sw_if_index=self.pg1.sw_if_index,
# IPv4 -> IPv6: plain IPv4/UDP from pg0's remote host, injected on pg0.
6601 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6602 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6603 UDP(sport=10000, dport=20000))
6604 self.pg0.add_stream(p)
6605 self.pg_enable_capture(self.pg_interfaces)
# Expect one packet on pg1: outer IPv6 from B4 to AFTR, inner IPv4 addresses
# and UDP ports unchanged, checksums valid.
6607 capture = self.pg1.get_capture(1)
6608 capture = capture[0]
6609 self.assertEqual(capture[IPv6].src, b4_ip6)
6610 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6611 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6612 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6613 self.assertEqual(capture[UDP].sport, 10000)
6614 self.assertEqual(capture[UDP].dport, 20000)
6615 self.assert_packet_checksums_valid(capture)
# IPv6 -> IPv4: IPv4/UDP wrapped in IPv6 from AFTR to B4, injected on pg1.
6618 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6619 IPv6(dst=b4_ip6, src=aftr_ip6) /
6620 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6621 UDP(sport=20000, dport=10000))
6622 self.pg1.add_stream(p)
6623 self.pg_enable_capture(self.pg_interfaces)
# Expect one packet on pg0 with the IPv6 header removed and the inner IPv4
# addresses and ports unchanged.
6625 capture = self.pg0.get_capture(1)
6626 capture = capture[0]
6627 self.assertFalse(capture.haslayer(IPv6))
6628 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6629 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6630 self.assertEqual(capture[UDP].sport, 20000)
6631 self.assertEqual(capture[UDP].dport, 10000)
6632 self.assert_packet_checksums_valid(capture)
6634 # ping DS-Lite B4 tunnel endpoint address
# Plain ICMPv6 echo request from pg1's remote host 0 to the B4 address.
6635 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6636 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6637 ICMPv6EchoRequest())
6638 self.pg1.add_stream(p)
6639 self.pg_enable_capture(self.pg_interfaces)
# The answer is captured on pg1 as well: one ICMPv6 echo reply sourced from
# the B4 address and addressed to remote host 0.
6641 capture = self.pg1.get_capture(1)
6642 self.assertEqual(1, len(capture))
6643 capture = capture[0]
6644 self.assertEqual(capture[IPv6].src, b4_ip6)
6645 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6646 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown (its `def` line is not visible in this extract): after
# the parent teardown, query both tunnel endpoint addresses via the CLI if
# VPP is still alive.
# NOTE(review): the openings of the statements wrapping the two CLI calls
# (numbered 6651 and 6653) are not visible here.
6649 super(TestDSliteCE, self).tearDown()
6650 if not self.vpp_dead:
6652 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6654 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# NAT66 test case using two pg interfaces. test_static checks 1:1 address
# translation in both directions; test_check_no_translate checks that the
# source address is left alone when pg1 is not registered with is_inside=0.
# NOTE(review): the embedded numbering skips values throughout this class
# (e.g. 6684 -> 6689, 6690 -> 6693, 6709 -> 6711), so decorators, packet
# payload layers, list handling and try/except lines are not visible in this
# extract; comments describe only the visible lines.
6657 class TestNAT66(MethodHolder):
6658 """ NAT66 Test Cases """
# Class-level fixture: NAT address and two pg interfaces.
6661 def setUpClass(cls):
6662 super(TestNAT66, cls).setUpClass()
# NAT address in text form and in packed binary form for the API.
6665 cls.nat_addr = 'fd01:ff::2'
6666 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6668 cls.create_pg_interfaces(range(2))
6669 cls.interfaces = list(cls.pg_interfaces)
# Per-interface setup; only the IPv6 neighbor configuration is visible here.
6671 for i in cls.interfaces:
6674 i.configure_ipv6_neighbors()
# NOTE(review): presumably the failure-path cleanup of setUpClass (the
# surrounding try/except is not visible) - confirm.
6677 super(TestNAT66, cls).tearDownClass()
6680 def test_static(self):
6681 """ 1:1 NAT66 test """
# pg0 is added with the wrapper's default is_inside value, pg1 explicitly
# with is_inside=0.
6682 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6683 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static mapping for pg0's remote IPv6 address (the remaining arguments are
# not visible in this extract).
6684 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# Four packets from pg0's remote host to pg1's remote host. The upper layer
# of the first two is not visible; the third is an ICMPv6 echo request and
# the fourth carries GRE / IP / TCP.
6689 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6690 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6693 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6694 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6697 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6698 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6699 ICMPv6EchoRequest())
6701 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6702 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6703 GRE() / IP() / TCP())
6705 self.pg0.add_stream(pkts)
6706 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg1 must have its source rewritten to the NAT
# address, its destination untouched and valid checksums. A failing packet is
# dumped to the log at error level.
6708 capture = self.pg1.get_capture(len(pkts))
6709 for packet in capture:
6711 self.assertEqual(packet[IPv6].src, self.nat_addr)
6712 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6713 self.assert_packet_checksums_valid(packet)
6715 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: four packets from pg1's remote host to the NAT address.
# Only the GRE / IP / TCP payload of the fourth is visible.
6720 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6721 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6724 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6725 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6728 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6729 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6732 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6733 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6734 GRE() / IP() / TCP())
6736 self.pg1.add_stream(pkts)
6737 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg0 must have its destination rewritten from the
# NAT address back to pg0's remote IPv6 address, with the source untouched.
6739 capture = self.pg0.get_capture(len(pkts))
6740 for packet in capture:
6742 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6743 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6744 self.assert_packet_checksums_valid(packet)
6746 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exactly one static mapping must exist and its packet counter must read 8 -
# presumably the 4 + 4 packets sent above; confirm.
6749 sm = self.vapi.nat66_static_mapping_dump()
6750 self.assertEqual(len(sm), 1)
6751 self.assertEqual(sm[0].total_pkts, 8)
6753 def test_check_no_translate(self):
6754 """ NAT66 translate only when egress interface is outside interface """
# Unlike test_static, pg1 is added without is_inside=0 here, so both
# interfaces are registered with the wrapper's default is_inside value.
6755 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6756 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
# Static mapping for pg0's remote IPv6 address (the remaining arguments are
# not visible in this extract).
6757 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# One packet from pg0's remote host to pg1's remote host (upper layer not
# visible in this extract).
6761 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6762 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6764 self.pg0.add_stream([p])
6765 self.pg_enable_capture(self.pg_interfaces)
# The packet captured on pg1 must keep its original source and destination,
# i.e. no translation happened.
6767 capture = self.pg1.get_capture(1)
6770 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
6771 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6773 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Remove NAT66 state: one nat66_add_del_interface call per dumped interface
# and one nat66_add_del_static_mapping call per dumped mapping (trailing
# arguments of both calls are not visible in this extract).
6776 def clear_nat66(self):
6778 Clear NAT66 configuration.
6780 interfaces = self.vapi.nat66_interface_dump()
6781 for intf in interfaces:
6782 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6786 static_mappings = self.vapi.nat66_static_mapping_dump()
6787 for sm in static_mappings:
6788 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6789 sm.external_ip_address,
# Per-test teardown (its `def` line is not visible in this extract): after
# the parent teardown, log NAT66 CLI state if VPP is still alive.
6794 super(TestNAT66, self).tearDown()
6795 if not self.vpp_dead:
6796 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6797 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: when the module is executed directly, hand control to
# unittest using the project's VppTestRunner as the test runner.
6801 if __name__ == '__main__':
6802 unittest.main(testRunner=VppTestRunner)