9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
# Remove NAT44 state that a test case may have left configured.
# Each section below reads one dump from the API and feeds every returned
# entry back into the matching add/del call; the IPFIX attributes are then
# put back to 4739 / 1 and nat_set_reass() is called for IPv4 and IPv6.
29 def clear_nat44(self):
31 Clear NAT44 configuration.
# The pg7/pg8 cleanup only runs when both attributes exist
# (presumably not every subclass creates them -- confirm).
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
# /32 host routes towards the pg7 and pg8 remote addresses.
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
# Neighbor entries known on pg7 and pg8.
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
# Interface-derived NAT addresses.
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
# Disable IPFIX logging using the values the test may have changed, then
# restore the attributes to the values setUpClass assigns.
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
# NAT44 interface features.
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
# Entries with is_inside > 1 get one extra add/del call before the generic
# one (presumably such an interface is both inside and outside -- confirm
# against the API definition).
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# NAT44 output features.
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
# Static mappings: every dumped field is passed back unchanged.
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
# Load-balanced static mappings.
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
112 twice_nat=lb_sm.twice_nat,
113 self_twice_nat=lb_sm.self_twice_nat,
114 out2in_only=lb_sm.out2in_only,
# Identity mappings.
120 identity_mappings = self.vapi.nat44_identity_mapping_dump()
121 for id_m in identity_mappings:
122 self.vapi.nat44_add_del_identity_mapping(
123 addr_only=id_m.addr_only,
126 sw_if_index=id_m.sw_if_index,
128 protocol=id_m.protocol,
# NAT address pool.
# NOTE(review): "adresses" is a misspelling of "addresses" (local name only).
131 adresses = self.vapi.nat44_address_dump()
132 for addr in adresses:
133 self.vapi.nat44_add_del_address_range(addr.ip_address,
135 twice_nat=addr.twice_nat,
# Reassembly settings, called without overrides for IPv4 and then IPv6.
138 self.vapi.nat_set_reass()
139 self.vapi.nat_set_reass(is_ip6=1)
# Convenience wrapper around vapi.nat44_add_del_static_mapping: converts the
# textual IPv4 addresses to packed form with socket.inet_pton() and forwards
# them together with external_sw_if_index to the API call.
141 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
142 local_port=0, external_port=0, vrf_id=0,
143 is_add=1, external_sw_if_index=0xFFFFFFFF,
144 proto=0, twice_nat=0, self_twice_nat=0,
145 out2in_only=0, tag=""):
147 Add/delete NAT44 static mapping
149 :param local_ip: Local IP address
150 :param external_ip: External IP address
151 :param local_port: Local port number (Optional)
152 :param external_port: External port number (Optional)
153 :param vrf_id: VRF ID (Default 0)
154 :param is_add: 1 if add, 0 if delete (Default add)
155 :param external_sw_if_index: External interface instead of IP address
156 :param proto: IP protocol (Mandatory if port specified)
157 :param twice_nat: 1 if translate external host address and port
158 :param self_twice_nat: 1 if translate external host address and port
159 whenever external host address equals
160 local address of internal host
161 :param out2in_only: if 1 rule is matching only out2in direction
162 :param tag: Opaque string tag
# Both ports must be non-zero for this branch to be taken; a single non-zero
# port does not satisfy it.
165 if local_port and external_port:
# Packed (network byte order) forms of the two addresses.
167 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
168 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
169 self.vapi.nat44_add_del_static_mapping(
172 external_sw_if_index,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID handed to the API (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    packed_ip = socket.inet_pton(socket.AF_INET, ip)
    # A single address is expressed as a range whose first and last
    # address are identical.
    self.vapi.nat44_add_del_address_range(
        packed_ip,
        packed_ip,
        is_add,
        vrf_id=vrf_id,
        twice_nat=twice_nat)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default remote address of out_if)
    :param ttl: TTL of generated packets
    :returns: list of three packets, in the order TCP, UDP, ICMP
    """
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    def build(l4):
        # Every packet shares the same L2/L3 envelope; only L4 differs.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
                l4)

    return [
        build(TCP(sport=self.tcp_port_in, dport=20)),
        build(UDP(sport=self.udp_port_in, dport=20)),
        build(ICMP(id=self.icmp_id_in, type='echo-request')),
    ]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the byte
    positions RFC 6052 (section 2.2) assigns to each prefix length.
    Byte 8 is never written, so the layouts for /40../64 straddle it.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: IPv4-embedded IPv6 addresses; for any other prefix length
              the prefix is returned without an embedded address
    """
    # Destination byte index of IPv4 octets 0..3 for every prefix length.
    octet_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray yields integers on both Python 2 and Python 3, which avoids
    # joining raw bytes as text (that only works on Python 2 strings).
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for position, octet in zip(octet_positions.get(plen, ()), ip4_n):
        pref_n[position] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    The IPv4 octets are read from the byte positions RFC 6052
    (section 2.2) assigns to each prefix length; byte 8 is skipped.

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: extracted IPv4 address
    :raises ValueError: if plen is not one of the supported lengths
    """
    # Source byte index of IPv4 octets 0..3 for every prefix length.
    octet_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in octet_positions:
        raise ValueError("unsupported NAT64 prefix length: %r" % (plen,))
    # bytearray yields integers on both Python 2 and Python 3, which avoids
    # joining raw bytes as text (that only works on Python 2 strings).
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[position] for position in octet_positions[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :returns: list of three packets, in the order TCP, UDP, ICMPv6
    """
    if pref is None:
        # No prefix given: append the dotted IPv4 address to 64:ff9b::
        dst = '64:ff9b::' + out_if.remote_ip4
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    def build(l4):
        # Every packet shares the same L2/L3 envelope; only L4 differs.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
                l4)

    return [
        build(TCP(sport=self.tcp_port_in, dport=20)),
        build(UDP(sport=self.udp_port_in, dport=20)),
        build(ICMPv6EchoRequest(id=self.icmp_id_in)),
    ]
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: list of three packets, in the order TCP, UDP, ICMP
    """
    if dst_ip is None:
        dst_ip = self.nat_addr

    if use_inside_ports:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in
    else:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out

    l4_layers = (
        TCP(dport=tcp_port, sport=20),
        UDP(dport=udp_port, sport=20),
        ICMP(id=icmp_id, type='echo-reply'),
    )
    # Same L2/L3 envelope for every packet, one packet per L4 layer.
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: list of three packets, in the order TCP, UDP, ICMPv6
    """
    l4_layers = (
        TCP(dport=self.tcp_port_out, sport=20),
        UDP(dport=self.udp_port_out, sport=20),
        ICMPv6EchoReply(id=self.icmp_id_out),
    )
    # Same L2/L3 envelope for every packet, one packet per L4 layer.
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
            l4
            for l4 in l4_layers]
# Check packets captured on the outside interface after in2out translation.
# Besides asserting on addresses, ports and ICMP ids, this records what the
# NAT chose: the source port of each TCP/UDP packet and the id of each ICMP
# packet are stored in self.tcp_port_out, self.udp_port_out and
# self.icmp_id_out, so later out2in streams can target them.
422 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
423 packet_num=3, dst_ip=None, is_ip6=False):
425 Verify captured packets on outside network
427 :param capture: Captured packets
428 :param nat_ip: Translated IP address (Default use global NAT address)
# NOTE(review): "Sorce" in the next line is a typo for "Source".
429 :param same_port: Sorce port number is not translated (Default False)
430 :param packet_num: Expected number of packets (Default 3)
431 :param dst_ip: Destination IP address (Default do not verify)
432 :param is_ip6: If L3 protocol is IPv6 (Default False)
# IP46 / ICMP46 name the scapy layer classes used for indexing below, so one
# body can serve both address families.
436 ICMP46 = ICMPv6EchoRequest
441 nat_ip = self.nat_addr
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
446 self.assert_packet_checksums_valid(packet)
447 self.assertEqual(packet[IP46].src, nat_ip)
448 if dst_ip is not None:
449 self.assertEqual(packet[IP46].dst, dst_ip)
450 if packet.haslayer(TCP):
# same_port selects between "port kept" and "port rewritten" expectations.
452 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
455 packet[TCP].sport, self.tcp_port_in)
# Remember the translated port for later out2in traffic.
456 self.tcp_port_out = packet[TCP].sport
457 self.assert_packet_checksums_valid(packet)
458 elif packet.haslayer(UDP):
460 self.assertEqual(packet[UDP].sport, self.udp_port_in)
463 packet[UDP].sport, self.udp_port_in)
464 self.udp_port_out = packet[UDP].sport
467 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
469 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
470 self.icmp_id_out = packet[ICMP46].id
471 self.assert_packet_checksums_valid(packet)
# The offending packet is logged before the failure propagates.
473 self.logger.error(ppp("Unexpected or invalid packet "
474 "(outside network):", packet))
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    IPv6 flavour of verify_capture_out(); all checks are delegated to it.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    return self.verify_capture_out(capture,
                                   nat_ip=nat_ip,
                                   same_port=same_port,
                                   packet_num=packet_num,
                                   dst_ip=dst_ip,
                                   is_ip6=True)
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    def check(pkt):
        # Destination must be the inside host; the L4 identifier must be
        # back to its pre-translation value.
        self.assert_packet_checksums_valid(pkt)
        self.assertEqual(pkt[IP].dst, in_if.remote_ip4)
        if pkt.haslayer(TCP):
            self.assertEqual(pkt[TCP].dport, self.tcp_port_in)
            return
        if pkt.haslayer(UDP):
            self.assertEqual(pkt[UDP].dport, self.udp_port_in)
            return
        self.assertEqual(pkt[ICMP].id, self.icmp_id_in)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            check(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    def check(pkt):
        # Addresses first, then checksums, then the L4 identifier.
        self.assertEqual(pkt[IPv6].src, src_ip)
        self.assertEqual(pkt[IPv6].dst, dst_ip)
        self.assert_packet_checksums_valid(pkt)
        if pkt.haslayer(TCP):
            self.assertEqual(pkt[TCP].dport, self.tcp_port_in)
            return
        if pkt.haslayer(UDP):
            self.assertEqual(pkt[UDP].dport, self.udp_port_in)
            return
        self.assertEqual(pkt[ICMPv6EchoReply].id,
                         self.icmp_id_in)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            check(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    The number of captured packets is not checked here.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    def check(pkt):
        # Addresses and L4 identifiers must be exactly what was sent.
        self.assertEqual(pkt[IP].src, ingress_if.remote_ip4)
        self.assertEqual(pkt[IP].dst, egress_if.remote_ip4)
        if pkt.haslayer(TCP):
            self.assertEqual(pkt[TCP].sport, self.tcp_port_in)
            return
        if pkt.haslayer(UDP):
            self.assertEqual(pkt[UDP].sport, self.udp_port_in)
            return
        self.assertEqual(pkt[ICMP].id, self.icmp_id_in)

    for packet in capture:
        try:
            check(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr

    def check(pkt):
        self.assertEqual(pkt[IP].src, src_ip)
        self.assertTrue(pkt.haslayer(ICMP))
        icmp = pkt[ICMP]
        self.assertEqual(icmp.type, icmp_type)
        self.assertTrue(icmp.haslayer(IPerror))
        # The packet quoted inside the error must carry the outside
        # (translated) identifiers.
        inner_ip = icmp[IPerror]
        if inner_ip.haslayer(TCPerror):
            self.assertEqual(inner_ip[TCPerror].dport,
                             self.tcp_port_out)
            return
        if inner_ip.haslayer(UDPerror):
            self.assertEqual(inner_ip[UDPerror].dport,
                             self.udp_port_out)
            return
        self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            check(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    def check(pkt):
        self.assertEqual(pkt[IP].dst, in_if.remote_ip4)
        self.assertTrue(pkt.haslayer(ICMP))
        icmp = pkt[ICMP]
        self.assertEqual(icmp.type, icmp_type)
        self.assertTrue(icmp.haslayer(IPerror))
        # The packet quoted inside the error must carry the inside
        # (untranslated) identifiers.
        inner_ip = icmp[IPerror]
        if inner_ip.haslayer(TCPerror):
            self.assertEqual(inner_ip[TCPerror].sport,
                             self.tcp_port_in)
            return
        if inner_ip.haslayer(UDPerror):
            self.assertEqual(inner_ip[UDPerror].sport,
                             self.udp_port_in)
            return
        self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            check(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
# Build a hand-fragmented IPv4/TCP stream from src_if towards dst.
# The fragments below use frag= offsets 0, 3 and 5 and share one random
# IP id; the first two carry the MF flag.
634 def create_stream_frag(self, src_if, dst, sport, dport, data):
636 Create fragmented packet stream
638 :param src_if: Source interface
639 :param dst: Destination IPv4 address
640 :param sport: Source TCP port
641 :param dport: Destination TCP port
642 :param data: Payload data
# NOTE(review): the local name "id" shadows the builtin id().
645 id = random.randint(0, 65535)
# An unfragmented packet is built first only to learn its TCP checksum.
646 p = (IP(src=src_if.remote_ip4, dst=dst) /
647 TCP(sport=sport, dport=dport) /
# Re-parse the serialized packet so chksum holds a concrete value
# (presumably relies on Python 2 str() returning the raw bytes -- confirm).
649 p = p.__class__(str(p))
650 chksum = p['TCP'].chksum
# First fragment: carries the TCP header with the checksum taken above.
652 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
653 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
654 TCP(sport=sport, dport=dport, chksum=chksum) /
# Second fragment: no TCP header, so the protocol is set explicitly.
657 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
658 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
659 proto=IP_PROTOS.tcp) /
# Last fragment: no MF flag.
662 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
663 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
# Build one IPv6/TCP packet with a fragment extension header addressed to
# the NAT64 form of dst, and let scapy's fragment6() split it.
669 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
670 pref=None, plen=0, frag_size=128):
672 Create fragmented packet stream
674 :param src_if: Source interface
675 :param dst: Destination IPv4 address
676 :param sport: Source TCP port
677 :param dport: Destination TCP port
678 :param data: Payload data
679 :param pref: NAT64 prefix
680 :param plen: NAT64 prefix length
# NOTE(review): the parameter is named frag_size, not fragsize.
681 :param fragsize: size of fragments
# Two ways to derive the IPv6 destination: append the dotted address to
# 64:ff9b::, or embed it into the given prefix via compose_ip6().
685 dst_ip6 = ''.join(['64:ff9b::', dst])
687 dst_ip6 = self.compose_ip6(dst, pref, plen)
689 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
690 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
# Random fragment identification.
691 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
692 TCP(sport=sport, dport=dport) /
# Returns whatever fragment6() produces for the requested fragment size.
695 return fragment6(p, frag_size)
# Reassemble captured IPv4 fragments by writing each payload into a buffer at
# its fragment offset, then re-parse the buffer as TCP or UDP on top of a
# fresh IP header.
697 def reass_frags_and_verify(self, frags, src, dst):
699 Reassemble and verify fragmented packet
701 :param frags: Captured fragments
702 :param src: Source IPv4 address to verify
703 :param dst: Destination IPv4 address to verify
705 :returns: Reassembled IPv4 packet
# NOTE(review): "buffer" shadows the Python 2 builtin of the same name, and
# the StringIO module used here exists only on Python 2.
707 buffer = StringIO.StringIO()
709 self.assertEqual(p[IP].src, src)
710 self.assertEqual(p[IP].dst, dst)
711 self.assert_ip_checksum_valid(p)
# The frag field counts 8-byte units, hence the multiplication.
712 buffer.seek(p[IP].frag * 8)
713 buffer.write(p[IP].payload)
# NOTE(review): this assignment is overwritten by the next statement.
714 ip = frags[0].getlayer(IP)
# New header built from the first fragment's addresses and protocol.
715 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
716 proto=frags[0][IP].proto)
717 if ip.proto == IP_PROTOS.tcp:
718 p = (ip / TCP(buffer.getvalue()))
# Only the TCP branch validates the L4 checksum.
719 self.assert_tcp_checksum_valid(p)
720 elif ip.proto == IP_PROTOS.udp:
721 p = (ip / UDP(buffer.getvalue()))
# IPv6 counterpart of reass_frags_and_verify(): payloads are placed at the
# offset from each fragment extension header, then re-parsed as TCP or UDP
# on top of a fresh IPv6 header.
724 def reass_frags_and_verify_ip6(self, frags, src, dst):
726 Reassemble and verify fragmented packet
728 :param frags: Captured fragments
729 :param src: Source IPv6 address to verify
730 :param dst: Destination IPv6 address to verify
732 :returns: Reassembled IPv6 packet
# NOTE(review): "buffer" shadows the Python 2 builtin of the same name, and
# the StringIO module used here exists only on Python 2.
734 buffer = StringIO.StringIO()
736 self.assertEqual(p[IPv6].src, src)
737 self.assertEqual(p[IPv6].dst, dst)
# The offset field counts 8-byte units, hence the multiplication.
738 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
739 buffer.write(p[IPv6ExtHdrFragment].payload)
# The upper-layer protocol comes from the fragment header's nh field.
740 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
741 nh=frags[0][IPv6ExtHdrFragment].nh)
742 if ip.nh == IP_PROTOS.tcp:
743 p = (ip / TCP(buffer.getvalue()))
744 elif ip.nh == IP_PROTOS.udp:
745 p = (ip / UDP(buffer.getvalue()))
# Checksums are validated for the reassembled packet.
746 self.assert_packet_checksums_valid(p)
# Drive a TCP handshake through the NAT: a packet in2out, the "SYN + ACK"
# reply out2in addressed to the NAT address, and a final packet in2out.
# The translated source port seen on out_if is stored in self.tcp_port_out.
749 def initiate_tcp_session(self, in_if, out_if):
751 Initiates TCP session
753 :param in_if: Inside interface
754 :param out_if: Outside interface
# First packet, inside host -> outside host.
758 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
759 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
760 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
763 self.pg_enable_capture(self.pg_interfaces)
765 capture = out_if.get_capture(1)
# Remember the translated source port so the reply below can target it.
767 self.tcp_port_out = p[TCP].sport
769 # SYN + ACK packet out->in
770 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
771 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
772 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
775 self.pg_enable_capture(self.pg_interfaces)
# Final packet, inside host -> outside host.
780 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
781 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
782 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
785 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected on the outside interface.
787 out_if.get_capture(1)
790 self.logger.error("TCP 3 way handshake failed")
# Check six decoded IPFIX records: three NAT44 session-create events and
# three session-delete events, one pair per protocol (ICMP, TCP, UDP).
# Records are indexed by IPFIX information-element id.
793 def verify_ipfix_nat44_ses(self, data):
795 Verify IPFIX NAT44 session create/delete event
797 :param data: Decoded IPFIX data records
799 nat44_ses_create_num = 0
800 nat44_ses_delete_num = 0
801 self.assertEqual(6, len(data))
# Element 230 must be 4 or 5; 4 is counted as a create event.
804 self.assertIn(ord(record[230]), [4, 5])
805 if ord(record[230]) == 4:
806 nat44_ses_create_num += 1
808 nat44_ses_delete_num += 1
# Element 8 is compared with the packed address of the pg0 remote host.
810 self.assertEqual(self.pg0.remote_ip4n, record[8])
811 # postNATSourceIPv4Address
812 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
# Element 234 is expected to be a network-order 32-bit zero.
815 self.assertEqual(struct.pack("!I", 0), record[234])
816 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
# Element 4 selects which in/out identifier pair is expected.
817 if IP_PROTOS.icmp == ord(record[4]):
818 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
819 self.assertEqual(struct.pack("!H", self.icmp_id_out),
821 elif IP_PROTOS.tcp == ord(record[4]):
822 self.assertEqual(struct.pack("!H", self.tcp_port_in),
824 self.assertEqual(struct.pack("!H", self.tcp_port_out),
826 elif IP_PROTOS.udp == ord(record[4]):
827 self.assertEqual(struct.pack("!H", self.udp_port_in),
829 self.assertEqual(struct.pack("!H", self.udp_port_out),
832 self.fail("Invalid protocol")
# Exactly three events of each kind.
833 self.assertEqual(3, nat44_ses_create_num)
834 self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (3 = NAT addresses exhausted)
    nat_event = ord(record[230])
    self.assertEqual(nat_event, 3)
    # natPoolID
    no_pool = struct.pack("!I", 0)
    self.assertEqual(no_pool, record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (13 = quota exceeded)
    nat_event = ord(record[230])
    self.assertEqual(nat_event, 13)
    # natQuotaExceededEvent (1 = maximum session entries)
    # Both values below are packed in native byte order ("I").
    quota_event = struct.pack("I", 1)
    self.assertEqual(quota_event, record[466])
    # maxSessionEntries
    packed_limit = struct.pack("I", limit)
    self.assertEqual(packed_limit, record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (13 = quota exceeded)
    nat_event = ord(record[230])
    self.assertEqual(nat_event, 13)
    # natQuotaExceededEvent (2 = maximum BIB entries)
    # Both values below are packed in native byte order ("I").
    quota_event = struct.pack("I", 2)
    self.assertEqual(quota_event, record[466])
    # maxBIBEntries
    packed_limit = struct.pack("I", limit)
    self.assertEqual(packed_limit, record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (13 = quota exceeded)
    nat_event = ord(record[230])
    self.assertEqual(nat_event, 13)
    # natQuotaExceededEvent (5 = maximum fragments pending reassembly)
    # Both values below are packed in native byte order ("I").
    quota_event = struct.pack("I", 5)
    self.assertEqual(quota_event, record[466])
    # maxFragmentsPendingReassembly
    packed_limit = struct.pack("I", limit)
    self.assertEqual(packed_limit, record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (13 = quota exceeded)
    nat_event = ord(record[230])
    self.assertEqual(nat_event, 13)
    # natQuotaExceededEvent (5 = maximum fragments pending reassembly)
    # Both values below are packed in native byte order ("I").
    quota_event = struct.pack("I", 5)
    self.assertEqual(quota_event, record[466])
    # maxFragmentsPendingReassembly
    packed_limit = struct.pack("I", limit)
    self.assertEqual(packed_limit, record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent (10 = NAT64 BIB create, 11 = NAT64 BIB delete)
    expected_event = 10 if is_create else 11
    self.assertEqual(ord(record[230]), expected_event)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    port_in = struct.pack("!H", self.tcp_port_in)
    self.assertEqual(port_in, record[7])
    # postNAPTSourceTransportPort
    port_out = struct.pack("!H", self.tcp_port_out)
    self.assertEqual(port_out, record[227])
# Check a single decoded IPFIX record describing a NAT64 session create or
# delete event for a TCP session. Records are indexed by IPFIX
# information-element id.
947 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
950 Verify IPFIX NAT64 session create and delete events
952 :param data: Decoded IPFIX data records
953 :param is_create: Create event if nonzero value otherwise delete event
954 :param src_addr: IPv6 source address
955 :param dst_addr: IPv4 destination address
956 :param dst_port: destination TCP port
958 self.assertEqual(1, len(data))
# Element 230 is expected to be 6 in one case and 7 in the other.
962 self.assertEqual(ord(record[230]), 6)
964 self.assertEqual(ord(record[230]), 7)
966 self.assertEqual(src_addr, record[27])
967 # destinationIPv6Address
# The expected value is the packed form of dst_addr embedded via
# compose_ip6().
968 self.assertEqual(socket.inet_pton(socket.AF_INET6,
969 self.compose_ip6(dst_addr,
973 # postNATSourceIPv4Address
974 self.assertEqual(self.nat_addr_n, record[225])
975 # postNATDestinationIPv4Address
976 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
# Element 4 must be TCP.
979 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
# Element 234 is expected to be a network-order 32-bit zero.
981 self.assertEqual(struct.pack("!I", 0), record[234])
982 # sourceTransportPort
983 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
984 # postNAPTSourceTransportPort
985 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
986 # destinationTransportPort
987 self.assertEqual(struct.pack("!H", dst_port), record[11])
988 # postNAPTDestinationTransportPort
# The destination port is expected to be the same before and after NAT.
989 self.assertEqual(struct.pack("!H", dst_port), record[228])
992 class TestNAT44(MethodHolder):
993 """ NAT44 Test Cases """
# Class-level fixture for TestNAT44: sets the default ports/ids/addresses the
# helper methods read, creates ten pg interfaces and configures addressing
# for several groups of them.
997 super(TestNAT44, cls).setUpClass()
998 cls.vapi.cli("set log class nat level debug")
# Defaults start out identical for the "in" and "out" side; verification
# helpers overwrite the *_out values once a translation has been observed.
1001 cls.tcp_port_in = 6303
1002 cls.tcp_port_out = 6303
1003 cls.udp_port_in = 6304
1004 cls.udp_port_out = 6304
1005 cls.icmp_id_in = 6305
1006 cls.icmp_id_out = 6305
1007 cls.nat_addr = '10.0.0.3'
1008 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1009 cls.ipfix_src_port = 4739
1010 cls.ipfix_domain_id = 1
1011 cls.tcp_external_port = 80
# pg0..pg9; the first four form cls.interfaces.
1013 cls.create_pg_interfaces(range(10))
1014 cls.interfaces = list(cls.pg_interfaces[0:4])
1016 for i in cls.interfaces:
1021 cls.pg0.generate_remote_hosts(3)
1022 cls.pg0.configure_ipv4_neighbors()
1024 cls.pg1.generate_remote_hosts(1)
1025 cls.pg1.configure_ipv4_neighbors()
# NOTE(review): the nested list() call is redundant.
1027 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1028 cls.vapi.ip_table_add_del(10, is_add=1)
1029 cls.vapi.ip_table_add_del(20, is_add=1)
# pg4 and pg6 get the same addresses but different tables (10 and 20).
# NOTE(review): the three _local_ip4n assignments below pack i.local_ip4,
# where "i" is still the last interface of the loop over cls.interfaces,
# not the interface being configured. cls.pgN.local_ip4 looks like the
# intended source -- confirm.
1031 cls.pg4._local_ip4 = "172.16.255.1"
1032 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1033 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1034 cls.pg4.set_table_ip4(10)
1035 cls.pg5._local_ip4 = "172.17.255.3"
1036 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1037 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1038 cls.pg5.set_table_ip4(10)
1039 cls.pg6._local_ip4 = "172.16.255.1"
1040 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1041 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1042 cls.pg6.set_table_ip4(20)
1043 for i in cls.overlapping_interfaces:
# pg9 gets its own address plus an extra one derived from 10.0.0.1.
1051 cls.pg9.generate_remote_hosts(2)
1052 cls.pg9.config_ip4()
1053 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1054 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1058 cls.pg9.resolve_arp()
# Both pg9 remote hosts and pg4's remote address end up as 10.0.0.2; ARP is
# resolved again afterwards.
1059 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1060 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1061 cls.pg9.resolve_arp()
1064 super(TestNAT44, cls).tearDownClass()
# Basic dynamic NAT44 round trip between pg0 and pg1: an in2out stream is
# verified on pg1, then an out2in stream is verified on pg0.
1067 def test_dynamic(self):
1068 """ NAT44 dynamic translation test """
# Configure the NAT address and enable the NAT44 feature on both interfaces.
1070 self.nat44_add_address(self.nat_addr)
1071 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1072 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: expect one translated packet per packet sent.
1076 pkts = self.create_stream_in(self.pg0, self.pg1)
1077 self.pg0.add_stream(pkts)
1078 self.pg_enable_capture(self.pg_interfaces)
1080 capture = self.pg1.get_capture(len(pkts))
1081 self.verify_capture_out(capture)
# pg1 -> pg0: create_stream_out() targets the ports recorded by
# verify_capture_out() above.
1084 pkts = self.create_stream_out(self.pg1)
1085 self.pg1.add_stream(pkts)
1086 self.pg_enable_capture(self.pg_interfaces)
1088 capture = self.pg0.get_capture(len(pkts))
1089 self.verify_capture_in(capture, self.pg0)
# Packets sent from pg0 with TTL=1 must come back to pg0 as ICMP errors of
# the helper's default type (11).
1091 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1092 """ NAT44 handling of client packets with TTL=1 """
1094 self.nat44_add_address(self.nat_addr)
1095 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1096 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1099 # Client side - generate traffic
1100 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1101 self.pg0.add_stream(pkts)
1102 self.pg_enable_capture(self.pg_interfaces)
1105 # Client side - verify ICMP type 11 packets
# The capture is taken on pg0, the interface the packets were sent from.
1106 capture = self.pg0.get_capture(len(pkts))
1107 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# After sessions exist, packets sent from pg1 with TTL=1 must come back to
# pg1 as ICMP errors whose source is pg1's local address.
1109 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1110 """ NAT44 handling of server packets with TTL=1 """
1112 self.nat44_add_address(self.nat_addr)
1113 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1114 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1117 # Client side - create sessions
1118 pkts = self.create_stream_in(self.pg0, self.pg1)
1119 self.pg0.add_stream(pkts)
1120 self.pg_enable_capture(self.pg_interfaces)
1123 # Server side - generate traffic
# verify_capture_out() also records the translated ports that
# create_stream_out() uses next.
1124 capture = self.pg1.get_capture(len(pkts))
1125 self.verify_capture_out(capture)
1126 pkts = self.create_stream_out(self.pg1, ttl=1)
1127 self.pg1.add_stream(pkts)
1128 self.pg_enable_capture(self.pg_interfaces)
1131 # Server side - verify ICMP type 11 packets
# src_ip overrides the helper's default (the NAT address) with pg1's own
# local address.
1132 capture = self.pg1.get_capture(len(pkts))
1133 self.verify_capture_out_with_icmp_errors(capture,
1134 src_ip=self.pg1.local_ip4)
# ICMP type 11 errors injected on pg1 in response to translated packets must
# reach pg0 with the quoted packet translated back to the inside values.
1136 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1137 """ NAT44 handling of error responses to client packets with TTL=2 """
1139 self.nat44_add_address(self.nat_addr)
1140 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1141 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1144 # Client side - generate traffic
1145 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1146 self.pg0.add_stream(pkts)
1147 self.pg_enable_capture(self.pg_interfaces)
1150 # Server side - simulate ICMP type 11 response
# Each error quotes the IP layer of one captured (translated) packet and is
# addressed to the NAT address.
1151 capture = self.pg1.get_capture(len(pkts))
1152 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1153 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1154 ICMP(type=11) / packet[IP] for packet in capture]
1155 self.pg1.add_stream(pkts)
1156 self.pg_enable_capture(self.pg_interfaces)
1159 # Client side - verify ICMP type 11 packets
1160 capture = self.pg0.get_capture(len(pkts))
1161 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# ICMP type 11 errors injected on pg0 in response to out2in packets must
# reach pg1 with the helper's default source (the NAT address).
1163 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1164 """ NAT44 handling of error responses to server packets with TTL=2 """
1166 self.nat44_add_address(self.nat_addr)
1167 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1168 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1171 # Client side - create sessions
1172 pkts = self.create_stream_in(self.pg0, self.pg1)
1173 self.pg0.add_stream(pkts)
1174 self.pg_enable_capture(self.pg_interfaces)
1177 # Server side - generate traffic
1178 capture = self.pg1.get_capture(len(pkts))
1179 self.verify_capture_out(capture)
1180 pkts = self.create_stream_out(self.pg1, ttl=2)
1181 self.pg1.add_stream(pkts)
1182 self.pg_enable_capture(self.pg_interfaces)
1185 # Client side - simulate ICMP type 11 response
# Each error quotes the IP layer of one packet captured on pg0 and is
# addressed to pg1's remote host.
1186 capture = self.pg0.get_capture(len(pkts))
1187 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1188 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1189 ICMP(type=11) / packet[IP] for packet in capture]
1190 self.pg0.add_stream(pkts)
1191 self.pg_enable_capture(self.pg_interfaces)
1194 # Server side - verify ICMP type 11 packets
1195 capture = self.pg1.get_capture(len(pkts))
1196 self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
    """ Ping NAT44 out interface from outside network """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Echo request from the outside host to pg1's own address.
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
         ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [p]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # The reply is expected back on the same (outside) interface.
    capture = self.pg1.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.local_ip4)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        self.assertEqual(packet[ICMP].type, 0)  # echo reply
    except Exception:
        # Dump the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", packet))
        raise
def test_ping_internal_host_from_outside(self):
    """ Ping internal host from outside network """

    # 1:1 static mapping: inside host pg0.remote_ip4 <-> nat_addr.
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in - echo request addressed to the mapped external address
    pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
           IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
           ICMP(id=self.icmp_id_out, type='echo-request'))
    self.pg1.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_capture_in(capture, self.pg0, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    # in2out - echo reply from the inside host
    pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
           IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
           ICMP(id=self.icmp_id_in, type='echo-reply'))
    self.pg0.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.verify_capture_out(capture, same_port=True, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def _test_forwarding(self):
    """ NAT44 forwarding test """

    # NOTE: the leading underscore keeps unittest from collecting this
    # method as a test.
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_forwarding_enable_disable(1)

    real_ip = self.pg0.remote_ip4n
    alias_ip = self.nat_addr_n
    self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                           external_ip=alias_ip)

    try:
        # in2out - static mapping match

        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, same_port=True)

        # in2out - no static mapping match

        # Temporarily make remote_hosts[1] the primary pg0 host; the
        # original entry is put back in the inner finally clause.
        host0 = self.pg0.remote_hosts[0]
        self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
        try:
            pkts = self.create_stream_out(self.pg1,
                                          dst_ip=self.pg0.remote_ip4,
                                          use_inside_ports=True)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(len(pkts))
            self.verify_capture_in(capture, self.pg0)

            pkts = self.create_stream_in(self.pg0, self.pg1)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(len(pkts))
            self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
                                    same_port=True)
        finally:
            self.pg0.remote_hosts[0] = host0

        # Delete one session of the forwarded (unmapped) user and check
        # that the session count drops from 3 to 2.
        user = self.pg0.remote_hosts[1]
        sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
        self.assertEqual(len(sessions), 3)
        self.assertTrue(sessions[0].ext_host_valid)
        self.vapi.nat44_del_session(
            sessions[0].inside_ip_address,
            sessions[0].inside_port,
            sessions[0].protocol,
            ext_host_address=sessions[0].ext_host_address,
            ext_host_port=sessions[0].ext_host_port)
        sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
        self.assertEqual(len(sessions), 2)

    finally:
        # Always undo forwarding and the static mapping, even on failure.
        self.vapi.nat44_forwarding_enable_disable(0)
        self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                               external_ip=alias_ip,
                                               is_add=0)
def test_static_in(self):
    """ 1:1 NAT initialized from inside network """

    nat_ip = "10.0.0.10"
    # Outside ports/id that the verify_* helpers compare against.
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # An address-only mapping is reported with an empty tag and zeroed
    # protocol/port fields.
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
    self.assertEqual(sm[0].protocol, 0)
    self.assertEqual(sm[0].local_port, 0)
    self.assertEqual(sm[0].external_port, 0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
    """ 1:1 NAT initialized from outside network """

    nat_ip = "10.0.0.20"
    # Outside ports/id that the verify_* helpers compare against.
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305
    # Any non-empty tag works; it only has to round-trip through the dump.
    tag = "testTAG"

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    # The dumped tag is NUL padded; compare only the part before the NUL.
    self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
    """ 1:1 NAPT initialized from inside network """

    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    # One port-specific static mapping per protocol, all on nat_addr.
    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
    """ 1:1 NAPT initialized from outside network """

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    # One port-specific static mapping per protocol, all on nat_addr.
    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_vrf_aware(self):
    """ 1:1 NAT VRF awareness """

    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    # Both mappings are bound to VRF 10; per the comments below, pg4
    # matches that VRF and pg0 does not.
    # NOTE(review): the VRF ids were restored from truncated calls -
    # confirm against the interface setup in setUpClass.
    self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                  vrf_id=10)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                  vrf_id=10)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)

    # inside interface VRF match NAT44 static mapping VRF
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1, True)

    # inside interface VRF don't match NAT44 static mapping VRF (packets
    # are dropped)
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg3.assert_nothing_captured()
def test_dynamic_to_static(self):
    """ Switch from dynamic translation to 1:1NAT """
    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # dynamic
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # 1:1NAT
    # After the static mapping is added, the user's session dump must be
    # empty, i.e. the dynamic sessions created above are gone.
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
    self.assertEqual(len(sessions), 0)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_identity_nat(self):
    """ Identity NAT """

    self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # TCP packet from the outside host straight to the inside host address.
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12345, dport=56789))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        # Addresses and ports must arrive unchanged.
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, 56789)
        self.assertEqual(tcp.sport, 12345)
        self.assert_packet_checksums_valid(p)
    except Exception:
        # Dump the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_multiple_inside_interfaces(self):
    """ NAT44 multiple non-overlapping address space inside interfaces """

    # pg0 and pg1 are inside interfaces, pg3 is the outside interface and
    # pg2 has no NAT44 feature at all.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)

    # between two NAT44 inside interfaces (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg1)

    # from NAT44 inside to interface without NAT44 feature (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg2)

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg1, self.pg3)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ NAT44 multiple inside interfaces with overlapping address space """

    # pg3 is the outside interface; pg4, pg5 and pg6 are inside
    # interfaces. The session dumps below query VRF 10 for pg4/pg5 hosts
    # and VRF 20 for the pg6 host.
    static_nat_ip = "10.0.0.10"
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
    # NOTE(review): vrf_id restored from a truncated call; 20 matches the
    # VRF used for the pg6 session dump further down.
    self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                  vrf_id=20)

    # between NAT44 inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between NAT44 inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        # Dump the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    # The three sessions are expected in TCP, UDP, ICMP order.
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.nat44_user_dump()
    self.assertTrue(len(users) >= 3)
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        sessions = self.vapi.nat44_user_session_dump(user.ip_address,
                                                     user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertTrue(session.protocol in
                            [IP_PROTOS.tcp, IP_PROTOS.udp,
                             IP_PROTOS.icmp])
            self.assertFalse(session.ext_host_valid)

    # pg4 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertTrue(len(sessions) >= 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertTrue(len(sessions) >= 3)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        # list() forces element-wise comparison; bare map objects would
        # only compare by identity on interpreters where map() is lazy.
        self.assertEqual(list(map(ord, session.outside_ip_address[0:4])),
                         list(map(int, static_nat_ip.split('.'))))
        self.assertTrue(session.inside_port in
                        [self.tcp_port_in, self.udp_port_in,
                         self.icmp_id_in])
def test_hairpinning(self):
    """ NAT44 hairpinning - 1:1 NAPT """

    # Host and server both sit behind pg0; the host reaches the server
    # through the server's external (NAT) address and port.
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234
    host_out_port = 0
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.assert_packet_checksums_valid(p)
        # Remember the translated source port for the reply below.
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.assert_packet_checksums_valid(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_hairpinning2(self):
    """ NAT44 hairpinning - 1:1 NAT"""

    # All three hosts sit behind pg0; both servers have address-only
    # static mappings, the plain host is translated dynamically.
    server1_nat_ip = "10.0.0.10"
    server2_nat_ip = "10.0.0.11"
    host = self.pg0.remote_hosts[0]
    server1 = self.pg0.remote_hosts[1]
    server2 = self.pg0.remote_hosts[2]
    server_tcp_port = 22
    server_udp_port = 20

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # add static mapping for servers
    self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
    self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)

    # host to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=host.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, server1.ip4)
            # Record the translated ports/id; the replies below use them.
            if packet.haslayer(TCP):
                self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to host
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=self.nat_addr) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, host.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server2 to server1
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         TCP(sport=self.tcp_port_in, dport=server_tcp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         UDP(sport=self.udp_port_in, dport=server_udp_port))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server2.ip4, dst=server1_nat_ip) /
         ICMP(id=self.icmp_id_in, type='echo-request'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            # server2 is statically mapped, so ports and the ICMP id are
            # expected to stay unchanged.
            self.assertEqual(packet[IP].src, server2_nat_ip)
            self.assertEqual(packet[IP].dst, server1.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                self.assertEqual(packet[TCP].dport, server_tcp_port)
                self.tcp_port_out = packet[TCP].sport
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
                self.assertEqual(packet[UDP].dport, server_udp_port)
                self.udp_port_out = packet[UDP].sport
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP].id
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # server1 to server2
    pkts = []
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         TCP(sport=server_tcp_port, dport=self.tcp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         UDP(sport=server_udp_port, dport=self.udp_port_out))
    pkts.append(p)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=server1.ip4, dst=server2_nat_ip) /
         ICMP(id=self.icmp_id_out, type='echo-reply'))
    pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, server1_nat_ip)
            self.assertEqual(packet[IP].dst, server2.ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.assertEqual(packet[TCP].sport, server_tcp_port)
                self.assert_packet_checksums_valid(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.assertEqual(packet[UDP].sport, server_udp_port)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # get maximum number of translations per user
    nat44_config = self.vapi.nat_show_config()

    # send more than maximum number of translations per user packets
    pkts_num = nat44_config.max_translations_per_user + 5
    # One TCP packet per distinct source port, i.e. one session each.
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
            TCP(sport=1025 + port)
            for port in range(0, pkts_num)]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)

    # The user's session count must be capped at the configured maximum.
    users = self.vapi.nat44_user_dump()
    for user in users:
        if user.ip_address == self.pg0.remote_ip4n:
            self.assertEqual(user.nsessions,
                             nat44_config.max_translations_per_user)
            self.assertEqual(user.nstaticsessions, 0)

    # A session created through a static mapping is counted separately
    # and must take the place of one dynamic session.
    tcp_port = 22
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  tcp_port, tcp_port,
                                  proto=IP_PROTOS.tcp)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=tcp_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)
    users = self.vapi.nat44_user_dump()
    for user in users:
        if user.ip_address == self.pg0.remote_ip4n:
            self.assertEqual(user.nsessions,
                             nat44_config.max_translations_per_user - 1)
            self.assertEqual(user.nstaticsessions, 1)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface """
    nat_if = self.pg7
    self.vapi.nat44_add_interface_addr(nat_if.sw_if_index)

    # the pool is expected to be empty before pg7 gets an IPv4 address
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # giving pg7 an address must put exactly that address into the pool
    nat_if.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], nat_if.local_ip4n)

    # taking the address away again must empty the pool
    nat_if.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface """
    # Any non-empty tag works; it only has to round-trip through the dump.
    tag = "testTAG"

    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    # NOTE(review): the local address literal was restored from a
    # truncated call; it is never asserted on in this test.
    self.nat44_add_static_mapping(
        '1.2.3.4',
        external_sw_if_index=self.pg7.sw_if_index,
        tag=tag)

    # static mappings with external interface
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(1, len(static_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     static_mappings[0].external_sw_if_index)
    self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)

    # configure interface address and check static mappings
    self.pg7.config_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(2, len(static_mappings))
    # The resolved entry is the one whose external_sw_if_index is the
    # 0xFFFFFFFF sentinel; it must carry pg7's address and the tag.
    resolved = False
    for sm in static_mappings:
        if sm.external_sw_if_index == 0xFFFFFFFF:
            self.assertEqual(sm.external_ip_address[0:4],
                             self.pg7.local_ip4n)
            self.assertEqual((sm.tag).split('\0', 1)[0], tag)
            resolved = True
    self.assertTrue(resolved)

    # remove interface address and check static mappings
    self.pg7.unconfig_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(1, len(static_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     static_mappings[0].external_sw_if_index)
    self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)

    # configure interface address again and check static mappings
    self.pg7.config_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(2, len(static_mappings))
    resolved = False
    for sm in static_mappings:
        if sm.external_sw_if_index == 0xFFFFFFFF:
            self.assertEqual(sm.external_ip_address[0:4],
                             self.pg7.local_ip4n)
            self.assertEqual((sm.tag).split('\0', 1)[0], tag)
            resolved = True
    self.assertTrue(resolved)

    # remove static mapping
    self.nat44_add_static_mapping(
        '1.2.3.4',
        external_sw_if_index=self.pg7.sw_if_index,
        tag=tag,
        is_add=0)
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(0, len(static_mappings))
def test_interface_addr_identity_nat(self):
    """ Identity NAT with addresses from interface """

    # NOTE(review): port value and addr_only flag were restored from a
    # truncated call - confirm against the API wrapper defaults.
    port = 53053
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    self.vapi.nat44_add_del_identity_mapping(
        sw_if_index=self.pg7.sw_if_index,
        port=port,
        protocol=IP_PROTOS.tcp,
        addr_only=0)

    # identity mappings with external interface
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)

    # configure interface address and check identity mappings
    self.pg7.config_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    resolved = False
    self.assertEqual(2, len(identity_mappings))
    for sm in identity_mappings:
        if sm.sw_if_index == 0xFFFFFFFF:
            # Check the entry that matched the sentinel, not whichever
            # entry happens to be first in the dump.
            self.assertEqual(sm.ip_address,
                             self.pg7.local_ip4n)
            self.assertEqual(port, sm.port)
            self.assertEqual(IP_PROTOS.tcp, sm.protocol)
            resolved = True
    self.assertTrue(resolved)

    # remove interface address and check identity mappings
    self.pg7.unconfig_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)
2166 def test_ipfix_nat44_sess(self):
2167 """ IPFIX logging NAT44 session created/delted """
# NOTE(review): "delted" in the docstring above is a typo for "deleted",
# and the local `colector_port` is presumably meant to read "collector".
# Uses non-default IPFIX settings: observation domain 10, source port
# 20202 and collector port 30303.
2168 self.ipfix_domain_id = 10
2169 self.ipfix_src_port = 20202
2170 colector_port = 30303
# Have scapy dissect UDP traffic to port 30303 as IPFIX.
2171 bind_layers(UDP, IPFIX, dport=30303)
2172 self.nat44_add_address(self.nat_addr)
2173 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2174 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2176 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2177 src_address=self.pg3.local_ip4n,
2179 template_interval=10,
2180 collector_port=colector_port)
2181 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2182 src_port=self.ipfix_src_port)
# Send the in2out stream from pg0 and verify it on pg1.
2184 pkts = self.create_stream_in(self.pg0, self.pg1)
2185 self.pg0.add_stream(pkts)
2186 self.pg_enable_capture(self.pg_interfaces)
2188 capture = self.pg1.get_capture(len(pkts))
2189 self.verify_capture_out(capture)
# Remove the NAT address, flush the exporter and expect 9 packets on pg3.
2190 self.nat44_add_address(self.nat_addr, is_add=0)
2191 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2192 capture = self.pg3.get_capture(9)
2193 ipfix = IPFIXDecoder()
2194 # first load template
# NOTE(review): `p` is used below without a visible loop header -
# presumably this section iterates over `capture`; confirm.
2196 self.assertTrue(p.haslayer(IPFIX))
2197 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2198 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2199 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2200 self.assertEqual(p[UDP].dport, colector_port)
2201 self.assertEqual(p[IPFIX].observationDomainID,
2202 self.ipfix_domain_id)
2203 if p.haslayer(Template):
2204 ipfix.add_template(p.getlayer(Template))
2205 # verify events in data set
2207 if p.haslayer(Data):
2208 data = ipfix.decode_data_set(p.getlayer(Set))
2209 self.verify_ipfix_nat44_ses(data)
2211 def test_ipfix_addr_exhausted(self):
2212 """ IPFIX logging NAT addresses exhausted """
# Enables NAT44 on pg0/pg1 and IPFIX export towards pg3. No
# nat44_add_address call is visible in this test, and the packet sent
# from pg0 is expected not to appear on pg1.
2213 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2214 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2216 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2217 src_address=self.pg3.local_ip4n,
2219 template_interval=10)
2220 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2221 src_port=self.ipfix_src_port)
2223 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2224 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2226 self.pg0.add_stream(p)
2227 self.pg_enable_capture(self.pg_interfaces)
2229 self.pg1.assert_nothing_captured()
# Flush the exporter and expect 9 packets on pg3.
2231 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2232 capture = self.pg3.get_capture(9)
2233 ipfix = IPFIXDecoder()
2234 # first load template
# NOTE(review): `p` below is presumably rebound by a loop over `capture`
# whose header is not visible in this view; confirm.
2236 self.assertTrue(p.haslayer(IPFIX))
2237 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2238 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2239 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# 4739 is the destination port checked when no collector_port is passed to
# set_ipfix_exporter above.
2240 self.assertEqual(p[UDP].dport, 4739)
2241 self.assertEqual(p[IPFIX].observationDomainID,
2242 self.ipfix_domain_id)
2243 if p.haslayer(Template):
2244 ipfix.add_template(p.getlayer(Template))
2245 # verify events in data set
2247 if p.haslayer(Data):
2248 data = ipfix.decode_data_set(p.getlayer(Set))
2249 self.verify_ipfix_addr_exhausted(data)
2251 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2252 def test_ipfix_max_sessions(self):
2253 """ IPFIX logging maximum session entries exceeded """
# Only runs when running_extended_tests() is true. Creates max_sessions
# sessions from distinct source addresses, then enables IPFIX export and
# sends one more packet, which is expected not to reach pg1.
2254 self.nat44_add_address(self.nat_addr)
2255 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2256 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# The session limit used by the test is ten times the reported
# translation_buckets value.
2259 nat44_config = self.vapi.nat_show_config()
2260 max_sessions = 10 * nat44_config.translation_buckets
# One packet per session; the source address 10.10.<hi>.<lo> is built from
# the high and low bytes of the loop index.
# NOTE(review): the initialisation of `pkts` and the append of each `p`
# are not visible in this view; confirm.
2263 for i in range(0, max_sessions):
2264 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266 IP(src=src, dst=self.pg1.remote_ip4) /
2269 self.pg0.add_stream(pkts)
2270 self.pg_enable_capture(self.pg_interfaces)
2273 self.pg1.get_capture(max_sessions)
2274 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2275 src_address=self.pg3.local_ip4n,
2277 template_interval=10)
2278 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2279 src_port=self.ipfix_src_port)
# The extra packet, sent after max_sessions sessions already exist.
2281 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2282 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2284 self.pg0.add_stream(p)
2285 self.pg_enable_capture(self.pg_interfaces)
2287 self.pg1.assert_nothing_captured()
2289 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2290 capture = self.pg3.get_capture(9)
2291 ipfix = IPFIXDecoder()
2292 # first load template
# NOTE(review): `p` below is presumably rebound by a loop over `capture`
# whose header is not visible in this view; confirm.
2294 self.assertTrue(p.haslayer(IPFIX))
2295 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2296 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2297 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2298 self.assertEqual(p[UDP].dport, 4739)
2299 self.assertEqual(p[IPFIX].observationDomainID,
2300 self.ipfix_domain_id)
2301 if p.haslayer(Template):
2302 ipfix.add_template(p.getlayer(Template))
2303 # verify events in data set
2305 if p.haslayer(Data):
2306 data = ipfix.decode_data_set(p.getlayer(Set))
2307 self.verify_ipfix_max_sessions(data, max_sessions)
2309 def test_pool_addr_fib(self):
2310 """ NAT44 add pool addresses to FIB """
# Sends ARP who-has requests for the NAT pool address and for a static
# mapping's external address, expecting a reply on pg1 while they are
# configured and no reply after they are removed.
2311 static_addr = '10.0.0.10'
2312 self.nat44_add_address(self.nat_addr)
2313 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2314 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2316 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
# ARP request for the pool address, sent on pg1.
2319 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2320 ARP(op=ARP.who_has, pdst=self.nat_addr,
2321 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2322 self.pg1.add_stream(p)
2323 self.pg_enable_capture(self.pg_interfaces)
2325 capture = self.pg1.get_capture(1)
2326 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): assertTrue(a, b) treats `b` as the failure message, so this
# only checks that `op` is truthy; assertEqual was presumably intended.
2327 self.assertTrue(capture[0][ARP].op, ARP.is_at)
# ARP request for the static mapping's external address, sent on pg1.
2330 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2331 ARP(op=ARP.who_has, pdst=static_addr,
2332 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2333 self.pg1.add_stream(p)
2334 self.pg_enable_capture(self.pg_interfaces)
2336 capture = self.pg1.get_capture(1)
2337 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): same assertTrue/assertEqual concern as above.
2338 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2340 # send ARP to non-NAT44 interface
2341 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2342 ARP(op=ARP.who_has, pdst=self.nat_addr,
2343 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2344 self.pg2.add_stream(p)
2345 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the request is injected on pg2 but the absence of a reply
# is asserted on pg1 - confirm which interface was meant to be checked.
2347 self.pg1.assert_nothing_captured()
2349 # remove addresses and verify
2350 self.nat44_add_address(self.nat_addr, is_add=0)
2351 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2354 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2355 ARP(op=ARP.who_has, pdst=self.nat_addr,
2356 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2357 self.pg1.add_stream(p)
2358 self.pg_enable_capture(self.pg_interfaces)
2360 self.pg1.assert_nothing_captured()
2362 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2363 ARP(op=ARP.who_has, pdst=static_addr,
2364 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2365 self.pg1.add_stream(p)
2366 self.pg_enable_capture(self.pg_interfaces)
2368 self.pg1.assert_nothing_captured()
2370 def test_vrf_mode(self):
2371 """ NAT44 tenant VRF aware address pool mode """
# Moves pg0 and pg1 into separate IPv4 tables, adds one NAT address per
# table, and checks that traffic from each interface towards pg2 is
# translated to that table's address. The interfaces are moved back to
# table 0 and the tables deleted at the end.
# NOTE(review): the assignments of `vrf_id1` / `vrf_id2` are not visible in
# this view; confirm.
2375 nat_ip1 = "10.0.0.10"
2376 nat_ip2 = "10.0.0.11"
# Unconfigure IPv4 before changing tables, then reconfigure and re-ARP.
2378 self.pg0.unconfig_ip4()
2379 self.pg1.unconfig_ip4()
2380 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2381 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2382 self.pg0.set_table_ip4(vrf_id1)
2383 self.pg1.set_table_ip4(vrf_id2)
2384 self.pg0.config_ip4()
2385 self.pg1.config_ip4()
2386 self.pg0.resolve_arp()
2387 self.pg1.resolve_arp()
2389 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2390 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2391 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2392 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2393 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2 is expected to leave with nat_ip1 as source.
2398 pkts = self.create_stream_in(self.pg0, self.pg2)
2399 self.pg0.add_stream(pkts)
2400 self.pg_enable_capture(self.pg_interfaces)
2402 capture = self.pg2.get_capture(len(pkts))
2403 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2 is expected to leave with nat_ip2 as source.
2406 pkts = self.create_stream_in(self.pg1, self.pg2)
2407 self.pg1.add_stream(pkts)
2408 self.pg_enable_capture(self.pg_interfaces)
2410 capture = self.pg2.get_capture(len(pkts))
2411 self.verify_capture_out(capture, nat_ip2)
# Restore pg0/pg1 to table 0 and delete the two tables.
2414 self.pg0.unconfig_ip4()
2415 self.pg1.unconfig_ip4()
2416 self.pg0.set_table_ip4(0)
2417 self.pg1.set_table_ip4(0)
2418 self.pg0.config_ip4()
2419 self.pg1.config_ip4()
2420 self.pg0.resolve_arp()
2421 self.pg1.resolve_arp()
2422 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2423 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2425 def test_vrf_feature_independent(self):
2426 """ NAT44 tenant VRF independent address pool mode """
# Adds nat_ip1 without a vrf_id and nat_ip2 with vrf_id=99, then checks
# that traffic from both pg0 and pg1 towards pg2 is translated to nat_ip1.
2428 nat_ip1 = "10.0.0.10"
2429 nat_ip2 = "10.0.0.11"
2431 self.nat44_add_address(nat_ip1)
2432 self.nat44_add_address(nat_ip2, vrf_id=99)
2433 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2434 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2435 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2439 pkts = self.create_stream_in(self.pg0, self.pg2)
2440 self.pg0.add_stream(pkts)
2441 self.pg_enable_capture(self.pg_interfaces)
2443 capture = self.pg2.get_capture(len(pkts))
2444 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2; nat_ip1 is expected here as well.
2447 pkts = self.create_stream_in(self.pg1, self.pg2)
2448 self.pg1.add_stream(pkts)
2449 self.pg_enable_capture(self.pg_interfaces)
2451 capture = self.pg2.get_capture(len(pkts))
2452 self.verify_capture_out(capture, nat_ip1)
2454 def test_dynamic_ipless_interfaces(self):
2455 """ NAT44 interfaces without configured IP address """
# Adds neighbor entries and /32 host routes for the pg7 and pg8 remote
# hosts, then runs dynamic NAT44 between pg7 and pg8 in both directions.
2457 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2458 mactobinary(self.pg7.remote_mac),
2459 self.pg7.remote_ip4n,
2461 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2462 mactobinary(self.pg8.remote_mac),
2463 self.pg8.remote_ip4n,
# Host routes that point each remote address at its own interface.
2466 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2467 dst_address_length=32,
2468 next_hop_address=self.pg7.remote_ip4n,
2469 next_hop_sw_if_index=self.pg7.sw_if_index)
2470 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2471 dst_address_length=32,
2472 next_hop_address=self.pg8.remote_ip4n,
2473 next_hop_sw_if_index=self.pg8.sw_if_index)
2475 self.nat44_add_address(self.nat_addr)
2476 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2477 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8
2481 pkts = self.create_stream_in(self.pg7, self.pg8)
2482 self.pg7.add_stream(pkts)
2483 self.pg_enable_capture(self.pg_interfaces)
2485 capture = self.pg8.get_capture(len(pkts))
2486 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to the NAT address.
2489 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2490 self.pg8.add_stream(pkts)
2491 self.pg_enable_capture(self.pg_interfaces)
2493 capture = self.pg7.get_capture(len(pkts))
2494 self.verify_capture_in(capture, self.pg7)
2496 def test_static_ipless_interfaces(self):
2497 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same neighbor / host-route setup as the dynamic variant, but uses a
# static mapping from pg7's remote address to nat_addr instead of a pool
# address, and sends the pg8 -> pg7 direction first.
2499 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2500 mactobinary(self.pg7.remote_mac),
2501 self.pg7.remote_ip4n,
2503 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2504 mactobinary(self.pg8.remote_mac),
2505 self.pg8.remote_ip4n,
2508 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2509 dst_address_length=32,
2510 next_hop_address=self.pg7.remote_ip4n,
2511 next_hop_sw_if_index=self.pg7.sw_if_index)
2512 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2513 dst_address_length=32,
2514 next_hop_address=self.pg8.remote_ip4n,
2515 next_hop_sw_if_index=self.pg8.sw_if_index)
2517 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2518 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2519 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2523 pkts = self.create_stream_out(self.pg8)
2524 self.pg8.add_stream(pkts)
2525 self.pg_enable_capture(self.pg_interfaces)
2527 capture = self.pg7.get_capture(len(pkts))
2528 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2531 pkts = self.create_stream_in(self.pg7, self.pg8)
2532 self.pg7.add_stream(pkts)
2533 self.pg_enable_capture(self.pg_interfaces)
2535 capture = self.pg8.get_capture(len(pkts))
# NOTE(review): the meaning of the third positional argument (True) is
# defined by verify_capture_out, which is outside this view.
2536 self.verify_capture_out(capture, self.nat_addr, True)
2538 def test_static_with_port_ipless_interfaces(self):
2539 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Variant of the ipless test that uses per-protocol static mappings with
# ports (TCP, UDP) and an ICMP identifier, using the fixed outside values
# set just below.
2541 self.tcp_port_out = 30606
2542 self.udp_port_out = 30607
2543 self.icmp_id_out = 30608
2545 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2546 mactobinary(self.pg7.remote_mac),
2547 self.pg7.remote_ip4n,
2549 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2550 mactobinary(self.pg8.remote_mac),
2551 self.pg8.remote_ip4n,
2554 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2555 dst_address_length=32,
2556 next_hop_address=self.pg7.remote_ip4n,
2557 next_hop_sw_if_index=self.pg7.sw_if_index)
2558 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2559 dst_address_length=32,
2560 next_hop_address=self.pg8.remote_ip4n,
2561 next_hop_sw_if_index=self.pg8.sw_if_index)
# One static mapping per protocol, each pairing the inside value with the
# outside value set at the top of the test.
2563 self.nat44_add_address(self.nat_addr)
2564 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2565 self.tcp_port_in, self.tcp_port_out,
2566 proto=IP_PROTOS.tcp)
2567 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2568 self.udp_port_in, self.udp_port_out,
2569 proto=IP_PROTOS.udp)
2570 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2571 self.icmp_id_in, self.icmp_id_out,
2572 proto=IP_PROTOS.icmp)
2573 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2574 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2578 pkts = self.create_stream_out(self.pg8)
2579 self.pg8.add_stream(pkts)
2580 self.pg_enable_capture(self.pg_interfaces)
2582 capture = self.pg7.get_capture(len(pkts))
2583 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2586 pkts = self.create_stream_in(self.pg7, self.pg8)
2587 self.pg7.add_stream(pkts)
2588 self.pg_enable_capture(self.pg_interfaces)
2590 capture = self.pg8.get_capture(len(pkts))
2591 self.verify_capture_out(capture)
2593 def test_static_unknown_proto(self):
2594 """ 1:1 NAT translate packet with unknown protocol """
# Maps pg0's remote address 1:1 to nat_ip and checks, in both directions,
# that a packet expected to carry a GRE layer has only its outer IP
# addresses translated.
# NOTE(review): `packet` is read below but its assignment is not visible in
# this view, and the packet expressions have gaps (the layer between the
# two IP headers is missing); confirm against the full file.
2595 nat_ip = "10.0.0.10"
2596 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2597 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2598 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: source is expected to be rewritten to nat_ip.
2602 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2603 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2605 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2606 TCP(sport=1234, dport=1234))
2607 self.pg0.add_stream(p)
2608 self.pg_enable_capture(self.pg_interfaces)
2610 p = self.pg1.get_capture(1)
2613 self.assertEqual(packet[IP].src, nat_ip)
2614 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2615 self.assertTrue(packet.haslayer(GRE))
2616 self.assert_packet_checksums_valid(packet)
2618 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: destination nat_ip is expected to be rewritten back to pg0's
# remote address.
2622 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2623 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2625 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2626 TCP(sport=1234, dport=1234))
2627 self.pg1.add_stream(p)
2628 self.pg_enable_capture(self.pg_interfaces)
2630 p = self.pg0.get_capture(1)
2633 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2634 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2635 self.assertTrue(packet.haslayer(GRE))
2636 self.assert_packet_checksums_valid(packet)
2638 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2641 def test_hairpinning_static_unknown_proto(self):
2642 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server both sit behind pg0, each with its own 1:1 mapping.
# Packets are sent to the peer's NAT address and are expected back on pg0
# with both source and destination rewritten.
# NOTE(review): `packet` is read below but its assignment is not visible in
# this view, and the packet expressions have gaps; confirm.
2644 host = self.pg0.remote_hosts[0]
2645 server = self.pg0.remote_hosts[1]
2647 host_nat_ip = "10.0.0.10"
2648 server_nat_ip = "10.0.0.11"
2650 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2651 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2652 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2653 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server (via server_nat_ip)
2657 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2658 IP(src=host.ip4, dst=server_nat_ip) /
2660 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2661 TCP(sport=1234, dport=1234))
2662 self.pg0.add_stream(p)
2663 self.pg_enable_capture(self.pg_interfaces)
2665 p = self.pg0.get_capture(1)
2668 self.assertEqual(packet[IP].src, host_nat_ip)
2669 self.assertEqual(packet[IP].dst, server.ip4)
2670 self.assertTrue(packet.haslayer(GRE))
2671 self.assert_packet_checksums_valid(packet)
2673 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host (via host_nat_ip)
2677 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2678 IP(src=server.ip4, dst=host_nat_ip) /
2680 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2681 TCP(sport=1234, dport=1234))
2682 self.pg0.add_stream(p)
2683 self.pg_enable_capture(self.pg_interfaces)
2685 p = self.pg0.get_capture(1)
2688 self.assertEqual(packet[IP].src, server_nat_ip)
2689 self.assertEqual(packet[IP].dst, host.ip4)
2690 self.assertTrue(packet.haslayer(GRE))
2691 self.assert_packet_checksums_valid(packet)
2693 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2696 def test_output_feature(self):
2697 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3, then checks
# pg0 -> pg3 (translated), pg3 -> pg0 (translated back), and pg2 -> pg0
# (expected to pass untranslated).
2698 self.nat44_add_address(self.nat_addr)
2699 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2700 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2701 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3
2705 pkts = self.create_stream_in(self.pg0, self.pg3)
2706 self.pg0.add_stream(pkts)
2707 self.pg_enable_capture(self.pg_interfaces)
2709 capture = self.pg3.get_capture(len(pkts))
2710 self.verify_capture_out(capture)
# pg3 -> pg0
2713 pkts = self.create_stream_out(self.pg3)
2714 self.pg3.add_stream(pkts)
2715 self.pg_enable_capture(self.pg_interfaces)
2717 capture = self.pg0.get_capture(len(pkts))
2718 self.verify_capture_in(capture, self.pg0)
2720 # from non-NAT interface to NAT inside interface
2721 pkts = self.create_stream_in(self.pg2, self.pg0)
2722 self.pg2.add_stream(pkts)
2723 self.pg_enable_capture(self.pg_interfaces)
2725 capture = self.pg0.get_capture(len(pkts))
2726 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2728 def test_output_feature_vrf_aware(self):
2729 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Adds one NAT address for vrf_id 10 and one for vrf_id 20, then checks
# that pg4 <-> pg3 traffic uses the first and pg6 <-> pg3 traffic uses the
# second.
2730 nat_ip_vrf10 = "10.0.0.10"
2731 nat_ip_vrf20 = "10.0.0.20"
# Two host routes to pg3's remote address.
# NOTE(review): the trailing argument of each call is not visible in this
# view - presumably it selects the table; confirm.
2733 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2734 dst_address_length=32,
2735 next_hop_address=self.pg3.remote_ip4n,
2736 next_hop_sw_if_index=self.pg3.sw_if_index,
2738 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2739 dst_address_length=32,
2740 next_hop_address=self.pg3.remote_ip4n,
2741 next_hop_sw_if_index=self.pg3.sw_if_index,
2744 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2745 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2746 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2747 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2748 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3, expected source nat_ip_vrf10
2752 pkts = self.create_stream_in(self.pg4, self.pg3)
2753 self.pg4.add_stream(pkts)
2754 self.pg_enable_capture(self.pg_interfaces)
2756 capture = self.pg3.get_capture(len(pkts))
2757 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> pg4 via nat_ip_vrf10
2760 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2761 self.pg3.add_stream(pkts)
2762 self.pg_enable_capture(self.pg_interfaces)
2764 capture = self.pg4.get_capture(len(pkts))
2765 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected source nat_ip_vrf20
2768 pkts = self.create_stream_in(self.pg6, self.pg3)
2769 self.pg6.add_stream(pkts)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 capture = self.pg3.get_capture(len(pkts))
2773 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> pg6 via nat_ip_vrf20
2776 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2777 self.pg3.add_stream(pkts)
2778 self.pg_enable_capture(self.pg_interfaces)
2780 capture = self.pg6.get_capture(len(pkts))
2781 self.verify_capture_in(capture, self.pg6)
2783 def test_output_feature_hairpinning(self):
2784 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server are both behind pg0. The server is reachable through a
# TCP static mapping on nat_addr; the host's request and the server's
# reply are both expected back on pg0 with addresses and ports rewritten.
# NOTE(review): `host_in_port`, `ip` and `tcp` are read below but their
# assignments are not visible in this view; confirm.
2785 host = self.pg0.remote_hosts[0]
2786 server = self.pg0.remote_hosts[1]
2789 server_in_port = 5678
2790 server_out_port = 8765
2792 self.nat44_add_address(self.nat_addr)
2793 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2794 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2797 # add static mapping for server
2798 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2799 server_in_port, server_out_port,
2800 proto=IP_PROTOS.tcp)
2802 # send packet from host to server
2803 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2804 IP(src=host.ip4, dst=self.nat_addr) /
2805 TCP(sport=host_in_port, dport=server_out_port))
2806 self.pg0.add_stream(p)
2807 self.pg_enable_capture(self.pg_interfaces)
2809 capture = self.pg0.get_capture(1)
2814 self.assertEqual(ip.src, self.nat_addr)
2815 self.assertEqual(ip.dst, server.ip4)
2816 self.assertNotEqual(tcp.sport, host_in_port)
2817 self.assertEqual(tcp.dport, server_in_port)
2818 self.assert_packet_checksums_valid(p)
# Remember the translated source port; the reply below is addressed to it.
2819 host_out_port = tcp.sport
2821 self.logger.error(ppp("Unexpected or invalid packet:", p))
2824 # send reply from server to host
2825 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2826 IP(src=server.ip4, dst=self.nat_addr) /
2827 TCP(sport=server_in_port, dport=host_out_port))
2828 self.pg0.add_stream(p)
2829 self.pg_enable_capture(self.pg_interfaces)
2831 capture = self.pg0.get_capture(1)
2836 self.assertEqual(ip.src, self.nat_addr)
2837 self.assertEqual(ip.dst, host.ip4)
2838 self.assertEqual(tcp.sport, server_out_port)
2839 self.assertEqual(tcp.dport, host_in_port)
2840 self.assert_packet_checksums_valid(p)
2842 self.logger.error(ppp("Unexpected or invalid packet:", p))
2845 def test_one_armed_nat44(self):
2846 """ One armed NAT44 """
# Enables the NAT44 feature twice on the same interface (pg9); the second
# call's trailing argument is not visible in this view. Both hosts are
# reached through pg9, and traffic is sent and captured on pg9 only.
# NOTE(review): `ip` and `tcp` are read below but their assignments are not
# visible in this view; confirm.
2847 remote_host = self.pg9.remote_hosts[0]
2848 local_host = self.pg9.remote_hosts[1]
2851 self.nat44_add_address(self.nat_addr)
2852 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2853 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host
2857 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2858 IP(src=local_host.ip4, dst=remote_host.ip4) /
2859 TCP(sport=12345, dport=80))
2860 self.pg9.add_stream(p)
2861 self.pg_enable_capture(self.pg_interfaces)
2863 capture = self.pg9.get_capture(1)
2868 self.assertEqual(ip.src, self.nat_addr)
2869 self.assertEqual(ip.dst, remote_host.ip4)
2870 self.assertNotEqual(tcp.sport, 12345)
# Remember the translated source port; the reply below is addressed to it.
2871 external_port = tcp.sport
2872 self.assertEqual(tcp.dport, 80)
2873 self.assert_packet_checksums_valid(p)
2875 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> nat_addr:external_port, expected back at local_host:12345
2879 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2880 IP(src=remote_host.ip4, dst=self.nat_addr) /
2881 TCP(sport=80, dport=external_port))
2882 self.pg9.add_stream(p)
2883 self.pg_enable_capture(self.pg_interfaces)
2885 capture = self.pg9.get_capture(1)
2890 self.assertEqual(ip.src, remote_host.ip4)
2891 self.assertEqual(ip.dst, local_host.ip4)
2892 self.assertEqual(tcp.sport, 80)
2893 self.assertEqual(tcp.dport, 12345)
2894 self.assert_packet_checksums_valid(p)
2896 self.logger.error(ppp("Unexpected or invalid packet:", p))
2899 def test_del_session(self):
2900 """ Delete NAT44 session """
# Creates sessions with a pg0 -> pg1 stream, deletes two of them through
# the API and checks that the user's session count dropped by exactly 2.
2901 self.nat44_add_address(self.nat_addr)
2902 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2903 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2906 pkts = self.create_stream_in(self.pg0, self.pg1)
2907 self.pg0.add_stream(pkts)
2908 self.pg_enable_capture(self.pg_interfaces)
2910 self.pg1.get_capture(len(pkts))
# Session count for pg0's remote host before deletion.
2912 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2913 nsessions = len(sessions)
# First deletion is keyed by the inside address/port, the second by the
# outside address/port.
# NOTE(review): the trailing argument of the second call is not visible in
# this view; confirm.
2915 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2916 sessions[0].inside_port,
2917 sessions[0].protocol)
2918 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2919 sessions[1].outside_port,
2920 sessions[1].protocol,
2923 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2924 self.assertEqual(nsessions - len(sessions), 2)
2926 def test_set_get_reass(self):
2927 """ NAT44 set/get virtual fragmentation reassembly """
# Reads the current reassembly settings, writes modified values and checks
# that a second read returns exactly what was written.
2928 reas_cfg1 = self.vapi.nat_get_reass()
# New values are derived from the current ones (timeout + 5, limits * 2),
# so they always differ from the starting configuration when it is nonzero.
2930 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2931 max_reass=reas_cfg1.ip4_max_reass * 2,
2932 max_frag=reas_cfg1.ip4_max_frag * 2)
2934 reas_cfg2 = self.vapi.nat_get_reass()
2936 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2937 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2938 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
# Setting drop_frag=1 must be reflected as a truthy ip4_drop_frag.
2940 self.vapi.nat_set_reass(drop_frag=1)
2941 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2943 def test_frag_in_order(self):
2944 """ NAT44 translate fragments arriving in order """
# Sends a fragmented TCP payload pg0 -> pg1 and a fragmented reply
# pg1 -> pg0, reassembles each capture and checks ports and payload. Also
# checks that the nat_reass_dump count grew by 2 over the test.
# NOTE(review): several argument lines of the create_stream_frag and
# reass_frags_and_verify calls are not visible in this view; confirm.
2945 self.nat44_add_address(self.nat_addr)
2946 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2947 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload; the inside TCP port is randomised per run.
2950 data = "A" * 4 + "B" * 16 + "C" * 3
2951 self.tcp_port_in = random.randint(1025, 65535)
# Baseline count of reassembly entries.
2953 reass = self.vapi.nat_reass_dump()
2954 reass_n_start = len(reass)
2957 pkts = self.create_stream_frag(self.pg0,
2958 self.pg1.remote_ip4,
2962 self.pg0.add_stream(pkts)
2963 self.pg_enable_capture(self.pg_interfaces)
2965 frags = self.pg1.get_capture(len(pkts))
2966 p = self.reass_frags_and_verify(frags,
2968 self.pg1.remote_ip4)
2969 self.assertEqual(p[TCP].dport, 20)
2970 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port of the outbound packet.
2971 self.tcp_port_out = p[TCP].sport
2972 self.assertEqual(data, p[Raw].load)
2975 pkts = self.create_stream_frag(self.pg1,
2980 self.pg1.add_stream(pkts)
2981 self.pg_enable_capture(self.pg_interfaces)
2983 frags = self.pg0.get_capture(len(pkts))
2984 p = self.reass_frags_and_verify(frags,
2985 self.pg1.remote_ip4,
2986 self.pg0.remote_ip4)
2987 self.assertEqual(p[TCP].sport, 20)
2988 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2989 self.assertEqual(data, p[Raw].load)
2991 reass = self.vapi.nat_reass_dump()
2992 reass_n_end = len(reass)
# Exactly two more entries than at the start are expected.
2994 self.assertEqual(reass_n_end - reass_n_start, 2)
2996 def test_reass_hairpinning(self):
2997 """ NAT44 fragments hairpinning """
# Sends a fragmented TCP payload on pg0 and expects it back on pg0 with the
# source port changed and the destination port rewritten to the server's
# inside port, via a TCP static mapping.
# NOTE(review): several argument lines of the create_stream_frag and
# reass_frags_and_verify calls are not visible in this view; confirm.
2998 server = self.pg0.remote_hosts[1]
# All three ports are randomised per run.
2999 host_in_port = random.randint(1025, 65535)
3000 server_in_port = random.randint(1025, 65535)
3001 server_out_port = random.randint(1025, 65535)
3002 data = "A" * 4 + "B" * 16 + "C" * 3
3004 self.nat44_add_address(self.nat_addr)
3005 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3006 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3008 # add static mapping for server
3009 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3010 server_in_port, server_out_port,
3011 proto=IP_PROTOS.tcp)
3013 # send packet from host to server
3014 pkts = self.create_stream_frag(self.pg0,
3019 self.pg0.add_stream(pkts)
3020 self.pg_enable_capture(self.pg_interfaces)
3022 frags = self.pg0.get_capture(len(pkts))
3023 p = self.reass_frags_and_verify(frags,
3026 self.assertNotEqual(p[TCP].sport, host_in_port)
3027 self.assertEqual(p[TCP].dport, server_in_port)
3028 self.assertEqual(data, p[Raw].load)
3030 def test_frag_out_of_order(self):
3031 """ NAT44 translate fragments arriving out of order """
# Same shape as the in-order fragment test: fragmented payload pg0 -> pg1,
# fragmented reply pg1 -> pg0, with ports and payload checked after
# reassembly.
# NOTE(review): several argument lines of the create_stream_frag and
# reass_frags_and_verify calls are not visible in this view, nor is any
# reordering of `pkts`; confirm.
3032 self.nat44_add_address(self.nat_addr)
3033 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3034 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3037 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of this call is discarded. The sibling tests
# assign it to self.tcp_port_in, which is presumably what was intended.
3038 random.randint(1025, 65535)
3041 pkts = self.create_stream_frag(self.pg0,
3042 self.pg1.remote_ip4,
3047 self.pg0.add_stream(pkts)
3048 self.pg_enable_capture(self.pg_interfaces)
3050 frags = self.pg1.get_capture(len(pkts))
3051 p = self.reass_frags_and_verify(frags,
3053 self.pg1.remote_ip4)
3054 self.assertEqual(p[TCP].dport, 20)
3055 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port of the outbound packet.
3056 self.tcp_port_out = p[TCP].sport
3057 self.assertEqual(data, p[Raw].load)
3060 pkts = self.create_stream_frag(self.pg1,
3066 self.pg1.add_stream(pkts)
3067 self.pg_enable_capture(self.pg_interfaces)
3069 frags = self.pg0.get_capture(len(pkts))
3070 p = self.reass_frags_and_verify(frags,
3071 self.pg1.remote_ip4,
3072 self.pg0.remote_ip4)
3073 self.assertEqual(p[TCP].sport, 20)
3074 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3075 self.assertEqual(data, p[Raw].load)
3077 def test_port_restricted(self):
3078 """ Port restricted NAT44 (MAP-E CE) """
# Switches the port-assignment algorithm to map-e through the CLI (psid 10,
# psid-offset 6, psid-len 6) and checks the translated TCP source port.
# NOTE(review): `ip` and `tcp` are read below but their assignments are not
# visible in this view; confirm.
3079 self.nat44_add_address(self.nat_addr)
3080 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3081 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3083 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3084 "psid-offset 6 psid-len 6")
3086 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3087 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3088 TCP(sport=4567, dport=22))
3089 self.pg0.add_stream(p)
3090 self.pg_enable_capture(self.pg_interfaces)
3092 capture = self.pg1.get_capture(1)
3097 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3098 self.assertEqual(ip.src, self.nat_addr)
3099 self.assertEqual(tcp.dport, 22)
3100 self.assertNotEqual(tcp.sport, 4567)
# Takes the 6-bit field that remains after shifting the source port right
# by 6 and compares it with 10, the psid value given on the CLI above.
3101 self.assertEqual((tcp.sport >> 6) & 63, 10)
3102 self.assert_packet_checksums_valid(p)
3104 self.logger.error(ppp("Unexpected or invalid packet:", p))
3107 def test_ipfix_max_frags(self):
3108 """ IPFIX logging maximum fragments pending reassembly exceeded """
# Sets the reassembly max_frag limit to 0, enables IPFIX export to pg3,
# sends only the last fragment of a fragmented stream and expects nothing
# on pg1.
3109 self.nat44_add_address(self.nat_addr)
3110 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3111 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3113 self.vapi.nat_set_reass(max_frag=0)
3114 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3115 src_address=self.pg3.local_ip4n,
3117 template_interval=10)
3118 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3119 src_port=self.ipfix_src_port)
3121 data = "A" * 4 + "B" * 16 + "C" * 3
3122 self.tcp_port_in = random.randint(1025, 65535)
3123 pkts = self.create_stream_frag(self.pg0,
3124 self.pg1.remote_ip4,
# Only the final element of the fragment list is injected.
3128 self.pg0.add_stream(pkts[-1])
3129 self.pg_enable_capture(self.pg_interfaces)
3131 self.pg1.assert_nothing_captured()
3133 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3134 capture = self.pg3.get_capture(9)
3135 ipfix = IPFIXDecoder()
3136 # first load template
# NOTE(review): `p` is used below without a visible assignment - presumably
# this section iterates over `capture`; confirm.
3138 self.assertTrue(p.haslayer(IPFIX))
3139 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3140 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3141 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3142 self.assertEqual(p[UDP].dport, 4739)
3143 self.assertEqual(p[IPFIX].observationDomainID,
3144 self.ipfix_domain_id)
3145 if p.haslayer(Template):
3146 ipfix.add_template(p.getlayer(Template))
3147 # verify events in data set
3149 if p.haslayer(Data):
# `data` is rebound here from the payload string to the decoded data set.
3150 data = ipfix.decode_data_set(p.getlayer(Set))
3151 self.verify_ipfix_max_fragments_ip4(data, 0,
3152 self.pg0.remote_ip4n)
3154 def test_multiple_outside_vrf(self):
3155 """ Multiple outside VRF """
3159 self.pg1.unconfig_ip4()
3160 self.pg2.unconfig_ip4()
3161 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
3162 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
3163 self.pg1.set_table_ip4(vrf_id1)
3164 self.pg2.set_table_ip4(vrf_id2)
3165 self.pg1.config_ip4()
3166 self.pg2.config_ip4()
3167 self.pg1.resolve_arp()
3168 self.pg2.resolve_arp()
3170 self.nat44_add_address(self.nat_addr)
3171 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3172 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3174 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
3179 pkts = self.create_stream_in(self.pg0, self.pg1)
3180 self.pg0.add_stream(pkts)
3181 self.pg_enable_capture(self.pg_interfaces)
3183 capture = self.pg1.get_capture(len(pkts))
3184 self.verify_capture_out(capture, self.nat_addr)
3186 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3187 self.pg1.add_stream(pkts)
3188 self.pg_enable_capture(self.pg_interfaces)
3190 capture = self.pg0.get_capture(len(pkts))
3191 self.verify_capture_in(capture, self.pg0)
3193 self.tcp_port_in = 60303
3194 self.udp_port_in = 60304
3195 self.icmp_id_in = 60305
3198 pkts = self.create_stream_in(self.pg0, self.pg2)
3199 self.pg0.add_stream(pkts)
3200 self.pg_enable_capture(self.pg_interfaces)
3202 capture = self.pg2.get_capture(len(pkts))
3203 self.verify_capture_out(capture, self.nat_addr)
3205 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3206 self.pg2.add_stream(pkts)
3207 self.pg_enable_capture(self.pg_interfaces)
3209 capture = self.pg0.get_capture(len(pkts))
3210 self.verify_capture_in(capture, self.pg0)
3213 self.pg1.unconfig_ip4()
3214 self.pg2.unconfig_ip4()
3215 self.pg1.set_table_ip4(0)
3216 self.pg2.set_table_ip4(0)
3217 self.pg1.config_ip4()
3218 self.pg2.config_ip4()
3219 self.pg1.resolve_arp()
3220 self.pg2.resolve_arp()
3223 super(TestNAT44, self).tearDown()
3224 if not self.vpp_dead:
3225 self.logger.info(self.vapi.cli("show nat44 addresses"))
3226 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3227 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3228 self.logger.info(self.vapi.cli("show nat44 interface address"))
3229 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3230 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3231 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3232 self.vapi.cli("nat addr-port-assignment-alg default")
3234 self.vapi.cli("clear logging")
3237 class TestNAT44EndpointDependent(MethodHolder):
3238 """ Endpoint-Dependent mapping and filtering test cases """
def setUpConstants(cls):
    """Add the NAT section to the command line built by the parent class.

    Calls the parent implementation first, then appends the tokens
    ``nat { endpoint-dependent }`` to ``cls.vpp_cmdline``.
    """
    super(TestNAT44EndpointDependent, cls).setUpConstants()
    nat_section = ["nat", "{", "endpoint-dependent", "}"]
    cls.vpp_cmdline.extend(nat_section)
3246 def setUpClass(cls):
3247 super(TestNAT44EndpointDependent, cls).setUpClass()
3248 cls.vapi.cli("set log class nat level debug")
3250 cls.tcp_port_in = 6303
3251 cls.tcp_port_out = 6303
3252 cls.udp_port_in = 6304
3253 cls.udp_port_out = 6304
3254 cls.icmp_id_in = 6305
3255 cls.icmp_id_out = 6305
3256 cls.nat_addr = '10.0.0.3'
3257 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3258 cls.ipfix_src_port = 4739
3259 cls.ipfix_domain_id = 1
3260 cls.tcp_external_port = 80
3262 cls.create_pg_interfaces(range(7))
3263 cls.interfaces = list(cls.pg_interfaces[0:3])
3265 for i in cls.interfaces:
3270 cls.pg0.generate_remote_hosts(3)
3271 cls.pg0.configure_ipv4_neighbors()
3275 cls.pg4.generate_remote_hosts(2)
3276 cls.pg4.config_ip4()
3277 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3278 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3282 cls.pg4.resolve_arp()
3283 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3284 cls.pg4.resolve_arp()
3286 zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
3287 cls.vapi.ip_table_add_del(1, is_add=1)
3289 cls.pg5._local_ip4 = "10.1.1.1"
3290 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
3292 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
3293 cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
3294 socket.AF_INET, cls.pg5.remote_ip4)
3295 cls.pg5.set_table_ip4(1)
3296 cls.pg5.config_ip4()
3298 cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
3299 dst_address_length=32,
3301 next_hop_sw_if_index=cls.pg5.sw_if_index,
3302 next_hop_address=zero_ip4n)
3304 cls.pg6._local_ip4 = "10.1.2.1"
3305 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
3307 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
3308 cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
3309 socket.AF_INET, cls.pg6.remote_ip4)
3310 cls.pg6.set_table_ip4(1)
3311 cls.pg6.config_ip4()
3313 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3314 dst_address_length=32,
3316 next_hop_sw_if_index=cls.pg6.sw_if_index,
3317 next_hop_address=zero_ip4n)
3319 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3320 dst_address_length=16,
3321 next_hop_address=zero_ip4n,
3323 next_hop_table_id=1)
3324 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3325 dst_address_length=0,
3326 next_hop_address=zero_ip4n,
3328 next_hop_table_id=0)
3329 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3330 dst_address_length=0,
3332 next_hop_sw_if_index=cls.pg1.sw_if_index,
3333 next_hop_address=cls.pg1.local_ip4n)
3335 cls.pg5.resolve_arp()
3336 cls.pg6.resolve_arp()
3339 super(TestNAT44EndpointDependent, cls).tearDownClass()
3342 def test_dynamic(self):
3343 """ NAT44 dynamic translation test """
3345 self.nat44_add_address(self.nat_addr)
3346 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3347 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3351 pkts = self.create_stream_in(self.pg0, self.pg1)
3352 self.pg0.add_stream(pkts)
3353 self.pg_enable_capture(self.pg_interfaces)
3355 capture = self.pg1.get_capture(len(pkts))
3356 self.verify_capture_out(capture)
3359 pkts = self.create_stream_out(self.pg1)
3360 self.pg1.add_stream(pkts)
3361 self.pg_enable_capture(self.pg_interfaces)
3363 capture = self.pg0.get_capture(len(pkts))
3364 self.verify_capture_in(capture, self.pg0)
3366 def test_forwarding(self):
3367 """ NAT44 forwarding test """
3369 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3370 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3372 self.vapi.nat44_forwarding_enable_disable(1)
3374 real_ip = self.pg0.remote_ip4n
3375 alias_ip = self.nat_addr_n
3376 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3377 external_ip=alias_ip)
3380 # in2out - static mapping match
3382 pkts = self.create_stream_out(self.pg1)
3383 self.pg1.add_stream(pkts)
3384 self.pg_enable_capture(self.pg_interfaces)
3386 capture = self.pg0.get_capture(len(pkts))
3387 self.verify_capture_in(capture, self.pg0)
3389 pkts = self.create_stream_in(self.pg0, self.pg1)
3390 self.pg0.add_stream(pkts)
3391 self.pg_enable_capture(self.pg_interfaces)
3393 capture = self.pg1.get_capture(len(pkts))
3394 self.verify_capture_out(capture, same_port=True)
3396 # in2out - no static mapping match
3398 host0 = self.pg0.remote_hosts[0]
3399 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
3401 pkts = self.create_stream_out(self.pg1,
3402 dst_ip=self.pg0.remote_ip4,
3403 use_inside_ports=True)
3404 self.pg1.add_stream(pkts)
3405 self.pg_enable_capture(self.pg_interfaces)
3407 capture = self.pg0.get_capture(len(pkts))
3408 self.verify_capture_in(capture, self.pg0)
3410 pkts = self.create_stream_in(self.pg0, self.pg1)
3411 self.pg0.add_stream(pkts)
3412 self.pg_enable_capture(self.pg_interfaces)
3414 capture = self.pg1.get_capture(len(pkts))
3415 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3418 self.pg0.remote_hosts[0] = host0
3420 user = self.pg0.remote_hosts[1]
3421 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3422 self.assertEqual(len(sessions), 3)
3423 self.assertTrue(sessions[0].ext_host_valid)
3424 self.vapi.nat44_del_session(
3425 sessions[0].inside_ip_address,
3426 sessions[0].inside_port,
3427 sessions[0].protocol,
3428 ext_host_address=sessions[0].ext_host_address,
3429 ext_host_port=sessions[0].ext_host_port)
3430 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3431 self.assertEqual(len(sessions), 2)
3434 self.vapi.nat44_forwarding_enable_disable(0)
3435 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3436 external_ip=alias_ip,
3439 def test_static_lb(self):
3440 """ NAT44 local service load balancing """
3441 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3444 server1 = self.pg0.remote_hosts[0]
3445 server2 = self.pg0.remote_hosts[1]
3447 locals = [{'addr': server1.ip4n,
3450 {'addr': server2.ip4n,
3454 self.nat44_add_address(self.nat_addr)
3455 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3458 local_num=len(locals),
3460 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3461 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3464 # from client to service
3465 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3466 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3467 TCP(sport=12345, dport=external_port))
3468 self.pg1.add_stream(p)
3469 self.pg_enable_capture(self.pg_interfaces)
3471 capture = self.pg0.get_capture(1)
3477 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3478 if ip.dst == server1.ip4:
3482 self.assertEqual(tcp.dport, local_port)
3483 self.assert_packet_checksums_valid(p)
3485 self.logger.error(ppp("Unexpected or invalid packet:", p))
3488 # from service back to client
3489 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3490 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3491 TCP(sport=local_port, dport=12345))
3492 self.pg0.add_stream(p)
3493 self.pg_enable_capture(self.pg_interfaces)
3495 capture = self.pg1.get_capture(1)
3500 self.assertEqual(ip.src, self.nat_addr)
3501 self.assertEqual(tcp.sport, external_port)
3502 self.assert_packet_checksums_valid(p)
3504 self.logger.error(ppp("Unexpected or invalid packet:", p))
3507 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3508 self.assertEqual(len(sessions), 1)
3509 self.assertTrue(sessions[0].ext_host_valid)
3510 self.vapi.nat44_del_session(
3511 sessions[0].inside_ip_address,
3512 sessions[0].inside_port,
3513 sessions[0].protocol,
3514 ext_host_address=sessions[0].ext_host_address,
3515 ext_host_port=sessions[0].ext_host_port)
3516 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3517 self.assertEqual(len(sessions), 0)
3519 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3520 def test_static_lb_multi_clients(self):
3521 """ NAT44 local service load balancing - multiple clients"""
3523 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3526 server1 = self.pg0.remote_hosts[0]
3527 server2 = self.pg0.remote_hosts[1]
3529 locals = [{'addr': server1.ip4n,
3532 {'addr': server2.ip4n,
3536 self.nat44_add_address(self.nat_addr)
3537 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3540 local_num=len(locals),
3542 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3543 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3548 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3550 for client in clients:
3551 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3552 IP(src=client, dst=self.nat_addr) /
3553 TCP(sport=12345, dport=external_port))
3555 self.pg1.add_stream(pkts)
3556 self.pg_enable_capture(self.pg_interfaces)
3558 capture = self.pg0.get_capture(len(pkts))
3560 if p[IP].dst == server1.ip4:
3564 self.assertTrue(server1_n > server2_n)
3566 def test_static_lb_2(self):
3567 """ NAT44 local service load balancing (asymmetrical rule) """
3568 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3571 server1 = self.pg0.remote_hosts[0]
3572 server2 = self.pg0.remote_hosts[1]
3574 locals = [{'addr': server1.ip4n,
3577 {'addr': server2.ip4n,
3581 self.vapi.nat44_forwarding_enable_disable(1)
3582 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3586 local_num=len(locals),
3588 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3589 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3592 # from client to service
3593 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3594 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3595 TCP(sport=12345, dport=external_port))
3596 self.pg1.add_stream(p)
3597 self.pg_enable_capture(self.pg_interfaces)
3599 capture = self.pg0.get_capture(1)
3605 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3606 if ip.dst == server1.ip4:
3610 self.assertEqual(tcp.dport, local_port)
3611 self.assert_packet_checksums_valid(p)
3613 self.logger.error(ppp("Unexpected or invalid packet:", p))
3616 # from service back to client
3617 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3618 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3619 TCP(sport=local_port, dport=12345))
3620 self.pg0.add_stream(p)
3621 self.pg_enable_capture(self.pg_interfaces)
3623 capture = self.pg1.get_capture(1)
3628 self.assertEqual(ip.src, self.nat_addr)
3629 self.assertEqual(tcp.sport, external_port)
3630 self.assert_packet_checksums_valid(p)
3632 self.logger.error(ppp("Unexpected or invalid packet:", p))
3635 # from client to server (no translation)
3636 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3637 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3638 TCP(sport=12346, dport=local_port))
3639 self.pg1.add_stream(p)
3640 self.pg_enable_capture(self.pg_interfaces)
3642 capture = self.pg0.get_capture(1)
3648 self.assertEqual(ip.dst, server1.ip4)
3649 self.assertEqual(tcp.dport, local_port)
3650 self.assert_packet_checksums_valid(p)
3652 self.logger.error(ppp("Unexpected or invalid packet:", p))
3655 # from service back to client (no translation)
3656 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3657 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3658 TCP(sport=local_port, dport=12346))
3659 self.pg0.add_stream(p)
3660 self.pg_enable_capture(self.pg_interfaces)
3662 capture = self.pg1.get_capture(1)
3667 self.assertEqual(ip.src, server1.ip4)
3668 self.assertEqual(tcp.sport, local_port)
3669 self.assert_packet_checksums_valid(p)
3671 self.logger.error(ppp("Unexpected or invalid packet:", p))
3674 def test_unknown_proto(self):
3675 """ NAT44 translate packet with unknown protocol """
3676 self.nat44_add_address(self.nat_addr)
3677 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3678 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3682 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3683 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3684 TCP(sport=self.tcp_port_in, dport=20))
3685 self.pg0.add_stream(p)
3686 self.pg_enable_capture(self.pg_interfaces)
3688 p = self.pg1.get_capture(1)
3690 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3691 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3693 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3694 TCP(sport=1234, dport=1234))
3695 self.pg0.add_stream(p)
3696 self.pg_enable_capture(self.pg_interfaces)
3698 p = self.pg1.get_capture(1)
3701 self.assertEqual(packet[IP].src, self.nat_addr)
3702 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3703 self.assertTrue(packet.haslayer(GRE))
3704 self.assert_packet_checksums_valid(packet)
3706 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3710 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3711 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3713 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3714 TCP(sport=1234, dport=1234))
3715 self.pg1.add_stream(p)
3716 self.pg_enable_capture(self.pg_interfaces)
3718 p = self.pg0.get_capture(1)
3721 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3722 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3723 self.assertTrue(packet.haslayer(GRE))
3724 self.assert_packet_checksums_valid(packet)
3726 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3729 def test_hairpinning_unknown_proto(self):
3730 """ NAT44 translate packet with unknown protocol - hairpinning """
3731 host = self.pg0.remote_hosts[0]
3732 server = self.pg0.remote_hosts[1]
3734 server_out_port = 8765
3735 server_nat_ip = "10.0.0.11"
3737 self.nat44_add_address(self.nat_addr)
3738 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3739 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3742 # add static mapping for server
3743 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3746 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3747 IP(src=host.ip4, dst=server_nat_ip) /
3748 TCP(sport=host_in_port, dport=server_out_port))
3749 self.pg0.add_stream(p)
3750 self.pg_enable_capture(self.pg_interfaces)
3752 self.pg0.get_capture(1)
3754 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3755 IP(src=host.ip4, dst=server_nat_ip) /
3757 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3758 TCP(sport=1234, dport=1234))
3759 self.pg0.add_stream(p)
3760 self.pg_enable_capture(self.pg_interfaces)
3762 p = self.pg0.get_capture(1)
3765 self.assertEqual(packet[IP].src, self.nat_addr)
3766 self.assertEqual(packet[IP].dst, server.ip4)
3767 self.assertTrue(packet.haslayer(GRE))
3768 self.assert_packet_checksums_valid(packet)
3770 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3774 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3775 IP(src=server.ip4, dst=self.nat_addr) /
3777 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3778 TCP(sport=1234, dport=1234))
3779 self.pg0.add_stream(p)
3780 self.pg_enable_capture(self.pg_interfaces)
3782 p = self.pg0.get_capture(1)
3785 self.assertEqual(packet[IP].src, server_nat_ip)
3786 self.assertEqual(packet[IP].dst, host.ip4)
3787 self.assertTrue(packet.haslayer(GRE))
3788 self.assert_packet_checksums_valid(packet)
3790 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3793 def test_output_feature_and_service(self):
3794 """ NAT44 interface output feature and services """
3795 external_addr = '1.2.3.4'
3799 self.vapi.nat44_forwarding_enable_disable(1)
3800 self.nat44_add_address(self.nat_addr)
3801 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3802 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3803 local_port, external_port,
3804 proto=IP_PROTOS.tcp, out2in_only=1)
3805 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3806 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3808 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3811 # from client to service
3812 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3813 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3814 TCP(sport=12345, dport=external_port))
3815 self.pg1.add_stream(p)
3816 self.pg_enable_capture(self.pg_interfaces)
3818 capture = self.pg0.get_capture(1)
3823 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3824 self.assertEqual(tcp.dport, local_port)
3825 self.assert_packet_checksums_valid(p)
3827 self.logger.error(ppp("Unexpected or invalid packet:", p))
3830 # from service back to client
3831 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3832 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3833 TCP(sport=local_port, dport=12345))
3834 self.pg0.add_stream(p)
3835 self.pg_enable_capture(self.pg_interfaces)
3837 capture = self.pg1.get_capture(1)
3842 self.assertEqual(ip.src, external_addr)
3843 self.assertEqual(tcp.sport, external_port)
3844 self.assert_packet_checksums_valid(p)
3846 self.logger.error(ppp("Unexpected or invalid packet:", p))
3849 # from local network host to external network
3850 pkts = self.create_stream_in(self.pg0, self.pg1)
3851 self.pg0.add_stream(pkts)
3852 self.pg_enable_capture(self.pg_interfaces)
3854 capture = self.pg1.get_capture(len(pkts))
3855 self.verify_capture_out(capture)
3856 pkts = self.create_stream_in(self.pg0, self.pg1)
3857 self.pg0.add_stream(pkts)
3858 self.pg_enable_capture(self.pg_interfaces)
3860 capture = self.pg1.get_capture(len(pkts))
3861 self.verify_capture_out(capture)
3863 # from external network back to local network host
3864 pkts = self.create_stream_out(self.pg1)
3865 self.pg1.add_stream(pkts)
3866 self.pg_enable_capture(self.pg_interfaces)
3868 capture = self.pg0.get_capture(len(pkts))
3869 self.verify_capture_in(capture, self.pg0)
3871 def test_output_feature_and_service2(self):
3872 """ NAT44 interface output feature and service host direct access """
3873 self.vapi.nat44_forwarding_enable_disable(1)
3874 self.nat44_add_address(self.nat_addr)
3875 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3878 # session initiated from service host - translate
3879 pkts = self.create_stream_in(self.pg0, self.pg1)
3880 self.pg0.add_stream(pkts)
3881 self.pg_enable_capture(self.pg_interfaces)
3883 capture = self.pg1.get_capture(len(pkts))
3884 self.verify_capture_out(capture)
3886 pkts = self.create_stream_out(self.pg1)
3887 self.pg1.add_stream(pkts)
3888 self.pg_enable_capture(self.pg_interfaces)
3890 capture = self.pg0.get_capture(len(pkts))
3891 self.verify_capture_in(capture, self.pg0)
3893 # session initiated from remote host - do not translate
3894 self.tcp_port_in = 60303
3895 self.udp_port_in = 60304
3896 self.icmp_id_in = 60305
3897 pkts = self.create_stream_out(self.pg1,
3898 self.pg0.remote_ip4,
3899 use_inside_ports=True)
3900 self.pg1.add_stream(pkts)
3901 self.pg_enable_capture(self.pg_interfaces)
3903 capture = self.pg0.get_capture(len(pkts))
3904 self.verify_capture_in(capture, self.pg0)
3906 pkts = self.create_stream_in(self.pg0, self.pg1)
3907 self.pg0.add_stream(pkts)
3908 self.pg_enable_capture(self.pg_interfaces)
3910 capture = self.pg1.get_capture(len(pkts))
3911 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3914 def test_output_feature_and_service3(self):
3915 """ NAT44 interface output feature and DST NAT """
3916 external_addr = '1.2.3.4'
3920 self.vapi.nat44_forwarding_enable_disable(1)
3921 self.nat44_add_address(self.nat_addr)
3922 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3923 local_port, external_port,
3924 proto=IP_PROTOS.tcp, out2in_only=1)
3925 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3926 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3928 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3931 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3932 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3933 TCP(sport=12345, dport=external_port))
3934 self.pg0.add_stream(p)
3935 self.pg_enable_capture(self.pg_interfaces)
3937 capture = self.pg1.get_capture(1)
3942 self.assertEqual(ip.src, self.pg0.remote_ip4)
3943 self.assertEqual(tcp.sport, 12345)
3944 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3945 self.assertEqual(tcp.dport, local_port)
3946 self.assert_packet_checksums_valid(p)
3948 self.logger.error(ppp("Unexpected or invalid packet:", p))
3951 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3952 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3953 TCP(sport=local_port, dport=12345))
3954 self.pg1.add_stream(p)
3955 self.pg_enable_capture(self.pg_interfaces)
3957 capture = self.pg0.get_capture(1)
3962 self.assertEqual(ip.src, external_addr)
3963 self.assertEqual(tcp.sport, external_port)
3964 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3965 self.assertEqual(tcp.dport, 12345)
3966 self.assert_packet_checksums_valid(p)
3968 self.logger.error(ppp("Unexpected or invalid packet:", p))
3971 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3973 twice_nat_addr = '10.0.1.3'
3981 port_in1 = port_in+1
3982 port_in2 = port_in+2
3987 server1 = self.pg0.remote_hosts[0]
3988 server2 = self.pg0.remote_hosts[1]
4000 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
4003 self.nat44_add_address(self.nat_addr)
4004 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4006 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
4008 proto=IP_PROTOS.tcp,
4009 twice_nat=int(not self_twice_nat),
4010 self_twice_nat=int(self_twice_nat))
4012 locals = [{'addr': server1.ip4n,
4015 {'addr': server2.ip4n,
4018 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
4019 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
4023 not self_twice_nat),
4026 local_num=len(locals),
4028 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
4029 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
4036 assert client_id is not None
4038 client = self.pg0.remote_hosts[0]
4039 elif client_id == 2:
4040 client = self.pg0.remote_hosts[1]
4042 client = pg1.remote_hosts[0]
4043 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4044 IP(src=client.ip4, dst=self.nat_addr) /
4045 TCP(sport=eh_port_out, dport=port_out))
4047 self.pg_enable_capture(self.pg_interfaces)
4049 capture = pg0.get_capture(1)
4055 if ip.dst == server1.ip4:
4061 self.assertEqual(ip.dst, server.ip4)
4063 self.assertIn(tcp.dport, [port_in1, port_in2])
4065 self.assertEqual(tcp.dport, port_in)
4067 self.assertEqual(ip.src, twice_nat_addr)
4068 self.assertNotEqual(tcp.sport, eh_port_out)
4070 self.assertEqual(ip.src, client.ip4)
4071 self.assertEqual(tcp.sport, eh_port_out)
4073 eh_port_in = tcp.sport
4074 saved_port_in = tcp.dport
4075 self.assert_packet_checksums_valid(p)
4077 self.logger.error(ppp("Unexpected or invalid packet:", p))
4080 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4081 IP(src=server.ip4, dst=eh_addr_in) /
4082 TCP(sport=saved_port_in, dport=eh_port_in))
4084 self.pg_enable_capture(self.pg_interfaces)
4086 capture = pg1.get_capture(1)
4091 self.assertEqual(ip.dst, client.ip4)
4092 self.assertEqual(ip.src, self.nat_addr)
4093 self.assertEqual(tcp.dport, eh_port_out)
4094 self.assertEqual(tcp.sport, port_out)
4095 self.assert_packet_checksums_valid(p)
4097 self.logger.error(ppp("Unexpected or invalid packet:", p))
4101 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4102 self.assertEqual(len(sessions), 1)
4103 self.assertTrue(sessions[0].ext_host_valid)
4104 self.assertTrue(sessions[0].is_twicenat)
4105 self.vapi.nat44_del_session(
4106 sessions[0].inside_ip_address,
4107 sessions[0].inside_port,
4108 sessions[0].protocol,
4109 ext_host_address=sessions[0].ext_host_nat_address,
4110 ext_host_port=sessions[0].ext_host_nat_port)
4111 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4112 self.assertEqual(len(sessions), 0)
4114 def test_twice_nat(self):
4116 self.twice_nat_common()
4118 def test_self_twice_nat_positive(self):
4119 """ Self Twice NAT44 (positive test) """
4120 self.twice_nat_common(self_twice_nat=True, same_pg=True)
4122 def test_self_twice_nat_negative(self):
4123 """ Self Twice NAT44 (negative test) """
4124 self.twice_nat_common(self_twice_nat=True)
4126 def test_twice_nat_lb(self):
4127 """ Twice NAT44 local service load balancing """
4128 self.twice_nat_common(lb=True)
4130 def test_self_twice_nat_lb_positive(self):
4131 """ Self Twice NAT44 local service load balancing (positive test) """
4132 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4135 def test_self_twice_nat_lb_negative(self):
4136 """ Self Twice NAT44 local service load balancing (negative test) """
4137 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4140 def test_twice_nat_interface_addr(self):
4141 """ Acquire twice NAT44 addresses from interface """
4142 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4144 # no address in NAT pool
4145 adresses = self.vapi.nat44_address_dump()
4146 self.assertEqual(0, len(adresses))
4148 # configure interface address and check NAT address pool
4149 self.pg3.config_ip4()
4150 adresses = self.vapi.nat44_address_dump()
4151 self.assertEqual(1, len(adresses))
4152 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4153 self.assertEqual(adresses[0].twice_nat, 1)
4155 # remove interface address and check NAT address pool
4156 self.pg3.unconfig_ip4()
4157 adresses = self.vapi.nat44_address_dump()
4158 self.assertEqual(0, len(adresses))
4160 def test_tcp_session_close_in(self):
4161 """ Close TCP session from inside network """
4162 self.tcp_port_out = 10505
4163 self.nat44_add_address(self.nat_addr)
4164 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4168 proto=IP_PROTOS.tcp,
4170 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4171 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4174 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4175 start_sessnum = len(sessions)
4177 self.initiate_tcp_session(self.pg0, self.pg1)
4179 # FIN packet in -> out
4180 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4181 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4182 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4183 flags="FA", seq=100, ack=300))
4184 self.pg0.add_stream(p)
4185 self.pg_enable_capture(self.pg_interfaces)
4187 self.pg1.get_capture(1)
4191 # ACK packet out -> in
4192 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4193 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4194 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4195 flags="A", seq=300, ack=101))
4198 # FIN packet out -> in
4199 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4200 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4201 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4202 flags="FA", seq=300, ack=101))
4205 self.pg1.add_stream(pkts)
4206 self.pg_enable_capture(self.pg_interfaces)
4208 self.pg0.get_capture(2)
4210 # ACK packet in -> out
4211 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4212 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4213 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4214 flags="A", seq=101, ack=301))
4215 self.pg0.add_stream(p)
4216 self.pg_enable_capture(self.pg_interfaces)
4218 self.pg1.get_capture(1)
4220 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4222 self.assertEqual(len(sessions) - start_sessnum, 0)
def test_tcp_session_close_out(self):
    """ Close TCP session from outside network """
    in_if = self.pg0
    out_if = self.pg1

    def relay(tx_if, rx_if, pkt):
        # Inject one packet on tx_if and require exactly one on rx_if.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx_if.get_capture(1)

    def from_outside(flags, seq, ack):
        # Segment from the external host towards the NAT address.
        return (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
                IP(src=out_if.remote_ip4, dst=self.nat_addr) /
                TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                    flags=flags, seq=seq, ack=ack))

    def from_inside(flags, seq, ack):
        # Segment from the inside host towards the external host.
        return (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
                TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                    flags=flags, seq=seq, ack=ack))

    self.tcp_port_out = 10505
    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(in_if.remote_ip4,
                                  self.nat_addr,
                                  self.tcp_port_in,
                                  self.tcp_port_out,
                                  proto=IP_PROTOS.tcp,
                                  twice_nat=1)
    self.vapi.nat44_interface_add_del_feature(in_if.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(out_if.sw_if_index,
                                              is_inside=0)

    # Baseline session count for the inside user, taken before the
    # session under test is opened.
    sessions_before = len(
        self.vapi.nat44_user_session_dump(in_if.remote_ip4n, 0))

    self.initiate_tcp_session(in_if, out_if)

    # FIN packet out -> in
    relay(out_if, in_if, from_outside("FA", 100, 300))

    # FIN+ACK packet in -> out
    relay(in_if, out_if, from_inside("FA", 300, 101))

    # ACK packet out -> in
    relay(out_if, in_if, from_outside("A", 101, 301))

    # After the close handshake the user's session count must be back
    # at the baseline.
    sessions_after = len(
        self.vapi.nat44_user_session_dump(in_if.remote_ip4n, 0))
    self.assertEqual(sessions_after - sessions_before, 0)
def test_tcp_session_close_simultaneous(self):
    """ Close TCP session simultaneously from both sides """
    # The previous docstring was copied from the inside-close test; this
    # test sends a FIN from each side before either side ACKs.
    self.tcp_port_out = 10505
    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4,
                                  self.nat_addr,
                                  self.tcp_port_in,
                                  self.tcp_port_out,
                                  proto=IP_PROTOS.tcp,
                                  twice_nat=1)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Baseline session count for the inside user.
    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
    start_sessnum = len(sessions)

    self.initiate_tcp_session(self.pg0, self.pg1)

    # FIN packet in -> out
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
             flags="FA", seq=100, ack=300))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)

    # FIN packet out -> in
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
             flags="FA", seq=300, ack=100))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg0.get_capture(1)

    # ACK packet in -> out
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
             flags="A", seq=101, ack=301))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)

    # ACK packet out -> in
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
             flags="A", seq=301, ack=101))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg0.get_capture(1)

    # The user's session count must be back at the baseline.
    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
                                                 0)
    self.assertEqual(len(sessions) - start_sessnum, 0)
def test_one_armed_nat44_static(self):
    """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
    arm = self.pg4
    remote_host = arm.remote_hosts[0]
    local_host = arm.remote_hosts[1]
    external_port = 80
    local_port = 8080

    def hairpin(pkt):
        # Send one packet into pg4 and return the single packet that
        # comes back out of the same interface.
        arm.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return arm.get_capture(1)[0]

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_address(self.nat_addr, twice_nat=1)
    self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
                                  local_port, external_port,
                                  proto=IP_PROTOS.tcp, out2in_only=1,
                                  twice_nat=1)
    # pg4 is configured with both NAT44 features (default and
    # is_inside=0).
    self.vapi.nat44_interface_add_del_feature(arm.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(arm.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = hairpin(Ether(src=arm.remote_mac, dst=arm.local_mac) /
                IP(src=remote_host.ip4, dst=self.nat_addr) /
                TCP(sport=12345, dport=external_port))
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, local_host.ip4)
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.dport, local_port)
        self.assertNotEqual(tcp.sport, 12345)
        # Remember the rewritten source port for the reply.
        eh_port_in = tcp.sport
        self.assert_packet_checksums_valid(p)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = hairpin(Ether(src=arm.remote_mac, dst=arm.local_mac) /
                IP(src=local_host.ip4, dst=self.nat_addr) /
                TCP(sport=local_port, dport=eh_port_in))
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, remote_host.ip4)
        self.assertEqual(tcp.sport, external_port)
        self.assertEqual(tcp.dport, 12345)
        self.assert_packet_checksums_valid(p)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_static_with_port_out2(self):
    """ 1:1 NAPT asymmetrical rule """
    external_port = 80
    local_port = 8080
    inside = self.pg0
    outside = self.pg1

    def send_one(tx_if, rx_if, pkt):
        # Inject pkt on tx_if; return the one-packet capture from rx_if.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_static_mapping(inside.remote_ip4, self.nat_addr,
                                  local_port, external_port,
                                  proto=IP_PROTOS.tcp, out2in_only=1)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # from client to service
    capture = send_one(
        outside, inside,
        Ether(src=outside.remote_mac, dst=outside.local_mac) /
        IP(src=outside.remote_ip4, dst=self.nat_addr) /
        TCP(sport=12345, dport=external_port))
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, inside.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.assert_packet_checksums_valid(p)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # ICMP error (type 11) from the service, quoting the packet it
    # just received
    capture = send_one(
        inside, outside,
        Ether(dst=inside.local_mac, src=inside.remote_mac) /
        IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
        ICMP(type=11) / capture[0][IP])
    p = capture[0]
    try:
        self.assertEqual(p[IP].src, self.nat_addr)
        inner = p[IPerror]
        self.assertEqual(inner.dst, self.nat_addr)
        self.assertEqual(inner[TCPerror].dport, external_port)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    capture = send_one(
        inside, outside,
        Ether(src=inside.remote_mac, dst=inside.local_mac) /
        IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
        TCP(sport=local_port, dport=12345))
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.assert_packet_checksums_valid(p)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # ICMP error (type 11) from the client, quoting the reply it just
    # received
    capture = send_one(
        outside, inside,
        Ether(dst=outside.local_mac, src=outside.remote_mac) /
        IP(src=outside.remote_ip4, dst=self.nat_addr) /
        ICMP(type=11) / capture[0][IP])
    p = capture[0]
    try:
        self.assertEqual(p[IP].dst, inside.remote_ip4)
        inner = p[IPerror]
        self.assertEqual(inner.src, inside.remote_ip4)
        self.assertEqual(inner[TCPerror].sport, local_port)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from client to server (no translation)
    capture = send_one(
        outside, inside,
        Ether(src=outside.remote_mac, dst=outside.local_mac) /
        IP(src=outside.remote_ip4, dst=inside.remote_ip4) /
        TCP(sport=12346, dport=local_port))
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, inside.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.assert_packet_checksums_valid(p)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client (no translation)
    capture = send_one(
        inside, outside,
        Ether(src=inside.remote_mac, dst=inside.local_mac) /
        IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
        TCP(sport=local_port, dport=12346))
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, inside.remote_ip4)
        self.assertEqual(tcp.sport, local_port)
        self.assert_packet_checksums_valid(p)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_output_feature(self):
    """ NAT44 interface output feature (in2out postrouting) """
    def push(tx_if, rx_if, stream):
        # Send the whole stream and return a capture of the same size.
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
                                                     is_inside=0)

    # in2out
    outbound = self.create_stream_in(self.pg0, self.pg1)
    self.verify_capture_out(push(self.pg0, self.pg1, outbound))

    # out2in
    inbound = self.create_stream_out(self.pg1)
    self.verify_capture_in(push(self.pg1, self.pg0, inbound), self.pg0)
def test_multiple_vrf(self):
    """ Multiple VRF setup """
    external_addr = '1.2.3.4'
    external_port = 80
    local_port = 8080

    def tcp_from(src_if, dst_ip, sport, dport):
        # TCP segment sourced from the remote host behind src_if.
        return (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
                IP(src=src_if.remote_ip4, dst=dst_ip) /
                TCP(sport=sport, dport=dport))

    def forward(tx_if, rx_if, pkt):
        # Inject pkt on tx_if; return the single packet seen on rx_if.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def expect(rx, ip_field, ip_value, tcp_field, tcp_value):
        # Check one IP field, one TCP field and the checksums of rx;
        # log the packet before re-raising on any failure.
        try:
            ip = rx[IP]
            tcp = rx[TCP]
            self.assertEqual(getattr(ip, ip_field), ip_value)
            self.assertEqual(getattr(tcp, tcp_field), tcp_value)
            self.assert_packet_checksums_valid(rx)
        except:
            self.logger.error(ppp("Unexpected or invalid packet:", rx))
            raise

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
                                                     is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
                                              is_inside=0)
    self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
                                  local_port, external_port, vrf_id=1,
                                  proto=IP_PROTOS.tcp, out2in_only=1)
    self.nat44_add_static_mapping(
        self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index,
        local_port=local_port, vrf_id=0, external_port=external_port,
        proto=IP_PROTOS.tcp, out2in_only=1)

    # from client to service (both VRF1)
    rx = forward(self.pg6, self.pg5,
                 tcp_from(self.pg6, external_addr, 12345, external_port))
    expect(rx, 'dst', self.pg5.remote_ip4, 'dport', local_port)

    # from service back to client (both VRF1)
    rx = forward(self.pg5, self.pg6,
                 tcp_from(self.pg5, self.pg6.remote_ip4,
                          local_port, 12345))
    expect(rx, 'src', external_addr, 'sport', external_port)

    # dynamic NAT from VRF1 to VRF0 (output-feature)
    rx = forward(self.pg5, self.pg1,
                 tcp_from(self.pg5, self.pg1.remote_ip4, 2345, 22))
    try:
        ip = rx[IP]
        tcp = rx[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertNotEqual(tcp.sport, 2345)
        self.assert_packet_checksums_valid(rx)
        # Translated source port, reused as the reply's destination.
        port = tcp.sport
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", rx))
        raise

    # reply to the dynamically translated session
    rx = forward(self.pg1, self.pg5,
                 tcp_from(self.pg1, self.nat_addr, 22, port))
    expect(rx, 'dst', self.pg5.remote_ip4, 'dport', 2345)

    # from client VRF1 to service VRF0
    rx = forward(self.pg6, self.pg0,
                 tcp_from(self.pg6, self.pg0.local_ip4,
                          12346, external_port))
    expect(rx, 'dst', self.pg0.remote_ip4, 'dport', local_port)

    # from service VRF0 back to client VRF1
    rx = forward(self.pg0, self.pg6,
                 tcp_from(self.pg0, self.pg6.remote_ip4,
                          local_port, 12346))
    expect(rx, 'src', self.pg0.local_ip4, 'sport', external_port)

    # from client VRF0 to service VRF1
    rx = forward(self.pg0, self.pg5,
                 tcp_from(self.pg0, external_addr, 12347, external_port))
    expect(rx, 'dst', self.pg5.remote_ip4, 'dport', local_port)

    # from service VRF1 back to client VRF0
    rx = forward(self.pg5, self.pg0,
                 tcp_from(self.pg5, self.pg0.remote_ip4,
                          local_port, 12347))
    expect(rx, 'src', external_addr, 'sport', external_port)

    # from client to server (both VRF1, no translation)
    rx = forward(self.pg6, self.pg5,
                 tcp_from(self.pg6, self.pg5.remote_ip4,
                          12348, local_port))
    expect(rx, 'dst', self.pg5.remote_ip4, 'dport', local_port)

    # from server back to client (both VRF1, no translation)
    rx = forward(self.pg5, self.pg6,
                 tcp_from(self.pg5, self.pg6.remote_ip4,
                          local_port, 12348))
    expect(rx, 'src', self.pg5.remote_ip4, 'sport', local_port)

    # from client VRF1 to server VRF0 (no translation)
    # NOTE(review): this round sends pg0 -> pg6, exactly like the reply
    # round below, so the client -> server direction named in the label
    # does not appear to be exercised - confirm intent.
    rx = forward(self.pg0, self.pg6,
                 tcp_from(self.pg0, self.pg6.remote_ip4,
                          local_port, 12349))
    expect(rx, 'src', self.pg0.remote_ip4, 'sport', local_port)

    # from server VRF0 back to client VRF1 (no translation)
    rx = forward(self.pg0, self.pg6,
                 tcp_from(self.pg0, self.pg6.remote_ip4,
                          local_port, 12349))
    expect(rx, 'src', self.pg0.remote_ip4, 'sport', local_port)

    # from client VRF0 to server VRF1 (no translation)
    rx = forward(self.pg0, self.pg5,
                 tcp_from(self.pg0, self.pg5.remote_ip4,
                          12344, local_port))
    expect(rx, 'dst', self.pg5.remote_ip4, 'dport', local_port)

    # from server VRF1 back to client VRF0 (no translation)
    rx = forward(self.pg5, self.pg0,
                 tcp_from(self.pg5, self.pg0.remote_ip4,
                          local_port, 12344))
    expect(rx, 'src', self.pg5.remote_ip4, 'sport', local_port)
def tearDown(self):
    """
    Log the NAT44 state, then clear the NAT44 configuration and the log.

    Nothing beyond the parent tearDown is done when VPP is dead.
    """
    super(TestNAT44EndpointDependent, self).tearDown()
    if self.vpp_dead:
        return
    for show_cmd in ("show nat44 addresses",
                     "show nat44 interfaces",
                     "show nat44 static mappings",
                     "show nat44 interface address",
                     "show nat44 sessions detail",
                     "show nat44 hash tables detail"):
        self.logger.info(self.vapi.cli(show_cmd))
    self.clear_nat44()
    self.vapi.cli("clear logging")
class TestNAT44Out2InDPO(MethodHolder):
    """ NAT44 Test Cases using out2in DPO """

    @classmethod
    def setUpConstants(cls):
        """ Append the "nat { out2in dpo }" section to the VPP cmdline """
        super(TestNAT44Out2InDPO, cls).setUpConstants()
        nat_section = ["nat", "{", "out2in dpo", "}"]
        cls.vpp_cmdline.extend(nat_section)

    @classmethod
    def setUpClass(cls):
        """
        Create pg0 (IPv4) and pg1 (IPv6) and add an IPv6 default route
        via pg1. On any failure the class is torn down and the error is
        re-raised.
        """
        super(TestNAT44Out2InDPO, cls).setUpClass()
        cls.vapi.cli("set log class nat level debug")

        try:
            cls.tcp_port_in = cls.tcp_port_out = 6303
            cls.udp_port_in = cls.udp_port_out = 6304
            cls.icmp_id_in = cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.dst_ip4 = '192.168.70.1'

            cls.create_pg_interfaces(range(2))

            v4_if = cls.pg0
            v4_if.admin_up()
            v4_if.config_ip4()
            v4_if.resolve_arp()

            v6_if = cls.pg1
            v6_if.admin_up()
            v6_if.config_ip6()
            v6_if.resolve_ndp()

            # ::/0 via pg1's remote IPv6 address
            cls.vapi.ip_add_del_route(
                is_ipv6=True, dst_address='\x00'*16,
                dst_address_length=0,
                next_hop_address=v6_if.remote_ip6n,
                next_hop_sw_if_index=v6_if.sw_if_index)

        except Exception:
            super(TestNAT44Out2InDPO, cls).tearDownClass()
            raise

    def configure_xlat(self):
        """
        Store the destination/source IPv6 prefixes on the test case and
        add a MAP domain (is_translation=1) built from them.
        """
        self.dst_ip6_pfx = '1:2:3::'
        self.dst_ip6_pfx_len = 96
        self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.dst_ip6_pfx)
        self.src_ip6_pfx = '4:5:6::'
        self.src_ip6_pfx_len = 96
        self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
                                              self.src_ip6_pfx)
        self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
                                 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
                                 '\x00\x00\x00\x00', 0, is_translation=1,
                                 is_rfc6052=1)

    def test_464xlat_ce(self):
        """ Test 464XLAT CE with NAT44 """
        ip4_if = self.pg0
        ip6_if = self.pg1

        self.configure_xlat()

        self.vapi.nat44_interface_add_del_feature(ip4_if.sw_if_index)
        self.vapi.nat44_add_del_address_range(self.nat_addr_n,
                                              self.nat_addr_n)

        out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                       self.dst_ip6_pfx_len)
        out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
                                       self.src_ip6_pfx_len)

        try:
            # IPv4 in -> IPv6 out
            pkts = self.create_stream_in(ip4_if, ip6_if, self.dst_ip4)
            ip4_if.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = ip6_if.get_capture(len(pkts))
            self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
                                        dst_ip=out_src_ip6)

            # IPv6 in -> IPv4 out
            pkts = self.create_stream_out_ip6(ip6_if, out_src_ip6,
                                              out_dst_ip6)
            ip6_if.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = ip4_if.get_capture(len(pkts))
            self.verify_capture_in(capture, ip4_if)
        finally:
            # Always remove the NAT44 feature and address added above.
            self.vapi.nat44_interface_add_del_feature(ip4_if.sw_if_index,
                                                      is_add=0)
            self.vapi.nat44_add_del_address_range(self.nat_addr_n,
                                                  self.nat_addr_n, is_add=0)

    def test_464xlat_ce_no_nat(self):
        """ Test 464XLAT CE without NAT44 """
        ip4_if = self.pg0
        ip6_if = self.pg1

        self.configure_xlat()

        out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
                                       self.dst_ip6_pfx_len)
        out_dst_ip6 = self.compose_ip6(ip4_if.remote_ip4, self.src_ip6_pfx,
                                       self.src_ip6_pfx_len)

        # IPv4 in -> IPv6 out
        pkts = self.create_stream_in(ip4_if, ip6_if, self.dst_ip4)
        ip4_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = ip6_if.get_capture(len(pkts))
        self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
                                    nat_ip=out_dst_ip6, same_port=True)

        # IPv6 in -> IPv4 out
        pkts = self.create_stream_out_ip6(ip6_if, out_src_ip6, out_dst_ip6)
        ip6_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = ip4_if.get_capture(len(pkts))
        self.verify_capture_in(capture, ip4_if)
4977 class TestDeterministicNAT(MethodHolder):
4978 """ Deterministic NAT Test Cases """
@classmethod
def setUpConstants(cls):
    """ Append the "nat { deterministic }" section to the VPP cmdline """
    super(TestDeterministicNAT, cls).setUpConstants()
    nat_section = ["nat", "{", "deterministic", "}"]
    cls.vpp_cmdline.extend(nat_section)
@classmethod
def setUpClass(cls):
    """
    Set the default ports/ids and NAT address, bring up three pg
    interfaces with IPv4, and create two remote hosts on pg0. On any
    failure the class is torn down and the error is re-raised.
    """
    super(TestDeterministicNAT, cls).setUpClass()
    cls.vapi.cli("set log class nat level debug")

    try:
        cls.tcp_port_in = cls.tcp_external_port = 6303
        cls.udp_port_in = cls.udp_external_port = 6304
        cls.icmp_id_in = 6305
        cls.nat_addr = '10.0.0.3'

        cls.create_pg_interfaces(range(3))
        cls.interfaces = list(cls.pg_interfaces)

        for intf in cls.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

        cls.pg0.generate_remote_hosts(2)
        cls.pg0.configure_ipv4_neighbors()

    except Exception:
        super(TestDeterministicNAT, cls).tearDownClass()
        raise
def create_stream_in(self, in_if, out_if, ttl=64):
    """
    Create packet stream for inside network

    Returns a list of three packets (TCP, UDP, ICMP echo request),
    in that order.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param ttl: TTL of generated packets
    """
    def to_outside(l4):
        # Common Ethernet/IP envelope for every packet in the stream.
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
                l4)

    return [
        # TCP
        to_outside(TCP(sport=self.tcp_port_in,
                       dport=self.tcp_external_port)),
        # UDP
        to_outside(UDP(sport=self.udp_port_in,
                       dport=self.udp_external_port)),
        # ICMP
        to_outside(ICMP(id=self.icmp_id_in, type='echo-request')),
    ]
def create_stream_out(self, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for outside network

    Returns a list of three packets (TCP, UDP, ICMP echo reply),
    in that order.

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    """
    target = self.nat_addr if dst_ip is None else dst_ip

    def to_inside(l4):
        # Common Ethernet/IP envelope for every packet in the stream.
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=target, ttl=ttl) /
                l4)

    return [
        # TCP
        to_inside(TCP(dport=self.tcp_port_out,
                      sport=self.tcp_external_port)),
        # UDP
        to_inside(UDP(dport=self.udp_port_out,
                      sport=self.udp_external_port)),
        # ICMP
        to_inside(ICMP(id=self.icmp_external_id, type='echo-reply')),
    ]
def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
    """
    Verify captured packets on outside network

    Besides checking the source address, this stores the source port of
    each TCP/UDP packet in self.tcp_port_out / self.udp_port_out and the
    ICMP id of any other packet in self.icmp_external_id.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    """
    if nat_ip is None:
        nat_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, nat_ip)
            if packet.haslayer(TCP):
                self.tcp_port_out = packet[TCP].sport
            elif packet.haslayer(UDP):
                self.udp_port_out = packet[UDP].sport
            else:
                # Anything that is neither TCP nor UDP is read as ICMP.
                self.icmp_external_id = packet[ICMP].id
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_ipfix_max_entries_per_user(self, data):
    """
    Verify IPFIX maximum entries per user exceeded event

    Exactly one record is expected; four of its fields are checked.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    (event,) = data
    # natEvent
    nat_event = ord(event[230])
    self.assertEqual(nat_event, 13)
    # natQuotaExceededEvent
    quota_event = event[466]
    self.assertEqual('\x03\x00\x00\x00', quota_event)
    # maxEntriesPerUser
    max_entries = event[473]
    self.assertEqual('\xe8\x03\x00\x00', max_entries)
    # sourceIPv4Address
    source_addr = event[8]
    self.assertEqual(self.pg0.remote_ip4n, source_addr)
def test_deterministic_mode(self):
    """ NAT plugin run deterministic mode """
    in_addr = '172.16.255.0'
    in_plen = 24
    out_addr = '172.17.255.50'
    out_plen = 32
    in_addr_t = '172.16.255.20'
    in_addr_n, out_addr_n, in_addr_t_n = [
        socket.inet_aton(addr) for addr in (in_addr, out_addr, in_addr_t)]

    # The plugin must report that it runs in deterministic mode.
    nat_config = self.vapi.nat_show_config()
    self.assertEqual(1, nat_config.deterministic)

    self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)

    # Forward lookup of an inside address, then reverse lookup of the
    # result, must give back the same inside address.
    fwd = self.vapi.nat_det_forward(in_addr_t_n)
    self.assertEqual(fwd.out_addr[:4], out_addr_n)
    rev = self.vapi.nat_det_reverse(out_addr_n, fwd.out_port_hi)
    self.assertEqual(rev.in_addr[:4], in_addr_t_n)

    # The dump must contain exactly the mapping added above.
    mappings = self.vapi.nat_det_map_dump()
    self.assertEqual(len(mappings), 1)
    dsm = mappings[0]
    self.assertEqual(in_addr_n, dsm.in_addr[:4])
    self.assertEqual(in_plen, dsm.in_plen)
    self.assertEqual(out_addr_n, dsm.out_addr[:4])
    self.assertEqual(out_plen, dsm.out_plen)

    # After clearing, no mapping may remain.
    self.clear_nat_det()
    mappings = self.vapi.nat_det_map_dump()
    self.assertEqual(len(mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    before = self.vapi.nat_det_get_timeouts()

    # Raise every timeout by 10 so each value must differ afterwards.
    self.vapi.nat_det_set_timeouts(before.udp + 10,
                                   before.tcp_established + 10,
                                   before.tcp_transitory + 10,
                                   before.icmp + 10)

    after = self.vapi.nat_det_get_timeouts()

    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
def test_det_in(self):
    """ Deterministic NAT translation test (TCP, UDP, ICMP) """
    nat_ip = "10.0.0.10"
    inside = self.pg0
    outside = self.pg1

    def push(tx_if, rx_if, stream):
        # Send the whole stream and return a capture of the same size.
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(stream))

    self.vapi.nat_det_add_del_map(inside.remote_ip4n,
                                  32,
                                  socket.inet_aton(nat_ip),
                                  32)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(inside, outside)
    self.verify_capture_out(push(inside, outside, pkts), nat_ip)

    # out2in
    pkts = self.create_stream_out(outside, nat_ip)
    self.verify_capture_in(push(outside, inside, pkts), inside)

    # session dump test
    sessions = self.vapi.nat_det_session_dump(inside.remote_ip4n)
    self.assertEqual(len(sessions), 3)

    # TCP session is checked first, then the UDP session
    port_sessions = (
        (sessions[0], self.tcp_port_in, self.tcp_port_out,
         self.tcp_external_port),
        (sessions[1], self.udp_port_in, self.udp_port_out,
         self.udp_external_port),
    )
    for s, in_port, out_port, ext_port in port_sessions:
        self.assertEqual(s.ext_addr[:4], outside.remote_ip4n)
        self.assertEqual(s.in_port, in_port)
        self.assertEqual(s.out_port, out_port)
        self.assertEqual(s.ext_port, ext_port)

    # ICMP session
    s = sessions[2]
    self.assertEqual(s.ext_addr[:4], outside.remote_ip4n)
    self.assertEqual(s.in_port, self.icmp_id_in)
    self.assertEqual(s.out_port, self.icmp_external_id)
def test_multiple_users(self):
    """ Deterministic NAT multiple users """
    nat_ip = "10.0.0.10"
    port_in = 80
    external_port = 6303

    host0 = self.pg0.remote_hosts[0]
    host1 = self.pg0.remote_hosts[1]

    def send_one(tx_if, rx_if, pkt):
        # Inject pkt on tx_if; return the single packet seen on rx_if.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)[0]

    def from_host(host):
        # TCP segment from an inside host to the external endpoint.
        return (Ether(src=host.mac, dst=self.pg0.local_mac) /
                IP(src=host.ip4, dst=self.pg1.remote_ip4) /
                TCP(sport=port_in, dport=external_port))

    def to_nat_port(dport):
        # TCP segment from the external endpoint to a NAT outside port.
        return (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                IP(src=self.pg1.remote_ip4, dst=nat_ip) /
                TCP(sport=external_port, dport=dport))

    self.vapi.nat_det_add_del_map(host0.ip4n,
                                  24,
                                  socket.inet_aton(nat_ip),
                                  32)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # host0 to out
    p = send_one(self.pg0, self.pg1, from_host(host0))
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, nat_ip)
        self.assertEqual(ip.dst, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, external_port)
        port_out0 = tcp.sport
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # host1 to out
    p = send_one(self.pg0, self.pg1, from_host(host1))
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, nat_ip)
        self.assertEqual(ip.dst, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, external_port)
        port_out1 = tcp.sport
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # One mapping carrying one session per host.
    dms = self.vapi.nat_det_map_dump()
    self.assertEqual(1, len(dms))
    self.assertEqual(2, dms[0].ses_num)

    # out to host0
    p = send_one(self.pg1, self.pg0, to_nat_port(port_out0))
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(ip.dst, host0.ip4)
        self.assertEqual(tcp.dport, port_in)
        self.assertEqual(tcp.sport, external_port)
    except:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # out to host1
    p = send_one(self.pg1, self.pg0, to_nat_port(port_out1))
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(ip.dst, host1.ip4)
        self.assertEqual(tcp.dport, port_in)
        self.assertEqual(tcp.sport, external_port)
    except:
        self.logger.error(ppp("Unexpected or invalid packet", p))
        raise

    # session close api test
    # Close host1's session by its outside tuple ...
    self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
                                        port_out1,
                                        self.pg1.remote_ip4n,
                                        external_port)
    dms = self.vapi.nat_det_map_dump()
    self.assertEqual(dms[0].ses_num, 1)

    # ... then host0's session by its inside tuple.
    self.vapi.nat_det_close_session_in(host0.ip4n,
                                       port_in,
                                       self.pg1.remote_ip4n,
                                       external_port)
    dms = self.vapi.nat_det_map_dump()
    self.assertEqual(dms[0].ses_num, 0)
def test_tcp_session_close_detection_in(self):
    """ Deterministic NAT TCP session close from inside network """
    inside = self.pg0
    outside = self.pg1

    def in2out(flags):
        # Segment from the inside host to the external endpoint.
        return (Ether(src=inside.remote_mac, dst=inside.local_mac) /
                IP(src=inside.remote_ip4, dst=outside.remote_ip4) /
                TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                    flags=flags))

    def out2in(flags):
        # Segment from the external endpoint to the NAT address.
        return (Ether(src=outside.remote_mac, dst=outside.local_mac) /
                IP(src=outside.remote_ip4, dst=self.nat_addr) /
                TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                    flags=flags))

    def run(tx_if, rx_if, stream, expected):
        # Send the stream and require `expected` packets on rx_if.
        tx_if.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx_if.get_capture(expected)

    self.vapi.nat_det_add_del_map(inside.remote_ip4n,
                                  32,
                                  socket.inet_aton(self.nat_addr),
                                  32)
    self.vapi.nat44_interface_add_del_feature(inside.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(outside.sw_if_index,
                                              is_inside=0)

    self.initiate_tcp_session(inside, outside)

    # close the session from inside
    try:
        # FIN packet in -> out
        run(inside, outside, in2out("F"), 1)

        pkts = []

        # ACK packet out -> in
        pkts.append(out2in("A"))

        # FIN packet out -> in
        pkts.append(out2in("F"))

        run(outside, inside, pkts, 2)

        # ACK packet in -> out
        run(inside, outside, in2out("A"), 1)

        # Check if deterministic NAT44 closed the session
        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)
    except:
        self.logger.error("TCP session termination failed")
        raise
5399 def test_tcp_session_close_detection_out(self):
5400 """ Deterministic NAT TCP session close from outside network """
# Mirror image of the "close from inside" test: after the same mapping,
# feature and session setup, the outside host (pg1) sends the first FIN,
# the inside host (pg0) answers with two packets sent as one stream, and
# the outside host sends the last ACK.  The test passes when the first
# entry of nat_det_map_dump() reports ses_num == 0.
# NOTE(review): this listing omits short source lines (prefix lengths, TCP
# flags, stream start, try/except) -- confirm against the complete file.
5401 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5403 socket.inet_aton(self.nat_addr),
5405 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5406 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5409 self.initiate_tcp_session(self.pg0, self.pg1)
5411 # close the session from outside
5413 # FIN packet out -> in
5414 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5415 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5416 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5418 self.pg1.add_stream(p)
5419 self.pg_enable_capture(self.pg_interfaces)
5421 self.pg0.get_capture(1)
5425 # ACK packet in -> out
5426 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5427 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5428 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the label below repeats "ACK"; by symmetry with the
# close-from-inside test this second packet is presumably the FIN
# in -> out -- confirm against the TCP flags in the complete file.
5432 # ACK packet in -> out
5433 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5434 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5435 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# The in -> out packets are sent together as `pkts`; pg1 must receive two.
5439 self.pg0.add_stream(pkts)
5440 self.pg_enable_capture(self.pg_interfaces)
5442 self.pg1.get_capture(2)
5444 # ACK packet out -> in
5445 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5446 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5447 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5449 self.pg1.add_stream(p)
5450 self.pg_enable_capture(self.pg_interfaces)
5452 self.pg0.get_capture(1)
5454 # Check if deterministic NAT44 closed the session
5455 dms = self.vapi.nat_det_map_dump()
5456 self.assertEqual(0, dms[0].ses_num)
# Presumably the failure handler for the steps above (handler lines are
# not visible in this listing).
5458 self.logger.error("TCP session termination failed")
5461 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5462 def test_session_timeout(self):
5463 """ Deterministic NAT session timeouts """
# Only runs when running_extended_tests() is true.  Creates a deterministic
# NAT mapping, opens a TCP session, passes 5 for each of the four
# nat_det_set_timeouts() arguments, sends create_stream_in() traffic from
# pg0 towards pg1 and finally expects the first mapping to report no
# remaining sessions.
# NOTE(review): whatever lets the sessions expire between the capture and
# the final check is not visible in this listing (short lines are
# omitted) -- confirm against the complete file.
5464 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5466 socket.inet_aton(self.nat_addr),
5468 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5469 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5472 self.initiate_tcp_session(self.pg0, self.pg1)
5473 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
5474 pkts = self.create_stream_in(self.pg0, self.pg1)
5475 self.pg0.add_stream(pkts)
5476 self.pg_enable_capture(self.pg_interfaces)
# The capture is only used to require that every packet is forwarded.
5478 capture = self.pg1.get_capture(len(pkts))
5481 dms = self.vapi.nat_det_map_dump()
5482 self.assertEqual(0, dms[0].ses_num)
5484 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5485 def test_session_limit_per_user(self):
5486 """ Deterministic NAT maximum sessions per user limit """
# Only runs when running_extended_tests() is true.  Steps:
#   1. create a deterministic mapping, enable NAT44 on pg0/pg1 and point
#      the IPFIX exporter at pg2;
#   2. send one UDP packet per source port in range(1025, 2025) (1000
#      ports) from the same inside host and require all to reach pg1;
#   3. send one more UDP packet (3001 -> 3002): nothing may reach pg1 and
#      an ICMP error quoting that packet must come back on pg0;
#   4. require the first mapping to report exactly 1000 sessions;
#   5. flush IPFIX and hand the decoded data sets to
#      verify_ipfix_max_entries_per_user().
# NOTE(review): this listing omits short source lines (list setup, stream
# start, loop headers) -- confirm details against the complete file.
5487 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5489 socket.inet_aton(self.nat_addr),
5491 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5492 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5494 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5495 src_address=self.pg2.local_ip4n,
5497 template_interval=10)
5498 self.vapi.nat_ipfix()
5501 for port in range(1025, 2025):
5502 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5503 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5504 UDP(sport=port, dport=port))
5507 self.pg0.add_stream(pkts)
5508 self.pg_enable_capture(self.pg_interfaces)
5510 capture = self.pg1.get_capture(len(pkts))
# One packet beyond the 1000 already sent: it must not be forwarded.
5512 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5513 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5514 UDP(sport=3001, dport=3002))
5515 self.pg0.add_stream(p)
5516 self.pg_enable_capture(self.pg_interfaces)
5518 capture = self.pg1.assert_nothing_captured()
5520 # verify ICMP error packet
5521 capture = self.pg0.get_capture(1)
5523 self.assertTrue(p.haslayer(ICMP))
# ICMP type 3 is "destination unreachable"; code 1 is "host unreachable".
5525 self.assertEqual(icmp.type, 3)
5526 self.assertEqual(icmp.code, 1)
5527 self.assertTrue(icmp.haslayer(IPerror))
5528 inner_ip = icmp[IPerror]
5529 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5530 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5532 dms = self.vapi.nat_det_map_dump()
5534 self.assertEqual(1000, dms[0].ses_num)
5536 # verify IPFIX logging
5537 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Two IPFIX packets are expected on pg2.
5539 capture = self.pg2.get_capture(2)
5540 ipfix = IPFIXDecoder()
5541 # first load template
5543 self.assertTrue(p.haslayer(IPFIX))
5544 if p.haslayer(Template):
5545 ipfix.add_template(p.getlayer(Template))
5546 # verify events in data set
5548 if p.haslayer(Data):
5549 data = ipfix.decode_data_set(p.getlayer(Set))
5550 self.verify_ipfix_max_entries_per_user(data)
# Cleanup helper: disables NAT IPFIX logging, calls nat_det_set_timeouts()
# with no arguments, then walks nat_det_map_dump() and
# nat44_interface_dump() and issues one nat_det_add_del_map() /
# nat44_interface_add_del_feature() call per dumped entry.
# NOTE(review): the remaining arguments of those two calls (presumably the
# delete flags) and the docstring delimiters are not visible in this
# listing -- confirm against the complete file.
5552 def clear_nat_det(self):
5554 Clear deterministic NAT configuration.
5556 self.vapi.nat_ipfix(enable=0)
5557 self.vapi.nat_det_set_timeouts()
5558 deterministic_mappings = self.vapi.nat_det_map_dump()
5559 for dsm in deterministic_mappings:
5560 self.vapi.nat_det_add_del_map(dsm.in_addr,
5566 interfaces = self.vapi.nat44_interface_dump()
5567 for intf in interfaces:
5568 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# NOTE(review): the `def` line of this method is not visible in this
# listing; the super() call below shows these lines are the body of
# TestDeterministicNAT.tearDown.
# After the base-class teardown, and only while VPP is still alive, the
# NAT44 interface list and the deterministic NAT mappings, timeouts and
# sessions are queried through the CLI (the interface list is logged at
# info level; the logging calls for the other three are presumably on the
# omitted lines), then the configuration is removed via clear_nat_det().
5573 super(TestDeterministicNAT, self).tearDown()
5574 if not self.vpp_dead:
5575 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5577 self.vapi.cli("show nat44 deterministic mappings"))
5579 self.vapi.cli("show nat44 deterministic timeouts"))
5581 self.vapi.cli("show nat44 deterministic sessions"))
5582 self.clear_nat_det()
5585 class TestNAT64(MethodHolder):
5586 """ NAT64 Test Cases """
def setUpConstants(cls):
    """Extend the VPP command line with NAT64 hash-table sizing options."""
    super(TestNAT64, cls).setUpConstants()
    # Tokens forming a `nat { ... }` section that sets the bucket counts of
    # the NAT64 BIB hash and the NAT64 session-table hash.
    nat_section = ["nat", "{"]
    nat_section.append("nat64 bib hash buckets 128")
    nat_section.append("nat64 st hash buckets 256")
    nat_section.append("}")
    cls.vpp_cmdline.extend(nat_section)
5595 def setUpClass(cls):
# Class-wide fixture for the NAT64 tests: stores the default ports / ICMP
# ids, the two NAT pool addresses (text and packed form) and the IPFIX
# defaults, then creates six packet-generator interfaces:
#   - pg0 and pg2 are collected as IPv6 interfaces, pg1 as the IPv4 one;
#   - pg2 is moved into the IPv6 table cls.vrf1_id;
#   - pg3 gets both an IPv4 and an IPv6 configuration;
#   - pg5 gets an IPv6 address only (no neighbor configuration is visible).
# NOTE(review): this listing omits short source lines (the decorator,
# try/except, the vrf1_id assignment, admin-up and per-interface config
# calls inside the loops) -- confirm against the complete file.
5596 super(TestNAT64, cls).setUpClass()
5599 cls.tcp_port_in = 6303
5600 cls.tcp_port_out = 6303
5601 cls.udp_port_in = 6304
5602 cls.udp_port_out = 6304
5603 cls.icmp_id_in = 6305
5604 cls.icmp_id_out = 6305
5605 cls.nat_addr = '10.0.0.3'
5606 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5608 cls.vrf1_nat_addr = '10.0.10.3'
5609 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5611 cls.ipfix_src_port = 4739
5612 cls.ipfix_domain_id = 1
5614 cls.create_pg_interfaces(range(6))
5615 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5616 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5617 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5619 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5621 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
# Two remote hosts on pg0; the hairpinning tests use them as client/server.
5623 cls.pg0.generate_remote_hosts(2)
5625 for i in cls.ip6_interfaces:
5628 i.configure_ipv6_neighbors()
5630 for i in cls.ip4_interfaces:
5636 cls.pg3.config_ip4()
5637 cls.pg3.resolve_arp()
5638 cls.pg3.config_ip6()
5639 cls.pg3.configure_ipv6_neighbors()
5642 cls.pg5.config_ip6()
# Presumably the error-path cleanup of a surrounding try/except whose
# other lines are not visible in this listing.
5645 super(TestNAT64, cls).tearDownClass()
5648 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5649 """ NAT64 inside interface handles Neighbor Advertisement """
# Enables NAT64 on pg5 and checks IPv6 neighbor discovery still works:
#   1. an ICMPv6 echo request to pg5's local address must trigger exactly
#      one Neighbor Solicitation sourced from pg5's local address;
#   2. a Neighbor Advertisement for the solicited target is sent back;
#   3. a further echo request must then yield exactly one ICMPv6 echo reply
#      from pg5's local address to pg5's remote address.
# NOTE(review): this listing omits short source lines (pkts assignments,
# stream start, try/except) -- confirm against the complete file.
5651 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
5654 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5655 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5656 ICMPv6EchoRequest())
5658 self.pg5.add_stream(pkts)
5659 self.pg_enable_capture(self.pg_interfaces)
5662 # Wait for Neighbor Solicitation
5663 capture = self.pg5.get_capture(len(pkts))
5664 self.assertEqual(1, len(capture))
5667 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5668 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
# Remember the solicited target so the advertisement can answer it.
5669 tgt = packet[ICMPv6ND_NS].tgt
5671 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5674 # Send Neighbor Advertisement
5675 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5676 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5677 ICMPv6ND_NA(tgt=tgt) /
5678 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
5680 self.pg5.add_stream(pkts)
5681 self.pg_enable_capture(self.pg_interfaces)
5684 # Try to send ping again
5686 self.pg5.add_stream(pkts)
5687 self.pg_enable_capture(self.pg_interfaces)
5690 # Wait for ping reply
5691 capture = self.pg5.get_capture(len(pkts))
5692 self.assertEqual(1, len(capture))
5695 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5696 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
5697 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
5699 self.logger.error(ppp("Unexpected or invalid packet:", packet))
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    # A one-address range: the same packed IPv4 value is start and end.
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # After adding, the dump must contain exactly that address.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # After deleting, the pool must be empty again.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr,
                                            pool_addr,
                                            is_add=0)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
def test_interface(self):
    """ Enable/disable NAT64 feature on the interface """
    # pg0 is enabled with the default direction, pg1 with is_inside=0.
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    dumped = self.vapi.nat64_interface_dump()
    self.assertEqual(len(dumped), 2)

    # Both interfaces must be listed, each with the direction it was given.
    saw_pg0 = False
    saw_pg1 = False
    for entry in dumped:
        if entry.sw_if_index == self.pg0.sw_if_index:
            self.assertEqual(entry.is_inside, 1)
            saw_pg0 = True
        elif entry.sw_if_index == self.pg1.sw_if_index:
            self.assertEqual(entry.is_inside, 0)
            saw_pg1 = True
    self.assertTrue(saw_pg0)
    self.assertTrue(saw_pg1)

    # The expected feature name must appear in each interface's listing.
    expected_features = (
        ("show interface features pg0", 'nat64-in2out'),
        ("show interface features pg1", 'nat64-out2in'),
    )
    for command, feature_name in expected_features:
        listing = self.vapi.cli(command)
        self.assertNotEqual(listing.find(feature_name), -1)

    # Disabling NAT64 on both interfaces must leave the dump empty.
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
    dumped = self.vapi.nat64_interface_dump()
    self.assertEqual(len(dumped), 0)
5747 def test_static_bib(self):
5748 """ Add/delete static BIB entry """
# Adds one static TCP BIB entry (IPv6 inside address -> IPv4 outside
# address 10.1.1.3), checks the dumped entry's addresses and ports and
# that exactly one static entry is counted, then issues a second
# nat64_add_del_static_bib() call and expects the count to be zero.
# NOTE(review): the in_port/out_port values, the remaining call arguments
# and the counting loop over the dump are on lines omitted from this
# listing -- confirm against the complete file.
5749 in_addr = socket.inet_pton(socket.AF_INET6,
5750 '2001:db8:85a3::8a2e:370:7334')
5751 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5754 proto = IP_PROTOS.tcp
5756 self.vapi.nat64_add_del_static_bib(in_addr,
5761 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5766 self.assertEqual(bibe.i_addr, in_addr)
5767 self.assertEqual(bibe.o_addr, out_addr)
5768 self.assertEqual(bibe.i_port, in_port)
5769 self.assertEqual(bibe.o_port, out_port)
5770 self.assertEqual(static_bib_num, 1)
5772 self.vapi.nat64_add_del_static_bib(in_addr,
5778 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5783 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Values the API must report before anything has been changed.
    expected_defaults = (
        ('udp', 300),
        ('icmp', 60),
        ('tcp_trans', 240),
        ('tcp_est', 7440),
        ('tcp_incoming_syn', 6),
    )
    reported = self.vapi.nat64_get_timeouts()
    for field, value in expected_defaults:
        self.assertEqual(getattr(reported, field), value)

    # Push a different value for every timeout and read them all back.
    custom_values = (
        ('udp', 200),
        ('icmp', 30),
        ('tcp_trans', 250),
        ('tcp_est', 7450),
        ('tcp_incoming_syn', 10),
    )
    self.vapi.nat64_set_timeouts(**dict(custom_values))
    reported = self.vapi.nat64_get_timeouts()
    for field, value in custom_values:
        self.assertEqual(getattr(reported, field), value)
5805 def test_dynamic(self):
5806 """ NAT64 dynamic translation test """
# With a NAT64 pool address configured, pg0 as the inside and pg1 as the
# outside interface, sends IPv6 traffic from pg0 and IPv4 return traffic
# from pg1 twice in a row and verifies each capture with
# verify_capture_out() / verify_capture_in_ip6().  The two round trips
# together must add exactly three sessions.  The same exchange is then
# repeated for pg2 using the pool address registered for vrf1_id.
# NOTE(review): this listing omits short source lines (call continuations,
# stream start, direction comments) -- confirm against the complete file.
5807 self.tcp_port_in = 6303
5808 self.udp_port_in = 6304
5809 self.icmp_id_in = 6305
5811 ses_num_start = self.nat64_get_ses_num()
5813 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5815 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5816 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5819 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5820 self.pg0.add_stream(pkts)
5821 self.pg_enable_capture(self.pg_interfaces)
5823 capture = self.pg1.get_capture(len(pkts))
5824 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5825 dst_ip=self.pg1.remote_ip4)
5828 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5829 self.pg1.add_stream(pkts)
5830 self.pg_enable_capture(self.pg_interfaces)
5832 capture = self.pg0.get_capture(len(pkts))
# Expected IPv6 source of translated replies: the 64:ff9b:: prefix with
# pg1's IPv4 address embedded.  Building an IPv6 layer and reading .src
# back presumably normalises the textual form -- confirm.
5833 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5834 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5837 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5838 self.pg0.add_stream(pkts)
5839 self.pg_enable_capture(self.pg_interfaces)
5841 capture = self.pg1.get_capture(len(pkts))
5842 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5843 dst_ip=self.pg1.remote_ip4)
5846 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5847 self.pg1.add_stream(pkts)
5848 self.pg_enable_capture(self.pg_interfaces)
5850 capture = self.pg0.get_capture(len(pkts))
5851 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5853 ses_num_end = self.nat64_get_ses_num()
# The repeated round trip must not have added sessions beyond these three.
5855 self.assertEqual(ses_num_end - ses_num_start, 3)
5857 # tenant with specific VRF
5858 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5859 self.vrf1_nat_addr_n,
5860 vrf_id=self.vrf1_id)
5861 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5863 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5864 self.pg2.add_stream(pkts)
5865 self.pg_enable_capture(self.pg_interfaces)
5867 capture = self.pg1.get_capture(len(pkts))
5868 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5869 dst_ip=self.pg1.remote_ip4)
5871 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5872 self.pg1.add_stream(pkts)
5873 self.pg_enable_capture(self.pg_interfaces)
5875 capture = self.pg2.get_capture(len(pkts))
5876 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5878 def test_static(self):
5879 """ NAT64 static translation test """
# Uses identical inside and outside ports / ICMP id (60303, 60304, 60305),
# registers three static BIB entries for pg0's remote IPv6 address, then
# runs one in2out and one out2in exchange.  verify_capture_out() is called
# with same_port=True, and three new sessions are expected.
# NOTE(review): the remaining arguments of the three
# nat64_add_del_static_bib() calls and other short lines are omitted from
# this listing -- confirm against the complete file.
5880 self.tcp_port_in = 60303
5881 self.udp_port_in = 60304
5882 self.icmp_id_in = 60305
5883 self.tcp_port_out = 60303
5884 self.udp_port_out = 60304
5885 self.icmp_id_out = 60305
5887 ses_num_start = self.nat64_get_ses_num()
5889 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5891 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5892 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5894 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5899 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5904 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5911 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5912 self.pg0.add_stream(pkts)
5913 self.pg_enable_capture(self.pg_interfaces)
5915 capture = self.pg1.get_capture(len(pkts))
5916 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5917 dst_ip=self.pg1.remote_ip4, same_port=True)
5920 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5921 self.pg1.add_stream(pkts)
5922 self.pg_enable_capture(self.pg_interfaces)
5924 capture = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: 64:ff9b:: prefix with pg1's IPv4 address embedded.
5925 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5926 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5928 ses_num_end = self.nat64_get_ses_num()
5930 self.assertEqual(ses_num_end - ses_num_start, 3)
5932 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5933 def test_session_timeout(self):
5934 """ NAT64 session timeout """
# Only runs when running_extended_tests() is true.  Sets the NAT64 ICMP
# timeout to 5, creates sessions with one in2out stream, and compares the
# session count taken right after the traffic with a later one.
# NOTE(review): whatever happens between the two counts (presumably a
# wait) is not visible in this listing -- confirm.  Also note the final
# check only requires the two counts to differ, not that the later one is
# smaller.
5935 self.icmp_id_in = 1234
5936 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5938 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5939 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5940 self.vapi.nat64_set_timeouts(icmp=5)
5942 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5943 self.pg0.add_stream(pkts)
5944 self.pg_enable_capture(self.pg_interfaces)
5946 capture = self.pg1.get_capture(len(pkts))
5948 ses_num_before_timeout = self.nat64_get_ses_num()
5952 # ICMP session after timeout
5953 ses_num_after_timeout = self.nat64_get_ses_num()
5954 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5956 def test_icmp_error(self):
5957 """ NAT64 ICMP Error message translation """
# Creates sessions with one in2out and one out2in exchange, then checks
# ICMP error translation in both directions:
#   - in2out: each IPv6 packet captured on pg0 is quoted inside an
#     ICMPv6 Destination Unreachable (code 1) sent from pg0; pg1 must
#     receive ICMPv4 type 3 code 13 quoting the IPv4 form of the packet;
#   - out2in: each IPv4 packet captured on pg1 is quoted inside an ICMPv4
#     type 3 code 13 sent from pg1; pg0 must receive ICMPv6 Destination
#     Unreachable code 1 quoting the IPv6 form of the packet.
# NOTE(review): this listing omits short source lines (call continuations,
# stream start, try/except/else) -- confirm against the complete file.
5958 self.tcp_port_in = 6303
5959 self.udp_port_in = 6304
5960 self.icmp_id_in = 6305
5962 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5964 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5965 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5967 # send some packets to create sessions
5968 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5969 self.pg0.add_stream(pkts)
5970 self.pg_enable_capture(self.pg_interfaces)
5972 capture_ip4 = self.pg1.get_capture(len(pkts))
5973 self.verify_capture_out(capture_ip4,
5974 nat_ip=self.nat_addr,
5975 dst_ip=self.pg1.remote_ip4)
5977 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5978 self.pg1.add_stream(pkts)
5979 self.pg_enable_capture(self.pg_interfaces)
5981 capture_ip6 = self.pg0.get_capture(len(pkts))
5982 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5983 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5984 self.pg0.remote_ip6)
# in2out: quote every IPv6 packet the inside host received in an ICMPv6
# error sent back towards the translated peer address.
5987 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5988 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5989 ICMPv6DestUnreach(code=1) /
5990 packet[IPv6] for packet in capture_ip6]
5991 self.pg0.add_stream(pkts)
5992 self.pg_enable_capture(self.pg_interfaces)
5994 capture = self.pg1.get_capture(len(pkts))
5995 for packet in capture:
5997 self.assertEqual(packet[IP].src, self.nat_addr)
5998 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5999 self.assertEqual(packet[ICMP].type, 3)
6000 self.assertEqual(packet[ICMP].code, 13)
6001 inner = packet[IPerror]
6002 self.assertEqual(inner.src, self.pg1.remote_ip4)
6003 self.assertEqual(inner.dst, self.nat_addr)
6004 self.assert_packet_checksums_valid(packet)
6005 if inner.haslayer(TCPerror):
6006 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
6007 elif inner.haslayer(UDPerror):
6008 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
6010 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
6012 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# out2in: quote every IPv4 packet the outside host received in an ICMPv4
# error addressed to the NAT pool address.
6016 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6017 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6018 ICMP(type=3, code=13) /
6019 packet[IP] for packet in capture_ip4]
6020 self.pg1.add_stream(pkts)
6021 self.pg_enable_capture(self.pg_interfaces)
6023 capture = self.pg0.get_capture(len(pkts))
6024 for packet in capture:
6026 self.assertEqual(packet[IPv6].src, ip.src)
6027 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6028 icmp = packet[ICMPv6DestUnreach]
6029 self.assertEqual(icmp.code, 1)
6030 inner = icmp[IPerror6]
6031 self.assertEqual(inner.src, self.pg0.remote_ip6)
6032 self.assertEqual(inner.dst, ip.src)
6033 self.assert_icmpv6_checksum_valid(packet)
6034 if inner.haslayer(TCPerror):
6035 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
6036 elif inner.haslayer(UDPerror):
6037 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
6039 self.assertEqual(inner[ICMPv6EchoRequest].id,
6042 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6045 def test_hairpinning(self):
6046 """ NAT64 hairpinning """
# Client and server are the two remote hosts on the same inside interface
# (pg0), so every packet is sent on pg0 and captured on pg0 again.
#   1. client -> server via the NAT address: source must become the NAT
#      address with a new source port, destination the server's real
#      address and inside port; the translated client ports are recorded;
#   2. server -> client replies to those recorded ports: must arrive at
#      the client from the NAT address and the server's outside port;
#   3. ICMPv6 Destination Unreachable from the client quoting the packets
#      from step 2: must reach the server with the quoted packet showing
#      the server's inside port and the client's translated port.
# NOTE(review): this listing omits short source lines (static BIB
# arguments, pkts list handling, stream start, try/except/else) --
# confirm against the complete file.
6048 client = self.pg0.remote_hosts[0]
6049 server = self.pg0.remote_hosts[1]
6050 server_tcp_in_port = 22
6051 server_tcp_out_port = 4022
6052 server_udp_in_port = 23
6053 server_udp_out_port = 4023
6054 client_tcp_in_port = 1234
6055 client_udp_in_port = 1235
# Placeholders; overwritten with the ports observed in step 1.
6056 client_tcp_out_port = 0
6057 client_udp_out_port = 0
6058 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6059 nat_addr_ip6 = ip.src
6061 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6063 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6064 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6066 self.vapi.nat64_add_del_static_bib(server.ip6n,
6069 server_tcp_out_port,
6071 self.vapi.nat64_add_del_static_bib(server.ip6n,
6074 server_udp_out_port,
6079 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6080 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6081 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6083 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6084 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6085 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
6087 self.pg0.add_stream(pkts)
6088 self.pg_enable_capture(self.pg_interfaces)
6090 capture = self.pg0.get_capture(len(pkts))
6091 for packet in capture:
6093 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6094 self.assertEqual(packet[IPv6].dst, server.ip6)
6095 self.assert_packet_checksums_valid(packet)
6096 if packet.haslayer(TCP):
6097 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
6098 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
6099 client_tcp_out_port = packet[TCP].sport
6101 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
6102 self.assertEqual(packet[UDP].dport, server_udp_in_port)
6103 client_udp_out_port = packet[UDP].sport
6105 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6110 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6111 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6112 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
6114 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6115 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6116 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
6118 self.pg0.add_stream(pkts)
6119 self.pg_enable_capture(self.pg_interfaces)
6121 capture = self.pg0.get_capture(len(pkts))
6122 for packet in capture:
6124 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6125 self.assertEqual(packet[IPv6].dst, client.ip6)
6126 self.assert_packet_checksums_valid(packet)
6127 if packet.haslayer(TCP):
6128 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
6129 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
6131 self.assertEqual(packet[UDP].sport, server_udp_out_port)
6132 self.assertEqual(packet[UDP].dport, client_udp_in_port)
6134 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# `capture` still holds the server -> client packets from step 2; each one
# is quoted inside an ICMPv6 error sent by the client.
6139 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6140 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6141 ICMPv6DestUnreach(code=1) /
6142 packet[IPv6] for packet in capture]
6143 self.pg0.add_stream(pkts)
6144 self.pg_enable_capture(self.pg_interfaces)
6146 capture = self.pg0.get_capture(len(pkts))
6147 for packet in capture:
6149 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6150 self.assertEqual(packet[IPv6].dst, server.ip6)
6151 icmp = packet[ICMPv6DestUnreach]
6152 self.assertEqual(icmp.code, 1)
6153 inner = icmp[IPerror6]
6154 self.assertEqual(inner.src, server.ip6)
6155 self.assertEqual(inner.dst, nat_addr_ip6)
6156 self.assert_packet_checksums_valid(packet)
6157 if inner.haslayer(TCPerror):
6158 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
6159 self.assertEqual(inner[TCPerror].dport,
6160 client_tcp_out_port)
6162 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
6163 self.assertEqual(inner[UDPerror].dport,
6164 client_udp_out_port)
6166 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6169 def test_prefix(self):
6170 """ NAT64 Network-Specific Prefix """
# Configures two NAT64 prefixes -- 2001:db8::/32 without a vrf_id (the
# dump must report vrf_id 0) and 2001:db8:122:300::/56 for vrf1_id -- and
# checks the prefix dump after each addition.  Traffic is then exchanged
# between pg0 and pg1 using the first prefix and between pg2 and pg1
# using the tenant prefix; the expected IPv6 source of the return traffic
# is built with compose_ip6().
# NOTE(review): this listing omits short source lines (call continuations
# such as the prefix arguments, stream start) -- confirm against the
# complete file.
6172 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6174 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6175 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6176 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6177 self.vrf1_nat_addr_n,
6178 vrf_id=self.vrf1_id)
6179 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6182 global_pref64 = "2001:db8::"
6183 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
6184 global_pref64_len = 32
6185 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
6187 prefix = self.vapi.nat64_prefix_dump()
6188 self.assertEqual(len(prefix), 1)
6189 self.assertEqual(prefix[0].prefix, global_pref64_n)
6190 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
6191 self.assertEqual(prefix[0].vrf_id, 0)
6193 # Add tenant specific prefix
6194 vrf1_pref64 = "2001:db8:122:300::"
6195 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
6196 vrf1_pref64_len = 56
6197 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
6199 vrf_id=self.vrf1_id)
6200 prefix = self.vapi.nat64_prefix_dump()
6201 self.assertEqual(len(prefix), 2)
6204 pkts = self.create_stream_in_ip6(self.pg0,
6207 plen=global_pref64_len)
6208 self.pg0.add_stream(pkts)
6209 self.pg_enable_capture(self.pg_interfaces)
6211 capture = self.pg1.get_capture(len(pkts))
6212 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6213 dst_ip=self.pg1.remote_ip4)
6215 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6216 self.pg1.add_stream(pkts)
6217 self.pg_enable_capture(self.pg_interfaces)
6219 capture = self.pg0.get_capture(len(pkts))
6220 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6223 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
6225 # Tenant specific prefix
6226 pkts = self.create_stream_in_ip6(self.pg2,
6229 plen=vrf1_pref64_len)
6230 self.pg2.add_stream(pkts)
6231 self.pg_enable_capture(self.pg_interfaces)
6233 capture = self.pg1.get_capture(len(pkts))
6234 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6235 dst_ip=self.pg1.remote_ip4)
6237 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6238 self.pg1.add_stream(pkts)
6239 self.pg_enable_capture(self.pg_interfaces)
6241 capture = self.pg2.get_capture(len(pkts))
6242 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6245 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
6247 def test_unknown_proto(self):
6248 """ NAT64 translate packet with unknown protocol """
# Sends one TCP packet from pg0 first (one packet must reach pg1), then an
# IPv6 packet whose next header is 47, the IP protocol number of GRE.
# The packet seen on pg1 must come from the NAT address, go to pg1's
# remote address and still carry a GRE layer.  The reverse GRE packet sent
# from pg1 must arrive on pg0 as IPv6 from the 64:ff9b::/96 form of pg1's
# address with next header 47.
# NOTE(review): this listing omits short source lines (the GRE layer of
# the sent packets, stream start, try/except) -- confirm against the
# complete file.  The sibling hairpinning test spells 47 as IP_PROTOS.gre.
6250 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6252 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6253 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6254 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6257 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6258 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
6259 TCP(sport=self.tcp_port_in, dport=20))
6260 self.pg0.add_stream(p)
6261 self.pg_enable_capture(self.pg_interfaces)
6263 p = self.pg1.get_capture(1)
6265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6266 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
6268 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6269 TCP(sport=1234, dport=1234))
6270 self.pg0.add_stream(p)
6271 self.pg_enable_capture(self.pg_interfaces)
6273 p = self.pg1.get_capture(1)
6276 self.assertEqual(packet[IP].src, self.nat_addr)
6277 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6278 self.assertTrue(packet.haslayer(GRE))
6279 self.assert_packet_checksums_valid(packet)
6281 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6285 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6286 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6288 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6289 TCP(sport=1234, dport=1234))
6290 self.pg1.add_stream(p)
6291 self.pg_enable_capture(self.pg_interfaces)
6293 p = self.pg0.get_capture(1)
6296 self.assertEqual(packet[IPv6].src, remote_ip6)
6297 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6298 self.assertEqual(packet[IPv6].nh, 47)
6300 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6303 def test_hairpinning_unknown_proto(self):
6304 """ NAT64 translate packet with unknown protocol - hairpinning """
# Client and server are both remote hosts on pg0, with dedicated NAT
# addresses 10.0.0.110 (client) and 10.0.0.100 (server) and their
# 64:ff9b::/96 forms.  After a TCP packet from the client to the server's
# NAT address, a GRE packet (next header IP_PROTOS.gre) is sent in each
# direction; each must come back on pg0 with the sender's NAT IPv6 address
# as source, the peer's real IPv6 address as destination and the GRE next
# header preserved.
# NOTE(review): this listing omits short source lines (pool range end,
# static BIB arguments, the GRE layer of the sent packets, stream start,
# try/except) -- confirm against the complete file.
6306 client = self.pg0.remote_hosts[0]
6307 server = self.pg0.remote_hosts[1]
6308 server_tcp_in_port = 22
6309 server_tcp_out_port = 4022
6310 client_tcp_in_port = 1234
6311 client_tcp_out_port = 1235
6312 server_nat_ip = "10.0.0.100"
6313 client_nat_ip = "10.0.0.110"
6314 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
6315 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
6316 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
6317 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
6319 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
6321 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6322 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6324 self.vapi.nat64_add_del_static_bib(server.ip6n,
6327 server_tcp_out_port,
6330 self.vapi.nat64_add_del_static_bib(server.ip6n,
6336 self.vapi.nat64_add_del_static_bib(client.ip6n,
6339 client_tcp_out_port,
6343 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6344 IPv6(src=client.ip6, dst=server_nat_ip6) /
6345 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6346 self.pg0.add_stream(p)
6347 self.pg_enable_capture(self.pg_interfaces)
6349 p = self.pg0.get_capture(1)
6351 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6352 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
6354 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6355 TCP(sport=1234, dport=1234))
6356 self.pg0.add_stream(p)
6357 self.pg_enable_capture(self.pg_interfaces)
6359 p = self.pg0.get_capture(1)
6362 self.assertEqual(packet[IPv6].src, client_nat_ip6)
6363 self.assertEqual(packet[IPv6].dst, server.ip6)
6364 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6366 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6370 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6371 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
6373 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6374 TCP(sport=1234, dport=1234))
6375 self.pg0.add_stream(p)
6376 self.pg_enable_capture(self.pg_interfaces)
6378 p = self.pg0.get_capture(1)
6381 self.assertEqual(packet[IPv6].src, server_nat_ip6)
6382 self.assertEqual(packet[IPv6].dst, client.ip6)
6383 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6385 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6388 def test_one_armed_nat64(self):
6389 """ One armed NAT64 """
# pg3 is registered twice with nat64_add_del_interface(): once with the
# default direction and once with is_inside=0, so both translation
# directions run on the same interface.
#   - in2out: an IPv6 TCP packet (12345 -> 80) sent on pg3 must come back
#     on pg3 as IPv4 from the NAT address with a different source port,
#     which is remembered as external_port;
#   - out2in: the IPv4 reply to external_port must come back on pg3 as
#     IPv6 from remote_host_ip6 to pg3's remote IPv6 address, 80 -> 12345.
# NOTE(review): this listing omits short source lines (compose_ip6
# arguments, stream start, packet/layer extraction, try/except) --
# confirm against the complete file.
6391 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
6395 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6397 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
6398 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
6401 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6402 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
6403 TCP(sport=12345, dport=80))
6404 self.pg3.add_stream(p)
6405 self.pg_enable_capture(self.pg_interfaces)
6407 capture = self.pg3.get_capture(1)
6412 self.assertEqual(ip.src, self.nat_addr)
6413 self.assertEqual(ip.dst, self.pg3.remote_ip4)
6414 self.assertNotEqual(tcp.sport, 12345)
6415 external_port = tcp.sport
6416 self.assertEqual(tcp.dport, 80)
6417 self.assert_packet_checksums_valid(p)
6419 self.logger.error(ppp("Unexpected or invalid packet:", p))
6423 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6424 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6425 TCP(sport=80, dport=external_port))
6426 self.pg3.add_stream(p)
6427 self.pg_enable_capture(self.pg_interfaces)
6429 capture = self.pg3.get_capture(1)
6434 self.assertEqual(ip.src, remote_host_ip6)
6435 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6436 self.assertEqual(tcp.sport, 80)
6437 self.assertEqual(tcp.dport, 12345)
6438 self.assert_packet_checksums_valid(p)
6440 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Sends a fragmented TCP stream through NAT64 in both directions and checks
# the reassembled packets, then checks that the reassembly dump grew by two
# entries over the course of the test.
# NOTE(review): short source lines are missing from this listing (embedded
# numbers jump); the first `data` payload and several call arguments are
# defined on lines that are not shown.
6443 def test_frag_in_order(self):
6444 """ NAT64 translate fragments arriving in order """
6445 self.tcp_port_in = random.randint(1025, 65535)
6447 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6449 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6450 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Baseline number of reassembly entries, compared again at the end.
6452 reass = self.vapi.nat_reass_dump()
6453 reass_n_start = len(reass)
# IPv6 fragments injected on pg0, translated fragments captured on pg1.
6457 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6458 self.tcp_port_in, 20, data)
6459 self.pg0.add_stream(pkts)
6460 self.pg_enable_capture(self.pg_interfaces)
6462 frags = self.pg1.get_capture(len(pkts))
6463 p = self.reass_frags_and_verify(frags,
6465 self.pg1.remote_ip4)
# Destination port is preserved, source port is translated; the translated
# port is stored on the instance (tcp_port_out).
6466 self.assertEqual(p[TCP].dport, 20)
6467 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6468 self.tcp_port_out = p[TCP].sport
6469 self.assertEqual(data, p[Raw].load)
# Reverse direction: fragments injected on pg1, captured on pg0.
6472 data = "A" * 4 + "b" * 16 + "C" * 3
6473 pkts = self.create_stream_frag(self.pg1,
6478 self.pg1.add_stream(pkts)
6479 self.pg_enable_capture(self.pg_interfaces)
6481 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: pg1's IPv4 host embedded in 64:ff9b::/96.
6482 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6483 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6484 self.assertEqual(p[TCP].sport, 20)
6485 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6486 self.assertEqual(data, p[Raw].load)
# Two new reassembly entries are expected relative to the baseline.
6488 reass = self.vapi.nat_reass_dump()
6489 reass_n_end = len(reass)
6491 self.assertEqual(reass_n_end - reass_n_start, 2)
# Hairpinning with fragments: a fragmented stream is injected on pg0 and the
# result is captured on pg0 as well, then reassembled and checked against
# the NAT address (as IPv6) and the server host.
# NOTE(review): `data`, the remaining static BIB arguments and the arguments
# of create_stream_frag_ip6 are on lines this listing does not show.
6493 def test_reass_hairpinning(self):
6494 """ NAT64 fragments hairpinning """
6496 server = self.pg0.remote_hosts[1]
6497 server_in_port = random.randint(1025, 65535)
6498 server_out_port = random.randint(1025, 65535)
6499 client_in_port = random.randint(1025, 65535)
# Builds an IPv6 layer only to read back the address string assembled from
# the 64:ff9b:: prefix and the NAT IPv4 address.
6500 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6501 nat_addr_ip6 = ip.src
6503 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6505 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6506 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6508 # add static BIB entry for server
6509 self.vapi.nat64_add_del_static_bib(server.ip6n,
6515 # send packet from host to server
6516 pkts = self.create_stream_frag_ip6(self.pg0,
6521 self.pg0.add_stream(pkts)
6522 self.pg_enable_capture(self.pg_interfaces)
6524 frags = self.pg0.get_capture(len(pkts))
# Reassembled packet: source port translated, destination is the server's
# inside port, payload unchanged.
6525 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6526 self.assertNotEqual(p[TCP].sport, client_in_port)
6527 self.assertEqual(p[TCP].dport, server_in_port)
6528 self.assertEqual(data, p[Raw].load)
# Same two-direction fragment exchange as the in-order test, per the
# docstring with fragments arriving out of order.
# NOTE(review): the statements that reorder `pkts`, the first `data` payload
# and several call arguments are not visible in this listing (embedded line
# numbers jump, e.g. 6542 -> 6544 and 6558 -> 6564) - presumably the
# reordering happens on those omitted lines; confirm against the full file.
6530 def test_frag_out_of_order(self):
6531 """ NAT64 translate fragments arriving out of order """
6532 self.tcp_port_in = random.randint(1025, 65535)
6534 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6536 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6537 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 fragments injected on pg0, translated fragments captured on pg1.
6541 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6542 self.tcp_port_in, 20, data)
6544 self.pg0.add_stream(pkts)
6545 self.pg_enable_capture(self.pg_interfaces)
6547 frags = self.pg1.get_capture(len(pkts))
6548 p = self.reass_frags_and_verify(frags,
6550 self.pg1.remote_ip4)
# Destination port preserved, source port translated and remembered.
6551 self.assertEqual(p[TCP].dport, 20)
6552 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6553 self.tcp_port_out = p[TCP].sport
6554 self.assertEqual(data, p[Raw].load)
# Reverse direction: fragments injected on pg1, captured on pg0.
6557 data = "A" * 4 + "B" * 16 + "C" * 3
6558 pkts = self.create_stream_frag(self.pg1,
6564 self.pg1.add_stream(pkts)
6565 self.pg_enable_capture(self.pg_interfaces)
6567 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: pg1's IPv4 host embedded in 64:ff9b::/96.
6568 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6569 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6570 self.assertEqual(p[TCP].sport, 20)
6571 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6572 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (query the NAT64 pool itself; the NAT44 address dump says nothing
    # about the pool this test is exercising)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump, not on the result fetched before the
    # address was configured, otherwise removal is never verified)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
# Extended-only test: floods NAT64 with two TCP flows (dport 80 and 22) per
# generated source address, then checks the IPFIX records exported when one
# more session and, later, one more BIB entry cannot be created.
# NOTE(review): `max_bibs`, `max_sessions`, the `pkts` list handling, the
# `for p in capture` style loop headers and one exporter argument are on
# lines this listing omits (embedded numbers jump); their values cannot be
# read from here.
6593 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6594 def test_ipfix_max_bibs_sessions(self):
6595 """ IPFIX logging maximum session and BIB entries exceeded """
6598 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6602 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6604 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6605 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# One distinct IPv6 source per iteration, two flows each.
6609 for i in range(0, max_bibs):
6610 src = "fd01:aa::%x" % (i)
6611 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6612 IPv6(src=src, dst=remote_host_ip6) /
6613 TCP(sport=12345, dport=80))
6615 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6616 IPv6(src=src, dst=remote_host_ip6) /
6617 TCP(sport=12345, dport=22))
6619 self.pg0.add_stream(pkts)
6620 self.pg_enable_capture(self.pg_interfaces)
# All max_sessions packets are expected to be forwarded to pg1.
6622 self.pg1.get_capture(max_sessions)
# IPFIX export towards pg3 is enabled only after the tables were filled.
6624 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6625 src_address=self.pg3.local_ip4n,
6627 template_interval=10)
6628 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6629 src_port=self.ipfix_src_port)
# A third flow (dport 25) from the last generated source; nothing may be
# forwarded for it.
6631 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6632 IPv6(src=src, dst=remote_host_ip6) /
6633 TCP(sport=12345, dport=25))
6634 self.pg0.add_stream(p)
6635 self.pg_enable_capture(self.pg_interfaces)
6637 self.pg1.assert_nothing_captured()
6639 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6640 capture = self.pg3.get_capture(9)
6641 ipfix = IPFIXDecoder()
6642 # first load template
6644 self.assertTrue(p.haslayer(IPFIX))
6645 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6646 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6647 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6648 self.assertEqual(p[UDP].dport, 4739)
6649 self.assertEqual(p[IPFIX].observationDomainID,
6650 self.ipfix_domain_id)
6651 if p.haslayer(Template):
6652 ipfix.add_template(p.getlayer(Template))
6653 # verify events in data set
6655 if p.haslayer(Data):
6656 data = ipfix.decode_data_set(p.getlayer(Set))
6657 self.verify_ipfix_max_sessions(data, max_sessions)
# A flow from a source address not used in the loop above (pg0's own host);
# again nothing may be forwarded, and a single IPFIX packet is expected.
6659 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6660 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6661 TCP(sport=12345, dport=80))
6662 self.pg0.add_stream(p)
6663 self.pg_enable_capture(self.pg_interfaces)
6665 self.pg1.assert_nothing_captured()
6667 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6668 capture = self.pg3.get_capture(1)
6669 # verify events in data set
6671 self.assertTrue(p.haslayer(IPFIX))
6672 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6673 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6674 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6675 self.assertEqual(p[UDP].dport, 4739)
6676 self.assertEqual(p[IPFIX].observationDomainID,
6677 self.ipfix_domain_id)
6678 if p.haslayer(Data):
6679 data = ipfix.decode_data_set(p.getlayer(Set))
6680 self.verify_ipfix_max_bibs(data, max_bibs)
# Sets the IPv6 reassembly fragment limit to 0, sends only the last fragment
# of a stream and checks the IPFIX records exported to pg3.
# NOTE(review): `data`, the loop headers over `capture` and one exporter
# argument are on lines this listing omits. `self.tcp_port_in` is read here
# but not assigned in the visible lines - presumably it is set elsewhere on
# the class; verify.
6682 def test_ipfix_max_frags(self):
6683 """ IPFIX logging maximum fragments pending reassembly exceeded """
6684 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6686 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6687 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6688 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6689 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6690 src_address=self.pg3.local_ip4n,
6692 template_interval=10)
6693 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6694 src_port=self.ipfix_src_port)
6697 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6698 self.tcp_port_in, 20, data)
# Only the final fragment of the generated stream is injected.
6699 self.pg0.add_stream(pkts[-1])
6700 self.pg_enable_capture(self.pg_interfaces)
6702 self.pg1.assert_nothing_captured()
6704 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6705 capture = self.pg3.get_capture(9)
6706 ipfix = IPFIXDecoder()
6707 # first load template
6709 self.assertTrue(p.haslayer(IPFIX))
6710 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6711 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6712 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6713 self.assertEqual(p[UDP].dport, 4739)
6714 self.assertEqual(p[IPFIX].observationDomainID,
6715 self.ipfix_domain_id)
6716 if p.haslayer(Template):
6717 ipfix.add_template(p.getlayer(Template))
6718 # verify events in data set
6720 if p.haslayer(Data):
6721 data = ipfix.decode_data_set(p.getlayer(Set))
# The limit passed to the verifier (0) matches max_frag set above.
6722 self.verify_ipfix_max_fragments_ip6(data, 0,
6723 self.pg0.remote_ip6n)
# Creates one NAT64 session with a single TCP packet, checks the IPFIX
# records exported for it, then issues a second pool-address call and checks
# the records exported afterwards.
# NOTE(review): loop headers over `capture`, `else` branches, the remaining
# arguments of several calls (including the second pool-address call, which
# per the docstring should trigger the delete events) are on lines this
# listing omits - confirm against the full file.
6725 def test_ipfix_bib_ses(self):
6726 """ IPFIX logging NAT64 BIB/session create and delete events """
6727 self.tcp_port_in = random.randint(1025, 65535)
6728 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6732 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6734 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6735 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6736 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6737 src_address=self.pg3.local_ip4n,
6739 template_interval=10)
6740 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6741 src_port=self.ipfix_src_port)
# Single TCP packet from the pg0 host; the translated source port seen on
# pg1 is stored as tcp_port_out.
6744 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6745 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6746 TCP(sport=self.tcp_port_in, dport=25))
6747 self.pg0.add_stream(p)
6748 self.pg_enable_capture(self.pg_interfaces)
6750 p = self.pg1.get_capture(1)
6751 self.tcp_port_out = p[0][TCP].sport
6752 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6753 capture = self.pg3.get_capture(10)
6754 ipfix = IPFIXDecoder()
6755 # first load template
6757 self.assertTrue(p.haslayer(IPFIX))
6758 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6759 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6760 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6761 self.assertEqual(p[UDP].dport, 4739)
6762 self.assertEqual(p[IPFIX].observationDomainID,
6763 self.ipfix_domain_id)
6764 if p.haslayer(Template):
6765 ipfix.add_template(p.getlayer(Template))
6766 # verify events in data set
6768 if p.haslayer(Data):
6769 data = ipfix.decode_data_set(p.getlayer(Set))
# Field 230 of the first record selects the verifier: value 10 goes to the
# BIB check, value 6 to the NAT64 session check.
6770 if ord(data[0][230]) == 10:
6771 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6772 elif ord(data[0][230]) == 6:
6773 self.verify_ipfix_nat64_ses(data,
6775 self.pg0.remote_ip6n,
6776 self.pg1.remote_ip4,
6779 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Second phase: capture is re-armed before the pool-address call so that the
# records it causes are collected on pg3.
6782 self.pg_enable_capture(self.pg_interfaces)
6783 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6786 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6787 capture = self.pg3.get_capture(2)
6788 # verify events in data set
6790 self.assertTrue(p.haslayer(IPFIX))
6791 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6792 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6793 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6794 self.assertEqual(p[UDP].dport, 4739)
6795 self.assertEqual(p[IPFIX].observationDomainID,
6796 self.ipfix_domain_id)
6797 if p.haslayer(Data):
6798 data = ipfix.decode_data_set(p.getlayer(Set))
# Here value 11 goes to the BIB check (called with 0 instead of 1) and
# value 7 to the session check.
6799 if ord(data[0][230]) == 11:
6800 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6801 elif ord(data[0][230]) == 7:
6802 self.verify_ipfix_nat64_ses(data,
6804 self.pg0.remote_ip6n,
6805 self.pg1.remote_ip4,
6808 self.logger.error(ppp("Unexpected or invalid packet: ", p))
def nat64_get_ses_num(self):
    """
    Return number of active NAT64 sessions.

    The count is the number of entries returned by one
    ``self.vapi.nat64_st_dump()`` call.
    """
    return len(self.vapi.nat64_st_dump())
# Tears down NAT64 state in this order: IPFIX logging off (and the IPFIX
# port / domain attributes reset to 4739 / 1), timeouts reset via a call
# with no arguments, then interfaces, BIB entries, pool addresses and
# prefixes are walked from their respective dumps.
# NOTE(review): most trailing call arguments (presumably the is_add=0 style
# delete flags), the loop header over `bib` and any condition guarding the
# static BIB call are on lines this listing omits - verify.
6817 def clear_nat64(self):
6819 Clear NAT64 configuration.
6821 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6822 domain_id=self.ipfix_domain_id)
6823 self.ipfix_src_port = 4739
6824 self.ipfix_domain_id = 1
6826 self.vapi.nat64_set_timeouts()
6828 interfaces = self.vapi.nat64_interface_dump()
6829 for intf in interfaces:
# Entries reporting is_inside > 1 get an additional interface call before
# the common one.
6830 if intf.is_inside > 1:
6831 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6834 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6838 bib = self.vapi.nat64_bib_dump(255)
6841 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6849 adresses = self.vapi.nat64_pool_addr_dump()
6850 for addr in adresses:
6851 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6856 prefixes = self.vapi.nat64_prefix_dump()
6857 for prefix in prefixes:
6858 self.vapi.nat64_add_del_prefix(prefix.prefix,
6860 vrf_id=prefix.vrf_id,
# Body of TestNAT64's tearDown (per the super() call): after the framework
# teardown, and only while VPP is still alive, the NAT64 pool, interfaces,
# prefixes, BIB, session table and virtual-reassembly state are written to
# the test log.
# NOTE(review): the `def` line and whatever follows the last logging call
# are not part of this listing.
6864 super(TestNAT64, self).tearDown()
6865 if not self.vpp_dead:
6866 self.logger.info(self.vapi.cli("show nat64 pool"))
6867 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6868 self.logger.info(self.vapi.cli("show nat64 prefix"))
6869 self.logger.info(self.vapi.cli("show nat64 bib all"))
6870 self.logger.info(self.vapi.cli("show nat64 session table all"))
6871 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite tests: IPv4 packets tunnelled in IPv6 arrive on pg1, are NATed to
# the pool address and leave on pg0 as plain IPv4; replies take the reverse
# path and are re-encapsulated.
# NOTE(review): this listing omits short source lines (decorators, the
# try/except scaffolding in setUpClass, the `def tearDown` header, packet
# start calls, some trailing arguments); only the visible statements are
# described below.
6875 class TestDSlite(MethodHolder):
6876 """ DS-Lite Test Cases """
6879 def setUpClass(cls):
6880 super(TestDSlite, cls).setUpClass()
# NAT pool address in text and packed binary form.
6883 cls.nat_addr = '10.0.0.3'
6884 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6886 cls.create_pg_interfaces(range(2))
# pg0 is the IPv4 side, pg1 the IPv6 side with two remote hosts.
6888 cls.pg0.config_ip4()
6889 cls.pg0.resolve_arp()
6891 cls.pg1.config_ip6()
6892 cls.pg1.generate_remote_hosts(2)
6893 cls.pg1.configure_ipv6_neighbors()
6896 super(TestDSlite, cls).tearDownClass()
6899 def test_dslite(self):
6900 """ Test DS-Lite """
6901 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# AFTR tunnel endpoint addresses (IPv4 and IPv6) handed to VPP.
6903 aftr_ip4 = '192.0.0.1'
6904 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6905 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6906 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6907 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, tunnelled from remote host 0 with inner source 192.168.1.1.
6910 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6911 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6912 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6913 UDP(sport=20000, dport=10000))
6914 self.pg1.add_stream(p)
6915 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: no IPv6 header, source is the NAT address, source port
# rewritten (kept in out_port for the reply).
6917 capture = self.pg0.get_capture(1)
6918 capture = capture[0]
6919 self.assertFalse(capture.haslayer(IPv6))
6920 self.assertEqual(capture[IP].src, self.nat_addr)
6921 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6922 self.assertNotEqual(capture[UDP].sport, 20000)
6923 self.assertEqual(capture[UDP].dport, 10000)
6924 self.assert_packet_checksums_valid(capture)
6925 out_port = capture[UDP].sport
# UDP reply towards the NAT address / translated port.
6927 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6928 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6929 UDP(sport=10000, dport=out_port))
6930 self.pg0.add_stream(p)
6931 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg1: outer IPv6 from the AFTR address to remote host 0, inner
# destination and port restored to the original values.
6933 capture = self.pg1.get_capture(1)
6934 capture = capture[0]
6935 self.assertEqual(capture[IPv6].src, aftr_ip6)
6936 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6937 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6938 self.assertEqual(capture[IP].dst, '192.168.1.1')
6939 self.assertEqual(capture[UDP].sport, 10000)
6940 self.assertEqual(capture[UDP].dport, 20000)
6941 self.assert_packet_checksums_valid(capture)
# TCP, tunnelled from remote host 1 using the same inner source address
# (192.168.1.1) as host 0 above.
6944 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6945 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6946 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6947 TCP(sport=20001, dport=10001))
6948 self.pg1.add_stream(p)
6949 self.pg_enable_capture(self.pg_interfaces)
6951 capture = self.pg0.get_capture(1)
6952 capture = capture[0]
6953 self.assertFalse(capture.haslayer(IPv6))
6954 self.assertEqual(capture[IP].src, self.nat_addr)
6955 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6956 self.assertNotEqual(capture[TCP].sport, 20001)
6957 self.assertEqual(capture[TCP].dport, 10001)
6958 self.assert_packet_checksums_valid(capture)
6959 out_port = capture[TCP].sport
# TCP reply; must be tunnelled back to remote host 1.
6961 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6962 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6963 TCP(sport=10001, dport=out_port))
6964 self.pg0.add_stream(p)
6965 self.pg_enable_capture(self.pg_interfaces)
6967 capture = self.pg1.get_capture(1)
6968 capture = capture[0]
6969 self.assertEqual(capture[IPv6].src, aftr_ip6)
6970 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6971 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6972 self.assertEqual(capture[IP].dst, '192.168.1.1')
6973 self.assertEqual(capture[TCP].sport, 10001)
6974 self.assertEqual(capture[TCP].dport, 20001)
6975 self.assert_packet_checksums_valid(capture)
# ICMP echo request from remote host 1; the ICMP id plays the role of the
# port and is expected to be rewritten.
6978 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6979 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6980 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6981 ICMP(id=4000, type='echo-request'))
6982 self.pg1.add_stream(p)
6983 self.pg_enable_capture(self.pg_interfaces)
6985 capture = self.pg0.get_capture(1)
6986 capture = capture[0]
6987 self.assertFalse(capture.haslayer(IPv6))
6988 self.assertEqual(capture[IP].src, self.nat_addr)
6989 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6990 self.assertNotEqual(capture[ICMP].id, 4000)
6991 self.assert_packet_checksums_valid(capture)
6992 out_id = capture[ICMP].id
# Echo reply using the translated id; the original id (4000) must be
# restored on the tunnelled packet.
6994 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6995 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6996 ICMP(id=out_id, type='echo-reply'))
6997 self.pg0.add_stream(p)
6998 self.pg_enable_capture(self.pg_interfaces)
7000 capture = self.pg1.get_capture(1)
7001 capture = capture[0]
7002 self.assertEqual(capture[IPv6].src, aftr_ip6)
7003 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7004 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7005 self.assertEqual(capture[IP].dst, '192.168.1.1')
7006 self.assertEqual(capture[ICMP].id, 4000)
7007 self.assert_packet_checksums_valid(capture)
7009 # ping DS-Lite AFTR tunnel endpoint address
7010 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7011 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
7012 ICMPv6EchoRequest())
7013 self.pg1.add_stream(p)
7014 self.pg_enable_capture(self.pg_interfaces)
# The echo reply is expected back on pg1, sourced from the AFTR address.
7016 capture = self.pg1.get_capture(1)
7017 self.assertEqual(1, len(capture))
7018 capture = capture[0]
7019 self.assertEqual(capture[IPv6].src, aftr_ip6)
7020 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7021 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Teardown body: DS-Lite state is logged only while VPP is still alive.
7024 super(TestDSlite, self).tearDown()
7025 if not self.vpp_dead:
7026 self.logger.info(self.vapi.cli("show dslite pool"))
7028 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7029 self.logger.info(self.vapi.cli("show dslite sessions"))
# DS-Lite CE (B4 side) tests: plain IPv4 arriving on pg0 is encapsulated in
# IPv6 towards the AFTR on pg1, and tunnelled packets from the AFTR are
# decapsulated back to pg0. No address or port rewriting is asserted here.
# NOTE(review): this listing omits short source lines (decorators, the
# try/except scaffolding, the `def tearDown` header, packet start calls and
# the closing argument of the route call); only visible statements are
# described.
7032 class TestDSliteCE(MethodHolder):
7033 """ DS-Lite CE Test Cases """
7036 def setUpConstants(cls):
7037 super(TestDSliteCE, cls).setUpConstants()
# Adds a `nat { dslite ce }` section to the VPP command line.
7038 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
7041 def setUpClass(cls):
7042 super(TestDSliteCE, cls).setUpClass()
7045 cls.create_pg_interfaces(range(2))
# pg0 is the IPv4 side, pg1 the IPv6 side with one remote host.
7047 cls.pg0.config_ip4()
7048 cls.pg0.resolve_arp()
7050 cls.pg1.config_ip6()
7051 cls.pg1.generate_remote_hosts(1)
7052 cls.pg1.configure_ipv6_neighbors()
7055 super(TestDSliteCE, cls).tearDownClass()
7058 def test_dslite_ce(self):
7059 """ Test DS-Lite CE """
# B4 tunnel endpoint addresses.
7061 b4_ip4 = '192.0.0.2'
7062 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
7063 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
7064 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
7065 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
# AFTR tunnel endpoint addresses.
7067 aftr_ip4 = '192.0.0.1'
7068 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7069 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7070 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7071 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# Host route (/128) for the AFTR address via pg1's remote host.
7073 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
7074 dst_address_length=128,
7075 next_hop_address=self.pg1.remote_ip6n,
7076 next_hop_sw_if_index=self.pg1.sw_if_index,
# Encapsulation: plain IPv4/UDP injected on pg0.
7080 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7081 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
7082 UDP(sport=10000, dport=20000))
7083 self.pg0.add_stream(p)
7084 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg1: outer IPv6 from B4 to AFTR, inner packet unchanged.
7086 capture = self.pg1.get_capture(1)
7087 capture = capture[0]
7088 self.assertEqual(capture[IPv6].src, b4_ip6)
7089 self.assertEqual(capture[IPv6].dst, aftr_ip6)
7090 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7091 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
7092 self.assertEqual(capture[UDP].sport, 10000)
7093 self.assertEqual(capture[UDP].dport, 20000)
7094 self.assert_packet_checksums_valid(capture)
# Decapsulation: tunnelled IPv4/UDP from the AFTR injected on pg1.
7097 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7098 IPv6(dst=b4_ip6, src=aftr_ip6) /
7099 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
7100 UDP(sport=20000, dport=10000))
7101 self.pg1.add_stream(p)
7102 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: IPv6 header removed, inner packet unchanged.
7104 capture = self.pg0.get_capture(1)
7105 capture = capture[0]
7106 self.assertFalse(capture.haslayer(IPv6))
7107 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
7108 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7109 self.assertEqual(capture[UDP].sport, 20000)
7110 self.assertEqual(capture[UDP].dport, 10000)
7111 self.assert_packet_checksums_valid(capture)
7113 # ping DS-Lite B4 tunnel endpoint address
7114 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7115 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
7116 ICMPv6EchoRequest())
7117 self.pg1.add_stream(p)
7118 self.pg_enable_capture(self.pg_interfaces)
# The echo reply is expected back on pg1, sourced from the B4 address.
7120 capture = self.pg1.get_capture(1)
7121 self.assertEqual(1, len(capture))
7122 capture = capture[0]
7123 self.assertEqual(capture[IPv6].src, b4_ip6)
7124 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7125 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Teardown body: tunnel endpoint addresses are queried via CLI only while
# VPP is still alive (the enclosing logging calls are on omitted lines).
7128 super(TestDSliteCE, self).tearDown()
7129 if not self.vpp_dead:
7131 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7133 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# NAT66 tests: a 1:1 static mapping for pg0's remote host, checked in both
# directions, plus a check that no translation happens when both interfaces
# are added with the default (non-outside) arguments.
# NOTE(review): this listing omits short source lines (decorators, the
# try/except scaffolding, `pkts` list handling, some transport layers,
# trailing call arguments, the `def tearDown` header); only visible
# statements are described.
7136 class TestNAT66(MethodHolder):
7137 """ NAT66 Test Cases """
7140 def setUpClass(cls):
7141 super(TestNAT66, cls).setUpClass()
# External (translated) IPv6 address in text and packed binary form.
7144 cls.nat_addr = 'fd01:ff::2'
7145 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
7147 cls.create_pg_interfaces(range(2))
7148 cls.interfaces = list(cls.pg_interfaces)
7150 for i in cls.interfaces:
7153 i.configure_ipv6_neighbors()
7156 super(TestNAT66, cls).tearDownClass()
7159 def test_static(self):
7160 """ 1:1 NAT66 test """
# pg0 added with default arguments, pg1 with is_inside=0.
7161 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7162 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7163 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# Four packets from pg0's host towards pg1's host; the last two visibly
# carry ICMPv6 echo request and GRE/IP/TCP, the upper layers of the first
# two are on omitted lines.
7168 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7169 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7172 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7173 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7176 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7177 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7178 ICMPv6EchoRequest())
7180 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7181 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7182 GRE() / IP() / TCP())
7184 self.pg0.add_stream(pkts)
7185 self.pg_enable_capture(self.pg_interfaces)
# Every packet seen on pg1 must have its source rewritten to nat_addr.
7187 capture = self.pg1.get_capture(len(pkts))
7188 for packet in capture:
7190 self.assertEqual(packet[IPv6].src, self.nat_addr)
7191 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7192 self.assert_packet_checksums_valid(packet)
7194 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: four packets from pg1's host addressed to nat_addr.
7199 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7200 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7203 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7204 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7207 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7208 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7211 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7212 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7213 GRE() / IP() / TCP())
7215 self.pg1.add_stream(pkts)
7216 self.pg_enable_capture(self.pg_interfaces)
# Every packet seen on pg0 must have its destination rewritten back to
# pg0's host address.
7218 capture = self.pg0.get_capture(len(pkts))
7219 for packet in capture:
7221 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
7222 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
7223 self.assert_packet_checksums_valid(packet)
7225 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# The single mapping's packet counter is expected to read 8 (four packets
# are built per direction above).
7228 sm = self.vapi.nat66_static_mapping_dump()
7229 self.assertEqual(len(sm), 1)
7230 self.assertEqual(sm[0].total_pkts, 8)
7232 def test_check_no_translate(self):
7233 """ NAT66 translate only when egress interface is outside interface """
# Both interfaces are added with default arguments, i.e. neither is added
# with is_inside=0.
7234 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7235 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7236 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7240 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7241 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7243 self.pg0.add_stream([p])
7244 self.pg_enable_capture(self.pg_interfaces)
# The packet must arrive on pg1 with its original source address.
7246 capture = self.pg1.get_capture(1)
7249 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7250 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7252 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Walks the interface and static-mapping dumps and issues one add/del call
# per entry (the remaining arguments are on omitted lines).
7255 def clear_nat66(self):
7257 Clear NAT66 configuration.
7259 interfaces = self.vapi.nat66_interface_dump()
7260 for intf in interfaces:
7261 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7265 static_mappings = self.vapi.nat66_static_mapping_dump()
7266 for sm in static_mappings:
7267 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7268 sm.external_ip_address,
# Teardown body: NAT66 state is logged only while VPP is still alive.
7273 super(TestNAT66, self).tearDown()
7274 if not self.vpp_dead:
7275 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7276 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: run this module's test cases with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)