9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
13 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
14 from scapy.layers.l2 import Ether, ARP, GRE
15 from scapy.data import IP_PROTOS
16 from scapy.packet import bind_layers, Raw
17 from scapy.all import fragment6
19 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
20 from time import sleep
21 from util import ip4_range
22 from util import mactobinary
25 class MethodHolder(VppTestCase):
26 """ NAT create capture and verify method holder """
28 def clear_nat44(self):
30 Clear NAT44 configuration.
32 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
33 # I found no elegant way to do this
34 self.vapi.ip_add_del_route(
35 dst_address=self.pg7.remote_ip4n,
36 dst_address_length=32,
37 next_hop_address=self.pg7.remote_ip4n,
38 next_hop_sw_if_index=self.pg7.sw_if_index,
40 self.vapi.ip_add_del_route(
41 dst_address=self.pg8.remote_ip4n,
42 dst_address_length=32,
43 next_hop_address=self.pg8.remote_ip4n,
44 next_hop_sw_if_index=self.pg8.sw_if_index,
47 for intf in [self.pg7, self.pg8]:
48 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
50 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
55 if self.pg7.has_ip4_config:
56 self.pg7.unconfig_ip4()
58 self.vapi.nat44_forwarding_enable_disable(0)
60 interfaces = self.vapi.nat44_interface_addr_dump()
61 for intf in interfaces:
62 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
63 twice_nat=intf.twice_nat,
66 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
67 domain_id=self.ipfix_domain_id)
68 self.ipfix_src_port = 4739
69 self.ipfix_domain_id = 1
71 interfaces = self.vapi.nat44_interface_dump()
72 for intf in interfaces:
73 if intf.is_inside > 1:
74 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
77 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
81 interfaces = self.vapi.nat44_interface_output_feature_dump()
82 for intf in interfaces:
83 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
87 static_mappings = self.vapi.nat44_static_mapping_dump()
88 for sm in static_mappings:
89 self.vapi.nat44_add_del_static_mapping(
91 sm.external_ip_address,
92 local_port=sm.local_port,
93 external_port=sm.external_port,
94 addr_only=sm.addr_only,
97 twice_nat=sm.twice_nat,
98 self_twice_nat=sm.self_twice_nat,
99 out2in_only=sm.out2in_only,
101 external_sw_if_index=sm.external_sw_if_index,
104 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
105 for lb_sm in lb_static_mappings:
106 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
140 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
141 local_port=0, external_port=0, vrf_id=0,
142 is_add=1, external_sw_if_index=0xFFFFFFFF,
143 proto=0, twice_nat=0, self_twice_nat=0,
144 out2in_only=0, tag=""):
146 Add/delete NAT44 static mapping
148 :param local_ip: Local IP address
149 :param external_ip: External IP address
150 :param local_port: Local port number (Optional)
151 :param external_port: External port number (Optional)
152 :param vrf_id: VRF ID (Default 0)
153 :param is_add: 1 if add, 0 if delete (Default add)
154 :param external_sw_if_index: External interface instead of IP address
155 :param proto: IP protocol (Mandatory if port specified)
156 :param twice_nat: 1 if translate external host address and port
157 :param self_twice_nat: 1 if translate external host address and port
158 whenever external host address equals
159 local address of internal host
160 :param out2in_only: if 1 rule is matching only out2in direction
161 :param tag: Opaque string tag
164 if local_port and external_port:
166 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
167 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
168 self.vapi.nat44_add_del_static_mapping(
171 external_sw_if_index,
183 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
185 Add/delete NAT44 address
187 :param ip: IP address
188 :param is_add: 1 if add, 0 if delete (Default add)
189 :param twice_nat: twice NAT address for extenal hosts
191 nat_addr = socket.inet_pton(socket.AF_INET, ip)
192 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
196 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
198 Create packet stream for inside network
200 :param in_if: Inside interface
201 :param out_if: Outside interface
202 :param dst_ip: Destination address
203 :param ttl: TTL of generated packets
206 dst_ip = out_if.remote_ip4
210 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
211 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
212 TCP(sport=self.tcp_port_in, dport=20))
216 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
217 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
218 UDP(sport=self.udp_port_in, dport=20))
222 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
223 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
224 ICMP(id=self.icmp_id_in, type='echo-request'))
229 def compose_ip6(self, ip4, pref, plen):
231 Compose IPv4-embedded IPv6 addresses
233 :param ip4: IPv4 address
234 :param pref: IPv6 prefix
235 :param plen: IPv6 prefix length
236 :returns: IPv4-embedded IPv6 addresses
238 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
239 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
254 pref_n[10] = ip4_n[3]
258 pref_n[10] = ip4_n[2]
259 pref_n[11] = ip4_n[3]
262 pref_n[10] = ip4_n[1]
263 pref_n[11] = ip4_n[2]
264 pref_n[12] = ip4_n[3]
266 pref_n[12] = ip4_n[0]
267 pref_n[13] = ip4_n[1]
268 pref_n[14] = ip4_n[2]
269 pref_n[15] = ip4_n[3]
270 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
272 def extract_ip4(self, ip6, plen):
274 Extract IPv4 address embedded in IPv6 addresses
276 :param ip6: IPv6 address
277 :param plen: IPv6 prefix length
278 :returns: extracted IPv4 address
280 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
312 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
314 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
316 Create IPv6 packet stream for inside network
318 :param in_if: Inside interface
319 :param out_if: Outside interface
320 :param ttl: Hop Limit of generated packets
321 :param pref: NAT64 prefix
322 :param plen: NAT64 prefix length
326 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
328 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
331 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
332 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
333 TCP(sport=self.tcp_port_in, dport=20))
337 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
338 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
339 UDP(sport=self.udp_port_in, dport=20))
343 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
344 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
345 ICMPv6EchoRequest(id=self.icmp_id_in))
350 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
351 use_inside_ports=False):
353 Create packet stream for outside network
355 :param out_if: Outside interface
356 :param dst_ip: Destination IP address (Default use global NAT address)
357 :param ttl: TTL of generated packets
358 :param use_inside_ports: Use inside NAT ports as destination ports
359 instead of outside ports
362 dst_ip = self.nat_addr
363 if not use_inside_ports:
364 tcp_port = self.tcp_port_out
365 udp_port = self.udp_port_out
366 icmp_id = self.icmp_id_out
368 tcp_port = self.tcp_port_in
369 udp_port = self.udp_port_in
370 icmp_id = self.icmp_id_in
373 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
374 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
375 TCP(dport=tcp_port, sport=20))
379 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
380 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
381 UDP(dport=udp_port, sport=20))
385 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
386 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
387 ICMP(id=icmp_id, type='echo-reply'))
392 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
394 Create packet stream for outside network
396 :param out_if: Outside interface
397 :param dst_ip: Destination IP address (Default use global NAT address)
398 :param hl: HL of generated packets
402 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
403 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
404 TCP(dport=self.tcp_port_out, sport=20))
408 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
409 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
410 UDP(dport=self.udp_port_out, sport=20))
414 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
415 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
416 ICMPv6EchoReply(id=self.icmp_id_out))
421 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
422 packet_num=3, dst_ip=None, is_ip6=False):
424 Verify captured packets on outside network
426 :param capture: Captured packets
427 :param nat_ip: Translated IP address (Default use global NAT address)
428 :param same_port: Sorce port number is not translated (Default False)
429 :param packet_num: Expected number of packets (Default 3)
430 :param dst_ip: Destination IP address (Default do not verify)
431 :param is_ip6: If L3 protocol is IPv6 (Default False)
435 ICMP46 = ICMPv6EchoRequest
440 nat_ip = self.nat_addr
441 self.assertEqual(packet_num, len(capture))
442 for packet in capture:
445 self.assert_packet_checksums_valid(packet)
446 self.assertEqual(packet[IP46].src, nat_ip)
447 if dst_ip is not None:
448 self.assertEqual(packet[IP46].dst, dst_ip)
449 if packet.haslayer(TCP):
451 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
454 packet[TCP].sport, self.tcp_port_in)
455 self.tcp_port_out = packet[TCP].sport
456 self.assert_packet_checksums_valid(packet)
457 elif packet.haslayer(UDP):
459 self.assertEqual(packet[UDP].sport, self.udp_port_in)
462 packet[UDP].sport, self.udp_port_in)
463 self.udp_port_out = packet[UDP].sport
466 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
468 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
469 self.icmp_id_out = packet[ICMP46].id
470 self.assert_packet_checksums_valid(packet)
472 self.logger.error(ppp("Unexpected or invalid packet "
473 "(outside network):", packet))
476 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
477 packet_num=3, dst_ip=None):
479 Verify captured packets on outside network
481 :param capture: Captured packets
482 :param nat_ip: Translated IP address
483 :param same_port: Sorce port number is not translated (Default False)
484 :param packet_num: Expected number of packets (Default 3)
485 :param dst_ip: Destination IP address (Default do not verify)
487 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
490 def verify_capture_in(self, capture, in_if, packet_num=3):
492 Verify captured packets on inside network
494 :param capture: Captured packets
495 :param in_if: Inside interface
496 :param packet_num: Expected number of packets (Default 3)
498 self.assertEqual(packet_num, len(capture))
499 for packet in capture:
501 self.assert_packet_checksums_valid(packet)
502 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
503 if packet.haslayer(TCP):
504 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
505 elif packet.haslayer(UDP):
506 self.assertEqual(packet[UDP].dport, self.udp_port_in)
508 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
510 self.logger.error(ppp("Unexpected or invalid packet "
511 "(inside network):", packet))
514 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
516 Verify captured IPv6 packets on inside network
518 :param capture: Captured packets
519 :param src_ip: Source IP
520 :param dst_ip: Destination IP address
521 :param packet_num: Expected number of packets (Default 3)
523 self.assertEqual(packet_num, len(capture))
524 for packet in capture:
526 self.assertEqual(packet[IPv6].src, src_ip)
527 self.assertEqual(packet[IPv6].dst, dst_ip)
528 self.assert_packet_checksums_valid(packet)
529 if packet.haslayer(TCP):
530 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
531 elif packet.haslayer(UDP):
532 self.assertEqual(packet[UDP].dport, self.udp_port_in)
534 self.assertEqual(packet[ICMPv6EchoReply].id,
537 self.logger.error(ppp("Unexpected or invalid packet "
538 "(inside network):", packet))
541 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
543 Verify captured packet that don't have to be translated
545 :param capture: Captured packets
546 :param ingress_if: Ingress interface
547 :param egress_if: Egress interface
549 for packet in capture:
551 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
552 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
553 if packet.haslayer(TCP):
554 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
555 elif packet.haslayer(UDP):
556 self.assertEqual(packet[UDP].sport, self.udp_port_in)
558 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
560 self.logger.error(ppp("Unexpected or invalid packet "
561 "(inside network):", packet))
564 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
565 packet_num=3, icmp_type=11):
567 Verify captured packets with ICMP errors on outside network
569 :param capture: Captured packets
570 :param src_ip: Translated IP address or IP address of VPP
571 (Default use global NAT address)
572 :param packet_num: Expected number of packets (Default 3)
573 :param icmp_type: Type of error ICMP packet
574 we are expecting (Default 11)
577 src_ip = self.nat_addr
578 self.assertEqual(packet_num, len(capture))
579 for packet in capture:
581 self.assertEqual(packet[IP].src, src_ip)
582 self.assertTrue(packet.haslayer(ICMP))
584 self.assertEqual(icmp.type, icmp_type)
585 self.assertTrue(icmp.haslayer(IPerror))
586 inner_ip = icmp[IPerror]
587 if inner_ip.haslayer(TCPerror):
588 self.assertEqual(inner_ip[TCPerror].dport,
590 elif inner_ip.haslayer(UDPerror):
591 self.assertEqual(inner_ip[UDPerror].dport,
594 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
596 self.logger.error(ppp("Unexpected or invalid packet "
597 "(outside network):", packet))
600 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
603 Verify captured packets with ICMP errors on inside network
605 :param capture: Captured packets
606 :param in_if: Inside interface
607 :param packet_num: Expected number of packets (Default 3)
608 :param icmp_type: Type of error ICMP packet
609 we are expecting (Default 11)
611 self.assertEqual(packet_num, len(capture))
612 for packet in capture:
614 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
615 self.assertTrue(packet.haslayer(ICMP))
617 self.assertEqual(icmp.type, icmp_type)
618 self.assertTrue(icmp.haslayer(IPerror))
619 inner_ip = icmp[IPerror]
620 if inner_ip.haslayer(TCPerror):
621 self.assertEqual(inner_ip[TCPerror].sport,
623 elif inner_ip.haslayer(UDPerror):
624 self.assertEqual(inner_ip[UDPerror].sport,
627 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
629 self.logger.error(ppp("Unexpected or invalid packet "
630 "(inside network):", packet))
633 def create_stream_frag(self, src_if, dst, sport, dport, data):
635 Create fragmented packet stream
637 :param src_if: Source interface
638 :param dst: Destination IPv4 address
639 :param sport: Source TCP port
640 :param dport: Destination TCP port
641 :param data: Payload data
644 id = random.randint(0, 65535)
645 p = (IP(src=src_if.remote_ip4, dst=dst) /
646 TCP(sport=sport, dport=dport) /
648 p = p.__class__(str(p))
649 chksum = p['TCP'].chksum
651 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
652 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
653 TCP(sport=sport, dport=dport, chksum=chksum) /
656 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
657 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
658 proto=IP_PROTOS.tcp) /
661 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
662 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
668 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
669 pref=None, plen=0, frag_size=128):
671 Create fragmented packet stream
673 :param src_if: Source interface
674 :param dst: Destination IPv4 address
675 :param sport: Source TCP port
676 :param dport: Destination TCP port
677 :param data: Payload data
678 :param pref: NAT64 prefix
679 :param plen: NAT64 prefix length
680 :param fragsize: size of fragments
684 dst_ip6 = ''.join(['64:ff9b::', dst])
686 dst_ip6 = self.compose_ip6(dst, pref, plen)
688 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
689 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
690 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
691 TCP(sport=sport, dport=dport) /
694 return fragment6(p, frag_size)
696 def reass_frags_and_verify(self, frags, src, dst):
698 Reassemble and verify fragmented packet
700 :param frags: Captured fragments
701 :param src: Source IPv4 address to verify
702 :param dst: Destination IPv4 address to verify
704 :returns: Reassembled IPv4 packet
706 buffer = StringIO.StringIO()
708 self.assertEqual(p[IP].src, src)
709 self.assertEqual(p[IP].dst, dst)
710 self.assert_ip_checksum_valid(p)
711 buffer.seek(p[IP].frag * 8)
712 buffer.write(p[IP].payload)
713 ip = frags[0].getlayer(IP)
714 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
715 proto=frags[0][IP].proto)
716 if ip.proto == IP_PROTOS.tcp:
717 p = (ip / TCP(buffer.getvalue()))
718 self.assert_tcp_checksum_valid(p)
719 elif ip.proto == IP_PROTOS.udp:
720 p = (ip / UDP(buffer.getvalue()))
723 def reass_frags_and_verify_ip6(self, frags, src, dst):
725 Reassemble and verify fragmented packet
727 :param frags: Captured fragments
728 :param src: Source IPv6 address to verify
729 :param dst: Destination IPv6 address to verify
731 :returns: Reassembled IPv6 packet
733 buffer = StringIO.StringIO()
735 self.assertEqual(p[IPv6].src, src)
736 self.assertEqual(p[IPv6].dst, dst)
737 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
738 buffer.write(p[IPv6ExtHdrFragment].payload)
739 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
740 nh=frags[0][IPv6ExtHdrFragment].nh)
741 if ip.nh == IP_PROTOS.tcp:
742 p = (ip / TCP(buffer.getvalue()))
743 elif ip.nh == IP_PROTOS.udp:
744 p = (ip / UDP(buffer.getvalue()))
745 self.assert_packet_checksums_valid(p)
748 def initiate_tcp_session(self, in_if, out_if):
750 Initiates TCP session
752 :param in_if: Inside interface
753 :param out_if: Outside interface
757 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
758 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
759 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
762 self.pg_enable_capture(self.pg_interfaces)
764 capture = out_if.get_capture(1)
766 self.tcp_port_out = p[TCP].sport
768 # SYN + ACK packet out->in
769 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
770 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
771 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
774 self.pg_enable_capture(self.pg_interfaces)
779 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
780 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
781 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
784 self.pg_enable_capture(self.pg_interfaces)
786 out_if.get_capture(1)
789 self.logger.error("TCP 3 way handshake failed")
792 def verify_ipfix_nat44_ses(self, data):
794 Verify IPFIX NAT44 session create/delete event
796 :param data: Decoded IPFIX data records
798 nat44_ses_create_num = 0
799 nat44_ses_delete_num = 0
800 self.assertEqual(6, len(data))
803 self.assertIn(ord(record[230]), [4, 5])
804 if ord(record[230]) == 4:
805 nat44_ses_create_num += 1
807 nat44_ses_delete_num += 1
809 self.assertEqual(self.pg0.remote_ip4n, record[8])
810 # postNATSourceIPv4Address
811 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
814 self.assertEqual(struct.pack("!I", 0), record[234])
815 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
816 if IP_PROTOS.icmp == ord(record[4]):
817 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
818 self.assertEqual(struct.pack("!H", self.icmp_id_out),
820 elif IP_PROTOS.tcp == ord(record[4]):
821 self.assertEqual(struct.pack("!H", self.tcp_port_in),
823 self.assertEqual(struct.pack("!H", self.tcp_port_out),
825 elif IP_PROTOS.udp == ord(record[4]):
826 self.assertEqual(struct.pack("!H", self.udp_port_in),
828 self.assertEqual(struct.pack("!H", self.udp_port_out),
831 self.fail("Invalid protocol")
832 self.assertEqual(3, nat44_ses_create_num)
833 self.assertEqual(3, nat44_ses_delete_num)
835 def verify_ipfix_addr_exhausted(self, data):
837 Verify IPFIX NAT addresses event
839 :param data: Decoded IPFIX data records
841 self.assertEqual(1, len(data))
844 self.assertEqual(ord(record[230]), 3)
846 self.assertEqual(struct.pack("!I", 0), record[283])
848 def verify_ipfix_max_sessions(self, data, limit):
850 Verify IPFIX maximum session entries exceeded event
852 :param data: Decoded IPFIX data records
853 :param limit: Number of maximum session entries that can be created.
855 self.assertEqual(1, len(data))
858 self.assertEqual(ord(record[230]), 13)
859 # natQuotaExceededEvent
860 self.assertEqual(struct.pack("I", 1), record[466])
862 self.assertEqual(struct.pack("I", limit), record[471])
864 def verify_ipfix_max_bibs(self, data, limit):
866 Verify IPFIX maximum BIB entries exceeded event
868 :param data: Decoded IPFIX data records
869 :param limit: Number of maximum BIB entries that can be created.
871 self.assertEqual(1, len(data))
874 self.assertEqual(ord(record[230]), 13)
875 # natQuotaExceededEvent
876 self.assertEqual(struct.pack("I", 2), record[466])
878 self.assertEqual(struct.pack("I", limit), record[472])
880 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
882 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
884 :param data: Decoded IPFIX data records
885 :param limit: Number of maximum fragments pending reassembly
886 :param src_addr: IPv6 source address
888 self.assertEqual(1, len(data))
891 self.assertEqual(ord(record[230]), 13)
892 # natQuotaExceededEvent
893 self.assertEqual(struct.pack("I", 5), record[466])
894 # maxFragmentsPendingReassembly
895 self.assertEqual(struct.pack("I", limit), record[475])
897 self.assertEqual(src_addr, record[27])
899 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
901 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
903 :param data: Decoded IPFIX data records
904 :param limit: Number of maximum fragments pending reassembly
905 :param src_addr: IPv4 source address
907 self.assertEqual(1, len(data))
910 self.assertEqual(ord(record[230]), 13)
911 # natQuotaExceededEvent
912 self.assertEqual(struct.pack("I", 5), record[466])
913 # maxFragmentsPendingReassembly
914 self.assertEqual(struct.pack("I", limit), record[475])
916 self.assertEqual(src_addr, record[8])
918 def verify_ipfix_bib(self, data, is_create, src_addr):
920 Verify IPFIX NAT64 BIB create and delete events
922 :param data: Decoded IPFIX data records
923 :param is_create: Create event if nonzero value otherwise delete event
924 :param src_addr: IPv6 source address
926 self.assertEqual(1, len(data))
930 self.assertEqual(ord(record[230]), 10)
932 self.assertEqual(ord(record[230]), 11)
934 self.assertEqual(src_addr, record[27])
935 # postNATSourceIPv4Address
936 self.assertEqual(self.nat_addr_n, record[225])
938 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
940 self.assertEqual(struct.pack("!I", 0), record[234])
941 # sourceTransportPort
942 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
943 # postNAPTSourceTransportPort
944 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
946 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
949 Verify IPFIX NAT64 session create and delete events
951 :param data: Decoded IPFIX data records
952 :param is_create: Create event if nonzero value otherwise delete event
953 :param src_addr: IPv6 source address
954 :param dst_addr: IPv4 destination address
955 :param dst_port: destination TCP port
957 self.assertEqual(1, len(data))
961 self.assertEqual(ord(record[230]), 6)
963 self.assertEqual(ord(record[230]), 7)
965 self.assertEqual(src_addr, record[27])
966 # destinationIPv6Address
967 self.assertEqual(socket.inet_pton(socket.AF_INET6,
968 self.compose_ip6(dst_addr,
972 # postNATSourceIPv4Address
973 self.assertEqual(self.nat_addr_n, record[225])
974 # postNATDestinationIPv4Address
975 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
978 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
980 self.assertEqual(struct.pack("!I", 0), record[234])
981 # sourceTransportPort
982 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
983 # postNAPTSourceTransportPort
984 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
985 # destinationTransportPort
986 self.assertEqual(struct.pack("!H", dst_port), record[11])
987 # postNAPTDestinationTransportPort
988 self.assertEqual(struct.pack("!H", dst_port), record[228])
991 class TestNAT44(MethodHolder):
992 """ NAT44 Test Cases """
996 super(TestNAT44, cls).setUpClass()
997 cls.vapi.cli("set log class nat level debug")
1000 cls.tcp_port_in = 6303
1001 cls.tcp_port_out = 6303
1002 cls.udp_port_in = 6304
1003 cls.udp_port_out = 6304
1004 cls.icmp_id_in = 6305
1005 cls.icmp_id_out = 6305
1006 cls.nat_addr = '10.0.0.3'
1007 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1008 cls.ipfix_src_port = 4739
1009 cls.ipfix_domain_id = 1
1010 cls.tcp_external_port = 80
1012 cls.create_pg_interfaces(range(10))
1013 cls.interfaces = list(cls.pg_interfaces[0:4])
1015 for i in cls.interfaces:
1020 cls.pg0.generate_remote_hosts(3)
1021 cls.pg0.configure_ipv4_neighbors()
1023 cls.pg1.generate_remote_hosts(1)
1024 cls.pg1.configure_ipv4_neighbors()
1026 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1027 cls.vapi.ip_table_add_del(10, is_add=1)
1028 cls.vapi.ip_table_add_del(20, is_add=1)
1030 cls.pg4._local_ip4 = "172.16.255.1"
1031 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1032 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1033 cls.pg4.set_table_ip4(10)
1034 cls.pg5._local_ip4 = "172.17.255.3"
1035 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1036 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1037 cls.pg5.set_table_ip4(10)
1038 cls.pg6._local_ip4 = "172.16.255.1"
1039 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1040 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1041 cls.pg6.set_table_ip4(20)
1042 for i in cls.overlapping_interfaces:
1050 cls.pg9.generate_remote_hosts(2)
1051 cls.pg9.config_ip4()
1052 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1053 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1057 cls.pg9.resolve_arp()
1058 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1059 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1060 cls.pg9.resolve_arp()
1063 super(TestNAT44, cls).tearDownClass()
1066 def test_dynamic(self):
1067 """ NAT44 dynamic translation test """
1069 self.nat44_add_address(self.nat_addr)
1070 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1071 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1075 pkts = self.create_stream_in(self.pg0, self.pg1)
1076 self.pg0.add_stream(pkts)
1077 self.pg_enable_capture(self.pg_interfaces)
1079 capture = self.pg1.get_capture(len(pkts))
1080 self.verify_capture_out(capture)
1083 pkts = self.create_stream_out(self.pg1)
1084 self.pg1.add_stream(pkts)
1085 self.pg_enable_capture(self.pg_interfaces)
1087 capture = self.pg0.get_capture(len(pkts))
1088 self.verify_capture_in(capture, self.pg0)
1090 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1091 """ NAT44 handling of client packets with TTL=1 """
1093 self.nat44_add_address(self.nat_addr)
1094 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1095 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1098 # Client side - generate traffic
1099 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1100 self.pg0.add_stream(pkts)
1101 self.pg_enable_capture(self.pg_interfaces)
1104 # Client side - verify ICMP type 11 packets
1105 capture = self.pg0.get_capture(len(pkts))
1106 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1108 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1109 """ NAT44 handling of server packets with TTL=1 """
1111 self.nat44_add_address(self.nat_addr)
1112 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1113 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1116 # Client side - create sessions
1117 pkts = self.create_stream_in(self.pg0, self.pg1)
1118 self.pg0.add_stream(pkts)
1119 self.pg_enable_capture(self.pg_interfaces)
1122 # Server side - generate traffic
1123 capture = self.pg1.get_capture(len(pkts))
1124 self.verify_capture_out(capture)
1125 pkts = self.create_stream_out(self.pg1, ttl=1)
1126 self.pg1.add_stream(pkts)
1127 self.pg_enable_capture(self.pg_interfaces)
1130 # Server side - verify ICMP type 11 packets
1131 capture = self.pg1.get_capture(len(pkts))
1132 self.verify_capture_out_with_icmp_errors(capture,
1133 src_ip=self.pg1.local_ip4)
1135 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1136 """ NAT44 handling of error responses to client packets with TTL=2 """
1138 self.nat44_add_address(self.nat_addr)
1139 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1140 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1143 # Client side - generate traffic
1144 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1145 self.pg0.add_stream(pkts)
1146 self.pg_enable_capture(self.pg_interfaces)
1149 # Server side - simulate ICMP type 11 response
1150 capture = self.pg1.get_capture(len(pkts))
1151 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1152 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1153 ICMP(type=11) / packet[IP] for packet in capture]
1154 self.pg1.add_stream(pkts)
1155 self.pg_enable_capture(self.pg_interfaces)
1158 # Client side - verify ICMP type 11 packets
1159 capture = self.pg0.get_capture(len(pkts))
1160 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1162 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1163 """ NAT44 handling of error responses to server packets with TTL=2 """
1165 self.nat44_add_address(self.nat_addr)
1166 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1167 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1170 # Client side - create sessions
1171 pkts = self.create_stream_in(self.pg0, self.pg1)
1172 self.pg0.add_stream(pkts)
1173 self.pg_enable_capture(self.pg_interfaces)
1176 # Server side - generate traffic
1177 capture = self.pg1.get_capture(len(pkts))
1178 self.verify_capture_out(capture)
1179 pkts = self.create_stream_out(self.pg1, ttl=2)
1180 self.pg1.add_stream(pkts)
1181 self.pg_enable_capture(self.pg_interfaces)
1184 # Client side - simulate ICMP type 11 response
1185 capture = self.pg0.get_capture(len(pkts))
1186 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1187 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1188 ICMP(type=11) / packet[IP] for packet in capture]
1189 self.pg0.add_stream(pkts)
1190 self.pg_enable_capture(self.pg_interfaces)
1193 # Server side - verify ICMP type 11 packets
1194 capture = self.pg1.get_capture(len(pkts))
1195 self.verify_capture_out_with_icmp_errors(capture)
1197 def test_ping_out_interface_from_outside(self):
1198 """ Ping NAT44 out interface from outside network """
1200 self.nat44_add_address(self.nat_addr)
1201 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1202 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1205 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1206 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1207 ICMP(id=self.icmp_id_out, type='echo-request'))
1209 self.pg1.add_stream(pkts)
1210 self.pg_enable_capture(self.pg_interfaces)
1212 capture = self.pg1.get_capture(len(pkts))
1213 self.assertEqual(1, len(capture))
1216 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1217 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1218 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1219 self.assertEqual(packet[ICMP].type, 0) # echo reply
1221 self.logger.error(ppp("Unexpected or invalid packet "
1222 "(outside network):", packet))
1225 def test_ping_internal_host_from_outside(self):
1226 """ Ping internal host from outside network """
1228 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1229 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1230 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1234 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1235 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1236 ICMP(id=self.icmp_id_out, type='echo-request'))
1237 self.pg1.add_stream(pkt)
1238 self.pg_enable_capture(self.pg_interfaces)
1240 capture = self.pg0.get_capture(1)
1241 self.verify_capture_in(capture, self.pg0, packet_num=1)
1242 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1245 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1246 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1247 ICMP(id=self.icmp_id_in, type='echo-reply'))
1248 self.pg0.add_stream(pkt)
1249 self.pg_enable_capture(self.pg_interfaces)
1251 capture = self.pg1.get_capture(1)
1252 self.verify_capture_out(capture, same_port=True, packet_num=1)
1253 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1255 def _test_forwarding(self):
1256 """ NAT44 forwarding test """
1258 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1259 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1261 self.vapi.nat44_forwarding_enable_disable(1)
1263 real_ip = self.pg0.remote_ip4n
1264 alias_ip = self.nat_addr_n
1265 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1266 external_ip=alias_ip)
1269 # in2out - static mapping match
1271 pkts = self.create_stream_out(self.pg1)
1272 self.pg1.add_stream(pkts)
1273 self.pg_enable_capture(self.pg_interfaces)
1275 capture = self.pg0.get_capture(len(pkts))
1276 self.verify_capture_in(capture, self.pg0)
1278 pkts = self.create_stream_in(self.pg0, self.pg1)
1279 self.pg0.add_stream(pkts)
1280 self.pg_enable_capture(self.pg_interfaces)
1282 capture = self.pg1.get_capture(len(pkts))
1283 self.verify_capture_out(capture, same_port=True)
1285 # in2out - no static mapping match
1287 host0 = self.pg0.remote_hosts[0]
1288 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1290 pkts = self.create_stream_out(self.pg1,
1291 dst_ip=self.pg0.remote_ip4,
1292 use_inside_ports=True)
1293 self.pg1.add_stream(pkts)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 capture = self.pg0.get_capture(len(pkts))
1297 self.verify_capture_in(capture, self.pg0)
1299 pkts = self.create_stream_in(self.pg0, self.pg1)
1300 self.pg0.add_stream(pkts)
1301 self.pg_enable_capture(self.pg_interfaces)
1303 capture = self.pg1.get_capture(len(pkts))
1304 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1307 self.pg0.remote_hosts[0] = host0
1309 user = self.pg0.remote_hosts[1]
1310 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
1311 self.assertEqual(len(sessions), 3)
1312 self.assertTrue(sessions[0].ext_host_valid)
1313 self.vapi.nat44_del_session(
1314 sessions[0].inside_ip_address,
1315 sessions[0].inside_port,
1316 sessions[0].protocol,
1317 ext_host_address=sessions[0].ext_host_address,
1318 ext_host_port=sessions[0].ext_host_port)
1319 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
1320 self.assertEqual(len(sessions), 2)
1323 self.vapi.nat44_forwarding_enable_disable(0)
1324 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1325 external_ip=alias_ip,
1328 def test_static_in(self):
1329 """ 1:1 NAT initialized from inside network """
1331 nat_ip = "10.0.0.10"
1332 self.tcp_port_out = 6303
1333 self.udp_port_out = 6304
1334 self.icmp_id_out = 6305
1336 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1337 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1338 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1340 sm = self.vapi.nat44_static_mapping_dump()
1341 self.assertEqual(len(sm), 1)
1342 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1343 self.assertEqual(sm[0].protocol, 0)
1344 self.assertEqual(sm[0].local_port, 0)
1345 self.assertEqual(sm[0].external_port, 0)
1348 pkts = self.create_stream_in(self.pg0, self.pg1)
1349 self.pg0.add_stream(pkts)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg1.get_capture(len(pkts))
1353 self.verify_capture_out(capture, nat_ip, True)
1356 pkts = self.create_stream_out(self.pg1, nat_ip)
1357 self.pg1.add_stream(pkts)
1358 self.pg_enable_capture(self.pg_interfaces)
1360 capture = self.pg0.get_capture(len(pkts))
1361 self.verify_capture_in(capture, self.pg0)
1363 def test_static_out(self):
1364 """ 1:1 NAT initialized from outside network """
1366 nat_ip = "10.0.0.20"
1367 self.tcp_port_out = 6303
1368 self.udp_port_out = 6304
1369 self.icmp_id_out = 6305
1372 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1373 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1374 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1376 sm = self.vapi.nat44_static_mapping_dump()
1377 self.assertEqual(len(sm), 1)
1378 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1381 pkts = self.create_stream_out(self.pg1, nat_ip)
1382 self.pg1.add_stream(pkts)
1383 self.pg_enable_capture(self.pg_interfaces)
1385 capture = self.pg0.get_capture(len(pkts))
1386 self.verify_capture_in(capture, self.pg0)
1389 pkts = self.create_stream_in(self.pg0, self.pg1)
1390 self.pg0.add_stream(pkts)
1391 self.pg_enable_capture(self.pg_interfaces)
1393 capture = self.pg1.get_capture(len(pkts))
1394 self.verify_capture_out(capture, nat_ip, True)
1396 def test_static_with_port_in(self):
1397 """ 1:1 NAPT initialized from inside network """
1399 self.tcp_port_out = 3606
1400 self.udp_port_out = 3607
1401 self.icmp_id_out = 3608
1403 self.nat44_add_address(self.nat_addr)
1404 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1405 self.tcp_port_in, self.tcp_port_out,
1406 proto=IP_PROTOS.tcp)
1407 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1408 self.udp_port_in, self.udp_port_out,
1409 proto=IP_PROTOS.udp)
1410 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1411 self.icmp_id_in, self.icmp_id_out,
1412 proto=IP_PROTOS.icmp)
1413 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1414 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1418 pkts = self.create_stream_in(self.pg0, self.pg1)
1419 self.pg0.add_stream(pkts)
1420 self.pg_enable_capture(self.pg_interfaces)
1422 capture = self.pg1.get_capture(len(pkts))
1423 self.verify_capture_out(capture)
1426 pkts = self.create_stream_out(self.pg1)
1427 self.pg1.add_stream(pkts)
1428 self.pg_enable_capture(self.pg_interfaces)
1430 capture = self.pg0.get_capture(len(pkts))
1431 self.verify_capture_in(capture, self.pg0)
1433 def test_static_with_port_out(self):
1434 """ 1:1 NAPT initialized from outside network """
1436 self.tcp_port_out = 30606
1437 self.udp_port_out = 30607
1438 self.icmp_id_out = 30608
1440 self.nat44_add_address(self.nat_addr)
1441 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1442 self.tcp_port_in, self.tcp_port_out,
1443 proto=IP_PROTOS.tcp)
1444 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1445 self.udp_port_in, self.udp_port_out,
1446 proto=IP_PROTOS.udp)
1447 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448 self.icmp_id_in, self.icmp_id_out,
1449 proto=IP_PROTOS.icmp)
1450 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1451 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1455 pkts = self.create_stream_out(self.pg1)
1456 self.pg1.add_stream(pkts)
1457 self.pg_enable_capture(self.pg_interfaces)
1459 capture = self.pg0.get_capture(len(pkts))
1460 self.verify_capture_in(capture, self.pg0)
1463 pkts = self.create_stream_in(self.pg0, self.pg1)
1464 self.pg0.add_stream(pkts)
1465 self.pg_enable_capture(self.pg_interfaces)
1467 capture = self.pg1.get_capture(len(pkts))
1468 self.verify_capture_out(capture)
1470 def test_static_vrf_aware(self):
1471 """ 1:1 NAT VRF awareness """
1473 nat_ip1 = "10.0.0.30"
1474 nat_ip2 = "10.0.0.40"
1475 self.tcp_port_out = 6303
1476 self.udp_port_out = 6304
1477 self.icmp_id_out = 6305
1479 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1481 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1483 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1485 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1486 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1488 # inside interface VRF match NAT44 static mapping VRF
1489 pkts = self.create_stream_in(self.pg4, self.pg3)
1490 self.pg4.add_stream(pkts)
1491 self.pg_enable_capture(self.pg_interfaces)
1493 capture = self.pg3.get_capture(len(pkts))
1494 self.verify_capture_out(capture, nat_ip1, True)
1496 # inside interface VRF don't match NAT44 static mapping VRF (packets
1498 pkts = self.create_stream_in(self.pg0, self.pg3)
1499 self.pg0.add_stream(pkts)
1500 self.pg_enable_capture(self.pg_interfaces)
1502 self.pg3.assert_nothing_captured()
1504 def test_dynamic_to_static(self):
1505 """ Switch from dynamic translation to 1:1NAT """
1506 nat_ip = "10.0.0.10"
1507 self.tcp_port_out = 6303
1508 self.udp_port_out = 6304
1509 self.icmp_id_out = 6305
1511 self.nat44_add_address(self.nat_addr)
1512 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1513 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1517 pkts = self.create_stream_in(self.pg0, self.pg1)
1518 self.pg0.add_stream(pkts)
1519 self.pg_enable_capture(self.pg_interfaces)
1521 capture = self.pg1.get_capture(len(pkts))
1522 self.verify_capture_out(capture)
1525 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1526 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1527 self.assertEqual(len(sessions), 0)
1528 pkts = self.create_stream_in(self.pg0, self.pg1)
1529 self.pg0.add_stream(pkts)
1530 self.pg_enable_capture(self.pg_interfaces)
1532 capture = self.pg1.get_capture(len(pkts))
1533 self.verify_capture_out(capture, nat_ip, True)
1535 def test_identity_nat(self):
1536 """ Identity NAT """
1538 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1539 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1540 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1543 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1544 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1545 TCP(sport=12345, dport=56789))
1546 self.pg1.add_stream(p)
1547 self.pg_enable_capture(self.pg_interfaces)
1549 capture = self.pg0.get_capture(1)
1554 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1555 self.assertEqual(ip.src, self.pg1.remote_ip4)
1556 self.assertEqual(tcp.dport, 56789)
1557 self.assertEqual(tcp.sport, 12345)
1558 self.assert_packet_checksums_valid(p)
1560 self.logger.error(ppp("Unexpected or invalid packet:", p))
1563 def test_multiple_inside_interfaces(self):
1564 """ NAT44 multiple non-overlapping address space inside interfaces """
1566 self.nat44_add_address(self.nat_addr)
1567 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1568 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1569 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1572 # between two NAT44 inside interfaces (no translation)
1573 pkts = self.create_stream_in(self.pg0, self.pg1)
1574 self.pg0.add_stream(pkts)
1575 self.pg_enable_capture(self.pg_interfaces)
1577 capture = self.pg1.get_capture(len(pkts))
1578 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1580 # from NAT44 inside to interface without NAT44 feature (no translation)
1581 pkts = self.create_stream_in(self.pg0, self.pg2)
1582 self.pg0.add_stream(pkts)
1583 self.pg_enable_capture(self.pg_interfaces)
1585 capture = self.pg2.get_capture(len(pkts))
1586 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1588 # in2out 1st interface
1589 pkts = self.create_stream_in(self.pg0, self.pg3)
1590 self.pg0.add_stream(pkts)
1591 self.pg_enable_capture(self.pg_interfaces)
1593 capture = self.pg3.get_capture(len(pkts))
1594 self.verify_capture_out(capture)
1596 # out2in 1st interface
1597 pkts = self.create_stream_out(self.pg3)
1598 self.pg3.add_stream(pkts)
1599 self.pg_enable_capture(self.pg_interfaces)
1601 capture = self.pg0.get_capture(len(pkts))
1602 self.verify_capture_in(capture, self.pg0)
1604 # in2out 2nd interface
1605 pkts = self.create_stream_in(self.pg1, self.pg3)
1606 self.pg1.add_stream(pkts)
1607 self.pg_enable_capture(self.pg_interfaces)
1609 capture = self.pg3.get_capture(len(pkts))
1610 self.verify_capture_out(capture)
1612 # out2in 2nd interface
1613 pkts = self.create_stream_out(self.pg3)
1614 self.pg3.add_stream(pkts)
1615 self.pg_enable_capture(self.pg_interfaces)
1617 capture = self.pg1.get_capture(len(pkts))
1618 self.verify_capture_in(capture, self.pg1)
1620 def test_inside_overlapping_interfaces(self):
1621 """ NAT44 multiple inside interfaces with overlapping address space """
1623 static_nat_ip = "10.0.0.10"
1624 self.nat44_add_address(self.nat_addr)
1625 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1627 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1628 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1629 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1630 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1633 # between NAT44 inside interfaces with same VRF (no translation)
1634 pkts = self.create_stream_in(self.pg4, self.pg5)
1635 self.pg4.add_stream(pkts)
1636 self.pg_enable_capture(self.pg_interfaces)
1638 capture = self.pg5.get_capture(len(pkts))
1639 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1641 # between NAT44 inside interfaces with different VRF (hairpinning)
1642 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1643 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1644 TCP(sport=1234, dport=5678))
1645 self.pg4.add_stream(p)
1646 self.pg_enable_capture(self.pg_interfaces)
1648 capture = self.pg6.get_capture(1)
1653 self.assertEqual(ip.src, self.nat_addr)
1654 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1655 self.assertNotEqual(tcp.sport, 1234)
1656 self.assertEqual(tcp.dport, 5678)
1658 self.logger.error(ppp("Unexpected or invalid packet:", p))
1661 # in2out 1st interface
1662 pkts = self.create_stream_in(self.pg4, self.pg3)
1663 self.pg4.add_stream(pkts)
1664 self.pg_enable_capture(self.pg_interfaces)
1666 capture = self.pg3.get_capture(len(pkts))
1667 self.verify_capture_out(capture)
1669 # out2in 1st interface
1670 pkts = self.create_stream_out(self.pg3)
1671 self.pg3.add_stream(pkts)
1672 self.pg_enable_capture(self.pg_interfaces)
1674 capture = self.pg4.get_capture(len(pkts))
1675 self.verify_capture_in(capture, self.pg4)
1677 # in2out 2nd interface
1678 pkts = self.create_stream_in(self.pg5, self.pg3)
1679 self.pg5.add_stream(pkts)
1680 self.pg_enable_capture(self.pg_interfaces)
1682 capture = self.pg3.get_capture(len(pkts))
1683 self.verify_capture_out(capture)
1685 # out2in 2nd interface
1686 pkts = self.create_stream_out(self.pg3)
1687 self.pg3.add_stream(pkts)
1688 self.pg_enable_capture(self.pg_interfaces)
1690 capture = self.pg5.get_capture(len(pkts))
1691 self.verify_capture_in(capture, self.pg5)
1694 addresses = self.vapi.nat44_address_dump()
1695 self.assertEqual(len(addresses), 1)
1696 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1697 self.assertEqual(len(sessions), 3)
1698 for session in sessions:
1699 self.assertFalse(session.is_static)
1700 self.assertEqual(session.inside_ip_address[0:4],
1701 self.pg5.remote_ip4n)
1702 self.assertEqual(session.outside_ip_address,
1703 addresses[0].ip_address)
1704 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1705 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1706 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1707 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1708 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1709 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1710 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1711 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1712 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1714 # in2out 3rd interface
1715 pkts = self.create_stream_in(self.pg6, self.pg3)
1716 self.pg6.add_stream(pkts)
1717 self.pg_enable_capture(self.pg_interfaces)
1719 capture = self.pg3.get_capture(len(pkts))
1720 self.verify_capture_out(capture, static_nat_ip, True)
1722 # out2in 3rd interface
1723 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1724 self.pg3.add_stream(pkts)
1725 self.pg_enable_capture(self.pg_interfaces)
1727 capture = self.pg6.get_capture(len(pkts))
1728 self.verify_capture_in(capture, self.pg6)
1730 # general user and session dump verifications
1731 users = self.vapi.nat44_user_dump()
1732 self.assertTrue(len(users) >= 3)
1733 addresses = self.vapi.nat44_address_dump()
1734 self.assertEqual(len(addresses), 1)
1736 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1738 for session in sessions:
1739 self.assertEqual(user.ip_address, session.inside_ip_address)
1740 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1741 self.assertTrue(session.protocol in
1742 [IP_PROTOS.tcp, IP_PROTOS.udp,
1744 self.assertFalse(session.ext_host_valid)
1747 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1748 self.assertTrue(len(sessions) >= 4)
1749 for session in sessions:
1750 self.assertFalse(session.is_static)
1751 self.assertEqual(session.inside_ip_address[0:4],
1752 self.pg4.remote_ip4n)
1753 self.assertEqual(session.outside_ip_address,
1754 addresses[0].ip_address)
1757 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1758 self.assertTrue(len(sessions) >= 3)
1759 for session in sessions:
1760 self.assertTrue(session.is_static)
1761 self.assertEqual(session.inside_ip_address[0:4],
1762 self.pg6.remote_ip4n)
1763 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1764 map(int, static_nat_ip.split('.')))
1765 self.assertTrue(session.inside_port in
1766 [self.tcp_port_in, self.udp_port_in,
1769 def test_hairpinning(self):
1770 """ NAT44 hairpinning - 1:1 NAPT """
1772 host = self.pg0.remote_hosts[0]
1773 server = self.pg0.remote_hosts[1]
1776 server_in_port = 5678
1777 server_out_port = 8765
1779 self.nat44_add_address(self.nat_addr)
1780 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1781 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1783 # add static mapping for server
1784 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1785 server_in_port, server_out_port,
1786 proto=IP_PROTOS.tcp)
1788 # send packet from host to server
1789 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1790 IP(src=host.ip4, dst=self.nat_addr) /
1791 TCP(sport=host_in_port, dport=server_out_port))
1792 self.pg0.add_stream(p)
1793 self.pg_enable_capture(self.pg_interfaces)
1795 capture = self.pg0.get_capture(1)
1800 self.assertEqual(ip.src, self.nat_addr)
1801 self.assertEqual(ip.dst, server.ip4)
1802 self.assertNotEqual(tcp.sport, host_in_port)
1803 self.assertEqual(tcp.dport, server_in_port)
1804 self.assert_packet_checksums_valid(p)
1805 host_out_port = tcp.sport
1807 self.logger.error(ppp("Unexpected or invalid packet:", p))
1810 # send reply from server to host
1811 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1812 IP(src=server.ip4, dst=self.nat_addr) /
1813 TCP(sport=server_in_port, dport=host_out_port))
1814 self.pg0.add_stream(p)
1815 self.pg_enable_capture(self.pg_interfaces)
1817 capture = self.pg0.get_capture(1)
1822 self.assertEqual(ip.src, self.nat_addr)
1823 self.assertEqual(ip.dst, host.ip4)
1824 self.assertEqual(tcp.sport, server_out_port)
1825 self.assertEqual(tcp.dport, host_in_port)
1826 self.assert_packet_checksums_valid(p)
1828 self.logger.error(ppp("Unexpected or invalid packet:", p))
1831 def test_hairpinning2(self):
1832 """ NAT44 hairpinning - 1:1 NAT"""
1834 server1_nat_ip = "10.0.0.10"
1835 server2_nat_ip = "10.0.0.11"
1836 host = self.pg0.remote_hosts[0]
1837 server1 = self.pg0.remote_hosts[1]
1838 server2 = self.pg0.remote_hosts[2]
1839 server_tcp_port = 22
1840 server_udp_port = 20
1842 self.nat44_add_address(self.nat_addr)
1843 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1844 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1847 # add static mapping for servers
1848 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1849 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1853 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1854 IP(src=host.ip4, dst=server1_nat_ip) /
1855 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1857 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1858 IP(src=host.ip4, dst=server1_nat_ip) /
1859 UDP(sport=self.udp_port_in, dport=server_udp_port))
1861 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1862 IP(src=host.ip4, dst=server1_nat_ip) /
1863 ICMP(id=self.icmp_id_in, type='echo-request'))
1865 self.pg0.add_stream(pkts)
1866 self.pg_enable_capture(self.pg_interfaces)
1868 capture = self.pg0.get_capture(len(pkts))
1869 for packet in capture:
1871 self.assertEqual(packet[IP].src, self.nat_addr)
1872 self.assertEqual(packet[IP].dst, server1.ip4)
1873 if packet.haslayer(TCP):
1874 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1875 self.assertEqual(packet[TCP].dport, server_tcp_port)
1876 self.tcp_port_out = packet[TCP].sport
1877 self.assert_packet_checksums_valid(packet)
1878 elif packet.haslayer(UDP):
1879 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1880 self.assertEqual(packet[UDP].dport, server_udp_port)
1881 self.udp_port_out = packet[UDP].sport
1883 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1884 self.icmp_id_out = packet[ICMP].id
1886 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1891 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1892 IP(src=server1.ip4, dst=self.nat_addr) /
1893 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1895 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1896 IP(src=server1.ip4, dst=self.nat_addr) /
1897 UDP(sport=server_udp_port, dport=self.udp_port_out))
1899 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1900 IP(src=server1.ip4, dst=self.nat_addr) /
1901 ICMP(id=self.icmp_id_out, type='echo-reply'))
1903 self.pg0.add_stream(pkts)
1904 self.pg_enable_capture(self.pg_interfaces)
1906 capture = self.pg0.get_capture(len(pkts))
1907 for packet in capture:
1909 self.assertEqual(packet[IP].src, server1_nat_ip)
1910 self.assertEqual(packet[IP].dst, host.ip4)
1911 if packet.haslayer(TCP):
1912 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1913 self.assertEqual(packet[TCP].sport, server_tcp_port)
1914 self.assert_packet_checksums_valid(packet)
1915 elif packet.haslayer(UDP):
1916 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1917 self.assertEqual(packet[UDP].sport, server_udp_port)
1919 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1921 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1924 # server2 to server1
1926 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1927 IP(src=server2.ip4, dst=server1_nat_ip) /
1928 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1930 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1931 IP(src=server2.ip4, dst=server1_nat_ip) /
1932 UDP(sport=self.udp_port_in, dport=server_udp_port))
1934 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1935 IP(src=server2.ip4, dst=server1_nat_ip) /
1936 ICMP(id=self.icmp_id_in, type='echo-request'))
1938 self.pg0.add_stream(pkts)
1939 self.pg_enable_capture(self.pg_interfaces)
1941 capture = self.pg0.get_capture(len(pkts))
1942 for packet in capture:
1944 self.assertEqual(packet[IP].src, server2_nat_ip)
1945 self.assertEqual(packet[IP].dst, server1.ip4)
1946 if packet.haslayer(TCP):
1947 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1948 self.assertEqual(packet[TCP].dport, server_tcp_port)
1949 self.tcp_port_out = packet[TCP].sport
1950 self.assert_packet_checksums_valid(packet)
1951 elif packet.haslayer(UDP):
1952 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1953 self.assertEqual(packet[UDP].dport, server_udp_port)
1954 self.udp_port_out = packet[UDP].sport
1956 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1957 self.icmp_id_out = packet[ICMP].id
1959 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1962 # server1 to server2
1964 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1965 IP(src=server1.ip4, dst=server2_nat_ip) /
1966 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1968 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1969 IP(src=server1.ip4, dst=server2_nat_ip) /
1970 UDP(sport=server_udp_port, dport=self.udp_port_out))
1972 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1973 IP(src=server1.ip4, dst=server2_nat_ip) /
1974 ICMP(id=self.icmp_id_out, type='echo-reply'))
1976 self.pg0.add_stream(pkts)
1977 self.pg_enable_capture(self.pg_interfaces)
1979 capture = self.pg0.get_capture(len(pkts))
1980 for packet in capture:
1982 self.assertEqual(packet[IP].src, server1_nat_ip)
1983 self.assertEqual(packet[IP].dst, server2.ip4)
1984 if packet.haslayer(TCP):
1985 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1986 self.assertEqual(packet[TCP].sport, server_tcp_port)
1987 self.assert_packet_checksums_valid(packet)
1988 elif packet.haslayer(UDP):
1989 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1990 self.assertEqual(packet[UDP].sport, server_udp_port)
1992 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1994 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1997 def test_max_translations_per_user(self):
1998 """ MAX translations per user - recycle the least recently used """
2000 self.nat44_add_address(self.nat_addr)
2001 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2002 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2005 # get maximum number of translations per user
2006 nat44_config = self.vapi.nat_show_config()
2008 # send more than maximum number of translations per user packets
2009 pkts_num = nat44_config.max_translations_per_user + 5
2011 for port in range(0, pkts_num):
2012 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2013 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2014 TCP(sport=1025 + port))
2016 self.pg0.add_stream(pkts)
2017 self.pg_enable_capture(self.pg_interfaces)
2020 # verify number of translated packet
2021 self.pg1.get_capture(pkts_num)
2023 users = self.vapi.nat44_user_dump()
2025 if user.ip_address == self.pg0.remote_ip4n:
2026 self.assertEqual(user.nsessions,
2027 nat44_config.max_translations_per_user)
2028 self.assertEqual(user.nstaticsessions, 0)
2031 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2033 proto=IP_PROTOS.tcp)
2034 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2035 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2036 TCP(sport=tcp_port))
2037 self.pg0.add_stream(p)
2038 self.pg_enable_capture(self.pg_interfaces)
2040 self.pg1.get_capture(1)
2041 users = self.vapi.nat44_user_dump()
2043 if user.ip_address == self.pg0.remote_ip4n:
2044 self.assertEqual(user.nsessions,
2045 nat44_config.max_translations_per_user - 1)
2046 self.assertEqual(user.nstaticsessions, 1)
2048 def test_interface_addr(self):
2049 """ Acquire NAT44 addresses from interface """
2050 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2052 # no address in NAT pool
2053 adresses = self.vapi.nat44_address_dump()
2054 self.assertEqual(0, len(adresses))
2056 # configure interface address and check NAT address pool
2057 self.pg7.config_ip4()
2058 adresses = self.vapi.nat44_address_dump()
2059 self.assertEqual(1, len(adresses))
2060 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2062 # remove interface address and check NAT address pool
2063 self.pg7.unconfig_ip4()
2064 adresses = self.vapi.nat44_address_dump()
2065 self.assertEqual(0, len(adresses))
2067 def test_interface_addr_static_mapping(self):
2068 """ Static mapping with addresses from interface """
2071 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2072 self.nat44_add_static_mapping(
2074 external_sw_if_index=self.pg7.sw_if_index,
2077 # static mappings with external interface
2078 static_mappings = self.vapi.nat44_static_mapping_dump()
2079 self.assertEqual(1, len(static_mappings))
2080 self.assertEqual(self.pg7.sw_if_index,
2081 static_mappings[0].external_sw_if_index)
2082 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2084 # configure interface address and check static mappings
2085 self.pg7.config_ip4()
2086 static_mappings = self.vapi.nat44_static_mapping_dump()
2087 self.assertEqual(2, len(static_mappings))
2089 for sm in static_mappings:
2090 if sm.external_sw_if_index == 0xFFFFFFFF:
2091 self.assertEqual(sm.external_ip_address[0:4],
2092 self.pg7.local_ip4n)
2093 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2095 self.assertTrue(resolved)
2097 # remove interface address and check static mappings
2098 self.pg7.unconfig_ip4()
2099 static_mappings = self.vapi.nat44_static_mapping_dump()
2100 self.assertEqual(1, len(static_mappings))
2101 self.assertEqual(self.pg7.sw_if_index,
2102 static_mappings[0].external_sw_if_index)
2103 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2105 # configure interface address again and check static mappings
2106 self.pg7.config_ip4()
2107 static_mappings = self.vapi.nat44_static_mapping_dump()
2108 self.assertEqual(2, len(static_mappings))
2110 for sm in static_mappings:
2111 if sm.external_sw_if_index == 0xFFFFFFFF:
2112 self.assertEqual(sm.external_ip_address[0:4],
2113 self.pg7.local_ip4n)
2114 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2116 self.assertTrue(resolved)
2118 # remove static mapping
2119 self.nat44_add_static_mapping(
2121 external_sw_if_index=self.pg7.sw_if_index,
2124 static_mappings = self.vapi.nat44_static_mapping_dump()
2125 self.assertEqual(0, len(static_mappings))
2127 def test_interface_addr_identity_nat(self):
2128 """ Identity NAT with addresses from interface """
2131 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2132 self.vapi.nat44_add_del_identity_mapping(
2133 sw_if_index=self.pg7.sw_if_index,
2135 protocol=IP_PROTOS.tcp,
2138 # identity mappings with external interface
2139 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2140 self.assertEqual(1, len(identity_mappings))
2141 self.assertEqual(self.pg7.sw_if_index,
2142 identity_mappings[0].sw_if_index)
2144 # configure interface address and check identity mappings
2145 self.pg7.config_ip4()
2146 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2148 self.assertEqual(2, len(identity_mappings))
2149 for sm in identity_mappings:
2150 if sm.sw_if_index == 0xFFFFFFFF:
2151 self.assertEqual(identity_mappings[0].ip_address,
2152 self.pg7.local_ip4n)
2153 self.assertEqual(port, identity_mappings[0].port)
2154 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2156 self.assertTrue(resolved)
2158 # remove interface address and check identity mappings
2159 self.pg7.unconfig_ip4()
2160 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2161 self.assertEqual(1, len(identity_mappings))
2162 self.assertEqual(self.pg7.sw_if_index,
2163 identity_mappings[0].sw_if_index)
2165 def test_ipfix_nat44_sess(self):
2166 """ IPFIX logging NAT44 session created/delted """
2167 self.ipfix_domain_id = 10
2168 self.ipfix_src_port = 20202
2169 colector_port = 30303
2170 bind_layers(UDP, IPFIX, dport=30303)
2171 self.nat44_add_address(self.nat_addr)
2172 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2173 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2175 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2176 src_address=self.pg3.local_ip4n,
2178 template_interval=10,
2179 collector_port=colector_port)
2180 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2181 src_port=self.ipfix_src_port)
2183 pkts = self.create_stream_in(self.pg0, self.pg1)
2184 self.pg0.add_stream(pkts)
2185 self.pg_enable_capture(self.pg_interfaces)
2187 capture = self.pg1.get_capture(len(pkts))
2188 self.verify_capture_out(capture)
2189 self.nat44_add_address(self.nat_addr, is_add=0)
2190 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2191 capture = self.pg3.get_capture(9)
2192 ipfix = IPFIXDecoder()
2193 # first load template
2195 self.assertTrue(p.haslayer(IPFIX))
2196 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2197 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2198 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2199 self.assertEqual(p[UDP].dport, colector_port)
2200 self.assertEqual(p[IPFIX].observationDomainID,
2201 self.ipfix_domain_id)
2202 if p.haslayer(Template):
2203 ipfix.add_template(p.getlayer(Template))
2204 # verify events in data set
2206 if p.haslayer(Data):
2207 data = ipfix.decode_data_set(p.getlayer(Set))
2208 self.verify_ipfix_nat44_ses(data)
2210 def test_ipfix_addr_exhausted(self):
2211 """ IPFIX logging NAT addresses exhausted """
2212 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2213 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2215 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2216 src_address=self.pg3.local_ip4n,
2218 template_interval=10)
2219 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2220 src_port=self.ipfix_src_port)
2222 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2223 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2225 self.pg0.add_stream(p)
2226 self.pg_enable_capture(self.pg_interfaces)
2228 self.pg1.assert_nothing_captured()
2230 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2231 capture = self.pg3.get_capture(9)
2232 ipfix = IPFIXDecoder()
2233 # first load template
2235 self.assertTrue(p.haslayer(IPFIX))
2236 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2237 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2238 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2239 self.assertEqual(p[UDP].dport, 4739)
2240 self.assertEqual(p[IPFIX].observationDomainID,
2241 self.ipfix_domain_id)
2242 if p.haslayer(Template):
2243 ipfix.add_template(p.getlayer(Template))
2244 # verify events in data set
2246 if p.haslayer(Data):
2247 data = ipfix.decode_data_set(p.getlayer(Set))
2248 self.verify_ipfix_addr_exhausted(data)
2250 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2251 def test_ipfix_max_sessions(self):
2252 """ IPFIX logging maximum session entries exceeded """
2253 self.nat44_add_address(self.nat_addr)
2254 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2255 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2258 nat44_config = self.vapi.nat_show_config()
2259 max_sessions = 10 * nat44_config.translation_buckets
2262 for i in range(0, max_sessions):
2263 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2264 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2265 IP(src=src, dst=self.pg1.remote_ip4) /
2268 self.pg0.add_stream(pkts)
2269 self.pg_enable_capture(self.pg_interfaces)
2272 self.pg1.get_capture(max_sessions)
2273 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2274 src_address=self.pg3.local_ip4n,
2276 template_interval=10)
2277 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2278 src_port=self.ipfix_src_port)
2280 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2281 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2283 self.pg0.add_stream(p)
2284 self.pg_enable_capture(self.pg_interfaces)
2286 self.pg1.assert_nothing_captured()
2288 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2289 capture = self.pg3.get_capture(9)
2290 ipfix = IPFIXDecoder()
2291 # first load template
2293 self.assertTrue(p.haslayer(IPFIX))
2294 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2295 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2296 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2297 self.assertEqual(p[UDP].dport, 4739)
2298 self.assertEqual(p[IPFIX].observationDomainID,
2299 self.ipfix_domain_id)
2300 if p.haslayer(Template):
2301 ipfix.add_template(p.getlayer(Template))
2302 # verify events in data set
2304 if p.haslayer(Data):
2305 data = ipfix.decode_data_set(p.getlayer(Set))
2306 self.verify_ipfix_max_sessions(data, max_sessions)
2308 def test_pool_addr_fib(self):
2309 """ NAT44 add pool addresses to FIB """
2310 static_addr = '10.0.0.10'
2311 self.nat44_add_address(self.nat_addr)
2312 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2313 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2315 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2318 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2319 ARP(op=ARP.who_has, pdst=self.nat_addr,
2320 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2321 self.pg1.add_stream(p)
2322 self.pg_enable_capture(self.pg_interfaces)
2324 capture = self.pg1.get_capture(1)
2325 self.assertTrue(capture[0].haslayer(ARP))
2326 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2329 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2330 ARP(op=ARP.who_has, pdst=static_addr,
2331 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2332 self.pg1.add_stream(p)
2333 self.pg_enable_capture(self.pg_interfaces)
2335 capture = self.pg1.get_capture(1)
2336 self.assertTrue(capture[0].haslayer(ARP))
2337 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2339 # send ARP to non-NAT44 interface
2340 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2341 ARP(op=ARP.who_has, pdst=self.nat_addr,
2342 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2343 self.pg2.add_stream(p)
2344 self.pg_enable_capture(self.pg_interfaces)
2346 self.pg1.assert_nothing_captured()
2348 # remove addresses and verify
2349 self.nat44_add_address(self.nat_addr, is_add=0)
2350 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2353 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2354 ARP(op=ARP.who_has, pdst=self.nat_addr,
2355 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2356 self.pg1.add_stream(p)
2357 self.pg_enable_capture(self.pg_interfaces)
2359 self.pg1.assert_nothing_captured()
2361 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2362 ARP(op=ARP.who_has, pdst=static_addr,
2363 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2364 self.pg1.add_stream(p)
2365 self.pg_enable_capture(self.pg_interfaces)
2367 self.pg1.assert_nothing_captured()
2369 def test_vrf_mode(self):
2370 """ NAT44 tenant VRF aware address pool mode """
2374 nat_ip1 = "10.0.0.10"
2375 nat_ip2 = "10.0.0.11"
2377 self.pg0.unconfig_ip4()
2378 self.pg1.unconfig_ip4()
2379 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2380 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2381 self.pg0.set_table_ip4(vrf_id1)
2382 self.pg1.set_table_ip4(vrf_id2)
2383 self.pg0.config_ip4()
2384 self.pg1.config_ip4()
2386 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2387 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2388 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2389 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2390 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2394 pkts = self.create_stream_in(self.pg0, self.pg2)
2395 self.pg0.add_stream(pkts)
2396 self.pg_enable_capture(self.pg_interfaces)
2398 capture = self.pg2.get_capture(len(pkts))
2399 self.verify_capture_out(capture, nat_ip1)
2402 pkts = self.create_stream_in(self.pg1, self.pg2)
2403 self.pg1.add_stream(pkts)
2404 self.pg_enable_capture(self.pg_interfaces)
2406 capture = self.pg2.get_capture(len(pkts))
2407 self.verify_capture_out(capture, nat_ip2)
2409 self.pg0.unconfig_ip4()
2410 self.pg1.unconfig_ip4()
2411 self.pg0.set_table_ip4(0)
2412 self.pg1.set_table_ip4(0)
2413 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2414 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2416 def test_vrf_feature_independent(self):
2417 """ NAT44 tenant VRF independent address pool mode """
2419 nat_ip1 = "10.0.0.10"
2420 nat_ip2 = "10.0.0.11"
2422 self.nat44_add_address(nat_ip1)
2423 self.nat44_add_address(nat_ip2, vrf_id=99)
2424 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2425 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2426 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2430 pkts = self.create_stream_in(self.pg0, self.pg2)
2431 self.pg0.add_stream(pkts)
2432 self.pg_enable_capture(self.pg_interfaces)
2434 capture = self.pg2.get_capture(len(pkts))
2435 self.verify_capture_out(capture, nat_ip1)
2438 pkts = self.create_stream_in(self.pg1, self.pg2)
2439 self.pg1.add_stream(pkts)
2440 self.pg_enable_capture(self.pg_interfaces)
2442 capture = self.pg2.get_capture(len(pkts))
2443 self.verify_capture_out(capture, nat_ip1)
2445 def test_dynamic_ipless_interfaces(self):
2446 """ NAT44 interfaces without configured IP address """
2448 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2449 mactobinary(self.pg7.remote_mac),
2450 self.pg7.remote_ip4n,
2452 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2453 mactobinary(self.pg8.remote_mac),
2454 self.pg8.remote_ip4n,
2457 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2458 dst_address_length=32,
2459 next_hop_address=self.pg7.remote_ip4n,
2460 next_hop_sw_if_index=self.pg7.sw_if_index)
2461 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2462 dst_address_length=32,
2463 next_hop_address=self.pg8.remote_ip4n,
2464 next_hop_sw_if_index=self.pg8.sw_if_index)
2466 self.nat44_add_address(self.nat_addr)
2467 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2468 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2472 pkts = self.create_stream_in(self.pg7, self.pg8)
2473 self.pg7.add_stream(pkts)
2474 self.pg_enable_capture(self.pg_interfaces)
2476 capture = self.pg8.get_capture(len(pkts))
2477 self.verify_capture_out(capture)
2480 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2481 self.pg8.add_stream(pkts)
2482 self.pg_enable_capture(self.pg_interfaces)
2484 capture = self.pg7.get_capture(len(pkts))
2485 self.verify_capture_in(capture, self.pg7)
2487 def test_static_ipless_interfaces(self):
2488 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2490 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2491 mactobinary(self.pg7.remote_mac),
2492 self.pg7.remote_ip4n,
2494 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2495 mactobinary(self.pg8.remote_mac),
2496 self.pg8.remote_ip4n,
2499 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2500 dst_address_length=32,
2501 next_hop_address=self.pg7.remote_ip4n,
2502 next_hop_sw_if_index=self.pg7.sw_if_index)
2503 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2504 dst_address_length=32,
2505 next_hop_address=self.pg8.remote_ip4n,
2506 next_hop_sw_if_index=self.pg8.sw_if_index)
2508 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2509 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2510 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2514 pkts = self.create_stream_out(self.pg8)
2515 self.pg8.add_stream(pkts)
2516 self.pg_enable_capture(self.pg_interfaces)
2518 capture = self.pg7.get_capture(len(pkts))
2519 self.verify_capture_in(capture, self.pg7)
2522 pkts = self.create_stream_in(self.pg7, self.pg8)
2523 self.pg7.add_stream(pkts)
2524 self.pg_enable_capture(self.pg_interfaces)
2526 capture = self.pg8.get_capture(len(pkts))
2527 self.verify_capture_out(capture, self.nat_addr, True)
2529 def test_static_with_port_ipless_interfaces(self):
2530 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2532 self.tcp_port_out = 30606
2533 self.udp_port_out = 30607
2534 self.icmp_id_out = 30608
2536 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2537 mactobinary(self.pg7.remote_mac),
2538 self.pg7.remote_ip4n,
2540 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2541 mactobinary(self.pg8.remote_mac),
2542 self.pg8.remote_ip4n,
2545 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2546 dst_address_length=32,
2547 next_hop_address=self.pg7.remote_ip4n,
2548 next_hop_sw_if_index=self.pg7.sw_if_index)
2549 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2550 dst_address_length=32,
2551 next_hop_address=self.pg8.remote_ip4n,
2552 next_hop_sw_if_index=self.pg8.sw_if_index)
2554 self.nat44_add_address(self.nat_addr)
2555 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2556 self.tcp_port_in, self.tcp_port_out,
2557 proto=IP_PROTOS.tcp)
2558 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2559 self.udp_port_in, self.udp_port_out,
2560 proto=IP_PROTOS.udp)
2561 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2562 self.icmp_id_in, self.icmp_id_out,
2563 proto=IP_PROTOS.icmp)
2564 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2565 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2569 pkts = self.create_stream_out(self.pg8)
2570 self.pg8.add_stream(pkts)
2571 self.pg_enable_capture(self.pg_interfaces)
2573 capture = self.pg7.get_capture(len(pkts))
2574 self.verify_capture_in(capture, self.pg7)
2577 pkts = self.create_stream_in(self.pg7, self.pg8)
2578 self.pg7.add_stream(pkts)
2579 self.pg_enable_capture(self.pg_interfaces)
2581 capture = self.pg8.get_capture(len(pkts))
2582 self.verify_capture_out(capture)
2584 def test_static_unknown_proto(self):
2585 """ 1:1 NAT translate packet with unknown protocol """
2586 nat_ip = "10.0.0.10"
2587 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2588 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2589 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2593 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2594 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2596 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2597 TCP(sport=1234, dport=1234))
2598 self.pg0.add_stream(p)
2599 self.pg_enable_capture(self.pg_interfaces)
2601 p = self.pg1.get_capture(1)
2604 self.assertEqual(packet[IP].src, nat_ip)
2605 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2606 self.assertTrue(packet.haslayer(GRE))
2607 self.assert_packet_checksums_valid(packet)
2609 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2613 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2614 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2616 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2617 TCP(sport=1234, dport=1234))
2618 self.pg1.add_stream(p)
2619 self.pg_enable_capture(self.pg_interfaces)
2621 p = self.pg0.get_capture(1)
2624 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2625 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2626 self.assertTrue(packet.haslayer(GRE))
2627 self.assert_packet_checksums_valid(packet)
2629 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2632 def test_hairpinning_static_unknown_proto(self):
2633 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2635 host = self.pg0.remote_hosts[0]
2636 server = self.pg0.remote_hosts[1]
2638 host_nat_ip = "10.0.0.10"
2639 server_nat_ip = "10.0.0.11"
2641 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2642 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2643 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2644 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2648 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2649 IP(src=host.ip4, dst=server_nat_ip) /
2651 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2652 TCP(sport=1234, dport=1234))
2653 self.pg0.add_stream(p)
2654 self.pg_enable_capture(self.pg_interfaces)
2656 p = self.pg0.get_capture(1)
2659 self.assertEqual(packet[IP].src, host_nat_ip)
2660 self.assertEqual(packet[IP].dst, server.ip4)
2661 self.assertTrue(packet.haslayer(GRE))
2662 self.assert_packet_checksums_valid(packet)
2664 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2668 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2669 IP(src=server.ip4, dst=host_nat_ip) /
2671 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2672 TCP(sport=1234, dport=1234))
2673 self.pg0.add_stream(p)
2674 self.pg_enable_capture(self.pg_interfaces)
2676 p = self.pg0.get_capture(1)
2679 self.assertEqual(packet[IP].src, server_nat_ip)
2680 self.assertEqual(packet[IP].dst, host.ip4)
2681 self.assertTrue(packet.haslayer(GRE))
2682 self.assert_packet_checksums_valid(packet)
2684 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2687 def test_output_feature(self):
2688 """ NAT44 interface output feature (in2out postrouting) """
2689 self.nat44_add_address(self.nat_addr)
2690 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2691 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2692 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2696 pkts = self.create_stream_in(self.pg0, self.pg3)
2697 self.pg0.add_stream(pkts)
2698 self.pg_enable_capture(self.pg_interfaces)
2700 capture = self.pg3.get_capture(len(pkts))
2701 self.verify_capture_out(capture)
2704 pkts = self.create_stream_out(self.pg3)
2705 self.pg3.add_stream(pkts)
2706 self.pg_enable_capture(self.pg_interfaces)
2708 capture = self.pg0.get_capture(len(pkts))
2709 self.verify_capture_in(capture, self.pg0)
2711 # from non-NAT interface to NAT inside interface
2712 pkts = self.create_stream_in(self.pg2, self.pg0)
2713 self.pg2.add_stream(pkts)
2714 self.pg_enable_capture(self.pg_interfaces)
2716 capture = self.pg0.get_capture(len(pkts))
2717 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2719 def test_output_feature_vrf_aware(self):
2720 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2721 nat_ip_vrf10 = "10.0.0.10"
2722 nat_ip_vrf20 = "10.0.0.20"
2724 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2725 dst_address_length=32,
2726 next_hop_address=self.pg3.remote_ip4n,
2727 next_hop_sw_if_index=self.pg3.sw_if_index,
2729 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2730 dst_address_length=32,
2731 next_hop_address=self.pg3.remote_ip4n,
2732 next_hop_sw_if_index=self.pg3.sw_if_index,
2735 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2736 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2737 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2738 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2739 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2743 pkts = self.create_stream_in(self.pg4, self.pg3)
2744 self.pg4.add_stream(pkts)
2745 self.pg_enable_capture(self.pg_interfaces)
2747 capture = self.pg3.get_capture(len(pkts))
2748 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2751 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2752 self.pg3.add_stream(pkts)
2753 self.pg_enable_capture(self.pg_interfaces)
2755 capture = self.pg4.get_capture(len(pkts))
2756 self.verify_capture_in(capture, self.pg4)
2759 pkts = self.create_stream_in(self.pg6, self.pg3)
2760 self.pg6.add_stream(pkts)
2761 self.pg_enable_capture(self.pg_interfaces)
2763 capture = self.pg3.get_capture(len(pkts))
2764 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2767 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2768 self.pg3.add_stream(pkts)
2769 self.pg_enable_capture(self.pg_interfaces)
2771 capture = self.pg6.get_capture(len(pkts))
2772 self.verify_capture_in(capture, self.pg6)
2774 def test_output_feature_hairpinning(self):
2775 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2776 host = self.pg0.remote_hosts[0]
2777 server = self.pg0.remote_hosts[1]
2780 server_in_port = 5678
2781 server_out_port = 8765
2783 self.nat44_add_address(self.nat_addr)
2784 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2785 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2788 # add static mapping for server
2789 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2790 server_in_port, server_out_port,
2791 proto=IP_PROTOS.tcp)
2793 # send packet from host to server
2794 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2795 IP(src=host.ip4, dst=self.nat_addr) /
2796 TCP(sport=host_in_port, dport=server_out_port))
2797 self.pg0.add_stream(p)
2798 self.pg_enable_capture(self.pg_interfaces)
2800 capture = self.pg0.get_capture(1)
2805 self.assertEqual(ip.src, self.nat_addr)
2806 self.assertEqual(ip.dst, server.ip4)
2807 self.assertNotEqual(tcp.sport, host_in_port)
2808 self.assertEqual(tcp.dport, server_in_port)
2809 self.assert_packet_checksums_valid(p)
2810 host_out_port = tcp.sport
2812 self.logger.error(ppp("Unexpected or invalid packet:", p))
2815 # send reply from server to host
2816 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2817 IP(src=server.ip4, dst=self.nat_addr) /
2818 TCP(sport=server_in_port, dport=host_out_port))
2819 self.pg0.add_stream(p)
2820 self.pg_enable_capture(self.pg_interfaces)
2822 capture = self.pg0.get_capture(1)
2827 self.assertEqual(ip.src, self.nat_addr)
2828 self.assertEqual(ip.dst, host.ip4)
2829 self.assertEqual(tcp.sport, server_out_port)
2830 self.assertEqual(tcp.dport, host_in_port)
2831 self.assert_packet_checksums_valid(p)
2833 self.logger.error(ppp("Unexpected or invalid packet:", p))
2836 def test_one_armed_nat44(self):
2837 """ One armed NAT44 """
2838 remote_host = self.pg9.remote_hosts[0]
2839 local_host = self.pg9.remote_hosts[1]
2842 self.nat44_add_address(self.nat_addr)
2843 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2844 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2848 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2849 IP(src=local_host.ip4, dst=remote_host.ip4) /
2850 TCP(sport=12345, dport=80))
2851 self.pg9.add_stream(p)
2852 self.pg_enable_capture(self.pg_interfaces)
2854 capture = self.pg9.get_capture(1)
2859 self.assertEqual(ip.src, self.nat_addr)
2860 self.assertEqual(ip.dst, remote_host.ip4)
2861 self.assertNotEqual(tcp.sport, 12345)
2862 external_port = tcp.sport
2863 self.assertEqual(tcp.dport, 80)
2864 self.assert_packet_checksums_valid(p)
2866 self.logger.error(ppp("Unexpected or invalid packet:", p))
2870 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2871 IP(src=remote_host.ip4, dst=self.nat_addr) /
2872 TCP(sport=80, dport=external_port))
2873 self.pg9.add_stream(p)
2874 self.pg_enable_capture(self.pg_interfaces)
2876 capture = self.pg9.get_capture(1)
2881 self.assertEqual(ip.src, remote_host.ip4)
2882 self.assertEqual(ip.dst, local_host.ip4)
2883 self.assertEqual(tcp.sport, 80)
2884 self.assertEqual(tcp.dport, 12345)
2885 self.assert_packet_checksums_valid(p)
2887 self.logger.error(ppp("Unexpected or invalid packet:", p))
2890 def test_del_session(self):
2891 """ Delete NAT44 session """
2892 self.nat44_add_address(self.nat_addr)
2893 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2894 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2897 pkts = self.create_stream_in(self.pg0, self.pg1)
2898 self.pg0.add_stream(pkts)
2899 self.pg_enable_capture(self.pg_interfaces)
2901 self.pg1.get_capture(len(pkts))
2903 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2904 nsessions = len(sessions)
2906 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2907 sessions[0].inside_port,
2908 sessions[0].protocol)
2909 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2910 sessions[1].outside_port,
2911 sessions[1].protocol,
2914 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2915 self.assertEqual(nsessions - len(sessions), 2)
2917 def test_set_get_reass(self):
2918 """ NAT44 set/get virtual fragmentation reassembly """
2919 reas_cfg1 = self.vapi.nat_get_reass()
2921 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2922 max_reass=reas_cfg1.ip4_max_reass * 2,
2923 max_frag=reas_cfg1.ip4_max_frag * 2)
2925 reas_cfg2 = self.vapi.nat_get_reass()
2927 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2928 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2929 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2931 self.vapi.nat_set_reass(drop_frag=1)
2932 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2934 def test_frag_in_order(self):
2935 """ NAT44 translate fragments arriving in order """
2936 self.nat44_add_address(self.nat_addr)
2937 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2938 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2941 data = "A" * 4 + "B" * 16 + "C" * 3
2942 self.tcp_port_in = random.randint(1025, 65535)
2944 reass = self.vapi.nat_reass_dump()
2945 reass_n_start = len(reass)
2948 pkts = self.create_stream_frag(self.pg0,
2949 self.pg1.remote_ip4,
2953 self.pg0.add_stream(pkts)
2954 self.pg_enable_capture(self.pg_interfaces)
2956 frags = self.pg1.get_capture(len(pkts))
2957 p = self.reass_frags_and_verify(frags,
2959 self.pg1.remote_ip4)
2960 self.assertEqual(p[TCP].dport, 20)
2961 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2962 self.tcp_port_out = p[TCP].sport
2963 self.assertEqual(data, p[Raw].load)
2966 pkts = self.create_stream_frag(self.pg1,
2971 self.pg1.add_stream(pkts)
2972 self.pg_enable_capture(self.pg_interfaces)
2974 frags = self.pg0.get_capture(len(pkts))
2975 p = self.reass_frags_and_verify(frags,
2976 self.pg1.remote_ip4,
2977 self.pg0.remote_ip4)
2978 self.assertEqual(p[TCP].sport, 20)
2979 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2980 self.assertEqual(data, p[Raw].load)
2982 reass = self.vapi.nat_reass_dump()
2983 reass_n_end = len(reass)
2985 self.assertEqual(reass_n_end - reass_n_start, 2)
2987 def test_reass_hairpinning(self):
2988 """ NAT44 fragments hairpinning """
2989 server = self.pg0.remote_hosts[1]
2990 host_in_port = random.randint(1025, 65535)
2991 server_in_port = random.randint(1025, 65535)
2992 server_out_port = random.randint(1025, 65535)
2993 data = "A" * 4 + "B" * 16 + "C" * 3
2995 self.nat44_add_address(self.nat_addr)
2996 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2997 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2999 # add static mapping for server
3000 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3001 server_in_port, server_out_port,
3002 proto=IP_PROTOS.tcp)
3004 # send packet from host to server
3005 pkts = self.create_stream_frag(self.pg0,
3010 self.pg0.add_stream(pkts)
3011 self.pg_enable_capture(self.pg_interfaces)
3013 frags = self.pg0.get_capture(len(pkts))
3014 p = self.reass_frags_and_verify(frags,
3017 self.assertNotEqual(p[TCP].sport, host_in_port)
3018 self.assertEqual(p[TCP].dport, server_in_port)
3019 self.assertEqual(data, p[Raw].load)
3021 def test_frag_out_of_order(self):
3022 """ NAT44 translate fragments arriving out of order """
3023 self.nat44_add_address(self.nat_addr)
3024 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3025 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3028 data = "A" * 4 + "B" * 16 + "C" * 3
3029 random.randint(1025, 65535)
3032 pkts = self.create_stream_frag(self.pg0,
3033 self.pg1.remote_ip4,
3038 self.pg0.add_stream(pkts)
3039 self.pg_enable_capture(self.pg_interfaces)
3041 frags = self.pg1.get_capture(len(pkts))
3042 p = self.reass_frags_and_verify(frags,
3044 self.pg1.remote_ip4)
3045 self.assertEqual(p[TCP].dport, 20)
3046 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3047 self.tcp_port_out = p[TCP].sport
3048 self.assertEqual(data, p[Raw].load)
3051 pkts = self.create_stream_frag(self.pg1,
3057 self.pg1.add_stream(pkts)
3058 self.pg_enable_capture(self.pg_interfaces)
3060 frags = self.pg0.get_capture(len(pkts))
3061 p = self.reass_frags_and_verify(frags,
3062 self.pg1.remote_ip4,
3063 self.pg0.remote_ip4)
3064 self.assertEqual(p[TCP].sport, 20)
3065 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3066 self.assertEqual(data, p[Raw].load)
3068 def test_port_restricted(self):
3069 """ Port restricted NAT44 (MAP-E CE) """
3070 self.nat44_add_address(self.nat_addr)
3071 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3072 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3074 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3075 "psid-offset 6 psid-len 6")
3077 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3078 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3079 TCP(sport=4567, dport=22))
3080 self.pg0.add_stream(p)
3081 self.pg_enable_capture(self.pg_interfaces)
3083 capture = self.pg1.get_capture(1)
3088 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3089 self.assertEqual(ip.src, self.nat_addr)
3090 self.assertEqual(tcp.dport, 22)
3091 self.assertNotEqual(tcp.sport, 4567)
3092 self.assertEqual((tcp.sport >> 6) & 63, 10)
3093 self.assert_packet_checksums_valid(p)
3095 self.logger.error(ppp("Unexpected or invalid packet:", p))
3098 def test_ipfix_max_frags(self):
3099 """ IPFIX logging maximum fragments pending reassembly exceeded """
3100 self.nat44_add_address(self.nat_addr)
3101 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3102 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3104 self.vapi.nat_set_reass(max_frag=0)
3105 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3106 src_address=self.pg3.local_ip4n,
3108 template_interval=10)
3109 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3110 src_port=self.ipfix_src_port)
3112 data = "A" * 4 + "B" * 16 + "C" * 3
3113 self.tcp_port_in = random.randint(1025, 65535)
3114 pkts = self.create_stream_frag(self.pg0,
3115 self.pg1.remote_ip4,
3119 self.pg0.add_stream(pkts[-1])
3120 self.pg_enable_capture(self.pg_interfaces)
3122 self.pg1.assert_nothing_captured()
3124 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3125 capture = self.pg3.get_capture(9)
3126 ipfix = IPFIXDecoder()
3127 # first load template
3129 self.assertTrue(p.haslayer(IPFIX))
3130 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3131 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3132 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3133 self.assertEqual(p[UDP].dport, 4739)
3134 self.assertEqual(p[IPFIX].observationDomainID,
3135 self.ipfix_domain_id)
3136 if p.haslayer(Template):
3137 ipfix.add_template(p.getlayer(Template))
3138 # verify events in data set
3140 if p.haslayer(Data):
3141 data = ipfix.decode_data_set(p.getlayer(Set))
3142 self.verify_ipfix_max_fragments_ip4(data, 0,
3143 self.pg0.remote_ip4n)
3146 super(TestNAT44, self).tearDown()
3147 if not self.vpp_dead:
3148 self.logger.info(self.vapi.cli("show nat44 addresses"))
3149 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3150 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3151 self.logger.info(self.vapi.cli("show nat44 interface address"))
3152 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3153 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3154 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3155 self.vapi.cli("nat addr-port-assignment-alg default")
3157 self.vapi.cli("clear logging")
3160 class TestNAT44EndpointDependent(MethodHolder):
3161 """ Endpoint-Dependent mapping and filtering test cases """
3164 def setUpConstants(cls):
3165 super(TestNAT44EndpointDependent, cls).setUpConstants()
3166 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
3169 def setUpClass(cls):
3170 super(TestNAT44EndpointDependent, cls).setUpClass()
3171 cls.vapi.cli("set log class nat level debug")
3173 cls.tcp_port_in = 6303
3174 cls.tcp_port_out = 6303
3175 cls.udp_port_in = 6304
3176 cls.udp_port_out = 6304
3177 cls.icmp_id_in = 6305
3178 cls.icmp_id_out = 6305
3179 cls.nat_addr = '10.0.0.3'
3180 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3181 cls.ipfix_src_port = 4739
3182 cls.ipfix_domain_id = 1
3183 cls.tcp_external_port = 80
3185 cls.create_pg_interfaces(range(5))
3186 cls.interfaces = list(cls.pg_interfaces[0:3])
3188 for i in cls.interfaces:
3193 cls.pg0.generate_remote_hosts(3)
3194 cls.pg0.configure_ipv4_neighbors()
3198 cls.pg4.generate_remote_hosts(2)
3199 cls.pg4.config_ip4()
3200 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3201 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3205 cls.pg4.resolve_arp()
3206 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3207 cls.pg4.resolve_arp()
3210 super(TestNAT44EndpointDependent, cls).tearDownClass()
3213 def test_dynamic(self):
3214 """ NAT44 dynamic translation test """
3216 self.nat44_add_address(self.nat_addr)
3217 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3218 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3222 pkts = self.create_stream_in(self.pg0, self.pg1)
3223 self.pg0.add_stream(pkts)
3224 self.pg_enable_capture(self.pg_interfaces)
3226 capture = self.pg1.get_capture(len(pkts))
3227 self.verify_capture_out(capture)
3230 pkts = self.create_stream_out(self.pg1)
3231 self.pg1.add_stream(pkts)
3232 self.pg_enable_capture(self.pg_interfaces)
3234 capture = self.pg0.get_capture(len(pkts))
3235 self.verify_capture_in(capture, self.pg0)
3237 def test_forwarding(self):
3238 """ NAT44 forwarding test """
3240 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3241 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3243 self.vapi.nat44_forwarding_enable_disable(1)
3245 real_ip = self.pg0.remote_ip4n
3246 alias_ip = self.nat_addr_n
3247 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3248 external_ip=alias_ip)
3251 # in2out - static mapping match
3253 pkts = self.create_stream_out(self.pg1)
3254 self.pg1.add_stream(pkts)
3255 self.pg_enable_capture(self.pg_interfaces)
3257 capture = self.pg0.get_capture(len(pkts))
3258 self.verify_capture_in(capture, self.pg0)
3260 pkts = self.create_stream_in(self.pg0, self.pg1)
3261 self.pg0.add_stream(pkts)
3262 self.pg_enable_capture(self.pg_interfaces)
3264 capture = self.pg1.get_capture(len(pkts))
3265 self.verify_capture_out(capture, same_port=True)
3267 # in2out - no static mapping match
3269 host0 = self.pg0.remote_hosts[0]
3270 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
3272 pkts = self.create_stream_out(self.pg1,
3273 dst_ip=self.pg0.remote_ip4,
3274 use_inside_ports=True)
3275 self.pg1.add_stream(pkts)
3276 self.pg_enable_capture(self.pg_interfaces)
3278 capture = self.pg0.get_capture(len(pkts))
3279 self.verify_capture_in(capture, self.pg0)
3281 pkts = self.create_stream_in(self.pg0, self.pg1)
3282 self.pg0.add_stream(pkts)
3283 self.pg_enable_capture(self.pg_interfaces)
3285 capture = self.pg1.get_capture(len(pkts))
3286 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3289 self.pg0.remote_hosts[0] = host0
3291 user = self.pg0.remote_hosts[1]
3292 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3293 self.assertEqual(len(sessions), 3)
3294 self.assertTrue(sessions[0].ext_host_valid)
3295 self.vapi.nat44_del_session(
3296 sessions[0].inside_ip_address,
3297 sessions[0].inside_port,
3298 sessions[0].protocol,
3299 ext_host_address=sessions[0].ext_host_address,
3300 ext_host_port=sessions[0].ext_host_port)
3301 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3302 self.assertEqual(len(sessions), 2)
3305 self.vapi.nat44_forwarding_enable_disable(0)
3306 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3307 external_ip=alias_ip,
3310 def test_static_lb(self):
3311 """ NAT44 local service load balancing """
3312 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3315 server1 = self.pg0.remote_hosts[0]
3316 server2 = self.pg0.remote_hosts[1]
3318 locals = [{'addr': server1.ip4n,
3321 {'addr': server2.ip4n,
3325 self.nat44_add_address(self.nat_addr)
3326 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3329 local_num=len(locals),
3331 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3332 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3335 # from client to service
3336 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3337 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3338 TCP(sport=12345, dport=external_port))
3339 self.pg1.add_stream(p)
3340 self.pg_enable_capture(self.pg_interfaces)
3342 capture = self.pg0.get_capture(1)
3348 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3349 if ip.dst == server1.ip4:
3353 self.assertEqual(tcp.dport, local_port)
3354 self.assert_packet_checksums_valid(p)
3356 self.logger.error(ppp("Unexpected or invalid packet:", p))
3359 # from service back to client
3360 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3361 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3362 TCP(sport=local_port, dport=12345))
3363 self.pg0.add_stream(p)
3364 self.pg_enable_capture(self.pg_interfaces)
3366 capture = self.pg1.get_capture(1)
3371 self.assertEqual(ip.src, self.nat_addr)
3372 self.assertEqual(tcp.sport, external_port)
3373 self.assert_packet_checksums_valid(p)
3375 self.logger.error(ppp("Unexpected or invalid packet:", p))
3378 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3379 self.assertEqual(len(sessions), 1)
3380 self.assertTrue(sessions[0].ext_host_valid)
3381 self.vapi.nat44_del_session(
3382 sessions[0].inside_ip_address,
3383 sessions[0].inside_port,
3384 sessions[0].protocol,
3385 ext_host_address=sessions[0].ext_host_address,
3386 ext_host_port=sessions[0].ext_host_port)
3387 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3388 self.assertEqual(len(sessions), 0)
3390 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3391 def test_static_lb_multi_clients(self):
3392 """ NAT44 local service load balancing - multiple clients"""
3394 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3397 server1 = self.pg0.remote_hosts[0]
3398 server2 = self.pg0.remote_hosts[1]
3400 locals = [{'addr': server1.ip4n,
3403 {'addr': server2.ip4n,
3407 self.nat44_add_address(self.nat_addr)
3408 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3411 local_num=len(locals),
3413 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3414 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3419 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3421 for client in clients:
3422 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3423 IP(src=client, dst=self.nat_addr) /
3424 TCP(sport=12345, dport=external_port))
3426 self.pg1.add_stream(pkts)
3427 self.pg_enable_capture(self.pg_interfaces)
3429 capture = self.pg0.get_capture(len(pkts))
3431 if p[IP].dst == server1.ip4:
3435 self.assertTrue(server1_n > server2_n)
3437 def test_static_lb_2(self):
3438 """ NAT44 local service load balancing (asymmetrical rule) """
3439 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3442 server1 = self.pg0.remote_hosts[0]
3443 server2 = self.pg0.remote_hosts[1]
3445 locals = [{'addr': server1.ip4n,
3448 {'addr': server2.ip4n,
3452 self.vapi.nat44_forwarding_enable_disable(1)
3453 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3457 local_num=len(locals),
3459 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3460 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3463 # from client to service
3464 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3465 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3466 TCP(sport=12345, dport=external_port))
3467 self.pg1.add_stream(p)
3468 self.pg_enable_capture(self.pg_interfaces)
3470 capture = self.pg0.get_capture(1)
3476 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3477 if ip.dst == server1.ip4:
3481 self.assertEqual(tcp.dport, local_port)
3482 self.assert_packet_checksums_valid(p)
3484 self.logger.error(ppp("Unexpected or invalid packet:", p))
3487 # from service back to client
3488 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3489 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3490 TCP(sport=local_port, dport=12345))
3491 self.pg0.add_stream(p)
3492 self.pg_enable_capture(self.pg_interfaces)
3494 capture = self.pg1.get_capture(1)
3499 self.assertEqual(ip.src, self.nat_addr)
3500 self.assertEqual(tcp.sport, external_port)
3501 self.assert_packet_checksums_valid(p)
3503 self.logger.error(ppp("Unexpected or invalid packet:", p))
3506 # from client to server (no translation)
3507 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3508 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3509 TCP(sport=12346, dport=local_port))
3510 self.pg1.add_stream(p)
3511 self.pg_enable_capture(self.pg_interfaces)
3513 capture = self.pg0.get_capture(1)
3519 self.assertEqual(ip.dst, server1.ip4)
3520 self.assertEqual(tcp.dport, local_port)
3521 self.assert_packet_checksums_valid(p)
3523 self.logger.error(ppp("Unexpected or invalid packet:", p))
3526 # from service back to client (no translation)
3527 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3528 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3529 TCP(sport=local_port, dport=12346))
3530 self.pg0.add_stream(p)
3531 self.pg_enable_capture(self.pg_interfaces)
3533 capture = self.pg1.get_capture(1)
3538 self.assertEqual(ip.src, server1.ip4)
3539 self.assertEqual(tcp.sport, local_port)
3540 self.assert_packet_checksums_valid(p)
3542 self.logger.error(ppp("Unexpected or invalid packet:", p))
3545 def test_unknown_proto(self):
3546 """ NAT44 translate packet with unknown protocol """
3547 self.nat44_add_address(self.nat_addr)
3548 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3549 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3553 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3554 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3555 TCP(sport=self.tcp_port_in, dport=20))
3556 self.pg0.add_stream(p)
3557 self.pg_enable_capture(self.pg_interfaces)
3559 p = self.pg1.get_capture(1)
3561 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3562 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3564 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3565 TCP(sport=1234, dport=1234))
3566 self.pg0.add_stream(p)
3567 self.pg_enable_capture(self.pg_interfaces)
3569 p = self.pg1.get_capture(1)
3572 self.assertEqual(packet[IP].src, self.nat_addr)
3573 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3574 self.assertTrue(packet.haslayer(GRE))
3575 self.assert_packet_checksums_valid(packet)
3577 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3581 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3582 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3584 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3585 TCP(sport=1234, dport=1234))
3586 self.pg1.add_stream(p)
3587 self.pg_enable_capture(self.pg_interfaces)
3589 p = self.pg0.get_capture(1)
3592 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3593 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3594 self.assertTrue(packet.haslayer(GRE))
3595 self.assert_packet_checksums_valid(packet)
3597 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3600 def test_hairpinning_unknown_proto(self):
3601 """ NAT44 translate packet with unknown protocol - hairpinning """
3602 host = self.pg0.remote_hosts[0]
3603 server = self.pg0.remote_hosts[1]
3605 server_out_port = 8765
3606 server_nat_ip = "10.0.0.11"
3608 self.nat44_add_address(self.nat_addr)
3609 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3610 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3613 # add static mapping for server
3614 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3617 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3618 IP(src=host.ip4, dst=server_nat_ip) /
3619 TCP(sport=host_in_port, dport=server_out_port))
3620 self.pg0.add_stream(p)
3621 self.pg_enable_capture(self.pg_interfaces)
3623 self.pg0.get_capture(1)
3625 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3626 IP(src=host.ip4, dst=server_nat_ip) /
3628 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3629 TCP(sport=1234, dport=1234))
3630 self.pg0.add_stream(p)
3631 self.pg_enable_capture(self.pg_interfaces)
3633 p = self.pg0.get_capture(1)
3636 self.assertEqual(packet[IP].src, self.nat_addr)
3637 self.assertEqual(packet[IP].dst, server.ip4)
3638 self.assertTrue(packet.haslayer(GRE))
3639 self.assert_packet_checksums_valid(packet)
3641 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3645 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3646 IP(src=server.ip4, dst=self.nat_addr) /
3648 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3649 TCP(sport=1234, dport=1234))
3650 self.pg0.add_stream(p)
3651 self.pg_enable_capture(self.pg_interfaces)
3653 p = self.pg0.get_capture(1)
3656 self.assertEqual(packet[IP].src, server_nat_ip)
3657 self.assertEqual(packet[IP].dst, host.ip4)
3658 self.assertTrue(packet.haslayer(GRE))
3659 self.assert_packet_checksums_valid(packet)
3661 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3664 def test_output_feature_and_service(self):
3665 """ NAT44 interface output feature and services """
3666 external_addr = '1.2.3.4'
3670 self.vapi.nat44_forwarding_enable_disable(1)
3671 self.nat44_add_address(self.nat_addr)
3672 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3673 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3674 local_port, external_port,
3675 proto=IP_PROTOS.tcp, out2in_only=1)
3676 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3677 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3679 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3682 # from client to service
3683 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3684 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3685 TCP(sport=12345, dport=external_port))
3686 self.pg1.add_stream(p)
3687 self.pg_enable_capture(self.pg_interfaces)
3689 capture = self.pg0.get_capture(1)
3694 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3695 self.assertEqual(tcp.dport, local_port)
3696 self.assert_packet_checksums_valid(p)
3698 self.logger.error(ppp("Unexpected or invalid packet:", p))
3701 # from service back to client
3702 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3703 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3704 TCP(sport=local_port, dport=12345))
3705 self.pg0.add_stream(p)
3706 self.pg_enable_capture(self.pg_interfaces)
3708 capture = self.pg1.get_capture(1)
3713 self.assertEqual(ip.src, external_addr)
3714 self.assertEqual(tcp.sport, external_port)
3715 self.assert_packet_checksums_valid(p)
3717 self.logger.error(ppp("Unexpected or invalid packet:", p))
3720 # from local network host to external network
3721 pkts = self.create_stream_in(self.pg0, self.pg1)
3722 self.pg0.add_stream(pkts)
3723 self.pg_enable_capture(self.pg_interfaces)
3725 capture = self.pg1.get_capture(len(pkts))
3726 self.verify_capture_out(capture)
3727 pkts = self.create_stream_in(self.pg0, self.pg1)
3728 self.pg0.add_stream(pkts)
3729 self.pg_enable_capture(self.pg_interfaces)
3731 capture = self.pg1.get_capture(len(pkts))
3732 self.verify_capture_out(capture)
3734 # from external network back to local network host
3735 pkts = self.create_stream_out(self.pg1)
3736 self.pg1.add_stream(pkts)
3737 self.pg_enable_capture(self.pg_interfaces)
3739 capture = self.pg0.get_capture(len(pkts))
3740 self.verify_capture_in(capture, self.pg0)
3742 def test_output_feature_and_service2(self):
3743 """ NAT44 interface output feature and service host direct access """
3744 self.vapi.nat44_forwarding_enable_disable(1)
3745 self.nat44_add_address(self.nat_addr)
3746 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3749 # session initiaded from service host - translate
3750 pkts = self.create_stream_in(self.pg0, self.pg1)
3751 self.pg0.add_stream(pkts)
3752 self.pg_enable_capture(self.pg_interfaces)
3754 capture = self.pg1.get_capture(len(pkts))
3755 self.verify_capture_out(capture)
3757 pkts = self.create_stream_out(self.pg1)
3758 self.pg1.add_stream(pkts)
3759 self.pg_enable_capture(self.pg_interfaces)
3761 capture = self.pg0.get_capture(len(pkts))
3762 self.verify_capture_in(capture, self.pg0)
3764 # session initiaded from remote host - do not translate
3765 self.tcp_port_in = 60303
3766 self.udp_port_in = 60304
3767 self.icmp_id_in = 60305
3768 pkts = self.create_stream_out(self.pg1,
3769 self.pg0.remote_ip4,
3770 use_inside_ports=True)
3771 self.pg1.add_stream(pkts)
3772 self.pg_enable_capture(self.pg_interfaces)
3774 capture = self.pg0.get_capture(len(pkts))
3775 self.verify_capture_in(capture, self.pg0)
3777 pkts = self.create_stream_in(self.pg0, self.pg1)
3778 self.pg0.add_stream(pkts)
3779 self.pg_enable_capture(self.pg_interfaces)
3781 capture = self.pg1.get_capture(len(pkts))
3782 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3785 def test_output_feature_and_service3(self):
3786 """ NAT44 interface output feature and DST NAT """
3787 external_addr = '1.2.3.4'
3791 self.vapi.nat44_forwarding_enable_disable(1)
3792 self.nat44_add_address(self.nat_addr)
3793 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3794 local_port, external_port,
3795 proto=IP_PROTOS.tcp, out2in_only=1)
3796 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3797 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3799 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3802 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3803 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3804 TCP(sport=12345, dport=external_port))
3805 self.pg0.add_stream(p)
3806 self.pg_enable_capture(self.pg_interfaces)
3808 capture = self.pg1.get_capture(1)
3813 self.assertEqual(ip.src, self.pg0.remote_ip4)
3814 self.assertEqual(tcp.sport, 12345)
3815 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3816 self.assertEqual(tcp.dport, local_port)
3817 self.assert_packet_checksums_valid(p)
3819 self.logger.error(ppp("Unexpected or invalid packet:", p))
3822 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3823 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3824 TCP(sport=local_port, dport=12345))
3825 self.pg1.add_stream(p)
3826 self.pg_enable_capture(self.pg_interfaces)
3828 capture = self.pg0.get_capture(1)
3833 self.assertEqual(ip.src, external_addr)
3834 self.assertEqual(tcp.sport, external_port)
3835 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3836 self.assertEqual(tcp.dport, 12345)
3837 self.assert_packet_checksums_valid(p)
3839 self.logger.error(ppp("Unexpected or invalid packet:", p))
3842 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3844 twice_nat_addr = '10.0.1.3'
3852 port_in1 = port_in+1
3853 port_in2 = port_in+2
3858 server1 = self.pg0.remote_hosts[0]
3859 server2 = self.pg0.remote_hosts[1]
3871 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
3874 self.nat44_add_address(self.nat_addr)
3875 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3877 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3879 proto=IP_PROTOS.tcp,
3880 twice_nat=int(not self_twice_nat),
3881 self_twice_nat=int(self_twice_nat))
3883 locals = [{'addr': server1.ip4n,
3886 {'addr': server2.ip4n,
3889 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3890 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
3894 not self_twice_nat),
3897 local_num=len(locals),
3899 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
3900 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
3907 assert client_id is not None
3909 client = self.pg0.remote_hosts[0]
3910 elif client_id == 2:
3911 client = self.pg0.remote_hosts[1]
3913 client = pg1.remote_hosts[0]
3914 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
3915 IP(src=client.ip4, dst=self.nat_addr) /
3916 TCP(sport=eh_port_out, dport=port_out))
3918 self.pg_enable_capture(self.pg_interfaces)
3920 capture = pg0.get_capture(1)
3926 if ip.dst == server1.ip4:
3932 self.assertEqual(ip.dst, server.ip4)
3934 self.assertIn(tcp.dport, [port_in1, port_in2])
3936 self.assertEqual(tcp.dport, port_in)
3938 self.assertEqual(ip.src, twice_nat_addr)
3939 self.assertNotEqual(tcp.sport, eh_port_out)
3941 self.assertEqual(ip.src, client.ip4)
3942 self.assertEqual(tcp.sport, eh_port_out)
3944 eh_port_in = tcp.sport
3945 saved_port_in = tcp.dport
3946 self.assert_packet_checksums_valid(p)
3948 self.logger.error(ppp("Unexpected or invalid packet:", p))
3951 p = (Ether(src=server.mac, dst=pg0.local_mac) /
3952 IP(src=server.ip4, dst=eh_addr_in) /
3953 TCP(sport=saved_port_in, dport=eh_port_in))
3955 self.pg_enable_capture(self.pg_interfaces)
3957 capture = pg1.get_capture(1)
3962 self.assertEqual(ip.dst, client.ip4)
3963 self.assertEqual(ip.src, self.nat_addr)
3964 self.assertEqual(tcp.dport, eh_port_out)
3965 self.assertEqual(tcp.sport, port_out)
3966 self.assert_packet_checksums_valid(p)
3968 self.logger.error(ppp("Unexpected or invalid packet:", p))
3972 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3973 self.assertEqual(len(sessions), 1)
3974 self.assertTrue(sessions[0].ext_host_valid)
3975 self.assertTrue(sessions[0].is_twicenat)
3976 self.vapi.nat44_del_session(
3977 sessions[0].inside_ip_address,
3978 sessions[0].inside_port,
3979 sessions[0].protocol,
3980 ext_host_address=sessions[0].ext_host_nat_address,
3981 ext_host_port=sessions[0].ext_host_nat_port)
3982 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3983 self.assertEqual(len(sessions), 0)
3985 def test_twice_nat(self):
3987 self.twice_nat_common()
3989 def test_self_twice_nat_positive(self):
3990 """ Self Twice NAT44 (positive test) """
3991 self.twice_nat_common(self_twice_nat=True, same_pg=True)
3993 def test_self_twice_nat_negative(self):
3994 """ Self Twice NAT44 (negative test) """
3995 self.twice_nat_common(self_twice_nat=True)
3997 def test_twice_nat_lb(self):
3998 """ Twice NAT44 local service load balancing """
3999 self.twice_nat_common(lb=True)
4001 def test_self_twice_nat_lb_positive(self):
4002 """ Self Twice NAT44 local service load balancing (positive test) """
4003 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4006 def test_self_twice_nat_lb_negative(self):
4007 """ Self Twice NAT44 local service load balancing (negative test) """
4008 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4011 def test_twice_nat_interface_addr(self):
4012 """ Acquire twice NAT44 addresses from interface """
4013 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4015 # no address in NAT pool
4016 adresses = self.vapi.nat44_address_dump()
4017 self.assertEqual(0, len(adresses))
4019 # configure interface address and check NAT address pool
4020 self.pg3.config_ip4()
4021 adresses = self.vapi.nat44_address_dump()
4022 self.assertEqual(1, len(adresses))
4023 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4024 self.assertEqual(adresses[0].twice_nat, 1)
4026 # remove interface address and check NAT address pool
4027 self.pg3.unconfig_ip4()
4028 adresses = self.vapi.nat44_address_dump()
4029 self.assertEqual(0, len(adresses))
4031 def test_tcp_session_close_in(self):
4032 """ Close TCP session from inside network """
4033 self.tcp_port_out = 10505
4034 self.nat44_add_address(self.nat_addr)
4035 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4039 proto=IP_PROTOS.tcp,
4041 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4042 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4045 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4046 start_sessnum = len(sessions)
4048 self.initiate_tcp_session(self.pg0, self.pg1)
4050 # FIN packet in -> out
4051 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4052 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4053 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4054 flags="FA", seq=100, ack=300))
4055 self.pg0.add_stream(p)
4056 self.pg_enable_capture(self.pg_interfaces)
4058 self.pg1.get_capture(1)
4062 # ACK packet out -> in
4063 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4064 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4065 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4066 flags="A", seq=300, ack=101))
4069 # FIN packet out -> in
4070 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4071 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4072 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4073 flags="FA", seq=300, ack=101))
4076 self.pg1.add_stream(pkts)
4077 self.pg_enable_capture(self.pg_interfaces)
4079 self.pg0.get_capture(2)
4081 # ACK packet in -> out
4082 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4083 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4084 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4085 flags="A", seq=101, ack=301))
4086 self.pg0.add_stream(p)
4087 self.pg_enable_capture(self.pg_interfaces)
4089 self.pg1.get_capture(1)
4091 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4093 self.assertEqual(len(sessions) - start_sessnum, 0)
4095 def test_tcp_session_close_out(self):
4096 """ Close TCP session from outside network """
4097 self.tcp_port_out = 10505
4098 self.nat44_add_address(self.nat_addr)
4099 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4103 proto=IP_PROTOS.tcp,
4105 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4106 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4109 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4110 start_sessnum = len(sessions)
4112 self.initiate_tcp_session(self.pg0, self.pg1)
4114 # FIN packet out -> in
4115 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4116 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4117 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4118 flags="FA", seq=100, ack=300))
4119 self.pg1.add_stream(p)
4120 self.pg_enable_capture(self.pg_interfaces)
4122 self.pg0.get_capture(1)
4124 # FIN+ACK packet in -> out
4125 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4126 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4127 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4128 flags="FA", seq=300, ack=101))
4130 self.pg0.add_stream(p)
4131 self.pg_enable_capture(self.pg_interfaces)
4133 self.pg1.get_capture(1)
4135 # ACK packet out -> in
4136 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4137 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4138 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4139 flags="A", seq=101, ack=301))
4140 self.pg1.add_stream(p)
4141 self.pg_enable_capture(self.pg_interfaces)
4143 self.pg0.get_capture(1)
4145 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4147 self.assertEqual(len(sessions) - start_sessnum, 0)
4149 def test_tcp_session_close_simultaneous(self):
4150 """ Close TCP session from inside network """
4151 self.tcp_port_out = 10505
4152 self.nat44_add_address(self.nat_addr)
4153 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4157 proto=IP_PROTOS.tcp,
4159 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4160 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4163 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4164 start_sessnum = len(sessions)
4166 self.initiate_tcp_session(self.pg0, self.pg1)
4168 # FIN packet in -> out
4169 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4170 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4171 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4172 flags="FA", seq=100, ack=300))
4173 self.pg0.add_stream(p)
4174 self.pg_enable_capture(self.pg_interfaces)
4176 self.pg1.get_capture(1)
4178 # FIN packet out -> in
4179 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4180 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4181 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4182 flags="FA", seq=300, ack=100))
4183 self.pg1.add_stream(p)
4184 self.pg_enable_capture(self.pg_interfaces)
4186 self.pg0.get_capture(1)
4188 # ACK packet in -> out
4189 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4190 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4191 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4192 flags="A", seq=101, ack=301))
4193 self.pg0.add_stream(p)
4194 self.pg_enable_capture(self.pg_interfaces)
4196 self.pg1.get_capture(1)
4198 # ACK packet out -> in
4199 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4200 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4201 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4202 flags="A", seq=301, ack=101))
4203 self.pg1.add_stream(p)
4204 self.pg_enable_capture(self.pg_interfaces)
4206 self.pg0.get_capture(1)
4208 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4210 self.assertEqual(len(sessions) - start_sessnum, 0)
4212 def test_one_armed_nat44_static(self):
4213 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
4214 remote_host = self.pg4.remote_hosts[0]
4215 local_host = self.pg4.remote_hosts[1]
4220 self.vapi.nat44_forwarding_enable_disable(1)
4221 self.nat44_add_address(self.nat_addr, twice_nat=1)
4222 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4223 local_port, external_port,
4224 proto=IP_PROTOS.tcp, out2in_only=1,
4226 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4227 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4230 # from client to service
4231 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4232 IP(src=remote_host.ip4, dst=self.nat_addr) /
4233 TCP(sport=12345, dport=external_port))
4234 self.pg4.add_stream(p)
4235 self.pg_enable_capture(self.pg_interfaces)
4237 capture = self.pg4.get_capture(1)
4242 self.assertEqual(ip.dst, local_host.ip4)
4243 self.assertEqual(ip.src, self.nat_addr)
4244 self.assertEqual(tcp.dport, local_port)
4245 self.assertNotEqual(tcp.sport, 12345)
4246 eh_port_in = tcp.sport
4247 self.assert_packet_checksums_valid(p)
4249 self.logger.error(ppp("Unexpected or invalid packet:", p))
4252 # from service back to client
4253 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4254 IP(src=local_host.ip4, dst=self.nat_addr) /
4255 TCP(sport=local_port, dport=eh_port_in))
4256 self.pg4.add_stream(p)
4257 self.pg_enable_capture(self.pg_interfaces)
4259 capture = self.pg4.get_capture(1)
4264 self.assertEqual(ip.src, self.nat_addr)
4265 self.assertEqual(ip.dst, remote_host.ip4)
4266 self.assertEqual(tcp.sport, external_port)
4267 self.assertEqual(tcp.dport, 12345)
4268 self.assert_packet_checksums_valid(p)
4270 self.logger.error(ppp("Unexpected or invalid packet:", p))
4273 def test_static_with_port_out2(self):
4274 """ 1:1 NAPT asymmetrical rule """
4279 self.vapi.nat44_forwarding_enable_disable(1)
4280 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4281 local_port, external_port,
4282 proto=IP_PROTOS.tcp, out2in_only=1)
4283 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4284 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4287 # from client to service
4288 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4289 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4290 TCP(sport=12345, dport=external_port))
4291 self.pg1.add_stream(p)
4292 self.pg_enable_capture(self.pg_interfaces)
4294 capture = self.pg0.get_capture(1)
4299 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4300 self.assertEqual(tcp.dport, local_port)
4301 self.assert_packet_checksums_valid(p)
4303 self.logger.error(ppp("Unexpected or invalid packet:", p))
4307 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4308 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4309 ICMP(type=11) / capture[0][IP])
4310 self.pg0.add_stream(p)
4311 self.pg_enable_capture(self.pg_interfaces)
4313 capture = self.pg1.get_capture(1)
4316 self.assertEqual(p[IP].src, self.nat_addr)
4318 self.assertEqual(inner.dst, self.nat_addr)
4319 self.assertEqual(inner[TCPerror].dport, external_port)
4321 self.logger.error(ppp("Unexpected or invalid packet:", p))
4324 # from service back to client
4325 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4326 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4327 TCP(sport=local_port, dport=12345))
4328 self.pg0.add_stream(p)
4329 self.pg_enable_capture(self.pg_interfaces)
4331 capture = self.pg1.get_capture(1)
4336 self.assertEqual(ip.src, self.nat_addr)
4337 self.assertEqual(tcp.sport, external_port)
4338 self.assert_packet_checksums_valid(p)
4340 self.logger.error(ppp("Unexpected or invalid packet:", p))
4344 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4345 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4346 ICMP(type=11) / capture[0][IP])
4347 self.pg1.add_stream(p)
4348 self.pg_enable_capture(self.pg_interfaces)
4350 capture = self.pg0.get_capture(1)
4353 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4355 self.assertEqual(inner.src, self.pg0.remote_ip4)
4356 self.assertEqual(inner[TCPerror].sport, local_port)
4358 self.logger.error(ppp("Unexpected or invalid packet:", p))
4361 # from client to server (no translation)
4362 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4363 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4364 TCP(sport=12346, dport=local_port))
4365 self.pg1.add_stream(p)
4366 self.pg_enable_capture(self.pg_interfaces)
4368 capture = self.pg0.get_capture(1)
4373 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4374 self.assertEqual(tcp.dport, local_port)
4375 self.assert_packet_checksums_valid(p)
4377 self.logger.error(ppp("Unexpected or invalid packet:", p))
4380 # from service back to client (no translation)
4381 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4382 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4383 TCP(sport=local_port, dport=12346))
4384 self.pg0.add_stream(p)
4385 self.pg_enable_capture(self.pg_interfaces)
4387 capture = self.pg1.get_capture(1)
4392 self.assertEqual(ip.src, self.pg0.remote_ip4)
4393 self.assertEqual(tcp.sport, local_port)
4394 self.assert_packet_checksums_valid(p)
4396 self.logger.error(ppp("Unexpected or invalid packet:", p))
4400 super(TestNAT44EndpointDependent, self).tearDown()
4401 if not self.vpp_dead:
4402 self.logger.info(self.vapi.cli("show nat44 addresses"))
4403 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4404 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4405 self.logger.info(self.vapi.cli("show nat44 interface address"))
4406 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4407 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
4409 self.vapi.cli("clear logging")
4412 class TestNAT44Out2InDPO(MethodHolder):
4413 """ NAT44 Test Cases using out2in DPO """
4416 def setUpConstants(cls):
4417 super(TestNAT44Out2InDPO, cls).setUpConstants()
4418 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
4421 def setUpClass(cls):
4422 super(TestNAT44Out2InDPO, cls).setUpClass()
4423 cls.vapi.cli("set log class nat level debug")
4426 cls.tcp_port_in = 6303
4427 cls.tcp_port_out = 6303
4428 cls.udp_port_in = 6304
4429 cls.udp_port_out = 6304
4430 cls.icmp_id_in = 6305
4431 cls.icmp_id_out = 6305
4432 cls.nat_addr = '10.0.0.3'
4433 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4434 cls.dst_ip4 = '192.168.70.1'
4436 cls.create_pg_interfaces(range(2))
4439 cls.pg0.config_ip4()
4440 cls.pg0.resolve_arp()
4443 cls.pg1.config_ip6()
4444 cls.pg1.resolve_ndp()
4446 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4447 dst_address_length=0,
4448 next_hop_address=cls.pg1.remote_ip6n,
4449 next_hop_sw_if_index=cls.pg1.sw_if_index)
4452 super(TestNAT44Out2InDPO, cls).tearDownClass()
4455 def configure_xlat(self):
4456 self.dst_ip6_pfx = '1:2:3::'
4457 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4459 self.dst_ip6_pfx_len = 96
4460 self.src_ip6_pfx = '4:5:6::'
4461 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4463 self.src_ip6_pfx_len = 96
4464 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4465 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4466 '\x00\x00\x00\x00', 0, is_translation=1,
4469 def test_464xlat_ce(self):
4470 """ Test 464XLAT CE with NAT44 """
4472 self.configure_xlat()
4474 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4475 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4477 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4478 self.dst_ip6_pfx_len)
4479 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4480 self.src_ip6_pfx_len)
4483 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4484 self.pg0.add_stream(pkts)
4485 self.pg_enable_capture(self.pg_interfaces)
4487 capture = self.pg1.get_capture(len(pkts))
4488 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4491 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4493 self.pg1.add_stream(pkts)
4494 self.pg_enable_capture(self.pg_interfaces)
4496 capture = self.pg0.get_capture(len(pkts))
4497 self.verify_capture_in(capture, self.pg0)
4499 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4501 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4502 self.nat_addr_n, is_add=0)
4504 def test_464xlat_ce_no_nat(self):
4505 """ Test 464XLAT CE without NAT44 """
4507 self.configure_xlat()
4509 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4510 self.dst_ip6_pfx_len)
4511 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4512 self.src_ip6_pfx_len)
4514 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4515 self.pg0.add_stream(pkts)
4516 self.pg_enable_capture(self.pg_interfaces)
4518 capture = self.pg1.get_capture(len(pkts))
4519 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4520 nat_ip=out_dst_ip6, same_port=True)
4522 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4523 self.pg1.add_stream(pkts)
4524 self.pg_enable_capture(self.pg_interfaces)
4526 capture = self.pg0.get_capture(len(pkts))
4527 self.verify_capture_in(capture, self.pg0)
4530 class TestDeterministicNAT(MethodHolder):
4531 """ Deterministic NAT Test Cases """
4534 def setUpConstants(cls):
4535 super(TestDeterministicNAT, cls).setUpConstants()
4536 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4539 def setUpClass(cls):
4540 super(TestDeterministicNAT, cls).setUpClass()
4541 cls.vapi.cli("set log class nat level debug")
4544 cls.tcp_port_in = 6303
4545 cls.tcp_external_port = 6303
4546 cls.udp_port_in = 6304
4547 cls.udp_external_port = 6304
4548 cls.icmp_id_in = 6305
4549 cls.nat_addr = '10.0.0.3'
4551 cls.create_pg_interfaces(range(3))
4552 cls.interfaces = list(cls.pg_interfaces)
4554 for i in cls.interfaces:
4559 cls.pg0.generate_remote_hosts(2)
4560 cls.pg0.configure_ipv4_neighbors()
4563 super(TestDeterministicNAT, cls).tearDownClass()
4566 def create_stream_in(self, in_if, out_if, ttl=64):
4568 Create packet stream for inside network
4570 :param in_if: Inside interface
4571 :param out_if: Outside interface
4572 :param ttl: TTL of generated packets
4576 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4577 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4578 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
4582 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4583 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4584 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
4588 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4589 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4590 ICMP(id=self.icmp_id_in, type='echo-request'))
4595 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4597 Create packet stream for outside network
4599 :param out_if: Outside interface
4600 :param dst_ip: Destination IP address (Default use global NAT address)
4601 :param ttl: TTL of generated packets
4604 dst_ip = self.nat_addr
4607 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4608 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4609 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4613 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4614 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4615 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4619 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4620 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4621 ICMP(id=self.icmp_external_id, type='echo-reply'))
4626 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4628 Verify captured packets on outside network
4630 :param capture: Captured packets
4631 :param nat_ip: Translated IP address (Default use global NAT address)
4632 :param same_port: Sorce port number is not translated (Default False)
4633 :param packet_num: Expected number of packets (Default 3)
4636 nat_ip = self.nat_addr
4637 self.assertEqual(packet_num, len(capture))
4638 for packet in capture:
4640 self.assertEqual(packet[IP].src, nat_ip)
4641 if packet.haslayer(TCP):
4642 self.tcp_port_out = packet[TCP].sport
4643 elif packet.haslayer(UDP):
4644 self.udp_port_out = packet[UDP].sport
4646 self.icmp_external_id = packet[ICMP].id
4648 self.logger.error(ppp("Unexpected or invalid packet "
4649 "(outside network):", packet))
4652 def verify_ipfix_max_entries_per_user(self, data):
4654 Verify IPFIX maximum entries per user exceeded event
4656 :param data: Decoded IPFIX data records
4658 self.assertEqual(1, len(data))
4661 self.assertEqual(ord(record[230]), 13)
4662 # natQuotaExceededEvent
4663 self.assertEqual('\x03\x00\x00\x00', record[466])
4665 self.assertEqual('\xe8\x03\x00\x00', record[473])
4667 self.assertEqual(self.pg0.remote_ip4n, record[8])
4669 def test_deterministic_mode(self):
4670 """ NAT plugin run deterministic mode """
4671 in_addr = '172.16.255.0'
4672 out_addr = '172.17.255.50'
4673 in_addr_t = '172.16.255.20'
4674 in_addr_n = socket.inet_aton(in_addr)
4675 out_addr_n = socket.inet_aton(out_addr)
4676 in_addr_t_n = socket.inet_aton(in_addr_t)
4680 nat_config = self.vapi.nat_show_config()
4681 self.assertEqual(1, nat_config.deterministic)
4683 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4685 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4686 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4687 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4688 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4690 deterministic_mappings = self.vapi.nat_det_map_dump()
4691 self.assertEqual(len(deterministic_mappings), 1)
4692 dsm = deterministic_mappings[0]
4693 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4694 self.assertEqual(in_plen, dsm.in_plen)
4695 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4696 self.assertEqual(out_plen, dsm.out_plen)
4698 self.clear_nat_det()
4699 deterministic_mappings = self.vapi.nat_det_map_dump()
4700 self.assertEqual(len(deterministic_mappings), 0)
4702 def test_set_timeouts(self):
4703 """ Set deterministic NAT timeouts """
4704 timeouts_before = self.vapi.nat_det_get_timeouts()
4706 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4707 timeouts_before.tcp_established + 10,
4708 timeouts_before.tcp_transitory + 10,
4709 timeouts_before.icmp + 10)
4711 timeouts_after = self.vapi.nat_det_get_timeouts()
4713 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4714 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4715 self.assertNotEqual(timeouts_before.tcp_established,
4716 timeouts_after.tcp_established)
4717 self.assertNotEqual(timeouts_before.tcp_transitory,
4718 timeouts_after.tcp_transitory)
4720 def test_det_in(self):
4721 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4723 nat_ip = "10.0.0.10"
4725 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4727 socket.inet_aton(nat_ip),
4729 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4730 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4734 pkts = self.create_stream_in(self.pg0, self.pg1)
4735 self.pg0.add_stream(pkts)
4736 self.pg_enable_capture(self.pg_interfaces)
4738 capture = self.pg1.get_capture(len(pkts))
4739 self.verify_capture_out(capture, nat_ip)
4742 pkts = self.create_stream_out(self.pg1, nat_ip)
4743 self.pg1.add_stream(pkts)
4744 self.pg_enable_capture(self.pg_interfaces)
4746 capture = self.pg0.get_capture(len(pkts))
4747 self.verify_capture_in(capture, self.pg0)
4750 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4751 self.assertEqual(len(sessions), 3)
4755 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4756 self.assertEqual(s.in_port, self.tcp_port_in)
4757 self.assertEqual(s.out_port, self.tcp_port_out)
4758 self.assertEqual(s.ext_port, self.tcp_external_port)
4762 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4763 self.assertEqual(s.in_port, self.udp_port_in)
4764 self.assertEqual(s.out_port, self.udp_port_out)
4765 self.assertEqual(s.ext_port, self.udp_external_port)
4769 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4770 self.assertEqual(s.in_port, self.icmp_id_in)
4771 self.assertEqual(s.out_port, self.icmp_external_id)
4773 def test_multiple_users(self):
4774 """ Deterministic NAT multiple users """
4776 nat_ip = "10.0.0.10"
4778 external_port = 6303
4780 host0 = self.pg0.remote_hosts[0]
4781 host1 = self.pg0.remote_hosts[1]
4783 self.vapi.nat_det_add_del_map(host0.ip4n,
4785 socket.inet_aton(nat_ip),
4787 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4788 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4792 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4793 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4794 TCP(sport=port_in, dport=external_port))
4795 self.pg0.add_stream(p)
4796 self.pg_enable_capture(self.pg_interfaces)
4798 capture = self.pg1.get_capture(1)
4803 self.assertEqual(ip.src, nat_ip)
4804 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4805 self.assertEqual(tcp.dport, external_port)
4806 port_out0 = tcp.sport
4808 self.logger.error(ppp("Unexpected or invalid packet:", p))
4812 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4813 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4814 TCP(sport=port_in, dport=external_port))
4815 self.pg0.add_stream(p)
4816 self.pg_enable_capture(self.pg_interfaces)
4818 capture = self.pg1.get_capture(1)
4823 self.assertEqual(ip.src, nat_ip)
4824 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4825 self.assertEqual(tcp.dport, external_port)
4826 port_out1 = tcp.sport
4828 self.logger.error(ppp("Unexpected or invalid packet:", p))
4831 dms = self.vapi.nat_det_map_dump()
4832 self.assertEqual(1, len(dms))
4833 self.assertEqual(2, dms[0].ses_num)
4836 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4837 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4838 TCP(sport=external_port, dport=port_out0))
4839 self.pg1.add_stream(p)
4840 self.pg_enable_capture(self.pg_interfaces)
4842 capture = self.pg0.get_capture(1)
4847 self.assertEqual(ip.src, self.pg1.remote_ip4)
4848 self.assertEqual(ip.dst, host0.ip4)
4849 self.assertEqual(tcp.dport, port_in)
4850 self.assertEqual(tcp.sport, external_port)
4852 self.logger.error(ppp("Unexpected or invalid packet:", p))
4856 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4857 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4858 TCP(sport=external_port, dport=port_out1))
4859 self.pg1.add_stream(p)
4860 self.pg_enable_capture(self.pg_interfaces)
4862 capture = self.pg0.get_capture(1)
4867 self.assertEqual(ip.src, self.pg1.remote_ip4)
4868 self.assertEqual(ip.dst, host1.ip4)
4869 self.assertEqual(tcp.dport, port_in)
4870 self.assertEqual(tcp.sport, external_port)
4872 self.logger.error(ppp("Unexpected or invalid packet", p))
4875 # session close api test
4876 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4878 self.pg1.remote_ip4n,
4880 dms = self.vapi.nat_det_map_dump()
4881 self.assertEqual(dms[0].ses_num, 1)
4883 self.vapi.nat_det_close_session_in(host0.ip4n,
4885 self.pg1.remote_ip4n,
4887 dms = self.vapi.nat_det_map_dump()
4888 self.assertEqual(dms[0].ses_num, 0)
4890 def test_tcp_session_close_detection_in(self):
4891 """ Deterministic NAT TCP session close from inside network """
4892 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4894 socket.inet_aton(self.nat_addr),
4896 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4897 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4900 self.initiate_tcp_session(self.pg0, self.pg1)
4902 # close the session from inside
4904 # FIN packet in -> out
4905 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4906 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4907 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4909 self.pg0.add_stream(p)
4910 self.pg_enable_capture(self.pg_interfaces)
4912 self.pg1.get_capture(1)
4916 # ACK packet out -> in
4917 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4918 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4919 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4923 # FIN packet out -> in
4924 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4925 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4926 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4930 self.pg1.add_stream(pkts)
4931 self.pg_enable_capture(self.pg_interfaces)
4933 self.pg0.get_capture(2)
4935 # ACK packet in -> out
4936 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4937 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4938 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4940 self.pg0.add_stream(p)
4941 self.pg_enable_capture(self.pg_interfaces)
4943 self.pg1.get_capture(1)
4945 # Check if deterministic NAT44 closed the session
4946 dms = self.vapi.nat_det_map_dump()
4947 self.assertEqual(0, dms[0].ses_num)
4949 self.logger.error("TCP session termination failed")
4952 def test_tcp_session_close_detection_out(self):
4953 """ Deterministic NAT TCP session close from outside network """
4954 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4956 socket.inet_aton(self.nat_addr),
4958 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4959 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4962 self.initiate_tcp_session(self.pg0, self.pg1)
4964 # close the session from outside
4966 # FIN packet out -> in
4967 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4968 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4969 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4971 self.pg1.add_stream(p)
4972 self.pg_enable_capture(self.pg_interfaces)
4974 self.pg0.get_capture(1)
4978 # ACK packet in -> out
4979 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4980 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4981 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4985 # ACK packet in -> out
4986 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4987 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4988 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4992 self.pg0.add_stream(pkts)
4993 self.pg_enable_capture(self.pg_interfaces)
4995 self.pg1.get_capture(2)
4997 # ACK packet out -> in
4998 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4999 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5000 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5002 self.pg1.add_stream(p)
5003 self.pg_enable_capture(self.pg_interfaces)
5005 self.pg0.get_capture(1)
5007 # Check if deterministic NAT44 closed the session
5008 dms = self.vapi.nat_det_map_dump()
5009 self.assertEqual(0, dms[0].ses_num)
5011 self.logger.error("TCP session termination failed")
5014 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5015 def test_session_timeout(self):
5016 """ Deterministic NAT session timeouts """
5017 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5019 socket.inet_aton(self.nat_addr),
5021 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5022 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5025 self.initiate_tcp_session(self.pg0, self.pg1)
5026 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
5027 pkts = self.create_stream_in(self.pg0, self.pg1)
5028 self.pg0.add_stream(pkts)
5029 self.pg_enable_capture(self.pg_interfaces)
5031 capture = self.pg1.get_capture(len(pkts))
5034 dms = self.vapi.nat_det_map_dump()
5035 self.assertEqual(0, dms[0].ses_num)
5037 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5038 def test_session_limit_per_user(self):
5039 """ Deterministic NAT maximum sessions per user limit """
5040 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5042 socket.inet_aton(self.nat_addr),
5044 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5045 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5047 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5048 src_address=self.pg2.local_ip4n,
5050 template_interval=10)
5051 self.vapi.nat_ipfix()
5054 for port in range(1025, 2025):
5055 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5056 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5057 UDP(sport=port, dport=port))
5060 self.pg0.add_stream(pkts)
5061 self.pg_enable_capture(self.pg_interfaces)
5063 capture = self.pg1.get_capture(len(pkts))
5065 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5066 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5067 UDP(sport=3001, dport=3002))
5068 self.pg0.add_stream(p)
5069 self.pg_enable_capture(self.pg_interfaces)
5071 capture = self.pg1.assert_nothing_captured()
5073 # verify ICMP error packet
5074 capture = self.pg0.get_capture(1)
5076 self.assertTrue(p.haslayer(ICMP))
5078 self.assertEqual(icmp.type, 3)
5079 self.assertEqual(icmp.code, 1)
5080 self.assertTrue(icmp.haslayer(IPerror))
5081 inner_ip = icmp[IPerror]
5082 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5083 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5085 dms = self.vapi.nat_det_map_dump()
5087 self.assertEqual(1000, dms[0].ses_num)
5089 # verify IPFIX logging
5090 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5092 capture = self.pg2.get_capture(2)
5093 ipfix = IPFIXDecoder()
5094 # first load template
5096 self.assertTrue(p.haslayer(IPFIX))
5097 if p.haslayer(Template):
5098 ipfix.add_template(p.getlayer(Template))
5099 # verify events in data set
5101 if p.haslayer(Data):
5102 data = ipfix.decode_data_set(p.getlayer(Set))
5103 self.verify_ipfix_max_entries_per_user(data)
5105 def clear_nat_det(self):
5107 Clear deterministic NAT configuration.
5109 self.vapi.nat_ipfix(enable=0)
5110 self.vapi.nat_det_set_timeouts()
5111 deterministic_mappings = self.vapi.nat_det_map_dump()
5112 for dsm in deterministic_mappings:
5113 self.vapi.nat_det_add_del_map(dsm.in_addr,
5119 interfaces = self.vapi.nat44_interface_dump()
5120 for intf in interfaces:
5121 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
5126 super(TestDeterministicNAT, self).tearDown()
5127 if not self.vpp_dead:
5128 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5130 self.vapi.cli("show nat44 deterministic mappings"))
5132 self.vapi.cli("show nat44 deterministic timeouts"))
5134 self.vapi.cli("show nat44 deterministic sessions"))
5135 self.clear_nat_det()
5138 class TestNAT64(MethodHolder):
5139 """ NAT64 Test Cases """
5142 def setUpConstants(cls):
5143 super(TestNAT64, cls).setUpConstants()
5144 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5145 "nat64 st hash buckets 256", "}"])
5148 def setUpClass(cls):
5149 super(TestNAT64, cls).setUpClass()
5152 cls.tcp_port_in = 6303
5153 cls.tcp_port_out = 6303
5154 cls.udp_port_in = 6304
5155 cls.udp_port_out = 6304
5156 cls.icmp_id_in = 6305
5157 cls.icmp_id_out = 6305
5158 cls.nat_addr = '10.0.0.3'
5159 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5161 cls.vrf1_nat_addr = '10.0.10.3'
5162 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5164 cls.ipfix_src_port = 4739
5165 cls.ipfix_domain_id = 1
5167 cls.create_pg_interfaces(range(5))
5168 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5169 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5170 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5172 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5174 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5176 cls.pg0.generate_remote_hosts(2)
5178 for i in cls.ip6_interfaces:
5181 i.configure_ipv6_neighbors()
5183 for i in cls.ip4_interfaces:
5189 cls.pg3.config_ip4()
5190 cls.pg3.resolve_arp()
5191 cls.pg3.config_ip6()
5192 cls.pg3.configure_ipv6_neighbors()
5195 super(TestNAT64, cls).tearDownClass()
5198 def test_pool(self):
5199 """ Add/delete address to NAT64 pool """
5200 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
5202 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
5204 addresses = self.vapi.nat64_pool_addr_dump()
5205 self.assertEqual(len(addresses), 1)
5206 self.assertEqual(addresses[0].address, nat_addr)
5208 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
5210 addresses = self.vapi.nat64_pool_addr_dump()
5211 self.assertEqual(len(addresses), 0)
5213 def test_interface(self):
5214 """ Enable/disable NAT64 feature on the interface """
5215 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5216 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5218 interfaces = self.vapi.nat64_interface_dump()
5219 self.assertEqual(len(interfaces), 2)
5222 for intf in interfaces:
5223 if intf.sw_if_index == self.pg0.sw_if_index:
5224 self.assertEqual(intf.is_inside, 1)
5226 elif intf.sw_if_index == self.pg1.sw_if_index:
5227 self.assertEqual(intf.is_inside, 0)
5229 self.assertTrue(pg0_found)
5230 self.assertTrue(pg1_found)
5232 features = self.vapi.cli("show interface features pg0")
5233 self.assertNotEqual(features.find('nat64-in2out'), -1)
5234 features = self.vapi.cli("show interface features pg1")
5235 self.assertNotEqual(features.find('nat64-out2in'), -1)
5237 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5238 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5240 interfaces = self.vapi.nat64_interface_dump()
5241 self.assertEqual(len(interfaces), 0)
5243 def test_static_bib(self):
5244 """ Add/delete static BIB entry """
5245 in_addr = socket.inet_pton(socket.AF_INET6,
5246 '2001:db8:85a3::8a2e:370:7334')
5247 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5250 proto = IP_PROTOS.tcp
5252 self.vapi.nat64_add_del_static_bib(in_addr,
5257 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5262 self.assertEqual(bibe.i_addr, in_addr)
5263 self.assertEqual(bibe.o_addr, out_addr)
5264 self.assertEqual(bibe.i_port, in_port)
5265 self.assertEqual(bibe.o_port, out_port)
5266 self.assertEqual(static_bib_num, 1)
5268 self.vapi.nat64_add_del_static_bib(in_addr,
5274 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5279 self.assertEqual(static_bib_num, 0)
5281 def test_set_timeouts(self):
5282 """ Set NAT64 timeouts """
5283 # verify default values
5284 timeouts = self.vapi.nat64_get_timeouts()
5285 self.assertEqual(timeouts.udp, 300)
5286 self.assertEqual(timeouts.icmp, 60)
5287 self.assertEqual(timeouts.tcp_trans, 240)
5288 self.assertEqual(timeouts.tcp_est, 7440)
5289 self.assertEqual(timeouts.tcp_incoming_syn, 6)
5291 # set and verify custom values
5292 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
5293 tcp_est=7450, tcp_incoming_syn=10)
5294 timeouts = self.vapi.nat64_get_timeouts()
5295 self.assertEqual(timeouts.udp, 200)
5296 self.assertEqual(timeouts.icmp, 30)
5297 self.assertEqual(timeouts.tcp_trans, 250)
5298 self.assertEqual(timeouts.tcp_est, 7450)
5299 self.assertEqual(timeouts.tcp_incoming_syn, 10)
5301 def test_dynamic(self):
5302 """ NAT64 dynamic translation test """
5303 self.tcp_port_in = 6303
5304 self.udp_port_in = 6304
5305 self.icmp_id_in = 6305
5307 ses_num_start = self.nat64_get_ses_num()
5309 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5311 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5312 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5315 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5316 self.pg0.add_stream(pkts)
5317 self.pg_enable_capture(self.pg_interfaces)
5319 capture = self.pg1.get_capture(len(pkts))
5320 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5321 dst_ip=self.pg1.remote_ip4)
5324 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5325 self.pg1.add_stream(pkts)
5326 self.pg_enable_capture(self.pg_interfaces)
5328 capture = self.pg0.get_capture(len(pkts))
5329 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5330 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5333 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5334 self.pg0.add_stream(pkts)
5335 self.pg_enable_capture(self.pg_interfaces)
5337 capture = self.pg1.get_capture(len(pkts))
5338 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5339 dst_ip=self.pg1.remote_ip4)
5342 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5343 self.pg1.add_stream(pkts)
5344 self.pg_enable_capture(self.pg_interfaces)
5346 capture = self.pg0.get_capture(len(pkts))
5347 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5349 ses_num_end = self.nat64_get_ses_num()
5351 self.assertEqual(ses_num_end - ses_num_start, 3)
5353 # tenant with specific VRF
5354 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5355 self.vrf1_nat_addr_n,
5356 vrf_id=self.vrf1_id)
5357 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5359 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5360 self.pg2.add_stream(pkts)
5361 self.pg_enable_capture(self.pg_interfaces)
5363 capture = self.pg1.get_capture(len(pkts))
5364 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5365 dst_ip=self.pg1.remote_ip4)
5367 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5368 self.pg1.add_stream(pkts)
5369 self.pg_enable_capture(self.pg_interfaces)
5371 capture = self.pg2.get_capture(len(pkts))
5372 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5374 def test_static(self):
5375 """ NAT64 static translation test """
5376 self.tcp_port_in = 60303
5377 self.udp_port_in = 60304
5378 self.icmp_id_in = 60305
5379 self.tcp_port_out = 60303
5380 self.udp_port_out = 60304
5381 self.icmp_id_out = 60305
5383 ses_num_start = self.nat64_get_ses_num()
5385 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5387 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5388 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5390 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5395 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5400 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5407 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5408 self.pg0.add_stream(pkts)
5409 self.pg_enable_capture(self.pg_interfaces)
5411 capture = self.pg1.get_capture(len(pkts))
5412 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5413 dst_ip=self.pg1.remote_ip4, same_port=True)
5416 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5417 self.pg1.add_stream(pkts)
5418 self.pg_enable_capture(self.pg_interfaces)
5420 capture = self.pg0.get_capture(len(pkts))
5421 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5422 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5424 ses_num_end = self.nat64_get_ses_num()
5426 self.assertEqual(ses_num_end - ses_num_start, 3)
5428 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5429 def test_session_timeout(self):
5430 """ NAT64 session timeout """
5431 self.icmp_id_in = 1234
5432 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5434 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5435 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5436 self.vapi.nat64_set_timeouts(icmp=5)
5438 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5439 self.pg0.add_stream(pkts)
5440 self.pg_enable_capture(self.pg_interfaces)
5442 capture = self.pg1.get_capture(len(pkts))
5444 ses_num_before_timeout = self.nat64_get_ses_num()
5448 # ICMP session after timeout
5449 ses_num_after_timeout = self.nat64_get_ses_num()
5450 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5452 def test_icmp_error(self):
5453 """ NAT64 ICMP Error message translation """
5454 self.tcp_port_in = 6303
5455 self.udp_port_in = 6304
5456 self.icmp_id_in = 6305
5458 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5460 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5461 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5463 # send some packets to create sessions
5464 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5465 self.pg0.add_stream(pkts)
5466 self.pg_enable_capture(self.pg_interfaces)
5468 capture_ip4 = self.pg1.get_capture(len(pkts))
5469 self.verify_capture_out(capture_ip4,
5470 nat_ip=self.nat_addr,
5471 dst_ip=self.pg1.remote_ip4)
5473 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5474 self.pg1.add_stream(pkts)
5475 self.pg_enable_capture(self.pg_interfaces)
5477 capture_ip6 = self.pg0.get_capture(len(pkts))
5478 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5479 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5480 self.pg0.remote_ip6)
5483 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5484 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5485 ICMPv6DestUnreach(code=1) /
5486 packet[IPv6] for packet in capture_ip6]
5487 self.pg0.add_stream(pkts)
5488 self.pg_enable_capture(self.pg_interfaces)
5490 capture = self.pg1.get_capture(len(pkts))
5491 for packet in capture:
5493 self.assertEqual(packet[IP].src, self.nat_addr)
5494 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5495 self.assertEqual(packet[ICMP].type, 3)
5496 self.assertEqual(packet[ICMP].code, 13)
5497 inner = packet[IPerror]
5498 self.assertEqual(inner.src, self.pg1.remote_ip4)
5499 self.assertEqual(inner.dst, self.nat_addr)
5500 self.assert_packet_checksums_valid(packet)
5501 if inner.haslayer(TCPerror):
5502 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5503 elif inner.haslayer(UDPerror):
5504 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5506 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5508 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5512 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5513 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5514 ICMP(type=3, code=13) /
5515 packet[IP] for packet in capture_ip4]
5516 self.pg1.add_stream(pkts)
5517 self.pg_enable_capture(self.pg_interfaces)
5519 capture = self.pg0.get_capture(len(pkts))
5520 for packet in capture:
5522 self.assertEqual(packet[IPv6].src, ip.src)
5523 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5524 icmp = packet[ICMPv6DestUnreach]
5525 self.assertEqual(icmp.code, 1)
5526 inner = icmp[IPerror6]
5527 self.assertEqual(inner.src, self.pg0.remote_ip6)
5528 self.assertEqual(inner.dst, ip.src)
5529 self.assert_icmpv6_checksum_valid(packet)
5530 if inner.haslayer(TCPerror):
5531 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5532 elif inner.haslayer(UDPerror):
5533 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5535 self.assertEqual(inner[ICMPv6EchoRequest].id,
5538 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5541 def test_hairpinning(self):
5542 """ NAT64 hairpinning """
5544 client = self.pg0.remote_hosts[0]
5545 server = self.pg0.remote_hosts[1]
5546 server_tcp_in_port = 22
5547 server_tcp_out_port = 4022
5548 server_udp_in_port = 23
5549 server_udp_out_port = 4023
5550 client_tcp_in_port = 1234
5551 client_udp_in_port = 1235
5552 client_tcp_out_port = 0
5553 client_udp_out_port = 0
5554 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5555 nat_addr_ip6 = ip.src
5557 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5559 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5560 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5562 self.vapi.nat64_add_del_static_bib(server.ip6n,
5565 server_tcp_out_port,
5567 self.vapi.nat64_add_del_static_bib(server.ip6n,
5570 server_udp_out_port,
5575 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5576 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5577 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5579 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5580 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5581 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5583 self.pg0.add_stream(pkts)
5584 self.pg_enable_capture(self.pg_interfaces)
5586 capture = self.pg0.get_capture(len(pkts))
5587 for packet in capture:
5589 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5590 self.assertEqual(packet[IPv6].dst, server.ip6)
5591 self.assert_packet_checksums_valid(packet)
5592 if packet.haslayer(TCP):
5593 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5594 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5595 client_tcp_out_port = packet[TCP].sport
5597 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5598 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5599 client_udp_out_port = packet[UDP].sport
5601 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5606 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5607 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5608 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5610 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5611 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5612 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5614 self.pg0.add_stream(pkts)
5615 self.pg_enable_capture(self.pg_interfaces)
5617 capture = self.pg0.get_capture(len(pkts))
5618 for packet in capture:
5620 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5621 self.assertEqual(packet[IPv6].dst, client.ip6)
5622 self.assert_packet_checksums_valid(packet)
5623 if packet.haslayer(TCP):
5624 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5625 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5627 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5628 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5630 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5635 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5636 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5637 ICMPv6DestUnreach(code=1) /
5638 packet[IPv6] for packet in capture]
5639 self.pg0.add_stream(pkts)
5640 self.pg_enable_capture(self.pg_interfaces)
5642 capture = self.pg0.get_capture(len(pkts))
5643 for packet in capture:
5645 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5646 self.assertEqual(packet[IPv6].dst, server.ip6)
5647 icmp = packet[ICMPv6DestUnreach]
5648 self.assertEqual(icmp.code, 1)
5649 inner = icmp[IPerror6]
5650 self.assertEqual(inner.src, server.ip6)
5651 self.assertEqual(inner.dst, nat_addr_ip6)
5652 self.assert_packet_checksums_valid(packet)
5653 if inner.haslayer(TCPerror):
5654 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5655 self.assertEqual(inner[TCPerror].dport,
5656 client_tcp_out_port)
5658 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5659 self.assertEqual(inner[UDPerror].dport,
5660 client_udp_out_port)
5662 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5665 def test_prefix(self):
5666 """ NAT64 Network-Specific Prefix """
5668 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5670 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5671 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5672 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5673 self.vrf1_nat_addr_n,
5674 vrf_id=self.vrf1_id)
5675 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5678 global_pref64 = "2001:db8::"
5679 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5680 global_pref64_len = 32
5681 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5683 prefix = self.vapi.nat64_prefix_dump()
5684 self.assertEqual(len(prefix), 1)
5685 self.assertEqual(prefix[0].prefix, global_pref64_n)
5686 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5687 self.assertEqual(prefix[0].vrf_id, 0)
5689 # Add tenant specific prefix
5690 vrf1_pref64 = "2001:db8:122:300::"
5691 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5692 vrf1_pref64_len = 56
5693 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5695 vrf_id=self.vrf1_id)
5696 prefix = self.vapi.nat64_prefix_dump()
5697 self.assertEqual(len(prefix), 2)
5700 pkts = self.create_stream_in_ip6(self.pg0,
5703 plen=global_pref64_len)
5704 self.pg0.add_stream(pkts)
5705 self.pg_enable_capture(self.pg_interfaces)
5707 capture = self.pg1.get_capture(len(pkts))
5708 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5709 dst_ip=self.pg1.remote_ip4)
5711 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5712 self.pg1.add_stream(pkts)
5713 self.pg_enable_capture(self.pg_interfaces)
5715 capture = self.pg0.get_capture(len(pkts))
5716 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5719 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5721 # Tenant specific prefix
5722 pkts = self.create_stream_in_ip6(self.pg2,
5725 plen=vrf1_pref64_len)
5726 self.pg2.add_stream(pkts)
5727 self.pg_enable_capture(self.pg_interfaces)
5729 capture = self.pg1.get_capture(len(pkts))
5730 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5731 dst_ip=self.pg1.remote_ip4)
5733 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5734 self.pg1.add_stream(pkts)
5735 self.pg_enable_capture(self.pg_interfaces)
5737 capture = self.pg2.get_capture(len(pkts))
5738 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5741 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5743 def test_unknown_proto(self):
5744 """ NAT64 translate packet with unknown protocol """
5746 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5748 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5749 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5750 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5753 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5754 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5755 TCP(sport=self.tcp_port_in, dport=20))
5756 self.pg0.add_stream(p)
5757 self.pg_enable_capture(self.pg_interfaces)
5759 p = self.pg1.get_capture(1)
5761 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5762 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5764 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5765 TCP(sport=1234, dport=1234))
5766 self.pg0.add_stream(p)
5767 self.pg_enable_capture(self.pg_interfaces)
5769 p = self.pg1.get_capture(1)
5772 self.assertEqual(packet[IP].src, self.nat_addr)
5773 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5774 self.assertTrue(packet.haslayer(GRE))
5775 self.assert_packet_checksums_valid(packet)
5777 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5781 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5782 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5784 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5785 TCP(sport=1234, dport=1234))
5786 self.pg1.add_stream(p)
5787 self.pg_enable_capture(self.pg_interfaces)
5789 p = self.pg0.get_capture(1)
5792 self.assertEqual(packet[IPv6].src, remote_ip6)
5793 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5794 self.assertEqual(packet[IPv6].nh, 47)
5796 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5799 def test_hairpinning_unknown_proto(self):
5800 """ NAT64 translate packet with unknown protocol - hairpinning """
5802 client = self.pg0.remote_hosts[0]
5803 server = self.pg0.remote_hosts[1]
5804 server_tcp_in_port = 22
5805 server_tcp_out_port = 4022
5806 client_tcp_in_port = 1234
5807 client_tcp_out_port = 1235
5808 server_nat_ip = "10.0.0.100"
5809 client_nat_ip = "10.0.0.110"
5810 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5811 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5812 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5813 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5815 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5817 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5818 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5820 self.vapi.nat64_add_del_static_bib(server.ip6n,
5823 server_tcp_out_port,
5826 self.vapi.nat64_add_del_static_bib(server.ip6n,
5832 self.vapi.nat64_add_del_static_bib(client.ip6n,
5835 client_tcp_out_port,
5839 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5840 IPv6(src=client.ip6, dst=server_nat_ip6) /
5841 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5842 self.pg0.add_stream(p)
5843 self.pg_enable_capture(self.pg_interfaces)
5845 p = self.pg0.get_capture(1)
5847 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5848 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5850 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5851 TCP(sport=1234, dport=1234))
5852 self.pg0.add_stream(p)
5853 self.pg_enable_capture(self.pg_interfaces)
5855 p = self.pg0.get_capture(1)
5858 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5859 self.assertEqual(packet[IPv6].dst, server.ip6)
5860 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5862 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5866 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5867 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5869 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5870 TCP(sport=1234, dport=1234))
5871 self.pg0.add_stream(p)
5872 self.pg_enable_capture(self.pg_interfaces)
5874 p = self.pg0.get_capture(1)
5877 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5878 self.assertEqual(packet[IPv6].dst, client.ip6)
5879 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5881 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5884 def test_one_armed_nat64(self):
5885 """ One armed NAT64 """
5887 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5891 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5893 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5894 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5897 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5898 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5899 TCP(sport=12345, dport=80))
5900 self.pg3.add_stream(p)
5901 self.pg_enable_capture(self.pg_interfaces)
5903 capture = self.pg3.get_capture(1)
5908 self.assertEqual(ip.src, self.nat_addr)
5909 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5910 self.assertNotEqual(tcp.sport, 12345)
5911 external_port = tcp.sport
5912 self.assertEqual(tcp.dport, 80)
5913 self.assert_packet_checksums_valid(p)
5915 self.logger.error(ppp("Unexpected or invalid packet:", p))
5919 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5920 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5921 TCP(sport=80, dport=external_port))
5922 self.pg3.add_stream(p)
5923 self.pg_enable_capture(self.pg_interfaces)
5925 capture = self.pg3.get_capture(1)
5930 self.assertEqual(ip.src, remote_host_ip6)
5931 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5932 self.assertEqual(tcp.sport, 80)
5933 self.assertEqual(tcp.dport, 12345)
5934 self.assert_packet_checksums_valid(p)
5936 self.logger.error(ppp("Unexpected or invalid packet:", p))
5939 def test_frag_in_order(self):
5940 """ NAT64 translate fragments arriving in order """
5941 self.tcp_port_in = random.randint(1025, 65535)
5943 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5945 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5946 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5948 reass = self.vapi.nat_reass_dump()
5949 reass_n_start = len(reass)
5953 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5954 self.tcp_port_in, 20, data)
5955 self.pg0.add_stream(pkts)
5956 self.pg_enable_capture(self.pg_interfaces)
5958 frags = self.pg1.get_capture(len(pkts))
5959 p = self.reass_frags_and_verify(frags,
5961 self.pg1.remote_ip4)
5962 self.assertEqual(p[TCP].dport, 20)
5963 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5964 self.tcp_port_out = p[TCP].sport
5965 self.assertEqual(data, p[Raw].load)
5968 data = "A" * 4 + "b" * 16 + "C" * 3
5969 pkts = self.create_stream_frag(self.pg1,
5974 self.pg1.add_stream(pkts)
5975 self.pg_enable_capture(self.pg_interfaces)
5977 frags = self.pg0.get_capture(len(pkts))
5978 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5979 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5980 self.assertEqual(p[TCP].sport, 20)
5981 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5982 self.assertEqual(data, p[Raw].load)
5984 reass = self.vapi.nat_reass_dump()
5985 reass_n_end = len(reass)
5987 self.assertEqual(reass_n_end - reass_n_start, 2)
5989 def test_reass_hairpinning(self):
5990 """ NAT64 fragments hairpinning """
5992 server = self.pg0.remote_hosts[1]
5993 server_in_port = random.randint(1025, 65535)
5994 server_out_port = random.randint(1025, 65535)
5995 client_in_port = random.randint(1025, 65535)
5996 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5997 nat_addr_ip6 = ip.src
5999 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6001 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6002 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6004 # add static BIB entry for server
6005 self.vapi.nat64_add_del_static_bib(server.ip6n,
6011 # send packet from host to server
6012 pkts = self.create_stream_frag_ip6(self.pg0,
6017 self.pg0.add_stream(pkts)
6018 self.pg_enable_capture(self.pg_interfaces)
6020 frags = self.pg0.get_capture(len(pkts))
6021 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6022 self.assertNotEqual(p[TCP].sport, client_in_port)
6023 self.assertEqual(p[TCP].dport, server_in_port)
6024 self.assertEqual(data, p[Raw].load)
6026 def test_frag_out_of_order(self):
6027 """ NAT64 translate fragments arriving out of order """
6028 self.tcp_port_in = random.randint(1025, 65535)
6030 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6032 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6033 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6037 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6038 self.tcp_port_in, 20, data)
6040 self.pg0.add_stream(pkts)
6041 self.pg_enable_capture(self.pg_interfaces)
6043 frags = self.pg1.get_capture(len(pkts))
6044 p = self.reass_frags_and_verify(frags,
6046 self.pg1.remote_ip4)
6047 self.assertEqual(p[TCP].dport, 20)
6048 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6049 self.tcp_port_out = p[TCP].sport
6050 self.assertEqual(data, p[Raw].load)
6053 data = "A" * 4 + "B" * 16 + "C" * 3
6054 pkts = self.create_stream_frag(self.pg1,
6060 self.pg1.add_stream(pkts)
6061 self.pg_enable_capture(self.pg_interfaces)
6063 frags = self.pg0.get_capture(len(pkts))
6064 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6065 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6066 self.assertEqual(p[TCP].sport, 20)
6067 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6068 self.assertEqual(data, p[Raw].load)
6070 def test_interface_addr(self):
6071 """ Acquire NAT64 pool addresses from interface """
6072 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
6074 # no address in NAT64 pool
6075 adresses = self.vapi.nat44_address_dump()
6076 self.assertEqual(0, len(adresses))
6078 # configure interface address and check NAT64 address pool
6079 self.pg4.config_ip4()
6080 addresses = self.vapi.nat64_pool_addr_dump()
6081 self.assertEqual(len(addresses), 1)
6082 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
6084 # remove interface address and check NAT64 address pool
6085 self.pg4.unconfig_ip4()
6086 addresses = self.vapi.nat64_pool_addr_dump()
6087 self.assertEqual(0, len(adresses))
6089 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6090 def test_ipfix_max_bibs_sessions(self):
6091 """ IPFIX logging maximum session and BIB entries exceeded """
6094 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6098 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6100 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6101 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6105 for i in range(0, max_bibs):
6106 src = "fd01:aa::%x" % (i)
6107 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6108 IPv6(src=src, dst=remote_host_ip6) /
6109 TCP(sport=12345, dport=80))
6111 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6112 IPv6(src=src, dst=remote_host_ip6) /
6113 TCP(sport=12345, dport=22))
6115 self.pg0.add_stream(pkts)
6116 self.pg_enable_capture(self.pg_interfaces)
6118 self.pg1.get_capture(max_sessions)
6120 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6121 src_address=self.pg3.local_ip4n,
6123 template_interval=10)
6124 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6125 src_port=self.ipfix_src_port)
6127 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6128 IPv6(src=src, dst=remote_host_ip6) /
6129 TCP(sport=12345, dport=25))
6130 self.pg0.add_stream(p)
6131 self.pg_enable_capture(self.pg_interfaces)
6133 self.pg1.assert_nothing_captured()
6135 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6136 capture = self.pg3.get_capture(9)
6137 ipfix = IPFIXDecoder()
6138 # first load template
6140 self.assertTrue(p.haslayer(IPFIX))
6141 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6142 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6143 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6144 self.assertEqual(p[UDP].dport, 4739)
6145 self.assertEqual(p[IPFIX].observationDomainID,
6146 self.ipfix_domain_id)
6147 if p.haslayer(Template):
6148 ipfix.add_template(p.getlayer(Template))
6149 # verify events in data set
6151 if p.haslayer(Data):
6152 data = ipfix.decode_data_set(p.getlayer(Set))
6153 self.verify_ipfix_max_sessions(data, max_sessions)
6155 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6156 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6157 TCP(sport=12345, dport=80))
6158 self.pg0.add_stream(p)
6159 self.pg_enable_capture(self.pg_interfaces)
6161 self.pg1.assert_nothing_captured()
6163 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6164 capture = self.pg3.get_capture(1)
6165 # verify events in data set
6167 self.assertTrue(p.haslayer(IPFIX))
6168 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6169 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6170 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6171 self.assertEqual(p[UDP].dport, 4739)
6172 self.assertEqual(p[IPFIX].observationDomainID,
6173 self.ipfix_domain_id)
6174 if p.haslayer(Data):
6175 data = ipfix.decode_data_set(p.getlayer(Set))
6176 self.verify_ipfix_max_bibs(data, max_bibs)
6178 def test_ipfix_max_frags(self):
6179 """ IPFIX logging maximum fragments pending reassembly exceeded """
6180 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6182 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6183 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6184 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6185 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6186 src_address=self.pg3.local_ip4n,
6188 template_interval=10)
6189 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6190 src_port=self.ipfix_src_port)
6193 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6194 self.tcp_port_in, 20, data)
6195 self.pg0.add_stream(pkts[-1])
6196 self.pg_enable_capture(self.pg_interfaces)
6198 self.pg1.assert_nothing_captured()
6200 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6201 capture = self.pg3.get_capture(9)
6202 ipfix = IPFIXDecoder()
6203 # first load template
6205 self.assertTrue(p.haslayer(IPFIX))
6206 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6207 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6208 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6209 self.assertEqual(p[UDP].dport, 4739)
6210 self.assertEqual(p[IPFIX].observationDomainID,
6211 self.ipfix_domain_id)
6212 if p.haslayer(Template):
6213 ipfix.add_template(p.getlayer(Template))
6214 # verify events in data set
6216 if p.haslayer(Data):
6217 data = ipfix.decode_data_set(p.getlayer(Set))
6218 self.verify_ipfix_max_fragments_ip6(data, 0,
6219 self.pg0.remote_ip6n)
6221 def test_ipfix_bib_ses(self):
6222 """ IPFIX logging NAT64 BIB/session create and delete events """
6223 self.tcp_port_in = random.randint(1025, 65535)
6224 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6228 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6230 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6231 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6232 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6233 src_address=self.pg3.local_ip4n,
6235 template_interval=10)
6236 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6237 src_port=self.ipfix_src_port)
6240 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6241 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6242 TCP(sport=self.tcp_port_in, dport=25))
6243 self.pg0.add_stream(p)
6244 self.pg_enable_capture(self.pg_interfaces)
6246 p = self.pg1.get_capture(1)
6247 self.tcp_port_out = p[0][TCP].sport
6248 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6249 capture = self.pg3.get_capture(10)
6250 ipfix = IPFIXDecoder()
6251 # first load template
6253 self.assertTrue(p.haslayer(IPFIX))
6254 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6255 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6256 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6257 self.assertEqual(p[UDP].dport, 4739)
6258 self.assertEqual(p[IPFIX].observationDomainID,
6259 self.ipfix_domain_id)
6260 if p.haslayer(Template):
6261 ipfix.add_template(p.getlayer(Template))
6262 # verify events in data set
6264 if p.haslayer(Data):
6265 data = ipfix.decode_data_set(p.getlayer(Set))
6266 if ord(data[0][230]) == 10:
6267 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6268 elif ord(data[0][230]) == 6:
6269 self.verify_ipfix_nat64_ses(data,
6271 self.pg0.remote_ip6n,
6272 self.pg1.remote_ip4,
6275 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6278 self.pg_enable_capture(self.pg_interfaces)
6279 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6282 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6283 capture = self.pg3.get_capture(2)
6284 # verify events in data set
6286 self.assertTrue(p.haslayer(IPFIX))
6287 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6288 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6289 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6290 self.assertEqual(p[UDP].dport, 4739)
6291 self.assertEqual(p[IPFIX].observationDomainID,
6292 self.ipfix_domain_id)
6293 if p.haslayer(Data):
6294 data = ipfix.decode_data_set(p.getlayer(Set))
6295 if ord(data[0][230]) == 11:
6296 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6297 elif ord(data[0][230]) == 7:
6298 self.verify_ipfix_nat64_ses(data,
6300 self.pg0.remote_ip6n,
6301 self.pg1.remote_ip4,
6304 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6306 def nat64_get_ses_num(self):
6308 Return number of active NAT64 sessions.
6310 st = self.vapi.nat64_st_dump()
6313 def clear_nat64(self):
6315 Clear NAT64 configuration.
6317 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6318 domain_id=self.ipfix_domain_id)
6319 self.ipfix_src_port = 4739
6320 self.ipfix_domain_id = 1
6322 self.vapi.nat64_set_timeouts()
6324 interfaces = self.vapi.nat64_interface_dump()
6325 for intf in interfaces:
6326 if intf.is_inside > 1:
6327 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6330 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6334 bib = self.vapi.nat64_bib_dump(255)
6337 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6345 adresses = self.vapi.nat64_pool_addr_dump()
6346 for addr in adresses:
6347 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6352 prefixes = self.vapi.nat64_prefix_dump()
6353 for prefix in prefixes:
6354 self.vapi.nat64_add_del_prefix(prefix.prefix,
6356 vrf_id=prefix.vrf_id,
6360 super(TestNAT64, self).tearDown()
6361 if not self.vpp_dead:
6362 self.logger.info(self.vapi.cli("show nat64 pool"))
6363 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6364 self.logger.info(self.vapi.cli("show nat64 prefix"))
6365 self.logger.info(self.vapi.cli("show nat64 bib all"))
6366 self.logger.info(self.vapi.cli("show nat64 session table all"))
6367 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
6371 class TestDSlite(MethodHolder):
6372 """ DS-Lite Test Cases """
6375 def setUpClass(cls):
6376 super(TestDSlite, cls).setUpClass()
6379 cls.nat_addr = '10.0.0.3'
6380 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6382 cls.create_pg_interfaces(range(2))
6384 cls.pg0.config_ip4()
6385 cls.pg0.resolve_arp()
6387 cls.pg1.config_ip6()
6388 cls.pg1.generate_remote_hosts(2)
6389 cls.pg1.configure_ipv6_neighbors()
6392 super(TestDSlite, cls).tearDownClass()
6395 def test_dslite(self):
6396 """ Test DS-Lite """
6397 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6399 aftr_ip4 = '192.0.0.1'
6400 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6401 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6402 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6403 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6406 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6407 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6408 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6409 UDP(sport=20000, dport=10000))
6410 self.pg1.add_stream(p)
6411 self.pg_enable_capture(self.pg_interfaces)
6413 capture = self.pg0.get_capture(1)
6414 capture = capture[0]
6415 self.assertFalse(capture.haslayer(IPv6))
6416 self.assertEqual(capture[IP].src, self.nat_addr)
6417 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6418 self.assertNotEqual(capture[UDP].sport, 20000)
6419 self.assertEqual(capture[UDP].dport, 10000)
6420 self.assert_packet_checksums_valid(capture)
6421 out_port = capture[UDP].sport
6423 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6424 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6425 UDP(sport=10000, dport=out_port))
6426 self.pg0.add_stream(p)
6427 self.pg_enable_capture(self.pg_interfaces)
6429 capture = self.pg1.get_capture(1)
6430 capture = capture[0]
6431 self.assertEqual(capture[IPv6].src, aftr_ip6)
6432 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6433 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6434 self.assertEqual(capture[IP].dst, '192.168.1.1')
6435 self.assertEqual(capture[UDP].sport, 10000)
6436 self.assertEqual(capture[UDP].dport, 20000)
6437 self.assert_packet_checksums_valid(capture)
6440 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6441 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6442 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6443 TCP(sport=20001, dport=10001))
6444 self.pg1.add_stream(p)
6445 self.pg_enable_capture(self.pg_interfaces)
6447 capture = self.pg0.get_capture(1)
6448 capture = capture[0]
6449 self.assertFalse(capture.haslayer(IPv6))
6450 self.assertEqual(capture[IP].src, self.nat_addr)
6451 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6452 self.assertNotEqual(capture[TCP].sport, 20001)
6453 self.assertEqual(capture[TCP].dport, 10001)
6454 self.assert_packet_checksums_valid(capture)
6455 out_port = capture[TCP].sport
6457 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6458 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6459 TCP(sport=10001, dport=out_port))
6460 self.pg0.add_stream(p)
6461 self.pg_enable_capture(self.pg_interfaces)
6463 capture = self.pg1.get_capture(1)
6464 capture = capture[0]
6465 self.assertEqual(capture[IPv6].src, aftr_ip6)
6466 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6467 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6468 self.assertEqual(capture[IP].dst, '192.168.1.1')
6469 self.assertEqual(capture[TCP].sport, 10001)
6470 self.assertEqual(capture[TCP].dport, 20001)
6471 self.assert_packet_checksums_valid(capture)
6474 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6475 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6476 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6477 ICMP(id=4000, type='echo-request'))
6478 self.pg1.add_stream(p)
6479 self.pg_enable_capture(self.pg_interfaces)
6481 capture = self.pg0.get_capture(1)
6482 capture = capture[0]
6483 self.assertFalse(capture.haslayer(IPv6))
6484 self.assertEqual(capture[IP].src, self.nat_addr)
6485 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6486 self.assertNotEqual(capture[ICMP].id, 4000)
6487 self.assert_packet_checksums_valid(capture)
6488 out_id = capture[ICMP].id
6490 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6491 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6492 ICMP(id=out_id, type='echo-reply'))
6493 self.pg0.add_stream(p)
6494 self.pg_enable_capture(self.pg_interfaces)
6496 capture = self.pg1.get_capture(1)
6497 capture = capture[0]
6498 self.assertEqual(capture[IPv6].src, aftr_ip6)
6499 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6500 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6501 self.assertEqual(capture[IP].dst, '192.168.1.1')
6502 self.assertEqual(capture[ICMP].id, 4000)
6503 self.assert_packet_checksums_valid(capture)
6505 # ping DS-Lite AFTR tunnel endpoint address
6506 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6507 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6508 ICMPv6EchoRequest())
6509 self.pg1.add_stream(p)
6510 self.pg_enable_capture(self.pg_interfaces)
6512 capture = self.pg1.get_capture(1)
6513 self.assertEqual(1, len(capture))
6514 capture = capture[0]
6515 self.assertEqual(capture[IPv6].src, aftr_ip6)
6516 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6517 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6520 super(TestDSlite, self).tearDown()
6521 if not self.vpp_dead:
6522 self.logger.info(self.vapi.cli("show dslite pool"))
6524 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6525 self.logger.info(self.vapi.cli("show dslite sessions"))
6528 class TestDSliteCE(MethodHolder):
6529 """ DS-Lite CE Test Cases """
6532 def setUpConstants(cls):
6533 super(TestDSliteCE, cls).setUpConstants()
6534 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
6537 def setUpClass(cls):
6538 super(TestDSliteCE, cls).setUpClass()
6541 cls.create_pg_interfaces(range(2))
6543 cls.pg0.config_ip4()
6544 cls.pg0.resolve_arp()
6546 cls.pg1.config_ip6()
6547 cls.pg1.generate_remote_hosts(1)
6548 cls.pg1.configure_ipv6_neighbors()
6551 super(TestDSliteCE, cls).tearDownClass()
6554 def test_dslite_ce(self):
6555 """ Test DS-Lite CE """
6557 b4_ip4 = '192.0.0.2'
6558 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6559 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6560 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6561 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6563 aftr_ip4 = '192.0.0.1'
6564 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6565 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6566 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6567 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6569 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6570 dst_address_length=128,
6571 next_hop_address=self.pg1.remote_ip6n,
6572 next_hop_sw_if_index=self.pg1.sw_if_index,
6576 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6577 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6578 UDP(sport=10000, dport=20000))
6579 self.pg0.add_stream(p)
6580 self.pg_enable_capture(self.pg_interfaces)
6582 capture = self.pg1.get_capture(1)
6583 capture = capture[0]
6584 self.assertEqual(capture[IPv6].src, b4_ip6)
6585 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6586 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6587 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6588 self.assertEqual(capture[UDP].sport, 10000)
6589 self.assertEqual(capture[UDP].dport, 20000)
6590 self.assert_packet_checksums_valid(capture)
6593 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6594 IPv6(dst=b4_ip6, src=aftr_ip6) /
6595 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6596 UDP(sport=20000, dport=10000))
6597 self.pg1.add_stream(p)
6598 self.pg_enable_capture(self.pg_interfaces)
6600 capture = self.pg0.get_capture(1)
6601 capture = capture[0]
6602 self.assertFalse(capture.haslayer(IPv6))
6603 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6604 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6605 self.assertEqual(capture[UDP].sport, 20000)
6606 self.assertEqual(capture[UDP].dport, 10000)
6607 self.assert_packet_checksums_valid(capture)
6609 # ping DS-Lite B4 tunnel endpoint address
6610 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6611 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6612 ICMPv6EchoRequest())
6613 self.pg1.add_stream(p)
6614 self.pg_enable_capture(self.pg_interfaces)
6616 capture = self.pg1.get_capture(1)
6617 self.assertEqual(1, len(capture))
6618 capture = capture[0]
6619 self.assertEqual(capture[IPv6].src, b4_ip6)
6620 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6621 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6624 super(TestDSliteCE, self).tearDown()
6625 if not self.vpp_dead:
6627 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6629 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
6632 class TestNAT66(MethodHolder):
6633 """ NAT66 Test Cases """
6636 def setUpClass(cls):
6637 super(TestNAT66, cls).setUpClass()
6640 cls.nat_addr = 'fd01:ff::2'
6641 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6643 cls.create_pg_interfaces(range(2))
6644 cls.interfaces = list(cls.pg_interfaces)
6646 for i in cls.interfaces:
6649 i.configure_ipv6_neighbors()
6652 super(TestNAT66, cls).tearDownClass()
6655 def test_static(self):
6656 """ 1:1 NAT66 test """
6657 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6658 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6659 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
6664 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6665 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6668 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6669 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6672 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6673 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6674 ICMPv6EchoRequest())
6676 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6677 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6678 GRE() / IP() / TCP())
6680 self.pg0.add_stream(pkts)
6681 self.pg_enable_capture(self.pg_interfaces)
6683 capture = self.pg1.get_capture(len(pkts))
6684 for packet in capture:
6686 self.assertEqual(packet[IPv6].src, self.nat_addr)
6687 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6688 self.assert_packet_checksums_valid(packet)
6690 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6695 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6696 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6699 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6700 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6703 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6704 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6707 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6708 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6709 GRE() / IP() / TCP())
6711 self.pg1.add_stream(pkts)
6712 self.pg_enable_capture(self.pg_interfaces)
6714 capture = self.pg0.get_capture(len(pkts))
6715 for packet in capture:
6717 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6718 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6719 self.assert_packet_checksums_valid(packet)
6721 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6724 sm = self.vapi.nat66_static_mapping_dump()
6725 self.assertEqual(len(sm), 1)
6726 self.assertEqual(sm[0].total_pkts, 8)
6728 def test_check_no_translate(self):
6729 """ NAT66 translate only when egress interface is outside interface """
6730 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6731 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
6732 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
6736 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6737 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6739 self.pg0.add_stream([p])
6740 self.pg_enable_capture(self.pg_interfaces)
6742 capture = self.pg1.get_capture(1)
6745 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
6746 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6748 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6751 def clear_nat66(self):
6753 Clear NAT66 configuration.
6755 interfaces = self.vapi.nat66_interface_dump()
6756 for intf in interfaces:
6757 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6761 static_mappings = self.vapi.nat66_static_mapping_dump()
6762 for sm in static_mappings:
6763 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6764 sm.external_ip_address,
6769 super(TestNAT66, self).tearDown()
6770 if not self.vpp_dead:
6771 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6772 self.logger.info(self.vapi.cli("show nat66 static mappings"))
6776 if __name__ == '__main__':
6777 unittest.main(testRunner=VppTestRunner)