9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
29 def clear_nat44(self):
31 Clear NAT44 configuration.
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
139 self.verify_no_nat44_user()
141 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
142 local_port=0, external_port=0, vrf_id=0,
143 is_add=1, external_sw_if_index=0xFFFFFFFF,
144 proto=0, twice_nat=0, self_twice_nat=0,
145 out2in_only=0, tag=""):
147 Add/delete NAT44 static mapping
149 :param local_ip: Local IP address
150 :param external_ip: External IP address
151 :param local_port: Local port number (Optional)
152 :param external_port: External port number (Optional)
153 :param vrf_id: VRF ID (Default 0)
154 :param is_add: 1 if add, 0 if delete (Default add)
155 :param external_sw_if_index: External interface instead of IP address
156 :param proto: IP protocol (Mandatory if port specified)
157 :param twice_nat: 1 if translate external host address and port
158 :param self_twice_nat: 1 if translate external host address and port
159 whenever external host address equals
160 local address of internal host
161 :param out2in_only: if 1 rule is matching only out2in direction
162 :param tag: Opaque string tag
165 if local_port and external_port:
167 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
168 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
169 self.vapi.nat44_add_del_static_mapping(
172 external_sw_if_index,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    A single address is passed to the API as a range whose first and last
    address are the same.

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID handed through to the API (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    nat_addr = socket.inet_pton(socket.AF_INET, ip)
    # NOTE(review): the keyword arguments below were missing from the
    # damaged listing and are restored from the method signature - confirm.
    self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
                                          vrf_id=vrf_id,
                                          twice_nat=twice_nat)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default use remote address of
                   the outside interface)
    :param ttl: TTL of generated packets
    :returns: List of three packets: TCP, UDP and ICMP echo request
    """
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    # One packet per protocol, always in the order TCP, UDP, ICMP
    l4_headers = [
        TCP(sport=self.tcp_port_in, dport=20),
        UDP(sport=self.udp_port_in, dport=20),
        ICMP(id=self.icmp_id_in, type='echo-request'),
    ]
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_headers]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are copied into the prefix at the byte positions
    that RFC 6052 (section 2.2) assigns to the given prefix length; byte 8
    of the IPv6 address is never written.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: IPv4-embedded IPv6 addresses
    """
    # prefix length -> IPv6 byte index that receives each IPv4 octet
    octet_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray is mutable and indexes as integers on Python 2 and 3 alike
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    # Any other prefix length leaves the prefix unmodified
    for index, octet in zip(octet_positions.get(plen, ()), ip4_n):
        pref_n[index] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
273 def extract_ip4(self, ip6, plen):
275 Extract IPv4 address embedded in IPv6 addresses
277 :param ip6: IPv6 address
278 :param plen: IPv6 prefix length
279 :returns: extracted IPv4 address
281 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
313 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :returns: List of three packets: TCP, UDP and ICMPv6 echo request
    """
    if pref is None:
        # No prefix given: append the IPv4 address to 64:ff9b:: textually
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    # One packet per protocol, always in the order TCP, UDP, ICMPv6
    l4_headers = [
        TCP(sport=self.tcp_port_in, dport=20),
        UDP(sport=self.udp_port_in, dport=20),
        ICMPv6EchoRequest(id=self.icmp_id_in),
    ]
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
            l4
            for l4 in l4_headers]
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: List of three packets: TCP, UDP and ICMP echo reply
    """
    if dst_ip is None:
        dst_ip = self.nat_addr
    if use_inside_ports:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in
    else:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out

    # One packet per protocol, always in the order TCP, UDP, ICMP
    l4_headers = [
        TCP(dport=tcp_port, sport=20),
        UDP(dport=udp_port, sport=20),
        ICMP(id=icmp_id, type='echo-reply'),
    ]
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_headers]
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create IPv6 packet stream for outside network

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: List of three packets: TCP, UDP and ICMPv6 echo reply
    """
    # One packet per protocol, always in the order TCP, UDP, ICMPv6
    l4_headers = [
        TCP(dport=self.tcp_port_out, sport=20),
        UDP(dport=self.udp_port_out, sport=20),
        ICMPv6EchoReply(id=self.icmp_id_out),
    ]
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
            l4
            for l4 in l4_headers]
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None, is_ip6=False):
    """
    Verify captured packets on outside network

    As a side effect the source port / ICMP identifier seen in the capture
    is stored in self.tcp_port_out, self.udp_port_out and self.icmp_id_out.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    :param is_ip6: If L3 protocol is IPv6 (Default False)
    """
    if is_ip6:
        IP46 = IPv6
        ICMP46 = ICMPv6EchoRequest
    else:
        IP46 = IP
        ICMP46 = ICMP
    if nat_ip is None:
        nat_ip = self.nat_addr
    # With same_port the outside value must equal the inside one,
    # otherwise it must differ.
    check_port = self.assertEqual if same_port else self.assertNotEqual
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            # NOTE(review): this guard was missing from the damaged
            # listing and is restored from the is_ip6 parameter - confirm.
            if not is_ip6:
                self.assert_packet_checksums_valid(packet)
            self.assertEqual(packet[IP46].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP46].dst, dst_ip)
            if packet.haslayer(TCP):
                check_port(packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                check_port(packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                check_port(packet[ICMP46].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP46].id
                self.assert_packet_checksums_valid(packet)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           packet_num=3, dst_ip=None):
    """
    Verify captured IPv6 packets on outside network

    Thin wrapper around verify_capture_out() with IPv6 checking enabled.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
                                   dst_ip, is_ip6=True)
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assert_packet_checksums_valid(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            self.assert_packet_checksums_valid(packet)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    Addresses must still be the remote addresses of the two interfaces and
    the source port / ICMP identifier must still be the inside value.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # The packet quoted inside the ICMP error
            inner_ip = icmp[IPerror]
            # NOTE(review): the expected port attributes in the TCP/UDP
            # branches were missing from the damaged listing; the outside
            # values are assumed, mirroring the ICMP branch - confirm.
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # The packet quoted inside the ICMP error
            inner_ip = icmp[IPerror]
            # NOTE(review): the expected port attributes in the TCP/UDP
            # branches were missing from the damaged listing; the inside
            # values are assumed, mirroring the ICMP branch - confirm.
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data):
    """
    Create fragmented packet stream

    The TCP segment is emitted as three hand-built IPv4 fragments at
    fragment offsets 0, 3 and 5 that share one IP identification.

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :returns: Fragments
    """
    ip_id = random.randint(0, 65535)
    # Build and re-parse the unfragmented packet so that scapy fills in
    # the TCP checksum computed over the complete payload.
    whole = (IP(src=src_if.remote_ip4, dst=dst) /
             TCP(sport=sport, dport=dport) /
             Raw(data))
    whole = whole.__class__(bytes(whole))
    chksum = whole['TCP'].chksum

    # NOTE(review): the Raw() slices were missing from the damaged listing.
    # They are derived from the fragment offsets (8-byte units) assuming a
    # 20-byte TCP header: offset 3 -> 4 data bytes in the first fragment,
    # offset 5 -> 16 data bytes in the second - confirm.
    pkts = []
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=ip_id) /
         TCP(sport=sport, dport=dport, chksum=chksum) /
         Raw(data[0:4]))
    pkts.append(p)
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=ip_id,
            proto=IP_PROTOS.tcp) /
         Raw(data[4:20]))
    pkts.append(p)
    # Last fragment: no "MF" flag
    p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
         IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
            id=ip_id) /
         Raw(data[20:]))
    pkts.append(p)
    return pkts
def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
                           pref=None, plen=0, frag_size=128):
    """
    Create fragmented IPv6 packet stream

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :param frag_size: size of fragments
    :returns: Fragments produced by scapy's fragment6()
    """
    if pref is None:
        # No prefix given: append the IPv4 address to 64:ff9b:: textually
        dst_ip6 = ''.join(['64:ff9b::', dst])
    else:
        dst_ip6 = self.compose_ip6(dst, pref, plen)

    p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
         IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
         IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
         TCP(sport=sport, dport=dport) /
         Raw(data))

    return fragment6(p, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    reassembled = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.assert_ip_checksum_valid(p)
        # Fragment offset is expressed in 8-byte units
        reassembled.seek(p[IP].frag * 8)
        reassembled.write(p[IP].payload)
    # Fresh header built from the first fragment, without fragment fields
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
            proto=frags[0][IP].proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(reassembled.getvalue()))
        self.assert_tcp_checksum_valid(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(reassembled.getvalue()))
    # For any other protocol p is still the last fragment from the loop
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet
    """
    reassembled = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IPv6].src, src)
        self.assertEqual(p[IPv6].dst, dst)
        # Fragment offset is expressed in 8-byte units
        reassembled.seek(p[IPv6ExtHdrFragment].offset * 8)
        reassembled.write(p[IPv6ExtHdrFragment].payload)
    # Fresh header; next-header comes from the first fragment header
    ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
              nh=frags[0][IPv6ExtHdrFragment].nh)
    if ip.nh == IP_PROTOS.tcp:
        p = (ip / TCP(reassembled.getvalue()))
    elif ip.nh == IP_PROTOS.udp:
        p = (ip / UDP(reassembled.getvalue()))
    # For any other protocol p is still the last fragment from the loop
    self.assert_packet_checksums_valid(p)
    return p
749 def initiate_tcp_session(self, in_if, out_if):
751 Initiates TCP session
753 :param in_if: Inside interface
754 :param out_if: Outside interface
758 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
759 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
760 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
763 self.pg_enable_capture(self.pg_interfaces)
765 capture = out_if.get_capture(1)
767 self.tcp_port_out = p[TCP].sport
769 # SYN + ACK packet out->in
770 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
771 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
772 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
775 self.pg_enable_capture(self.pg_interfaces)
780 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
781 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
782 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
785 self.pg_enable_capture(self.pg_interfaces)
787 out_if.get_capture(1)
790 self.logger.error("TCP 3 way handshake failed")
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Expects exactly six records: three create events (natEvent 4) and
    three delete events (natEvent 5).

    :param data: Decoded IPFIX data records
    """
    # protocol -> (expected sourceTransportPort,
    #              expected postNAPTSourceTransportPort)
    expected_ports = {
        IP_PROTOS.icmp: (self.icmp_id_in, self.icmp_id_out),
        IP_PROTOS.tcp: (self.tcp_port_in, self.tcp_port_out),
        IP_PROTOS.udp: (self.udp_port_in, self.udp_port_out),
    }
    nat44_ses_create_num = 0
    nat44_ses_delete_num = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        self.assertIn(ord(record[230]), [4, 5])
        if ord(record[230]) == 4:
            nat44_ses_create_num += 1
        else:
            nat44_ses_delete_num += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        proto = ord(record[4])
        if proto not in expected_ports:
            self.fail("Invalid protocol")
        port_in, port_out = expected_ports[proto]
        self.assertEqual(struct.pack("!H", port_in), record[7])
        self.assertEqual(struct.pack("!H", port_out), record[227])
    self.assertEqual(3, nat44_ses_create_num)
    self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent (compared in native byte order)
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries (compared in native byte order)
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent (compared in native byte order)
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries (compared in native byte order)
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent (compared in native byte order)
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly (compared in native byte order)
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent (compared in native byte order)
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly (compared in native byte order)
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent: 10 for a create event, 11 for a delete event
    if is_create:
        self.assertEqual(ord(record[230]), 10)
    else:
        self.assertEqual(ord(record[230]), 11)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
947 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
950 Verify IPFIX NAT64 session create and delete events
952 :param data: Decoded IPFIX data records
953 :param is_create: Create event if nonzero value otherwise delete event
954 :param src_addr: IPv6 source address
955 :param dst_addr: IPv4 destination address
956 :param dst_port: destination TCP port
958 self.assertEqual(1, len(data))
962 self.assertEqual(ord(record[230]), 6)
964 self.assertEqual(ord(record[230]), 7)
966 self.assertEqual(src_addr, record[27])
967 # destinationIPv6Address
968 self.assertEqual(socket.inet_pton(socket.AF_INET6,
969 self.compose_ip6(dst_addr,
973 # postNATSourceIPv4Address
974 self.assertEqual(self.nat_addr_n, record[225])
975 # postNATDestinationIPv4Address
976 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
979 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
981 self.assertEqual(struct.pack("!I", 0), record[234])
982 # sourceTransportPort
983 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
984 # postNAPTSourceTransportPort
985 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
986 # destinationTransportPort
987 self.assertEqual(struct.pack("!H", dst_port), record[11])
988 # postNAPTDestinationTransportPort
989 self.assertEqual(struct.pack("!H", dst_port), record[228])
def verify_no_nat44_user(self):
    """ Verify that there is no NAT44 user """
    users = self.vapi.nat44_user_dump()
    self.assertEqual(len(users), 0)
997 class TestNAT44(MethodHolder):
998 """ NAT44 Test Cases """
1001 def setUpClass(cls):
1002 super(TestNAT44, cls).setUpClass()
1003 cls.vapi.cli("set log class nat level debug")
1006 cls.tcp_port_in = 6303
1007 cls.tcp_port_out = 6303
1008 cls.udp_port_in = 6304
1009 cls.udp_port_out = 6304
1010 cls.icmp_id_in = 6305
1011 cls.icmp_id_out = 6305
1012 cls.nat_addr = '10.0.0.3'
1013 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1014 cls.ipfix_src_port = 4739
1015 cls.ipfix_domain_id = 1
1016 cls.tcp_external_port = 80
1018 cls.create_pg_interfaces(range(10))
1019 cls.interfaces = list(cls.pg_interfaces[0:4])
1021 for i in cls.interfaces:
1026 cls.pg0.generate_remote_hosts(3)
1027 cls.pg0.configure_ipv4_neighbors()
1029 cls.pg1.generate_remote_hosts(1)
1030 cls.pg1.configure_ipv4_neighbors()
1032 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1033 cls.vapi.ip_table_add_del(10, is_add=1)
1034 cls.vapi.ip_table_add_del(20, is_add=1)
1036 cls.pg4._local_ip4 = "172.16.255.1"
1037 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1038 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1039 cls.pg4.set_table_ip4(10)
1040 cls.pg5._local_ip4 = "172.17.255.3"
1041 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1042 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1043 cls.pg5.set_table_ip4(10)
1044 cls.pg6._local_ip4 = "172.16.255.1"
1045 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1046 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1047 cls.pg6.set_table_ip4(20)
1048 for i in cls.overlapping_interfaces:
1056 cls.pg9.generate_remote_hosts(2)
1057 cls.pg9.config_ip4()
1058 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1059 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1063 cls.pg9.resolve_arp()
1064 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1065 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1066 cls.pg9.resolve_arp()
1069 super(TestNAT44, cls).tearDownClass()
1072 def test_dynamic(self):
1073 """ NAT44 dynamic translation test """
1075 self.nat44_add_address(self.nat_addr)
1076 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1077 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1081 pkts = self.create_stream_in(self.pg0, self.pg1)
1082 self.pg0.add_stream(pkts)
1083 self.pg_enable_capture(self.pg_interfaces)
1085 capture = self.pg1.get_capture(len(pkts))
1086 self.verify_capture_out(capture)
1089 pkts = self.create_stream_out(self.pg1)
1090 self.pg1.add_stream(pkts)
1091 self.pg_enable_capture(self.pg_interfaces)
1093 capture = self.pg0.get_capture(len(pkts))
1094 self.verify_capture_in(capture, self.pg0)
1096 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1097 """ NAT44 handling of client packets with TTL=1 """
1099 self.nat44_add_address(self.nat_addr)
1100 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1101 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1104 # Client side - generate traffic
1105 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1106 self.pg0.add_stream(pkts)
1107 self.pg_enable_capture(self.pg_interfaces)
1110 # Client side - verify ICMP type 11 packets
1111 capture = self.pg0.get_capture(len(pkts))
1112 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1114 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1115 """ NAT44 handling of server packets with TTL=1 """
1117 self.nat44_add_address(self.nat_addr)
1118 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1119 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1122 # Client side - create sessions
1123 pkts = self.create_stream_in(self.pg0, self.pg1)
1124 self.pg0.add_stream(pkts)
1125 self.pg_enable_capture(self.pg_interfaces)
1128 # Server side - generate traffic
1129 capture = self.pg1.get_capture(len(pkts))
1130 self.verify_capture_out(capture)
1131 pkts = self.create_stream_out(self.pg1, ttl=1)
1132 self.pg1.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1136 # Server side - verify ICMP type 11 packets
1137 capture = self.pg1.get_capture(len(pkts))
1138 self.verify_capture_out_with_icmp_errors(capture,
1139 src_ip=self.pg1.local_ip4)
1141 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1142 """ NAT44 handling of error responses to client packets with TTL=2 """
1144 self.nat44_add_address(self.nat_addr)
1145 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1146 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1149 # Client side - generate traffic
1150 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1151 self.pg0.add_stream(pkts)
1152 self.pg_enable_capture(self.pg_interfaces)
1155 # Server side - simulate ICMP type 11 response
1156 capture = self.pg1.get_capture(len(pkts))
1157 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1158 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1159 ICMP(type=11) / packet[IP] for packet in capture]
1160 self.pg1.add_stream(pkts)
1161 self.pg_enable_capture(self.pg_interfaces)
1164 # Client side - verify ICMP type 11 packets
1165 capture = self.pg0.get_capture(len(pkts))
1166 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
    """ NAT44 handling of error responses to server packets with TTL=2 """
    # Sessions are first created from pg0; pg1 then sends TTL=2 traffic
    # back, pg0 answers every packet it captured with an ICMP type 11
    # error embedding that packet, and the errors captured on pg1 are
    # verified.
    #
    # NOTE(review): the text under review had lost its short lines (the
    # ``is_inside=0)`` continuation and each ``self.pg_start()``).  They
    # are restored from context - confirm against the canonical file.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is presumably the outside interface, hence is_inside=0
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - simulate ICMP type 11 response
    capture = self.pg0.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
    """ Ping NAT44 out interface from outside network """
    # A single echo request is sent from pg1's remote host to pg1's own
    # address; exactly one echo reply is expected back on pg1.
    #
    # NOTE(review): the text under review used ``pkts`` and ``packet``
    # without defining them and had lost the ``is_inside=0)``
    # continuation, ``self.pg_start()`` and the try/except frame around
    # the assertions.  They are restored from context - confirm against
    # the canonical file.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is presumably the outside interface, hence is_inside=0
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
         ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [p]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.local_ip4)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        self.assertEqual(packet[ICMP].type, 0)  # echo reply
    except Exception:
        # dump the offending packet before letting the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", packet))
        raise
def test_ping_internal_host_from_outside(self):
    """ Ping internal host from outside network """
    # pg0's remote host is statically mapped to self.nat_addr.  An echo
    # request sent from pg1 to that address is captured on pg0, and the
    # echo reply sent from pg0 is captured on pg1.
    #
    # NOTE(review): the text under review had lost its short lines (the
    # ``is_inside=0)`` continuation and each ``self.pg_start()``).  They
    # are restored from context - confirm against the canonical file.

    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # pg1 is presumably the outside interface, hence is_inside=0
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in
    pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
           IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
           ICMP(id=self.icmp_id_out, type='echo-request'))
    self.pg1.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_capture_in(capture, self.pg0, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    # in2out
    pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
           IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
           ICMP(id=self.icmp_id_in, type='echo-reply'))
    self.pg0.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.verify_capture_out(capture, same_port=True, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_forwarding(self):
    """ NAT44 forwarding test """
    # With forwarding enabled and one static mapping (pg0 remote host <->
    # self.nat_addr), traffic is exchanged in both directions twice:
    # once for the mapped host and once for a host without a mapping.
    #
    # NOTE(review): the text under review had lost its short lines: the
    # ``is_inside=0)`` / ``same_port=True)`` / ``is_add=0)``
    # continuations, each ``self.pg_start()`` and the try/finally frames.
    # Without the frames a failing assertion would leave forwarding
    # enabled and remote_hosts[0] swapped.  All are restored from
    # context - confirm against the canonical file.

    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_forwarding_enable_disable(1)

    real_ip = self.pg0.remote_ip4n
    alias_ip = self.nat_addr_n
    self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                           external_ip=alias_ip)

    try:
        # static mapping match

        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, same_port=True)

        # no static mapping match

        # temporarily make the second remote host the "primary" one so
        # the stream helpers use an address that has no mapping
        host0 = self.pg0.remote_hosts[0]
        self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
        try:
            pkts = self.create_stream_out(self.pg1,
                                          dst_ip=self.pg0.remote_ip4,
                                          use_inside_ports=True)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(len(pkts))
            self.verify_capture_in(capture, self.pg0)

            pkts = self.create_stream_in(self.pg0, self.pg1)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(len(pkts))
            self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
                                    same_port=True)
        finally:
            # always put the original host back
            self.pg0.remote_hosts[0] = host0

    finally:
        # undo the configuration made by this test
        self.vapi.nat44_forwarding_enable_disable(0)
        self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                               external_ip=alias_ip,
                                               is_add=0)
def test_static_in(self):
    """ 1:1 NAT initialized from inside network """
    # Adds an address-only static mapping, checks how it is reported by
    # the static mapping dump, then runs traffic inside->outside first
    # and outside->inside second.
    #
    # NOTE(review): the text under review had lost its short lines (the
    # ``is_inside=0)`` continuation and each ``self.pg_start()``).  They
    # are restored from context - confirm against the canonical file.

    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    # the tag field is compared up to its first NUL byte; no tag was set
    self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
    self.assertEqual(sm[0].protocol, 0)
    self.assertEqual(sm[0].local_port, 0)
    self.assertEqual(sm[0].external_port, 0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
    """ 1:1 NAT initialized from outside network """
    # Adds a tagged address-only static mapping, checks that the tag is
    # reported by the dump, then runs traffic outside->inside first and
    # inside->outside second.
    #
    # NOTE(review): the text under review used ``tag`` without defining
    # it and had lost the ``is_inside=0)`` continuation and each
    # ``self.pg_start()``.  They are restored from context; the literal
    # tag value is a placeholder, the test only requires that the same
    # string is written and read back - confirm against the canonical
    # file.

    nat_ip = "10.0.0.20"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305
    tag = "testTAG"

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    # the tag field is compared up to its first NUL byte
    self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
    """ 1:1 NAPT initialized from inside network """
    # One static mapping per protocol (TCP, UDP, ICMP) maps the inside
    # port/id of pg0's remote host to a fixed outside port/id on
    # self.nat_addr; traffic runs inside->outside first.
    #
    # NOTE(review): the text under review had lost its short lines (the
    # ``is_inside=0)`` continuation and each ``self.pg_start()``).  They
    # are restored from context - confirm against the canonical file.

    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
    """ 1:1 NAPT initialized from outside network """
    # One static mapping per protocol (TCP, UDP, ICMP) maps the inside
    # port/id of pg0's remote host to a fixed outside port/id on
    # self.nat_addr; traffic runs outside->inside first.
    #
    # NOTE(review): the text under review had lost its short lines (the
    # ``is_inside=0)`` continuation and each ``self.pg_start()``).  They
    # are restored from context - confirm against the canonical file.

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_vrf_aware(self):
    """ 1:1 NAT VRF awareness """
    # Two static mappings are bound to a VRF.  Traffic from pg4 is
    # expected to be translated to nat_ip1 and captured on pg3; traffic
    # from pg0 is expected not to reach pg3 at all.
    #
    # NOTE(review): the text under review had lost the continuation
    # lines of three calls, each ``self.pg_start()`` and the tail of the
    # last comment.  ``vrf_id=10`` is inferred (other tests in this file
    # dump pg4's sessions with VRF id 10) and ``is_inside=0`` marks pg3
    # as the outside interface - confirm against the canonical file.

    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                  vrf_id=10)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                  vrf_id=10)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)

    # inside interface VRF match NAT44 static mapping VRF
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1, True)

    # inside interface VRF don't match NAT44 static mapping VRF (packets
    # are dropped)
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg3.assert_nothing_captured()
def test_dynamic_to_static(self):
    """ Switch from dynamic translation to 1:1NAT """
    # Traffic is first translated through the address pool.  A static
    # mapping is then added for the same inside host; the test expects
    # the host's session dump to be empty afterwards and new traffic to
    # leave with the static address.
    #
    # NOTE(review): the text under review had lost its short lines (the
    # ``is_inside=0)`` continuation, each ``self.pg_start()`` and the
    # two section comments).  They are restored from context - confirm
    # against the canonical file.
    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # dynamic
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # 1:1NAT
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
    self.assertEqual(len(sessions), 0)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_identity_nat(self):
    """ Identity NAT """
    # With an identity mapping for pg0's remote host, a TCP packet sent
    # from pg1 to that host must arrive on pg0 with addresses and ports
    # unchanged and with valid checksums.
    #
    # NOTE(review): the text under review used ``ip`` and ``tcp``
    # without defining them and had lost the ``is_inside=0)``
    # continuation, ``self.pg_start()`` and the try/except frame.  They
    # are restored from context - confirm against the canonical file.

    self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12345, dport=56789))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    # from here on ``p`` is the received packet, not the one sent
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, 56789)
        self.assertEqual(tcp.sport, 12345)
        self.assert_packet_checksums_valid(p)
    except Exception:
        # dump the offending packet before letting the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_multiple_inside_interfaces(self):
    """ NAT44 multiple non-overlapping address space inside interfaces """
    # pg0 and pg1 are NAT44 inside interfaces, pg3 is configured with
    # the continuation argument restored below, pg2 has no NAT44
    # feature.  Traffic between pg0/pg1 and from pg0 to pg2 must pass
    # untranslated; traffic between each inside interface and pg3 is
    # verified as translated in both directions.
    #
    # NOTE(review): the text under review had lost its short lines (the
    # ``is_inside=0)`` continuation and each ``self.pg_start()``).  They
    # are restored from context - confirm against the canonical file.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)

    # between two NAT44 inside interfaces (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg1)

    # from NAT44 inside to interface without NAT44 feature (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg2)

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg1, self.pg3)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ NAT44 multiple inside interfaces with overlapping address space """
    # pg4, pg5 and pg6 are NAT44 inside interfaces and pg3 is the
    # outside one.  The session dumps below use VRF id 10 for pg4/pg5
    # hosts and 20 for pg6 hosts.  The test covers: untranslated traffic
    # pg4->pg5, hairpinning pg4->pg6 through a static mapping,
    # translation towards pg3 from each inside interface, and the
    # user/session dump contents.
    #
    # NOTE(review): the text under review used ``ip``, ``tcp`` and
    # ``user`` without defining them and had lost several continuation
    # lines, each ``self.pg_start()`` and the try/except frame.  Restored
    # from context: ``is_inside=0``, ``vrf_id=20`` (matches the pg6
    # session dump below), ``user.vrf_id`` (assumed field name) and the
    # closing ICMP entries of the two protocol/port lists - confirm
    # against the canonical file.

    static_nat_ip = "10.0.0.10"
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
    self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                  vrf_id=20)

    # between NAT44 inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between NAT44 inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    # from here on ``p`` is the received packet, not the one sent
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        # dump the offending packet before letting the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    # the assertions below rely on the dump order being TCP, UDP, ICMP
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.nat44_user_dump()
    self.assertTrue(len(users) >= 3)
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        sessions = self.vapi.nat44_user_session_dump(user.ip_address,
                                                     user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertTrue(session.protocol in
                            [IP_PROTOS.tcp, IP_PROTOS.udp,
                             IP_PROTOS.icmp])
            self.assertFalse(session.ext_host_valid)

    # pg4 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertTrue(len(sessions) >= 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertTrue(len(sessions) >= 3)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        # compares the first four address bytes with the dotted-quad
        # octets; relies on map() returning lists (Python 2 semantics)
        self.assertEqual(map(ord, session.outside_ip_address[0:4]),
                         map(int, static_nat_ip.split('.')))
        self.assertTrue(session.inside_port in
                        [self.tcp_port_in, self.udp_port_in,
                         self.icmp_id_in])
def test_hairpinning(self):
    """ NAT44 hairpinning - 1:1 NAPT """
    # Host and server are both remote hosts on pg0.  The server is
    # reachable through a static TCP port mapping on self.nat_addr.  The
    # host's packet to that mapping must come back out of pg0 towards
    # the server with a translated source, and the server's reply must
    # come back out of pg0 towards the host.
    #
    # NOTE(review): the text under review used ``host_in_port``, ``ip``
    # and ``tcp`` without defining them and had lost the
    # ``is_inside=0)`` continuation, each ``self.pg_start()`` and both
    # try/except frames.  They are restored from context; the literal
    # host port is a placeholder (any port distinct from the server
    # ports serves the test) - confirm against the canonical file.

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    # from here on ``p`` is the received packet, not the one sent
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.assert_packet_checksums_valid(p)
        # remember the translated source port for the reply direction
        host_out_port = tcp.sport
    except Exception:
        # dump the offending packet before letting the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ NAT44 hairpinning - 1:1 NAT"""
    # Host, server1 and server2 are all remote hosts on pg0; both
    # servers have address-only static mappings.  Four exchanges are
    # checked, each with one TCP, one UDP and one ICMP packet:
    # host->server1, server1->host, server2->server1, server1->server2.
    #
    # NOTE(review): the text under review never built ``pkts`` and had
    # lost the ``is_inside=0)`` continuation, each ``self.pg_start()``,
    # the ``pkts = []`` / ``pkts.append(p)`` lines, the ``else:``
    # branches and the try/except frames.  They are restored from
    # context - confirm against the canonical file.

    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # add static mapping for servers
    self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
    self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, server1.ip4)
            # the translated port/id is remembered for the reply stream
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to host
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, host.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server2 to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server2_nat_ip)
            self.assertEqual(packet[IP].dst, server1.ip4)
            # unlike the host->server1 case, ports/ids are expected to
            # be unchanged here
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to server2
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, server2.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used """
    # Sends five more TCP flows than the configured per-user maximum
    # from one inside host and checks that the user's dynamic session
    # count equals the maximum.  A static port mapping is then added and
    # used once; the user is expected to hold one static session and
    # one dynamic session fewer.
    #
    # NOTE(review): the text under review used ``pkts``, ``user`` and
    # ``tcp_port`` without defining them and had lost the
    # ``is_inside=0)`` continuation, the port arguments of the static
    # mapping call and each ``self.pg_start()``.  They are restored from
    # context; the literal ``tcp_port`` is a placeholder chosen outside
    # the 1025+ range used by the dynamic flows - confirm against the
    # canonical file.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # get maximum number of translations per user
    nat44_config = self.vapi.nat_show_config()

    # send more than maximum number of translations per user packets
    pkts_num = nat44_config.max_translations_per_user + 5
    pkts = []
    for port in range(0, pkts_num):
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=1025 + port))
        pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)

    users = self.vapi.nat44_user_dump()
    for user in users:
        if user.ip_address == self.pg0.remote_ip4n:
            self.assertEqual(user.nsessions,
                             nat44_config.max_translations_per_user)
            self.assertEqual(user.nstaticsessions, 0)

    tcp_port = 22
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  tcp_port, tcp_port,
                                  proto=IP_PROTOS.tcp)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=tcp_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)
    users = self.vapi.nat44_user_dump()
    for user in users:
        if user.ip_address == self.pg0.remote_ip4n:
            self.assertEqual(user.nsessions,
                             nat44_config.max_translations_per_user - 1)
            self.assertEqual(user.nstaticsessions, 1)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface """
    uplink = self.pg7
    self.vapi.nat44_add_interface_addr(uplink.sw_if_index)

    # the pool is expected to be empty before the interface is addressed
    self.assertEqual(0, len(self.vapi.nat44_address_dump()))

    # after configuring IPv4 the pool must hold exactly one entry whose
    # first four address bytes equal the interface's local address
    uplink.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    first_entry = pool[0]
    self.assertEqual(first_entry.ip_address[0:4], uplink.local_ip4n)

    # removing the interface address must empty the pool again
    uplink.unconfig_ip4()
    self.assertEqual(0, len(self.vapi.nat44_address_dump()))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface """
    # A static mapping is bound to pg7 (by sw_if_index) instead of to a
    # concrete external address.  While pg7 has an IPv4 address the dump
    # is expected to contain a second entry (external_sw_if_index ==
    # 0xFFFFFFFF) carrying pg7's address; it disappears when the address
    # is removed and reappears when it is configured again.
    #
    # NOTE(review): the text under review used ``tag`` and ``resolved``
    # without defining them and had lost the first positional argument
    # and the closing arguments of both nat44_add_static_mapping calls.
    # They are restored from context; the literal local address and tag
    # are placeholders, and ``is_add=0`` is assumed to be the removal
    # flag of that helper - confirm against the canonical file.
    tag = "testTAG"

    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    self.nat44_add_static_mapping(
        '1.2.3.4',
        external_sw_if_index=self.pg7.sw_if_index,
        tag=tag)

    # static mappings with external interface
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(1, len(static_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     static_mappings[0].external_sw_if_index)
    self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)

    # configure interface address and check static mappings
    self.pg7.config_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(2, len(static_mappings))
    resolved = False
    for sm in static_mappings:
        if sm.external_sw_if_index == 0xFFFFFFFF:
            self.assertEqual(sm.external_ip_address[0:4],
                             self.pg7.local_ip4n)
            self.assertEqual((sm.tag).split('\0', 1)[0], tag)
            resolved = True
    self.assertTrue(resolved)

    # remove interface address and check static mappings
    self.pg7.unconfig_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(1, len(static_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     static_mappings[0].external_sw_if_index)
    self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)

    # configure interface address again and check static mappings
    self.pg7.config_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(2, len(static_mappings))
    resolved = False
    for sm in static_mappings:
        if sm.external_sw_if_index == 0xFFFFFFFF:
            self.assertEqual(sm.external_ip_address[0:4],
                             self.pg7.local_ip4n)
            self.assertEqual((sm.tag).split('\0', 1)[0], tag)
            resolved = True
    self.assertTrue(resolved)

    # remove static mapping
    self.nat44_add_static_mapping(
        '1.2.3.4',
        external_sw_if_index=self.pg7.sw_if_index,
        tag=tag,
        is_add=0)
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(0, len(static_mappings))
def test_interface_addr_identity_nat(self):
    """ Identity NAT with addresses from interface """
    # An identity mapping is bound to pg7 (by sw_if_index).  While pg7
    # has an IPv4 address the dump is expected to contain a second entry
    # (sw_if_index == 0xFFFFFFFF) carrying pg7's address, the port and
    # the protocol; it disappears when the address is removed.
    #
    # NOTE(review): the text under review used ``port`` and ``resolved``
    # without defining them and had lost two arguments of the identity
    # mapping call.  They are restored from context; the literal port is
    # a placeholder and ``addr_only=0`` is assumed to be what makes the
    # mapping port-specific - confirm against the canonical file.

    port = 53053
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    self.vapi.nat44_add_del_identity_mapping(
        sw_if_index=self.pg7.sw_if_index,
        port=port,
        protocol=IP_PROTOS.tcp,
        addr_only=0)

    # identity mappings with external interface
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)

    # configure interface address and check identity mappings
    self.pg7.config_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    resolved = False
    self.assertEqual(2, len(identity_mappings))
    for sm in identity_mappings:
        if sm.sw_if_index == 0xFFFFFFFF:
            # check the entry that matched, not whichever entry happens
            # to be first in the dump
            self.assertEqual(sm.ip_address, self.pg7.local_ip4n)
            self.assertEqual(port, sm.port)
            self.assertEqual(IP_PROTOS.tcp, sm.protocol)
            resolved = True
    self.assertTrue(resolved)

    # remove interface address and check identity mappings
    self.pg7.unconfig_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)
2158 def test_ipfix_nat44_sess(self):
2159 """ IPFIX logging NAT44 session created/delted """
2160 self.ipfix_domain_id = 10
2161 self.ipfix_src_port = 20202
2162 colector_port = 30303
2163 bind_layers(UDP, IPFIX, dport=30303)
2164 self.nat44_add_address(self.nat_addr)
2165 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2166 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2168 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2169 src_address=self.pg3.local_ip4n,
2171 template_interval=10,
2172 collector_port=colector_port)
2173 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2174 src_port=self.ipfix_src_port)
2176 pkts = self.create_stream_in(self.pg0, self.pg1)
2177 self.pg0.add_stream(pkts)
2178 self.pg_enable_capture(self.pg_interfaces)
2180 capture = self.pg1.get_capture(len(pkts))
2181 self.verify_capture_out(capture)
2182 self.nat44_add_address(self.nat_addr, is_add=0)
2183 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2184 capture = self.pg3.get_capture(9)
2185 ipfix = IPFIXDecoder()
2186 # first load template
2188 self.assertTrue(p.haslayer(IPFIX))
2189 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2190 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2191 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2192 self.assertEqual(p[UDP].dport, colector_port)
2193 self.assertEqual(p[IPFIX].observationDomainID,
2194 self.ipfix_domain_id)
2195 if p.haslayer(Template):
2196 ipfix.add_template(p.getlayer(Template))
2197 # verify events in data set
2199 if p.haslayer(Data):
2200 data = ipfix.decode_data_set(p.getlayer(Set))
2201 self.verify_ipfix_nat44_ses(data)
2203 def test_ipfix_addr_exhausted(self):
2204 """ IPFIX logging NAT addresses exhausted """
# Configures NAT44 on pg0/pg1 and IPFIX export towards pg3, sends one packet
# into pg0, expects nothing out of pg1, then flushes IPFIX and checks the 9
# packets captured on pg3 (headers, templates, then decoded data sets via
# verify_ipfix_addr_exhausted()).
# NOTE(review): no call adding a NAT pool address is visible in this block.
# The embedded line numbers skip in this listing, so some source lines (e.g.
# the transport layer of the packet and the loop headers that rebind `p`)
# are not shown -- confirm against the complete file.
2205 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2206 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2208 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2209 src_address=self.pg3.local_ip4n,
2211 template_interval=10)
2212 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2213 src_port=self.ipfix_src_port)
2215 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2216 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2218 self.pg0.add_stream(p)
2219 self.pg_enable_capture(self.pg_interfaces)
2221 self.pg1.assert_nothing_captured()
2223 self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2224 capture = self.pg3.get_capture(9)
2225 ipfix = IPFIXDecoder()
2226 # first load template
2228 self.assertTrue(p.haslayer(IPFIX))
2229 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2230 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2231 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# No collector_port is passed to set_ipfix_exporter above; 4739 is presumably
# its default -- verify.
2232 self.assertEqual(p[UDP].dport, 4739)
2233 self.assertEqual(p[IPFIX].observationDomainID,
2234 self.ipfix_domain_id)
2235 if p.haslayer(Template):
2236 ipfix.add_template(p.getlayer(Template))
2237 # verify events in data set
2239 if p.haslayer(Data):
2240 data = ipfix.decode_data_set(p.getlayer(Set))
2241 self.verify_ipfix_addr_exhausted(data)
# Runs only when running_extended_tests() is true.
2243 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2244 def test_ipfix_max_sessions(self):
2245 """ IPFIX logging maximum session entries exceeded """
# Sends max_sessions packets (10 * translation_buckets from nat_show_config)
# from pg0 and expects all of them on pg1, then enables IPFIX export, sends
# one more packet that must not reach pg1, flushes IPFIX and checks the 9
# packets captured on pg3; decoded data sets go to
# verify_ipfix_max_sessions(data, max_sessions).
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. where `pkts` is created/extended and the loop headers
# that rebind `p`) are not shown -- confirm against the complete file.
2246 self.nat44_add_address(self.nat_addr)
2247 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2248 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2251 nat44_config = self.vapi.nat_show_config()
2252 max_sessions = 10 * nat44_config.translation_buckets
2255 for i in range(0, max_sessions):
# Source address encodes i as 10.10.<bits 8-15>.<bits 0-7>, so it differs
# per packet for i < 65536.
2256 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2257 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2258 IP(src=src, dst=self.pg1.remote_ip4) /
2261 self.pg0.add_stream(pkts)
2262 self.pg_enable_capture(self.pg_interfaces)
2265 self.pg1.get_capture(max_sessions)
2266 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2267 src_address=self.pg3.local_ip4n,
2269 template_interval=10)
2270 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2271 src_port=self.ipfix_src_port)
2273 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2274 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2276 self.pg0.add_stream(p)
2277 self.pg_enable_capture(self.pg_interfaces)
2279 self.pg1.assert_nothing_captured()
2281 self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2282 capture = self.pg3.get_capture(9)
2283 ipfix = IPFIXDecoder()
2284 # first load template
2286 self.assertTrue(p.haslayer(IPFIX))
2287 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2288 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2289 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2290 self.assertEqual(p[UDP].dport, 4739)
2291 self.assertEqual(p[IPFIX].observationDomainID,
2292 self.ipfix_domain_id)
2293 if p.haslayer(Template):
2294 ipfix.add_template(p.getlayer(Template))
2295 # verify events in data set
2297 if p.haslayer(Data):
2298 data = ipfix.decode_data_set(p.getlayer(Set))
2299 self.verify_ipfix_max_sessions(data, max_sessions)
2301 def test_pool_addr_fib(self):
2302 """ NAT44 add pool addresses to FIB """
2303 static_addr = '10.0.0.10'
2304 self.nat44_add_address(self.nat_addr)
2305 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2306 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2308 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2311 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2312 ARP(op=ARP.who_has, pdst=self.nat_addr,
2313 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2314 self.pg1.add_stream(p)
2315 self.pg_enable_capture(self.pg_interfaces)
2317 capture = self.pg1.get_capture(1)
2318 self.assertTrue(capture[0].haslayer(ARP))
2319 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2322 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2323 ARP(op=ARP.who_has, pdst=static_addr,
2324 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2325 self.pg1.add_stream(p)
2326 self.pg_enable_capture(self.pg_interfaces)
2328 capture = self.pg1.get_capture(1)
2329 self.assertTrue(capture[0].haslayer(ARP))
2330 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2332 # send ARP to non-NAT44 interface
2333 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2334 ARP(op=ARP.who_has, pdst=self.nat_addr,
2335 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2336 self.pg2.add_stream(p)
2337 self.pg_enable_capture(self.pg_interfaces)
2339 self.pg1.assert_nothing_captured()
2341 # remove addresses and verify
2342 self.nat44_add_address(self.nat_addr, is_add=0)
2343 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2346 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2347 ARP(op=ARP.who_has, pdst=self.nat_addr,
2348 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2349 self.pg1.add_stream(p)
2350 self.pg_enable_capture(self.pg_interfaces)
2352 self.pg1.assert_nothing_captured()
2354 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2355 ARP(op=ARP.who_has, pdst=static_addr,
2356 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2357 self.pg1.add_stream(p)
2358 self.pg_enable_capture(self.pg_interfaces)
2360 self.pg1.assert_nothing_captured()
2362 def test_vrf_mode(self):
2363 """ NAT44 tenant VRF aware address pool mode """
# Moves pg0 and pg1 into separate IPv4 tables (vrf_id1 / vrf_id2), adds one
# NAT address per vrf_id, and expects pg0 traffic to leave pg2 translated to
# nat_ip1 and pg1 traffic translated to nat_ip2. Afterwards both interfaces
# are returned to table 0 and the two tables are deleted.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. where vrf_id1 and vrf_id2 are assigned) are not shown --
# confirm against the complete file.
2367 nat_ip1 = "10.0.0.10"
2368 nat_ip2 = "10.0.0.11"
# Addresses are unconfigured before set_table_ip4() and configured again
# afterwards, followed by a fresh resolve_arp().
2370 self.pg0.unconfig_ip4()
2371 self.pg1.unconfig_ip4()
2372 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2373 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2374 self.pg0.set_table_ip4(vrf_id1)
2375 self.pg1.set_table_ip4(vrf_id2)
2376 self.pg0.config_ip4()
2377 self.pg1.config_ip4()
2378 self.pg0.resolve_arp()
2379 self.pg1.resolve_arp()
2381 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2382 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2383 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2384 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2385 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2: expected source address is nat_ip1
2390 pkts = self.create_stream_in(self.pg0, self.pg2)
2391 self.pg0.add_stream(pkts)
2392 self.pg_enable_capture(self.pg_interfaces)
2394 capture = self.pg2.get_capture(len(pkts))
2395 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2: expected source address is nat_ip2
2398 pkts = self.create_stream_in(self.pg1, self.pg2)
2399 self.pg1.add_stream(pkts)
2400 self.pg_enable_capture(self.pg_interfaces)
2402 capture = self.pg2.get_capture(len(pkts))
2403 self.verify_capture_out(capture, nat_ip2)
# restore pg0/pg1 to table 0 and remove the tables created above
2406 self.pg0.unconfig_ip4()
2407 self.pg1.unconfig_ip4()
2408 self.pg0.set_table_ip4(0)
2409 self.pg1.set_table_ip4(0)
2410 self.pg0.config_ip4()
2411 self.pg1.config_ip4()
2412 self.pg0.resolve_arp()
2413 self.pg1.resolve_arp()
2414 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2415 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2417 def test_vrf_feature_independent(self):
2418 """ NAT44 tenant VRF independent address pool mode """
# Adds nat_ip1 without a vrf_id and nat_ip2 with vrf_id=99, then expects
# traffic from both pg0 and pg1 to leave pg2 translated to nat_ip1.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. the trailing argument of the pg2 feature call) are not
# shown -- confirm against the complete file.
2420 nat_ip1 = "10.0.0.10"
2421 nat_ip2 = "10.0.0.11"
2423 self.nat44_add_address(nat_ip1)
2424 self.nat44_add_address(nat_ip2, vrf_id=99)
2425 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2426 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2427 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2431 pkts = self.create_stream_in(self.pg0, self.pg2)
2432 self.pg0.add_stream(pkts)
2433 self.pg_enable_capture(self.pg_interfaces)
2435 capture = self.pg2.get_capture(len(pkts))
2436 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2: nat_ip1 is expected here as well
2439 pkts = self.create_stream_in(self.pg1, self.pg2)
2440 self.pg1.add_stream(pkts)
2441 self.pg_enable_capture(self.pg_interfaces)
2443 capture = self.pg2.get_capture(len(pkts))
2444 self.verify_capture_out(capture, nat_ip1)
2446 def test_dynamic_ipless_interfaces(self):
2447 """ NAT44 interfaces without configured IP address """
# Installs neighbor entries and /32 routes for the remote addresses of pg7
# and pg8, adds the NAT pool address, then checks translation in both
# directions: create_stream_in pg7 -> pg8 with verify_capture_out, and
# create_stream_out from pg8 to nat_addr with verify_capture_in on pg7.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. trailing arguments of calls left open below) are not
# shown -- confirm against the complete file.
2449 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2450 mactobinary(self.pg7.remote_mac),
2451 self.pg7.remote_ip4n,
2453 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2454 mactobinary(self.pg8.remote_mac),
2455 self.pg8.remote_ip4n,
2458 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2459 dst_address_length=32,
2460 next_hop_address=self.pg7.remote_ip4n,
2461 next_hop_sw_if_index=self.pg7.sw_if_index)
2462 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2463 dst_address_length=32,
2464 next_hop_address=self.pg8.remote_ip4n,
2465 next_hop_sw_if_index=self.pg8.sw_if_index)
2467 self.nat44_add_address(self.nat_addr)
2468 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2469 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8
2473 pkts = self.create_stream_in(self.pg7, self.pg8)
2474 self.pg7.add_stream(pkts)
2475 self.pg_enable_capture(self.pg_interfaces)
2477 capture = self.pg8.get_capture(len(pkts))
2478 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to the NAT pool address
2481 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2482 self.pg8.add_stream(pkts)
2483 self.pg_enable_capture(self.pg_interfaces)
2485 capture = self.pg7.get_capture(len(pkts))
2486 self.verify_capture_in(capture, self.pg7)
2488 def test_static_ipless_interfaces(self):
2489 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Installs neighbor entries and /32 routes for the remote addresses of pg7
# and pg8, adds a static mapping between pg7.remote_ip4 and nat_addr, then
# checks pg8 -> pg7 with verify_capture_in and pg7 -> pg8 with
# verify_capture_out.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. trailing arguments of calls left open below) are not
# shown -- confirm against the complete file.
2491 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2492 mactobinary(self.pg7.remote_mac),
2493 self.pg7.remote_ip4n,
2495 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2496 mactobinary(self.pg8.remote_mac),
2497 self.pg8.remote_ip4n,
2500 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2501 dst_address_length=32,
2502 next_hop_address=self.pg7.remote_ip4n,
2503 next_hop_sw_if_index=self.pg7.sw_if_index)
2504 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2505 dst_address_length=32,
2506 next_hop_address=self.pg8.remote_ip4n,
2507 next_hop_sw_if_index=self.pg8.sw_if_index)
2509 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2510 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2511 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2515 pkts = self.create_stream_out(self.pg8)
2516 self.pg8.add_stream(pkts)
2517 self.pg_enable_capture(self.pg_interfaces)
2519 capture = self.pg7.get_capture(len(pkts))
2520 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2523 pkts = self.create_stream_in(self.pg7, self.pg8)
2524 self.pg7.add_stream(pkts)
2525 self.pg_enable_capture(self.pg_interfaces)
2527 capture = self.pg8.get_capture(len(pkts))
# NOTE(review): meaning of the third positional argument (True) is defined by
# verify_capture_out, which is outside this view -- confirm there.
2528 self.verify_capture_out(capture, self.nat_addr, True)
2530 def test_static_with_port_ipless_interfaces(self):
2531 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Sets fixed outside TCP/UDP ports and ICMP id, installs neighbor entries and
# /32 routes for the remote addresses of pg7 and pg8, adds the NAT address
# plus one static mapping per protocol (TCP, UDP, ICMP) from pg7.remote_ip4
# to nat_addr, then checks pg8 -> pg7 with verify_capture_in and pg7 -> pg8
# with verify_capture_out.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. trailing arguments of calls left open below) are not
# shown -- confirm against the complete file.
2533 self.tcp_port_out = 30606
2534 self.udp_port_out = 30607
2535 self.icmp_id_out = 30608
2537 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2538 mactobinary(self.pg7.remote_mac),
2539 self.pg7.remote_ip4n,
2541 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2542 mactobinary(self.pg8.remote_mac),
2543 self.pg8.remote_ip4n,
2546 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2547 dst_address_length=32,
2548 next_hop_address=self.pg7.remote_ip4n,
2549 next_hop_sw_if_index=self.pg7.sw_if_index)
2550 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2551 dst_address_length=32,
2552 next_hop_address=self.pg8.remote_ip4n,
2553 next_hop_sw_if_index=self.pg8.sw_if_index)
2555 self.nat44_add_address(self.nat_addr)
# one mapping per protocol, pairing each *_in value with the *_out value
# assigned at the top of this test
2556 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2557 self.tcp_port_in, self.tcp_port_out,
2558 proto=IP_PROTOS.tcp)
2559 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2560 self.udp_port_in, self.udp_port_out,
2561 proto=IP_PROTOS.udp)
2562 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2563 self.icmp_id_in, self.icmp_id_out,
2564 proto=IP_PROTOS.icmp)
2565 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2566 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2570 pkts = self.create_stream_out(self.pg8)
2571 self.pg8.add_stream(pkts)
2572 self.pg_enable_capture(self.pg_interfaces)
2574 capture = self.pg7.get_capture(len(pkts))
2575 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2578 pkts = self.create_stream_in(self.pg7, self.pg8)
2579 self.pg7.add_stream(pkts)
2580 self.pg_enable_capture(self.pg_interfaces)
2582 capture = self.pg8.get_capture(len(pkts))
2583 self.verify_capture_out(capture)
2585 def test_static_unknown_proto(self):
2586 """ 1:1 NAT translate packet with unknown protocol """
# With a static mapping pg0.remote_ip4 <-> nat_ip, sends one packet in each
# direction and checks the outer IP header of the single captured packet:
# pg0 -> pg1 must leave with source nat_ip, and pg1 -> nat_ip must arrive on
# pg0 addressed to pg0.remote_ip4. Both captured packets must contain a GRE
# layer and carry valid checksums.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. the layer between the two IP headers, where `packet` is
# assigned, and the construct around the logger.error calls) are not shown
# -- confirm against the complete file.
2587 nat_ip = "10.0.0.10"
2588 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2589 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2590 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1
2594 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2595 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2597 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2598 TCP(sport=1234, dport=1234))
2599 self.pg0.add_stream(p)
2600 self.pg_enable_capture(self.pg_interfaces)
# `p` is rebound from the sent packet to the capture result here
2602 p = self.pg1.get_capture(1)
2605 self.assertEqual(packet[IP].src, nat_ip)
2606 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2607 self.assertTrue(packet.haslayer(GRE))
2608 self.assert_packet_checksums_valid(packet)
2610 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to nat_ip
2614 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2615 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2617 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2618 TCP(sport=1234, dport=1234))
2619 self.pg1.add_stream(p)
2620 self.pg_enable_capture(self.pg_interfaces)
2622 p = self.pg0.get_capture(1)
2625 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2626 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2627 self.assertTrue(packet.haslayer(GRE))
2628 self.assert_packet_checksums_valid(packet)
2630 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2633 def test_hairpinning_static_unknown_proto(self):
2634 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# host and server are both remote hosts of pg0, each with its own static
# mapping. A packet from host to server_nat_ip must come back out of pg0
# with source host_nat_ip and destination server.ip4; a packet from server
# to host_nat_ip must come back with source server_nat_ip and destination
# host.ip4. Both captured packets must contain a GRE layer and carry valid
# checksums.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. the layer between the two IP headers, where `packet` is
# assigned, and the construct around the logger.error calls) are not shown
# -- confirm against the complete file.
2636 host = self.pg0.remote_hosts[0]
2637 server = self.pg0.remote_hosts[1]
2639 host_nat_ip = "10.0.0.10"
2640 server_nat_ip = "10.0.0.11"
2642 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2643 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2644 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2645 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server (via server_nat_ip)
2649 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2650 IP(src=host.ip4, dst=server_nat_ip) /
2652 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2653 TCP(sport=1234, dport=1234))
2654 self.pg0.add_stream(p)
2655 self.pg_enable_capture(self.pg_interfaces)
# sent on pg0 and captured on pg0
2657 p = self.pg0.get_capture(1)
2660 self.assertEqual(packet[IP].src, host_nat_ip)
2661 self.assertEqual(packet[IP].dst, server.ip4)
2662 self.assertTrue(packet.haslayer(GRE))
2663 self.assert_packet_checksums_valid(packet)
2665 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host (via host_nat_ip)
2669 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2670 IP(src=server.ip4, dst=host_nat_ip) /
2672 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2673 TCP(sport=1234, dport=1234))
2674 self.pg0.add_stream(p)
2675 self.pg_enable_capture(self.pg_interfaces)
2677 p = self.pg0.get_capture(1)
2680 self.assertEqual(packet[IP].src, server_nat_ip)
2681 self.assertEqual(packet[IP].dst, host.ip4)
2682 self.assertTrue(packet.haslayer(GRE))
2683 self.assert_packet_checksums_valid(packet)
2685 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2688 def test_output_feature(self):
2689 """ NAT44 interface output feature (in2out postrouting) """
# Configures the NAT44 output feature on pg0, pg1 and pg3, then checks three
# flows: pg0 -> pg3 with verify_capture_out, pg3 -> pg0 with
# verify_capture_in, and pg2 -> pg0 with verify_capture_no_translation.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. the trailing argument of the pg3 feature call) are not
# shown -- confirm against the complete file.
2690 self.nat44_add_address(self.nat_addr)
2691 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2692 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2693 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3
2697 pkts = self.create_stream_in(self.pg0, self.pg3)
2698 self.pg0.add_stream(pkts)
2699 self.pg_enable_capture(self.pg_interfaces)
2701 capture = self.pg3.get_capture(len(pkts))
2702 self.verify_capture_out(capture)
# pg3 -> pg0
2705 pkts = self.create_stream_out(self.pg3)
2706 self.pg3.add_stream(pkts)
2707 self.pg_enable_capture(self.pg_interfaces)
2709 capture = self.pg0.get_capture(len(pkts))
2710 self.verify_capture_in(capture, self.pg0)
2712 # from non-NAT interface to NAT inside interface
2713 pkts = self.create_stream_in(self.pg2, self.pg0)
2714 self.pg2.add_stream(pkts)
2715 self.pg_enable_capture(self.pg_interfaces)
2717 capture = self.pg0.get_capture(len(pkts))
2718 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2720 def test_output_feature_vrf_aware(self):
2721 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Adds two /32 routes to pg3's remote address, one NAT address for vrf_id 10
# and one for vrf_id 20, and the output feature on pg4, pg6 and pg3. Traffic
# pg4 <-> pg3 is expected to use nat_ip_vrf10 and traffic pg6 <-> pg3 to use
# nat_ip_vrf20.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. the last argument of each ip_add_del_route call, which
# presumably selects the table -- verify) are not shown; confirm against the
# complete file.
2722 nat_ip_vrf10 = "10.0.0.10"
2723 nat_ip_vrf20 = "10.0.0.20"
2725 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2726 dst_address_length=32,
2727 next_hop_address=self.pg3.remote_ip4n,
2728 next_hop_sw_if_index=self.pg3.sw_if_index,
2730 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2731 dst_address_length=32,
2732 next_hop_address=self.pg3.remote_ip4n,
2733 next_hop_sw_if_index=self.pg3.sw_if_index,
2736 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2737 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2738 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2739 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2740 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3, expected source nat_ip_vrf10
2744 pkts = self.create_stream_in(self.pg4, self.pg3)
2745 self.pg4.add_stream(pkts)
2746 self.pg_enable_capture(self.pg_interfaces)
2748 capture = self.pg3.get_capture(len(pkts))
2749 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> pg4, addressed to nat_ip_vrf10
2752 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2753 self.pg3.add_stream(pkts)
2754 self.pg_enable_capture(self.pg_interfaces)
2756 capture = self.pg4.get_capture(len(pkts))
2757 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected source nat_ip_vrf20
2760 pkts = self.create_stream_in(self.pg6, self.pg3)
2761 self.pg6.add_stream(pkts)
2762 self.pg_enable_capture(self.pg_interfaces)
2764 capture = self.pg3.get_capture(len(pkts))
2765 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> pg6, addressed to nat_ip_vrf20
2768 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2769 self.pg3.add_stream(pkts)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 capture = self.pg6.get_capture(len(pkts))
2773 self.verify_capture_in(capture, self.pg6)
2775 def test_output_feature_hairpinning(self):
2776 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# host and server are both remote hosts of pg0; the server is reachable via
# a TCP static mapping nat_addr:server_out_port -> server.ip4:server_in_port.
# A packet from host to the mapped address must come back out of pg0 with
# source nat_addr, a translated source port and the server's real
# address/port; the server's reply to that translated port must come back
# with source nat_addr:server_out_port and the host's real address/port.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. where host_in_port, `ip` and `tcp` are assigned, and the
# construct around the logger.error calls) are not shown -- confirm against
# the complete file.
2777 host = self.pg0.remote_hosts[0]
2778 server = self.pg0.remote_hosts[1]
2781 server_in_port = 5678
2782 server_out_port = 8765
2784 self.nat44_add_address(self.nat_addr)
2785 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2786 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2789 # add static mapping for server
2790 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2791 server_in_port, server_out_port,
2792 proto=IP_PROTOS.tcp)
2794 # send packet from host to server
2795 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2796 IP(src=host.ip4, dst=self.nat_addr) /
2797 TCP(sport=host_in_port, dport=server_out_port))
2798 self.pg0.add_stream(p)
2799 self.pg_enable_capture(self.pg_interfaces)
2801 capture = self.pg0.get_capture(1)
2806 self.assertEqual(ip.src, self.nat_addr)
2807 self.assertEqual(ip.dst, server.ip4)
2808 self.assertNotEqual(tcp.sport, host_in_port)
2809 self.assertEqual(tcp.dport, server_in_port)
2810 self.assert_packet_checksums_valid(p)
# remember the translated source port; the reply below is sent to it
2811 host_out_port = tcp.sport
2813 self.logger.error(ppp("Unexpected or invalid packet:", p))
2816 # send reply from server to host
2817 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2818 IP(src=server.ip4, dst=self.nat_addr) /
2819 TCP(sport=server_in_port, dport=host_out_port))
2820 self.pg0.add_stream(p)
2821 self.pg_enable_capture(self.pg_interfaces)
2823 capture = self.pg0.get_capture(1)
2828 self.assertEqual(ip.src, self.nat_addr)
2829 self.assertEqual(ip.dst, host.ip4)
2830 self.assertEqual(tcp.sport, server_out_port)
2831 self.assertEqual(tcp.dport, host_in_port)
2832 self.assert_packet_checksums_valid(p)
2834 self.logger.error(ppp("Unexpected or invalid packet:", p))
2837 def test_one_armed_nat44(self):
2838 """ One armed NAT44 """
# pg9 is passed to nat44_interface_add_del_feature twice, and both the local
# and the remote host are remote hosts of pg9. A TCP packet from local_host
# to remote_host:80 must come back out of pg9 with source nat_addr and a
# source port other than 12345; the reply to that port must come back
# addressed to local_host:12345.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. the trailing argument of the second feature call, where
# `ip` and `tcp` are assigned, and the construct around the logger.error
# calls) are not shown -- confirm against the complete file.
2839 remote_host = self.pg9.remote_hosts[0]
2840 local_host = self.pg9.remote_hosts[1]
2843 self.nat44_add_address(self.nat_addr)
2844 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2845 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host
2849 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2850 IP(src=local_host.ip4, dst=remote_host.ip4) /
2851 TCP(sport=12345, dport=80))
2852 self.pg9.add_stream(p)
2853 self.pg_enable_capture(self.pg_interfaces)
2855 capture = self.pg9.get_capture(1)
2860 self.assertEqual(ip.src, self.nat_addr)
2861 self.assertEqual(ip.dst, remote_host.ip4)
2862 self.assertNotEqual(tcp.sport, 12345)
# remember the translated source port; the reply below is sent to it
2863 external_port = tcp.sport
2864 self.assertEqual(tcp.dport, 80)
2865 self.assert_packet_checksums_valid(p)
2867 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> nat_addr:external_port
2871 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2872 IP(src=remote_host.ip4, dst=self.nat_addr) /
2873 TCP(sport=80, dport=external_port))
2874 self.pg9.add_stream(p)
2875 self.pg_enable_capture(self.pg_interfaces)
2877 capture = self.pg9.get_capture(1)
2882 self.assertEqual(ip.src, remote_host.ip4)
2883 self.assertEqual(ip.dst, local_host.ip4)
2884 self.assertEqual(tcp.sport, 80)
2885 self.assertEqual(tcp.dport, 12345)
2886 self.assert_packet_checksums_valid(p)
2888 self.logger.error(ppp("Unexpected or invalid packet:", p))
2891 def test_del_session(self):
2892 """ Delete NAT44 session """
# Creates sessions by sending create_stream_in traffic from pg0 to pg1, then
# deletes two of them with nat44_del_session (the first addressed by its
# inside address/port, the second by its outside address/port), checks the
# user's session dump shrank by exactly 2, deletes one more session and
# finally calls verify_no_nat44_user().
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. the trailing argument of the feature call and of the
# second nat44_del_session call) are not shown -- confirm against the
# complete file.
2893 self.nat44_add_address(self.nat_addr)
2894 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2895 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2898 pkts = self.create_stream_in(self.pg0, self.pg1)
2899 self.pg0.add_stream(pkts)
2900 self.pg_enable_capture(self.pg_interfaces)
2902 self.pg1.get_capture(len(pkts))
# session count before any deletion
2904 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2905 nsessions = len(sessions)
2907 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2908 sessions[0].inside_port,
2909 sessions[0].protocol)
2910 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2911 sessions[1].outside_port,
2912 sessions[1].protocol,
2915 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2916 self.assertEqual(nsessions - len(sessions), 2)
2918 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2919 sessions[0].inside_port,
2920 sessions[0].protocol)
2922 self.verify_no_nat44_user()
2924 def test_set_get_reass(self):
2925 """ NAT44 set/get virtual fragmentation reassembly """
# Reads the current reassembly settings, writes back timeout + 5 and doubled
# max_reass / max_frag, reads the settings again and checks the three ip4_*
# fields match what was written. Then sets drop_frag=1 and checks that
# ip4_drop_frag reads back truthy.
2926 reas_cfg1 = self.vapi.nat_get_reass()
2928 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2929 max_reass=reas_cfg1.ip4_max_reass * 2,
2930 max_frag=reas_cfg1.ip4_max_frag * 2)
2932 reas_cfg2 = self.vapi.nat_get_reass()
2934 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2935 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2936 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2938 self.vapi.nat_set_reass(drop_frag=1)
2939 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2941 def test_frag_in_order(self):
2942 """ NAT44 translate fragments arriving in order """
# Sends a fragmented TCP stream (create_stream_frag) from pg0 to pg1 using a
# random inside source port, reassembles what pg1 captured and checks
# destination port 20, a translated source port and an unchanged payload.
# The translated port is stored in self.tcp_port_out, then a fragmented
# stream is sent back from pg1 and checked on pg0 (source port 20,
# destination port self.tcp_port_in, same payload). Finally the number of
# entries returned by nat_reass_dump() must have grown by 2.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. most create_stream_frag / reass_frags_and_verify
# arguments) are not shown -- confirm against the complete file.
2943 self.nat44_add_address(self.nat_addr)
2944 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2945 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload: 4 x "A", 16 x "B", 3 x "C"
2948 data = "A" * 4 + "B" * 16 + "C" * 3
2949 self.tcp_port_in = random.randint(1025, 65535)
# baseline count of reassembly entries
2951 reass = self.vapi.nat_reass_dump()
2952 reass_n_start = len(reass)
# pg0 -> pg1
2955 pkts = self.create_stream_frag(self.pg0,
2956 self.pg1.remote_ip4,
2960 self.pg0.add_stream(pkts)
2961 self.pg_enable_capture(self.pg_interfaces)
2963 frags = self.pg1.get_capture(len(pkts))
2964 p = self.reass_frags_and_verify(frags,
2966 self.pg1.remote_ip4)
2967 self.assertEqual(p[TCP].dport, 20)
2968 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2969 self.tcp_port_out = p[TCP].sport
2970 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0
2973 pkts = self.create_stream_frag(self.pg1,
2978 self.pg1.add_stream(pkts)
2979 self.pg_enable_capture(self.pg_interfaces)
2981 frags = self.pg0.get_capture(len(pkts))
2982 p = self.reass_frags_and_verify(frags,
2983 self.pg1.remote_ip4,
2984 self.pg0.remote_ip4)
2985 self.assertEqual(p[TCP].sport, 20)
2986 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2987 self.assertEqual(data, p[Raw].load)
2989 reass = self.vapi.nat_reass_dump()
2990 reass_n_end = len(reass)
2992 self.assertEqual(reass_n_end - reass_n_start, 2)
2994 def test_reass_hairpinning(self):
2995 """ NAT44 fragments hairpinning """
# Picks random host/server ports, adds a TCP static mapping
# nat_addr:server_out_port -> server.ip4:server_in_port, sends a fragmented
# stream into pg0 and captures it back on pg0. The reassembled packet must
# have a source port different from host_in_port, destination port
# server_in_port and an unchanged payload.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. most create_stream_frag / reass_frags_and_verify
# arguments) are not shown -- confirm against the complete file.
2996 server = self.pg0.remote_hosts[1]
2997 host_in_port = random.randint(1025, 65535)
2998 server_in_port = random.randint(1025, 65535)
2999 server_out_port = random.randint(1025, 65535)
3000 data = "A" * 4 + "B" * 16 + "C" * 3
3002 self.nat44_add_address(self.nat_addr)
3003 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3004 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3006 # add static mapping for server
3007 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3008 server_in_port, server_out_port,
3009 proto=IP_PROTOS.tcp)
3011 # send packet from host to server
3012 pkts = self.create_stream_frag(self.pg0,
3017 self.pg0.add_stream(pkts)
3018 self.pg_enable_capture(self.pg_interfaces)
# sent on pg0 and captured on pg0
3020 frags = self.pg0.get_capture(len(pkts))
3021 p = self.reass_frags_and_verify(frags,
3024 self.assertNotEqual(p[TCP].sport, host_in_port)
3025 self.assertEqual(p[TCP].dport, server_in_port)
3026 self.assertEqual(data, p[Raw].load)
3028 def test_frag_out_of_order(self):
3029 """ NAT44 translate fragments arriving out of order """
3030 self.nat44_add_address(self.nat_addr)
3031 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3032 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3035 data = "A" * 4 + "B" * 16 + "C" * 3
3036 random.randint(1025, 65535)
3039 pkts = self.create_stream_frag(self.pg0,
3040 self.pg1.remote_ip4,
3045 self.pg0.add_stream(pkts)
3046 self.pg_enable_capture(self.pg_interfaces)
3048 frags = self.pg1.get_capture(len(pkts))
3049 p = self.reass_frags_and_verify(frags,
3051 self.pg1.remote_ip4)
3052 self.assertEqual(p[TCP].dport, 20)
3053 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3054 self.tcp_port_out = p[TCP].sport
3055 self.assertEqual(data, p[Raw].load)
3058 pkts = self.create_stream_frag(self.pg1,
3064 self.pg1.add_stream(pkts)
3065 self.pg_enable_capture(self.pg_interfaces)
3067 frags = self.pg0.get_capture(len(pkts))
3068 p = self.reass_frags_and_verify(frags,
3069 self.pg1.remote_ip4,
3070 self.pg0.remote_ip4)
3071 self.assertEqual(p[TCP].sport, 20)
3072 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3073 self.assertEqual(data, p[Raw].load)
3075 def test_port_restricted(self):
3076 """ Port restricted NAT44 (MAP-E CE) """
# Selects the map-e port assignment algorithm through the CLI (psid 10,
# psid-offset 6, psid-len 6), sends one TCP packet from pg0 to pg1 port 22
# and checks the packet captured on pg1: source nat_addr, destination
# unchanged, a translated source port, and valid checksums.
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. where `ip` and `tcp` are assigned, and the construct
# around the logger.error call) are not shown -- confirm against the
# complete file.
3077 self.nat44_add_address(self.nat_addr)
3078 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3079 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3081 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3082 "psid-offset 6 psid-len 6")
3084 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3085 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3086 TCP(sport=4567, dport=22))
3087 self.pg0.add_stream(p)
3088 self.pg_enable_capture(self.pg_interfaces)
3090 capture = self.pg1.get_capture(1)
3095 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3096 self.assertEqual(ip.src, self.nat_addr)
3097 self.assertEqual(tcp.dport, 22)
3098 self.assertNotEqual(tcp.sport, 4567)
# bits 6..11 of the translated source port must equal 10, the psid value
# given in the CLI command above
3099 self.assertEqual((tcp.sport >> 6) & 63, 10)
3100 self.assert_packet_checksums_valid(p)
3102 self.logger.error(ppp("Unexpected or invalid packet:", p))
3105 def test_ipfix_max_frags(self):
3106 """ IPFIX logging maximum fragments pending reassembly exceeded """
# Sets max_frag=0 through nat_set_reass, enables IPFIX export towards pg3,
# builds a fragmented stream but sends only its last fragment (pkts[-1])
# into pg0, and expects nothing on pg1. After an IPFIX flush the 9 packets
# captured on pg3 are checked (headers, templates) and decoded data sets go
# to verify_ipfix_max_fragments_ip4(data, 0, pg0.remote_ip4n).
# NOTE(review): the embedded line numbers skip in this listing, so some
# source lines (e.g. the remaining create_stream_frag arguments and the loop
# headers that bind `p`) are not shown -- confirm against the complete file.
3107 self.nat44_add_address(self.nat_addr)
3108 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3109 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3111 self.vapi.nat_set_reass(max_frag=0)
3112 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3113 src_address=self.pg3.local_ip4n,
3115 template_interval=10)
3116 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3117 src_port=self.ipfix_src_port)
3119 data = "A" * 4 + "B" * 16 + "C" * 3
3120 self.tcp_port_in = random.randint(1025, 65535)
3121 pkts = self.create_stream_frag(self.pg0,
3122 self.pg1.remote_ip4,
# only the final fragment of the stream is sent
3126 self.pg0.add_stream(pkts[-1])
3127 self.pg_enable_capture(self.pg_interfaces)
3129 self.pg1.assert_nothing_captured()
3131 self.vapi.cli("ipfix flush")  # FIXME this should be an API call
3132 capture = self.pg3.get_capture(9)
3133 ipfix = IPFIXDecoder()
3134 # first load template
3136 self.assertTrue(p.haslayer(IPFIX))
3137 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3138 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3139 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3140 self.assertEqual(p[UDP].dport, 4739)
3141 self.assertEqual(p[IPFIX].observationDomainID,
3142 self.ipfix_domain_id)
3143 if p.haslayer(Template):
3144 ipfix.add_template(p.getlayer(Template))
3145 # verify events in data set
3147 if p.haslayer(Data):
# `data` is rebound here from the payload string to the decoded data set
3148 data = ipfix.decode_data_set(p.getlayer(Set))
3149 self.verify_ipfix_max_fragments_ip4(data, 0,
3150 self.pg0.remote_ip4n)
3152 def test_multiple_outside_vrf(self):
3153 """ Multiple outside VRF """
# Re-home pg1 and pg2 into the IPv4 tables vrf_id1 and vrf_id2: drop their
# addresses, create the tables, bind the interfaces, then re-add the
# addresses and resolve ARP again.
3157 self.pg1.unconfig_ip4()
3158 self.pg2.unconfig_ip4()
3159 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
3160 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
3161 self.pg1.set_table_ip4(vrf_id1)
3162 self.pg2.set_table_ip4(vrf_id2)
3163 self.pg1.config_ip4()
3164 self.pg2.config_ip4()
3165 self.pg1.resolve_arp()
3166 self.pg2.resolve_arp()
# NAT44 setup: pool address plus the NAT feature on pg0, pg1 and pg2.
3168 self.nat44_add_address(self.nat_addr)
3169 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3170 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3172 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# First outside interface: traffic from pg0 towards pg1 is captured on pg1
# and verified against nat_addr ...
3177 pkts = self.create_stream_in(self.pg0, self.pg1)
3178 self.pg0.add_stream(pkts)
3179 self.pg_enable_capture(self.pg_interfaces)
3181 capture = self.pg1.get_capture(len(pkts))
3182 self.verify_capture_out(capture, self.nat_addr)
# ... and traffic sent from pg1 to nat_addr is captured and verified on pg0.
3184 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3185 self.pg1.add_stream(pkts)
3186 self.pg_enable_capture(self.pg_interfaces)
3188 capture = self.pg0.get_capture(len(pkts))
3189 self.verify_capture_in(capture, self.pg0)
# Switch to different inside ports / ICMP id before repeating the exchange
# through the second outside interface.
3191 self.tcp_port_in = 60303
3192 self.udp_port_in = 60304
3193 self.icmp_id_in = 60305
# Second outside interface: the same round trip between pg0 and pg2.
3196 pkts = self.create_stream_in(self.pg0, self.pg2)
3197 self.pg0.add_stream(pkts)
3198 self.pg_enable_capture(self.pg_interfaces)
3200 capture = self.pg2.get_capture(len(pkts))
3201 self.verify_capture_out(capture, self.nat_addr)
3203 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3204 self.pg2.add_stream(pkts)
3205 self.pg_enable_capture(self.pg_interfaces)
3207 capture = self.pg0.get_capture(len(pkts))
3208 self.verify_capture_in(capture, self.pg0)
# Return pg1 and pg2 to table 0 and restore their addressing.
3211 self.pg1.unconfig_ip4()
3212 self.pg2.unconfig_ip4()
3213 self.pg1.set_table_ip4(0)
3214 self.pg2.set_table_ip4(0)
3215 self.pg1.config_ip4()
3216 self.pg2.config_ip4()
3217 self.pg1.resolve_arp()
3218 self.pg2.resolve_arp()
3221 super(TestNAT44, self).tearDown()
# NOTE(review): this run of statements is presumably the body of
# TestNAT44.tearDown (it chains to the parent tearDown) -- confirm.
3222 if not self.vpp_dead:
# Log the NAT44 state reported by the CLI show commands for diagnostics.
3223 self.logger.info(self.vapi.cli("show nat44 addresses"))
3224 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3225 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3226 self.logger.info(self.vapi.cli("show nat44 interface address"))
3227 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3228 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3229 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
# Put the address and port assignment algorithm back to the default.
3230 self.vapi.cli("nat addr-port-assignment-alg default")
# Issue the "clear logging" CLI command.
3232 self.vapi.cli("clear logging")
3235 class TestNAT44EndpointDependent(MethodHolder):
3236 """ Endpoint-Dependent mapping and filtering test cases """
3239 def setUpConstants(cls):
# Keep the parent's constants, then append a "nat { endpoint-dependent }"
# stanza to the VPP command line.
3240 super(TestNAT44EndpointDependent, cls).setUpConstants()
3241 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
3244 def setUpClass(cls):
3245 super(TestNAT44EndpointDependent, cls).setUpClass()
# Turn on debug-level logging for the nat log class.
3246 cls.vapi.cli("set log class nat level debug")
# Default inside/outside ports, ICMP ids, NAT pool address and IPFIX
# parameters shared by the tests of this class.
3248 cls.tcp_port_in = 6303
3249 cls.tcp_port_out = 6303
3250 cls.udp_port_in = 6304
3251 cls.udp_port_out = 6304
3252 cls.icmp_id_in = 6305
3253 cls.icmp_id_out = 6305
3254 cls.nat_addr = '10.0.0.3'
3255 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3256 cls.ipfix_src_port = 4739
3257 cls.ipfix_domain_id = 1
3258 cls.tcp_external_port = 80
# Create pg0..pg6; the first three are kept in cls.interfaces.
3260 cls.create_pg_interfaces(range(7))
3261 cls.interfaces = list(cls.pg_interfaces[0:3])
3263 for i in cls.interfaces:
# pg0 gets three remote hosts with configured IPv4 neighbors.
3268 cls.pg0.generate_remote_hosts(3)
3269 cls.pg0.configure_ipv4_neighbors()
# pg4: two remote hosts and a configured IPv4 address; a further
# sw_interface_add_del_address call follows (presumably using ip_addr_n,
# i.e. 10.0.0.1 -- TODO confirm). Remote host 1 is then given the same
# IPv4 address as remote host 0 and ARP is resolved a second time.
3273 cls.pg4.generate_remote_hosts(2)
3274 cls.pg4.config_ip4()
3275 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3276 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3280 cls.pg4.resolve_arp()
3281 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3282 cls.pg4.resolve_arp()
# zero_ip4n (0.0.0.0) serves as next-hop address in the routes below;
# IPv4 table 1 is created for pg5 and pg6.
3284 zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
3285 cls.vapi.ip_table_add_del(1, is_add=1)
# pg5: override its local/remote addresses with 10.1.1.1 / 10.1.1.2, bind
# it to table 1 and add a /32 route to the remote address via pg5.
3287 cls.pg5._local_ip4 = "10.1.1.1"
3288 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
3290 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
3291 cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
3292 socket.AF_INET, cls.pg5.remote_ip4)
3293 cls.pg5.set_table_ip4(1)
3294 cls.pg5.config_ip4()
3296 cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
3297 dst_address_length=32,
3299 next_hop_sw_if_index=cls.pg5.sw_if_index,
3300 next_hop_address=zero_ip4n)
# pg6: the same treatment with 10.1.2.1 / 10.1.2.2.
3302 cls.pg6._local_ip4 = "10.1.2.1"
3303 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
3305 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
3306 cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
3307 socket.AF_INET, cls.pg6.remote_ip4)
3308 cls.pg6.set_table_ip4(1)
3309 cls.pg6.config_ip4()
3311 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3312 dst_address_length=32,
3314 next_hop_sw_if_index=cls.pg6.sw_if_index,
3315 next_hop_address=zero_ip4n)
# A /16 route for pg6's remote address that resolves in table 1
# (next_hop_table_id=1) ...
3317 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3318 dst_address_length=16,
3319 next_hop_address=zero_ip4n,
3321 next_hop_table_id=1)
# ... a default route that resolves in table 0 (next_hop_table_id=0) ...
3322 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3323 dst_address_length=0,
3324 next_hop_address=zero_ip4n,
3326 next_hop_table_id=0)
# ... and a default route whose next hop is given by pg1's sw_if_index and
# local address.
3327 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3328 dst_address_length=0,
3330 next_hop_sw_if_index=cls.pg1.sw_if_index,
3331 next_hop_address=cls.pg1.local_ip4n)
3333 cls.pg5.resolve_arp()
3334 cls.pg6.resolve_arp()
# NOTE(review): presumably the error path of the setup (tear the class
# down again) -- the enclosing handler is not visible here; confirm.
3337 super(TestNAT44EndpointDependent, cls).tearDownClass()
3340 def test_dynamic(self):
3341 """ NAT44 dynamic translation test """
# NAT44 setup: pool address plus the NAT feature on pg0 and pg1.
3343 self.nat44_add_address(self.nat_addr)
3344 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3345 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Send the inside stream from pg0 towards pg1 and verify what pg1 receives.
3349 pkts = self.create_stream_in(self.pg0, self.pg1)
3350 self.pg0.add_stream(pkts)
3351 self.pg_enable_capture(self.pg_interfaces)
3353 capture = self.pg1.get_capture(len(pkts))
3354 self.verify_capture_out(capture)
# Send the outside stream from pg1 and verify what pg0 receives.
3357 pkts = self.create_stream_out(self.pg1)
3358 self.pg1.add_stream(pkts)
3359 self.pg_enable_capture(self.pg_interfaces)
3361 capture = self.pg0.get_capture(len(pkts))
3362 self.verify_capture_in(capture, self.pg0)
3364 def test_forwarding(self):
3365 """ NAT44 forwarding test """
# Enable the NAT feature on pg0 and pg1 and switch NAT44 forwarding on.
3367 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3368 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3370 self.vapi.nat44_forwarding_enable_disable(1)
# Static mapping between pg0's remote address (local) and the NAT address
# (external).
3372 real_ip = self.pg0.remote_ip4n
3373 alias_ip = self.nat_addr_n
3374 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3375 external_ip=alias_ip)
3378 # in2out - static mapping match
# Outside stream from pg1 is captured and verified on pg0 ...
3380 pkts = self.create_stream_out(self.pg1)
3381 self.pg1.add_stream(pkts)
3382 self.pg_enable_capture(self.pg_interfaces)
3384 capture = self.pg0.get_capture(len(pkts))
3385 self.verify_capture_in(capture, self.pg0)
# ... and the inside stream from pg0 is verified on pg1 with same_port=True.
3387 pkts = self.create_stream_in(self.pg0, self.pg1)
3388 self.pg0.add_stream(pkts)
3389 self.pg_enable_capture(self.pg_interfaces)
3391 capture = self.pg1.get_capture(len(pkts))
3392 self.verify_capture_out(capture, same_port=True)
3394 # in2out - no static mapping match
# Temporarily put remote host 1 into slot 0 of pg0.remote_hosts
# (presumably so that pg0.remote_ip4 yields the host without a static
# mapping -- TODO confirm); the original host is restored further down.
3396 host0 = self.pg0.remote_hosts[0]
3397 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
# Outside stream addressed directly to pg0.remote_ip4, using inside ports.
3399 pkts = self.create_stream_out(self.pg1,
3400 dst_ip=self.pg0.remote_ip4,
3401 use_inside_ports=True)
3402 self.pg1.add_stream(pkts)
3403 self.pg_enable_capture(self.pg_interfaces)
3405 capture = self.pg0.get_capture(len(pkts))
3406 self.verify_capture_in(capture, self.pg0)
# Inside stream: the capture on pg1 is verified with nat_ip set to
# pg0.remote_ip4 itself.
3408 pkts = self.create_stream_in(self.pg0, self.pg1)
3409 self.pg0.add_stream(pkts)
3410 self.pg_enable_capture(self.pg_interfaces)
3412 capture = self.pg1.get_capture(len(pkts))
3413 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Restore the original first remote host of pg0.
3416 self.pg0.remote_hosts[0] = host0
# Remote host 1 must own three sessions, the first with a valid external
# host; deleting that one by its inside address/port, protocol and
# external host must leave two.
3418 user = self.pg0.remote_hosts[1]
3419 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3420 self.assertEqual(len(sessions), 3)
3421 self.assertTrue(sessions[0].ext_host_valid)
3422 self.vapi.nat44_del_session(
3423 sessions[0].inside_ip_address,
3424 sessions[0].inside_port,
3425 sessions[0].protocol,
3426 ext_host_address=sessions[0].ext_host_address,
3427 ext_host_port=sessions[0].ext_host_port)
3428 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3429 self.assertEqual(len(sessions), 2)
# Switch forwarding off again and issue the static-mapping call for the
# same real/alias pair (presumably removing the mapping -- TODO confirm).
3432 self.vapi.nat44_forwarding_enable_disable(0)
3433 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3434 external_ip=alias_ip,
3437 def test_static_lb(self):
3438 """ NAT44 local service load balancing """
3439 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
# The two back-end servers are the first two remote hosts of pg0; "locals"
# describes them for the load-balancing static mapping.
3442 server1 = self.pg0.remote_hosts[0]
3443 server2 = self.pg0.remote_hosts[1]
3445 locals = [{'addr': server1.ip4n,
3449 {'addr': server2.ip4n,
# NAT44 setup: pool address, load-balancing static mapping on the NAT
# address, and the NAT feature on pg0 and pg1.
3454 self.nat44_add_address(self.nat_addr)
3455 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3458 local_num=len(locals),
3460 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3461 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3464 # from client to service
3465 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3466 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3467 TCP(sport=12345, dport=external_port))
3468 self.pg1.add_stream(p)
3469 self.pg_enable_capture(self.pg_interfaces)
3471 capture = self.pg0.get_capture(1)
# The packet must have been delivered to one of the two servers, on the
# local port.
3477 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3478 if ip.dst == server1.ip4:
3482 self.assertEqual(tcp.dport, local_port)
3483 self.assert_packet_checksums_valid(p)
3485 self.logger.error(ppp("Unexpected or invalid packet:", p))
3488 # from service back to client
3489 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3490 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3491 TCP(sport=local_port, dport=12345))
3492 self.pg0.add_stream(p)
3493 self.pg_enable_capture(self.pg_interfaces)
3495 capture = self.pg1.get_capture(1)
# The reply must appear to come from the NAT address and external port.
3500 self.assertEqual(ip.src, self.nat_addr)
3501 self.assertEqual(tcp.sport, external_port)
3502 self.assert_packet_checksums_valid(p)
3504 self.logger.error(ppp("Unexpected or invalid packet:", p))
# The selected server must own exactly one session with a valid external
# host; after deleting it no session may remain.
3507 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3508 self.assertEqual(len(sessions), 1)
3509 self.assertTrue(sessions[0].ext_host_valid)
3510 self.vapi.nat44_del_session(
3511 sessions[0].inside_ip_address,
3512 sessions[0].inside_port,
3513 sessions[0].protocol,
3514 ext_host_address=sessions[0].ext_host_address,
3515 ext_host_port=sessions[0].ext_host_port)
3516 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3517 self.assertEqual(len(sessions), 0)
# Only runs when extended tests are enabled.
3519 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3520 def test_static_lb_multi_clients(self):
3521 """ NAT44 local service load balancing - multiple clients"""
3523 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
# The two back-end servers are the first two remote hosts of pg0.
3526 server1 = self.pg0.remote_hosts[0]
3527 server2 = self.pg0.remote_hosts[1]
3529 locals = [{'addr': server1.ip4n,
3533 {'addr': server2.ip4n,
# NAT44 setup: pool address, load-balancing static mapping on the NAT
# address, and the NAT feature on pg0 and pg1.
3538 self.nat44_add_address(self.nat_addr)
3539 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3542 local_num=len(locals),
3544 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3545 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Client source addresses come from ip4_range over pg1's remote address;
# each client sends one TCP packet to the service on the NAT address.
3550 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3552 for client in clients:
3553 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3554 IP(src=client, dst=self.nat_addr) /
3555 TCP(sport=12345, dport=external_port))
3557 self.pg1.add_stream(pkts)
3558 self.pg_enable_capture(self.pg_interfaces)
# Captured packets are inspected by destination server; server1 is
# expected to have received more of them than server2.
3560 capture = self.pg0.get_capture(len(pkts))
3562 if p[IP].dst == server1.ip4:
3566 self.assertTrue(server1_n > server2_n)
3568 def test_static_lb_2(self):
3569 """ NAT44 local service load balancing (asymmetrical rule) """
3570 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
# The two back-end servers are the first two remote hosts of pg0.
3573 server1 = self.pg0.remote_hosts[0]
3574 server2 = self.pg0.remote_hosts[1]
3576 locals = [{'addr': server1.ip4n,
3580 {'addr': server2.ip4n,
# Forwarding is enabled, so traffic that does not match the
# load-balancing mapping can still pass (exercised in the second half).
3585 self.vapi.nat44_forwarding_enable_disable(1)
3586 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3590 local_num=len(locals),
3592 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3593 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3596 # from client to service
3597 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3598 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3599 TCP(sport=12345, dport=external_port))
3600 self.pg1.add_stream(p)
3601 self.pg_enable_capture(self.pg_interfaces)
3603 capture = self.pg0.get_capture(1)
# The packet must have been delivered to one of the two servers, on the
# local port.
3609 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3610 if ip.dst == server1.ip4:
3614 self.assertEqual(tcp.dport, local_port)
3615 self.assert_packet_checksums_valid(p)
3617 self.logger.error(ppp("Unexpected or invalid packet:", p))
3620 # from service back to client
3621 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3622 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3623 TCP(sport=local_port, dport=12345))
3624 self.pg0.add_stream(p)
3625 self.pg_enable_capture(self.pg_interfaces)
3627 capture = self.pg1.get_capture(1)
# The reply must appear to come from the NAT address and external port.
3632 self.assertEqual(ip.src, self.nat_addr)
3633 self.assertEqual(tcp.sport, external_port)
3634 self.assert_packet_checksums_valid(p)
3636 self.logger.error(ppp("Unexpected or invalid packet:", p))
3639 # from client to server (no translation)
3640 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3641 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3642 TCP(sport=12346, dport=local_port))
3643 self.pg1.add_stream(p)
3644 self.pg_enable_capture(self.pg_interfaces)
3646 capture = self.pg0.get_capture(1)
# Destination address and port must be unchanged.
3652 self.assertEqual(ip.dst, server1.ip4)
3653 self.assertEqual(tcp.dport, local_port)
3654 self.assert_packet_checksums_valid(p)
3656 self.logger.error(ppp("Unexpected or invalid packet:", p))
3659 # from service back to client (no translation)
3660 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3661 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3662 TCP(sport=local_port, dport=12346))
3663 self.pg0.add_stream(p)
3664 self.pg_enable_capture(self.pg_interfaces)
3666 capture = self.pg1.get_capture(1)
# Source address and port must be unchanged.
3671 self.assertEqual(ip.src, server1.ip4)
3672 self.assertEqual(tcp.sport, local_port)
3673 self.assert_packet_checksums_valid(p)
3675 self.logger.error(ppp("Unexpected or invalid packet:", p))
3678 def test_unknown_proto(self):
3679 """ NAT44 translate packet with unknown protocol """
# NAT44 setup: pool address plus the NAT feature on pg0 and pg1.
3680 self.nat44_add_address(self.nat_addr)
3681 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3682 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# A plain TCP packet from pg0 to pg1 is sent first (presumably to create
# the NAT session for this host -- TODO confirm).
3686 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3687 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3688 TCP(sport=self.tcp_port_in, dport=20))
3689 self.pg0.add_stream(p)
3690 self.pg_enable_capture(self.pg_interfaces)
3692 p = self.pg1.get_capture(1)
# Inside -> outside: a packet from pg0 carrying an inner IP/TCP payload.
3694 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3695 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3697 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3698 TCP(sport=1234, dport=1234))
3699 self.pg0.add_stream(p)
3700 self.pg_enable_capture(self.pg_interfaces)
3702 p = self.pg1.get_capture(1)
# On pg1 the outer source must be the NAT address, the destination
# unchanged, and the packet must still contain a GRE layer.
3705 self.assertEqual(packet[IP].src, self.nat_addr)
3706 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3707 self.assertTrue(packet.haslayer(GRE))
3708 self.assert_packet_checksums_valid(packet)
3710 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside: the same kind of packet sent from pg1 to the NAT
# address.
3714 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3715 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3717 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3718 TCP(sport=1234, dport=1234))
3719 self.pg1.add_stream(p)
3720 self.pg_enable_capture(self.pg_interfaces)
3722 p = self.pg0.get_capture(1)
# On pg0 the outer destination must be pg0's remote host, the source
# unchanged, and the packet must still contain a GRE layer.
3725 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3726 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3727 self.assertTrue(packet.haslayer(GRE))
3728 self.assert_packet_checksums_valid(packet)
3730 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3733 def test_hairpinning_unknown_proto(self):
3734 """ NAT44 translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts of pg0, so all traffic of this
# test enters and leaves through pg0.
3735 host = self.pg0.remote_hosts[0]
3736 server = self.pg0.remote_hosts[1]
3738 server_out_port = 8765
3739 server_nat_ip = "10.0.0.11"
# NAT44 setup: pool address plus the NAT feature on pg0 and pg1.
3741 self.nat44_add_address(self.nat_addr)
3742 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3743 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3746 # add static mapping for server
3747 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# A plain TCP packet from the host to the server's NAT address is sent
# first; one packet is expected back on pg0.
3750 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3751 IP(src=host.ip4, dst=server_nat_ip) /
3752 TCP(sport=host_in_port, dport=server_out_port))
3753 self.pg0.add_stream(p)
3754 self.pg_enable_capture(self.pg_interfaces)
3756 self.pg0.get_capture(1)
# Host -> server: a packet carrying an inner IP/TCP payload, addressed to
# the server's NAT address.
3758 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3759 IP(src=host.ip4, dst=server_nat_ip) /
3761 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3762 TCP(sport=1234, dport=1234))
3763 self.pg0.add_stream(p)
3764 self.pg_enable_capture(self.pg_interfaces)
3766 p = self.pg0.get_capture(1)
# The server must see the NAT pool address as source and its own real
# address as destination; the GRE layer must survive.
3769 self.assertEqual(packet[IP].src, self.nat_addr)
3770 self.assertEqual(packet[IP].dst, server.ip4)
3771 self.assertTrue(packet.haslayer(GRE))
3772 self.assert_packet_checksums_valid(packet)
3774 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> host: the reply is addressed to the NAT pool address.
3778 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3779 IP(src=server.ip4, dst=self.nat_addr) /
3781 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3782 TCP(sport=1234, dport=1234))
3783 self.pg0.add_stream(p)
3784 self.pg_enable_capture(self.pg_interfaces)
3786 p = self.pg0.get_capture(1)
# The host must see the server's NAT address as source and its own
# address as destination; the GRE layer must survive.
3789 self.assertEqual(packet[IP].src, server_nat_ip)
3790 self.assertEqual(packet[IP].dst, host.ip4)
3791 self.assertTrue(packet.haslayer(GRE))
3792 self.assert_packet_checksums_valid(packet)
3794 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3797 def test_output_feature_and_service(self):
3798 """ NAT44 interface output feature and services """
3799 external_addr = '1.2.3.4'
# Setup: forwarding on, pool address, identity mapping for pg1's remote
# address, and an out2in-only TCP static mapping from external_addr to
# pg0's remote host.
3803 self.vapi.nat44_forwarding_enable_disable(1)
3804 self.nat44_add_address(self.nat_addr)
3805 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3806 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3807 local_port, external_port,
3808 proto=IP_PROTOS.tcp, out2in_only=1)
# The NAT feature is configured twice on pg0 (the second call takes
# further arguments) and the output feature is configured on pg1.
3809 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3810 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3812 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3815 # from client to service
3816 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3817 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3818 TCP(sport=12345, dport=external_port))
3819 self.pg1.add_stream(p)
3820 self.pg_enable_capture(self.pg_interfaces)
3822 capture = self.pg0.get_capture(1)
# Destination must be rewritten to pg0's remote host and the local port.
3827 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3828 self.assertEqual(tcp.dport, local_port)
3829 self.assert_packet_checksums_valid(p)
3831 self.logger.error(ppp("Unexpected or invalid packet:", p))
3834 # from service back to client
3835 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3836 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3837 TCP(sport=local_port, dport=12345))
3838 self.pg0.add_stream(p)
3839 self.pg_enable_capture(self.pg_interfaces)
3841 capture = self.pg1.get_capture(1)
# Source must be rewritten to the external address and port.
3846 self.assertEqual(ip.src, external_addr)
3847 self.assertEqual(tcp.sport, external_port)
3848 self.assert_packet_checksums_valid(p)
3850 self.logger.error(ppp("Unexpected or invalid packet:", p))
3853 # from local network host to external network
# The inside stream is sent and verified twice in a row.
3854 pkts = self.create_stream_in(self.pg0, self.pg1)
3855 self.pg0.add_stream(pkts)
3856 self.pg_enable_capture(self.pg_interfaces)
3858 capture = self.pg1.get_capture(len(pkts))
3859 self.verify_capture_out(capture)
3860 pkts = self.create_stream_in(self.pg0, self.pg1)
3861 self.pg0.add_stream(pkts)
3862 self.pg_enable_capture(self.pg_interfaces)
3864 capture = self.pg1.get_capture(len(pkts))
3865 self.verify_capture_out(capture)
3867 # from external network back to local network host
3868 pkts = self.create_stream_out(self.pg1)
3869 self.pg1.add_stream(pkts)
3870 self.pg_enable_capture(self.pg_interfaces)
3872 capture = self.pg0.get_capture(len(pkts))
3873 self.verify_capture_in(capture, self.pg0)
3875 def test_output_feature_and_service2(self):
3876 """ NAT44 interface output feature and service host direct access """
# Setup: forwarding on, pool address, output feature on pg1.
3877 self.vapi.nat44_forwarding_enable_disable(1)
3878 self.nat44_add_address(self.nat_addr)
3879 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3882 # session initiated from service host - translate
3883 pkts = self.create_stream_in(self.pg0, self.pg1)
3884 self.pg0.add_stream(pkts)
3885 self.pg_enable_capture(self.pg_interfaces)
3887 capture = self.pg1.get_capture(len(pkts))
3888 self.verify_capture_out(capture)
# Return traffic from pg1 is captured and verified on pg0.
3890 pkts = self.create_stream_out(self.pg1)
3891 self.pg1.add_stream(pkts)
3892 self.pg_enable_capture(self.pg_interfaces)
3894 capture = self.pg0.get_capture(len(pkts))
3895 self.verify_capture_in(capture, self.pg0)
3897 # session initiated from remote host - do not translate
# Different inside ports / ICMP id are used for this second exchange; the
# outside stream targets pg0's remote address directly with inside ports.
3898 self.tcp_port_in = 60303
3899 self.udp_port_in = 60304
3900 self.icmp_id_in = 60305
3901 pkts = self.create_stream_out(self.pg1,
3902 self.pg0.remote_ip4,
3903 use_inside_ports=True)
3904 self.pg1.add_stream(pkts)
3905 self.pg_enable_capture(self.pg_interfaces)
3907 capture = self.pg0.get_capture(len(pkts))
3908 self.verify_capture_in(capture, self.pg0)
# The replies are verified on pg1 with nat_ip set to pg0's remote address
# itself.
3910 pkts = self.create_stream_in(self.pg0, self.pg1)
3911 self.pg0.add_stream(pkts)
3912 self.pg_enable_capture(self.pg_interfaces)
3914 capture = self.pg1.get_capture(len(pkts))
3915 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3918 def test_output_feature_and_service3(self):
3919 """ NAT44 interface output feature and DST NAT """
3920 external_addr = '1.2.3.4'
# Setup: forwarding on, pool address, and an out2in-only TCP static
# mapping from external_addr to pg1's remote host.
3924 self.vapi.nat44_forwarding_enable_disable(1)
3925 self.nat44_add_address(self.nat_addr)
3926 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3927 local_port, external_port,
3928 proto=IP_PROTOS.tcp, out2in_only=1)
# The NAT feature is configured twice on pg0 (the second call takes
# further arguments) and the output feature is configured on pg1.
3929 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3930 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3932 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0's remote host sends TCP to external_addr:external_port.
3935 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3936 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3937 TCP(sport=12345, dport=external_port))
3938 self.pg0.add_stream(p)
3939 self.pg_enable_capture(self.pg_interfaces)
3941 capture = self.pg1.get_capture(1)
# On pg1 the source address/port must be unchanged while the destination
# is rewritten to pg1's remote host and the local port.
3946 self.assertEqual(ip.src, self.pg0.remote_ip4)
3947 self.assertEqual(tcp.sport, 12345)
3948 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3949 self.assertEqual(tcp.dport, local_port)
3950 self.assert_packet_checksums_valid(p)
3952 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply from pg1's remote host back to pg0's remote host.
3955 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3956 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3957 TCP(sport=local_port, dport=12345))
3958 self.pg1.add_stream(p)
3959 self.pg_enable_capture(self.pg_interfaces)
3961 capture = self.pg0.get_capture(1)
# On pg0 the source must be rewritten to the external address/port while
# the destination stays untouched.
3966 self.assertEqual(ip.src, external_addr)
3967 self.assertEqual(tcp.sport, external_port)
3968 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3969 self.assertEqual(tcp.dport, 12345)
3970 self.assert_packet_checksums_valid(p)
3972 self.logger.error(ppp("Unexpected or invalid packet:", p))
3975 def test_next_src_nat(self):
3976 """ On way back forward packet to nat44-in2out node. """
3977 twice_nat_addr = '10.0.1.3'
# Filled in from the first captured packet and reused for the reply.
3980 post_twice_nat_port = 0
# Setup: forwarding on, a twice-NAT pool address, and an out2in-only TCP
# static mapping (self_twice_nat, vrf 1) with pg6's remote host as local
# address and pg1's remote address as external address; the NAT feature is
# configured on pg6.
3982 self.vapi.nat44_forwarding_enable_disable(1)
3983 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3984 self.nat44_add_static_mapping(self.pg6.remote_ip4, self.pg1.remote_ip4,
3985 local_port, external_port,
3986 proto=IP_PROTOS.tcp, out2in_only=1,
3987 self_twice_nat=1, vrf_id=1)
3988 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
# pg6's remote host sends TCP to the external address of the mapping; the
# packet is expected back on pg6 itself.
3991 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
3992 IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) /
3993 TCP(sport=12345, dport=external_port))
3994 self.pg6.add_stream(p)
3995 self.pg_enable_capture(self.pg_interfaces)
3997 capture = self.pg6.get_capture(1)
# Source must be the twice-NAT address with a new port; destination must
# be pg6's remote host on the local port.
4002 self.assertEqual(ip.src, twice_nat_addr)
4003 self.assertNotEqual(tcp.sport, 12345)
4004 post_twice_nat_port = tcp.sport
4005 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4006 self.assertEqual(tcp.dport, local_port)
4007 self.assert_packet_checksums_valid(p)
4009 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply addressed to the twice-NAT address and the port recorded above.
4012 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4013 IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) /
4014 TCP(sport=local_port, dport=post_twice_nat_port))
4015 self.pg6.add_stream(p)
4016 self.pg_enable_capture(self.pg_interfaces)
4018 capture = self.pg6.get_capture(1)
# The reply must come back on pg6 with the external address/port as source
# and the original client address/port as destination.
4023 self.assertEqual(ip.src, self.pg1.remote_ip4)
4024 self.assertEqual(tcp.sport, external_port)
4025 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4026 self.assertEqual(tcp.dport, 12345)
4027 self.assert_packet_checksums_valid(p)
4029 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Shared scenario behind the twice-NAT tests of this class; the keyword
# flags select self-twice-NAT, same-interface client and load balancing.
4032 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
4034 twice_nat_addr = '10.0.1.3'
# port_in1 / port_in2 are the destination ports accepted by the assertIn
# check on the captured packet further down.
4042 port_in1 = port_in+1
4043 port_in2 = port_in+2
4048 server1 = self.pg0.remote_hosts[0]
4049 server2 = self.pg0.remote_hosts[1]
# NOTE(review): eh_translate presumably selects which of the two
# source-address expectations below applies (twice-NAT address vs. the
# client's own address) -- confirm.
4061 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
# Two pool addresses: the regular NAT address and the twice-NAT address.
4064 self.nat44_add_address(self.nat_addr)
4065 self.nat44_add_address(twice_nat_addr, twice_nat=1)
# Static mapping for a single server: exactly one of twice_nat /
# self_twice_nat is set, depending on the self_twice_nat argument.
4067 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
4069 proto=IP_PROTOS.tcp,
4070 twice_nat=int(not self_twice_nat),
4071 self_twice_nat=int(self_twice_nat))
# Load-balancing static mapping over server1 and server2.
4073 locals = [{'addr': server1.ip4n,
4077 {'addr': server2.ip4n,
4081 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
4082 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
4086 not self_twice_nat),
4089 local_num=len(locals),
4091 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
4092 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
# Client selection: a client_id must be given here; it picks one of pg0's
# first two remote hosts, otherwise pg1's first remote host is the client.
4099 assert client_id is not None
4101 client = self.pg0.remote_hosts[0]
4102 elif client_id == 2:
4103 client = self.pg0.remote_hosts[1]
4105 client = pg1.remote_hosts[0]
# Client -> service: TCP to the NAT address and outside port.
4106 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4107 IP(src=client.ip4, dst=self.nat_addr) /
4108 TCP(sport=eh_port_out, dport=port_out))
4110 self.pg_enable_capture(self.pg_interfaces)
4112 capture = pg0.get_capture(1)
# Checks on the packet captured on pg0: destination server and port, then
# either a translated source (twice-NAT address, new port) or the
# untouched client address and port.
4118 if ip.dst == server1.ip4:
4124 self.assertEqual(ip.dst, server.ip4)
4126 self.assertIn(tcp.dport, [port_in1, port_in2])
4128 self.assertEqual(tcp.dport, port_in)
4130 self.assertEqual(ip.src, twice_nat_addr)
4131 self.assertNotEqual(tcp.sport, eh_port_out)
4133 self.assertEqual(ip.src, client.ip4)
4134 self.assertEqual(tcp.sport, eh_port_out)
# Remember the ports seen by the server so the reply can mirror them.
4136 eh_port_in = tcp.sport
4137 saved_port_in = tcp.dport
4138 self.assert_packet_checksums_valid(p)
4140 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Service -> client: reply from the chosen server.
4143 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4144 IP(src=server.ip4, dst=eh_addr_in) /
4145 TCP(sport=saved_port_in, dport=eh_port_in))
4147 self.pg_enable_capture(self.pg_interfaces)
4149 capture = pg1.get_capture(1)
# The client must see the NAT address / outside port as source and its own
# address / original port as destination.
4154 self.assertEqual(ip.dst, client.ip4)
4155 self.assertEqual(ip.src, self.nat_addr)
4156 self.assertEqual(tcp.dport, eh_port_out)
4157 self.assertEqual(tcp.sport, port_out)
4158 self.assert_packet_checksums_valid(p)
4160 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Session checks: the server owns exactly one session, flagged with a
# valid external host and as twice-NAT; it is deleted using the external
# host's NAT address/port, after which no session may remain.
4164 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4165 self.assertEqual(len(sessions), 1)
4166 self.assertTrue(sessions[0].ext_host_valid)
4167 self.assertTrue(sessions[0].is_twicenat)
4168 self.vapi.nat44_del_session(
4169 sessions[0].inside_ip_address,
4170 sessions[0].inside_port,
4171 sessions[0].protocol,
4172 ext_host_address=sessions[0].ext_host_nat_address,
4173 ext_host_port=sessions[0].ext_host_nat_port)
4174 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4175 self.assertEqual(len(sessions), 0)
4177 def test_twice_nat(self):
# Run the shared twice-NAT scenario with its default arguments.
4179 self.twice_nat_common()
4181 def test_self_twice_nat_positive(self):
4182 """ Self Twice NAT44 (positive test) """
# Shared scenario with self-twice-NAT and same_pg both enabled.
4183 self.twice_nat_common(self_twice_nat=True, same_pg=True)
4185 def test_self_twice_nat_negative(self):
4186 """ Self Twice NAT44 (negative test) """
# Shared scenario with self-twice-NAT enabled but same_pg left at False.
4187 self.twice_nat_common(self_twice_nat=True)
4189 def test_twice_nat_lb(self):
4190 """ Twice NAT44 local service load balancing """
# Shared scenario with load balancing enabled.
4191 self.twice_nat_common(lb=True)
4193 def test_self_twice_nat_lb_positive(self):
4194 """ Self Twice NAT44 local service load balancing (positive test) """
# Shared scenario with load balancing, self-twice-NAT and same_pg enabled.
4195 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4198 def test_self_twice_nat_lb_negative(self):
4199 """ Self Twice NAT44 local service load balancing (negative test) """
# Shared scenario with load balancing, self-twice-NAT and same_pg enabled.
4200 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4203 def test_twice_nat_interface_addr(self):
4204 """ Acquire twice NAT44 addresses from interface """
# Register pg3 as an interface whose address feeds the twice-NAT pool.
4205 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4207 # no address in NAT pool
4208 adresses = self.vapi.nat44_address_dump()
4209 self.assertEqual(0, len(adresses))
4211 # configure interface address and check NAT address pool
# The single pool entry must be pg3's local address, marked as twice-NAT.
4212 self.pg3.config_ip4()
4213 adresses = self.vapi.nat44_address_dump()
4214 self.assertEqual(1, len(adresses))
4215 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4216 self.assertEqual(adresses[0].twice_nat, 1)
4218 # remove interface address and check NAT address pool
4219 self.pg3.unconfig_ip4()
4220 adresses = self.vapi.nat44_address_dump()
4221 self.assertEqual(0, len(adresses))
4223 def test_tcp_session_close_in(self):
4224 """ Close TCP session from inside network """
# Use a fixed outside TCP port for this test, then set up the pool
# address, a TCP static mapping for pg0's remote host and the NAT feature
# on pg0 and pg1.
4225 self.tcp_port_out = 10505
4226 self.nat44_add_address(self.nat_addr)
4227 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4231 proto=IP_PROTOS.tcp,
4233 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4234 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Remember how many sessions the host owns before the TCP session opens.
4237 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4238 start_sessnum = len(sessions)
4240 self.initiate_tcp_session(self.pg0, self.pg1)
4242 # FIN packet in -> out
4243 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4244 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4245 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4246 flags="FA", seq=100, ack=300))
4247 self.pg0.add_stream(p)
4248 self.pg_enable_capture(self.pg_interfaces)
4250 self.pg1.get_capture(1)
4254 # ACK packet out -> in
4255 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4256 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4257 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4258 flags="A", seq=300, ack=101))
4261 # FIN packet out -> in
4262 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4263 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4264 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4265 flags="FA", seq=300, ack=101))
# The outside packets are injected together as pkts; two packets are
# expected on pg0.
4268 self.pg1.add_stream(pkts)
4269 self.pg_enable_capture(self.pg_interfaces)
4271 self.pg0.get_capture(2)
4273 # ACK packet in -> out
4274 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4275 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4276 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4277 flags="A", seq=101, ack=301))
4278 self.pg0.add_stream(p)
4279 self.pg_enable_capture(self.pg_interfaces)
4281 self.pg1.get_capture(1)
# After the close exchange the session count must equal the initial one.
4283 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4285 self.assertEqual(len(sessions) - start_sessnum, 0)
4287 def test_tcp_session_close_out(self):
4288 """ Close TCP session from outside network """
# Scenario shown below: after a TCP session is initiated between pg0 and
# pg1, the pg1 side sends FIN+ACK towards the NAT address, the pg0 side
# answers with FIN+ACK, and the pg1 side sends the last ACK. The test then
# checks that the NAT44 session count for pg0's remote address equals the
# count recorded before the session was initiated.
# NOTE(review): the embedded line numbers skip values in this method
# (e.g. 4291 -> 4295, 4312 -> 4314), so some arguments and statements are
# not visible in this chunk.
4289 self.tcp_port_out = 10505
4290 self.nat44_add_address(self.nat_addr)
4291 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4295 proto=IP_PROTOS.tcp,
4297 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4298 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline: number of sessions for the pg0 host before any traffic.
4301 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4302 start_sessnum = len(sessions)
4304 self.initiate_tcp_session(self.pg0, self.pg1)
4306 # FIN packet out -> in
4307 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4308 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4309 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4310 flags="FA", seq=100, ack=300))
4311 self.pg1.add_stream(p)
4312 self.pg_enable_capture(self.pg_interfaces)
4314 self.pg0.get_capture(1)
4316 # FIN+ACK packet in -> out
4317 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4318 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4319 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4320 flags="FA", seq=300, ack=101))
4322 self.pg0.add_stream(p)
4323 self.pg_enable_capture(self.pg_interfaces)
4325 self.pg1.get_capture(1)
4327 # ACK packet out -> in
4328 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4329 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4330 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4331 flags="A", seq=101, ack=301))
4332 self.pg1.add_stream(p)
4333 self.pg_enable_capture(self.pg_interfaces)
4335 self.pg0.get_capture(1)
# Re-dump the sessions and require the same count as the baseline.
4337 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4339 self.assertEqual(len(sessions) - start_sessnum, 0)
4341 def test_tcp_session_close_simultaneous(self):
4342 """ Close TCP session from inside network """
# NOTE(review): the docstring above is identical to the one on
# test_tcp_session_close_in, while the method name and the packet sequence
# below describe a simultaneous close (both sides send FIN+ACK before either
# FIN is acknowledged) -- consider updating the title.
# Scenario shown below: FIN+ACK from the pg0 side, FIN+ACK from the pg1 side
# (acknowledging 100, i.e. not yet the peer's FIN), then an ACK from each
# side. The test finally checks that the NAT44 session count for pg0's
# remote address equals the count recorded before the session was
# initiated.
# NOTE(review): the embedded line numbers skip values in this method
# (e.g. 4345 -> 4349, 4366 -> 4368), so some arguments and statements are
# not visible in this chunk.
4343 self.tcp_port_out = 10505
4344 self.nat44_add_address(self.nat_addr)
4345 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4349 proto=IP_PROTOS.tcp,
4351 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4352 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline: number of sessions for the pg0 host before any traffic.
4355 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4356 start_sessnum = len(sessions)
4358 self.initiate_tcp_session(self.pg0, self.pg1)
4360 # FIN packet in -> out
4361 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4362 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4363 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4364 flags="FA", seq=100, ack=300))
4365 self.pg0.add_stream(p)
4366 self.pg_enable_capture(self.pg_interfaces)
4368 self.pg1.get_capture(1)
4370 # FIN packet out -> in
4371 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4372 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4373 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4374 flags="FA", seq=300, ack=100))
4375 self.pg1.add_stream(p)
4376 self.pg_enable_capture(self.pg_interfaces)
4378 self.pg0.get_capture(1)
4380 # ACK packet in -> out
4381 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4382 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4383 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4384 flags="A", seq=101, ack=301))
4385 self.pg0.add_stream(p)
4386 self.pg_enable_capture(self.pg_interfaces)
4388 self.pg1.get_capture(1)
4390 # ACK packet out -> in
4391 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4392 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4393 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4394 flags="A", seq=301, ack=101))
4395 self.pg1.add_stream(p)
4396 self.pg_enable_capture(self.pg_interfaces)
4398 self.pg0.get_capture(1)
# Re-dump the sessions and require the same count as the baseline.
4400 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4402 self.assertEqual(len(sessions) - start_sessnum, 0)
4404 def test_one_armed_nat44_static(self):
4405 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
# Both hosts sit behind the single interface pg4: remote_hosts[0] acts as
# the client and remote_hosts[1] as the service. A static TCP mapping
# (out2in_only=1) exposes the service on self.nat_addr, which is added as a
# twice-NAT address. The test sends one packet in each direction and checks
# the rewritten addresses and ports.
# NOTE(review): the embedded line numbers skip values in this method
# (e.g. 4407 -> 4412, 4429 -> 4434), so the definitions of `local_port` and
# `external_port`, the extraction of `ip`/`tcp` from the capture and the
# try/except lines around the assertions are not visible in this chunk.
4406 remote_host = self.pg4.remote_hosts[0]
4407 local_host = self.pg4.remote_hosts[1]
4412 self.vapi.nat44_forwarding_enable_disable(1)
4413 self.nat44_add_address(self.nat_addr, twice_nat=1)
4414 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4415 local_port, external_port,
4416 proto=IP_PROTOS.tcp, out2in_only=1,
# pg4 is configured with the NAT44 feature twice (second call's extra
# arguments are not visible here).
4418 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4419 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4422 # from client to service
4423 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4424 IP(src=remote_host.ip4, dst=self.nat_addr) /
4425 TCP(sport=12345, dport=external_port))
4426 self.pg4.add_stream(p)
4427 self.pg_enable_capture(self.pg_interfaces)
4429 capture = self.pg4.get_capture(1)
# Expected on the way to the service: destination rewritten to the local
# host/port, source rewritten to the NAT address with a source port other
# than the client's 12345; that port is remembered for the reply.
4434 self.assertEqual(ip.dst, local_host.ip4)
4435 self.assertEqual(ip.src, self.nat_addr)
4436 self.assertEqual(tcp.dport, local_port)
4437 self.assertNotEqual(tcp.sport, 12345)
4438 eh_port_in = tcp.sport
4439 self.assert_packet_checksums_valid(p)
4441 self.logger.error(ppp("Unexpected or invalid packet:", p))
4444 # from service back to client
4445 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4446 IP(src=local_host.ip4, dst=self.nat_addr) /
4447 TCP(sport=local_port, dport=eh_port_in))
4448 self.pg4.add_stream(p)
4449 self.pg_enable_capture(self.pg_interfaces)
4451 capture = self.pg4.get_capture(1)
# Expected on the way back: source is the NAT address/external port and the
# destination is the original client address and port 12345.
4456 self.assertEqual(ip.src, self.nat_addr)
4457 self.assertEqual(ip.dst, remote_host.ip4)
4458 self.assertEqual(tcp.sport, external_port)
4459 self.assertEqual(tcp.dport, 12345)
4460 self.assert_packet_checksums_valid(p)
4462 self.logger.error(ppp("Unexpected or invalid packet:", p))
4465 def test_static_with_port_out2(self):
4466 """ 1:1 NAPT asymmetrical rule """
# A static TCP mapping with out2in_only=1 maps self.nat_addr:external_port
# to pg0's remote host:local_port, with NAT44 forwarding enabled. The test
# covers, in order: translated client->service and service->client TCP,
# an ICMP type 11 error wrapping each of those captured packets, and
# finally untranslated traffic addressed directly to pg0's remote host.
# NOTE(review): the embedded line numbers skip values in this method
# (e.g. 4466 -> 4471, 4486 -> 4491), so the definitions of `local_port` and
# `external_port`, the extraction of `ip`/`tcp`/`inner` from the captures
# and the try/except lines around the assertions are not visible here.
4471 self.vapi.nat44_forwarding_enable_disable(1)
4472 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4473 local_port, external_port,
4474 proto=IP_PROTOS.tcp, out2in_only=1)
4475 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4476 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4479 # from client to service
4480 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4481 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4482 TCP(sport=12345, dport=external_port))
4483 self.pg1.add_stream(p)
4484 self.pg_enable_capture(self.pg_interfaces)
4486 capture = self.pg0.get_capture(1)
4491 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4492 self.assertEqual(tcp.dport, local_port)
4493 self.assert_packet_checksums_valid(p)
4495 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 error sent from the service side, embedding the IP layer of
# the packet just captured on pg0; the outer source and the embedded
# destination/port are expected to be translated back to the external
# address and port.
4499 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4500 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4501 ICMP(type=11) / capture[0][IP])
4502 self.pg0.add_stream(p)
4503 self.pg_enable_capture(self.pg_interfaces)
4505 capture = self.pg1.get_capture(1)
4508 self.assertEqual(p[IP].src, self.nat_addr)
4510 self.assertEqual(inner.dst, self.nat_addr)
4511 self.assertEqual(inner[TCPerror].dport, external_port)
4513 self.logger.error(ppp("Unexpected or invalid packet:", p))
4516 # from service back to client
4517 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4518 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4519 TCP(sport=local_port, dport=12345))
4520 self.pg0.add_stream(p)
4521 self.pg_enable_capture(self.pg_interfaces)
4523 capture = self.pg1.get_capture(1)
4528 self.assertEqual(ip.src, self.nat_addr)
4529 self.assertEqual(tcp.sport, external_port)
4530 self.assert_packet_checksums_valid(p)
4532 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 error sent from the client side, embedding the IP layer of
# the packet just captured on pg1; the outer destination and the embedded
# source/port are expected to be translated to the local host and port.
4536 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4537 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4538 ICMP(type=11) / capture[0][IP])
4539 self.pg1.add_stream(p)
4540 self.pg_enable_capture(self.pg_interfaces)
4542 capture = self.pg0.get_capture(1)
4545 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4547 self.assertEqual(inner.src, self.pg0.remote_ip4)
4548 self.assertEqual(inner[TCPerror].sport, local_port)
4550 self.logger.error(ppp("Unexpected or invalid packet:", p))
4553 # from client to server (no translation)
4554 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4555 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4556 TCP(sport=12346, dport=local_port))
4557 self.pg1.add_stream(p)
4558 self.pg_enable_capture(self.pg_interfaces)
4560 capture = self.pg0.get_capture(1)
4565 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4566 self.assertEqual(tcp.dport, local_port)
4567 self.assert_packet_checksums_valid(p)
4569 self.logger.error(ppp("Unexpected or invalid packet:", p))
4572 # from service back to client (no translation)
4573 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4574 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4575 TCP(sport=local_port, dport=12346))
4576 self.pg0.add_stream(p)
4577 self.pg_enable_capture(self.pg_interfaces)
4579 capture = self.pg1.get_capture(1)
4584 self.assertEqual(ip.src, self.pg0.remote_ip4)
4585 self.assertEqual(tcp.sport, local_port)
4586 self.assert_packet_checksums_valid(p)
4588 self.logger.error(ppp("Unexpected or invalid packet:", p))
4591 def test_output_feature(self):
4592 """ NAT44 interface output feature (in2out postrouting) """
# Enables NAT44 forwarding, adds the NAT address, configures the NAT44
# feature on pg0 and the NAT44 output feature on pg1, then sends the
# standard inside stream pg0 -> pg1 and the standard outside stream from
# pg1, verifying each capture with the shared helpers.
# NOTE(review): the embedded line numbers skip values (e.g. 4595 -> 4597,
# 4597 -> 4601), so the remaining arguments of the two feature calls and
# some statements between stream setup and capture are not visible here.
4593 self.vapi.nat44_forwarding_enable_disable(1)
4594 self.nat44_add_address(self.nat_addr)
4595 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4597 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# Inside -> outside direction.
4601 pkts = self.create_stream_in(self.pg0, self.pg1)
4602 self.pg0.add_stream(pkts)
4603 self.pg_enable_capture(self.pg_interfaces)
4605 capture = self.pg1.get_capture(len(pkts))
4606 self.verify_capture_out(capture)
# Outside -> inside direction.
4609 pkts = self.create_stream_out(self.pg1)
4610 self.pg1.add_stream(pkts)
4611 self.pg_enable_capture(self.pg_interfaces)
4613 capture = self.pg0.get_capture(len(pkts))
4614 self.verify_capture_in(capture, self.pg0)
4616 def test_multiple_vrf(self):
4617 """ Multiple VRF setup """
# Exercises static and dynamic NAT44 across two VRFs. Per the step headings
# below, pg0/pg1 are used as VRF0 and pg5/pg6 as VRF1. Two out2in-only
# static TCP mappings are installed: pg5's remote host behind
# external_addr (vrf_id=1) and pg0's remote host behind pg0's interface
# address (external_sw_if_index=pg0, vrf_id=0). Each step sends a single
# TCP packet and checks the translated (or untranslated) address and port.
# NOTE(review): the embedded line numbers skip values throughout this
# method (e.g. 4618 -> 4623, 4650 -> 4655), so the definitions of
# `local_port`, `external_port` and `port`, the extraction of `ip`/`tcp`
# from each capture, remaining call arguments and the try/except lines
# around the assertions are not visible in this chunk.
4618 external_addr = '1.2.3.4'
4623 self.vapi.nat44_forwarding_enable_disable(1)
4624 self.nat44_add_address(self.nat_addr)
4625 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4626 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4628 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4630 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
4631 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index,
4633 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4635 self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
4636 local_port, external_port, vrf_id=1,
4637 proto=IP_PROTOS.tcp, out2in_only=1)
4638 self.nat44_add_static_mapping(
4639 self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index,
4640 local_port=local_port, vrf_id=0, external_port=external_port,
4641 proto=IP_PROTOS.tcp, out2in_only=1)
4643 # from client to service (both VRF1)
4644 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4645 IP(src=self.pg6.remote_ip4, dst=external_addr) /
4646 TCP(sport=12345, dport=external_port))
4647 self.pg6.add_stream(p)
4648 self.pg_enable_capture(self.pg_interfaces)
4650 capture = self.pg5.get_capture(1)
4655 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4656 self.assertEqual(tcp.dport, local_port)
4657 self.assert_packet_checksums_valid(p)
4659 self.logger.error(ppp("Unexpected or invalid packet:", p))
4662 # from service back to client (both VRF1)
4663 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4664 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4665 TCP(sport=local_port, dport=12345))
4666 self.pg5.add_stream(p)
4667 self.pg_enable_capture(self.pg_interfaces)
4669 capture = self.pg6.get_capture(1)
4674 self.assertEqual(ip.src, external_addr)
4675 self.assertEqual(tcp.sport, external_port)
4676 self.assert_packet_checksums_valid(p)
4678 self.logger.error(ppp("Unexpected or invalid packet:", p))
4681 # dynamic NAT from VRF1 to VRF0 (output-feature)
4682 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4683 IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
4684 TCP(sport=2345, dport=22))
4685 self.pg5.add_stream(p)
4686 self.pg_enable_capture(self.pg_interfaces)
4688 capture = self.pg1.get_capture(1)
4693 self.assertEqual(ip.src, self.nat_addr)
4694 self.assertNotEqual(tcp.sport, 2345)
4695 self.assert_packet_checksums_valid(p)
4698 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply to the dynamic session above, addressed to the NAT address.
# NOTE(review): `port` is assigned on a line not visible here; presumably it
# is the translated source port captured in the previous step -- confirm.
4701 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4702 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4703 TCP(sport=22, dport=port))
4704 self.pg1.add_stream(p)
4705 self.pg_enable_capture(self.pg_interfaces)
4707 capture = self.pg5.get_capture(1)
4712 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4713 self.assertEqual(tcp.dport, 2345)
4714 self.assert_packet_checksums_valid(p)
4716 self.logger.error(ppp("Unexpected or invalid packet:", p))
4719 # from client VRF1 to service VRF0
4720 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4721 IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
4722 TCP(sport=12346, dport=external_port))
4723 self.pg6.add_stream(p)
4724 self.pg_enable_capture(self.pg_interfaces)
4726 capture = self.pg0.get_capture(1)
4731 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4732 self.assertEqual(tcp.dport, local_port)
4733 self.assert_packet_checksums_valid(p)
4735 self.logger.error(ppp("Unexpected or invalid packet:", p))
4738 # from service VRF0 back to client VRF1
4739 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4740 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4741 TCP(sport=local_port, dport=12346))
4742 self.pg0.add_stream(p)
4743 self.pg_enable_capture(self.pg_interfaces)
4745 capture = self.pg6.get_capture(1)
4750 self.assertEqual(ip.src, self.pg0.local_ip4)
4751 self.assertEqual(tcp.sport, external_port)
4752 self.assert_packet_checksums_valid(p)
4754 self.logger.error(ppp("Unexpected or invalid packet:", p))
4757 # from client VRF0 to service VRF1
4758 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4759 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4760 TCP(sport=12347, dport=external_port))
4761 self.pg0.add_stream(p)
4762 self.pg_enable_capture(self.pg_interfaces)
4764 capture = self.pg5.get_capture(1)
4769 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4770 self.assertEqual(tcp.dport, local_port)
4771 self.assert_packet_checksums_valid(p)
4773 self.logger.error(ppp("Unexpected or invalid packet:", p))
4776 # from service VRF1 back to client VRF0
4777 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4778 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4779 TCP(sport=local_port, dport=12347))
4780 self.pg5.add_stream(p)
4781 self.pg_enable_capture(self.pg_interfaces)
4783 capture = self.pg0.get_capture(1)
4788 self.assertEqual(ip.src, external_addr)
4789 self.assertEqual(tcp.sport, external_port)
4790 self.assert_packet_checksums_valid(p)
4792 self.logger.error(ppp("Unexpected or invalid packet:", p))
4795 # from client to server (both VRF1, no translation)
4796 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4797 IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
4798 TCP(sport=12348, dport=local_port))
4799 self.pg6.add_stream(p)
4800 self.pg_enable_capture(self.pg_interfaces)
4802 capture = self.pg5.get_capture(1)
4807 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4808 self.assertEqual(tcp.dport, local_port)
4809 self.assert_packet_checksums_valid(p)
4811 self.logger.error(ppp("Unexpected or invalid packet:", p))
4814 # from server back to client (both VRF1, no translation)
4815 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4816 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4817 TCP(sport=local_port, dport=12348))
4818 self.pg5.add_stream(p)
4819 self.pg_enable_capture(self.pg_interfaces)
4821 capture = self.pg6.get_capture(1)
4826 self.assertEqual(ip.src, self.pg5.remote_ip4)
4827 self.assertEqual(tcp.sport, local_port)
4828 self.assert_packet_checksums_valid(p)
4830 self.logger.error(ppp("Unexpected or invalid packet:", p))
# NOTE(review): the packet built under the next heading is sent from pg0 to
# pg6 with sport=local_port/dport=12349 and captured on pg6 -- exactly the
# same packet and checks as the "from server VRF0 back to client VRF1" step
# that follows. Going by the heading and the other steps (where pg6 is the
# VRF1 client), this step was probably meant to send pg6 -> pg0 with
# dport=local_port and capture on pg0; as written the client-to-server
# direction is not exercised. Confirm and fix in the full source.
4833 # from client VRF1 to server VRF0 (no translation)
4834 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4835 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4836 TCP(sport=local_port, dport=12349))
4837 self.pg0.add_stream(p)
4838 self.pg_enable_capture(self.pg_interfaces)
4840 capture = self.pg6.get_capture(1)
4845 self.assertEqual(ip.src, self.pg0.remote_ip4)
4846 self.assertEqual(tcp.sport, local_port)
4847 self.assert_packet_checksums_valid(p)
4849 self.logger.error(ppp("Unexpected or invalid packet:", p))
4852 # from server VRF0 back to client VRF1 (no translation)
4853 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4854 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4855 TCP(sport=local_port, dport=12349))
4856 self.pg0.add_stream(p)
4857 self.pg_enable_capture(self.pg_interfaces)
4859 capture = self.pg6.get_capture(1)
4864 self.assertEqual(ip.src, self.pg0.remote_ip4)
4865 self.assertEqual(tcp.sport, local_port)
4866 self.assert_packet_checksums_valid(p)
4868 self.logger.error(ppp("Unexpected or invalid packet:", p))
4871 # from client VRF0 to server VRF1 (no translation)
4872 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4873 IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
4874 TCP(sport=12344, dport=local_port))
4875 self.pg0.add_stream(p)
4876 self.pg_enable_capture(self.pg_interfaces)
4878 capture = self.pg5.get_capture(1)
4883 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4884 self.assertEqual(tcp.dport, local_port)
4885 self.assert_packet_checksums_valid(p)
4887 self.logger.error(ppp("Unexpected or invalid packet:", p))
4890 # from server VRF1 back to client VRF0 (no translation)
4891 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4892 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4893 TCP(sport=local_port, dport=12344))
4894 self.pg5.add_stream(p)
4895 self.pg_enable_capture(self.pg_interfaces)
4897 capture = self.pg0.get_capture(1)
4902 self.assertEqual(ip.src, self.pg5.remote_ip4)
4903 self.assertEqual(tcp.sport, local_port)
4904 self.assert_packet_checksums_valid(p)
4906 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Per-test teardown for TestNAT44EndpointDependent: runs the parent
# tearDown, then, unless VPP is marked dead, logs the output of several
# "show nat44 ..." CLI commands and clears the VPP log.
# NOTE(review): the `def` line of this method and the line numbered 4918
# are not visible in this chunk; the name tearDown is taken from the super()
# call below.
4910 super(TestNAT44EndpointDependent, self).tearDown()
4911 if not self.vpp_dead:
4912 self.logger.info(self.vapi.cli("show nat44 addresses"))
4913 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4914 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4915 self.logger.info(self.vapi.cli("show nat44 interface address"))
4916 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4917 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
4919 self.vapi.cli("clear logging")
4922 class TestNAT44Out2InDPO(MethodHolder):
4923 """ NAT44 Test Cases using out2in DPO """
# Test case class that starts VPP with the "nat { out2in dpo }" startup
# option and checks NAT44 together with a translation MAP domain: pg0 is an
# IPv4 interface, pg1 an IPv6 interface.
# NOTE(review): the embedded line numbers skip values throughout this class
# (decorator lines, try/except lines, some call arguments and docstring
# delimiters are not visible in this chunk); comments describe only what is
# shown.
# setUpConstants: extend the VPP command line with the NAT startup section.
4926 def setUpConstants(cls):
4927 super(TestNAT44Out2InDPO, cls).setUpConstants()
4928 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# setUpClass: set NAT log level, define the ports/ids/addresses used by the
# stream helpers, create two pg interfaces, configure IPv4 + ARP on pg0 and
# IPv6 + NDP on pg1, and install an IPv6 route with prefix length 0 (all-zero
# destination) via pg1's remote address.
4931 def setUpClass(cls):
4932 super(TestNAT44Out2InDPO, cls).setUpClass()
4933 cls.vapi.cli("set log class nat level debug")
4936 cls.tcp_port_in = 6303
4937 cls.tcp_port_out = 6303
4938 cls.udp_port_in = 6304
4939 cls.udp_port_out = 6304
4940 cls.icmp_id_in = 6305
4941 cls.icmp_id_out = 6305
4942 cls.nat_addr = '10.0.0.3'
4943 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4944 cls.dst_ip4 = '192.168.70.1'
4946 cls.create_pg_interfaces(range(2))
4949 cls.pg0.config_ip4()
4950 cls.pg0.resolve_arp()
4953 cls.pg1.config_ip6()
4954 cls.pg1.resolve_ndp()
4956 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4957 dst_address_length=0,
4958 next_hop_address=cls.pg1.remote_ip6n,
4959 next_hop_sw_if_index=cls.pg1.sw_if_index)
# NOTE(review): the tearDownClass call below presumably sits in an except
# branch of setUpClass whose surrounding lines are not visible -- confirm.
4962 super(TestNAT44Out2InDPO, cls).tearDownClass()
# configure_xlat: store the destination ('1:2:3::') and source ('4:5:6::')
# IPv6 prefixes, both with length 96, on the instance and add a MAP domain
# with is_translation=1 using them.
4965 def configure_xlat(self):
4966 self.dst_ip6_pfx = '1:2:3::'
4967 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4969 self.dst_ip6_pfx_len = 96
4970 self.src_ip6_pfx = '4:5:6::'
4971 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4973 self.src_ip6_pfx_len = 96
4974 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4975 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4976 '\x00\x00\x00\x00', 0, is_translation=1,
# test_464xlat_ce: with NAT44 enabled on pg0 and nat_addr in the pool, IPv4
# packets sent from pg0 towards dst_ip4 are expected on pg1 as IPv6 packets
# whose NAT-side address is nat_addr embedded in the source prefix; the
# IPv6 return stream is expected back on pg0 as IPv4. The NAT feature and
# address range are touched again at the end (is_add=0 for the range).
4979 def test_464xlat_ce(self):
4980 """ Test 464XLAT CE with NAT44 """
4982 self.configure_xlat()
4984 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4985 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4987 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4988 self.dst_ip6_pfx_len)
4989 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4990 self.src_ip6_pfx_len)
4993 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4994 self.pg0.add_stream(pkts)
4995 self.pg_enable_capture(self.pg_interfaces)
4997 capture = self.pg1.get_capture(len(pkts))
4998 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
5001 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
5003 self.pg1.add_stream(pkts)
5004 self.pg_enable_capture(self.pg_interfaces)
5006 capture = self.pg0.get_capture(len(pkts))
5007 self.verify_capture_in(capture, self.pg0)
5009 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
5011 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
5012 self.nat_addr_n, is_add=0)
# test_464xlat_ce_no_nat: same flow without configuring NAT44; the IPv6
# address on the NAT side is composed from pg0's remote IPv4 address and
# the capture is verified with same_port=True.
5014 def test_464xlat_ce_no_nat(self):
5015 """ Test 464XLAT CE without NAT44 """
5017 self.configure_xlat()
5019 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5020 self.dst_ip6_pfx_len)
5021 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
5022 self.src_ip6_pfx_len)
5024 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5025 self.pg0.add_stream(pkts)
5026 self.pg_enable_capture(self.pg_interfaces)
5028 capture = self.pg1.get_capture(len(pkts))
5029 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
5030 nat_ip=out_dst_ip6, same_port=True)
5032 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
5033 self.pg1.add_stream(pkts)
5034 self.pg_enable_capture(self.pg_interfaces)
5036 capture = self.pg0.get_capture(len(pkts))
5037 self.verify_capture_in(capture, self.pg0)
5040 class TestDeterministicNAT(MethodHolder):
5041 """ Deterministic NAT Test Cases """
# Extends the VPP command line with "nat { deterministic }" after the
# parent class has set up its constants.
# NOTE(review): takes `cls`, so it is presumably decorated as a classmethod
# on a line not visible in this chunk (5043 is skipped) -- confirm.
5044 def setUpConstants(cls):
5045 super(TestDeterministicNAT, cls).setUpConstants()
5046 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# Class-level fixture for the deterministic NAT tests: sets the NAT log
# level, defines the inside/external ports, ICMP id and NAT address used by
# the stream helpers, creates three pg interfaces and prepares two remote
# hosts with IPv4 neighbors on pg0.
# NOTE(review): the embedded line numbers skip values (e.g. 5064 -> 5069,
# 5070 -> 5073), so the body of the `for i in cls.interfaces` loop and the
# try/except lines are not visible; the tearDownClass call at the end
# presumably belongs to an except branch -- confirm.
# Only the *_in and *_external_* values are assigned in the lines shown; no
# tcp_port_out/udp_port_out assignment is visible here.
5049 def setUpClass(cls):
5050 super(TestDeterministicNAT, cls).setUpClass()
5051 cls.vapi.cli("set log class nat level debug")
5054 cls.tcp_port_in = 6303
5055 cls.tcp_external_port = 6303
5056 cls.udp_port_in = 6304
5057 cls.udp_external_port = 6304
5058 cls.icmp_id_in = 6305
5059 cls.nat_addr = '10.0.0.3'
5061 cls.create_pg_interfaces(range(3))
5062 cls.interfaces = list(cls.pg_interfaces)
5064 for i in cls.interfaces:
5069 cls.pg0.generate_remote_hosts(2)
5070 cls.pg0.configure_ipv4_neighbors()
5073 super(TestDeterministicNAT, cls).tearDownClass()
# Builds three packets from in_if's remote host to out_if's remote host with
# the given TTL: TCP (tcp_port_in -> tcp_external_port), UDP (udp_port_in ->
# udp_external_port) and an ICMP echo-request with id icmp_id_in.
# NOTE(review): the docstring delimiters and the lines that collect and
# return the packets are not visible in this chunk (embedded numbers skip,
# e.g. 5088 -> 5092, 5100 -> 5105); callers in this class use the result as
# a packet list.
5076 def create_stream_in(self, in_if, out_if, ttl=64):
5078 Create packet stream for inside network
5080 :param in_if: Inside interface
5081 :param out_if: Outside interface
5082 :param ttl: TTL of generated packets
5086 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5087 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5088 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
5092 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5093 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5094 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
5098 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5099 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5100 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds three packets from out_if's remote host to dst_ip with the given
# TTL: TCP (tcp_external_port -> tcp_port_out), UDP (udp_external_port ->
# udp_port_out) and an ICMP echo-reply with id icmp_external_id.
# tcp_port_out, udp_port_out and icmp_external_id are assigned by
# verify_capture_out in this class, so an inside->outside capture must have
# been verified first for them to hold the translated values.
# NOTE(review): the docstring delimiters, the condition guarding the
# `dst_ip = self.nat_addr` default and the lines that collect and return
# the packets are not visible in this chunk (embedded numbers skip).
5105 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
5107 Create packet stream for outside network
5109 :param out_if: Outside interface
5110 :param dst_ip: Destination IP address (Default use global NAT address)
5111 :param ttl: TTL of generated packets
5114 dst_ip = self.nat_addr
5117 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5118 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5119 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
5123 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5124 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5125 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
5129 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5130 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5131 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks that `capture` holds exactly packet_num packets and that each has
# nat_ip as its IPv4 source. As a side effect it records the translated
# values on the instance: TCP source port -> tcp_port_out, UDP source port
# -> udp_port_out, ICMP id -> icmp_external_id.
# NOTE(review): the docstring below documents a `same_port` parameter that
# is not in this method's signature. The docstring delimiters, the
# condition guarding the `nat_ip = self.nat_addr` default and the
# try/except/else lines are not visible in this chunk (embedded numbers
# skip, e.g. 5143 -> 5146, 5148 -> 5150).
5136 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
5138 Verify captured packets on outside network
5140 :param capture: Captured packets
5141 :param nat_ip: Translated IP address (Default use global NAT address)
5142 :param same_port: Sorce port number is not translated (Default False)
5143 :param packet_num: Expected number of packets (Default 3)
5146 nat_ip = self.nat_addr
5147 self.assertEqual(packet_num, len(capture))
5148 for packet in capture:
5150 self.assertEqual(packet[IP].src, nat_ip)
5151 if packet.haslayer(TCP):
5152 self.tcp_port_out = packet[TCP].sport
5153 elif packet.haslayer(UDP):
5154 self.udp_port_out = packet[UDP].sport
5156 self.icmp_external_id = packet[ICMP].id
5158 self.logger.error(ppp("Unexpected or invalid packet "
5159 "(outside network):", packet))
# Asserts that `data` contains exactly one record and checks four of its
# fields by numeric key: 230 must be a single character with ordinal 13, 466
# must be the bytes '\x03\x00\x00\x00' (labelled natQuotaExceededEvent by
# the comment below), 473 must be '\xe8\x03\x00\x00' (1000 if read as a
# little-endian 32-bit integer) and 8 must equal pg0's remote IPv4 address
# in packed form.
# NOTE(review): the docstring delimiters, the assignment of `record` and
# the comments naming the other fields are not visible in this chunk
# (embedded numbers skip, e.g. 5168 -> 5171); presumably `record` is
# data[0] -- confirm.
5162 def verify_ipfix_max_entries_per_user(self, data):
5164 Verify IPFIX maximum entries per user exceeded event
5166 :param data: Decoded IPFIX data records
5168 self.assertEqual(1, len(data))
5171 self.assertEqual(ord(record[230]), 13)
5172 # natQuotaExceededEvent
5173 self.assertEqual('\x03\x00\x00\x00', record[466])
5175 self.assertEqual('\xe8\x03\x00\x00', record[473])
5177 self.assertEqual(self.pg0.remote_ip4n, record[8])
5179 def test_deterministic_mode(self):
5180 """ NAT plugin run deterministic mode """
# Checks that the plugin reports deterministic mode, adds one deterministic
# mapping (in_addr/in_plen -> out_addr/out_plen), verifies that a forward
# lookup of an inside address followed by a reverse lookup of the returned
# outside address/port round-trips to the same inside address, verifies the
# mapping dump, then clears the configuration and expects an empty dump.
# NOTE(review): `in_plen` and `out_plen` are defined on lines not visible
# in this chunk (embedded numbers skip 5186 -> 5190).
5181 in_addr = '172.16.255.0'
5182 out_addr = '172.17.255.50'
5183 in_addr_t = '172.16.255.20'
5184 in_addr_n = socket.inet_aton(in_addr)
5185 out_addr_n = socket.inet_aton(out_addr)
5186 in_addr_t_n = socket.inet_aton(in_addr_t)
5190 nat_config = self.vapi.nat_show_config()
5191 self.assertEqual(1, nat_config.deterministic)
5193 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward then reverse lookup; only the first 4 bytes of the returned
# address fields are compared with the packed IPv4 addresses.
5195 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
5196 self.assertEqual(rep1.out_addr[:4], out_addr_n)
5197 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
5198 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
5200 deterministic_mappings = self.vapi.nat_det_map_dump()
5201 self.assertEqual(len(deterministic_mappings), 1)
5202 dsm = deterministic_mappings[0]
5203 self.assertEqual(in_addr_n, dsm.in_addr[:4])
5204 self.assertEqual(in_plen, dsm.in_plen)
5205 self.assertEqual(out_addr_n, dsm.out_addr[:4])
5206 self.assertEqual(out_plen, dsm.out_plen)
5208 self.clear_nat_det()
5209 deterministic_mappings = self.vapi.nat_det_map_dump()
5210 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    # Read the current timeouts, raise each one by 10 and write them back
    # in the order expected by the API call: udp, tcp_established,
    # tcp_transitory, icmp.
    before = self.vapi.nat_det_get_timeouts()
    raised = [getattr(before, field) + 10
              for field in ('udp', 'tcp_established',
                            'tcp_transitory', 'icmp')]
    self.vapi.nat_det_set_timeouts(*raised)

    # Every timeout read back afterwards must differ from its old value.
    after = self.vapi.nat_det_get_timeouts()
    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
5230 def test_det_in(self):
5231 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
# Adds a deterministic mapping from pg0's remote address to nat_ip, enables
# NAT on pg0 and pg1, sends the inside stream pg0 -> pg1 and the outside
# stream back, verifies both captures, then dumps the deterministic
# sessions for pg0's remote address and checks the three sessions (TCP, UDP,
# ICMP) against the ports/ids used and learned during the exchange.
# NOTE(review): the embedded line numbers skip values in this method
# (e.g. 5235 -> 5237, 5261 -> 5265), so the prefix-length arguments of the
# mapping, remaining call arguments and the lines that select `s` from
# `sessions` before each group of assertions are not visible in this chunk.
5233 nat_ip = "10.0.0.10"
5235 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5237 socket.inet_aton(nat_ip),
5239 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5240 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside; verify_capture_out also records the translated
# ports/ICMP id used for the return stream and the session checks below.
5244 pkts = self.create_stream_in(self.pg0, self.pg1)
5245 self.pg0.add_stream(pkts)
5246 self.pg_enable_capture(self.pg_interfaces)
5248 capture = self.pg1.get_capture(len(pkts))
5249 self.verify_capture_out(capture, nat_ip)
# Outside -> inside.
5252 pkts = self.create_stream_out(self.pg1, nat_ip)
5253 self.pg1.add_stream(pkts)
5254 self.pg_enable_capture(self.pg_interfaces)
5256 capture = self.pg0.get_capture(len(pkts))
5257 self.verify_capture_in(capture, self.pg0)
# Session checks: one group of assertions each for TCP, UDP and ICMP.
5260 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
5261 self.assertEqual(len(sessions), 3)
5265 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5266 self.assertEqual(s.in_port, self.tcp_port_in)
5267 self.assertEqual(s.out_port, self.tcp_port_out)
5268 self.assertEqual(s.ext_port, self.tcp_external_port)
5272 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5273 self.assertEqual(s.in_port, self.udp_port_in)
5274 self.assertEqual(s.out_port, self.udp_port_out)
5275 self.assertEqual(s.ext_port, self.udp_external_port)
5279 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5280 self.assertEqual(s.in_port, self.icmp_id_in)
5281 self.assertEqual(s.out_port, self.icmp_external_id)
5283 def test_multiple_users(self):
5284 """ Deterministic NAT multiple users """
# Two inside hosts on pg0 (remote_hosts[0] and [1]) share one deterministic
# mapping to nat_ip. Each host opens a TCP flow from the same inside port
# to pg1's remote host; the translated source ports are recorded, the
# mapping dump must report two sessions, and replies sent to each recorded
# port must reach the matching host. Finally the two sessions are closed
# through the close-session API calls and the session count is checked
# after each call.
# NOTE(review): the embedded line numbers skip values in this method
# (e.g. 5286 -> 5288, 5293 -> 5295, 5308 -> 5313), so the definition of
# `port_in`, the prefix-length arguments of the mapping, the extraction of
# `ip`/`tcp` from the captures, the try/except lines and the port arguments
# of the close-session calls are not visible in this chunk.
5286 nat_ip = "10.0.0.10"
5288 external_port = 6303
5290 host0 = self.pg0.remote_hosts[0]
5291 host1 = self.pg0.remote_hosts[1]
5293 self.vapi.nat_det_add_del_map(host0.ip4n,
5295 socket.inet_aton(nat_ip),
5297 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5298 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside; remember the translated source port as port_out0.
5302 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
5303 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
5304 TCP(sport=port_in, dport=external_port))
5305 self.pg0.add_stream(p)
5306 self.pg_enable_capture(self.pg_interfaces)
5308 capture = self.pg1.get_capture(1)
5313 self.assertEqual(ip.src, nat_ip)
5314 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5315 self.assertEqual(tcp.dport, external_port)
5316 port_out0 = tcp.sport
5318 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside; remember the translated source port as port_out1.
5322 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
5323 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
5324 TCP(sport=port_in, dport=external_port))
5325 self.pg0.add_stream(p)
5326 self.pg_enable_capture(self.pg_interfaces)
5328 capture = self.pg1.get_capture(1)
5333 self.assertEqual(ip.src, nat_ip)
5334 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5335 self.assertEqual(tcp.dport, external_port)
5336 port_out1 = tcp.sport
5338 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One mapping with two sessions is expected at this point.
5341 dms = self.vapi.nat_det_map_dump()
5342 self.assertEqual(1, len(dms))
5343 self.assertEqual(2, dms[0].ses_num)
# outside -> host0, addressed to the port recorded for host0.
5346 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5347 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5348 TCP(sport=external_port, dport=port_out0))
5349 self.pg1.add_stream(p)
5350 self.pg_enable_capture(self.pg_interfaces)
5352 capture = self.pg0.get_capture(1)
5357 self.assertEqual(ip.src, self.pg1.remote_ip4)
5358 self.assertEqual(ip.dst, host0.ip4)
5359 self.assertEqual(tcp.dport, port_in)
5360 self.assertEqual(tcp.sport, external_port)
5362 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> host1, addressed to the port recorded for host1.
5366 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5367 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5368 TCP(sport=external_port, dport=port_out1))
5369 self.pg1.add_stream(p)
5370 self.pg_enable_capture(self.pg_interfaces)
5372 capture = self.pg0.get_capture(1)
5377 self.assertEqual(ip.src, self.pg1.remote_ip4)
5378 self.assertEqual(ip.dst, host1.ip4)
5379 self.assertEqual(tcp.dport, port_in)
5380 self.assertEqual(tcp.sport, external_port)
5382 self.logger.error(ppp("Unexpected or invalid packet", p))
5385 # session close api test
5386 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
5388 self.pg1.remote_ip4n,
5390 dms = self.vapi.nat_det_map_dump()
5391 self.assertEqual(dms[0].ses_num, 1)
5393 self.vapi.nat_det_close_session_in(host0.ip4n,
5395 self.pg1.remote_ip4n,
5397 dms = self.vapi.nat_det_map_dump()
5398 self.assertEqual(dms[0].ses_num, 0)
# Checks that deterministic NAT releases a TCP session that is closed from
# the inside host: after the FIN/ACK exchange below, the first entry returned
# by nat_det_map_dump() must report ses_num == 0.
# NOTE(review): the inner line numbers of this listing skip (5403, 5405,
# 5408, 5418, ...), so trailing call arguments, TCP flags and the
# try/except scaffolding are not visible here - confirm against the full file.
5400 def test_tcp_session_close_detection_in(self):
5401 """ Deterministic NAT TCP session close from inside network """
# deterministic mapping call using pg0's remote address and self.nat_addr,
# then the NAT44 feature call for pg0 and pg1 (remaining arguments omitted)
5402 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5404 socket.inet_aton(self.nat_addr),
5406 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5407 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# open the TCP session that the rest of the test tears down
5410 self.initiate_tcp_session(self.pg0, self.pg1)
5412 # close the session from inside
5414 # FIN packet in -> out
5415 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5416 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5417 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5419 self.pg0.add_stream(p)
5420 self.pg_enable_capture(self.pg_interfaces)
# the single packet sent from pg0 must show up on pg1
5422 self.pg1.get_capture(1)
5426 # ACK packet out -> in
5427 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5428 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5429 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5433 # FIN packet out -> in
5434 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5435 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5436 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# both outside packets are sent as one stream (pkts is presumably built on
# omitted lines - confirm) and two packets are expected on pg0
5440 self.pg1.add_stream(pkts)
5441 self.pg_enable_capture(self.pg_interfaces)
5443 self.pg0.get_capture(2)
5445 # ACK packet in -> out
5446 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5447 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5448 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5450 self.pg0.add_stream(p)
5451 self.pg_enable_capture(self.pg_interfaces)
5453 self.pg1.get_capture(1)
5455 # Check if deterministic NAT44 closed the session
5456 dms = self.vapi.nat_det_map_dump()
5457 self.assertEqual(0, dms[0].ses_num)
# presumably logged from an exception handler on the omitted line 5458 - confirm
5459 self.logger.error("TCP session termination failed")
# Checks that deterministic NAT releases a TCP session that is closed from
# the outside host: after the exchange below, the first entry returned by
# nat_det_map_dump() must report ses_num == 0.
# NOTE(review): the inner line numbers of this listing skip (5465, 5467,
# 5470, 5480, ...), so trailing call arguments, TCP flags and the
# try/except scaffolding are not visible here - confirm against the full file.
5462 def test_tcp_session_close_detection_out(self):
5463 """ Deterministic NAT TCP session close from outside network """
# deterministic mapping call using pg0's remote address and self.nat_addr,
# then the NAT44 feature call for pg0 and pg1 (remaining arguments omitted)
5464 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5466 socket.inet_aton(self.nat_addr),
5468 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5469 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# open the TCP session that the rest of the test tears down
5472 self.initiate_tcp_session(self.pg0, self.pg1)
5474 # close the session from outside
5476 # FIN packet out -> in
5477 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5478 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5479 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5481 self.pg1.add_stream(p)
5482 self.pg_enable_capture(self.pg_interfaces)
# the single packet sent from pg1 must show up on pg0
5484 self.pg0.get_capture(1)
5488 # ACK packet in -> out
5489 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5490 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5491 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the comment below repeats the one above; by symmetry with the
# close-from-inside test this second packet is presumably the FIN in -> out -
# confirm against the flags argument, which is not visible in this listing
5495 # ACK packet in -> out
5496 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5497 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5498 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# both inside packets are sent as one stream (pkts is presumably built on
# omitted lines - confirm) and two packets are expected on pg1
5502 self.pg0.add_stream(pkts)
5503 self.pg_enable_capture(self.pg_interfaces)
5505 self.pg1.get_capture(2)
5507 # ACK packet out -> in
5508 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5509 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5510 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5512 self.pg1.add_stream(p)
5513 self.pg_enable_capture(self.pg_interfaces)
5515 self.pg0.get_capture(1)
5517 # Check if deterministic NAT44 closed the session
5518 dms = self.vapi.nat_det_map_dump()
5519 self.assertEqual(0, dms[0].ses_num)
# presumably logged from an exception handler on the omitted line 5520 - confirm
5521 self.logger.error("TCP session termination failed")
# Checks that deterministic NAT sessions disappear after the configured
# timeouts: the first entry of nat_det_map_dump() must end with ses_num == 0.
# Skipped unless running_extended_tests() is true (see the decorator).
# NOTE(review): the inner line numbers of this listing skip (5528, 5530,
# 5533, 5540, 5542, ...), so some call arguments and statements are not
# visible here - confirm against the full file.
5524 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5525 def test_session_timeout(self):
5526 """ Deterministic NAT session timeouts """
5527 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5529 socket.inet_aton(self.nat_addr),
5531 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5532 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5535 self.initiate_tcp_session(self.pg0, self.pg1)
# all four timeout arguments are set to 5 (units not shown here)
5536 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
# push a stream from pg0 and require every packet to arrive on pg1
5537 pkts = self.create_stream_in(self.pg0, self.pg1)
5538 self.pg0.add_stream(pkts)
5539 self.pg_enable_capture(self.pg_interfaces)
5541 capture = self.pg1.get_capture(len(pkts))
# presumably a wait longer than the timeouts sits on the omitted lines
# 5542-5543 before the session count is read - confirm
5544 dms = self.vapi.nat_det_map_dump()
5545 self.assertEqual(0, dms[0].ses_num)
# Checks the per-user session limit of deterministic NAT: 1000 UDP flows
# (source ports 1025..2024) are forwarded, one further flow is not forwarded
# and is answered with an ICMP error, the mapping reports 1000 sessions, and
# the IPFIX export is decoded and handed to verify_ipfix_max_entries_per_user.
# Skipped unless running_extended_tests() is true (see the decorator).
# NOTE(review): the inner line numbers of this listing skip (5551, 5553,
# 5556, 5559, 5562-5563, 5568-5569, ...), so some call arguments, loop
# headers and statements are not visible here - confirm against the full file.
5547 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5548 def test_session_limit_per_user(self):
5549 """ Deterministic NAT maximum sessions per user limit """
5550 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5552 socket.inet_aton(self.nat_addr),
5554 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5555 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# IPFIX exporter uses pg2 as collector/source; nat_ipfix() is called with
# its default arguments
5557 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5558 src_address=self.pg2.local_ip4n,
5560 template_interval=10)
5561 self.vapi.nat_ipfix()
# one UDP packet per port, sport == dport, for 1000 distinct ports
5564 for port in range(1025, 2025):
5565 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5566 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5567 UDP(sport=port, dport=port))
# pkts is presumably filled on omitted lines inside the loop - confirm
5570 self.pg0.add_stream(pkts)
5571 self.pg_enable_capture(self.pg_interfaces)
5573 capture = self.pg1.get_capture(len(pkts))
# one extra flow on new ports, sent after the 1000 flows above
5575 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5576 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5577 UDP(sport=3001, dport=3002))
5578 self.pg0.add_stream(p)
5579 self.pg_enable_capture(self.pg_interfaces)
# the extra packet must not be forwarded to pg1
5581 capture = self.pg1.assert_nothing_captured()
5583 # verify ICMP error packet
5584 capture = self.pg0.get_capture(1)
# p and icmp are presumably rebound from the capture on omitted lines - confirm
5586 self.assertTrue(p.haslayer(ICMP))
5588 self.assertEqual(icmp.type, 3)
5589 self.assertEqual(icmp.code, 1)
5590 self.assertTrue(icmp.haslayer(IPerror))
# the ICMP error must quote the ports of the rejected UDP packet
5591 inner_ip = icmp[IPerror]
5592 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5593 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5595 dms = self.vapi.nat_det_map_dump()
5597 self.assertEqual(1000, dms[0].ses_num)
5599 # verify IPFIX logging
5600 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5602 capture = self.pg2.get_capture(2)
5603 ipfix = IPFIXDecoder()
# the loop headers over capture are on omitted lines (5605, 5610) - confirm
5604 # first load template
5606 self.assertTrue(p.haslayer(IPFIX))
5607 if p.haslayer(Template):
5608 ipfix.add_template(p.getlayer(Template))
5609 # verify events in data set
5611 if p.haslayer(Data):
5612 data = ipfix.decode_data_set(p.getlayer(Set))
5613 self.verify_ipfix_max_entries_per_user(data)
# Resets deterministic NAT state: calls nat_ipfix(enable=0), calls
# nat_det_set_timeouts() with its default arguments, then walks the dumped
# deterministic mappings and NAT44 interfaces and issues an add_del call for
# each one.
# NOTE(review): the inner line numbers of this listing skip (5616, 5618,
# 5624-5628, 5632-5634), so the docstring delimiters and the remaining
# arguments of both add_del calls (presumably the delete flags) are not
# visible here - confirm against the full file.
5615 def clear_nat_det(self):
5617 Clear deterministic NAT configuration.
5619 self.vapi.nat_ipfix(enable=0)
5620 self.vapi.nat_det_set_timeouts()
5621 deterministic_mappings = self.vapi.nat_det_map_dump()
5622 for dsm in deterministic_mappings:
5623 self.vapi.nat_det_add_del_map(dsm.in_addr,
5629 interfaces = self.vapi.nat44_interface_dump()
5630 for intf in interfaces:
5631 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# Body of TestDeterministicNAT's tearDown: runs the parent tearDown and,
# while VPP is still alive, issues four "show nat44 ..." CLI commands and
# then clears the deterministic NAT configuration.
# NOTE(review): the `def` line (presumably 5635) and the logger calls wrapping
# the last three CLI commands (5639, 5641, 5643) are omitted from this
# listing - confirm against the full file.
5636 super(TestDeterministicNAT, self).tearDown()
5637 if not self.vpp_dead:
5638 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5640 self.vapi.cli("show nat44 deterministic mappings"))
5642 self.vapi.cli("show nat44 deterministic timeouts"))
5644 self.vapi.cli("show nat44 deterministic sessions"))
5645 self.clear_nat_det()
5648 class TestNAT64(MethodHolder):
5649 """ NAT64 Test Cases """
# Extends the VPP command line built by the parent class with a "nat { ... }"
# section that sets the NAT64 BIB hash buckets to 128 and the NAT64 session
# table hash buckets to 256.
# NOTE(review): takes cls, so the omitted line 5651 is presumably a
# @classmethod decorator - confirm against the full file.
5652 def setUpConstants(cls):
5653 super(TestNAT64, cls).setUpConstants()
5654 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5655 "nat64 st hash buckets 256", "}"])
# Class-level fixture for the NAT64 tests: stores default ports/ids and NAT
# addresses on the class, creates six pg interfaces, splits them into IPv6
# (pg0, pg2) and IPv4 (pg1) groups, puts pg2 into the vrf1 IPv6 table and
# configures addressing on pg3 and pg5.
# NOTE(review): the inner line numbers of this listing skip (5660-5661,
# 5670, 5673, 5689-5690, 5694-5698, 5706-5707, ...). The assignment of
# cls.vrf1_id, the per-interface setup inside both loops and the try/except
# scaffolding are therefore not visible here - confirm against the full file.
5658 def setUpClass(cls):
5659 super(TestNAT64, cls).setUpClass()
# default inside/outside ports and ICMP ids used by the stream helpers
5662 cls.tcp_port_in = 6303
5663 cls.tcp_port_out = 6303
5664 cls.udp_port_in = 6304
5665 cls.udp_port_out = 6304
5666 cls.icmp_id_in = 6305
5667 cls.icmp_id_out = 6305
# NAT pool addresses, kept both as text and in packed binary form
5668 cls.nat_addr = '10.0.0.3'
5669 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5671 cls.vrf1_nat_addr = '10.0.10.3'
5672 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5674 cls.ipfix_src_port = 4739
5675 cls.ipfix_domain_id = 1
5677 cls.create_pg_interfaces(range(6))
# ip6_interfaces = [pg0, pg2]; ip4_interfaces = [pg1]
5678 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5679 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5680 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# create the vrf1 IPv6 table and bind pg2 to it
5682 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5684 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
# two remote hosts on pg0 (the hairpinning tests read remote_hosts[0] and [1])
5686 cls.pg0.generate_remote_hosts(2)
5688 for i in cls.ip6_interfaces:
5691 i.configure_ipv6_neighbors()
5693 for i in cls.ip4_interfaces:
# pg3 gets both IPv4 and IPv6 configuration
5699 cls.pg3.config_ip4()
5700 cls.pg3.resolve_arp()
5701 cls.pg3.config_ip6()
5702 cls.pg3.configure_ipv6_neighbors()
# pg5 gets IPv6 configured; no neighbor setup for it is visible here
5705 cls.pg5.config_ip6()
# presumably the cleanup inside an exception handler on omitted lines - confirm
5708 super(TestNAT64, cls).tearDownClass()
# Checks that an interface with the NAT64 feature still processes Neighbor
# Advertisements: a ping to pg5's own address first yields a Neighbor
# Solicitation, a Neighbor Advertisement for the solicited target is sent
# back, and a repeated ping must then be answered with an echo reply.
# NOTE(review): the inner line numbers of this listing skip (5713,
# 5715-5716, 5720, 5723-5724, 5728-5729, ...), so the construction of pkts,
# the pg_start() calls, the binding of `packet` and the try/except
# scaffolding are not visible here - confirm against the full file.
5711 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5712 """ NAT64 inside interface handles Neighbor Advertisement """
5714 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
# echo request from pg5's remote host to pg5's local IPv6 address
5717 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5718 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5719 ICMPv6EchoRequest())
5721 self.pg5.add_stream(pkts)
5722 self.pg_enable_capture(self.pg_interfaces)
5725 # Wait for Neighbor Solicitation
5726 capture = self.pg5.get_capture(len(pkts))
5727 self.assertEqual(1, len(capture))
5730 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5731 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
# remember the solicited target so the advertisement can echo it
5732 tgt = packet[ICMPv6ND_NS].tgt
5734 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5737 # Send Neighbor Advertisement
5738 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5739 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5740 ICMPv6ND_NA(tgt=tgt) /
5741 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
5743 self.pg5.add_stream(pkts)
5744 self.pg_enable_capture(self.pg_interfaces)
5747 # Try to send ping again
5749 self.pg5.add_stream(pkts)
5750 self.pg_enable_capture(self.pg_interfaces)
5753 # Wait for ping reply
5754 capture = self.pg5.get_capture(len(pkts))
5755 self.assertEqual(1, len(capture))
# the reply must come from pg5's local address back to the remote host
5758 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5759 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
5760 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
5762 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Adds the single address 1.2.3.4 to the NAT64 pool (range start == end),
# checks that the pool dump contains exactly that address, removes it again
# with is_add=0 and checks that the dump is empty.
5765 def test_pool(self):
5766 """ Add/delete address to NAT64 pool """
# packed 4-byte form of the address, as compared against the dump below
5767 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
5769 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
5771 addresses = self.vapi.nat64_pool_addr_dump()
5772 self.assertEqual(len(addresses), 1)
5773 self.assertEqual(addresses[0].address, nat_addr)
5775 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
5777 addresses = self.vapi.nat64_pool_addr_dump()
5778 self.assertEqual(len(addresses), 0)
# Enables NAT64 on pg0 (default direction) and on pg1 (is_inside=0), checks
# the interface dump and the "show interface features" CLI output, then
# removes the feature from both interfaces and expects an empty dump.
# NOTE(review): the inner line numbers of this listing skip (5787-5788,
# 5792, 5795), so the statements that set pg0_found / pg1_found are not
# visible here - confirm against the full file.
5780 def test_interface(self):
5781 """ Enable/disable NAT64 feature on the interface """
5782 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5783 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5785 interfaces = self.vapi.nat64_interface_dump()
5786 self.assertEqual(len(interfaces), 2)
# pg0 must be reported with is_inside == 1 and pg1 with is_inside == 0
5789 for intf in interfaces:
5790 if intf.sw_if_index == self.pg0.sw_if_index:
5791 self.assertEqual(intf.is_inside, 1)
5793 elif intf.sw_if_index == self.pg1.sw_if_index:
5794 self.assertEqual(intf.is_inside, 0)
5796 self.assertTrue(pg0_found)
5797 self.assertTrue(pg1_found)
# the feature arcs must list the in2out node on pg0 and out2in on pg1
5799 features = self.vapi.cli("show interface features pg0")
5800 self.assertNotEqual(features.find('nat64-in2out'), -1)
5801 features = self.vapi.cli("show interface features pg1")
5802 self.assertNotEqual(features.find('nat64-out2in'), -1)
5804 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5805 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5807 interfaces = self.vapi.nat64_interface_dump()
5808 self.assertEqual(len(interfaces), 0)
# Adds a static NAT64 BIB entry for TCP, checks the dumped entry against the
# configured addresses and ports and expects exactly one static entry, then
# issues a second add_del call and expects zero static entries.
# NOTE(review): the inner line numbers of this listing skip (5815-5816,
# 5820-5823, 5825-5828, 5836-5840, 5842-5845). The definitions of in_port /
# out_port, the remaining call arguments (the second call presumably passes a
# delete flag) and the loops that compute static_bib_num are therefore not
# visible here - confirm against the full file.
5810 def test_static_bib(self):
5811 """ Add/delete static BIB entry """
# inside IPv6 address and outside IPv4 address in packed binary form
5812 in_addr = socket.inet_pton(socket.AF_INET6,
5813 '2001:db8:85a3::8a2e:370:7334')
5814 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5817 proto = IP_PROTOS.tcp
5819 self.vapi.nat64_add_del_static_bib(in_addr,
5824 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5829 self.assertEqual(bibe.i_addr, in_addr)
5830 self.assertEqual(bibe.o_addr, out_addr)
5831 self.assertEqual(bibe.i_port, in_port)
5832 self.assertEqual(bibe.o_port, out_port)
5833 self.assertEqual(static_bib_num, 1)
5835 self.vapi.nat64_add_del_static_bib(in_addr,
5841 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5846 self.assertEqual(static_bib_num, 0)
# Reads the NAT64 timeouts and expects the defaults asserted below
# (udp 300, icmp 60, tcp_trans 240, tcp_est 7440, tcp_incoming_syn 6), then
# sets custom values for all five and checks that they read back unchanged.
5848 def test_set_timeouts(self):
5849 """ Set NAT64 timeouts """
5850 # verify default values
5851 timeouts = self.vapi.nat64_get_timeouts()
5852 self.assertEqual(timeouts.udp, 300)
5853 self.assertEqual(timeouts.icmp, 60)
5854 self.assertEqual(timeouts.tcp_trans, 240)
5855 self.assertEqual(timeouts.tcp_est, 7440)
5856 self.assertEqual(timeouts.tcp_incoming_syn, 6)
5858 # set and verify custom values
5859 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
5860 tcp_est=7450, tcp_incoming_syn=10)
5861 timeouts = self.vapi.nat64_get_timeouts()
5862 self.assertEqual(timeouts.udp, 200)
5863 self.assertEqual(timeouts.icmp, 30)
5864 self.assertEqual(timeouts.tcp_trans, 250)
5865 self.assertEqual(timeouts.tcp_est, 7450)
5866 self.assertEqual(timeouts.tcp_incoming_syn, 10)
# Exercises dynamic NAT64 translation: streams go pg0 -> pg1 and back twice,
# the session count must grow by exactly 3, and the same round trip is then
# repeated for pg2 using the vrf1 pool address.
# NOTE(review): the inner line numbers of this listing skip (5877,
# 5880-5881, 5885, 5889-5890, ...), so some call arguments, the pg_start()
# calls and section comments are not visible here - confirm against the full
# file.
5868 def test_dynamic(self):
5869 """ NAT64 dynamic translation test """
5870 self.tcp_port_in = 6303
5871 self.udp_port_in = 6304
5872 self.icmp_id_in = 6305
# baseline, so that only the sessions created by this test are counted
5874 ses_num_start = self.nat64_get_ses_num()
5876 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5878 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5879 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# first pass pg0 -> pg1: packets must leave with self.nat_addr
5882 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5883 self.pg0.add_stream(pkts)
5884 self.pg_enable_capture(self.pg_interfaces)
5886 capture = self.pg1.get_capture(len(pkts))
5887 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5888 dst_ip=self.pg1.remote_ip4)
# first pass pg1 -> pg0
5891 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5892 self.pg1.add_stream(pkts)
5893 self.pg_enable_capture(self.pg_interfaces)
5895 capture = self.pg0.get_capture(len(pkts))
# scapy IPv6 object built only to obtain the 64:ff9b:: form of pg1's
# IPv4 address via ip[IPv6].src
5896 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5897 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# second pass pg0 -> pg1
5900 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5901 self.pg0.add_stream(pkts)
5902 self.pg_enable_capture(self.pg_interfaces)
5904 capture = self.pg1.get_capture(len(pkts))
5905 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5906 dst_ip=self.pg1.remote_ip4)
# second pass pg1 -> pg0
5909 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5910 self.pg1.add_stream(pkts)
5911 self.pg_enable_capture(self.pg_interfaces)
5913 capture = self.pg0.get_capture(len(pkts))
5914 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5916 ses_num_end = self.nat64_get_ses_num()
# both passes together must have added exactly three sessions
5918 self.assertEqual(ses_num_end - ses_num_start, 3)
5920 # tenant with specific VRF
5921 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5922 self.vrf1_nat_addr_n,
5923 vrf_id=self.vrf1_id)
5924 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5926 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5927 self.pg2.add_stream(pkts)
5928 self.pg_enable_capture(self.pg_interfaces)
5930 capture = self.pg1.get_capture(len(pkts))
5931 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5932 dst_ip=self.pg1.remote_ip4)
5934 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5935 self.pg1.add_stream(pkts)
5936 self.pg_enable_capture(self.pg_interfaces)
5938 capture = self.pg2.get_capture(len(pkts))
5939 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# Exercises NAT64 with static BIB entries: three static entries are added for
# pg0's remote IPv6 address, traffic pg0 -> pg1 must keep its ports
# (same_port=True), the return traffic must reach pg0, and the session count
# must grow by exactly 3.
# NOTE(review): the inner line numbers of this listing skip (5953,
# 5958-5961, 5963-5966, 5968-5973, ...), so the remaining arguments of the
# pool and static BIB calls and the pg_start() calls are not visible here -
# confirm against the full file.
5941 def test_static(self):
5942 """ NAT64 static translation test """
# inside and outside ports/ids are set to the same values
5943 self.tcp_port_in = 60303
5944 self.udp_port_in = 60304
5945 self.icmp_id_in = 60305
5946 self.tcp_port_out = 60303
5947 self.udp_port_out = 60304
5948 self.icmp_id_out = 60305
5950 ses_num_start = self.nat64_get_ses_num()
5952 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5954 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5955 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# three static BIB calls for the same inside address (presumably one per
# protocol: TCP, UDP, ICMP - confirm against the omitted arguments)
5957 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5962 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5967 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5974 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5975 self.pg0.add_stream(pkts)
5976 self.pg_enable_capture(self.pg_interfaces)
5978 capture = self.pg1.get_capture(len(pkts))
5979 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5980 dst_ip=self.pg1.remote_ip4, same_port=True)
5983 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5984 self.pg1.add_stream(pkts)
5985 self.pg_enable_capture(self.pg_interfaces)
5987 capture = self.pg0.get_capture(len(pkts))
# scapy IPv6 object built only to obtain the 64:ff9b:: form of pg1's
# IPv4 address via ip[IPv6].src
5988 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5989 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5991 ses_num_end = self.nat64_get_ses_num()
5993 self.assertEqual(ses_num_end - ses_num_start, 3)
# Checks NAT64 session expiry: with the icmp, tcp_trans and tcp_est timeouts
# set to 5, the session count must drop by exactly 2 between the two reads.
# Skipped unless running_extended_tests() is true (see the decorator).
# NOTE(review): the inner line numbers of this listing skip (6000, 6004,
# 6008, 6010, 6012-6014), so some call arguments, the pg_start() call and
# presumably a sleep between the two session counts are not visible here -
# confirm against the full file.
5995 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5996 def test_session_timeout(self):
5997 """ NAT64 session timeout """
5998 self.icmp_id_in = 1234
5999 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6001 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6002 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# udp is not passed here, so it keeps whatever nat64_set_timeouts defaults to
6003 self.vapi.nat64_set_timeouts(icmp=5, tcp_trans=5, tcp_est=5)
6005 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6006 self.pg0.add_stream(pkts)
6007 self.pg_enable_capture(self.pg_interfaces)
6009 capture = self.pg1.get_capture(len(pkts))
6011 ses_num_before_timeout = self.nat64_get_ses_num()
6015 # ICMP and TCP session after timeout
6016 ses_num_after_timeout = self.nat64_get_ses_num()
6017 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
# Checks translation of ICMP error messages in both directions. ICMPv6
# Destination Unreachable (code 1) sent from pg0 must leave pg1 as ICMP type 3
# code 13, and ICMP type 3 code 13 sent from pg1 must arrive on pg0 as ICMPv6
# Destination Unreachable code 1. In both cases the quoted inner packet is
# checked for the translated addresses and ports/ids.
# NOTE(review): the inner line numbers of this listing skip (6026, 6029,
# 6034, 6048-6049, 6059, 6072, 6074, ...), so some call arguments, the
# pg_start() calls, else branches and the try/except scaffolding are not
# visible here - confirm against the full file.
6019 def test_icmp_error(self):
6020 """ NAT64 ICMP Error message translation """
6021 self.tcp_port_in = 6303
6022 self.udp_port_in = 6304
6023 self.icmp_id_in = 6305
6025 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6027 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6028 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6030 # send some packets to create sessions
6031 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6032 self.pg0.add_stream(pkts)
6033 self.pg_enable_capture(self.pg_interfaces)
# keep the translated IPv4 packets; they are quoted in the ICMP errors below
6035 capture_ip4 = self.pg1.get_capture(len(pkts))
6036 self.verify_capture_out(capture_ip4,
6037 nat_ip=self.nat_addr,
6038 dst_ip=self.pg1.remote_ip4)
6040 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6041 self.pg1.add_stream(pkts)
6042 self.pg_enable_capture(self.pg_interfaces)
# keep the translated IPv6 packets; they are quoted in the ICMPv6 errors below
6044 capture_ip6 = self.pg0.get_capture(len(pkts))
6045 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6046 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
6047 self.pg0.remote_ip6)
# inside -> outside: one ICMPv6 error per captured IPv6 packet, quoting it
6050 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6051 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
6052 ICMPv6DestUnreach(code=1) /
6053 packet[IPv6] for packet in capture_ip6]
6054 self.pg0.add_stream(pkts)
6055 self.pg_enable_capture(self.pg_interfaces)
6057 capture = self.pg1.get_capture(len(pkts))
6058 for packet in capture:
6060 self.assertEqual(packet[IP].src, self.nat_addr)
6061 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6062 self.assertEqual(packet[ICMP].type, 3)
6063 self.assertEqual(packet[ICMP].code, 13)
# the quoted packet is the outside -> inside one, in its IPv4 form
6064 inner = packet[IPerror]
6065 self.assertEqual(inner.src, self.pg1.remote_ip4)
6066 self.assertEqual(inner.dst, self.nat_addr)
6067 self.assert_packet_checksums_valid(packet)
6068 if inner.haslayer(TCPerror):
6069 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
6070 elif inner.haslayer(UDPerror):
6071 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
# presumably the else branch (omitted line 6072) for the ICMP case - confirm
6073 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
6075 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# outside -> inside: one ICMP error per captured IPv4 packet, quoting it
6079 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6080 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6081 ICMP(type=3, code=13) /
6082 packet[IP] for packet in capture_ip4]
6083 self.pg1.add_stream(pkts)
6084 self.pg_enable_capture(self.pg_interfaces)
6086 capture = self.pg0.get_capture(len(pkts))
6087 for packet in capture:
6089 self.assertEqual(packet[IPv6].src, ip.src)
6090 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6091 icmp = packet[ICMPv6DestUnreach]
6092 self.assertEqual(icmp.code, 1)
# the quoted packet is the inside -> outside one, in its IPv6 form
6093 inner = icmp[IPerror6]
6094 self.assertEqual(inner.src, self.pg0.remote_ip6)
6095 self.assertEqual(inner.dst, ip.src)
6096 self.assert_icmpv6_checksum_valid(packet)
6097 if inner.haslayer(TCPerror):
6098 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
6099 elif inner.haslayer(UDPerror):
6100 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
# presumably the else branch (omitted line 6101); the second argument of
# this assertion is on the omitted line 6103 - confirm
6102 self.assertEqual(inner[ICMPv6EchoRequest].id,
6105 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Checks NAT64 hairpinning between two hosts on pg0: the client reaches the
# server through the NAT address and the server's static outside ports, the
# server's replies are translated back to the client, and an ICMPv6
# Destination Unreachable quoting a reply is delivered to the server.
# NOTE(review): the inner line numbers of this listing skip (6125,
# 6130-6131, 6133, 6135-6136, 6138-6141, 6145, 6149, 6155, 6163, ...), so
# some call arguments, the pkts list handling, the pg_start() calls, else
# branches and the try/except scaffolding are not visible here - confirm
# against the full file.
6108 def test_hairpinning(self):
6109 """ NAT64 hairpinning """
6111 client = self.pg0.remote_hosts[0]
6112 server = self.pg0.remote_hosts[1]
6113 server_tcp_in_port = 22
6114 server_tcp_out_port = 4022
6115 server_udp_in_port = 23
6116 server_udp_out_port = 4023
6117 client_tcp_in_port = 1234
6118 client_udp_in_port = 1235
# placeholders; overwritten from the first capture below
6119 client_tcp_out_port = 0
6120 client_udp_out_port = 0
# scapy IPv6 object built only to obtain the 64:ff9b:: form of the NAT
# address via ip.src
6121 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6122 nat_addr_ip6 = ip.src
6124 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6126 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6127 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# static BIB calls for the server's TCP and UDP services
6129 self.vapi.nat64_add_del_static_bib(server.ip6n,
6132 server_tcp_out_port,
6134 self.vapi.nat64_add_del_static_bib(server.ip6n,
6137 server_udp_out_port,
# client -> server via the NAT address and the server's outside ports
6142 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6143 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6144 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6146 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6147 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6148 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
6150 self.pg0.add_stream(pkts)
6151 self.pg_enable_capture(self.pg_interfaces)
# hairpinned packets come back out of pg0, sourced from the NAT address
6153 capture = self.pg0.get_capture(len(pkts))
6154 for packet in capture:
6156 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6157 self.assertEqual(packet[IPv6].dst, server.ip6)
6158 self.assert_packet_checksums_valid(packet)
6159 if packet.haslayer(TCP):
6160 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
6161 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
# remember the translated client port for the reply direction
6162 client_tcp_out_port = packet[TCP].sport
6164 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
6165 self.assertEqual(packet[UDP].dport, server_udp_in_port)
6166 client_udp_out_port = packet[UDP].sport
6168 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client, addressed to the NAT address and the learned ports
6173 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6174 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6175 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
6177 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6178 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6179 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
6181 self.pg0.add_stream(pkts)
6182 self.pg_enable_capture(self.pg_interfaces)
6184 capture = self.pg0.get_capture(len(pkts))
6185 for packet in capture:
6187 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6188 self.assertEqual(packet[IPv6].dst, client.ip6)
6189 self.assert_packet_checksums_valid(packet)
6190 if packet.haslayer(TCP):
6191 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
6192 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
6194 self.assertEqual(packet[UDP].sport, server_udp_out_port)
6195 self.assertEqual(packet[UDP].dport, client_udp_in_port)
6197 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMPv6 errors from the client, each quoting one packet of the capture above
6202 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6203 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6204 ICMPv6DestUnreach(code=1) /
6205 packet[IPv6] for packet in capture]
6206 self.pg0.add_stream(pkts)
6207 self.pg_enable_capture(self.pg_interfaces)
6209 capture = self.pg0.get_capture(len(pkts))
6210 for packet in capture:
6212 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6213 self.assertEqual(packet[IPv6].dst, server.ip6)
6214 icmp = packet[ICMPv6DestUnreach]
6215 self.assertEqual(icmp.code, 1)
# the quoted packet must appear as the server originally sent it
6216 inner = icmp[IPerror6]
6217 self.assertEqual(inner.src, server.ip6)
6218 self.assertEqual(inner.dst, nat_addr_ip6)
6219 self.assert_packet_checksums_valid(packet)
6220 if inner.haslayer(TCPerror):
6221 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
6222 self.assertEqual(inner[TCPerror].dport,
6223 client_tcp_out_port)
6225 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
6226 self.assertEqual(inner[UDPerror].dport,
6227 client_udp_out_port)
6229 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Checks NAT64 network-specific prefixes: a /32 prefix (2001:db8::) is added
# and dumped with vrf_id 0, a /56 prefix (2001:db8:122:300::) is added for
# vrf1, and traffic from pg0 and from pg2 is translated in both directions
# using IPv6 addresses composed with the matching prefix.
# NOTE(review): the inner line numbers of this listing skip (6236,
# 6243-6244, 6261, 6268-6269, 6284-6285, 6290-6291, 6306-6307, ...), so
# some call arguments and the pg_start() calls are not visible here -
# confirm against the full file.
6232 def test_prefix(self):
6233 """ NAT64 Network-Specific Prefix """
6235 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6237 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6238 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6239 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6240 self.vrf1_nat_addr_n,
6241 vrf_id=self.vrf1_id)
6242 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# prefix added without a vrf_id argument; the dump must report vrf_id 0
6245 global_pref64 = "2001:db8::"
6246 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
6247 global_pref64_len = 32
6248 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
6250 prefix = self.vapi.nat64_prefix_dump()
6251 self.assertEqual(len(prefix), 1)
6252 self.assertEqual(prefix[0].prefix, global_pref64_n)
6253 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
6254 self.assertEqual(prefix[0].vrf_id, 0)
6256 # Add tenant specific prefix
6257 vrf1_pref64 = "2001:db8:122:300::"
6258 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
6259 vrf1_pref64_len = 56
6260 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
6262 vrf_id=self.vrf1_id)
6263 prefix = self.vapi.nat64_prefix_dump()
6264 self.assertEqual(len(prefix), 2)
# traffic from pg0, built with the /32 prefix length
6267 pkts = self.create_stream_in_ip6(self.pg0,
6270 plen=global_pref64_len)
6271 self.pg0.add_stream(pkts)
6272 self.pg_enable_capture(self.pg_interfaces)
6274 capture = self.pg1.get_capture(len(pkts))
6275 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6276 dst_ip=self.pg1.remote_ip4)
6278 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6279 self.pg1.add_stream(pkts)
6280 self.pg_enable_capture(self.pg_interfaces)
6282 capture = self.pg0.get_capture(len(pkts))
# expected IPv6 source of the return traffic (prefix arguments omitted here)
6283 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6286 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
6288 # Tenant specific prefix
6289 pkts = self.create_stream_in_ip6(self.pg2,
6292 plen=vrf1_pref64_len)
6293 self.pg2.add_stream(pkts)
6294 self.pg_enable_capture(self.pg_interfaces)
6296 capture = self.pg1.get_capture(len(pkts))
6297 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6298 dst_ip=self.pg1.remote_ip4)
6300 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6301 self.pg1.add_stream(pkts)
6302 self.pg_enable_capture(self.pg_interfaces)
6304 capture = self.pg2.get_capture(len(pkts))
6305 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6308 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Checks NAT64 translation of a protocol other than TCP/UDP/ICMP. After a
# TCP packet has passed pg0 -> pg1, an IPv6 packet with next header 47 must
# leave pg1 as IPv4 carrying GRE, sourced from self.nat_addr. The IPv4
# packet sent back from pg1 must then arrive on pg0 as IPv6 with nh == 47.
# NOTE(review): the inner line numbers of this listing skip (6314,
# 6318-6319, 6325, 6327, 6330, 6335, 6337-6338, 6343, 6350, ...), so some
# call arguments, the GRE() layer lines, the pg_start() calls, the binding of
# `packet` and the try/except scaffolding are not visible here - confirm
# against the full file.
6310 def test_unknown_proto(self):
6311 """ NAT64 translate packet with unknown protocol """
6313 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6315 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6316 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 form of pg1's remote address under the 64:ff9b::/96 prefix
6317 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# a plain TCP packet is sent first and must reach pg1
6320 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6321 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
6322 TCP(sport=self.tcp_port_in, dport=20))
6323 self.pg0.add_stream(p)
6324 self.pg_enable_capture(self.pg_interfaces)
6326 p = self.pg1.get_capture(1)
# inside -> outside packet with next header 47 (the value the final
# assertion below also checks)
6328 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6329 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
6331 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6332 TCP(sport=1234, dport=1234))
6333 self.pg0.add_stream(p)
6334 self.pg_enable_capture(self.pg_interfaces)
6336 p = self.pg1.get_capture(1)
6339 self.assertEqual(packet[IP].src, self.nat_addr)
6340 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6341 self.assertTrue(packet.haslayer(GRE))
6342 self.assert_packet_checksums_valid(packet)
6344 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# outside -> inside packet towards the NAT address
6348 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6349 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6351 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6352 TCP(sport=1234, dport=1234))
6353 self.pg1.add_stream(p)
6354 self.pg_enable_capture(self.pg_interfaces)
6356 p = self.pg0.get_capture(1)
6359 self.assertEqual(packet[IPv6].src, remote_ip6)
6360 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6361 self.assertEqual(packet[IPv6].nh, 47)
6363 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Checks hairpinning of a non-TCP/UDP/ICMP protocol between two hosts on pg0.
# After a TCP packet client -> server, a packet with next header
# IP_PROTOS.gre sent by the client must reach the server sourced from the
# client's NAT IPv6 address, and the server's packet back must reach the
# client sourced from the server's NAT IPv6 address, both keeping
# nh == IP_PROTOS.gre.
# NOTE(review): the inner line numbers of this listing skip (6383,
# 6388-6389, 6391-6392, 6394-6398, 6400-6401, 6403-6405, 6416, 6423-6424,
# ...), so most static BIB arguments, the GRE() layer lines, the pg_start()
# calls, the binding of `packet` and the try/except scaffolding are not
# visible here - confirm against the full file.
6366 def test_hairpinning_unknown_proto(self):
6367 """ NAT64 translate packet with unknown protocol - hairpinning """
6369 client = self.pg0.remote_hosts[0]
6370 server = self.pg0.remote_hosts[1]
6371 server_tcp_in_port = 22
6372 server_tcp_out_port = 4022
6373 client_tcp_in_port = 1234
6374 client_tcp_out_port = 1235
# separate outside IPv4 addresses for server and client, each kept as text,
# packed binary, and as an IPv6 address under 64:ff9b::/96
6375 server_nat_ip = "10.0.0.100"
6376 client_nat_ip = "10.0.0.110"
6377 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
6378 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
6379 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
6380 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
6382 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
6384 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6385 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# three static BIB calls: two for the server, one for the client
6387 self.vapi.nat64_add_del_static_bib(server.ip6n,
6390 server_tcp_out_port,
6393 self.vapi.nat64_add_del_static_bib(server.ip6n,
6399 self.vapi.nat64_add_del_static_bib(client.ip6n,
6402 client_tcp_out_port,
# a plain TCP packet client -> server is sent first and must hairpin via pg0
6406 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6407 IPv6(src=client.ip6, dst=server_nat_ip6) /
6408 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6409 self.pg0.add_stream(p)
6410 self.pg_enable_capture(self.pg_interfaces)
6412 p = self.pg0.get_capture(1)
# client -> server with next header IP_PROTOS.gre
6414 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6415 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
6417 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6418 TCP(sport=1234, dport=1234))
6419 self.pg0.add_stream(p)
6420 self.pg_enable_capture(self.pg_interfaces)
6422 p = self.pg0.get_capture(1)
6425 self.assertEqual(packet[IPv6].src, client_nat_ip6)
6426 self.assertEqual(packet[IPv6].dst, server.ip6)
6427 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6429 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client with next header IP_PROTOS.gre
6433 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6434 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
6436 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6437 TCP(sport=1234, dport=1234))
6438 self.pg0.add_stream(p)
6439 self.pg_enable_capture(self.pg_interfaces)
6441 p = self.pg0.get_capture(1)
6444 self.assertEqual(packet[IPv6].src, server_nat_ip6)
6445 self.assertEqual(packet[IPv6].dst, client.ip6)
6446 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6448 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6451 def test_one_armed_nat64(self):
6452 """ One armed NAT64 """
6454 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
6458 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6460 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
6461 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
6464 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6465 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
6466 TCP(sport=12345, dport=80))
6467 self.pg3.add_stream(p)
6468 self.pg_enable_capture(self.pg_interfaces)
6470 capture = self.pg3.get_capture(1)
6475 self.assertEqual(ip.src, self.nat_addr)
6476 self.assertEqual(ip.dst, self.pg3.remote_ip4)
6477 self.assertNotEqual(tcp.sport, 12345)
6478 external_port = tcp.sport
6479 self.assertEqual(tcp.dport, 80)
6480 self.assert_packet_checksums_valid(p)
6482 self.logger.error(ppp("Unexpected or invalid packet:", p))
6486 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6487 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6488 TCP(sport=80, dport=external_port))
6489 self.pg3.add_stream(p)
6490 self.pg_enable_capture(self.pg_interfaces)
6492 capture = self.pg3.get_capture(1)
6497 self.assertEqual(ip.src, remote_host_ip6)
6498 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6499 self.assertEqual(tcp.sport, 80)
6500 self.assertEqual(tcp.dport, 12345)
6501 self.assert_packet_checksums_valid(p)
6503 self.logger.error(ppp("Unexpected or invalid packet:", p))
6506 def test_frag_in_order(self):
6507 """ NAT64 translate fragments arriving in order """
6508 self.tcp_port_in = random.randint(1025, 65535)
6510 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6512 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6513 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6515 reass = self.vapi.nat_reass_dump()
6516 reass_n_start = len(reass)
6520 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6521 self.tcp_port_in, 20, data)
6522 self.pg0.add_stream(pkts)
6523 self.pg_enable_capture(self.pg_interfaces)
6525 frags = self.pg1.get_capture(len(pkts))
6526 p = self.reass_frags_and_verify(frags,
6528 self.pg1.remote_ip4)
6529 self.assertEqual(p[TCP].dport, 20)
6530 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6531 self.tcp_port_out = p[TCP].sport
6532 self.assertEqual(data, p[Raw].load)
6535 data = "A" * 4 + "b" * 16 + "C" * 3
6536 pkts = self.create_stream_frag(self.pg1,
6541 self.pg1.add_stream(pkts)
6542 self.pg_enable_capture(self.pg_interfaces)
6544 frags = self.pg0.get_capture(len(pkts))
6545 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6546 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6547 self.assertEqual(p[TCP].sport, 20)
6548 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6549 self.assertEqual(data, p[Raw].load)
6551 reass = self.vapi.nat_reass_dump()
6552 reass_n_end = len(reass)
6554 self.assertEqual(reass_n_end - reass_n_start, 2)
6556 def test_reass_hairpinning(self):
6557 """ NAT64 fragments hairpinning """
6559 server = self.pg0.remote_hosts[1]
6560 server_in_port = random.randint(1025, 65535)
6561 server_out_port = random.randint(1025, 65535)
6562 client_in_port = random.randint(1025, 65535)
6563 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6564 nat_addr_ip6 = ip.src
6566 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6568 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6569 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6571 # add static BIB entry for server
6572 self.vapi.nat64_add_del_static_bib(server.ip6n,
6578 # send packet from host to server
6579 pkts = self.create_stream_frag_ip6(self.pg0,
6584 self.pg0.add_stream(pkts)
6585 self.pg_enable_capture(self.pg_interfaces)
6587 frags = self.pg0.get_capture(len(pkts))
6588 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6589 self.assertNotEqual(p[TCP].sport, client_in_port)
6590 self.assertEqual(p[TCP].dport, server_in_port)
6591 self.assertEqual(data, p[Raw].load)
6593 def test_frag_out_of_order(self):
6594 """ NAT64 translate fragments arriving out of order """
6595 self.tcp_port_in = random.randint(1025, 65535)
6597 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6599 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6600 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6604 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6605 self.tcp_port_in, 20, data)
6607 self.pg0.add_stream(pkts)
6608 self.pg_enable_capture(self.pg_interfaces)
6610 frags = self.pg1.get_capture(len(pkts))
6611 p = self.reass_frags_and_verify(frags,
6613 self.pg1.remote_ip4)
6614 self.assertEqual(p[TCP].dport, 20)
6615 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6616 self.tcp_port_out = p[TCP].sport
6617 self.assertEqual(data, p[Raw].load)
6620 data = "A" * 4 + "B" * 16 + "C" * 3
6621 pkts = self.create_stream_frag(self.pg1,
6627 self.pg1.add_stream(pkts)
6628 self.pg_enable_capture(self.pg_interfaces)
6630 frags = self.pg0.get_capture(len(pkts))
6631 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6632 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6633 self.assertEqual(p[TCP].sport, 20)
6634 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6635 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # Register pg4 as the interface from which NAT64 acquires its pool
    # address.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (the NAT64 pool itself must be dumped here; dumping the NAT44 pool
    # would pass regardless of NAT64 state)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump, not on the result taken before the
    # address was configured)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
6656 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6657 def test_ipfix_max_bibs_sessions(self):
6658 """ IPFIX logging maximum session and BIB entries exceeded """
6661 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6665 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6667 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6668 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6672 for i in range(0, max_bibs):
6673 src = "fd01:aa::%x" % (i)
6674 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6675 IPv6(src=src, dst=remote_host_ip6) /
6676 TCP(sport=12345, dport=80))
6678 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6679 IPv6(src=src, dst=remote_host_ip6) /
6680 TCP(sport=12345, dport=22))
6682 self.pg0.add_stream(pkts)
6683 self.pg_enable_capture(self.pg_interfaces)
6685 self.pg1.get_capture(max_sessions)
6687 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6688 src_address=self.pg3.local_ip4n,
6690 template_interval=10)
6691 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6692 src_port=self.ipfix_src_port)
6694 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6695 IPv6(src=src, dst=remote_host_ip6) /
6696 TCP(sport=12345, dport=25))
6697 self.pg0.add_stream(p)
6698 self.pg_enable_capture(self.pg_interfaces)
6700 self.pg1.assert_nothing_captured()
6702 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6703 capture = self.pg3.get_capture(9)
6704 ipfix = IPFIXDecoder()
6705 # first load template
6707 self.assertTrue(p.haslayer(IPFIX))
6708 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6709 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6710 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6711 self.assertEqual(p[UDP].dport, 4739)
6712 self.assertEqual(p[IPFIX].observationDomainID,
6713 self.ipfix_domain_id)
6714 if p.haslayer(Template):
6715 ipfix.add_template(p.getlayer(Template))
6716 # verify events in data set
6718 if p.haslayer(Data):
6719 data = ipfix.decode_data_set(p.getlayer(Set))
6720 self.verify_ipfix_max_sessions(data, max_sessions)
6722 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6723 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6724 TCP(sport=12345, dport=80))
6725 self.pg0.add_stream(p)
6726 self.pg_enable_capture(self.pg_interfaces)
6728 self.pg1.assert_nothing_captured()
6730 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6731 capture = self.pg3.get_capture(1)
6732 # verify events in data set
6734 self.assertTrue(p.haslayer(IPFIX))
6735 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6736 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6737 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6738 self.assertEqual(p[UDP].dport, 4739)
6739 self.assertEqual(p[IPFIX].observationDomainID,
6740 self.ipfix_domain_id)
6741 if p.haslayer(Data):
6742 data = ipfix.decode_data_set(p.getlayer(Set))
6743 self.verify_ipfix_max_bibs(data, max_bibs)
6745 def test_ipfix_max_frags(self):
6746 """ IPFIX logging maximum fragments pending reassembly exceeded """
6747 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6749 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6750 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6751 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6752 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6753 src_address=self.pg3.local_ip4n,
6755 template_interval=10)
6756 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6757 src_port=self.ipfix_src_port)
6760 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6761 self.tcp_port_in, 20, data)
6762 self.pg0.add_stream(pkts[-1])
6763 self.pg_enable_capture(self.pg_interfaces)
6765 self.pg1.assert_nothing_captured()
6767 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6768 capture = self.pg3.get_capture(9)
6769 ipfix = IPFIXDecoder()
6770 # first load template
6772 self.assertTrue(p.haslayer(IPFIX))
6773 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6774 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6775 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6776 self.assertEqual(p[UDP].dport, 4739)
6777 self.assertEqual(p[IPFIX].observationDomainID,
6778 self.ipfix_domain_id)
6779 if p.haslayer(Template):
6780 ipfix.add_template(p.getlayer(Template))
6781 # verify events in data set
6783 if p.haslayer(Data):
6784 data = ipfix.decode_data_set(p.getlayer(Set))
6785 self.verify_ipfix_max_fragments_ip6(data, 0,
6786 self.pg0.remote_ip6n)
6788 def test_ipfix_bib_ses(self):
6789 """ IPFIX logging NAT64 BIB/session create and delete events """
6790 self.tcp_port_in = random.randint(1025, 65535)
6791 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6795 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6797 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6798 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6799 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6800 src_address=self.pg3.local_ip4n,
6802 template_interval=10)
6803 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6804 src_port=self.ipfix_src_port)
6807 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6808 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6809 TCP(sport=self.tcp_port_in, dport=25))
6810 self.pg0.add_stream(p)
6811 self.pg_enable_capture(self.pg_interfaces)
6813 p = self.pg1.get_capture(1)
6814 self.tcp_port_out = p[0][TCP].sport
6815 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6816 capture = self.pg3.get_capture(10)
6817 ipfix = IPFIXDecoder()
6818 # first load template
6820 self.assertTrue(p.haslayer(IPFIX))
6821 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6822 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6823 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6824 self.assertEqual(p[UDP].dport, 4739)
6825 self.assertEqual(p[IPFIX].observationDomainID,
6826 self.ipfix_domain_id)
6827 if p.haslayer(Template):
6828 ipfix.add_template(p.getlayer(Template))
6829 # verify events in data set
6831 if p.haslayer(Data):
6832 data = ipfix.decode_data_set(p.getlayer(Set))
6833 if ord(data[0][230]) == 10:
6834 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6835 elif ord(data[0][230]) == 6:
6836 self.verify_ipfix_nat64_ses(data,
6838 self.pg0.remote_ip6n,
6839 self.pg1.remote_ip4,
6842 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6845 self.pg_enable_capture(self.pg_interfaces)
6846 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6849 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6850 capture = self.pg3.get_capture(2)
6851 # verify events in data set
6853 self.assertTrue(p.haslayer(IPFIX))
6854 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6855 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6856 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6857 self.assertEqual(p[UDP].dport, 4739)
6858 self.assertEqual(p[IPFIX].observationDomainID,
6859 self.ipfix_domain_id)
6860 if p.haslayer(Data):
6861 data = ipfix.decode_data_set(p.getlayer(Set))
6862 if ord(data[0][230]) == 11:
6863 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6864 elif ord(data[0][230]) == 7:
6865 self.verify_ipfix_nat64_ses(data,
6867 self.pg0.remote_ip6n,
6868 self.pg1.remote_ip4,
6871 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6873 def nat64_get_ses_num(self):
6875 Return number of active NAT64 sessions.
6877 st = self.vapi.nat64_st_dump()
6880 def clear_nat64(self):
6882 Clear NAT64 configuration.
6884 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6885 domain_id=self.ipfix_domain_id)
6886 self.ipfix_src_port = 4739
6887 self.ipfix_domain_id = 1
6889 self.vapi.nat64_set_timeouts()
6891 interfaces = self.vapi.nat64_interface_dump()
6892 for intf in interfaces:
6893 if intf.is_inside > 1:
6894 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6897 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6901 bib = self.vapi.nat64_bib_dump(255)
6904 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6912 adresses = self.vapi.nat64_pool_addr_dump()
6913 for addr in adresses:
6914 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6919 prefixes = self.vapi.nat64_prefix_dump()
6920 for prefix in prefixes:
6921 self.vapi.nat64_add_del_prefix(prefix.prefix,
6923 vrf_id=prefix.vrf_id,
6927 super(TestNAT64, self).tearDown()
6928 if not self.vpp_dead:
6929 self.logger.info(self.vapi.cli("show nat64 pool"))
6930 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6931 self.logger.info(self.vapi.cli("show nat64 prefix"))
6932 self.logger.info(self.vapi.cli("show nat64 bib all"))
6933 self.logger.info(self.vapi.cli("show nat64 session table all"))
6934 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
6938 class TestDSlite(MethodHolder):
6939 """ DS-Lite Test Cases """
6942 def setUpClass(cls):
6943 super(TestDSlite, cls).setUpClass()
6946 cls.nat_addr = '10.0.0.3'
6947 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6949 cls.create_pg_interfaces(range(2))
6951 cls.pg0.config_ip4()
6952 cls.pg0.resolve_arp()
6954 cls.pg1.config_ip6()
6955 cls.pg1.generate_remote_hosts(2)
6956 cls.pg1.configure_ipv6_neighbors()
6959 super(TestDSlite, cls).tearDownClass()
6962 def test_dslite(self):
6963 """ Test DS-Lite """
6964 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6966 aftr_ip4 = '192.0.0.1'
6967 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6968 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6969 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6970 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6973 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6974 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6975 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6976 UDP(sport=20000, dport=10000))
6977 self.pg1.add_stream(p)
6978 self.pg_enable_capture(self.pg_interfaces)
6980 capture = self.pg0.get_capture(1)
6981 capture = capture[0]
6982 self.assertFalse(capture.haslayer(IPv6))
6983 self.assertEqual(capture[IP].src, self.nat_addr)
6984 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6985 self.assertNotEqual(capture[UDP].sport, 20000)
6986 self.assertEqual(capture[UDP].dport, 10000)
6987 self.assert_packet_checksums_valid(capture)
6988 out_port = capture[UDP].sport
6990 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6991 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6992 UDP(sport=10000, dport=out_port))
6993 self.pg0.add_stream(p)
6994 self.pg_enable_capture(self.pg_interfaces)
6996 capture = self.pg1.get_capture(1)
6997 capture = capture[0]
6998 self.assertEqual(capture[IPv6].src, aftr_ip6)
6999 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7000 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7001 self.assertEqual(capture[IP].dst, '192.168.1.1')
7002 self.assertEqual(capture[UDP].sport, 10000)
7003 self.assertEqual(capture[UDP].dport, 20000)
7004 self.assert_packet_checksums_valid(capture)
7007 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7008 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7009 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7010 TCP(sport=20001, dport=10001))
7011 self.pg1.add_stream(p)
7012 self.pg_enable_capture(self.pg_interfaces)
7014 capture = self.pg0.get_capture(1)
7015 capture = capture[0]
7016 self.assertFalse(capture.haslayer(IPv6))
7017 self.assertEqual(capture[IP].src, self.nat_addr)
7018 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7019 self.assertNotEqual(capture[TCP].sport, 20001)
7020 self.assertEqual(capture[TCP].dport, 10001)
7021 self.assert_packet_checksums_valid(capture)
7022 out_port = capture[TCP].sport
7024 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7025 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7026 TCP(sport=10001, dport=out_port))
7027 self.pg0.add_stream(p)
7028 self.pg_enable_capture(self.pg_interfaces)
7030 capture = self.pg1.get_capture(1)
7031 capture = capture[0]
7032 self.assertEqual(capture[IPv6].src, aftr_ip6)
7033 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7034 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7035 self.assertEqual(capture[IP].dst, '192.168.1.1')
7036 self.assertEqual(capture[TCP].sport, 10001)
7037 self.assertEqual(capture[TCP].dport, 20001)
7038 self.assert_packet_checksums_valid(capture)
7041 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7042 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7043 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7044 ICMP(id=4000, type='echo-request'))
7045 self.pg1.add_stream(p)
7046 self.pg_enable_capture(self.pg_interfaces)
7048 capture = self.pg0.get_capture(1)
7049 capture = capture[0]
7050 self.assertFalse(capture.haslayer(IPv6))
7051 self.assertEqual(capture[IP].src, self.nat_addr)
7052 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7053 self.assertNotEqual(capture[ICMP].id, 4000)
7054 self.assert_packet_checksums_valid(capture)
7055 out_id = capture[ICMP].id
7057 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7058 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7059 ICMP(id=out_id, type='echo-reply'))
7060 self.pg0.add_stream(p)
7061 self.pg_enable_capture(self.pg_interfaces)
7063 capture = self.pg1.get_capture(1)
7064 capture = capture[0]
7065 self.assertEqual(capture[IPv6].src, aftr_ip6)
7066 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7067 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7068 self.assertEqual(capture[IP].dst, '192.168.1.1')
7069 self.assertEqual(capture[ICMP].id, 4000)
7070 self.assert_packet_checksums_valid(capture)
7072 # ping DS-Lite AFTR tunnel endpoint address
7073 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7074 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
7075 ICMPv6EchoRequest())
7076 self.pg1.add_stream(p)
7077 self.pg_enable_capture(self.pg_interfaces)
7079 capture = self.pg1.get_capture(1)
7080 self.assertEqual(1, len(capture))
7081 capture = capture[0]
7082 self.assertEqual(capture[IPv6].src, aftr_ip6)
7083 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7084 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7087 super(TestDSlite, self).tearDown()
7088 if not self.vpp_dead:
7089 self.logger.info(self.vapi.cli("show dslite pool"))
7091 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7092 self.logger.info(self.vapi.cli("show dslite sessions"))
7095 class TestDSliteCE(MethodHolder):
7096 """ DS-Lite CE Test Cases """
7099 def setUpConstants(cls):
7100 super(TestDSliteCE, cls).setUpConstants()
7101 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
7104 def setUpClass(cls):
7105 super(TestDSliteCE, cls).setUpClass()
7108 cls.create_pg_interfaces(range(2))
7110 cls.pg0.config_ip4()
7111 cls.pg0.resolve_arp()
7113 cls.pg1.config_ip6()
7114 cls.pg1.generate_remote_hosts(1)
7115 cls.pg1.configure_ipv6_neighbors()
7118 super(TestDSliteCE, cls).tearDownClass()
7121 def test_dslite_ce(self):
7122 """ Test DS-Lite CE """
7124 b4_ip4 = '192.0.0.2'
7125 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
7126 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
7127 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
7128 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
7130 aftr_ip4 = '192.0.0.1'
7131 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7132 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7133 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7134 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7136 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
7137 dst_address_length=128,
7138 next_hop_address=self.pg1.remote_ip6n,
7139 next_hop_sw_if_index=self.pg1.sw_if_index,
7143 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7144 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
7145 UDP(sport=10000, dport=20000))
7146 self.pg0.add_stream(p)
7147 self.pg_enable_capture(self.pg_interfaces)
7149 capture = self.pg1.get_capture(1)
7150 capture = capture[0]
7151 self.assertEqual(capture[IPv6].src, b4_ip6)
7152 self.assertEqual(capture[IPv6].dst, aftr_ip6)
7153 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7154 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
7155 self.assertEqual(capture[UDP].sport, 10000)
7156 self.assertEqual(capture[UDP].dport, 20000)
7157 self.assert_packet_checksums_valid(capture)
7160 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7161 IPv6(dst=b4_ip6, src=aftr_ip6) /
7162 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
7163 UDP(sport=20000, dport=10000))
7164 self.pg1.add_stream(p)
7165 self.pg_enable_capture(self.pg_interfaces)
7167 capture = self.pg0.get_capture(1)
7168 capture = capture[0]
7169 self.assertFalse(capture.haslayer(IPv6))
7170 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
7171 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7172 self.assertEqual(capture[UDP].sport, 20000)
7173 self.assertEqual(capture[UDP].dport, 10000)
7174 self.assert_packet_checksums_valid(capture)
7176 # ping DS-Lite B4 tunnel endpoint address
7177 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7178 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
7179 ICMPv6EchoRequest())
7180 self.pg1.add_stream(p)
7181 self.pg_enable_capture(self.pg_interfaces)
7183 capture = self.pg1.get_capture(1)
7184 self.assertEqual(1, len(capture))
7185 capture = capture[0]
7186 self.assertEqual(capture[IPv6].src, b4_ip6)
7187 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7188 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7191 super(TestDSliteCE, self).tearDown()
7192 if not self.vpp_dead:
7194 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7196 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
7199 class TestNAT66(MethodHolder):
7200 """ NAT66 Test Cases """
7203 def setUpClass(cls):
7204 super(TestNAT66, cls).setUpClass()
7207 cls.nat_addr = 'fd01:ff::2'
7208 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
7210 cls.create_pg_interfaces(range(2))
7211 cls.interfaces = list(cls.pg_interfaces)
7213 for i in cls.interfaces:
7216 i.configure_ipv6_neighbors()
7219 super(TestNAT66, cls).tearDownClass()
7222 def test_static(self):
7223 """ 1:1 NAT66 test """
7224 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7225 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7226 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7231 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7232 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7235 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7236 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7239 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7240 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7241 ICMPv6EchoRequest())
7243 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7244 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7245 GRE() / IP() / TCP())
7247 self.pg0.add_stream(pkts)
7248 self.pg_enable_capture(self.pg_interfaces)
7250 capture = self.pg1.get_capture(len(pkts))
7251 for packet in capture:
7253 self.assertEqual(packet[IPv6].src, self.nat_addr)
7254 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7255 self.assert_packet_checksums_valid(packet)
7257 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7262 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7263 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7266 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7267 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7270 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7271 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7274 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7275 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7276 GRE() / IP() / TCP())
7278 self.pg1.add_stream(pkts)
7279 self.pg_enable_capture(self.pg_interfaces)
7281 capture = self.pg0.get_capture(len(pkts))
7282 for packet in capture:
7284 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
7285 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
7286 self.assert_packet_checksums_valid(packet)
7288 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7291 sm = self.vapi.nat66_static_mapping_dump()
7292 self.assertEqual(len(sm), 1)
7293 self.assertEqual(sm[0].total_pkts, 8)
7295 def test_check_no_translate(self):
7296 """ NAT66 translate only when egress interface is outside interface """
7297 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7298 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7299 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7303 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7304 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7306 self.pg0.add_stream([p])
7307 self.pg_enable_capture(self.pg_interfaces)
7309 capture = self.pg1.get_capture(1)
7312 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7313 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7315 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7318 def clear_nat66(self):
7320 Clear NAT66 configuration.
7322 interfaces = self.vapi.nat66_interface_dump()
7323 for intf in interfaces:
7324 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7328 static_mappings = self.vapi.nat66_static_mapping_dump()
7329 for sm in static_mappings:
7330 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7331 sm.external_ip_address,
7336 super(TestNAT66, self).tearDown()
7337 if not self.vpp_dead:
7338 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7339 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: when this module is executed directly, run its test
# cases through unittest using the framework's VppTestRunner.
7343 if __name__ == '__main__':
7344 unittest.main(testRunner=VppTestRunner)