9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
29 def clear_nat44(self):
31 Clear NAT44 configuration.
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
139 self.verify_no_nat44_user()
140 self.vapi.nat_set_timeouts()
142 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
143 local_port=0, external_port=0, vrf_id=0,
144 is_add=1, external_sw_if_index=0xFFFFFFFF,
145 proto=0, twice_nat=0, self_twice_nat=0,
146 out2in_only=0, tag=""):
148 Add/delete NAT44 static mapping
150 :param local_ip: Local IP address
151 :param external_ip: External IP address
152 :param local_port: Local port number (Optional)
153 :param external_port: External port number (Optional)
154 :param vrf_id: VRF ID (Default 0)
155 :param is_add: 1 if add, 0 if delete (Default add)
156 :param external_sw_if_index: External interface instead of IP address
157 :param proto: IP protocol (Mandatory if port specified)
158 :param twice_nat: 1 if translate external host address and port
159 :param self_twice_nat: 1 if translate external host address and port
160 whenever external host address equals
161 local address of internal host
162 :param out2in_only: if 1 rule is matching only out2in direction
163 :param tag: Opaque string tag
166 if local_port and external_port:
168 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
169 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
170 self.vapi.nat44_add_del_static_mapping(
173 external_sw_if_index,
185 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
187 Add/delete NAT44 address
189 :param ip: IP address
190 :param is_add: 1 if add, 0 if delete (Default add)
191 :param twice_nat: twice NAT address for extenal hosts
193 nat_addr = socket.inet_pton(socket.AF_INET, ip)
194 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
198 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
200 Create packet stream for inside network
202 :param in_if: Inside interface
203 :param out_if: Outside interface
204 :param dst_ip: Destination address
205 :param ttl: TTL of generated packets
208 dst_ip = out_if.remote_ip4
212 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
213 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
214 TCP(sport=self.tcp_port_in, dport=20))
218 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
219 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
220 UDP(sport=self.udp_port_in, dport=20))
224 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
225 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
226 ICMP(id=self.icmp_id_in, type='echo-request'))
231 def compose_ip6(self, ip4, pref, plen):
233 Compose IPv4-embedded IPv6 addresses
235 :param ip4: IPv4 address
236 :param pref: IPv6 prefix
237 :param plen: IPv6 prefix length
238 :returns: IPv4-embedded IPv6 addresses
240 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
241 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
256 pref_n[10] = ip4_n[3]
260 pref_n[10] = ip4_n[2]
261 pref_n[11] = ip4_n[3]
264 pref_n[10] = ip4_n[1]
265 pref_n[11] = ip4_n[2]
266 pref_n[12] = ip4_n[3]
268 pref_n[12] = ip4_n[0]
269 pref_n[13] = ip4_n[1]
270 pref_n[14] = ip4_n[2]
271 pref_n[15] = ip4_n[3]
272 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
274 def extract_ip4(self, ip6, plen):
276 Extract IPv4 address embedded in IPv6 addresses
278 :param ip6: IPv6 address
279 :param plen: IPv6 prefix length
280 :returns: extracted IPv4 address
282 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
314 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
316 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
318 Create IPv6 packet stream for inside network
320 :param in_if: Inside interface
321 :param out_if: Outside interface
322 :param ttl: Hop Limit of generated packets
323 :param pref: NAT64 prefix
324 :param plen: NAT64 prefix length
328 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
330 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
333 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
334 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
335 TCP(sport=self.tcp_port_in, dport=20))
339 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
340 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
341 UDP(sport=self.udp_port_in, dport=20))
345 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
346 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
347 ICMPv6EchoRequest(id=self.icmp_id_in))
352 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
353 use_inside_ports=False):
355 Create packet stream for outside network
357 :param out_if: Outside interface
358 :param dst_ip: Destination IP address (Default use global NAT address)
359 :param ttl: TTL of generated packets
360 :param use_inside_ports: Use inside NAT ports as destination ports
361 instead of outside ports
364 dst_ip = self.nat_addr
365 if not use_inside_ports:
366 tcp_port = self.tcp_port_out
367 udp_port = self.udp_port_out
368 icmp_id = self.icmp_id_out
370 tcp_port = self.tcp_port_in
371 udp_port = self.udp_port_in
372 icmp_id = self.icmp_id_in
375 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
376 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
377 TCP(dport=tcp_port, sport=20))
381 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
382 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
383 UDP(dport=udp_port, sport=20))
387 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
388 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
389 ICMP(id=icmp_id, type='echo-reply'))
394 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
396 Create packet stream for outside network
398 :param out_if: Outside interface
399 :param dst_ip: Destination IP address (Default use global NAT address)
400 :param hl: HL of generated packets
404 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
405 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
406 TCP(dport=self.tcp_port_out, sport=20))
410 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
411 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
412 UDP(dport=self.udp_port_out, sport=20))
416 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
417 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
418 ICMPv6EchoReply(id=self.icmp_id_out))
423 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
424 packet_num=3, dst_ip=None, is_ip6=False):
426 Verify captured packets on outside network
428 :param capture: Captured packets
429 :param nat_ip: Translated IP address (Default use global NAT address)
430 :param same_port: Sorce port number is not translated (Default False)
431 :param packet_num: Expected number of packets (Default 3)
432 :param dst_ip: Destination IP address (Default do not verify)
433 :param is_ip6: If L3 protocol is IPv6 (Default False)
437 ICMP46 = ICMPv6EchoRequest
442 nat_ip = self.nat_addr
443 self.assertEqual(packet_num, len(capture))
444 for packet in capture:
447 self.assert_packet_checksums_valid(packet)
448 self.assertEqual(packet[IP46].src, nat_ip)
449 if dst_ip is not None:
450 self.assertEqual(packet[IP46].dst, dst_ip)
451 if packet.haslayer(TCP):
453 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
456 packet[TCP].sport, self.tcp_port_in)
457 self.tcp_port_out = packet[TCP].sport
458 self.assert_packet_checksums_valid(packet)
459 elif packet.haslayer(UDP):
461 self.assertEqual(packet[UDP].sport, self.udp_port_in)
464 packet[UDP].sport, self.udp_port_in)
465 self.udp_port_out = packet[UDP].sport
468 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
470 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
471 self.icmp_id_out = packet[ICMP46].id
472 self.assert_packet_checksums_valid(packet)
474 self.logger.error(ppp("Unexpected or invalid packet "
475 "(outside network):", packet))
478 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
479 packet_num=3, dst_ip=None):
481 Verify captured packets on outside network
483 :param capture: Captured packets
484 :param nat_ip: Translated IP address
485 :param same_port: Sorce port number is not translated (Default False)
486 :param packet_num: Expected number of packets (Default 3)
487 :param dst_ip: Destination IP address (Default do not verify)
489 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
492 def verify_capture_in(self, capture, in_if, packet_num=3):
494 Verify captured packets on inside network
496 :param capture: Captured packets
497 :param in_if: Inside interface
498 :param packet_num: Expected number of packets (Default 3)
500 self.assertEqual(packet_num, len(capture))
501 for packet in capture:
503 self.assert_packet_checksums_valid(packet)
504 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
505 if packet.haslayer(TCP):
506 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
507 elif packet.haslayer(UDP):
508 self.assertEqual(packet[UDP].dport, self.udp_port_in)
510 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
512 self.logger.error(ppp("Unexpected or invalid packet "
513 "(inside network):", packet))
516 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
518 Verify captured IPv6 packets on inside network
520 :param capture: Captured packets
521 :param src_ip: Source IP
522 :param dst_ip: Destination IP address
523 :param packet_num: Expected number of packets (Default 3)
525 self.assertEqual(packet_num, len(capture))
526 for packet in capture:
528 self.assertEqual(packet[IPv6].src, src_ip)
529 self.assertEqual(packet[IPv6].dst, dst_ip)
530 self.assert_packet_checksums_valid(packet)
531 if packet.haslayer(TCP):
532 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
533 elif packet.haslayer(UDP):
534 self.assertEqual(packet[UDP].dport, self.udp_port_in)
536 self.assertEqual(packet[ICMPv6EchoReply].id,
539 self.logger.error(ppp("Unexpected or invalid packet "
540 "(inside network):", packet))
543 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
545 Verify captured packet that don't have to be translated
547 :param capture: Captured packets
548 :param ingress_if: Ingress interface
549 :param egress_if: Egress interface
551 for packet in capture:
553 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
554 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
555 if packet.haslayer(TCP):
556 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
557 elif packet.haslayer(UDP):
558 self.assertEqual(packet[UDP].sport, self.udp_port_in)
560 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
562 self.logger.error(ppp("Unexpected or invalid packet "
563 "(inside network):", packet))
566 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
567 packet_num=3, icmp_type=11):
569 Verify captured packets with ICMP errors on outside network
571 :param capture: Captured packets
572 :param src_ip: Translated IP address or IP address of VPP
573 (Default use global NAT address)
574 :param packet_num: Expected number of packets (Default 3)
575 :param icmp_type: Type of error ICMP packet
576 we are expecting (Default 11)
579 src_ip = self.nat_addr
580 self.assertEqual(packet_num, len(capture))
581 for packet in capture:
583 self.assertEqual(packet[IP].src, src_ip)
584 self.assertTrue(packet.haslayer(ICMP))
586 self.assertEqual(icmp.type, icmp_type)
587 self.assertTrue(icmp.haslayer(IPerror))
588 inner_ip = icmp[IPerror]
589 if inner_ip.haslayer(TCPerror):
590 self.assertEqual(inner_ip[TCPerror].dport,
592 elif inner_ip.haslayer(UDPerror):
593 self.assertEqual(inner_ip[UDPerror].dport,
596 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
598 self.logger.error(ppp("Unexpected or invalid packet "
599 "(outside network):", packet))
602 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
605 Verify captured packets with ICMP errors on inside network
607 :param capture: Captured packets
608 :param in_if: Inside interface
609 :param packet_num: Expected number of packets (Default 3)
610 :param icmp_type: Type of error ICMP packet
611 we are expecting (Default 11)
613 self.assertEqual(packet_num, len(capture))
614 for packet in capture:
616 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
617 self.assertTrue(packet.haslayer(ICMP))
619 self.assertEqual(icmp.type, icmp_type)
620 self.assertTrue(icmp.haslayer(IPerror))
621 inner_ip = icmp[IPerror]
622 if inner_ip.haslayer(TCPerror):
623 self.assertEqual(inner_ip[TCPerror].sport,
625 elif inner_ip.haslayer(UDPerror):
626 self.assertEqual(inner_ip[UDPerror].sport,
629 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
631 self.logger.error(ppp("Unexpected or invalid packet "
632 "(inside network):", packet))
635 def create_stream_frag(self, src_if, dst, sport, dport, data):
637 Create fragmented packet stream
639 :param src_if: Source interface
640 :param dst: Destination IPv4 address
641 :param sport: Source TCP port
642 :param dport: Destination TCP port
643 :param data: Payload data
646 id = random.randint(0, 65535)
647 p = (IP(src=src_if.remote_ip4, dst=dst) /
648 TCP(sport=sport, dport=dport) /
650 p = p.__class__(str(p))
651 chksum = p['TCP'].chksum
653 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
654 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
655 TCP(sport=sport, dport=dport, chksum=chksum) /
658 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
659 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
660 proto=IP_PROTOS.tcp) /
663 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
664 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
670 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
671 pref=None, plen=0, frag_size=128):
673 Create fragmented packet stream
675 :param src_if: Source interface
676 :param dst: Destination IPv4 address
677 :param sport: Source TCP port
678 :param dport: Destination TCP port
679 :param data: Payload data
680 :param pref: NAT64 prefix
681 :param plen: NAT64 prefix length
682 :param fragsize: size of fragments
686 dst_ip6 = ''.join(['64:ff9b::', dst])
688 dst_ip6 = self.compose_ip6(dst, pref, plen)
690 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
691 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
692 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
693 TCP(sport=sport, dport=dport) /
696 return fragment6(p, frag_size)
698 def reass_frags_and_verify(self, frags, src, dst):
700 Reassemble and verify fragmented packet
702 :param frags: Captured fragments
703 :param src: Source IPv4 address to verify
704 :param dst: Destination IPv4 address to verify
706 :returns: Reassembled IPv4 packet
708 buffer = StringIO.StringIO()
710 self.assertEqual(p[IP].src, src)
711 self.assertEqual(p[IP].dst, dst)
712 self.assert_ip_checksum_valid(p)
713 buffer.seek(p[IP].frag * 8)
714 buffer.write(p[IP].payload)
715 ip = frags[0].getlayer(IP)
716 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
717 proto=frags[0][IP].proto)
718 if ip.proto == IP_PROTOS.tcp:
719 p = (ip / TCP(buffer.getvalue()))
720 self.assert_tcp_checksum_valid(p)
721 elif ip.proto == IP_PROTOS.udp:
722 p = (ip / UDP(buffer.getvalue()))
725 def reass_frags_and_verify_ip6(self, frags, src, dst):
727 Reassemble and verify fragmented packet
729 :param frags: Captured fragments
730 :param src: Source IPv6 address to verify
731 :param dst: Destination IPv6 address to verify
733 :returns: Reassembled IPv6 packet
735 buffer = StringIO.StringIO()
737 self.assertEqual(p[IPv6].src, src)
738 self.assertEqual(p[IPv6].dst, dst)
739 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
740 buffer.write(p[IPv6ExtHdrFragment].payload)
741 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
742 nh=frags[0][IPv6ExtHdrFragment].nh)
743 if ip.nh == IP_PROTOS.tcp:
744 p = (ip / TCP(buffer.getvalue()))
745 elif ip.nh == IP_PROTOS.udp:
746 p = (ip / UDP(buffer.getvalue()))
747 self.assert_packet_checksums_valid(p)
750 def initiate_tcp_session(self, in_if, out_if):
752 Initiates TCP session
754 :param in_if: Inside interface
755 :param out_if: Outside interface
759 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
760 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
761 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
764 self.pg_enable_capture(self.pg_interfaces)
766 capture = out_if.get_capture(1)
768 self.tcp_port_out = p[TCP].sport
770 # SYN + ACK packet out->in
771 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
772 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
773 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
776 self.pg_enable_capture(self.pg_interfaces)
781 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
782 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
783 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
786 self.pg_enable_capture(self.pg_interfaces)
788 out_if.get_capture(1)
791 self.logger.error("TCP 3 way handshake failed")
794 def verify_ipfix_nat44_ses(self, data):
796 Verify IPFIX NAT44 session create/delete event
798 :param data: Decoded IPFIX data records
800 nat44_ses_create_num = 0
801 nat44_ses_delete_num = 0
802 self.assertEqual(6, len(data))
805 self.assertIn(ord(record[230]), [4, 5])
806 if ord(record[230]) == 4:
807 nat44_ses_create_num += 1
809 nat44_ses_delete_num += 1
811 self.assertEqual(self.pg0.remote_ip4n, record[8])
812 # postNATSourceIPv4Address
813 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
816 self.assertEqual(struct.pack("!I", 0), record[234])
817 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
818 if IP_PROTOS.icmp == ord(record[4]):
819 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
820 self.assertEqual(struct.pack("!H", self.icmp_id_out),
822 elif IP_PROTOS.tcp == ord(record[4]):
823 self.assertEqual(struct.pack("!H", self.tcp_port_in),
825 self.assertEqual(struct.pack("!H", self.tcp_port_out),
827 elif IP_PROTOS.udp == ord(record[4]):
828 self.assertEqual(struct.pack("!H", self.udp_port_in),
830 self.assertEqual(struct.pack("!H", self.udp_port_out),
833 self.fail("Invalid protocol")
834 self.assertEqual(3, nat44_ses_create_num)
835 self.assertEqual(3, nat44_ses_delete_num)
837 def verify_ipfix_addr_exhausted(self, data):
839 Verify IPFIX NAT addresses event
841 :param data: Decoded IPFIX data records
843 self.assertEqual(1, len(data))
846 self.assertEqual(ord(record[230]), 3)
848 self.assertEqual(struct.pack("!I", 0), record[283])
850 def verify_ipfix_max_sessions(self, data, limit):
852 Verify IPFIX maximum session entries exceeded event
854 :param data: Decoded IPFIX data records
855 :param limit: Number of maximum session entries that can be created.
857 self.assertEqual(1, len(data))
860 self.assertEqual(ord(record[230]), 13)
861 # natQuotaExceededEvent
862 self.assertEqual(struct.pack("I", 1), record[466])
864 self.assertEqual(struct.pack("I", limit), record[471])
866 def verify_ipfix_max_bibs(self, data, limit):
868 Verify IPFIX maximum BIB entries exceeded event
870 :param data: Decoded IPFIX data records
871 :param limit: Number of maximum BIB entries that can be created.
873 self.assertEqual(1, len(data))
876 self.assertEqual(ord(record[230]), 13)
877 # natQuotaExceededEvent
878 self.assertEqual(struct.pack("I", 2), record[466])
880 self.assertEqual(struct.pack("I", limit), record[472])
882 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
884 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
886 :param data: Decoded IPFIX data records
887 :param limit: Number of maximum fragments pending reassembly
888 :param src_addr: IPv6 source address
890 self.assertEqual(1, len(data))
893 self.assertEqual(ord(record[230]), 13)
894 # natQuotaExceededEvent
895 self.assertEqual(struct.pack("I", 5), record[466])
896 # maxFragmentsPendingReassembly
897 self.assertEqual(struct.pack("I", limit), record[475])
899 self.assertEqual(src_addr, record[27])
901 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
903 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
905 :param data: Decoded IPFIX data records
906 :param limit: Number of maximum fragments pending reassembly
907 :param src_addr: IPv4 source address
909 self.assertEqual(1, len(data))
912 self.assertEqual(ord(record[230]), 13)
913 # natQuotaExceededEvent
914 self.assertEqual(struct.pack("I", 5), record[466])
915 # maxFragmentsPendingReassembly
916 self.assertEqual(struct.pack("I", limit), record[475])
918 self.assertEqual(src_addr, record[8])
920 def verify_ipfix_bib(self, data, is_create, src_addr):
922 Verify IPFIX NAT64 BIB create and delete events
924 :param data: Decoded IPFIX data records
925 :param is_create: Create event if nonzero value otherwise delete event
926 :param src_addr: IPv6 source address
928 self.assertEqual(1, len(data))
932 self.assertEqual(ord(record[230]), 10)
934 self.assertEqual(ord(record[230]), 11)
936 self.assertEqual(src_addr, record[27])
937 # postNATSourceIPv4Address
938 self.assertEqual(self.nat_addr_n, record[225])
940 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
942 self.assertEqual(struct.pack("!I", 0), record[234])
943 # sourceTransportPort
944 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
945 # postNAPTSourceTransportPort
946 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
948 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
951 Verify IPFIX NAT64 session create and delete events
953 :param data: Decoded IPFIX data records
954 :param is_create: Create event if nonzero value otherwise delete event
955 :param src_addr: IPv6 source address
956 :param dst_addr: IPv4 destination address
957 :param dst_port: destination TCP port
959 self.assertEqual(1, len(data))
963 self.assertEqual(ord(record[230]), 6)
965 self.assertEqual(ord(record[230]), 7)
967 self.assertEqual(src_addr, record[27])
968 # destinationIPv6Address
969 self.assertEqual(socket.inet_pton(socket.AF_INET6,
970 self.compose_ip6(dst_addr,
974 # postNATSourceIPv4Address
975 self.assertEqual(self.nat_addr_n, record[225])
976 # postNATDestinationIPv4Address
977 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
980 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
982 self.assertEqual(struct.pack("!I", 0), record[234])
983 # sourceTransportPort
984 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
985 # postNAPTSourceTransportPort
986 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
987 # destinationTransportPort
988 self.assertEqual(struct.pack("!H", dst_port), record[11])
989 # postNAPTDestinationTransportPort
990 self.assertEqual(struct.pack("!H", dst_port), record[228])
992 def verify_no_nat44_user(self):
993 """ Verify that there is no NAT44 user """
994 users = self.vapi.nat44_user_dump()
995 self.assertEqual(len(users), 0)
997 def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
999 Verify IPFIX maximum entries per user exceeded event
1001 :param data: Decoded IPFIX data records
1002 :param limit: Number of maximum entries per user
1003 :param src_addr: IPv4 source address
1005 self.assertEqual(1, len(data))
1008 self.assertEqual(ord(record[230]), 13)
1009 # natQuotaExceededEvent
1010 self.assertEqual(struct.pack("I", 3), record[466])
1012 self.assertEqual(struct.pack("I", limit), record[473])
1014 self.assertEqual(src_addr, record[8])
1017 class TestNAT44(MethodHolder):
1018 """ NAT44 Test Cases """
1021 def setUpClass(cls):
1022 super(TestNAT44, cls).setUpClass()
1023 cls.vapi.cli("set log class nat level debug")
1026 cls.tcp_port_in = 6303
1027 cls.tcp_port_out = 6303
1028 cls.udp_port_in = 6304
1029 cls.udp_port_out = 6304
1030 cls.icmp_id_in = 6305
1031 cls.icmp_id_out = 6305
1032 cls.nat_addr = '10.0.0.3'
1033 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1034 cls.ipfix_src_port = 4739
1035 cls.ipfix_domain_id = 1
1036 cls.tcp_external_port = 80
1038 cls.create_pg_interfaces(range(10))
1039 cls.interfaces = list(cls.pg_interfaces[0:4])
1041 for i in cls.interfaces:
1046 cls.pg0.generate_remote_hosts(3)
1047 cls.pg0.configure_ipv4_neighbors()
1049 cls.pg1.generate_remote_hosts(1)
1050 cls.pg1.configure_ipv4_neighbors()
1052 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1053 cls.vapi.ip_table_add_del(10, is_add=1)
1054 cls.vapi.ip_table_add_del(20, is_add=1)
1056 cls.pg4._local_ip4 = "172.16.255.1"
1057 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1058 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1059 cls.pg4.set_table_ip4(10)
1060 cls.pg5._local_ip4 = "172.17.255.3"
1061 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1062 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1063 cls.pg5.set_table_ip4(10)
1064 cls.pg6._local_ip4 = "172.16.255.1"
1065 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1066 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1067 cls.pg6.set_table_ip4(20)
1068 for i in cls.overlapping_interfaces:
1076 cls.pg9.generate_remote_hosts(2)
1077 cls.pg9.config_ip4()
1078 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1079 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1083 cls.pg9.resolve_arp()
1084 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1085 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1086 cls.pg9.resolve_arp()
1089 super(TestNAT44, cls).tearDownClass()
1092 def test_dynamic(self):
1093 """ NAT44 dynamic translation test """
1095 self.nat44_add_address(self.nat_addr)
1096 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1097 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1101 pkts = self.create_stream_in(self.pg0, self.pg1)
1102 self.pg0.add_stream(pkts)
1103 self.pg_enable_capture(self.pg_interfaces)
1105 capture = self.pg1.get_capture(len(pkts))
1106 self.verify_capture_out(capture)
1109 pkts = self.create_stream_out(self.pg1)
1110 self.pg1.add_stream(pkts)
1111 self.pg_enable_capture(self.pg_interfaces)
1113 capture = self.pg0.get_capture(len(pkts))
1114 self.verify_capture_in(capture, self.pg0)
1116 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1117 """ NAT44 handling of client packets with TTL=1 """
1119 self.nat44_add_address(self.nat_addr)
1120 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1121 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1124 # Client side - generate traffic
1125 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1126 self.pg0.add_stream(pkts)
1127 self.pg_enable_capture(self.pg_interfaces)
1130 # Client side - verify ICMP type 11 packets
1131 capture = self.pg0.get_capture(len(pkts))
1132 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1134 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1135 """ NAT44 handling of server packets with TTL=1 """
1137 self.nat44_add_address(self.nat_addr)
1138 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1139 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1142 # Client side - create sessions
1143 pkts = self.create_stream_in(self.pg0, self.pg1)
1144 self.pg0.add_stream(pkts)
1145 self.pg_enable_capture(self.pg_interfaces)
1148 # Server side - generate traffic
1149 capture = self.pg1.get_capture(len(pkts))
1150 self.verify_capture_out(capture)
1151 pkts = self.create_stream_out(self.pg1, ttl=1)
1152 self.pg1.add_stream(pkts)
1153 self.pg_enable_capture(self.pg_interfaces)
1156 # Server side - verify ICMP type 11 packets
1157 capture = self.pg1.get_capture(len(pkts))
1158 self.verify_capture_out_with_icmp_errors(capture,
1159 src_ip=self.pg1.local_ip4)
1161 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1162 """ NAT44 handling of error responses to client packets with TTL=2 """
1164 self.nat44_add_address(self.nat_addr)
1165 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1166 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1169 # Client side - generate traffic
1170 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1171 self.pg0.add_stream(pkts)
1172 self.pg_enable_capture(self.pg_interfaces)
1175 # Server side - simulate ICMP type 11 response
1176 capture = self.pg1.get_capture(len(pkts))
1177 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1178 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1179 ICMP(type=11) / packet[IP] for packet in capture]
1180 self.pg1.add_stream(pkts)
1181 self.pg_enable_capture(self.pg_interfaces)
1184 # Client side - verify ICMP type 11 packets
1185 capture = self.pg0.get_capture(len(pkts))
1186 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1188 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1189 """ NAT44 handling of error responses to server packets with TTL=2 """
1191 self.nat44_add_address(self.nat_addr)
1192 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1193 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1196 # Client side - create sessions
1197 pkts = self.create_stream_in(self.pg0, self.pg1)
1198 self.pg0.add_stream(pkts)
1199 self.pg_enable_capture(self.pg_interfaces)
1202 # Server side - generate traffic
1203 capture = self.pg1.get_capture(len(pkts))
1204 self.verify_capture_out(capture)
1205 pkts = self.create_stream_out(self.pg1, ttl=2)
1206 self.pg1.add_stream(pkts)
1207 self.pg_enable_capture(self.pg_interfaces)
1210 # Client side - simulate ICMP type 11 response
1211 capture = self.pg0.get_capture(len(pkts))
1212 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1213 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1214 ICMP(type=11) / packet[IP] for packet in capture]
1215 self.pg0.add_stream(pkts)
1216 self.pg_enable_capture(self.pg_interfaces)
1219 # Server side - verify ICMP type 11 packets
1220 capture = self.pg1.get_capture(len(pkts))
1221 self.verify_capture_out_with_icmp_errors(capture)
1223 def test_ping_out_interface_from_outside(self):
1224 """ Ping NAT44 out interface from outside network """
1226 self.nat44_add_address(self.nat_addr)
1227 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1228 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1231 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1232 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1233 ICMP(id=self.icmp_id_out, type='echo-request'))
1235 self.pg1.add_stream(pkts)
1236 self.pg_enable_capture(self.pg_interfaces)
1238 capture = self.pg1.get_capture(len(pkts))
1239 self.assertEqual(1, len(capture))
1242 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1243 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1244 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1245 self.assertEqual(packet[ICMP].type, 0) # echo reply
1247 self.logger.error(ppp("Unexpected or invalid packet "
1248 "(outside network):", packet))
1251 def test_ping_internal_host_from_outside(self):
1252 """ Ping internal host from outside network """
1254 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1255 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1256 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1260 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1261 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1262 ICMP(id=self.icmp_id_out, type='echo-request'))
1263 self.pg1.add_stream(pkt)
1264 self.pg_enable_capture(self.pg_interfaces)
1266 capture = self.pg0.get_capture(1)
1267 self.verify_capture_in(capture, self.pg0, packet_num=1)
1268 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1271 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1272 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1273 ICMP(id=self.icmp_id_in, type='echo-reply'))
1274 self.pg0.add_stream(pkt)
1275 self.pg_enable_capture(self.pg_interfaces)
1277 capture = self.pg1.get_capture(1)
1278 self.verify_capture_out(capture, same_port=True, packet_num=1)
1279 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1281 def test_forwarding(self):
1282 """ NAT44 forwarding test """
1284 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1285 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1287 self.vapi.nat44_forwarding_enable_disable(1)
1289 real_ip = self.pg0.remote_ip4n
1290 alias_ip = self.nat_addr_n
1291 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1292 external_ip=alias_ip)
1295 # static mapping match
1297 pkts = self.create_stream_out(self.pg1)
1298 self.pg1.add_stream(pkts)
1299 self.pg_enable_capture(self.pg_interfaces)
1301 capture = self.pg0.get_capture(len(pkts))
1302 self.verify_capture_in(capture, self.pg0)
1304 pkts = self.create_stream_in(self.pg0, self.pg1)
1305 self.pg0.add_stream(pkts)
1306 self.pg_enable_capture(self.pg_interfaces)
1308 capture = self.pg1.get_capture(len(pkts))
1309 self.verify_capture_out(capture, same_port=True)
1311 # no static mapping match
1313 host0 = self.pg0.remote_hosts[0]
1314 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1316 pkts = self.create_stream_out(self.pg1,
1317 dst_ip=self.pg0.remote_ip4,
1318 use_inside_ports=True)
1319 self.pg1.add_stream(pkts)
1320 self.pg_enable_capture(self.pg_interfaces)
1322 capture = self.pg0.get_capture(len(pkts))
1323 self.verify_capture_in(capture, self.pg0)
1325 pkts = self.create_stream_in(self.pg0, self.pg1)
1326 self.pg0.add_stream(pkts)
1327 self.pg_enable_capture(self.pg_interfaces)
1329 capture = self.pg1.get_capture(len(pkts))
1330 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1333 self.pg0.remote_hosts[0] = host0
1336 self.vapi.nat44_forwarding_enable_disable(0)
1337 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1338 external_ip=alias_ip,
1341 def test_static_in(self):
1342 """ 1:1 NAT initialized from inside network """
1344 nat_ip = "10.0.0.10"
1345 self.tcp_port_out = 6303
1346 self.udp_port_out = 6304
1347 self.icmp_id_out = 6305
1349 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1350 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1351 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1353 sm = self.vapi.nat44_static_mapping_dump()
1354 self.assertEqual(len(sm), 1)
1355 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1356 self.assertEqual(sm[0].protocol, 0)
1357 self.assertEqual(sm[0].local_port, 0)
1358 self.assertEqual(sm[0].external_port, 0)
1361 pkts = self.create_stream_in(self.pg0, self.pg1)
1362 self.pg0.add_stream(pkts)
1363 self.pg_enable_capture(self.pg_interfaces)
1365 capture = self.pg1.get_capture(len(pkts))
1366 self.verify_capture_out(capture, nat_ip, True)
1369 pkts = self.create_stream_out(self.pg1, nat_ip)
1370 self.pg1.add_stream(pkts)
1371 self.pg_enable_capture(self.pg_interfaces)
1373 capture = self.pg0.get_capture(len(pkts))
1374 self.verify_capture_in(capture, self.pg0)
1376 def test_static_out(self):
1377 """ 1:1 NAT initialized from outside network """
1379 nat_ip = "10.0.0.20"
1380 self.tcp_port_out = 6303
1381 self.udp_port_out = 6304
1382 self.icmp_id_out = 6305
1385 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1386 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1387 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1389 sm = self.vapi.nat44_static_mapping_dump()
1390 self.assertEqual(len(sm), 1)
1391 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1394 pkts = self.create_stream_out(self.pg1, nat_ip)
1395 self.pg1.add_stream(pkts)
1396 self.pg_enable_capture(self.pg_interfaces)
1398 capture = self.pg0.get_capture(len(pkts))
1399 self.verify_capture_in(capture, self.pg0)
1402 pkts = self.create_stream_in(self.pg0, self.pg1)
1403 self.pg0.add_stream(pkts)
1404 self.pg_enable_capture(self.pg_interfaces)
1406 capture = self.pg1.get_capture(len(pkts))
1407 self.verify_capture_out(capture, nat_ip, True)
1409 def test_static_with_port_in(self):
1410 """ 1:1 NAPT initialized from inside network """
1412 self.tcp_port_out = 3606
1413 self.udp_port_out = 3607
1414 self.icmp_id_out = 3608
1416 self.nat44_add_address(self.nat_addr)
1417 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1418 self.tcp_port_in, self.tcp_port_out,
1419 proto=IP_PROTOS.tcp)
1420 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1421 self.udp_port_in, self.udp_port_out,
1422 proto=IP_PROTOS.udp)
1423 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1424 self.icmp_id_in, self.icmp_id_out,
1425 proto=IP_PROTOS.icmp)
1426 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1427 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1431 pkts = self.create_stream_in(self.pg0, self.pg1)
1432 self.pg0.add_stream(pkts)
1433 self.pg_enable_capture(self.pg_interfaces)
1435 capture = self.pg1.get_capture(len(pkts))
1436 self.verify_capture_out(capture)
1439 pkts = self.create_stream_out(self.pg1)
1440 self.pg1.add_stream(pkts)
1441 self.pg_enable_capture(self.pg_interfaces)
1443 capture = self.pg0.get_capture(len(pkts))
1444 self.verify_capture_in(capture, self.pg0)
1446 def test_static_with_port_out(self):
1447 """ 1:1 NAPT initialized from outside network """
1449 self.tcp_port_out = 30606
1450 self.udp_port_out = 30607
1451 self.icmp_id_out = 30608
1453 self.nat44_add_address(self.nat_addr)
1454 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1455 self.tcp_port_in, self.tcp_port_out,
1456 proto=IP_PROTOS.tcp)
1457 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1458 self.udp_port_in, self.udp_port_out,
1459 proto=IP_PROTOS.udp)
1460 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1461 self.icmp_id_in, self.icmp_id_out,
1462 proto=IP_PROTOS.icmp)
1463 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1464 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1468 pkts = self.create_stream_out(self.pg1)
1469 self.pg1.add_stream(pkts)
1470 self.pg_enable_capture(self.pg_interfaces)
1472 capture = self.pg0.get_capture(len(pkts))
1473 self.verify_capture_in(capture, self.pg0)
1476 pkts = self.create_stream_in(self.pg0, self.pg1)
1477 self.pg0.add_stream(pkts)
1478 self.pg_enable_capture(self.pg_interfaces)
1480 capture = self.pg1.get_capture(len(pkts))
1481 self.verify_capture_out(capture)
1483 def test_static_vrf_aware(self):
1484 """ 1:1 NAT VRF awareness """
1486 nat_ip1 = "10.0.0.30"
1487 nat_ip2 = "10.0.0.40"
1488 self.tcp_port_out = 6303
1489 self.udp_port_out = 6304
1490 self.icmp_id_out = 6305
1492 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1494 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1496 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1498 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1499 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1501 # inside interface VRF match NAT44 static mapping VRF
1502 pkts = self.create_stream_in(self.pg4, self.pg3)
1503 self.pg4.add_stream(pkts)
1504 self.pg_enable_capture(self.pg_interfaces)
1506 capture = self.pg3.get_capture(len(pkts))
1507 self.verify_capture_out(capture, nat_ip1, True)
1509 # inside interface VRF don't match NAT44 static mapping VRF (packets
1511 pkts = self.create_stream_in(self.pg0, self.pg3)
1512 self.pg0.add_stream(pkts)
1513 self.pg_enable_capture(self.pg_interfaces)
1515 self.pg3.assert_nothing_captured()
1517 def test_dynamic_to_static(self):
1518 """ Switch from dynamic translation to 1:1NAT """
1519 nat_ip = "10.0.0.10"
1520 self.tcp_port_out = 6303
1521 self.udp_port_out = 6304
1522 self.icmp_id_out = 6305
1524 self.nat44_add_address(self.nat_addr)
1525 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1526 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1530 pkts = self.create_stream_in(self.pg0, self.pg1)
1531 self.pg0.add_stream(pkts)
1532 self.pg_enable_capture(self.pg_interfaces)
1534 capture = self.pg1.get_capture(len(pkts))
1535 self.verify_capture_out(capture)
1538 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1539 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1540 self.assertEqual(len(sessions), 0)
1541 pkts = self.create_stream_in(self.pg0, self.pg1)
1542 self.pg0.add_stream(pkts)
1543 self.pg_enable_capture(self.pg_interfaces)
1545 capture = self.pg1.get_capture(len(pkts))
1546 self.verify_capture_out(capture, nat_ip, True)
1548 def test_identity_nat(self):
1549 """ Identity NAT """
1551 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1552 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1553 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1556 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1557 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1558 TCP(sport=12345, dport=56789))
1559 self.pg1.add_stream(p)
1560 self.pg_enable_capture(self.pg_interfaces)
1562 capture = self.pg0.get_capture(1)
1567 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1568 self.assertEqual(ip.src, self.pg1.remote_ip4)
1569 self.assertEqual(tcp.dport, 56789)
1570 self.assertEqual(tcp.sport, 12345)
1571 self.assert_packet_checksums_valid(p)
1573 self.logger.error(ppp("Unexpected or invalid packet:", p))
1576 def test_multiple_inside_interfaces(self):
1577 """ NAT44 multiple non-overlapping address space inside interfaces """
1579 self.nat44_add_address(self.nat_addr)
1580 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1581 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1582 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1585 # between two NAT44 inside interfaces (no translation)
1586 pkts = self.create_stream_in(self.pg0, self.pg1)
1587 self.pg0.add_stream(pkts)
1588 self.pg_enable_capture(self.pg_interfaces)
1590 capture = self.pg1.get_capture(len(pkts))
1591 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1593 # from NAT44 inside to interface without NAT44 feature (no translation)
1594 pkts = self.create_stream_in(self.pg0, self.pg2)
1595 self.pg0.add_stream(pkts)
1596 self.pg_enable_capture(self.pg_interfaces)
1598 capture = self.pg2.get_capture(len(pkts))
1599 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1601 # in2out 1st interface
1602 pkts = self.create_stream_in(self.pg0, self.pg3)
1603 self.pg0.add_stream(pkts)
1604 self.pg_enable_capture(self.pg_interfaces)
1606 capture = self.pg3.get_capture(len(pkts))
1607 self.verify_capture_out(capture)
1609 # out2in 1st interface
1610 pkts = self.create_stream_out(self.pg3)
1611 self.pg3.add_stream(pkts)
1612 self.pg_enable_capture(self.pg_interfaces)
1614 capture = self.pg0.get_capture(len(pkts))
1615 self.verify_capture_in(capture, self.pg0)
1617 # in2out 2nd interface
1618 pkts = self.create_stream_in(self.pg1, self.pg3)
1619 self.pg1.add_stream(pkts)
1620 self.pg_enable_capture(self.pg_interfaces)
1622 capture = self.pg3.get_capture(len(pkts))
1623 self.verify_capture_out(capture)
1625 # out2in 2nd interface
1626 pkts = self.create_stream_out(self.pg3)
1627 self.pg3.add_stream(pkts)
1628 self.pg_enable_capture(self.pg_interfaces)
1630 capture = self.pg1.get_capture(len(pkts))
1631 self.verify_capture_in(capture, self.pg1)
1633 def test_inside_overlapping_interfaces(self):
1634 """ NAT44 multiple inside interfaces with overlapping address space """
1636 static_nat_ip = "10.0.0.10"
1637 self.nat44_add_address(self.nat_addr)
1638 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1640 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1641 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1642 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1643 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1646 # between NAT44 inside interfaces with same VRF (no translation)
1647 pkts = self.create_stream_in(self.pg4, self.pg5)
1648 self.pg4.add_stream(pkts)
1649 self.pg_enable_capture(self.pg_interfaces)
1651 capture = self.pg5.get_capture(len(pkts))
1652 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1654 # between NAT44 inside interfaces with different VRF (hairpinning)
1655 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1656 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1657 TCP(sport=1234, dport=5678))
1658 self.pg4.add_stream(p)
1659 self.pg_enable_capture(self.pg_interfaces)
1661 capture = self.pg6.get_capture(1)
1666 self.assertEqual(ip.src, self.nat_addr)
1667 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1668 self.assertNotEqual(tcp.sport, 1234)
1669 self.assertEqual(tcp.dport, 5678)
1671 self.logger.error(ppp("Unexpected or invalid packet:", p))
1674 # in2out 1st interface
1675 pkts = self.create_stream_in(self.pg4, self.pg3)
1676 self.pg4.add_stream(pkts)
1677 self.pg_enable_capture(self.pg_interfaces)
1679 capture = self.pg3.get_capture(len(pkts))
1680 self.verify_capture_out(capture)
1682 # out2in 1st interface
1683 pkts = self.create_stream_out(self.pg3)
1684 self.pg3.add_stream(pkts)
1685 self.pg_enable_capture(self.pg_interfaces)
1687 capture = self.pg4.get_capture(len(pkts))
1688 self.verify_capture_in(capture, self.pg4)
1690 # in2out 2nd interface
1691 pkts = self.create_stream_in(self.pg5, self.pg3)
1692 self.pg5.add_stream(pkts)
1693 self.pg_enable_capture(self.pg_interfaces)
1695 capture = self.pg3.get_capture(len(pkts))
1696 self.verify_capture_out(capture)
1698 # out2in 2nd interface
1699 pkts = self.create_stream_out(self.pg3)
1700 self.pg3.add_stream(pkts)
1701 self.pg_enable_capture(self.pg_interfaces)
1703 capture = self.pg5.get_capture(len(pkts))
1704 self.verify_capture_in(capture, self.pg5)
1707 addresses = self.vapi.nat44_address_dump()
1708 self.assertEqual(len(addresses), 1)
1709 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1710 self.assertEqual(len(sessions), 3)
1711 for session in sessions:
1712 self.assertFalse(session.is_static)
1713 self.assertEqual(session.inside_ip_address[0:4],
1714 self.pg5.remote_ip4n)
1715 self.assertEqual(session.outside_ip_address,
1716 addresses[0].ip_address)
1717 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1718 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1719 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1720 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1721 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1722 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1723 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1724 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1725 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1727 # in2out 3rd interface
1728 pkts = self.create_stream_in(self.pg6, self.pg3)
1729 self.pg6.add_stream(pkts)
1730 self.pg_enable_capture(self.pg_interfaces)
1732 capture = self.pg3.get_capture(len(pkts))
1733 self.verify_capture_out(capture, static_nat_ip, True)
1735 # out2in 3rd interface
1736 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1737 self.pg3.add_stream(pkts)
1738 self.pg_enable_capture(self.pg_interfaces)
1740 capture = self.pg6.get_capture(len(pkts))
1741 self.verify_capture_in(capture, self.pg6)
1743 # general user and session dump verifications
1744 users = self.vapi.nat44_user_dump()
1745 self.assertTrue(len(users) >= 3)
1746 addresses = self.vapi.nat44_address_dump()
1747 self.assertEqual(len(addresses), 1)
1749 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1751 for session in sessions:
1752 self.assertEqual(user.ip_address, session.inside_ip_address)
1753 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1754 self.assertTrue(session.protocol in
1755 [IP_PROTOS.tcp, IP_PROTOS.udp,
1757 self.assertFalse(session.ext_host_valid)
1760 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1761 self.assertTrue(len(sessions) >= 4)
1762 for session in sessions:
1763 self.assertFalse(session.is_static)
1764 self.assertEqual(session.inside_ip_address[0:4],
1765 self.pg4.remote_ip4n)
1766 self.assertEqual(session.outside_ip_address,
1767 addresses[0].ip_address)
1770 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1771 self.assertTrue(len(sessions) >= 3)
1772 for session in sessions:
1773 self.assertTrue(session.is_static)
1774 self.assertEqual(session.inside_ip_address[0:4],
1775 self.pg6.remote_ip4n)
1776 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1777 map(int, static_nat_ip.split('.')))
1778 self.assertTrue(session.inside_port in
1779 [self.tcp_port_in, self.udp_port_in,
1782 def test_hairpinning(self):
1783 """ NAT44 hairpinning - 1:1 NAPT """
1785 host = self.pg0.remote_hosts[0]
1786 server = self.pg0.remote_hosts[1]
1789 server_in_port = 5678
1790 server_out_port = 8765
1792 self.nat44_add_address(self.nat_addr)
1793 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1794 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1796 # add static mapping for server
1797 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1798 server_in_port, server_out_port,
1799 proto=IP_PROTOS.tcp)
1801 # send packet from host to server
1802 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1803 IP(src=host.ip4, dst=self.nat_addr) /
1804 TCP(sport=host_in_port, dport=server_out_port))
1805 self.pg0.add_stream(p)
1806 self.pg_enable_capture(self.pg_interfaces)
1808 capture = self.pg0.get_capture(1)
1813 self.assertEqual(ip.src, self.nat_addr)
1814 self.assertEqual(ip.dst, server.ip4)
1815 self.assertNotEqual(tcp.sport, host_in_port)
1816 self.assertEqual(tcp.dport, server_in_port)
1817 self.assert_packet_checksums_valid(p)
1818 host_out_port = tcp.sport
1820 self.logger.error(ppp("Unexpected or invalid packet:", p))
1823 # send reply from server to host
1824 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1825 IP(src=server.ip4, dst=self.nat_addr) /
1826 TCP(sport=server_in_port, dport=host_out_port))
1827 self.pg0.add_stream(p)
1828 self.pg_enable_capture(self.pg_interfaces)
1830 capture = self.pg0.get_capture(1)
1835 self.assertEqual(ip.src, self.nat_addr)
1836 self.assertEqual(ip.dst, host.ip4)
1837 self.assertEqual(tcp.sport, server_out_port)
1838 self.assertEqual(tcp.dport, host_in_port)
1839 self.assert_packet_checksums_valid(p)
1841 self.logger.error(ppp("Unexpected or invalid packet:", p))
1844 def test_hairpinning2(self):
1845 """ NAT44 hairpinning - 1:1 NAT"""
1847 server1_nat_ip = "10.0.0.10"
1848 server2_nat_ip = "10.0.0.11"
1849 host = self.pg0.remote_hosts[0]
1850 server1 = self.pg0.remote_hosts[1]
1851 server2 = self.pg0.remote_hosts[2]
1852 server_tcp_port = 22
1853 server_udp_port = 20
1855 self.nat44_add_address(self.nat_addr)
1856 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1857 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1860 # add static mapping for servers
1861 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1862 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1866 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1867 IP(src=host.ip4, dst=server1_nat_ip) /
1868 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1870 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1871 IP(src=host.ip4, dst=server1_nat_ip) /
1872 UDP(sport=self.udp_port_in, dport=server_udp_port))
1874 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1875 IP(src=host.ip4, dst=server1_nat_ip) /
1876 ICMP(id=self.icmp_id_in, type='echo-request'))
1878 self.pg0.add_stream(pkts)
1879 self.pg_enable_capture(self.pg_interfaces)
1881 capture = self.pg0.get_capture(len(pkts))
1882 for packet in capture:
1884 self.assertEqual(packet[IP].src, self.nat_addr)
1885 self.assertEqual(packet[IP].dst, server1.ip4)
1886 if packet.haslayer(TCP):
1887 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1888 self.assertEqual(packet[TCP].dport, server_tcp_port)
1889 self.tcp_port_out = packet[TCP].sport
1890 self.assert_packet_checksums_valid(packet)
1891 elif packet.haslayer(UDP):
1892 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1893 self.assertEqual(packet[UDP].dport, server_udp_port)
1894 self.udp_port_out = packet[UDP].sport
1896 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1897 self.icmp_id_out = packet[ICMP].id
1899 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1904 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1905 IP(src=server1.ip4, dst=self.nat_addr) /
1906 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1908 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1909 IP(src=server1.ip4, dst=self.nat_addr) /
1910 UDP(sport=server_udp_port, dport=self.udp_port_out))
1912 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1913 IP(src=server1.ip4, dst=self.nat_addr) /
1914 ICMP(id=self.icmp_id_out, type='echo-reply'))
1916 self.pg0.add_stream(pkts)
1917 self.pg_enable_capture(self.pg_interfaces)
1919 capture = self.pg0.get_capture(len(pkts))
1920 for packet in capture:
1922 self.assertEqual(packet[IP].src, server1_nat_ip)
1923 self.assertEqual(packet[IP].dst, host.ip4)
1924 if packet.haslayer(TCP):
1925 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1926 self.assertEqual(packet[TCP].sport, server_tcp_port)
1927 self.assert_packet_checksums_valid(packet)
1928 elif packet.haslayer(UDP):
1929 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1930 self.assertEqual(packet[UDP].sport, server_udp_port)
1932 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1934 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1937 # server2 to server1
1939 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1940 IP(src=server2.ip4, dst=server1_nat_ip) /
1941 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1943 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1944 IP(src=server2.ip4, dst=server1_nat_ip) /
1945 UDP(sport=self.udp_port_in, dport=server_udp_port))
1947 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1948 IP(src=server2.ip4, dst=server1_nat_ip) /
1949 ICMP(id=self.icmp_id_in, type='echo-request'))
1951 self.pg0.add_stream(pkts)
1952 self.pg_enable_capture(self.pg_interfaces)
1954 capture = self.pg0.get_capture(len(pkts))
1955 for packet in capture:
1957 self.assertEqual(packet[IP].src, server2_nat_ip)
1958 self.assertEqual(packet[IP].dst, server1.ip4)
1959 if packet.haslayer(TCP):
1960 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1961 self.assertEqual(packet[TCP].dport, server_tcp_port)
1962 self.tcp_port_out = packet[TCP].sport
1963 self.assert_packet_checksums_valid(packet)
1964 elif packet.haslayer(UDP):
1965 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1966 self.assertEqual(packet[UDP].dport, server_udp_port)
1967 self.udp_port_out = packet[UDP].sport
1969 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1970 self.icmp_id_out = packet[ICMP].id
1972 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1975 # server1 to server2
1977 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1978 IP(src=server1.ip4, dst=server2_nat_ip) /
1979 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1981 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1982 IP(src=server1.ip4, dst=server2_nat_ip) /
1983 UDP(sport=server_udp_port, dport=self.udp_port_out))
1985 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1986 IP(src=server1.ip4, dst=server2_nat_ip) /
1987 ICMP(id=self.icmp_id_out, type='echo-reply'))
1989 self.pg0.add_stream(pkts)
1990 self.pg_enable_capture(self.pg_interfaces)
1992 capture = self.pg0.get_capture(len(pkts))
1993 for packet in capture:
1995 self.assertEqual(packet[IP].src, server1_nat_ip)
1996 self.assertEqual(packet[IP].dst, server2.ip4)
1997 if packet.haslayer(TCP):
1998 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1999 self.assertEqual(packet[TCP].sport, server_tcp_port)
2000 self.assert_packet_checksums_valid(packet)
2001 elif packet.haslayer(UDP):
2002 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2003 self.assertEqual(packet[UDP].sport, server_udp_port)
2005 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2007 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2010 def test_max_translations_per_user(self):
2011 """ MAX translations per user - recycle the least recently used """
2013 self.nat44_add_address(self.nat_addr)
2014 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2015 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2018 # get maximum number of translations per user
2019 nat44_config = self.vapi.nat_show_config()
2021 # send more than maximum number of translations per user packets
2022 pkts_num = nat44_config.max_translations_per_user + 5
2024 for port in range(0, pkts_num):
2025 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2026 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2027 TCP(sport=1025 + port))
2029 self.pg0.add_stream(pkts)
2030 self.pg_enable_capture(self.pg_interfaces)
2033 # verify number of translated packet
2034 self.pg1.get_capture(pkts_num)
2036 users = self.vapi.nat44_user_dump()
2038 if user.ip_address == self.pg0.remote_ip4n:
2039 self.assertEqual(user.nsessions,
2040 nat44_config.max_translations_per_user)
2041 self.assertEqual(user.nstaticsessions, 0)
2044 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2046 proto=IP_PROTOS.tcp)
2047 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2048 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2049 TCP(sport=tcp_port))
2050 self.pg0.add_stream(p)
2051 self.pg_enable_capture(self.pg_interfaces)
2053 self.pg1.get_capture(1)
2054 users = self.vapi.nat44_user_dump()
2056 if user.ip_address == self.pg0.remote_ip4n:
2057 self.assertEqual(user.nsessions,
2058 nat44_config.max_translations_per_user - 1)
2059 self.assertEqual(user.nstaticsessions, 1)
2061 def test_interface_addr(self):
2062 """ Acquire NAT44 addresses from interface """
2063 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2065 # no address in NAT pool
2066 adresses = self.vapi.nat44_address_dump()
2067 self.assertEqual(0, len(adresses))
2069 # configure interface address and check NAT address pool
2070 self.pg7.config_ip4()
2071 adresses = self.vapi.nat44_address_dump()
2072 self.assertEqual(1, len(adresses))
2073 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2075 # remove interface address and check NAT address pool
2076 self.pg7.unconfig_ip4()
2077 adresses = self.vapi.nat44_address_dump()
2078 self.assertEqual(0, len(adresses))
2080 def test_interface_addr_static_mapping(self):
2081 """ Static mapping with addresses from interface """
2084 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2085 self.nat44_add_static_mapping(
2087 external_sw_if_index=self.pg7.sw_if_index,
2090 # static mappings with external interface
2091 static_mappings = self.vapi.nat44_static_mapping_dump()
2092 self.assertEqual(1, len(static_mappings))
2093 self.assertEqual(self.pg7.sw_if_index,
2094 static_mappings[0].external_sw_if_index)
2095 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2097 # configure interface address and check static mappings
2098 self.pg7.config_ip4()
2099 static_mappings = self.vapi.nat44_static_mapping_dump()
2100 self.assertEqual(2, len(static_mappings))
2102 for sm in static_mappings:
2103 if sm.external_sw_if_index == 0xFFFFFFFF:
2104 self.assertEqual(sm.external_ip_address[0:4],
2105 self.pg7.local_ip4n)
2106 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2108 self.assertTrue(resolved)
2110 # remove interface address and check static mappings
2111 self.pg7.unconfig_ip4()
2112 static_mappings = self.vapi.nat44_static_mapping_dump()
2113 self.assertEqual(1, len(static_mappings))
2114 self.assertEqual(self.pg7.sw_if_index,
2115 static_mappings[0].external_sw_if_index)
2116 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2118 # configure interface address again and check static mappings
2119 self.pg7.config_ip4()
2120 static_mappings = self.vapi.nat44_static_mapping_dump()
2121 self.assertEqual(2, len(static_mappings))
2123 for sm in static_mappings:
2124 if sm.external_sw_if_index == 0xFFFFFFFF:
2125 self.assertEqual(sm.external_ip_address[0:4],
2126 self.pg7.local_ip4n)
2127 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2129 self.assertTrue(resolved)
2131 # remove static mapping
2132 self.nat44_add_static_mapping(
2134 external_sw_if_index=self.pg7.sw_if_index,
2137 static_mappings = self.vapi.nat44_static_mapping_dump()
2138 self.assertEqual(0, len(static_mappings))
2140 def test_interface_addr_identity_nat(self):
2141 """ Identity NAT with addresses from interface """
2144 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2145 self.vapi.nat44_add_del_identity_mapping(
2146 sw_if_index=self.pg7.sw_if_index,
2148 protocol=IP_PROTOS.tcp,
2151 # identity mappings with external interface
2152 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2153 self.assertEqual(1, len(identity_mappings))
2154 self.assertEqual(self.pg7.sw_if_index,
2155 identity_mappings[0].sw_if_index)
2157 # configure interface address and check identity mappings
2158 self.pg7.config_ip4()
2159 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2161 self.assertEqual(2, len(identity_mappings))
2162 for sm in identity_mappings:
2163 if sm.sw_if_index == 0xFFFFFFFF:
2164 self.assertEqual(identity_mappings[0].ip_address,
2165 self.pg7.local_ip4n)
2166 self.assertEqual(port, identity_mappings[0].port)
2167 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2169 self.assertTrue(resolved)
2171 # remove interface address and check identity mappings
2172 self.pg7.unconfig_ip4()
2173 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2174 self.assertEqual(1, len(identity_mappings))
2175 self.assertEqual(self.pg7.sw_if_index,
2176 identity_mappings[0].sw_if_index)
2178 def test_ipfix_nat44_sess(self):
2179 """ IPFIX logging NAT44 session created/delted """
2180 self.ipfix_domain_id = 10
2181 self.ipfix_src_port = 20202
2182 colector_port = 30303
2183 bind_layers(UDP, IPFIX, dport=30303)
2184 self.nat44_add_address(self.nat_addr)
2185 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2186 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2188 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2189 src_address=self.pg3.local_ip4n,
2191 template_interval=10,
2192 collector_port=colector_port)
2193 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2194 src_port=self.ipfix_src_port)
2196 pkts = self.create_stream_in(self.pg0, self.pg1)
2197 self.pg0.add_stream(pkts)
2198 self.pg_enable_capture(self.pg_interfaces)
2200 capture = self.pg1.get_capture(len(pkts))
2201 self.verify_capture_out(capture)
2202 self.nat44_add_address(self.nat_addr, is_add=0)
2203 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2204 capture = self.pg3.get_capture(9)
2205 ipfix = IPFIXDecoder()
2206 # first load template
2208 self.assertTrue(p.haslayer(IPFIX))
2209 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2210 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2211 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2212 self.assertEqual(p[UDP].dport, colector_port)
2213 self.assertEqual(p[IPFIX].observationDomainID,
2214 self.ipfix_domain_id)
2215 if p.haslayer(Template):
2216 ipfix.add_template(p.getlayer(Template))
2217 # verify events in data set
2219 if p.haslayer(Data):
2220 data = ipfix.decode_data_set(p.getlayer(Set))
2221 self.verify_ipfix_nat44_ses(data)
2223 def test_ipfix_addr_exhausted(self):
2224 """ IPFIX logging NAT addresses exhausted """
2225 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2226 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2228 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2229 src_address=self.pg3.local_ip4n,
2231 template_interval=10)
2232 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2233 src_port=self.ipfix_src_port)
2235 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2236 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2238 self.pg0.add_stream(p)
2239 self.pg_enable_capture(self.pg_interfaces)
2241 self.pg1.assert_nothing_captured()
2243 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2244 capture = self.pg3.get_capture(9)
2245 ipfix = IPFIXDecoder()
2246 # first load template
2248 self.assertTrue(p.haslayer(IPFIX))
2249 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2250 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2251 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2252 self.assertEqual(p[UDP].dport, 4739)
2253 self.assertEqual(p[IPFIX].observationDomainID,
2254 self.ipfix_domain_id)
2255 if p.haslayer(Template):
2256 ipfix.add_template(p.getlayer(Template))
2257 # verify events in data set
2259 if p.haslayer(Data):
2260 data = ipfix.decode_data_set(p.getlayer(Set))
2261 self.verify_ipfix_addr_exhausted(data)
2263 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2264 def test_ipfix_max_sessions(self):
2265 """ IPFIX logging maximum session entries exceeded """
2266 self.nat44_add_address(self.nat_addr)
2267 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2268 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2271 nat44_config = self.vapi.nat_show_config()
2272 max_sessions = 10 * nat44_config.translation_buckets
2275 for i in range(0, max_sessions):
2276 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2278 IP(src=src, dst=self.pg1.remote_ip4) /
2281 self.pg0.add_stream(pkts)
2282 self.pg_enable_capture(self.pg_interfaces)
2285 self.pg1.get_capture(max_sessions)
2286 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2287 src_address=self.pg3.local_ip4n,
2289 template_interval=10)
2290 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2291 src_port=self.ipfix_src_port)
2293 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2294 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2296 self.pg0.add_stream(p)
2297 self.pg_enable_capture(self.pg_interfaces)
2299 self.pg1.assert_nothing_captured()
2301 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2302 capture = self.pg3.get_capture(9)
2303 ipfix = IPFIXDecoder()
2304 # first load template
2306 self.assertTrue(p.haslayer(IPFIX))
2307 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2308 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2309 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2310 self.assertEqual(p[UDP].dport, 4739)
2311 self.assertEqual(p[IPFIX].observationDomainID,
2312 self.ipfix_domain_id)
2313 if p.haslayer(Template):
2314 ipfix.add_template(p.getlayer(Template))
2315 # verify events in data set
2317 if p.haslayer(Data):
2318 data = ipfix.decode_data_set(p.getlayer(Set))
2319 self.verify_ipfix_max_sessions(data, max_sessions)
2321 def test_pool_addr_fib(self):
2322 """ NAT44 add pool addresses to FIB """
2323 static_addr = '10.0.0.10'
2324 self.nat44_add_address(self.nat_addr)
2325 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2326 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2328 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2331 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2332 ARP(op=ARP.who_has, pdst=self.nat_addr,
2333 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2334 self.pg1.add_stream(p)
2335 self.pg_enable_capture(self.pg_interfaces)
2337 capture = self.pg1.get_capture(1)
2338 self.assertTrue(capture[0].haslayer(ARP))
2339 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2342 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2343 ARP(op=ARP.who_has, pdst=static_addr,
2344 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2345 self.pg1.add_stream(p)
2346 self.pg_enable_capture(self.pg_interfaces)
2348 capture = self.pg1.get_capture(1)
2349 self.assertTrue(capture[0].haslayer(ARP))
2350 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2352 # send ARP to non-NAT44 interface
2353 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2354 ARP(op=ARP.who_has, pdst=self.nat_addr,
2355 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2356 self.pg2.add_stream(p)
2357 self.pg_enable_capture(self.pg_interfaces)
2359 self.pg1.assert_nothing_captured()
2361 # remove addresses and verify
2362 self.nat44_add_address(self.nat_addr, is_add=0)
2363 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2366 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2367 ARP(op=ARP.who_has, pdst=self.nat_addr,
2368 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2369 self.pg1.add_stream(p)
2370 self.pg_enable_capture(self.pg_interfaces)
2372 self.pg1.assert_nothing_captured()
2374 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2375 ARP(op=ARP.who_has, pdst=static_addr,
2376 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2377 self.pg1.add_stream(p)
2378 self.pg_enable_capture(self.pg_interfaces)
2380 self.pg1.assert_nothing_captured()
2382 def test_vrf_mode(self):
2383 """ NAT44 tenant VRF aware address pool mode """
2387 nat_ip1 = "10.0.0.10"
2388 nat_ip2 = "10.0.0.11"
2390 self.pg0.unconfig_ip4()
2391 self.pg1.unconfig_ip4()
2392 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2393 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2394 self.pg0.set_table_ip4(vrf_id1)
2395 self.pg1.set_table_ip4(vrf_id2)
2396 self.pg0.config_ip4()
2397 self.pg1.config_ip4()
2398 self.pg0.resolve_arp()
2399 self.pg1.resolve_arp()
2401 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2402 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2403 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2404 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2405 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2410 pkts = self.create_stream_in(self.pg0, self.pg2)
2411 self.pg0.add_stream(pkts)
2412 self.pg_enable_capture(self.pg_interfaces)
2414 capture = self.pg2.get_capture(len(pkts))
2415 self.verify_capture_out(capture, nat_ip1)
2418 pkts = self.create_stream_in(self.pg1, self.pg2)
2419 self.pg1.add_stream(pkts)
2420 self.pg_enable_capture(self.pg_interfaces)
2422 capture = self.pg2.get_capture(len(pkts))
2423 self.verify_capture_out(capture, nat_ip2)
2426 self.pg0.unconfig_ip4()
2427 self.pg1.unconfig_ip4()
2428 self.pg0.set_table_ip4(0)
2429 self.pg1.set_table_ip4(0)
2430 self.pg0.config_ip4()
2431 self.pg1.config_ip4()
2432 self.pg0.resolve_arp()
2433 self.pg1.resolve_arp()
2434 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2435 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2437 def test_vrf_feature_independent(self):
2438 """ NAT44 tenant VRF independent address pool mode """
2440 nat_ip1 = "10.0.0.10"
2441 nat_ip2 = "10.0.0.11"
2443 self.nat44_add_address(nat_ip1)
2444 self.nat44_add_address(nat_ip2, vrf_id=99)
2445 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2446 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2447 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2451 pkts = self.create_stream_in(self.pg0, self.pg2)
2452 self.pg0.add_stream(pkts)
2453 self.pg_enable_capture(self.pg_interfaces)
2455 capture = self.pg2.get_capture(len(pkts))
2456 self.verify_capture_out(capture, nat_ip1)
2459 pkts = self.create_stream_in(self.pg1, self.pg2)
2460 self.pg1.add_stream(pkts)
2461 self.pg_enable_capture(self.pg_interfaces)
2463 capture = self.pg2.get_capture(len(pkts))
2464 self.verify_capture_out(capture, nat_ip1)
2466 def test_dynamic_ipless_interfaces(self):
2467 """ NAT44 interfaces without configured IP address """
2469 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2470 mactobinary(self.pg7.remote_mac),
2471 self.pg7.remote_ip4n,
2473 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2474 mactobinary(self.pg8.remote_mac),
2475 self.pg8.remote_ip4n,
2478 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2479 dst_address_length=32,
2480 next_hop_address=self.pg7.remote_ip4n,
2481 next_hop_sw_if_index=self.pg7.sw_if_index)
2482 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2483 dst_address_length=32,
2484 next_hop_address=self.pg8.remote_ip4n,
2485 next_hop_sw_if_index=self.pg8.sw_if_index)
2487 self.nat44_add_address(self.nat_addr)
2488 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2489 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2493 pkts = self.create_stream_in(self.pg7, self.pg8)
2494 self.pg7.add_stream(pkts)
2495 self.pg_enable_capture(self.pg_interfaces)
2497 capture = self.pg8.get_capture(len(pkts))
2498 self.verify_capture_out(capture)
2501 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2502 self.pg8.add_stream(pkts)
2503 self.pg_enable_capture(self.pg_interfaces)
2505 capture = self.pg7.get_capture(len(pkts))
2506 self.verify_capture_in(capture, self.pg7)
2508 def test_static_ipless_interfaces(self):
2509 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2511 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2512 mactobinary(self.pg7.remote_mac),
2513 self.pg7.remote_ip4n,
2515 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2516 mactobinary(self.pg8.remote_mac),
2517 self.pg8.remote_ip4n,
2520 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2521 dst_address_length=32,
2522 next_hop_address=self.pg7.remote_ip4n,
2523 next_hop_sw_if_index=self.pg7.sw_if_index)
2524 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2525 dst_address_length=32,
2526 next_hop_address=self.pg8.remote_ip4n,
2527 next_hop_sw_if_index=self.pg8.sw_if_index)
2529 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2530 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2531 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2535 pkts = self.create_stream_out(self.pg8)
2536 self.pg8.add_stream(pkts)
2537 self.pg_enable_capture(self.pg_interfaces)
2539 capture = self.pg7.get_capture(len(pkts))
2540 self.verify_capture_in(capture, self.pg7)
2543 pkts = self.create_stream_in(self.pg7, self.pg8)
2544 self.pg7.add_stream(pkts)
2545 self.pg_enable_capture(self.pg_interfaces)
2547 capture = self.pg8.get_capture(len(pkts))
2548 self.verify_capture_out(capture, self.nat_addr, True)
2550 def test_static_with_port_ipless_interfaces(self):
2551 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2553 self.tcp_port_out = 30606
2554 self.udp_port_out = 30607
2555 self.icmp_id_out = 30608
2557 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2558 mactobinary(self.pg7.remote_mac),
2559 self.pg7.remote_ip4n,
2561 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2562 mactobinary(self.pg8.remote_mac),
2563 self.pg8.remote_ip4n,
2566 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2567 dst_address_length=32,
2568 next_hop_address=self.pg7.remote_ip4n,
2569 next_hop_sw_if_index=self.pg7.sw_if_index)
2570 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2571 dst_address_length=32,
2572 next_hop_address=self.pg8.remote_ip4n,
2573 next_hop_sw_if_index=self.pg8.sw_if_index)
2575 self.nat44_add_address(self.nat_addr)
2576 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2577 self.tcp_port_in, self.tcp_port_out,
2578 proto=IP_PROTOS.tcp)
2579 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2580 self.udp_port_in, self.udp_port_out,
2581 proto=IP_PROTOS.udp)
2582 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2583 self.icmp_id_in, self.icmp_id_out,
2584 proto=IP_PROTOS.icmp)
2585 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2586 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2590 pkts = self.create_stream_out(self.pg8)
2591 self.pg8.add_stream(pkts)
2592 self.pg_enable_capture(self.pg_interfaces)
2594 capture = self.pg7.get_capture(len(pkts))
2595 self.verify_capture_in(capture, self.pg7)
2598 pkts = self.create_stream_in(self.pg7, self.pg8)
2599 self.pg7.add_stream(pkts)
2600 self.pg_enable_capture(self.pg_interfaces)
2602 capture = self.pg8.get_capture(len(pkts))
2603 self.verify_capture_out(capture)
2605 def test_static_unknown_proto(self):
2606 """ 1:1 NAT translate packet with unknown protocol """
2607 nat_ip = "10.0.0.10"
2608 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2609 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2610 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2614 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2615 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2617 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2618 TCP(sport=1234, dport=1234))
2619 self.pg0.add_stream(p)
2620 self.pg_enable_capture(self.pg_interfaces)
2622 p = self.pg1.get_capture(1)
2625 self.assertEqual(packet[IP].src, nat_ip)
2626 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2627 self.assertTrue(packet.haslayer(GRE))
2628 self.assert_packet_checksums_valid(packet)
2630 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2634 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2635 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2637 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2638 TCP(sport=1234, dport=1234))
2639 self.pg1.add_stream(p)
2640 self.pg_enable_capture(self.pg_interfaces)
2642 p = self.pg0.get_capture(1)
2645 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2646 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2647 self.assertTrue(packet.haslayer(GRE))
2648 self.assert_packet_checksums_valid(packet)
2650 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2653 def test_hairpinning_static_unknown_proto(self):
2654 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2656 host = self.pg0.remote_hosts[0]
2657 server = self.pg0.remote_hosts[1]
2659 host_nat_ip = "10.0.0.10"
2660 server_nat_ip = "10.0.0.11"
2662 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2663 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2664 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2665 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2669 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2670 IP(src=host.ip4, dst=server_nat_ip) /
2672 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2673 TCP(sport=1234, dport=1234))
2674 self.pg0.add_stream(p)
2675 self.pg_enable_capture(self.pg_interfaces)
2677 p = self.pg0.get_capture(1)
2680 self.assertEqual(packet[IP].src, host_nat_ip)
2681 self.assertEqual(packet[IP].dst, server.ip4)
2682 self.assertTrue(packet.haslayer(GRE))
2683 self.assert_packet_checksums_valid(packet)
2685 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2689 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2690 IP(src=server.ip4, dst=host_nat_ip) /
2692 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2693 TCP(sport=1234, dport=1234))
2694 self.pg0.add_stream(p)
2695 self.pg_enable_capture(self.pg_interfaces)
2697 p = self.pg0.get_capture(1)
2700 self.assertEqual(packet[IP].src, server_nat_ip)
2701 self.assertEqual(packet[IP].dst, host.ip4)
2702 self.assertTrue(packet.haslayer(GRE))
2703 self.assert_packet_checksums_valid(packet)
2705 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2708 def test_output_feature(self):
2709 """ NAT44 interface output feature (in2out postrouting) """
2710 self.nat44_add_address(self.nat_addr)
2711 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2712 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2713 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2717 pkts = self.create_stream_in(self.pg0, self.pg3)
2718 self.pg0.add_stream(pkts)
2719 self.pg_enable_capture(self.pg_interfaces)
2721 capture = self.pg3.get_capture(len(pkts))
2722 self.verify_capture_out(capture)
2725 pkts = self.create_stream_out(self.pg3)
2726 self.pg3.add_stream(pkts)
2727 self.pg_enable_capture(self.pg_interfaces)
2729 capture = self.pg0.get_capture(len(pkts))
2730 self.verify_capture_in(capture, self.pg0)
2732 # from non-NAT interface to NAT inside interface
2733 pkts = self.create_stream_in(self.pg2, self.pg0)
2734 self.pg2.add_stream(pkts)
2735 self.pg_enable_capture(self.pg_interfaces)
2737 capture = self.pg0.get_capture(len(pkts))
2738 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2740 def test_output_feature_vrf_aware(self):
2741 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2742 nat_ip_vrf10 = "10.0.0.10"
2743 nat_ip_vrf20 = "10.0.0.20"
2745 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2746 dst_address_length=32,
2747 next_hop_address=self.pg3.remote_ip4n,
2748 next_hop_sw_if_index=self.pg3.sw_if_index,
2750 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2751 dst_address_length=32,
2752 next_hop_address=self.pg3.remote_ip4n,
2753 next_hop_sw_if_index=self.pg3.sw_if_index,
2756 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2757 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2758 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2759 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2760 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2764 pkts = self.create_stream_in(self.pg4, self.pg3)
2765 self.pg4.add_stream(pkts)
2766 self.pg_enable_capture(self.pg_interfaces)
2768 capture = self.pg3.get_capture(len(pkts))
2769 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2772 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2773 self.pg3.add_stream(pkts)
2774 self.pg_enable_capture(self.pg_interfaces)
2776 capture = self.pg4.get_capture(len(pkts))
2777 self.verify_capture_in(capture, self.pg4)
2780 pkts = self.create_stream_in(self.pg6, self.pg3)
2781 self.pg6.add_stream(pkts)
2782 self.pg_enable_capture(self.pg_interfaces)
2784 capture = self.pg3.get_capture(len(pkts))
2785 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2788 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2789 self.pg3.add_stream(pkts)
2790 self.pg_enable_capture(self.pg_interfaces)
2792 capture = self.pg6.get_capture(len(pkts))
2793 self.verify_capture_in(capture, self.pg6)
2795 def test_output_feature_hairpinning(self):
2796 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2797 host = self.pg0.remote_hosts[0]
2798 server = self.pg0.remote_hosts[1]
2801 server_in_port = 5678
2802 server_out_port = 8765
2804 self.nat44_add_address(self.nat_addr)
2805 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2806 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2809 # add static mapping for server
2810 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2811 server_in_port, server_out_port,
2812 proto=IP_PROTOS.tcp)
2814 # send packet from host to server
2815 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2816 IP(src=host.ip4, dst=self.nat_addr) /
2817 TCP(sport=host_in_port, dport=server_out_port))
2818 self.pg0.add_stream(p)
2819 self.pg_enable_capture(self.pg_interfaces)
2821 capture = self.pg0.get_capture(1)
2826 self.assertEqual(ip.src, self.nat_addr)
2827 self.assertEqual(ip.dst, server.ip4)
2828 self.assertNotEqual(tcp.sport, host_in_port)
2829 self.assertEqual(tcp.dport, server_in_port)
2830 self.assert_packet_checksums_valid(p)
2831 host_out_port = tcp.sport
2833 self.logger.error(ppp("Unexpected or invalid packet:", p))
2836 # send reply from server to host
2837 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2838 IP(src=server.ip4, dst=self.nat_addr) /
2839 TCP(sport=server_in_port, dport=host_out_port))
2840 self.pg0.add_stream(p)
2841 self.pg_enable_capture(self.pg_interfaces)
2843 capture = self.pg0.get_capture(1)
2848 self.assertEqual(ip.src, self.nat_addr)
2849 self.assertEqual(ip.dst, host.ip4)
2850 self.assertEqual(tcp.sport, server_out_port)
2851 self.assertEqual(tcp.dport, host_in_port)
2852 self.assert_packet_checksums_valid(p)
2854 self.logger.error(ppp("Unexpected or invalid packet:", p))
2857 def test_one_armed_nat44(self):
2858 """ One armed NAT44 """
2859 remote_host = self.pg9.remote_hosts[0]
2860 local_host = self.pg9.remote_hosts[1]
2863 self.nat44_add_address(self.nat_addr)
2864 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2865 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2869 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2870 IP(src=local_host.ip4, dst=remote_host.ip4) /
2871 TCP(sport=12345, dport=80))
2872 self.pg9.add_stream(p)
2873 self.pg_enable_capture(self.pg_interfaces)
2875 capture = self.pg9.get_capture(1)
2880 self.assertEqual(ip.src, self.nat_addr)
2881 self.assertEqual(ip.dst, remote_host.ip4)
2882 self.assertNotEqual(tcp.sport, 12345)
2883 external_port = tcp.sport
2884 self.assertEqual(tcp.dport, 80)
2885 self.assert_packet_checksums_valid(p)
2887 self.logger.error(ppp("Unexpected or invalid packet:", p))
2891 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2892 IP(src=remote_host.ip4, dst=self.nat_addr) /
2893 TCP(sport=80, dport=external_port))
2894 self.pg9.add_stream(p)
2895 self.pg_enable_capture(self.pg_interfaces)
2897 capture = self.pg9.get_capture(1)
2902 self.assertEqual(ip.src, remote_host.ip4)
2903 self.assertEqual(ip.dst, local_host.ip4)
2904 self.assertEqual(tcp.sport, 80)
2905 self.assertEqual(tcp.dport, 12345)
2906 self.assert_packet_checksums_valid(p)
2908 self.logger.error(ppp("Unexpected or invalid packet:", p))
2911 def test_del_session(self):
2912 """ Delete NAT44 session """
2913 self.nat44_add_address(self.nat_addr)
2914 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2915 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2918 pkts = self.create_stream_in(self.pg0, self.pg1)
2919 self.pg0.add_stream(pkts)
2920 self.pg_enable_capture(self.pg_interfaces)
2922 self.pg1.get_capture(len(pkts))
2924 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2925 nsessions = len(sessions)
2927 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2928 sessions[0].inside_port,
2929 sessions[0].protocol)
2930 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2931 sessions[1].outside_port,
2932 sessions[1].protocol,
2935 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2936 self.assertEqual(nsessions - len(sessions), 2)
2938 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2939 sessions[0].inside_port,
2940 sessions[0].protocol)
2942 self.verify_no_nat44_user()
2944 def test_set_get_reass(self):
2945 """ NAT44 set/get virtual fragmentation reassembly """
2946 reas_cfg1 = self.vapi.nat_get_reass()
2948 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2949 max_reass=reas_cfg1.ip4_max_reass * 2,
2950 max_frag=reas_cfg1.ip4_max_frag * 2)
2952 reas_cfg2 = self.vapi.nat_get_reass()
2954 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2955 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2956 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2958 self.vapi.nat_set_reass(drop_frag=1)
2959 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2961 def test_frag_in_order(self):
2962 """ NAT44 translate fragments arriving in order """
2963 self.nat44_add_address(self.nat_addr)
2964 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2965 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2968 data = "A" * 4 + "B" * 16 + "C" * 3
2969 self.tcp_port_in = random.randint(1025, 65535)
2971 reass = self.vapi.nat_reass_dump()
2972 reass_n_start = len(reass)
2975 pkts = self.create_stream_frag(self.pg0,
2976 self.pg1.remote_ip4,
2980 self.pg0.add_stream(pkts)
2981 self.pg_enable_capture(self.pg_interfaces)
2983 frags = self.pg1.get_capture(len(pkts))
2984 p = self.reass_frags_and_verify(frags,
2986 self.pg1.remote_ip4)
2987 self.assertEqual(p[TCP].dport, 20)
2988 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2989 self.tcp_port_out = p[TCP].sport
2990 self.assertEqual(data, p[Raw].load)
2993 pkts = self.create_stream_frag(self.pg1,
2998 self.pg1.add_stream(pkts)
2999 self.pg_enable_capture(self.pg_interfaces)
3001 frags = self.pg0.get_capture(len(pkts))
3002 p = self.reass_frags_and_verify(frags,
3003 self.pg1.remote_ip4,
3004 self.pg0.remote_ip4)
3005 self.assertEqual(p[TCP].sport, 20)
3006 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3007 self.assertEqual(data, p[Raw].load)
3009 reass = self.vapi.nat_reass_dump()
3010 reass_n_end = len(reass)
3012 self.assertEqual(reass_n_end - reass_n_start, 2)
3014 def test_reass_hairpinning(self):
3015 """ NAT44 fragments hairpinning """
3016 server = self.pg0.remote_hosts[1]
3017 host_in_port = random.randint(1025, 65535)
3018 server_in_port = random.randint(1025, 65535)
3019 server_out_port = random.randint(1025, 65535)
3020 data = "A" * 4 + "B" * 16 + "C" * 3
3022 self.nat44_add_address(self.nat_addr)
3023 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3024 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3026 # add static mapping for server
3027 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3028 server_in_port, server_out_port,
3029 proto=IP_PROTOS.tcp)
3031 # send packet from host to server
3032 pkts = self.create_stream_frag(self.pg0,
3037 self.pg0.add_stream(pkts)
3038 self.pg_enable_capture(self.pg_interfaces)
3040 frags = self.pg0.get_capture(len(pkts))
3041 p = self.reass_frags_and_verify(frags,
3044 self.assertNotEqual(p[TCP].sport, host_in_port)
3045 self.assertEqual(p[TCP].dport, server_in_port)
3046 self.assertEqual(data, p[Raw].load)
3048 def test_frag_out_of_order(self):
3049 """ NAT44 translate fragments arriving out of order """
3050 self.nat44_add_address(self.nat_addr)
3051 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3052 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3055 data = "A" * 4 + "B" * 16 + "C" * 3
3056 random.randint(1025, 65535)
3059 pkts = self.create_stream_frag(self.pg0,
3060 self.pg1.remote_ip4,
3065 self.pg0.add_stream(pkts)
3066 self.pg_enable_capture(self.pg_interfaces)
3068 frags = self.pg1.get_capture(len(pkts))
3069 p = self.reass_frags_and_verify(frags,
3071 self.pg1.remote_ip4)
3072 self.assertEqual(p[TCP].dport, 20)
3073 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3074 self.tcp_port_out = p[TCP].sport
3075 self.assertEqual(data, p[Raw].load)
3078 pkts = self.create_stream_frag(self.pg1,
3084 self.pg1.add_stream(pkts)
3085 self.pg_enable_capture(self.pg_interfaces)
3087 frags = self.pg0.get_capture(len(pkts))
3088 p = self.reass_frags_and_verify(frags,
3089 self.pg1.remote_ip4,
3090 self.pg0.remote_ip4)
3091 self.assertEqual(p[TCP].sport, 20)
3092 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3093 self.assertEqual(data, p[Raw].load)
3095 def test_port_restricted(self):
3096 """ Port restricted NAT44 (MAP-E CE) """
3097 self.nat44_add_address(self.nat_addr)
3098 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3099 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3101 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3102 "psid-offset 6 psid-len 6")
3104 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3105 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3106 TCP(sport=4567, dport=22))
3107 self.pg0.add_stream(p)
3108 self.pg_enable_capture(self.pg_interfaces)
3110 capture = self.pg1.get_capture(1)
3115 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3116 self.assertEqual(ip.src, self.nat_addr)
3117 self.assertEqual(tcp.dport, 22)
3118 self.assertNotEqual(tcp.sport, 4567)
3119 self.assertEqual((tcp.sport >> 6) & 63, 10)
3120 self.assert_packet_checksums_valid(p)
3122 self.logger.error(ppp("Unexpected or invalid packet:", p))
3125 def test_ipfix_max_frags(self):
3126 """ IPFIX logging maximum fragments pending reassembly exceeded """
3127 self.nat44_add_address(self.nat_addr)
3128 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3129 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3131 self.vapi.nat_set_reass(max_frag=0)
3132 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3133 src_address=self.pg3.local_ip4n,
3135 template_interval=10)
3136 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3137 src_port=self.ipfix_src_port)
3139 data = "A" * 4 + "B" * 16 + "C" * 3
3140 self.tcp_port_in = random.randint(1025, 65535)
3141 pkts = self.create_stream_frag(self.pg0,
3142 self.pg1.remote_ip4,
3146 self.pg0.add_stream(pkts[-1])
3147 self.pg_enable_capture(self.pg_interfaces)
3149 self.pg1.assert_nothing_captured()
3151 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3152 capture = self.pg3.get_capture(9)
3153 ipfix = IPFIXDecoder()
3154 # first load template
3156 self.assertTrue(p.haslayer(IPFIX))
3157 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3158 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3159 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3160 self.assertEqual(p[UDP].dport, 4739)
3161 self.assertEqual(p[IPFIX].observationDomainID,
3162 self.ipfix_domain_id)
3163 if p.haslayer(Template):
3164 ipfix.add_template(p.getlayer(Template))
3165 # verify events in data set
3167 if p.haslayer(Data):
3168 data = ipfix.decode_data_set(p.getlayer(Set))
3169 self.verify_ipfix_max_fragments_ip4(data, 0,
3170 self.pg0.remote_ip4n)
3172 def test_multiple_outside_vrf(self):
3173 """ Multiple outside VRF """
3177 self.pg1.unconfig_ip4()
3178 self.pg2.unconfig_ip4()
3179 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
3180 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
3181 self.pg1.set_table_ip4(vrf_id1)
3182 self.pg2.set_table_ip4(vrf_id2)
3183 self.pg1.config_ip4()
3184 self.pg2.config_ip4()
3185 self.pg1.resolve_arp()
3186 self.pg2.resolve_arp()
3188 self.nat44_add_address(self.nat_addr)
3189 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3190 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3192 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
3197 pkts = self.create_stream_in(self.pg0, self.pg1)
3198 self.pg0.add_stream(pkts)
3199 self.pg_enable_capture(self.pg_interfaces)
3201 capture = self.pg1.get_capture(len(pkts))
3202 self.verify_capture_out(capture, self.nat_addr)
3204 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3205 self.pg1.add_stream(pkts)
3206 self.pg_enable_capture(self.pg_interfaces)
3208 capture = self.pg0.get_capture(len(pkts))
3209 self.verify_capture_in(capture, self.pg0)
3211 self.tcp_port_in = 60303
3212 self.udp_port_in = 60304
3213 self.icmp_id_in = 60305
3216 pkts = self.create_stream_in(self.pg0, self.pg2)
3217 self.pg0.add_stream(pkts)
3218 self.pg_enable_capture(self.pg_interfaces)
3220 capture = self.pg2.get_capture(len(pkts))
3221 self.verify_capture_out(capture, self.nat_addr)
3223 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3224 self.pg2.add_stream(pkts)
3225 self.pg_enable_capture(self.pg_interfaces)
3227 capture = self.pg0.get_capture(len(pkts))
3228 self.verify_capture_in(capture, self.pg0)
3231 self.pg1.unconfig_ip4()
3232 self.pg2.unconfig_ip4()
3233 self.pg1.set_table_ip4(0)
3234 self.pg2.set_table_ip4(0)
3235 self.pg1.config_ip4()
3236 self.pg2.config_ip4()
3237 self.pg1.resolve_arp()
3238 self.pg2.resolve_arp()
3240 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3241 def test_session_timeout(self):
3242 """ NAT44 session timeouts """
3243 self.nat44_add_address(self.nat_addr)
3244 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3245 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3247 self.vapi.nat_set_timeouts(udp=5)
3251 for i in range(0, max_sessions):
3252 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3253 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3254 IP(src=src, dst=self.pg1.remote_ip4) /
3255 UDP(sport=1025, dport=53))
3257 self.pg0.add_stream(pkts)
3258 self.pg_enable_capture(self.pg_interfaces)
3260 self.pg1.get_capture(max_sessions)
3265 for i in range(0, max_sessions):
3266 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3267 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3268 IP(src=src, dst=self.pg1.remote_ip4) /
3269 UDP(sport=1026, dport=53))
3271 self.pg0.add_stream(pkts)
3272 self.pg_enable_capture(self.pg_interfaces)
3274 self.pg1.get_capture(max_sessions)
3277 users = self.vapi.nat44_user_dump()
3279 nsessions = nsessions + user.nsessions
3280 self.assertLess(nsessions, 2 * max_sessions)
3283 super(TestNAT44, self).tearDown()
3284 if not self.vpp_dead:
3285 self.logger.info(self.vapi.cli("show nat44 addresses"))
3286 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3287 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3288 self.logger.info(self.vapi.cli("show nat44 interface address"))
3289 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3290 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3291 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3292 self.logger.info(self.vapi.cli("show nat timeouts"))
3293 self.vapi.cli("nat addr-port-assignment-alg default")
3295 self.vapi.cli("clear logging")
3298 class TestNAT44EndpointDependent(MethodHolder):
3299 """ Endpoint-Dependent mapping and filtering test cases """
3302 def setUpConstants(cls):
3303 super(TestNAT44EndpointDependent, cls).setUpConstants()
3304 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
3307 def setUpClass(cls):
3308 super(TestNAT44EndpointDependent, cls).setUpClass()
3309 cls.vapi.cli("set log class nat level debug")
3311 cls.tcp_port_in = 6303
3312 cls.tcp_port_out = 6303
3313 cls.udp_port_in = 6304
3314 cls.udp_port_out = 6304
3315 cls.icmp_id_in = 6305
3316 cls.icmp_id_out = 6305
3317 cls.nat_addr = '10.0.0.3'
3318 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3319 cls.ipfix_src_port = 4739
3320 cls.ipfix_domain_id = 1
3321 cls.tcp_external_port = 80
3323 cls.create_pg_interfaces(range(7))
3324 cls.interfaces = list(cls.pg_interfaces[0:3])
3326 for i in cls.interfaces:
3331 cls.pg0.generate_remote_hosts(3)
3332 cls.pg0.configure_ipv4_neighbors()
3336 cls.pg4.generate_remote_hosts(2)
3337 cls.pg4.config_ip4()
3338 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3339 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3343 cls.pg4.resolve_arp()
3344 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3345 cls.pg4.resolve_arp()
3347 zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
3348 cls.vapi.ip_table_add_del(1, is_add=1)
3350 cls.pg5._local_ip4 = "10.1.1.1"
3351 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
3353 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
3354 cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
3355 socket.AF_INET, cls.pg5.remote_ip4)
3356 cls.pg5.set_table_ip4(1)
3357 cls.pg5.config_ip4()
3359 cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
3360 dst_address_length=32,
3362 next_hop_sw_if_index=cls.pg5.sw_if_index,
3363 next_hop_address=zero_ip4n)
3365 cls.pg6._local_ip4 = "10.1.2.1"
3366 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
3368 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
3369 cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
3370 socket.AF_INET, cls.pg6.remote_ip4)
3371 cls.pg6.set_table_ip4(1)
3372 cls.pg6.config_ip4()
3374 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3375 dst_address_length=32,
3377 next_hop_sw_if_index=cls.pg6.sw_if_index,
3378 next_hop_address=zero_ip4n)
3380 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3381 dst_address_length=16,
3382 next_hop_address=zero_ip4n,
3384 next_hop_table_id=1)
3385 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3386 dst_address_length=0,
3387 next_hop_address=zero_ip4n,
3389 next_hop_table_id=0)
3390 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3391 dst_address_length=0,
3393 next_hop_sw_if_index=cls.pg1.sw_if_index,
3394 next_hop_address=cls.pg1.local_ip4n)
3396 cls.pg5.resolve_arp()
3397 cls.pg6.resolve_arp()
3400 super(TestNAT44EndpointDependent, cls).tearDownClass()
3403 def test_dynamic(self):
3404 """ NAT44 dynamic translation test """
3406 self.nat44_add_address(self.nat_addr)
3407 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3408 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3411 nat_config = self.vapi.nat_show_config()
3412 self.assertEqual(1, nat_config.endpoint_dependent)
3415 pkts = self.create_stream_in(self.pg0, self.pg1)
3416 self.pg0.add_stream(pkts)
3417 self.pg_enable_capture(self.pg_interfaces)
3419 capture = self.pg1.get_capture(len(pkts))
3420 self.verify_capture_out(capture)
3423 pkts = self.create_stream_out(self.pg1)
3424 self.pg1.add_stream(pkts)
3425 self.pg_enable_capture(self.pg_interfaces)
3427 capture = self.pg0.get_capture(len(pkts))
3428 self.verify_capture_in(capture, self.pg0)
3430 def test_forwarding(self):
3431 """ NAT44 forwarding test """
3433 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3434 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3436 self.vapi.nat44_forwarding_enable_disable(1)
3438 real_ip = self.pg0.remote_ip4n
3439 alias_ip = self.nat_addr_n
3440 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3441 external_ip=alias_ip)
3444 # in2out - static mapping match
3446 pkts = self.create_stream_out(self.pg1)
3447 self.pg1.add_stream(pkts)
3448 self.pg_enable_capture(self.pg_interfaces)
3450 capture = self.pg0.get_capture(len(pkts))
3451 self.verify_capture_in(capture, self.pg0)
3453 pkts = self.create_stream_in(self.pg0, self.pg1)
3454 self.pg0.add_stream(pkts)
3455 self.pg_enable_capture(self.pg_interfaces)
3457 capture = self.pg1.get_capture(len(pkts))
3458 self.verify_capture_out(capture, same_port=True)
3460 # in2out - no static mapping match
3462 host0 = self.pg0.remote_hosts[0]
3463 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
3465 pkts = self.create_stream_out(self.pg1,
3466 dst_ip=self.pg0.remote_ip4,
3467 use_inside_ports=True)
3468 self.pg1.add_stream(pkts)
3469 self.pg_enable_capture(self.pg_interfaces)
3471 capture = self.pg0.get_capture(len(pkts))
3472 self.verify_capture_in(capture, self.pg0)
3474 pkts = self.create_stream_in(self.pg0, self.pg1)
3475 self.pg0.add_stream(pkts)
3476 self.pg_enable_capture(self.pg_interfaces)
3478 capture = self.pg1.get_capture(len(pkts))
3479 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3482 self.pg0.remote_hosts[0] = host0
3484 user = self.pg0.remote_hosts[1]
3485 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3486 self.assertEqual(len(sessions), 3)
3487 self.assertTrue(sessions[0].ext_host_valid)
3488 self.vapi.nat44_del_session(
3489 sessions[0].inside_ip_address,
3490 sessions[0].inside_port,
3491 sessions[0].protocol,
3492 ext_host_address=sessions[0].ext_host_address,
3493 ext_host_port=sessions[0].ext_host_port)
3494 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3495 self.assertEqual(len(sessions), 2)
3498 self.vapi.nat44_forwarding_enable_disable(0)
3499 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3500 external_ip=alias_ip,
3503 def test_static_lb(self):
3504 """ NAT44 local service load balancing """
3505 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3508 server1 = self.pg0.remote_hosts[0]
3509 server2 = self.pg0.remote_hosts[1]
3511 locals = [{'addr': server1.ip4n,
3515 {'addr': server2.ip4n,
3520 self.nat44_add_address(self.nat_addr)
3521 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3524 local_num=len(locals),
3526 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3527 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3530 # from client to service
3531 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3532 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3533 TCP(sport=12345, dport=external_port))
3534 self.pg1.add_stream(p)
3535 self.pg_enable_capture(self.pg_interfaces)
3537 capture = self.pg0.get_capture(1)
3543 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3544 if ip.dst == server1.ip4:
3548 self.assertEqual(tcp.dport, local_port)
3549 self.assert_packet_checksums_valid(p)
3551 self.logger.error(ppp("Unexpected or invalid packet:", p))
3554 # from service back to client
3555 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3556 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3557 TCP(sport=local_port, dport=12345))
3558 self.pg0.add_stream(p)
3559 self.pg_enable_capture(self.pg_interfaces)
3561 capture = self.pg1.get_capture(1)
3566 self.assertEqual(ip.src, self.nat_addr)
3567 self.assertEqual(tcp.sport, external_port)
3568 self.assert_packet_checksums_valid(p)
3570 self.logger.error(ppp("Unexpected or invalid packet:", p))
3573 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3574 self.assertEqual(len(sessions), 1)
3575 self.assertTrue(sessions[0].ext_host_valid)
3576 self.vapi.nat44_del_session(
3577 sessions[0].inside_ip_address,
3578 sessions[0].inside_port,
3579 sessions[0].protocol,
3580 ext_host_address=sessions[0].ext_host_address,
3581 ext_host_port=sessions[0].ext_host_port)
3582 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3583 self.assertEqual(len(sessions), 0)
3585 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3586 def test_static_lb_multi_clients(self):
3587 """ NAT44 local service load balancing - multiple clients"""
3589 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3592 server1 = self.pg0.remote_hosts[0]
3593 server2 = self.pg0.remote_hosts[1]
3595 locals = [{'addr': server1.ip4n,
3599 {'addr': server2.ip4n,
3604 self.nat44_add_address(self.nat_addr)
3605 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3608 local_num=len(locals),
3610 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3611 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3616 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3618 for client in clients:
3619 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3620 IP(src=client, dst=self.nat_addr) /
3621 TCP(sport=12345, dport=external_port))
3623 self.pg1.add_stream(pkts)
3624 self.pg_enable_capture(self.pg_interfaces)
3626 capture = self.pg0.get_capture(len(pkts))
3628 if p[IP].dst == server1.ip4:
3632 self.assertTrue(server1_n > server2_n)
3634 def test_static_lb_2(self):
3635 """ NAT44 local service load balancing (asymmetrical rule) """
3636 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3639 server1 = self.pg0.remote_hosts[0]
3640 server2 = self.pg0.remote_hosts[1]
3642 locals = [{'addr': server1.ip4n,
3646 {'addr': server2.ip4n,
3651 self.vapi.nat44_forwarding_enable_disable(1)
3652 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3656 local_num=len(locals),
3658 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3659 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3662 # from client to service
3663 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3664 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3665 TCP(sport=12345, dport=external_port))
3666 self.pg1.add_stream(p)
3667 self.pg_enable_capture(self.pg_interfaces)
3669 capture = self.pg0.get_capture(1)
3675 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3676 if ip.dst == server1.ip4:
3680 self.assertEqual(tcp.dport, local_port)
3681 self.assert_packet_checksums_valid(p)
3683 self.logger.error(ppp("Unexpected or invalid packet:", p))
3686 # from service back to client
3687 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3688 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3689 TCP(sport=local_port, dport=12345))
3690 self.pg0.add_stream(p)
3691 self.pg_enable_capture(self.pg_interfaces)
3693 capture = self.pg1.get_capture(1)
3698 self.assertEqual(ip.src, self.nat_addr)
3699 self.assertEqual(tcp.sport, external_port)
3700 self.assert_packet_checksums_valid(p)
3702 self.logger.error(ppp("Unexpected or invalid packet:", p))
3705 # from client to server (no translation)
3706 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3707 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3708 TCP(sport=12346, dport=local_port))
3709 self.pg1.add_stream(p)
3710 self.pg_enable_capture(self.pg_interfaces)
3712 capture = self.pg0.get_capture(1)
3718 self.assertEqual(ip.dst, server1.ip4)
3719 self.assertEqual(tcp.dport, local_port)
3720 self.assert_packet_checksums_valid(p)
3722 self.logger.error(ppp("Unexpected or invalid packet:", p))
3725 # from service back to client (no translation)
3726 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3727 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3728 TCP(sport=local_port, dport=12346))
3729 self.pg0.add_stream(p)
3730 self.pg_enable_capture(self.pg_interfaces)
3732 capture = self.pg1.get_capture(1)
3737 self.assertEqual(ip.src, server1.ip4)
3738 self.assertEqual(tcp.sport, local_port)
3739 self.assert_packet_checksums_valid(p)
3741 self.logger.error(ppp("Unexpected or invalid packet:", p))
3744 def test_unknown_proto(self):
3745 """ NAT44 translate packet with unknown protocol """
3746 self.nat44_add_address(self.nat_addr)
3747 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3748 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3752 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3753 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3754 TCP(sport=self.tcp_port_in, dport=20))
3755 self.pg0.add_stream(p)
3756 self.pg_enable_capture(self.pg_interfaces)
3758 p = self.pg1.get_capture(1)
3760 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3761 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3763 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3764 TCP(sport=1234, dport=1234))
3765 self.pg0.add_stream(p)
3766 self.pg_enable_capture(self.pg_interfaces)
3768 p = self.pg1.get_capture(1)
3771 self.assertEqual(packet[IP].src, self.nat_addr)
3772 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3773 self.assertTrue(packet.haslayer(GRE))
3774 self.assert_packet_checksums_valid(packet)
3776 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3780 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3781 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3783 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3784 TCP(sport=1234, dport=1234))
3785 self.pg1.add_stream(p)
3786 self.pg_enable_capture(self.pg_interfaces)
3788 p = self.pg0.get_capture(1)
3791 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3792 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3793 self.assertTrue(packet.haslayer(GRE))
3794 self.assert_packet_checksums_valid(packet)
3796 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3799 def test_hairpinning_unknown_proto(self):
3800 """ NAT44 translate packet with unknown protocol - hairpinning """
3801 host = self.pg0.remote_hosts[0]
3802 server = self.pg0.remote_hosts[1]
3804 server_out_port = 8765
3805 server_nat_ip = "10.0.0.11"
3807 self.nat44_add_address(self.nat_addr)
3808 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3809 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3812 # add static mapping for server
3813 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3816 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3817 IP(src=host.ip4, dst=server_nat_ip) /
3818 TCP(sport=host_in_port, dport=server_out_port))
3819 self.pg0.add_stream(p)
3820 self.pg_enable_capture(self.pg_interfaces)
3822 self.pg0.get_capture(1)
3824 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3825 IP(src=host.ip4, dst=server_nat_ip) /
3827 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3828 TCP(sport=1234, dport=1234))
3829 self.pg0.add_stream(p)
3830 self.pg_enable_capture(self.pg_interfaces)
3832 p = self.pg0.get_capture(1)
3835 self.assertEqual(packet[IP].src, self.nat_addr)
3836 self.assertEqual(packet[IP].dst, server.ip4)
3837 self.assertTrue(packet.haslayer(GRE))
3838 self.assert_packet_checksums_valid(packet)
3840 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3844 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3845 IP(src=server.ip4, dst=self.nat_addr) /
3847 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3848 TCP(sport=1234, dport=1234))
3849 self.pg0.add_stream(p)
3850 self.pg_enable_capture(self.pg_interfaces)
3852 p = self.pg0.get_capture(1)
3855 self.assertEqual(packet[IP].src, server_nat_ip)
3856 self.assertEqual(packet[IP].dst, host.ip4)
3857 self.assertTrue(packet.haslayer(GRE))
3858 self.assert_packet_checksums_valid(packet)
3860 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3863 def test_output_feature_and_service(self):
3864 """ NAT44 interface output feature and services """
3865 external_addr = '1.2.3.4'
3869 self.vapi.nat44_forwarding_enable_disable(1)
3870 self.nat44_add_address(self.nat_addr)
3871 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3872 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3873 local_port, external_port,
3874 proto=IP_PROTOS.tcp, out2in_only=1)
3875 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3876 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3878 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3881 # from client to service
3882 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3883 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3884 TCP(sport=12345, dport=external_port))
3885 self.pg1.add_stream(p)
3886 self.pg_enable_capture(self.pg_interfaces)
3888 capture = self.pg0.get_capture(1)
3893 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3894 self.assertEqual(tcp.dport, local_port)
3895 self.assert_packet_checksums_valid(p)
3897 self.logger.error(ppp("Unexpected or invalid packet:", p))
3900 # from service back to client
3901 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3902 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3903 TCP(sport=local_port, dport=12345))
3904 self.pg0.add_stream(p)
3905 self.pg_enable_capture(self.pg_interfaces)
3907 capture = self.pg1.get_capture(1)
3912 self.assertEqual(ip.src, external_addr)
3913 self.assertEqual(tcp.sport, external_port)
3914 self.assert_packet_checksums_valid(p)
3916 self.logger.error(ppp("Unexpected or invalid packet:", p))
3919 # from local network host to external network
3920 pkts = self.create_stream_in(self.pg0, self.pg1)
3921 self.pg0.add_stream(pkts)
3922 self.pg_enable_capture(self.pg_interfaces)
3924 capture = self.pg1.get_capture(len(pkts))
3925 self.verify_capture_out(capture)
3926 pkts = self.create_stream_in(self.pg0, self.pg1)
3927 self.pg0.add_stream(pkts)
3928 self.pg_enable_capture(self.pg_interfaces)
3930 capture = self.pg1.get_capture(len(pkts))
3931 self.verify_capture_out(capture)
3933 # from external network back to local network host
3934 pkts = self.create_stream_out(self.pg1)
3935 self.pg1.add_stream(pkts)
3936 self.pg_enable_capture(self.pg_interfaces)
3938 capture = self.pg0.get_capture(len(pkts))
3939 self.verify_capture_in(capture, self.pg0)
3941 def test_output_feature_and_service2(self):
3942 """ NAT44 interface output feature and service host direct access """
3943 self.vapi.nat44_forwarding_enable_disable(1)
3944 self.nat44_add_address(self.nat_addr)
3945 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3948 # session initiaded from service host - translate
3949 pkts = self.create_stream_in(self.pg0, self.pg1)
3950 self.pg0.add_stream(pkts)
3951 self.pg_enable_capture(self.pg_interfaces)
3953 capture = self.pg1.get_capture(len(pkts))
3954 self.verify_capture_out(capture)
3956 pkts = self.create_stream_out(self.pg1)
3957 self.pg1.add_stream(pkts)
3958 self.pg_enable_capture(self.pg_interfaces)
3960 capture = self.pg0.get_capture(len(pkts))
3961 self.verify_capture_in(capture, self.pg0)
3963 # session initiaded from remote host - do not translate
3964 self.tcp_port_in = 60303
3965 self.udp_port_in = 60304
3966 self.icmp_id_in = 60305
3967 pkts = self.create_stream_out(self.pg1,
3968 self.pg0.remote_ip4,
3969 use_inside_ports=True)
3970 self.pg1.add_stream(pkts)
3971 self.pg_enable_capture(self.pg_interfaces)
3973 capture = self.pg0.get_capture(len(pkts))
3974 self.verify_capture_in(capture, self.pg0)
3976 pkts = self.create_stream_in(self.pg0, self.pg1)
3977 self.pg0.add_stream(pkts)
3978 self.pg_enable_capture(self.pg_interfaces)
3980 capture = self.pg1.get_capture(len(pkts))
3981 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3984 def test_output_feature_and_service3(self):
3985 """ NAT44 interface output feature and DST NAT """
3986 external_addr = '1.2.3.4'
3990 self.vapi.nat44_forwarding_enable_disable(1)
3991 self.nat44_add_address(self.nat_addr)
3992 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3993 local_port, external_port,
3994 proto=IP_PROTOS.tcp, out2in_only=1)
3995 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3996 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3998 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4001 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4002 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4003 TCP(sport=12345, dport=external_port))
4004 self.pg0.add_stream(p)
4005 self.pg_enable_capture(self.pg_interfaces)
4007 capture = self.pg1.get_capture(1)
4012 self.assertEqual(ip.src, self.pg0.remote_ip4)
4013 self.assertEqual(tcp.sport, 12345)
4014 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4015 self.assertEqual(tcp.dport, local_port)
4016 self.assert_packet_checksums_valid(p)
4018 self.logger.error(ppp("Unexpected or invalid packet:", p))
4021 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4022 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4023 TCP(sport=local_port, dport=12345))
4024 self.pg1.add_stream(p)
4025 self.pg_enable_capture(self.pg_interfaces)
4027 capture = self.pg0.get_capture(1)
4032 self.assertEqual(ip.src, external_addr)
4033 self.assertEqual(tcp.sport, external_port)
4034 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4035 self.assertEqual(tcp.dport, 12345)
4036 self.assert_packet_checksums_valid(p)
4038 self.logger.error(ppp("Unexpected or invalid packet:", p))
4041 def test_next_src_nat(self):
4042 """ On way back forward packet to nat44-in2out node. """
4043 twice_nat_addr = '10.0.1.3'
4046 post_twice_nat_port = 0
4048 self.vapi.nat44_forwarding_enable_disable(1)
4049 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4050 self.nat44_add_static_mapping(self.pg6.remote_ip4, self.pg1.remote_ip4,
4051 local_port, external_port,
4052 proto=IP_PROTOS.tcp, out2in_only=1,
4053 self_twice_nat=1, vrf_id=1)
4054 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4057 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4058 IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) /
4059 TCP(sport=12345, dport=external_port))
4060 self.pg6.add_stream(p)
4061 self.pg_enable_capture(self.pg_interfaces)
4063 capture = self.pg6.get_capture(1)
4068 self.assertEqual(ip.src, twice_nat_addr)
4069 self.assertNotEqual(tcp.sport, 12345)
4070 post_twice_nat_port = tcp.sport
4071 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4072 self.assertEqual(tcp.dport, local_port)
4073 self.assert_packet_checksums_valid(p)
4075 self.logger.error(ppp("Unexpected or invalid packet:", p))
4078 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4079 IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) /
4080 TCP(sport=local_port, dport=post_twice_nat_port))
4081 self.pg6.add_stream(p)
4082 self.pg_enable_capture(self.pg_interfaces)
4084 capture = self.pg6.get_capture(1)
4089 self.assertEqual(ip.src, self.pg1.remote_ip4)
4090 self.assertEqual(tcp.sport, external_port)
4091 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4092 self.assertEqual(tcp.dport, 12345)
4093 self.assert_packet_checksums_valid(p)
4095 self.logger.error(ppp("Unexpected or invalid packet:", p))
4098 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
4100 twice_nat_addr = '10.0.1.3'
4108 port_in1 = port_in+1
4109 port_in2 = port_in+2
4114 server1 = self.pg0.remote_hosts[0]
4115 server2 = self.pg0.remote_hosts[1]
4127 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
4130 self.nat44_add_address(self.nat_addr)
4131 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4133 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
4135 proto=IP_PROTOS.tcp,
4136 twice_nat=int(not self_twice_nat),
4137 self_twice_nat=int(self_twice_nat))
4139 locals = [{'addr': server1.ip4n,
4143 {'addr': server2.ip4n,
4147 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
4148 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
4152 not self_twice_nat),
4155 local_num=len(locals),
4157 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
4158 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
4165 assert client_id is not None
4167 client = self.pg0.remote_hosts[0]
4168 elif client_id == 2:
4169 client = self.pg0.remote_hosts[1]
4171 client = pg1.remote_hosts[0]
4172 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4173 IP(src=client.ip4, dst=self.nat_addr) /
4174 TCP(sport=eh_port_out, dport=port_out))
4176 self.pg_enable_capture(self.pg_interfaces)
4178 capture = pg0.get_capture(1)
4184 if ip.dst == server1.ip4:
4190 self.assertEqual(ip.dst, server.ip4)
4192 self.assertIn(tcp.dport, [port_in1, port_in2])
4194 self.assertEqual(tcp.dport, port_in)
4196 self.assertEqual(ip.src, twice_nat_addr)
4197 self.assertNotEqual(tcp.sport, eh_port_out)
4199 self.assertEqual(ip.src, client.ip4)
4200 self.assertEqual(tcp.sport, eh_port_out)
4202 eh_port_in = tcp.sport
4203 saved_port_in = tcp.dport
4204 self.assert_packet_checksums_valid(p)
4206 self.logger.error(ppp("Unexpected or invalid packet:", p))
4209 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4210 IP(src=server.ip4, dst=eh_addr_in) /
4211 TCP(sport=saved_port_in, dport=eh_port_in))
4213 self.pg_enable_capture(self.pg_interfaces)
4215 capture = pg1.get_capture(1)
4220 self.assertEqual(ip.dst, client.ip4)
4221 self.assertEqual(ip.src, self.nat_addr)
4222 self.assertEqual(tcp.dport, eh_port_out)
4223 self.assertEqual(tcp.sport, port_out)
4224 self.assert_packet_checksums_valid(p)
4226 self.logger.error(ppp("Unexpected or invalid packet:", p))
4230 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4231 self.assertEqual(len(sessions), 1)
4232 self.assertTrue(sessions[0].ext_host_valid)
4233 self.assertTrue(sessions[0].is_twicenat)
4234 self.vapi.nat44_del_session(
4235 sessions[0].inside_ip_address,
4236 sessions[0].inside_port,
4237 sessions[0].protocol,
4238 ext_host_address=sessions[0].ext_host_nat_address,
4239 ext_host_port=sessions[0].ext_host_nat_port)
4240 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4241 self.assertEqual(len(sessions), 0)
4243 def test_twice_nat(self):
4245 self.twice_nat_common()
4247 def test_self_twice_nat_positive(self):
4248 """ Self Twice NAT44 (positive test) """
4249 self.twice_nat_common(self_twice_nat=True, same_pg=True)
4251 def test_self_twice_nat_negative(self):
4252 """ Self Twice NAT44 (negative test) """
4253 self.twice_nat_common(self_twice_nat=True)
4255 def test_twice_nat_lb(self):
4256 """ Twice NAT44 local service load balancing """
4257 self.twice_nat_common(lb=True)
4259 def test_self_twice_nat_lb_positive(self):
4260 """ Self Twice NAT44 local service load balancing (positive test) """
4261 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4264 def test_self_twice_nat_lb_negative(self):
4265 """ Self Twice NAT44 local service load balancing (negative test) """
4266 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4269 def test_twice_nat_interface_addr(self):
4270 """ Acquire twice NAT44 addresses from interface """
4271 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4273 # no address in NAT pool
4274 adresses = self.vapi.nat44_address_dump()
4275 self.assertEqual(0, len(adresses))
4277 # configure interface address and check NAT address pool
4278 self.pg3.config_ip4()
4279 adresses = self.vapi.nat44_address_dump()
4280 self.assertEqual(1, len(adresses))
4281 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4282 self.assertEqual(adresses[0].twice_nat, 1)
4284 # remove interface address and check NAT address pool
4285 self.pg3.unconfig_ip4()
4286 adresses = self.vapi.nat44_address_dump()
4287 self.assertEqual(0, len(adresses))
4289 def test_tcp_session_close_in(self):
4290 """ Close TCP session from inside network """
4291 self.tcp_port_out = 10505
4292 self.nat44_add_address(self.nat_addr)
4293 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4297 proto=IP_PROTOS.tcp,
4299 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4300 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4303 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4304 start_sessnum = len(sessions)
4306 self.initiate_tcp_session(self.pg0, self.pg1)
4308 # FIN packet in -> out
4309 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4310 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4311 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4312 flags="FA", seq=100, ack=300))
4313 self.pg0.add_stream(p)
4314 self.pg_enable_capture(self.pg_interfaces)
4316 self.pg1.get_capture(1)
4320 # ACK packet out -> in
4321 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4322 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4323 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4324 flags="A", seq=300, ack=101))
4327 # FIN packet out -> in
4328 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4329 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4330 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4331 flags="FA", seq=300, ack=101))
4334 self.pg1.add_stream(pkts)
4335 self.pg_enable_capture(self.pg_interfaces)
4337 self.pg0.get_capture(2)
4339 # ACK packet in -> out
4340 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4341 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4342 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4343 flags="A", seq=101, ack=301))
4344 self.pg0.add_stream(p)
4345 self.pg_enable_capture(self.pg_interfaces)
4347 self.pg1.get_capture(1)
4349 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4351 self.assertEqual(len(sessions) - start_sessnum, 0)
4353 def test_tcp_session_close_out(self):
4354 """ Close TCP session from outside network """
4355 self.tcp_port_out = 10505
4356 self.nat44_add_address(self.nat_addr)
4357 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4361 proto=IP_PROTOS.tcp,
4363 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4364 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4367 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4368 start_sessnum = len(sessions)
4370 self.initiate_tcp_session(self.pg0, self.pg1)
4372 # FIN packet out -> in
4373 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4374 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4375 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4376 flags="FA", seq=100, ack=300))
4377 self.pg1.add_stream(p)
4378 self.pg_enable_capture(self.pg_interfaces)
4380 self.pg0.get_capture(1)
4382 # FIN+ACK packet in -> out
4383 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4384 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4385 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4386 flags="FA", seq=300, ack=101))
4388 self.pg0.add_stream(p)
4389 self.pg_enable_capture(self.pg_interfaces)
4391 self.pg1.get_capture(1)
4393 # ACK packet out -> in
4394 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4395 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4396 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4397 flags="A", seq=101, ack=301))
4398 self.pg1.add_stream(p)
4399 self.pg_enable_capture(self.pg_interfaces)
4401 self.pg0.get_capture(1)
4403 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4405 self.assertEqual(len(sessions) - start_sessnum, 0)
4407 def test_tcp_session_close_simultaneous(self):
4408 """ Close TCP session from inside network """
4409 self.tcp_port_out = 10505
4410 self.nat44_add_address(self.nat_addr)
4411 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4415 proto=IP_PROTOS.tcp,
4417 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4418 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4421 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4422 start_sessnum = len(sessions)
4424 self.initiate_tcp_session(self.pg0, self.pg1)
4426 # FIN packet in -> out
4427 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4428 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4429 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4430 flags="FA", seq=100, ack=300))
4431 self.pg0.add_stream(p)
4432 self.pg_enable_capture(self.pg_interfaces)
4434 self.pg1.get_capture(1)
4436 # FIN packet out -> in
4437 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4438 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4439 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4440 flags="FA", seq=300, ack=100))
4441 self.pg1.add_stream(p)
4442 self.pg_enable_capture(self.pg_interfaces)
4444 self.pg0.get_capture(1)
4446 # ACK packet in -> out
4447 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4448 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4449 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4450 flags="A", seq=101, ack=301))
4451 self.pg0.add_stream(p)
4452 self.pg_enable_capture(self.pg_interfaces)
4454 self.pg1.get_capture(1)
4456 # ACK packet out -> in
4457 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4458 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4459 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4460 flags="A", seq=301, ack=101))
4461 self.pg1.add_stream(p)
4462 self.pg_enable_capture(self.pg_interfaces)
4464 self.pg0.get_capture(1)
4466 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4468 self.assertEqual(len(sessions) - start_sessnum, 0)
4470 def test_one_armed_nat44_static(self):
4471 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
4472 remote_host = self.pg4.remote_hosts[0]
4473 local_host = self.pg4.remote_hosts[1]
4478 self.vapi.nat44_forwarding_enable_disable(1)
4479 self.nat44_add_address(self.nat_addr, twice_nat=1)
4480 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4481 local_port, external_port,
4482 proto=IP_PROTOS.tcp, out2in_only=1,
4484 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4485 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4488 # from client to service
4489 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4490 IP(src=remote_host.ip4, dst=self.nat_addr) /
4491 TCP(sport=12345, dport=external_port))
4492 self.pg4.add_stream(p)
4493 self.pg_enable_capture(self.pg_interfaces)
4495 capture = self.pg4.get_capture(1)
4500 self.assertEqual(ip.dst, local_host.ip4)
4501 self.assertEqual(ip.src, self.nat_addr)
4502 self.assertEqual(tcp.dport, local_port)
4503 self.assertNotEqual(tcp.sport, 12345)
4504 eh_port_in = tcp.sport
4505 self.assert_packet_checksums_valid(p)
4507 self.logger.error(ppp("Unexpected or invalid packet:", p))
4510 # from service back to client
4511 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4512 IP(src=local_host.ip4, dst=self.nat_addr) /
4513 TCP(sport=local_port, dport=eh_port_in))
4514 self.pg4.add_stream(p)
4515 self.pg_enable_capture(self.pg_interfaces)
4517 capture = self.pg4.get_capture(1)
4522 self.assertEqual(ip.src, self.nat_addr)
4523 self.assertEqual(ip.dst, remote_host.ip4)
4524 self.assertEqual(tcp.sport, external_port)
4525 self.assertEqual(tcp.dport, 12345)
4526 self.assert_packet_checksums_valid(p)
4528 self.logger.error(ppp("Unexpected or invalid packet:", p))
4531 def test_static_with_port_out2(self):
4532 """ 1:1 NAPT asymmetrical rule """
4537 self.vapi.nat44_forwarding_enable_disable(1)
4538 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4539 local_port, external_port,
4540 proto=IP_PROTOS.tcp, out2in_only=1)
4541 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4542 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4545 # from client to service
4546 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4547 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4548 TCP(sport=12345, dport=external_port))
4549 self.pg1.add_stream(p)
4550 self.pg_enable_capture(self.pg_interfaces)
4552 capture = self.pg0.get_capture(1)
4557 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4558 self.assertEqual(tcp.dport, local_port)
4559 self.assert_packet_checksums_valid(p)
4561 self.logger.error(ppp("Unexpected or invalid packet:", p))
4565 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4566 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4567 ICMP(type=11) / capture[0][IP])
4568 self.pg0.add_stream(p)
4569 self.pg_enable_capture(self.pg_interfaces)
4571 capture = self.pg1.get_capture(1)
4574 self.assertEqual(p[IP].src, self.nat_addr)
4576 self.assertEqual(inner.dst, self.nat_addr)
4577 self.assertEqual(inner[TCPerror].dport, external_port)
4579 self.logger.error(ppp("Unexpected or invalid packet:", p))
4582 # from service back to client
4583 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4584 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4585 TCP(sport=local_port, dport=12345))
4586 self.pg0.add_stream(p)
4587 self.pg_enable_capture(self.pg_interfaces)
4589 capture = self.pg1.get_capture(1)
4594 self.assertEqual(ip.src, self.nat_addr)
4595 self.assertEqual(tcp.sport, external_port)
4596 self.assert_packet_checksums_valid(p)
4598 self.logger.error(ppp("Unexpected or invalid packet:", p))
4602 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4603 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4604 ICMP(type=11) / capture[0][IP])
4605 self.pg1.add_stream(p)
4606 self.pg_enable_capture(self.pg_interfaces)
4608 capture = self.pg0.get_capture(1)
4611 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4613 self.assertEqual(inner.src, self.pg0.remote_ip4)
4614 self.assertEqual(inner[TCPerror].sport, local_port)
4616 self.logger.error(ppp("Unexpected or invalid packet:", p))
4619 # from client to server (no translation)
4620 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4621 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4622 TCP(sport=12346, dport=local_port))
4623 self.pg1.add_stream(p)
4624 self.pg_enable_capture(self.pg_interfaces)
4626 capture = self.pg0.get_capture(1)
4631 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4632 self.assertEqual(tcp.dport, local_port)
4633 self.assert_packet_checksums_valid(p)
4635 self.logger.error(ppp("Unexpected or invalid packet:", p))
4638 # from service back to client (no translation)
4639 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4640 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4641 TCP(sport=local_port, dport=12346))
4642 self.pg0.add_stream(p)
4643 self.pg_enable_capture(self.pg_interfaces)
4645 capture = self.pg1.get_capture(1)
4650 self.assertEqual(ip.src, self.pg0.remote_ip4)
4651 self.assertEqual(tcp.sport, local_port)
4652 self.assert_packet_checksums_valid(p)
4654 self.logger.error(ppp("Unexpected or invalid packet:", p))
4657 def test_output_feature(self):
4658 """ NAT44 interface output feature (in2out postrouting) """
4659 self.vapi.nat44_forwarding_enable_disable(1)
4660 self.nat44_add_address(self.nat_addr)
4661 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4663 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4667 pkts = self.create_stream_in(self.pg0, self.pg1)
4668 self.pg0.add_stream(pkts)
4669 self.pg_enable_capture(self.pg_interfaces)
4671 capture = self.pg1.get_capture(len(pkts))
4672 self.verify_capture_out(capture)
4675 pkts = self.create_stream_out(self.pg1)
4676 self.pg1.add_stream(pkts)
4677 self.pg_enable_capture(self.pg_interfaces)
4679 capture = self.pg0.get_capture(len(pkts))
4680 self.verify_capture_in(capture, self.pg0)
4682 def test_multiple_vrf(self):
4683 """ Multiple VRF setup """
4684 external_addr = '1.2.3.4'
4689 self.vapi.nat44_forwarding_enable_disable(1)
4690 self.nat44_add_address(self.nat_addr)
4691 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4692 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4694 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4696 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
4697 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index,
4699 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4701 self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
4702 local_port, external_port, vrf_id=1,
4703 proto=IP_PROTOS.tcp, out2in_only=1)
4704 self.nat44_add_static_mapping(
4705 self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index,
4706 local_port=local_port, vrf_id=0, external_port=external_port,
4707 proto=IP_PROTOS.tcp, out2in_only=1)
4709 # from client to service (both VRF1)
4710 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4711 IP(src=self.pg6.remote_ip4, dst=external_addr) /
4712 TCP(sport=12345, dport=external_port))
4713 self.pg6.add_stream(p)
4714 self.pg_enable_capture(self.pg_interfaces)
4716 capture = self.pg5.get_capture(1)
4721 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4722 self.assertEqual(tcp.dport, local_port)
4723 self.assert_packet_checksums_valid(p)
4725 self.logger.error(ppp("Unexpected or invalid packet:", p))
4728 # from service back to client (both VRF1)
4729 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4730 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4731 TCP(sport=local_port, dport=12345))
4732 self.pg5.add_stream(p)
4733 self.pg_enable_capture(self.pg_interfaces)
4735 capture = self.pg6.get_capture(1)
4740 self.assertEqual(ip.src, external_addr)
4741 self.assertEqual(tcp.sport, external_port)
4742 self.assert_packet_checksums_valid(p)
4744 self.logger.error(ppp("Unexpected or invalid packet:", p))
4747 # dynamic NAT from VRF1 to VRF0 (output-feature)
4748 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4749 IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
4750 TCP(sport=2345, dport=22))
4751 self.pg5.add_stream(p)
4752 self.pg_enable_capture(self.pg_interfaces)
4754 capture = self.pg1.get_capture(1)
4759 self.assertEqual(ip.src, self.nat_addr)
4760 self.assertNotEqual(tcp.sport, 2345)
4761 self.assert_packet_checksums_valid(p)
4764 self.logger.error(ppp("Unexpected or invalid packet:", p))
4767 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4768 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4769 TCP(sport=22, dport=port))
4770 self.pg1.add_stream(p)
4771 self.pg_enable_capture(self.pg_interfaces)
4773 capture = self.pg5.get_capture(1)
4778 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4779 self.assertEqual(tcp.dport, 2345)
4780 self.assert_packet_checksums_valid(p)
4782 self.logger.error(ppp("Unexpected or invalid packet:", p))
4785 # from client VRF1 to service VRF0
4786 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4787 IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
4788 TCP(sport=12346, dport=external_port))
4789 self.pg6.add_stream(p)
4790 self.pg_enable_capture(self.pg_interfaces)
4792 capture = self.pg0.get_capture(1)
4797 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4798 self.assertEqual(tcp.dport, local_port)
4799 self.assert_packet_checksums_valid(p)
4801 self.logger.error(ppp("Unexpected or invalid packet:", p))
4804 # from service VRF0 back to client VRF1
4805 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4806 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4807 TCP(sport=local_port, dport=12346))
4808 self.pg0.add_stream(p)
4809 self.pg_enable_capture(self.pg_interfaces)
4811 capture = self.pg6.get_capture(1)
4816 self.assertEqual(ip.src, self.pg0.local_ip4)
4817 self.assertEqual(tcp.sport, external_port)
4818 self.assert_packet_checksums_valid(p)
4820 self.logger.error(ppp("Unexpected or invalid packet:", p))
4823 # from client VRF0 to service VRF1
4824 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4825 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4826 TCP(sport=12347, dport=external_port))
4827 self.pg0.add_stream(p)
4828 self.pg_enable_capture(self.pg_interfaces)
4830 capture = self.pg5.get_capture(1)
4835 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4836 self.assertEqual(tcp.dport, local_port)
4837 self.assert_packet_checksums_valid(p)
4839 self.logger.error(ppp("Unexpected or invalid packet:", p))
4842 # from service VRF1 back to client VRF0
4843 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4844 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4845 TCP(sport=local_port, dport=12347))
4846 self.pg5.add_stream(p)
4847 self.pg_enable_capture(self.pg_interfaces)
4849 capture = self.pg0.get_capture(1)
4854 self.assertEqual(ip.src, external_addr)
4855 self.assertEqual(tcp.sport, external_port)
4856 self.assert_packet_checksums_valid(p)
4858 self.logger.error(ppp("Unexpected or invalid packet:", p))
4861 # from client to server (both VRF1, no translation)
4862 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4863 IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
4864 TCP(sport=12348, dport=local_port))
4865 self.pg6.add_stream(p)
4866 self.pg_enable_capture(self.pg_interfaces)
4868 capture = self.pg5.get_capture(1)
4873 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4874 self.assertEqual(tcp.dport, local_port)
4875 self.assert_packet_checksums_valid(p)
4877 self.logger.error(ppp("Unexpected or invalid packet:", p))
4880 # from server back to client (both VRF1, no translation)
4881 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4882 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4883 TCP(sport=local_port, dport=12348))
4884 self.pg5.add_stream(p)
4885 self.pg_enable_capture(self.pg_interfaces)
4887 capture = self.pg6.get_capture(1)
4892 self.assertEqual(ip.src, self.pg5.remote_ip4)
4893 self.assertEqual(tcp.sport, local_port)
4894 self.assert_packet_checksums_valid(p)
4896 self.logger.error(ppp("Unexpected or invalid packet:", p))
4899 # from client VRF1 to server VRF0 (no translation)
4900 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4901 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4902 TCP(sport=local_port, dport=12349))
4903 self.pg0.add_stream(p)
4904 self.pg_enable_capture(self.pg_interfaces)
4906 capture = self.pg6.get_capture(1)
4911 self.assertEqual(ip.src, self.pg0.remote_ip4)
4912 self.assertEqual(tcp.sport, local_port)
4913 self.assert_packet_checksums_valid(p)
4915 self.logger.error(ppp("Unexpected or invalid packet:", p))
4918 # from server VRF0 back to client VRF1 (no translation)
4919 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4920 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4921 TCP(sport=local_port, dport=12349))
4922 self.pg0.add_stream(p)
4923 self.pg_enable_capture(self.pg_interfaces)
4925 capture = self.pg6.get_capture(1)
4930 self.assertEqual(ip.src, self.pg0.remote_ip4)
4931 self.assertEqual(tcp.sport, local_port)
4932 self.assert_packet_checksums_valid(p)
4934 self.logger.error(ppp("Unexpected or invalid packet:", p))
4937 # from client VRF0 to server VRF1 (no translation)
4938 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4939 IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
4940 TCP(sport=12344, dport=local_port))
4941 self.pg0.add_stream(p)
4942 self.pg_enable_capture(self.pg_interfaces)
4944 capture = self.pg5.get_capture(1)
4949 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4950 self.assertEqual(tcp.dport, local_port)
4951 self.assert_packet_checksums_valid(p)
4953 self.logger.error(ppp("Unexpected or invalid packet:", p))
4956 # from server VRF1 back to client VRF0 (no translation)
4957 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4958 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4959 TCP(sport=local_port, dport=12344))
4960 self.pg5.add_stream(p)
4961 self.pg_enable_capture(self.pg_interfaces)
4963 capture = self.pg0.get_capture(1)
4968 self.assertEqual(ip.src, self.pg5.remote_ip4)
4969 self.assertEqual(tcp.sport, local_port)
4970 self.assert_packet_checksums_valid(p)
4972 self.logger.error(ppp("Unexpected or invalid packet:", p))
4975 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4976 def test_session_timeout(self):
4977 """ NAT44 session timeouts """
4978 self.nat44_add_address(self.nat_addr)
4979 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4980 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4982 self.vapi.nat_set_timeouts(icmp=5)
4986 for i in range(0, max_sessions):
4987 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
4988 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4989 IP(src=src, dst=self.pg1.remote_ip4) /
4990 ICMP(id=1025, type='echo-request'))
4992 self.pg0.add_stream(pkts)
4993 self.pg_enable_capture(self.pg_interfaces)
4995 self.pg1.get_capture(max_sessions)
5000 for i in range(0, max_sessions):
5001 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
5002 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5003 IP(src=src, dst=self.pg1.remote_ip4) /
5004 ICMP(id=1026, type='echo-request'))
5006 self.pg0.add_stream(pkts)
5007 self.pg_enable_capture(self.pg_interfaces)
5009 self.pg1.get_capture(max_sessions)
5012 users = self.vapi.nat44_user_dump()
5014 nsessions = nsessions + user.nsessions
5015 self.assertLess(nsessions, 2 * max_sessions)
5017 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5018 def test_session_limit_per_user(self):
5019 """ Maximum sessions per user limit """
5020 self.nat44_add_address(self.nat_addr)
5021 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5022 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5024 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5025 src_address=self.pg2.local_ip4n,
5027 template_interval=10)
5029 # get maximum number of translations per user
5030 nat44_config = self.vapi.nat_show_config()
5033 for port in range(0, nat44_config.max_translations_per_user):
5034 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5035 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5036 UDP(sport=1025 + port, dport=1025 + port))
5039 self.pg0.add_stream(pkts)
5040 self.pg_enable_capture(self.pg_interfaces)
5042 capture = self.pg1.get_capture(len(pkts))
5044 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5045 src_port=self.ipfix_src_port)
5047 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5048 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5049 UDP(sport=3001, dport=3002))
5050 self.pg0.add_stream(p)
5051 self.pg_enable_capture(self.pg_interfaces)
5053 capture = self.pg1.assert_nothing_captured()
5055 # verify IPFIX logging
5056 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5058 capture = self.pg2.get_capture(10)
5059 ipfix = IPFIXDecoder()
5060 # first load template
5062 self.assertTrue(p.haslayer(IPFIX))
5063 if p.haslayer(Template):
5064 ipfix.add_template(p.getlayer(Template))
5065 # verify events in data set
5067 if p.haslayer(Data):
5068 data = ipfix.decode_data_set(p.getlayer(Set))
5069 self.verify_ipfix_max_entries_per_user(
5071 nat44_config.max_translations_per_user,
5072 self.pg0.remote_ip4n)
5075 super(TestNAT44EndpointDependent, self).tearDown()
5076 if not self.vpp_dead:
5077 self.logger.info(self.vapi.cli("show nat44 addresses"))
5078 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5079 self.logger.info(self.vapi.cli("show nat44 static mappings"))
5080 self.logger.info(self.vapi.cli("show nat44 interface address"))
5081 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
5082 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
5083 self.logger.info(self.vapi.cli("show nat timeouts"))
5085 self.vapi.cli("clear logging")
5088 class TestNAT44Out2InDPO(MethodHolder):
5089 """ NAT44 Test Cases using out2in DPO """
5092 def setUpConstants(cls):
5093 super(TestNAT44Out2InDPO, cls).setUpConstants()
5094 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
5097 def setUpClass(cls):
5098 super(TestNAT44Out2InDPO, cls).setUpClass()
5099 cls.vapi.cli("set log class nat level debug")
5102 cls.tcp_port_in = 6303
5103 cls.tcp_port_out = 6303
5104 cls.udp_port_in = 6304
5105 cls.udp_port_out = 6304
5106 cls.icmp_id_in = 6305
5107 cls.icmp_id_out = 6305
5108 cls.nat_addr = '10.0.0.3'
5109 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5110 cls.dst_ip4 = '192.168.70.1'
5112 cls.create_pg_interfaces(range(2))
5115 cls.pg0.config_ip4()
5116 cls.pg0.resolve_arp()
5119 cls.pg1.config_ip6()
5120 cls.pg1.resolve_ndp()
5122 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
5123 dst_address_length=0,
5124 next_hop_address=cls.pg1.remote_ip6n,
5125 next_hop_sw_if_index=cls.pg1.sw_if_index)
5128 super(TestNAT44Out2InDPO, cls).tearDownClass()
5131 def configure_xlat(self):
5132 self.dst_ip6_pfx = '1:2:3::'
5133 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
5135 self.dst_ip6_pfx_len = 96
5136 self.src_ip6_pfx = '4:5:6::'
5137 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
5139 self.src_ip6_pfx_len = 96
5140 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
5141 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
5142 '\x00\x00\x00\x00', 0, is_translation=1,
5145 def test_464xlat_ce(self):
5146 """ Test 464XLAT CE with NAT44 """
5148 nat_config = self.vapi.nat_show_config()
5149 self.assertEqual(1, nat_config.out2in_dpo)
5151 self.configure_xlat()
5153 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5154 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
5156 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5157 self.dst_ip6_pfx_len)
5158 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
5159 self.src_ip6_pfx_len)
5162 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5163 self.pg0.add_stream(pkts)
5164 self.pg_enable_capture(self.pg_interfaces)
5166 capture = self.pg1.get_capture(len(pkts))
5167 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
5170 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
5172 self.pg1.add_stream(pkts)
5173 self.pg_enable_capture(self.pg_interfaces)
5175 capture = self.pg0.get_capture(len(pkts))
5176 self.verify_capture_in(capture, self.pg0)
5178 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
5180 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
5181 self.nat_addr_n, is_add=0)
5183 def test_464xlat_ce_no_nat(self):
5184 """ Test 464XLAT CE without NAT44 """
5186 self.configure_xlat()
5188 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5189 self.dst_ip6_pfx_len)
5190 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
5191 self.src_ip6_pfx_len)
5193 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5194 self.pg0.add_stream(pkts)
5195 self.pg_enable_capture(self.pg_interfaces)
5197 capture = self.pg1.get_capture(len(pkts))
5198 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
5199 nat_ip=out_dst_ip6, same_port=True)
5201 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
5202 self.pg1.add_stream(pkts)
5203 self.pg_enable_capture(self.pg_interfaces)
5205 capture = self.pg0.get_capture(len(pkts))
5206 self.verify_capture_in(capture, self.pg0)
5209 class TestDeterministicNAT(MethodHolder):
5210 """ Deterministic NAT Test Cases """
5213 def setUpConstants(cls):
5214 super(TestDeterministicNAT, cls).setUpConstants()
5215 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
5218 def setUpClass(cls):
5219 super(TestDeterministicNAT, cls).setUpClass()
5220 cls.vapi.cli("set log class nat level debug")
5223 cls.tcp_port_in = 6303
5224 cls.tcp_external_port = 6303
5225 cls.udp_port_in = 6304
5226 cls.udp_external_port = 6304
5227 cls.icmp_id_in = 6305
5228 cls.nat_addr = '10.0.0.3'
5230 cls.create_pg_interfaces(range(3))
5231 cls.interfaces = list(cls.pg_interfaces)
5233 for i in cls.interfaces:
5238 cls.pg0.generate_remote_hosts(2)
5239 cls.pg0.configure_ipv4_neighbors()
5242 super(TestDeterministicNAT, cls).tearDownClass()
5245 def create_stream_in(self, in_if, out_if, ttl=64):
5247 Create packet stream for inside network
5249 :param in_if: Inside interface
5250 :param out_if: Outside interface
5251 :param ttl: TTL of generated packets
5255 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5256 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5257 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
5261 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5262 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5263 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
5267 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5268 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5269 ICMP(id=self.icmp_id_in, type='echo-request'))
5274 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
5276 Create packet stream for outside network
5278 :param out_if: Outside interface
5279 :param dst_ip: Destination IP address (Default use global NAT address)
5280 :param ttl: TTL of generated packets
5283 dst_ip = self.nat_addr
5286 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5287 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5288 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
5292 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5293 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5294 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
5298 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5299 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5300 ICMP(id=self.icmp_external_id, type='echo-reply'))
5305 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
5307 Verify captured packets on outside network
5309 :param capture: Captured packets
5310 :param nat_ip: Translated IP address (Default use global NAT address)
5311 :param same_port: Sorce port number is not translated (Default False)
5312 :param packet_num: Expected number of packets (Default 3)
5315 nat_ip = self.nat_addr
5316 self.assertEqual(packet_num, len(capture))
5317 for packet in capture:
5319 self.assertEqual(packet[IP].src, nat_ip)
5320 if packet.haslayer(TCP):
5321 self.tcp_port_out = packet[TCP].sport
5322 elif packet.haslayer(UDP):
5323 self.udp_port_out = packet[UDP].sport
5325 self.icmp_external_id = packet[ICMP].id
5327 self.logger.error(ppp("Unexpected or invalid packet "
5328 "(outside network):", packet))
5331 def test_deterministic_mode(self):
5332 """ NAT plugin run deterministic mode """
5333 in_addr = '172.16.255.0'
5334 out_addr = '172.17.255.50'
5335 in_addr_t = '172.16.255.20'
5336 in_addr_n = socket.inet_aton(in_addr)
5337 out_addr_n = socket.inet_aton(out_addr)
5338 in_addr_t_n = socket.inet_aton(in_addr_t)
5342 nat_config = self.vapi.nat_show_config()
5343 self.assertEqual(1, nat_config.deterministic)
5345 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
5347 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
5348 self.assertEqual(rep1.out_addr[:4], out_addr_n)
5349 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
5350 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
5352 deterministic_mappings = self.vapi.nat_det_map_dump()
5353 self.assertEqual(len(deterministic_mappings), 1)
5354 dsm = deterministic_mappings[0]
5355 self.assertEqual(in_addr_n, dsm.in_addr[:4])
5356 self.assertEqual(in_plen, dsm.in_plen)
5357 self.assertEqual(out_addr_n, dsm.out_addr[:4])
5358 self.assertEqual(out_plen, dsm.out_plen)
5360 self.clear_nat_det()
5361 deterministic_mappings = self.vapi.nat_det_map_dump()
5362 self.assertEqual(len(deterministic_mappings), 0)
5364 def test_set_timeouts(self):
5365 """ Set deterministic NAT timeouts """
5366 timeouts_before = self.vapi.nat_get_timeouts()
5368 self.vapi.nat_set_timeouts(timeouts_before.udp + 10,
5369 timeouts_before.tcp_established + 10,
5370 timeouts_before.tcp_transitory + 10,
5371 timeouts_before.icmp + 10)
5373 timeouts_after = self.vapi.nat_get_timeouts()
5375 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
5376 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
5377 self.assertNotEqual(timeouts_before.tcp_established,
5378 timeouts_after.tcp_established)
5379 self.assertNotEqual(timeouts_before.tcp_transitory,
5380 timeouts_after.tcp_transitory)
5382 def test_det_in(self):
5383 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
5385 nat_ip = "10.0.0.10"
5387 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5389 socket.inet_aton(nat_ip),
5391 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5392 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5396 pkts = self.create_stream_in(self.pg0, self.pg1)
5397 self.pg0.add_stream(pkts)
5398 self.pg_enable_capture(self.pg_interfaces)
5400 capture = self.pg1.get_capture(len(pkts))
5401 self.verify_capture_out(capture, nat_ip)
5404 pkts = self.create_stream_out(self.pg1, nat_ip)
5405 self.pg1.add_stream(pkts)
5406 self.pg_enable_capture(self.pg_interfaces)
5408 capture = self.pg0.get_capture(len(pkts))
5409 self.verify_capture_in(capture, self.pg0)
5412 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
5413 self.assertEqual(len(sessions), 3)
5417 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5418 self.assertEqual(s.in_port, self.tcp_port_in)
5419 self.assertEqual(s.out_port, self.tcp_port_out)
5420 self.assertEqual(s.ext_port, self.tcp_external_port)
5424 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5425 self.assertEqual(s.in_port, self.udp_port_in)
5426 self.assertEqual(s.out_port, self.udp_port_out)
5427 self.assertEqual(s.ext_port, self.udp_external_port)
5431 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5432 self.assertEqual(s.in_port, self.icmp_id_in)
5433 self.assertEqual(s.out_port, self.icmp_external_id)
5435 def test_multiple_users(self):
5436 """ Deterministic NAT multiple users """
5438 nat_ip = "10.0.0.10"
5440 external_port = 6303
5442 host0 = self.pg0.remote_hosts[0]
5443 host1 = self.pg0.remote_hosts[1]
5445 self.vapi.nat_det_add_del_map(host0.ip4n,
5447 socket.inet_aton(nat_ip),
5449 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5450 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5454 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
5455 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
5456 TCP(sport=port_in, dport=external_port))
5457 self.pg0.add_stream(p)
5458 self.pg_enable_capture(self.pg_interfaces)
5460 capture = self.pg1.get_capture(1)
5465 self.assertEqual(ip.src, nat_ip)
5466 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5467 self.assertEqual(tcp.dport, external_port)
5468 port_out0 = tcp.sport
5470 self.logger.error(ppp("Unexpected or invalid packet:", p))
5474 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
5475 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
5476 TCP(sport=port_in, dport=external_port))
5477 self.pg0.add_stream(p)
5478 self.pg_enable_capture(self.pg_interfaces)
5480 capture = self.pg1.get_capture(1)
5485 self.assertEqual(ip.src, nat_ip)
5486 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5487 self.assertEqual(tcp.dport, external_port)
5488 port_out1 = tcp.sport
5490 self.logger.error(ppp("Unexpected or invalid packet:", p))
5493 dms = self.vapi.nat_det_map_dump()
5494 self.assertEqual(1, len(dms))
5495 self.assertEqual(2, dms[0].ses_num)
5498 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5499 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5500 TCP(sport=external_port, dport=port_out0))
5501 self.pg1.add_stream(p)
5502 self.pg_enable_capture(self.pg_interfaces)
5504 capture = self.pg0.get_capture(1)
5509 self.assertEqual(ip.src, self.pg1.remote_ip4)
5510 self.assertEqual(ip.dst, host0.ip4)
5511 self.assertEqual(tcp.dport, port_in)
5512 self.assertEqual(tcp.sport, external_port)
5514 self.logger.error(ppp("Unexpected or invalid packet:", p))
5518 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5519 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5520 TCP(sport=external_port, dport=port_out1))
5521 self.pg1.add_stream(p)
5522 self.pg_enable_capture(self.pg_interfaces)
5524 capture = self.pg0.get_capture(1)
5529 self.assertEqual(ip.src, self.pg1.remote_ip4)
5530 self.assertEqual(ip.dst, host1.ip4)
5531 self.assertEqual(tcp.dport, port_in)
5532 self.assertEqual(tcp.sport, external_port)
5534 self.logger.error(ppp("Unexpected or invalid packet", p))
5537 # session close api test
5538 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
5540 self.pg1.remote_ip4n,
5542 dms = self.vapi.nat_det_map_dump()
5543 self.assertEqual(dms[0].ses_num, 1)
5545 self.vapi.nat_det_close_session_in(host0.ip4n,
5547 self.pg1.remote_ip4n,
5549 dms = self.vapi.nat_det_map_dump()
5550 self.assertEqual(dms[0].ses_num, 0)
5552 def test_tcp_session_close_detection_in(self):
5553 """ Deterministic NAT TCP session close from inside network """
5554 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5556 socket.inet_aton(self.nat_addr),
5558 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5559 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5562 self.initiate_tcp_session(self.pg0, self.pg1)
5564 # close the session from inside
5566 # FIN packet in -> out
5567 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5568 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5569 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5571 self.pg0.add_stream(p)
5572 self.pg_enable_capture(self.pg_interfaces)
5574 self.pg1.get_capture(1)
5578 # ACK packet out -> in
5579 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5580 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5581 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5585 # FIN packet out -> in
5586 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5587 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5588 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5592 self.pg1.add_stream(pkts)
5593 self.pg_enable_capture(self.pg_interfaces)
5595 self.pg0.get_capture(2)
5597 # ACK packet in -> out
5598 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5599 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5600 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5602 self.pg0.add_stream(p)
5603 self.pg_enable_capture(self.pg_interfaces)
5605 self.pg1.get_capture(1)
5607 # Check if deterministic NAT44 closed the session
5608 dms = self.vapi.nat_det_map_dump()
5609 self.assertEqual(0, dms[0].ses_num)
5611 self.logger.error("TCP session termination failed")
5614 def test_tcp_session_close_detection_out(self):
5615 """ Deterministic NAT TCP session close from outside network """
5616 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5618 socket.inet_aton(self.nat_addr),
5620 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5621 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5624 self.initiate_tcp_session(self.pg0, self.pg1)
5626 # close the session from outside
5628 # FIN packet out -> in
5629 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5630 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5631 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5633 self.pg1.add_stream(p)
5634 self.pg_enable_capture(self.pg_interfaces)
5636 self.pg0.get_capture(1)
5640 # ACK packet in -> out
5641 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5642 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5643 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5647 # ACK packet in -> out
5648 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5649 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5650 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5654 self.pg0.add_stream(pkts)
5655 self.pg_enable_capture(self.pg_interfaces)
5657 self.pg1.get_capture(2)
5659 # ACK packet out -> in
5660 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5661 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5662 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5664 self.pg1.add_stream(p)
5665 self.pg_enable_capture(self.pg_interfaces)
5667 self.pg0.get_capture(1)
5669 # Check if deterministic NAT44 closed the session
5670 dms = self.vapi.nat_det_map_dump()
5671 self.assertEqual(0, dms[0].ses_num)
5673 self.logger.error("TCP session termination failed")
5676 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5677 def test_session_timeout(self):
5678 """ Deterministic NAT session timeouts """
5679 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5681 socket.inet_aton(self.nat_addr),
5683 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5684 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5687 self.initiate_tcp_session(self.pg0, self.pg1)
5688 self.vapi.nat_set_timeouts(5, 5, 5, 5)
5689 pkts = self.create_stream_in(self.pg0, self.pg1)
5690 self.pg0.add_stream(pkts)
5691 self.pg_enable_capture(self.pg_interfaces)
5693 capture = self.pg1.get_capture(len(pkts))
5696 dms = self.vapi.nat_det_map_dump()
5697 self.assertEqual(0, dms[0].ses_num)
5699 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5700 def test_session_limit_per_user(self):
5701 """ Deterministic NAT maximum sessions per user limit """
5702 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5704 socket.inet_aton(self.nat_addr),
5706 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5707 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5709 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5710 src_address=self.pg2.local_ip4n,
5712 template_interval=10)
5713 self.vapi.nat_ipfix()
5716 for port in range(1025, 2025):
5717 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5718 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5719 UDP(sport=port, dport=port))
5722 self.pg0.add_stream(pkts)
5723 self.pg_enable_capture(self.pg_interfaces)
5725 capture = self.pg1.get_capture(len(pkts))
5727 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5728 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5729 UDP(sport=3001, dport=3002))
5730 self.pg0.add_stream(p)
5731 self.pg_enable_capture(self.pg_interfaces)
5733 capture = self.pg1.assert_nothing_captured()
5735 # verify ICMP error packet
5736 capture = self.pg0.get_capture(1)
5738 self.assertTrue(p.haslayer(ICMP))
5740 self.assertEqual(icmp.type, 3)
5741 self.assertEqual(icmp.code, 1)
5742 self.assertTrue(icmp.haslayer(IPerror))
5743 inner_ip = icmp[IPerror]
5744 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5745 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5747 dms = self.vapi.nat_det_map_dump()
5749 self.assertEqual(1000, dms[0].ses_num)
5751 # verify IPFIX logging
5752 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5754 capture = self.pg2.get_capture(2)
5755 ipfix = IPFIXDecoder()
5756 # first load template
5758 self.assertTrue(p.haslayer(IPFIX))
5759 if p.haslayer(Template):
5760 ipfix.add_template(p.getlayer(Template))
5761 # verify events in data set
5763 if p.haslayer(Data):
5764 data = ipfix.decode_data_set(p.getlayer(Set))
5765 self.verify_ipfix_max_entries_per_user(data,
5767 self.pg0.remote_ip4n)
5769 def clear_nat_det(self):
5771 Clear deterministic NAT configuration.
5773 self.vapi.nat_ipfix(enable=0)
5774 self.vapi.nat_set_timeouts()
5775 deterministic_mappings = self.vapi.nat_det_map_dump()
5776 for dsm in deterministic_mappings:
5777 self.vapi.nat_det_add_del_map(dsm.in_addr,
5783 interfaces = self.vapi.nat44_interface_dump()
5784 for intf in interfaces:
5785 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
5790 super(TestDeterministicNAT, self).tearDown()
5791 if not self.vpp_dead:
5792 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5793 self.logger.info(self.vapi.cli("show nat timeouts"))
5795 self.vapi.cli("show nat44 deterministic mappings"))
5797 self.vapi.cli("show nat44 deterministic sessions"))
5798 self.clear_nat_det()
5801 class TestNAT64(MethodHolder):
5802 """ NAT64 Test Cases """
5805 def setUpConstants(cls):
5806 super(TestNAT64, cls).setUpConstants()
5807 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5808 "nat64 st hash buckets 256", "}"])
5811 def setUpClass(cls):
5812 super(TestNAT64, cls).setUpClass()
5815 cls.tcp_port_in = 6303
5816 cls.tcp_port_out = 6303
5817 cls.udp_port_in = 6304
5818 cls.udp_port_out = 6304
5819 cls.icmp_id_in = 6305
5820 cls.icmp_id_out = 6305
5821 cls.nat_addr = '10.0.0.3'
5822 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5824 cls.vrf1_nat_addr = '10.0.10.3'
5825 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5827 cls.ipfix_src_port = 4739
5828 cls.ipfix_domain_id = 1
5830 cls.create_pg_interfaces(range(6))
5831 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5832 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5833 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5835 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5837 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5839 cls.pg0.generate_remote_hosts(2)
5841 for i in cls.ip6_interfaces:
5844 i.configure_ipv6_neighbors()
5846 for i in cls.ip4_interfaces:
5852 cls.pg3.config_ip4()
5853 cls.pg3.resolve_arp()
5854 cls.pg3.config_ip6()
5855 cls.pg3.configure_ipv6_neighbors()
5858 cls.pg5.config_ip6()
5861 super(TestNAT64, cls).tearDownClass()
5864 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5865 """ NAT64 inside interface handles Neighbor Advertisement """
5867 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
5870 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5871 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5872 ICMPv6EchoRequest())
5874 self.pg5.add_stream(pkts)
5875 self.pg_enable_capture(self.pg_interfaces)
5878 # Wait for Neighbor Solicitation
5879 capture = self.pg5.get_capture(len(pkts))
5880 self.assertEqual(1, len(capture))
5883 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5884 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
5885 tgt = packet[ICMPv6ND_NS].tgt
5887 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5890 # Send Neighbor Advertisement
5891 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5892 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5893 ICMPv6ND_NA(tgt=tgt) /
5894 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
5896 self.pg5.add_stream(pkts)
5897 self.pg_enable_capture(self.pg_interfaces)
5900 # Try to send ping again
5902 self.pg5.add_stream(pkts)
5903 self.pg_enable_capture(self.pg_interfaces)
5906 # Wait for ping reply
5907 capture = self.pg5.get_capture(len(pkts))
5908 self.assertEqual(1, len(capture))
5911 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5912 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
5913 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
5915 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5918 def test_pool(self):
5919 """ Add/delete address to NAT64 pool """
5920 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
5922 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
5924 addresses = self.vapi.nat64_pool_addr_dump()
5925 self.assertEqual(len(addresses), 1)
5926 self.assertEqual(addresses[0].address, nat_addr)
5928 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
5930 addresses = self.vapi.nat64_pool_addr_dump()
5931 self.assertEqual(len(addresses), 0)
5933 def test_interface(self):
5934 """ Enable/disable NAT64 feature on the interface """
5935 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5936 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5938 interfaces = self.vapi.nat64_interface_dump()
5939 self.assertEqual(len(interfaces), 2)
5942 for intf in interfaces:
5943 if intf.sw_if_index == self.pg0.sw_if_index:
5944 self.assertEqual(intf.is_inside, 1)
5946 elif intf.sw_if_index == self.pg1.sw_if_index:
5947 self.assertEqual(intf.is_inside, 0)
5949 self.assertTrue(pg0_found)
5950 self.assertTrue(pg1_found)
5952 features = self.vapi.cli("show interface features pg0")
5953 self.assertNotEqual(features.find('nat64-in2out'), -1)
5954 features = self.vapi.cli("show interface features pg1")
5955 self.assertNotEqual(features.find('nat64-out2in'), -1)
5957 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5958 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5960 interfaces = self.vapi.nat64_interface_dump()
5961 self.assertEqual(len(interfaces), 0)
5963 def test_static_bib(self):
5964 """ Add/delete static BIB entry """
5965 in_addr = socket.inet_pton(socket.AF_INET6,
5966 '2001:db8:85a3::8a2e:370:7334')
5967 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5970 proto = IP_PROTOS.tcp
5972 self.vapi.nat64_add_del_static_bib(in_addr,
5977 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5982 self.assertEqual(bibe.i_addr, in_addr)
5983 self.assertEqual(bibe.o_addr, out_addr)
5984 self.assertEqual(bibe.i_port, in_port)
5985 self.assertEqual(bibe.o_port, out_port)
5986 self.assertEqual(static_bib_num, 1)
5988 self.vapi.nat64_add_del_static_bib(in_addr,
5994 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5999 self.assertEqual(static_bib_num, 0)
6001 def test_set_timeouts(self):
6002 """ Set NAT64 timeouts """
6003 # verify default values
6004 timeouts = self.vapi.nat_get_timeouts()
6005 self.assertEqual(timeouts.udp, 300)
6006 self.assertEqual(timeouts.icmp, 60)
6007 self.assertEqual(timeouts.tcp_transitory, 240)
6008 self.assertEqual(timeouts.tcp_established, 7440)
6010 # set and verify custom values
6011 self.vapi.nat_set_timeouts(udp=200, icmp=30, tcp_transitory=250,
6012 tcp_established=7450)
6013 timeouts = self.vapi.nat_get_timeouts()
6014 self.assertEqual(timeouts.udp, 200)
6015 self.assertEqual(timeouts.icmp, 30)
6016 self.assertEqual(timeouts.tcp_transitory, 250)
6017 self.assertEqual(timeouts.tcp_established, 7450)
6019 def test_dynamic(self):
6020 """ NAT64 dynamic translation test """
6021 self.tcp_port_in = 6303
6022 self.udp_port_in = 6304
6023 self.icmp_id_in = 6305
6025 ses_num_start = self.nat64_get_ses_num()
6027 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6029 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6030 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6033 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6034 self.pg0.add_stream(pkts)
6035 self.pg_enable_capture(self.pg_interfaces)
6037 capture = self.pg1.get_capture(len(pkts))
6038 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6039 dst_ip=self.pg1.remote_ip4)
6042 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6043 self.pg1.add_stream(pkts)
6044 self.pg_enable_capture(self.pg_interfaces)
6046 capture = self.pg0.get_capture(len(pkts))
6047 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6048 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6051 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6052 self.pg0.add_stream(pkts)
6053 self.pg_enable_capture(self.pg_interfaces)
6055 capture = self.pg1.get_capture(len(pkts))
6056 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6057 dst_ip=self.pg1.remote_ip4)
6060 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6061 self.pg1.add_stream(pkts)
6062 self.pg_enable_capture(self.pg_interfaces)
6064 capture = self.pg0.get_capture(len(pkts))
6065 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6067 ses_num_end = self.nat64_get_ses_num()
6069 self.assertEqual(ses_num_end - ses_num_start, 3)
6071 # tenant with specific VRF
6072 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6073 self.vrf1_nat_addr_n,
6074 vrf_id=self.vrf1_id)
6075 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6077 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
6078 self.pg2.add_stream(pkts)
6079 self.pg_enable_capture(self.pg_interfaces)
6081 capture = self.pg1.get_capture(len(pkts))
6082 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6083 dst_ip=self.pg1.remote_ip4)
6085 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6086 self.pg1.add_stream(pkts)
6087 self.pg_enable_capture(self.pg_interfaces)
6089 capture = self.pg2.get_capture(len(pkts))
6090 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
6092 def test_static(self):
6093 """ NAT64 static translation test """
6094 self.tcp_port_in = 60303
6095 self.udp_port_in = 60304
6096 self.icmp_id_in = 60305
6097 self.tcp_port_out = 60303
6098 self.udp_port_out = 60304
6099 self.icmp_id_out = 60305
6101 ses_num_start = self.nat64_get_ses_num()
6103 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6105 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6106 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6108 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6113 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6118 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6125 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6126 self.pg0.add_stream(pkts)
6127 self.pg_enable_capture(self.pg_interfaces)
6129 capture = self.pg1.get_capture(len(pkts))
6130 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6131 dst_ip=self.pg1.remote_ip4, same_port=True)
6134 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6135 self.pg1.add_stream(pkts)
6136 self.pg_enable_capture(self.pg_interfaces)
6138 capture = self.pg0.get_capture(len(pkts))
6139 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6140 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6142 ses_num_end = self.nat64_get_ses_num()
6144 self.assertEqual(ses_num_end - ses_num_start, 3)
6146 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6147 def test_session_timeout(self):
6148 """ NAT64 session timeout """
6149 self.icmp_id_in = 1234
6150 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6152 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6153 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6154 self.vapi.nat_set_timeouts(icmp=5, tcp_transitory=5, tcp_established=5)
6156 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6157 self.pg0.add_stream(pkts)
6158 self.pg_enable_capture(self.pg_interfaces)
6160 capture = self.pg1.get_capture(len(pkts))
6162 ses_num_before_timeout = self.nat64_get_ses_num()
6166 # ICMP and TCP session after timeout
6167 ses_num_after_timeout = self.nat64_get_ses_num()
6168 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
6170 def test_icmp_error(self):
6171 """ NAT64 ICMP Error message translation """
6172 self.tcp_port_in = 6303
6173 self.udp_port_in = 6304
6174 self.icmp_id_in = 6305
6176 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6178 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6179 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6181 # send some packets to create sessions
6182 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6183 self.pg0.add_stream(pkts)
6184 self.pg_enable_capture(self.pg_interfaces)
6186 capture_ip4 = self.pg1.get_capture(len(pkts))
6187 self.verify_capture_out(capture_ip4,
6188 nat_ip=self.nat_addr,
6189 dst_ip=self.pg1.remote_ip4)
6191 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6192 self.pg1.add_stream(pkts)
6193 self.pg_enable_capture(self.pg_interfaces)
6195 capture_ip6 = self.pg0.get_capture(len(pkts))
6196 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6197 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
6198 self.pg0.remote_ip6)
6201 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6202 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
6203 ICMPv6DestUnreach(code=1) /
6204 packet[IPv6] for packet in capture_ip6]
6205 self.pg0.add_stream(pkts)
6206 self.pg_enable_capture(self.pg_interfaces)
6208 capture = self.pg1.get_capture(len(pkts))
6209 for packet in capture:
6211 self.assertEqual(packet[IP].src, self.nat_addr)
6212 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6213 self.assertEqual(packet[ICMP].type, 3)
6214 self.assertEqual(packet[ICMP].code, 13)
6215 inner = packet[IPerror]
6216 self.assertEqual(inner.src, self.pg1.remote_ip4)
6217 self.assertEqual(inner.dst, self.nat_addr)
6218 self.assert_packet_checksums_valid(packet)
6219 if inner.haslayer(TCPerror):
6220 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
6221 elif inner.haslayer(UDPerror):
6222 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
6224 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
6226 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6230 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6231 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6232 ICMP(type=3, code=13) /
6233 packet[IP] for packet in capture_ip4]
6234 self.pg1.add_stream(pkts)
6235 self.pg_enable_capture(self.pg_interfaces)
6237 capture = self.pg0.get_capture(len(pkts))
6238 for packet in capture:
6240 self.assertEqual(packet[IPv6].src, ip.src)
6241 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6242 icmp = packet[ICMPv6DestUnreach]
6243 self.assertEqual(icmp.code, 1)
6244 inner = icmp[IPerror6]
6245 self.assertEqual(inner.src, self.pg0.remote_ip6)
6246 self.assertEqual(inner.dst, ip.src)
6247 self.assert_icmpv6_checksum_valid(packet)
6248 if inner.haslayer(TCPerror):
6249 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
6250 elif inner.haslayer(UDPerror):
6251 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
6253 self.assertEqual(inner[ICMPv6EchoRequest].id,
6256 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6259 def test_hairpinning(self):
6260 """ NAT64 hairpinning """
6262 client = self.pg0.remote_hosts[0]
6263 server = self.pg0.remote_hosts[1]
6264 server_tcp_in_port = 22
6265 server_tcp_out_port = 4022
6266 server_udp_in_port = 23
6267 server_udp_out_port = 4023
6268 client_tcp_in_port = 1234
6269 client_udp_in_port = 1235
6270 client_tcp_out_port = 0
6271 client_udp_out_port = 0
6272 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6273 nat_addr_ip6 = ip.src
6275 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6277 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6278 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6280 self.vapi.nat64_add_del_static_bib(server.ip6n,
6283 server_tcp_out_port,
6285 self.vapi.nat64_add_del_static_bib(server.ip6n,
6288 server_udp_out_port,
6293 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6294 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6295 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6297 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6298 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6299 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
6301 self.pg0.add_stream(pkts)
6302 self.pg_enable_capture(self.pg_interfaces)
6304 capture = self.pg0.get_capture(len(pkts))
6305 for packet in capture:
6307 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6308 self.assertEqual(packet[IPv6].dst, server.ip6)
6309 self.assert_packet_checksums_valid(packet)
6310 if packet.haslayer(TCP):
6311 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
6312 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
6313 client_tcp_out_port = packet[TCP].sport
6315 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
6316 self.assertEqual(packet[UDP].dport, server_udp_in_port)
6317 client_udp_out_port = packet[UDP].sport
6319 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6324 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6325 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6326 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
6328 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6329 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6330 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
6332 self.pg0.add_stream(pkts)
6333 self.pg_enable_capture(self.pg_interfaces)
6335 capture = self.pg0.get_capture(len(pkts))
6336 for packet in capture:
6338 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6339 self.assertEqual(packet[IPv6].dst, client.ip6)
6340 self.assert_packet_checksums_valid(packet)
6341 if packet.haslayer(TCP):
6342 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
6343 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
6345 self.assertEqual(packet[UDP].sport, server_udp_out_port)
6346 self.assertEqual(packet[UDP].dport, client_udp_in_port)
6348 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6353 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6354 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6355 ICMPv6DestUnreach(code=1) /
6356 packet[IPv6] for packet in capture]
6357 self.pg0.add_stream(pkts)
6358 self.pg_enable_capture(self.pg_interfaces)
6360 capture = self.pg0.get_capture(len(pkts))
6361 for packet in capture:
6363 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6364 self.assertEqual(packet[IPv6].dst, server.ip6)
6365 icmp = packet[ICMPv6DestUnreach]
6366 self.assertEqual(icmp.code, 1)
6367 inner = icmp[IPerror6]
6368 self.assertEqual(inner.src, server.ip6)
6369 self.assertEqual(inner.dst, nat_addr_ip6)
6370 self.assert_packet_checksums_valid(packet)
6371 if inner.haslayer(TCPerror):
6372 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
6373 self.assertEqual(inner[TCPerror].dport,
6374 client_tcp_out_port)
6376 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
6377 self.assertEqual(inner[UDPerror].dport,
6378 client_udp_out_port)
6380 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6383 def test_prefix(self):
6384 """ NAT64 Network-Specific Prefix """
6386 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6388 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6389 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6390 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6391 self.vrf1_nat_addr_n,
6392 vrf_id=self.vrf1_id)
6393 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6396 global_pref64 = "2001:db8::"
6397 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
6398 global_pref64_len = 32
6399 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
6401 prefix = self.vapi.nat64_prefix_dump()
6402 self.assertEqual(len(prefix), 1)
6403 self.assertEqual(prefix[0].prefix, global_pref64_n)
6404 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
6405 self.assertEqual(prefix[0].vrf_id, 0)
6407 # Add tenant specific prefix
6408 vrf1_pref64 = "2001:db8:122:300::"
6409 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
6410 vrf1_pref64_len = 56
6411 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
6413 vrf_id=self.vrf1_id)
6414 prefix = self.vapi.nat64_prefix_dump()
6415 self.assertEqual(len(prefix), 2)
6418 pkts = self.create_stream_in_ip6(self.pg0,
6421 plen=global_pref64_len)
6422 self.pg0.add_stream(pkts)
6423 self.pg_enable_capture(self.pg_interfaces)
6425 capture = self.pg1.get_capture(len(pkts))
6426 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6427 dst_ip=self.pg1.remote_ip4)
6429 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6430 self.pg1.add_stream(pkts)
6431 self.pg_enable_capture(self.pg_interfaces)
6433 capture = self.pg0.get_capture(len(pkts))
6434 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6437 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
6439 # Tenant specific prefix
6440 pkts = self.create_stream_in_ip6(self.pg2,
6443 plen=vrf1_pref64_len)
6444 self.pg2.add_stream(pkts)
6445 self.pg_enable_capture(self.pg_interfaces)
6447 capture = self.pg1.get_capture(len(pkts))
6448 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6449 dst_ip=self.pg1.remote_ip4)
6451 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6452 self.pg1.add_stream(pkts)
6453 self.pg_enable_capture(self.pg_interfaces)
6455 capture = self.pg2.get_capture(len(pkts))
6456 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6459 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
6461 def test_unknown_proto(self):
6462 """ NAT64 translate packet with unknown protocol """
6464 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6466 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6467 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6468 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6471 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6472 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
6473 TCP(sport=self.tcp_port_in, dport=20))
6474 self.pg0.add_stream(p)
6475 self.pg_enable_capture(self.pg_interfaces)
6477 p = self.pg1.get_capture(1)
6479 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6480 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
6482 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6483 TCP(sport=1234, dport=1234))
6484 self.pg0.add_stream(p)
6485 self.pg_enable_capture(self.pg_interfaces)
6487 p = self.pg1.get_capture(1)
6490 self.assertEqual(packet[IP].src, self.nat_addr)
6491 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6492 self.assertTrue(packet.haslayer(GRE))
6493 self.assert_packet_checksums_valid(packet)
6495 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6499 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6500 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6502 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6503 TCP(sport=1234, dport=1234))
6504 self.pg1.add_stream(p)
6505 self.pg_enable_capture(self.pg_interfaces)
6507 p = self.pg0.get_capture(1)
6510 self.assertEqual(packet[IPv6].src, remote_ip6)
6511 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6512 self.assertEqual(packet[IPv6].nh, 47)
6514 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6517 def test_hairpinning_unknown_proto(self):
6518 """ NAT64 translate packet with unknown protocol - hairpinning """
6520 client = self.pg0.remote_hosts[0]
6521 server = self.pg0.remote_hosts[1]
6522 server_tcp_in_port = 22
6523 server_tcp_out_port = 4022
6524 client_tcp_in_port = 1234
6525 client_tcp_out_port = 1235
6526 server_nat_ip = "10.0.0.100"
6527 client_nat_ip = "10.0.0.110"
6528 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
6529 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
6530 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
6531 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
6533 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
6535 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6536 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6538 self.vapi.nat64_add_del_static_bib(server.ip6n,
6541 server_tcp_out_port,
6544 self.vapi.nat64_add_del_static_bib(server.ip6n,
6550 self.vapi.nat64_add_del_static_bib(client.ip6n,
6553 client_tcp_out_port,
6557 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6558 IPv6(src=client.ip6, dst=server_nat_ip6) /
6559 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6560 self.pg0.add_stream(p)
6561 self.pg_enable_capture(self.pg_interfaces)
6563 p = self.pg0.get_capture(1)
6565 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6566 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
6568 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6569 TCP(sport=1234, dport=1234))
6570 self.pg0.add_stream(p)
6571 self.pg_enable_capture(self.pg_interfaces)
6573 p = self.pg0.get_capture(1)
6576 self.assertEqual(packet[IPv6].src, client_nat_ip6)
6577 self.assertEqual(packet[IPv6].dst, server.ip6)
6578 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6580 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6584 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6585 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
6587 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6588 TCP(sport=1234, dport=1234))
6589 self.pg0.add_stream(p)
6590 self.pg_enable_capture(self.pg_interfaces)
6592 p = self.pg0.get_capture(1)
6595 self.assertEqual(packet[IPv6].src, server_nat_ip6)
6596 self.assertEqual(packet[IPv6].dst, client.ip6)
6597 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6599 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6602 def test_one_armed_nat64(self):
6603 """ One armed NAT64 """
6605 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
6609 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6611 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
6612 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
6615 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6616 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
6617 TCP(sport=12345, dport=80))
6618 self.pg3.add_stream(p)
6619 self.pg_enable_capture(self.pg_interfaces)
6621 capture = self.pg3.get_capture(1)
6626 self.assertEqual(ip.src, self.nat_addr)
6627 self.assertEqual(ip.dst, self.pg3.remote_ip4)
6628 self.assertNotEqual(tcp.sport, 12345)
6629 external_port = tcp.sport
6630 self.assertEqual(tcp.dport, 80)
6631 self.assert_packet_checksums_valid(p)
6633 self.logger.error(ppp("Unexpected or invalid packet:", p))
6637 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6638 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6639 TCP(sport=80, dport=external_port))
6640 self.pg3.add_stream(p)
6641 self.pg_enable_capture(self.pg_interfaces)
6643 capture = self.pg3.get_capture(1)
6648 self.assertEqual(ip.src, remote_host_ip6)
6649 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6650 self.assertEqual(tcp.sport, 80)
6651 self.assertEqual(tcp.dport, 12345)
6652 self.assert_packet_checksums_valid(p)
6654 self.logger.error(ppp("Unexpected or invalid packet:", p))
6657 def test_frag_in_order(self):
6658 """ NAT64 translate fragments arriving in order """
6659 self.tcp_port_in = random.randint(1025, 65535)
6661 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6663 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6664 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6666 reass = self.vapi.nat_reass_dump()
6667 reass_n_start = len(reass)
6671 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6672 self.tcp_port_in, 20, data)
6673 self.pg0.add_stream(pkts)
6674 self.pg_enable_capture(self.pg_interfaces)
6676 frags = self.pg1.get_capture(len(pkts))
6677 p = self.reass_frags_and_verify(frags,
6679 self.pg1.remote_ip4)
6680 self.assertEqual(p[TCP].dport, 20)
6681 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6682 self.tcp_port_out = p[TCP].sport
6683 self.assertEqual(data, p[Raw].load)
6686 data = "A" * 4 + "b" * 16 + "C" * 3
6687 pkts = self.create_stream_frag(self.pg1,
6692 self.pg1.add_stream(pkts)
6693 self.pg_enable_capture(self.pg_interfaces)
6695 frags = self.pg0.get_capture(len(pkts))
6696 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6697 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6698 self.assertEqual(p[TCP].sport, 20)
6699 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6700 self.assertEqual(data, p[Raw].load)
6702 reass = self.vapi.nat_reass_dump()
6703 reass_n_end = len(reass)
6705 self.assertEqual(reass_n_end - reass_n_start, 2)
6707 def test_reass_hairpinning(self):
6708 """ NAT64 fragments hairpinning """
6710 server = self.pg0.remote_hosts[1]
6711 server_in_port = random.randint(1025, 65535)
6712 server_out_port = random.randint(1025, 65535)
6713 client_in_port = random.randint(1025, 65535)
6714 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6715 nat_addr_ip6 = ip.src
6717 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6719 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6720 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6722 # add static BIB entry for server
6723 self.vapi.nat64_add_del_static_bib(server.ip6n,
6729 # send packet from host to server
6730 pkts = self.create_stream_frag_ip6(self.pg0,
6735 self.pg0.add_stream(pkts)
6736 self.pg_enable_capture(self.pg_interfaces)
6738 frags = self.pg0.get_capture(len(pkts))
6739 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6740 self.assertNotEqual(p[TCP].sport, client_in_port)
6741 self.assertEqual(p[TCP].dport, server_in_port)
6742 self.assertEqual(data, p[Raw].load)
6744 def test_frag_out_of_order(self):
6745 """ NAT64 translate fragments arriving out of order """
6746 self.tcp_port_in = random.randint(1025, 65535)
6748 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6750 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6751 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6755 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6756 self.tcp_port_in, 20, data)
6758 self.pg0.add_stream(pkts)
6759 self.pg_enable_capture(self.pg_interfaces)
6761 frags = self.pg1.get_capture(len(pkts))
6762 p = self.reass_frags_and_verify(frags,
6764 self.pg1.remote_ip4)
6765 self.assertEqual(p[TCP].dport, 20)
6766 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6767 self.tcp_port_out = p[TCP].sport
6768 self.assertEqual(data, p[Raw].load)
6771 data = "A" * 4 + "B" * 16 + "C" * 3
6772 pkts = self.create_stream_frag(self.pg1,
6778 self.pg1.add_stream(pkts)
6779 self.pg_enable_capture(self.pg_interfaces)
6781 frags = self.pg0.get_capture(len(pkts))
6782 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6783 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6784 self.assertEqual(p[TCP].sport, 20)
6785 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6786 self.assertEqual(data, p[Raw].load)
6788 def test_interface_addr(self):
6789 """ Acquire NAT64 pool addresses from interface """
6790 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
6792 # no address in NAT64 pool
6793 adresses = self.vapi.nat44_address_dump()
6794 self.assertEqual(0, len(adresses))
6796 # configure interface address and check NAT64 address pool
6797 self.pg4.config_ip4()
6798 addresses = self.vapi.nat64_pool_addr_dump()
6799 self.assertEqual(len(addresses), 1)
6800 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
6802 # remove interface address and check NAT64 address pool
6803 self.pg4.unconfig_ip4()
6804 addresses = self.vapi.nat64_pool_addr_dump()
6805 self.assertEqual(0, len(adresses))
6807 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6808 def test_ipfix_max_bibs_sessions(self):
6809 """ IPFIX logging maximum session and BIB entries exceeded """
6812 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6816 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6818 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6819 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6823 for i in range(0, max_bibs):
6824 src = "fd01:aa::%x" % (i)
6825 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6826 IPv6(src=src, dst=remote_host_ip6) /
6827 TCP(sport=12345, dport=80))
6829 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6830 IPv6(src=src, dst=remote_host_ip6) /
6831 TCP(sport=12345, dport=22))
6833 self.pg0.add_stream(pkts)
6834 self.pg_enable_capture(self.pg_interfaces)
6836 self.pg1.get_capture(max_sessions)
6838 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6839 src_address=self.pg3.local_ip4n,
6841 template_interval=10)
6842 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6843 src_port=self.ipfix_src_port)
6845 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6846 IPv6(src=src, dst=remote_host_ip6) /
6847 TCP(sport=12345, dport=25))
6848 self.pg0.add_stream(p)
6849 self.pg_enable_capture(self.pg_interfaces)
6851 self.pg1.assert_nothing_captured()
6853 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6854 capture = self.pg3.get_capture(9)
6855 ipfix = IPFIXDecoder()
6856 # first load template
6858 self.assertTrue(p.haslayer(IPFIX))
6859 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6860 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6861 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6862 self.assertEqual(p[UDP].dport, 4739)
6863 self.assertEqual(p[IPFIX].observationDomainID,
6864 self.ipfix_domain_id)
6865 if p.haslayer(Template):
6866 ipfix.add_template(p.getlayer(Template))
6867 # verify events in data set
6869 if p.haslayer(Data):
6870 data = ipfix.decode_data_set(p.getlayer(Set))
6871 self.verify_ipfix_max_sessions(data, max_sessions)
6873 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6874 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6875 TCP(sport=12345, dport=80))
6876 self.pg0.add_stream(p)
6877 self.pg_enable_capture(self.pg_interfaces)
6879 self.pg1.assert_nothing_captured()
6881 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6882 capture = self.pg3.get_capture(1)
6883 # verify events in data set
6885 self.assertTrue(p.haslayer(IPFIX))
6886 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6887 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6888 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6889 self.assertEqual(p[UDP].dport, 4739)
6890 self.assertEqual(p[IPFIX].observationDomainID,
6891 self.ipfix_domain_id)
6892 if p.haslayer(Data):
6893 data = ipfix.decode_data_set(p.getlayer(Set))
6894 self.verify_ipfix_max_bibs(data, max_bibs)
6896 def test_ipfix_max_frags(self):
6897 """ IPFIX logging maximum fragments pending reassembly exceeded """
6898 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6900 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6901 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6902 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6903 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6904 src_address=self.pg3.local_ip4n,
6906 template_interval=10)
6907 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6908 src_port=self.ipfix_src_port)
6911 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6912 self.tcp_port_in, 20, data)
6913 self.pg0.add_stream(pkts[-1])
6914 self.pg_enable_capture(self.pg_interfaces)
6916 self.pg1.assert_nothing_captured()
6918 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6919 capture = self.pg3.get_capture(9)
6920 ipfix = IPFIXDecoder()
6921 # first load template
6923 self.assertTrue(p.haslayer(IPFIX))
6924 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6925 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6926 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6927 self.assertEqual(p[UDP].dport, 4739)
6928 self.assertEqual(p[IPFIX].observationDomainID,
6929 self.ipfix_domain_id)
6930 if p.haslayer(Template):
6931 ipfix.add_template(p.getlayer(Template))
6932 # verify events in data set
6934 if p.haslayer(Data):
6935 data = ipfix.decode_data_set(p.getlayer(Set))
6936 self.verify_ipfix_max_fragments_ip6(data, 0,
6937 self.pg0.remote_ip6n)
6939 def test_ipfix_bib_ses(self):
6940 """ IPFIX logging NAT64 BIB/session create and delete events """
6941 self.tcp_port_in = random.randint(1025, 65535)
6942 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6946 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6948 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6949 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6950 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6951 src_address=self.pg3.local_ip4n,
6953 template_interval=10)
6954 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6955 src_port=self.ipfix_src_port)
6958 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6959 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6960 TCP(sport=self.tcp_port_in, dport=25))
6961 self.pg0.add_stream(p)
6962 self.pg_enable_capture(self.pg_interfaces)
6964 p = self.pg1.get_capture(1)
6965 self.tcp_port_out = p[0][TCP].sport
6966 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6967 capture = self.pg3.get_capture(10)
6968 ipfix = IPFIXDecoder()
6969 # first load template
6971 self.assertTrue(p.haslayer(IPFIX))
6972 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6973 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6974 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6975 self.assertEqual(p[UDP].dport, 4739)
6976 self.assertEqual(p[IPFIX].observationDomainID,
6977 self.ipfix_domain_id)
6978 if p.haslayer(Template):
6979 ipfix.add_template(p.getlayer(Template))
6980 # verify events in data set
6982 if p.haslayer(Data):
6983 data = ipfix.decode_data_set(p.getlayer(Set))
6984 if ord(data[0][230]) == 10:
6985 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6986 elif ord(data[0][230]) == 6:
6987 self.verify_ipfix_nat64_ses(data,
6989 self.pg0.remote_ip6n,
6990 self.pg1.remote_ip4,
6993 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6996 self.pg_enable_capture(self.pg_interfaces)
6997 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
7000 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7001 capture = self.pg3.get_capture(2)
7002 # verify events in data set
7004 self.assertTrue(p.haslayer(IPFIX))
7005 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7006 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7007 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7008 self.assertEqual(p[UDP].dport, 4739)
7009 self.assertEqual(p[IPFIX].observationDomainID,
7010 self.ipfix_domain_id)
7011 if p.haslayer(Data):
7012 data = ipfix.decode_data_set(p.getlayer(Set))
7013 if ord(data[0][230]) == 11:
7014 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
7015 elif ord(data[0][230]) == 7:
7016 self.verify_ipfix_nat64_ses(data,
7018 self.pg0.remote_ip6n,
7019 self.pg1.remote_ip4,
7022 self.logger.error(ppp("Unexpected or invalid packet: ", p))
7024 def nat64_get_ses_num(self):
7026 Return number of active NAT64 sessions.
7028 st = self.vapi.nat64_st_dump()
7031 def clear_nat64(self):
7033 Clear NAT64 configuration.
7035 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
7036 domain_id=self.ipfix_domain_id)
7037 self.ipfix_src_port = 4739
7038 self.ipfix_domain_id = 1
7040 self.vapi.nat_set_timeouts()
7042 interfaces = self.vapi.nat64_interface_dump()
7043 for intf in interfaces:
7044 if intf.is_inside > 1:
7045 self.vapi.nat64_add_del_interface(intf.sw_if_index,
7048 self.vapi.nat64_add_del_interface(intf.sw_if_index,
7052 bib = self.vapi.nat64_bib_dump(255)
7055 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
7063 adresses = self.vapi.nat64_pool_addr_dump()
7064 for addr in adresses:
7065 self.vapi.nat64_add_del_pool_addr_range(addr.address,
7070 prefixes = self.vapi.nat64_prefix_dump()
7071 for prefix in prefixes:
7072 self.vapi.nat64_add_del_prefix(prefix.prefix,
7074 vrf_id=prefix.vrf_id,
7078 super(TestNAT64, self).tearDown()
7079 if not self.vpp_dead:
7080 self.logger.info(self.vapi.cli("show nat64 pool"))
7081 self.logger.info(self.vapi.cli("show nat64 interfaces"))
7082 self.logger.info(self.vapi.cli("show nat64 prefix"))
7083 self.logger.info(self.vapi.cli("show nat64 bib all"))
7084 self.logger.info(self.vapi.cli("show nat64 session table all"))
7085 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
7089 class TestDSlite(MethodHolder):
7090 """ DS-Lite Test Cases """
7093 def setUpClass(cls):
7094 super(TestDSlite, cls).setUpClass()
7097 cls.nat_addr = '10.0.0.3'
7098 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
7100 cls.create_pg_interfaces(range(2))
7102 cls.pg0.config_ip4()
7103 cls.pg0.resolve_arp()
7105 cls.pg1.config_ip6()
7106 cls.pg1.generate_remote_hosts(2)
7107 cls.pg1.configure_ipv6_neighbors()
7110 super(TestDSlite, cls).tearDownClass()
7113 def test_dslite(self):
7114 """ Test DS-Lite """
7115 nat_config = self.vapi.nat_show_config()
7116 self.assertEqual(0, nat_config.dslite_ce)
7118 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
7120 aftr_ip4 = '192.0.0.1'
7121 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7122 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7123 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7124 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7127 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7128 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
7129 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7130 UDP(sport=20000, dport=10000))
7131 self.pg1.add_stream(p)
7132 self.pg_enable_capture(self.pg_interfaces)
7134 capture = self.pg0.get_capture(1)
7135 capture = capture[0]
7136 self.assertFalse(capture.haslayer(IPv6))
7137 self.assertEqual(capture[IP].src, self.nat_addr)
7138 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7139 self.assertNotEqual(capture[UDP].sport, 20000)
7140 self.assertEqual(capture[UDP].dport, 10000)
7141 self.assert_packet_checksums_valid(capture)
7142 out_port = capture[UDP].sport
7144 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7145 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7146 UDP(sport=10000, dport=out_port))
7147 self.pg0.add_stream(p)
7148 self.pg_enable_capture(self.pg_interfaces)
7150 capture = self.pg1.get_capture(1)
7151 capture = capture[0]
7152 self.assertEqual(capture[IPv6].src, aftr_ip6)
7153 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7154 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7155 self.assertEqual(capture[IP].dst, '192.168.1.1')
7156 self.assertEqual(capture[UDP].sport, 10000)
7157 self.assertEqual(capture[UDP].dport, 20000)
7158 self.assert_packet_checksums_valid(capture)
7161 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7162 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7163 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7164 TCP(sport=20001, dport=10001))
7165 self.pg1.add_stream(p)
7166 self.pg_enable_capture(self.pg_interfaces)
7168 capture = self.pg0.get_capture(1)
7169 capture = capture[0]
7170 self.assertFalse(capture.haslayer(IPv6))
7171 self.assertEqual(capture[IP].src, self.nat_addr)
7172 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7173 self.assertNotEqual(capture[TCP].sport, 20001)
7174 self.assertEqual(capture[TCP].dport, 10001)
7175 self.assert_packet_checksums_valid(capture)
7176 out_port = capture[TCP].sport
7178 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7179 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7180 TCP(sport=10001, dport=out_port))
7181 self.pg0.add_stream(p)
7182 self.pg_enable_capture(self.pg_interfaces)
7184 capture = self.pg1.get_capture(1)
7185 capture = capture[0]
7186 self.assertEqual(capture[IPv6].src, aftr_ip6)
7187 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7188 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7189 self.assertEqual(capture[IP].dst, '192.168.1.1')
7190 self.assertEqual(capture[TCP].sport, 10001)
7191 self.assertEqual(capture[TCP].dport, 20001)
7192 self.assert_packet_checksums_valid(capture)
7195 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7196 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7197 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7198 ICMP(id=4000, type='echo-request'))
7199 self.pg1.add_stream(p)
7200 self.pg_enable_capture(self.pg_interfaces)
7202 capture = self.pg0.get_capture(1)
7203 capture = capture[0]
7204 self.assertFalse(capture.haslayer(IPv6))
7205 self.assertEqual(capture[IP].src, self.nat_addr)
7206 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7207 self.assertNotEqual(capture[ICMP].id, 4000)
7208 self.assert_packet_checksums_valid(capture)
7209 out_id = capture[ICMP].id
7211 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7212 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7213 ICMP(id=out_id, type='echo-reply'))
7214 self.pg0.add_stream(p)
7215 self.pg_enable_capture(self.pg_interfaces)
7217 capture = self.pg1.get_capture(1)
7218 capture = capture[0]
7219 self.assertEqual(capture[IPv6].src, aftr_ip6)
7220 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7221 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7222 self.assertEqual(capture[IP].dst, '192.168.1.1')
7223 self.assertEqual(capture[ICMP].id, 4000)
7224 self.assert_packet_checksums_valid(capture)
7226 # ping DS-Lite AFTR tunnel endpoint address
7227 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7228 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
7229 ICMPv6EchoRequest())
7230 self.pg1.add_stream(p)
7231 self.pg_enable_capture(self.pg_interfaces)
7233 capture = self.pg1.get_capture(1)
7234 self.assertEqual(1, len(capture))
7235 capture = capture[0]
7236 self.assertEqual(capture[IPv6].src, aftr_ip6)
7237 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7238 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7241 super(TestDSlite, self).tearDown()
7242 if not self.vpp_dead:
7243 self.logger.info(self.vapi.cli("show dslite pool"))
7245 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7246 self.logger.info(self.vapi.cli("show dslite sessions"))
7249 class TestDSliteCE(MethodHolder):
7250 """ DS-Lite CE Test Cases """
7253 def setUpConstants(cls):
7254 super(TestDSliteCE, cls).setUpConstants()
7255 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
7258 def setUpClass(cls):
7259 super(TestDSliteCE, cls).setUpClass()
7262 cls.create_pg_interfaces(range(2))
7264 cls.pg0.config_ip4()
7265 cls.pg0.resolve_arp()
7267 cls.pg1.config_ip6()
7268 cls.pg1.generate_remote_hosts(1)
7269 cls.pg1.configure_ipv6_neighbors()
7272 super(TestDSliteCE, cls).tearDownClass()
7275 def test_dslite_ce(self):
7276 """ Test DS-Lite CE """
7278 nat_config = self.vapi.nat_show_config()
7279 self.assertEqual(1, nat_config.dslite_ce)
7281 b4_ip4 = '192.0.0.2'
7282 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
7283 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
7284 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
7285 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
7287 aftr_ip4 = '192.0.0.1'
7288 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7289 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7290 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7291 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7293 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
7294 dst_address_length=128,
7295 next_hop_address=self.pg1.remote_ip6n,
7296 next_hop_sw_if_index=self.pg1.sw_if_index,
7300 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7301 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
7302 UDP(sport=10000, dport=20000))
7303 self.pg0.add_stream(p)
7304 self.pg_enable_capture(self.pg_interfaces)
7306 capture = self.pg1.get_capture(1)
7307 capture = capture[0]
7308 self.assertEqual(capture[IPv6].src, b4_ip6)
7309 self.assertEqual(capture[IPv6].dst, aftr_ip6)
7310 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7311 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
7312 self.assertEqual(capture[UDP].sport, 10000)
7313 self.assertEqual(capture[UDP].dport, 20000)
7314 self.assert_packet_checksums_valid(capture)
7317 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7318 IPv6(dst=b4_ip6, src=aftr_ip6) /
7319 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
7320 UDP(sport=20000, dport=10000))
7321 self.pg1.add_stream(p)
7322 self.pg_enable_capture(self.pg_interfaces)
7324 capture = self.pg0.get_capture(1)
7325 capture = capture[0]
7326 self.assertFalse(capture.haslayer(IPv6))
7327 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
7328 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7329 self.assertEqual(capture[UDP].sport, 20000)
7330 self.assertEqual(capture[UDP].dport, 10000)
7331 self.assert_packet_checksums_valid(capture)
7333 # ping DS-Lite B4 tunnel endpoint address
7334 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7335 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
7336 ICMPv6EchoRequest())
7337 self.pg1.add_stream(p)
7338 self.pg_enable_capture(self.pg_interfaces)
7340 capture = self.pg1.get_capture(1)
7341 self.assertEqual(1, len(capture))
7342 capture = capture[0]
7343 self.assertEqual(capture[IPv6].src, b4_ip6)
7344 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7345 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7348 super(TestDSliteCE, self).tearDown()
7349 if not self.vpp_dead:
7351 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7353 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
7356 class TestNAT66(MethodHolder):
7357 """ NAT66 Test Cases """
7360 def setUpClass(cls):
7361 super(TestNAT66, cls).setUpClass()
7364 cls.nat_addr = 'fd01:ff::2'
7365 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
7367 cls.create_pg_interfaces(range(2))
7368 cls.interfaces = list(cls.pg_interfaces)
7370 for i in cls.interfaces:
7373 i.configure_ipv6_neighbors()
7376 super(TestNAT66, cls).tearDownClass()
7379 def test_static(self):
7380 """ 1:1 NAT66 test """
7381 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7382 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7383 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7388 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7389 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7392 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7393 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7396 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7397 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7398 ICMPv6EchoRequest())
7400 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7401 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7402 GRE() / IP() / TCP())
7404 self.pg0.add_stream(pkts)
7405 self.pg_enable_capture(self.pg_interfaces)
7407 capture = self.pg1.get_capture(len(pkts))
7408 for packet in capture:
7410 self.assertEqual(packet[IPv6].src, self.nat_addr)
7411 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7412 self.assert_packet_checksums_valid(packet)
7414 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7419 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7420 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7423 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7424 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7427 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7428 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7431 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7432 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7433 GRE() / IP() / TCP())
7435 self.pg1.add_stream(pkts)
7436 self.pg_enable_capture(self.pg_interfaces)
7438 capture = self.pg0.get_capture(len(pkts))
7439 for packet in capture:
7441 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
7442 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
7443 self.assert_packet_checksums_valid(packet)
7445 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7448 sm = self.vapi.nat66_static_mapping_dump()
7449 self.assertEqual(len(sm), 1)
7450 self.assertEqual(sm[0].total_pkts, 8)
7452 def test_check_no_translate(self):
7453 """ NAT66 translate only when egress interface is outside interface """
7454 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7455 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7456 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7460 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7461 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7463 self.pg0.add_stream([p])
7464 self.pg_enable_capture(self.pg_interfaces)
7466 capture = self.pg1.get_capture(1)
7469 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7470 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7472 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7475 def clear_nat66(self):
7477 Clear NAT66 configuration.
7479 interfaces = self.vapi.nat66_interface_dump()
7480 for intf in interfaces:
7481 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7485 static_mappings = self.vapi.nat66_static_mapping_dump()
7486 for sm in static_mappings:
7487 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7488 sm.external_ip_address,
7493 super(TestNAT66, self).tearDown()
7494 if not self.vpp_dead:
7495 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7496 self.logger.info(self.vapi.cli("show nat66 static mappings"))
7500 if __name__ == '__main__':
7501 unittest.main(testRunner=VppTestRunner)