9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
29 def clear_nat44(self):
# Walks every kind of NAT44 object exposed by self.vapi (interface
# addresses, interface features, output features, static / load-balanced /
# identity mappings, address ranges), dumping each list and issuing one
# add/del call per dumped entry, then resets IPFIX and reassembly settings.
31 Clear NAT44 configuration.
# pg7/pg8 are not guaranteed to exist on the instance, hence the hasattr
# guard; when both exist their /32 host routes and neighbor entries are
# handled before the NAT state itself.
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
# NOTE(review): only pg7 is checked/unconfigured here; pg8 is not —
# confirm that is intentional.
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
# IPFIX logging is switched off using the currently stored port/domain,
# after which both attributes are reset to 4739 and 1.
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
# Entries reporting is_inside > 1 get an extra feature call before the
# common one that every entry receives.
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
# Each dumped static mapping is passed back field-by-field so the call
# targets exactly the entry that was dumped.
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
112 twice_nat=lb_sm.twice_nat,
113 self_twice_nat=lb_sm.self_twice_nat,
114 out2in_only=lb_sm.out2in_only,
120 identity_mappings = self.vapi.nat44_identity_mapping_dump()
121 for id_m in identity_mappings:
122 self.vapi.nat44_add_del_identity_mapping(
123 addr_only=id_m.addr_only,
126 sw_if_index=id_m.sw_if_index,
128 protocol=id_m.protocol,
# NOTE(review): the local name `adresses` is misspelled (addresses).
131 adresses = self.vapi.nat44_address_dump()
132 for addr in adresses:
133 self.vapi.nat44_add_del_address_range(addr.ip_address,
135 twice_nat=addr.twice_nat,
# nat_set_reass is called with no settings for IPv4 and with only is_ip6=1
# for IPv6 — presumably restoring the vapi defaults; confirm.
138 self.vapi.nat_set_reass()
139 self.vapi.nat_set_reass(is_ip6=1)
141 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
142 local_port=0, external_port=0, vrf_id=0,
143 is_add=1, external_sw_if_index=0xFFFFFFFF,
144 proto=0, twice_nat=0, self_twice_nat=0,
145 out2in_only=0, tag=""):
# Convenience wrapper around self.vapi.nat44_add_del_static_mapping that
# accepts dotted-quad strings and converts them to packed form first.
147 Add/delete NAT44 static mapping
149 :param local_ip: Local IP address
150 :param external_ip: External IP address
151 :param local_port: Local port number (Optional)
152 :param external_port: External port number (Optional)
153 :param vrf_id: VRF ID (Default 0)
154 :param is_add: 1 if add, 0 if delete (Default add)
155 :param external_sw_if_index: External interface instead of IP address
156 :param proto: IP protocol (Mandatory if port specified)
157 :param twice_nat: 1 if translate external host address and port
158 :param self_twice_nat: 1 if translate external host address and port
159 whenever external host address equals
160 local address of internal host
161 :param out2in_only: if 1 rule is matching only out2in direction
162 :param tag: Opaque string tag
# Both ports must be non-zero for this branch to be taken — presumably it
# selects a port-specific (not address-only) mapping; confirm.
165 if local_port and external_port:
# inet_pton raises socket.error for strings that are not valid IPv4.
167 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
168 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
169 self.vapi.nat44_add_del_static_mapping(
172 external_sw_if_index,
184 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
# Adds or removes a single NAT44 pool address by passing the same packed
# address as both the first and the last address of the range.
# NOTE(review): the docstring below does not describe vrf_id, and
# "extenal" is a typo for "external".
186 Add/delete NAT44 address
188 :param ip: IP address
189 :param is_add: 1 if add, 0 if delete (Default add)
190 :param twice_nat: twice NAT address for extenal hosts
192 nat_addr = socket.inet_pton(socket.AF_INET, ip)
193 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
197 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
# Builds one TCP, one UDP and one ICMP echo-request packet sourced from
# in_if's remote host. TCP/UDP use destination port 20 and the instance's
# tcp_port_in / udp_port_in as source port; ICMP uses icmp_id_in.
199 Create packet stream for inside network
201 :param in_if: Inside interface
202 :param out_if: Outside interface
203 :param dst_ip: Destination address
204 :param ttl: TTL of generated packets
# out_if.remote_ip4 is the fallback destination — presumably used only
# when dst_ip is None; confirm.
207 dst_ip = out_if.remote_ip4
211 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
212 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
213 TCP(sport=self.tcp_port_in, dport=20))
217 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
218 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
219 UDP(sport=self.udp_port_in, dport=20))
223 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
224 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
225 ICMP(id=self.icmp_id_in, type='echo-request'))
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the byte
    positions defined by RFC 6052 section 2.2. Byte 8 (bits 64-71) is
    reserved and is never overwritten, which is why the positions for
    prefix lengths 40-64 jump from 7 to 9.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: IPv4-embedded IPv6 addresses (the prefix is returned
              unchanged for any other prefix length)
    """
    # Target byte index in the 16-byte IPv6 address for IPv4 octets 0..3.
    octet_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray behaves the same on Python 2 and 3, unlike list(bytes),
    # which yields 1-char strings on 2 and ints on 3.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for position, octet in zip(octet_positions.get(plen, ()), ip4_n):
        pref_n[position] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
273 def extract_ip4(self, ip6, plen):
# Per its docstring this pulls the embedded IPv4 address back out of an
# IPv6 address for a given prefix length.
# NOTE(review): only the first and last statements of the body are visible
# here; how ip4_n is filled from ip6_n cannot be confirmed from this view.
275 Extract IPv4 address embedded in IPv6 addresses
277 :param ip6: IPv6 address
278 :param plen: IPv6 prefix length
279 :returns: extracted IPv4 address
281 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
# NOTE(review): ''.join(...) needs ip4_n to hold str items, which is what
# iterating inet_pton's result yields on Python 2 only — confirm the
# target interpreter.
313 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
315 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
# IPv6 counterpart of the inside stream builder: one TCP, one UDP and one
# ICMPv6 echo-request packet from in_if's remote IPv6 host to an
# IPv4-embedded IPv6 destination derived from out_if.remote_ip4.
# NOTE(review): ':param ttl:' below actually documents the `hlim` argument.
317 Create IPv6 packet stream for inside network
319 :param in_if: Inside interface
320 :param out_if: Outside interface
321 :param ttl: Hop Limit of generated packets
322 :param pref: NAT64 prefix
323 :param plen: NAT64 prefix length
# Two ways of forming the destination: textual concatenation with the
# 64:ff9b:: prefix, or compose_ip6 with the caller's pref/plen —
# presumably selected by whether pref is None; confirm.
327 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
329 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
332 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
333 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
334 TCP(sport=self.tcp_port_in, dport=20))
338 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
339 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
340 UDP(sport=self.udp_port_in, dport=20))
344 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
345 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
346 ICMPv6EchoRequest(id=self.icmp_id_in))
351 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
352 use_inside_ports=False):
# Builds one TCP, one UDP and one ICMP echo-reply packet sourced from
# out_if's remote host with source port 20, i.e. the reverse direction of
# the inside stream.
354 Create packet stream for outside network
356 :param out_if: Outside interface
357 :param dst_ip: Destination IP address (Default use global NAT address)
358 :param ttl: TTL of generated packets
359 :param use_inside_ports: Use inside NAT ports as destination ports
360 instead of outside ports
363 dst_ip = self.nat_addr
# Destination ports / ICMP id come from the *_out attributes unless
# use_inside_ports is set, in which case the *_in values are used.
364 if not use_inside_ports:
365 tcp_port = self.tcp_port_out
366 udp_port = self.udp_port_out
367 icmp_id = self.icmp_id_out
369 tcp_port = self.tcp_port_in
370 udp_port = self.udp_port_in
371 icmp_id = self.icmp_id_in
374 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
375 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
376 TCP(dport=tcp_port, sport=20))
380 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
381 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
382 UDP(dport=udp_port, sport=20))
386 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
387 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
388 ICMP(id=icmp_id, type='echo-reply'))
393 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
# Builds one TCP, one UDP and one ICMPv6 echo-reply packet with the given
# IPv6 source/destination, addressed to the *_out ports / ICMP id.
# NOTE(review): the docstring below omits src_ip, and dst_ip has no
# default in the signature despite the "(Default ...)" wording.
395 Create packet stream for outside network
397 :param out_if: Outside interface
398 :param dst_ip: Destination IP address (Default use global NAT address)
399 :param hl: HL of generated packets
403 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
404 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
405 TCP(dport=self.tcp_port_out, sport=20))
409 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
410 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
411 UDP(dport=self.udp_port_out, sport=20))
415 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
416 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
417 ICMPv6EchoReply(id=self.icmp_id_out))
422 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
423 packet_num=3, dst_ip=None, is_ip6=False):
# Checks packets captured on the outside interface: packet count, valid
# checksums, translated source address, and (per protocol) whether the
# source port / ICMP id was kept or changed.
# Side effect: the observed outside port / id of each packet is stored in
# self.tcp_port_out, self.udp_port_out and self.icmp_id_out.
# NOTE(review): "Sorce" in the docstring below is a typo for "Source".
425 Verify captured packets on outside network
427 :param capture: Captured packets
428 :param nat_ip: Translated IP address (Default use global NAT address)
429 :param same_port: Sorce port number is not translated (Default False)
430 :param packet_num: Expected number of packets (Default 3)
431 :param dst_ip: Destination IP address (Default do not verify)
432 :param is_ip6: If L3 protocol is IPv6 (Default False)
# IP46 / ICMP46 are local aliases for the layer classes to index with;
# only the IPv6 ICMP alias assignment is visible here.
436 ICMP46 = ICMPv6EchoRequest
441 nat_ip = self.nat_addr
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
446 self.assert_packet_checksums_valid(packet)
447 self.assertEqual(packet[IP46].src, nat_ip)
448 if dst_ip is not None:
449 self.assertEqual(packet[IP46].dst, dst_ip)
450 if packet.haslayer(TCP):
452 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
455 packet[TCP].sport, self.tcp_port_in)
456 self.tcp_port_out = packet[TCP].sport
# NOTE(review): checksums were already validated at the top of the loop;
# this call (and the one in the ICMP branch) repeats that check.
457 self.assert_packet_checksums_valid(packet)
458 elif packet.haslayer(UDP):
460 self.assertEqual(packet[UDP].sport, self.udp_port_in)
463 packet[UDP].sport, self.udp_port_in)
464 self.udp_port_out = packet[UDP].sport
467 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
469 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
470 self.icmp_id_out = packet[ICMP46].id
471 self.assert_packet_checksums_valid(packet)
473 self.logger.error(ppp("Unexpected or invalid packet "
474 "(outside network):", packet))
477 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
478 packet_num=3, dst_ip=None):
# Thin IPv6 entry point: forwards its arguments to verify_capture_out and
# returns that call's result. Unlike verify_capture_out, nat_ip is
# mandatory here.
480 Verify captured packets on outside network
482 :param capture: Captured packets
483 :param nat_ip: Translated IP address
484 :param same_port: Sorce port number is not translated (Default False)
485 :param packet_num: Expected number of packets (Default 3)
486 :param dst_ip: Destination IP address (Default do not verify)
488 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
491 def verify_capture_in(self, capture, in_if, packet_num=3):
# Checks packets captured on the inside interface: packet count, valid
# checksums, destination equal to in_if's remote host, and destination
# port / ICMP id equal to the original inside values (tcp_port_in,
# udp_port_in, icmp_id_in).
493 Verify captured packets on inside network
495 :param capture: Captured packets
496 :param in_if: Inside interface
497 :param packet_num: Expected number of packets (Default 3)
499 self.assertEqual(packet_num, len(capture))
500 for packet in capture:
502 self.assert_packet_checksums_valid(packet)
503 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
504 if packet.haslayer(TCP):
505 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
506 elif packet.haslayer(UDP):
507 self.assertEqual(packet[UDP].dport, self.udp_port_in)
509 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
# The offending packet is pretty-printed to the log via ppp().
511 self.logger.error(ppp("Unexpected or invalid packet "
512 "(inside network):", packet))
515 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
# IPv6 variant of the inside-capture check: both source and destination
# IPv6 addresses are compared, checksums validated, and the destination
# port / echo-reply id compared with the inside values.
517 Verify captured IPv6 packets on inside network
519 :param capture: Captured packets
520 :param src_ip: Source IP
521 :param dst_ip: Destination IP address
522 :param packet_num: Expected number of packets (Default 3)
524 self.assertEqual(packet_num, len(capture))
525 for packet in capture:
527 self.assertEqual(packet[IPv6].src, src_ip)
528 self.assertEqual(packet[IPv6].dst, dst_ip)
529 self.assert_packet_checksums_valid(packet)
530 if packet.haslayer(TCP):
531 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
532 elif packet.haslayer(UDP):
533 self.assertEqual(packet[UDP].dport, self.udp_port_in)
535 self.assertEqual(packet[ICMPv6EchoReply].id,
538 self.logger.error(ppp("Unexpected or invalid packet "
539 "(inside network):", packet))
542 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
# Asserts that packets passed through untouched: source is still the
# ingress interface's remote host, destination the egress interface's
# remote host, and source port / ICMP id still the inside values.
# Unlike the other verify_capture_* helpers, no packet count is checked
# in the visible code.
544 Verify captured packet that don't have to be translated
546 :param capture: Captured packets
547 :param ingress_if: Ingress interface
548 :param egress_if: Egress interface
550 for packet in capture:
552 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
553 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
554 if packet.haslayer(TCP):
555 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
556 elif packet.haslayer(UDP):
557 self.assertEqual(packet[UDP].sport, self.udp_port_in)
559 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
561 self.logger.error(ppp("Unexpected or invalid packet "
562 "(inside network):", packet))
565 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
566 packet_num=3, icmp_type=11):
# Checks ICMP error packets seen on the outside: outer source address,
# ICMP type, and that the embedded original packet (IPerror and its
# TCPerror / UDPerror / ICMPerror layer) carries the outside port / id.
568 Verify captured packets with ICMP errors on outside network
570 :param capture: Captured packets
571 :param src_ip: Translated IP address or IP address of VPP
572 (Default use global NAT address)
573 :param packet_num: Expected number of packets (Default 3)
574 :param icmp_type: Type of error ICMP packet
575 we are expecting (Default 11)
578 src_ip = self.nat_addr
579 self.assertEqual(packet_num, len(capture))
580 for packet in capture:
582 self.assertEqual(packet[IP].src, src_ip)
583 self.assertTrue(packet.haslayer(ICMP))
585 self.assertEqual(icmp.type, icmp_type)
586 self.assertTrue(icmp.haslayer(IPerror))
# inner_ip is the quoted header of the packet that triggered the error.
587 inner_ip = icmp[IPerror]
588 if inner_ip.haslayer(TCPerror):
589 self.assertEqual(inner_ip[TCPerror].dport,
591 elif inner_ip.haslayer(UDPerror):
592 self.assertEqual(inner_ip[UDPerror].dport,
595 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
597 self.logger.error(ppp("Unexpected or invalid packet "
598 "(outside network):", packet))
601 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
# Inside-network counterpart of the ICMP error check: outer destination
# must be in_if's remote host, and the embedded original packet must carry
# the inside source port / ICMP id.
604 Verify captured packets with ICMP errors on inside network
606 :param capture: Captured packets
607 :param in_if: Inside interface
608 :param packet_num: Expected number of packets (Default 3)
609 :param icmp_type: Type of error ICMP packet
610 we are expecting (Default 11)
612 self.assertEqual(packet_num, len(capture))
613 for packet in capture:
615 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
616 self.assertTrue(packet.haslayer(ICMP))
618 self.assertEqual(icmp.type, icmp_type)
619 self.assertTrue(icmp.haslayer(IPerror))
620 inner_ip = icmp[IPerror]
621 if inner_ip.haslayer(TCPerror):
622 self.assertEqual(inner_ip[TCPerror].sport,
624 elif inner_ip.haslayer(UDPerror):
625 self.assertEqual(inner_ip[UDPerror].sport,
628 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
630 self.logger.error(ppp("Unexpected or invalid packet "
631 "(inside network):", packet))
634 def create_stream_frag(self, src_if, dst, sport, dport, data):
# Hand-builds three IPv4 fragments of one TCP segment, at fragment offsets
# 0, 3 and 5, all sharing one randomly chosen IP id. The first two carry
# the MF flag.
636 Create fragmented packet stream
638 :param src_if: Source interface
639 :param dst: Destination IPv4 address
640 :param sport: Source TCP port
641 :param dport: Destination TCP port
642 :param data: Payload data
# NOTE(review): the local name `id` shadows the builtin id().
645 id = random.randint(0, 65535)
646 p = (IP(src=src_if.remote_ip4, dst=dst) /
647 TCP(sport=sport, dport=dport) /
# Round-trip through the serialized form so the TCP checksum field can be
# read back — presumably scapy fills it in while building; confirm.
649 p = p.__class__(str(p))
650 chksum = p['TCP'].chksum
# First fragment: carries the TCP header with the checksum taken from the
# unfragmented packet above.
652 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
653 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
654 TCP(sport=sport, dport=dport, chksum=chksum) /
# Later fragments have no TCP layer, so the protocol is set explicitly.
657 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
658 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
659 proto=IP_PROTOS.tcp) /
662 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
663 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
669 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
670 pref=None, plen=0, frag_size=128):
# Builds a single IPv6/TCP packet carrying a fragment extension header
# with a random id and lets scapy's fragment6() split it into pieces of
# frag_size; the fragment list is returned.
# NOTE(review): ':param fragsize:' below documents the `frag_size` argument.
672 Create fragmented packet stream
674 :param src_if: Source interface
675 :param dst: Destination IPv4 address
676 :param sport: Source TCP port
677 :param dport: Destination TCP port
678 :param data: Payload data
679 :param pref: NAT64 prefix
680 :param plen: NAT64 prefix length
681 :param fragsize: size of fragments
# Destination is the IPv4 address embedded either under 64:ff9b:: or under
# the caller's pref/plen — presumably chosen by whether pref is None.
685 dst_ip6 = ''.join(['64:ff9b::', dst])
687 dst_ip6 = self.compose_ip6(dst, pref, plen)
689 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
690 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
691 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
692 TCP(sport=sport, dport=dport) /
695 return fragment6(p, frag_size)
697 def reass_frags_and_verify(self, frags, src, dst):
# Reassembles captured IPv4 fragments by writing each fragment's payload
# into an in-memory buffer at byte offset frag * 8, checking addresses and
# IP checksum per fragment, then re-parses the buffer as TCP or UDP on top
# of a fresh IP header copied from the first fragment.
699 Reassemble and verify fragmented packet
701 :param frags: Captured fragments
702 :param src: Source IPv4 address to verify
703 :param dst: Destination IPv4 address to verify
705 :returns: Reassembled IPv4 packet
# NOTE(review): the StringIO module exists only on Python 2, and the local
# name `buffer` shadows the Python 2 builtin of the same name.
707 buffer = StringIO.StringIO()
709 self.assertEqual(p[IP].src, src)
710 self.assertEqual(p[IP].dst, dst)
711 self.assert_ip_checksum_valid(p)
712 buffer.seek(p[IP].frag * 8)
713 buffer.write(p[IP].payload)
# NOTE(review): this assignment is overwritten by the next statement and
# has no visible effect.
714 ip = frags[0].getlayer(IP)
715 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
716 proto=frags[0][IP].proto)
# Only the TCP branch validates the L4 checksum in the visible code.
717 if ip.proto == IP_PROTOS.tcp:
718 p = (ip / TCP(buffer.getvalue()))
719 self.assert_tcp_checksum_valid(p)
720 elif ip.proto == IP_PROTOS.udp:
721 p = (ip / UDP(buffer.getvalue()))
724 def reass_frags_and_verify_ip6(self, frags, src, dst):
# IPv6 version of the reassembly helper: fragment payloads are written at
# byte offset (fragment header offset * 8), and the result is re-parsed as
# TCP or UDP according to the fragment header's next-header value.
726 Reassemble and verify fragmented packet
728 :param frags: Captured fragments
729 :param src: Source IPv6 address to verify
730 :param dst: Destination IPv6 address to verify
732 :returns: Reassembled IPv6 packet
# NOTE(review): the StringIO module exists only on Python 2.
734 buffer = StringIO.StringIO()
736 self.assertEqual(p[IPv6].src, src)
737 self.assertEqual(p[IPv6].dst, dst)
738 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
739 buffer.write(p[IPv6ExtHdrFragment].payload)
# The rebuilt header takes nh from the fragment header of the first
# fragment, so the fragment extension header itself is dropped.
740 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
741 nh=frags[0][IPv6ExtHdrFragment].nh)
742 if ip.nh == IP_PROTOS.tcp:
743 p = (ip / TCP(buffer.getvalue()))
744 elif ip.nh == IP_PROTOS.udp:
745 p = (ip / UDP(buffer.getvalue()))
746 self.assert_packet_checksums_valid(p)
749 def initiate_tcp_session(self, in_if, out_if):
# Drives a TCP three-way handshake through the NAT: a packet in->out, the
# "SYN + ACK" reply out->in addressed to self.nat_addr, and a final packet
# in->out. The translated source port seen on out_if is stored in
# self.tcp_port_out and used as the destination port of the reply.
751 Initiates TCP session
753 :param in_if: Inside interface
754 :param out_if: Outside interface
758 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
759 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
760 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
763 self.pg_enable_capture(self.pg_interfaces)
765 capture = out_if.get_capture(1)
# presumably p has been rebound to the captured packet by this point,
# since the port recorded must be the translated one — verify.
767 self.tcp_port_out = p[TCP].sport
769 # SYN + ACK packet out->in
770 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
771 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
772 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
775 self.pg_enable_capture(self.pg_interfaces)
780 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
781 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
782 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
785 self.pg_enable_capture(self.pg_interfaces)
787 out_if.get_capture(1)
790 self.logger.error("TCP 3 way handshake failed")
793 def verify_ipfix_nat44_ses(self, data):
# Expects exactly six decoded records: three session-create and three
# session-delete events, covering ICMP, TCP and UDP.
# Records are indexed by IPFIX information element id; per the IANA
# registry 230 = natEvent, 8 = sourceIPv4Address, 234 = ingressVRFID,
# 4 = protocolIdentifier, 7 = sourceTransportPort. Values are raw byte
# strings, hence ord() and struct.pack() in the comparisons.
795 Verify IPFIX NAT44 session create/delete event
797 :param data: Decoded IPFIX data records
799 nat44_ses_create_num = 0
800 nat44_ses_delete_num = 0
801 self.assertEqual(6, len(data))
# natEvent 4 is counted as a create, 5 as a delete.
804 self.assertIn(ord(record[230]), [4, 5])
805 if ord(record[230]) == 4:
806 nat44_ses_create_num += 1
808 nat44_ses_delete_num += 1
810 self.assertEqual(self.pg0.remote_ip4n, record[8])
811 # postNATSourceIPv4Address
812 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
# Element 234 must be zero, packed as a network-order 32-bit value.
815 self.assertEqual(struct.pack("!I", 0), record[234])
816 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
817 if IP_PROTOS.icmp == ord(record[4]):
818 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
819 self.assertEqual(struct.pack("!H", self.icmp_id_out),
821 elif IP_PROTOS.tcp == ord(record[4]):
822 self.assertEqual(struct.pack("!H", self.tcp_port_in),
824 self.assertEqual(struct.pack("!H", self.tcp_port_out),
826 elif IP_PROTOS.udp == ord(record[4]):
827 self.assertEqual(struct.pack("!H", self.udp_port_in),
829 self.assertEqual(struct.pack("!H", self.udp_port_out),
832 self.fail("Invalid protocol")
833 self.assertEqual(3, nat44_ses_create_num)
834 self.assertEqual(3, nat44_ses_delete_num)
836 def verify_ipfix_addr_exhausted(self, data):
# Expects exactly one record whose natEvent (element 230) is 3 and whose
# element 283 (natPoolId in the IANA registry) is zero.
838 Verify IPFIX NAT addresses event
840 :param data: Decoded IPFIX data records
842 self.assertEqual(1, len(data))
845 self.assertEqual(ord(record[230]), 3)
847 self.assertEqual(struct.pack("!I", 0), record[283])
849 def verify_ipfix_max_sessions(self, data, limit):
# Expects exactly one record with natEvent 13, quota-exceeded sub-event 1
# and element 471 (maxSessionEntries in the IANA registry) equal to limit.
# NOTE(review): these values are packed with "I" (native byte order)
# whereas the other verify_ipfix_* helpers use "!I" — presumably matching
# how the exporter encodes these fields; confirm.
851 Verify IPFIX maximum session entries exceeded event
853 :param data: Decoded IPFIX data records
854 :param limit: Number of maximum session entries that can be created.
856 self.assertEqual(1, len(data))
859 self.assertEqual(ord(record[230]), 13)
860 # natQuotaExceededEvent
861 self.assertEqual(struct.pack("I", 1), record[466])
863 self.assertEqual(struct.pack("I", limit), record[471])
865 def verify_ipfix_max_bibs(self, data, limit):
# Expects exactly one record with natEvent 13, quota-exceeded sub-event 2
# and element 472 (maxBIBEntries in the IANA registry) equal to limit.
# NOTE(review): "I" packs in native byte order, unlike the "!I" used for
# other elements in this class — confirm this is intended.
867 Verify IPFIX maximum BIB entries exceeded event
869 :param data: Decoded IPFIX data records
870 :param limit: Number of maximum BIB entries that can be created.
872 self.assertEqual(1, len(data))
875 self.assertEqual(ord(record[230]), 13)
876 # natQuotaExceededEvent
877 self.assertEqual(struct.pack("I", 2), record[466])
879 self.assertEqual(struct.pack("I", limit), record[472])
881 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
# Expects exactly one record with natEvent 13, quota-exceeded sub-event 5,
# element 475 equal to limit, and element 27 (sourceIPv6Address in the
# IANA registry) equal to src_addr. src_addr is compared as-is, so it must
# already be in the record's raw form.
883 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
885 :param data: Decoded IPFIX data records
886 :param limit: Number of maximum fragments pending reassembly
887 :param src_addr: IPv6 source address
889 self.assertEqual(1, len(data))
892 self.assertEqual(ord(record[230]), 13)
893 # natQuotaExceededEvent
894 self.assertEqual(struct.pack("I", 5), record[466])
895 # maxFragmentsPendingReassembly
896 self.assertEqual(struct.pack("I", limit), record[475])
898 self.assertEqual(src_addr, record[27])
900 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
# IPv4 twin of the IPv6 fragment-limit check; the only difference in the
# visible code is that the source address is read from element 8 instead
# of element 27.
902 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
904 :param data: Decoded IPFIX data records
905 :param limit: Number of maximum fragments pending reassembly
906 :param src_addr: IPv4 source address
908 self.assertEqual(1, len(data))
911 self.assertEqual(ord(record[230]), 13)
912 # natQuotaExceededEvent
913 self.assertEqual(struct.pack("I", 5), record[466])
914 # maxFragmentsPendingReassembly
915 self.assertEqual(struct.pack("I", limit), record[475])
917 self.assertEqual(src_addr, record[8])
919 def verify_ipfix_bib(self, data, is_create, src_addr):
# Expects exactly one TCP record describing a NAT64 BIB entry: natEvent 10
# or 11 (selected by is_create per the docstring), IPv6 source address,
# translated IPv4 source self.nat_addr_n, and inside/outside TCP ports.
921 Verify IPFIX NAT64 BIB create and delete events
923 :param data: Decoded IPFIX data records
924 :param is_create: Create event if nonzero value otherwise delete event
925 :param src_addr: IPv6 source address
927 self.assertEqual(1, len(data))
931 self.assertEqual(ord(record[230]), 10)
933 self.assertEqual(ord(record[230]), 11)
935 self.assertEqual(src_addr, record[27])
936 # postNATSourceIPv4Address
937 self.assertEqual(self.nat_addr_n, record[225])
# Only TCP is accepted here, unlike the NAT44 session check, which
# branches on the protocol.
939 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
941 self.assertEqual(struct.pack("!I", 0), record[234])
942 # sourceTransportPort
943 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
944 # postNAPTSourceTransportPort
945 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
947 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
# Expects exactly one TCP record describing a NAT64 session: natEvent 6 or
# 7 (selected by is_create per the docstring), IPv6 source, an
# IPv4-embedded IPv6 destination built with compose_ip6, the translated
# IPv4 source/destination, and the source/destination ports before and
# after translation. The destination port is expected to be unchanged
# (dst_port is compared against both elements 11 and 228).
950 Verify IPFIX NAT64 session create and delete events
952 :param data: Decoded IPFIX data records
953 :param is_create: Create event if nonzero value otherwise delete event
954 :param src_addr: IPv6 source address
955 :param dst_addr: IPv4 destination address
956 :param dst_port: destination TCP port
958 self.assertEqual(1, len(data))
962 self.assertEqual(ord(record[230]), 6)
964 self.assertEqual(ord(record[230]), 7)
966 self.assertEqual(src_addr, record[27])
967 # destinationIPv6Address
968 self.assertEqual(socket.inet_pton(socket.AF_INET6,
969 self.compose_ip6(dst_addr,
973 # postNATSourceIPv4Address
974 self.assertEqual(self.nat_addr_n, record[225])
975 # postNATDestinationIPv4Address
976 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
979 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
981 self.assertEqual(struct.pack("!I", 0), record[234])
982 # sourceTransportPort
983 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
984 # postNAPTSourceTransportPort
985 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
986 # destinationTransportPort
987 self.assertEqual(struct.pack("!H", dst_port), record[11])
988 # postNAPTDestinationTransportPort
989 self.assertEqual(struct.pack("!H", dst_port), record[228])
992 class TestNAT44(MethodHolder):
993 """ NAT44 Test Cases """
# Class-level fixture for TestNAT44 (presumably the body of the setUpClass
# classmethod; its def line is not visible at this point). It sets the
# default ports / ids / NAT address used by the MethodHolder helpers and
# creates ten pg interfaces: pg0-pg3 as regular interfaces, pg4-pg6 with
# overlapping addresses in VRF tables 10 and 20, and pg9 with two remote
# hosts.
997 super(TestNAT44, cls).setUpClass()
998 cls.vapi.cli("set log class nat level debug")
# The *_in and *_out values start out equal; the capture-verification
# helpers later overwrite the *_out ones with what was observed.
1001 cls.tcp_port_in = 6303
1002 cls.tcp_port_out = 6303
1003 cls.udp_port_in = 6304
1004 cls.udp_port_out = 6304
1005 cls.icmp_id_in = 6305
1006 cls.icmp_id_out = 6305
1007 cls.nat_addr = '10.0.0.3'
1008 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1009 cls.ipfix_src_port = 4739
1010 cls.ipfix_domain_id = 1
1011 cls.tcp_external_port = 80
1013 cls.create_pg_interfaces(range(10))
1014 cls.interfaces = list(cls.pg_interfaces[0:4])
1016 for i in cls.interfaces:
1021 cls.pg0.generate_remote_hosts(3)
1022 cls.pg0.configure_ipv4_neighbors()
1024 cls.pg1.generate_remote_hosts(1)
1025 cls.pg1.configure_ipv4_neighbors()
# NOTE(review): the nested list(list(...)) is redundant.
1027 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1028 cls.vapi.ip_table_add_del(10, is_add=1)
1029 cls.vapi.ip_table_add_del(20, is_add=1)
# pg4 and pg6 deliberately share 172.16.255.1/.2 but live in different
# tables (10 and 20); pg5 shares table 10 with pg4 under another address.
# NOTE(review): `i` in the three inet_pton calls below is the leftover
# variable of the earlier `for i in cls.interfaces:` loop, so the packed
# address comes from that interface rather than from the pg4/pg5/pg6
# address set on the line just above — presumably cls.pgN.local_ip4 was
# intended; confirm.
1031 cls.pg4._local_ip4 = "172.16.255.1"
1032 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1033 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1034 cls.pg4.set_table_ip4(10)
1035 cls.pg5._local_ip4 = "172.17.255.3"
1036 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1037 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1038 cls.pg5.set_table_ip4(10)
1039 cls.pg6._local_ip4 = "172.16.255.1"
1040 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1041 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1042 cls.pg6.set_table_ip4(20)
1043 for i in cls.overlapping_interfaces:
# pg9 gets an additional address (10.0.0.1) on top of its generated one;
# ARP is resolved once before and once after the remote host address is
# rewritten to 10.0.0.2.
1051 cls.pg9.generate_remote_hosts(2)
1052 cls.pg9.config_ip4()
1053 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1054 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1058 cls.pg9.resolve_arp()
1059 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
# NOTE(review): this chained assignment also sets cls.pg4._remote_ip4 —
# presumably cls.pg9._remote_ip4 was meant; confirm.
1060 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1061 cls.pg9.resolve_arp()
# presumably the failure path: class-level teardown is invoked explicitly
# when setup raises — verify against the surrounding try/except.
1064 super(TestNAT44, cls).tearDownClass()
1067 def test_dynamic(self):
# Basic round trip: pg0 and pg1 get the NAT44 feature with self.nat_addr
# as pool address, the three-packet inside stream is sent pg0 -> pg1 and
# verified as translated, then the outside stream is sent back pg1 -> pg0
# and verified as untranslated.
1068 """ NAT44 dynamic translation test """
1070 self.nat44_add_address(self.nat_addr)
1071 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1072 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1076 pkts = self.create_stream_in(self.pg0, self.pg1)
1077 self.pg0.add_stream(pkts)
1078 self.pg_enable_capture(self.pg_interfaces)
1080 capture = self.pg1.get_capture(len(pkts))
# verify_capture_out also records the translated ports / ICMP id that the
# return stream below is addressed to.
1081 self.verify_capture_out(capture)
1084 pkts = self.create_stream_out(self.pg1)
1085 self.pg1.add_stream(pkts)
1086 self.pg_enable_capture(self.pg_interfaces)
1088 capture = self.pg0.get_capture(len(pkts))
1089 self.verify_capture_in(capture, self.pg0)
1091 def test_dynamic_icmp_errors_in2out_ttl_1(self):
# Inside packets with TTL=1 must not be forwarded; the capture is taken on
# pg0 itself, which must receive one ICMP error (type 11 by the helper's
# default) per packet sent.
1092 """ NAT44 handling of client packets with TTL=1 """
1094 self.nat44_add_address(self.nat_addr)
1095 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1096 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1099 # Client side - generate traffic
1100 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1101 self.pg0.add_stream(pkts)
1102 self.pg_enable_capture(self.pg_interfaces)
1105 # Client side - verify ICMP type 11 packets
1106 capture = self.pg0.get_capture(len(pkts))
1107 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1109 def test_dynamic_icmp_errors_out2in_ttl_1(self):
# Sessions are first created with a normal inside stream; outside packets
# with TTL=1 are then sent, and the ICMP errors are captured on pg1 itself.
1110 """ NAT44 handling of server packets with TTL=1 """
1112 self.nat44_add_address(self.nat_addr)
1113 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1114 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1117 # Client side - create sessions
1118 pkts = self.create_stream_in(self.pg0, self.pg1)
1119 self.pg0.add_stream(pkts)
1120 self.pg_enable_capture(self.pg_interfaces)
1123 # Server side - generate traffic
1124 capture = self.pg1.get_capture(len(pkts))
1125 self.verify_capture_out(capture)
1126 pkts = self.create_stream_out(self.pg1, ttl=1)
1127 self.pg1.add_stream(pkts)
1128 self.pg_enable_capture(self.pg_interfaces)
1131 # Server side - verify ICMP type 11 packets
1132 capture = self.pg1.get_capture(len(pkts))
# The expected source of the errors is pg1's own address, not the NAT pool
# address, hence the explicit src_ip.
1133 self.verify_capture_out_with_icmp_errors(capture,
1134 src_ip=self.pg1.local_ip4)
1136 def test_dynamic_icmp_errors_in2out_ttl_2(self):
# Inside packets with TTL=2 pass through the NAT; the test then plays the
# role of a router beyond pg1 by wrapping each captured packet's IP layer
# in an ICMP type 11 error addressed to self.nat_addr, and checks that the
# errors arrive on pg0 with the inside ports / id restored.
1137 """ NAT44 handling of error responses to client packets with TTL=2 """
1139 self.nat44_add_address(self.nat_addr)
1140 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1141 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1144 # Client side - generate traffic
1145 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1146 self.pg0.add_stream(pkts)
1147 self.pg_enable_capture(self.pg_interfaces)
1150 # Server side - simulate ICMP type 11 response
1151 capture = self.pg1.get_capture(len(pkts))
1152 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1153 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1154 ICMP(type=11) / packet[IP] for packet in capture]
1155 self.pg1.add_stream(pkts)
1156 self.pg_enable_capture(self.pg_interfaces)
1159 # Client side - verify ICMP type 11 packets
1160 capture = self.pg0.get_capture(len(pkts))
1161 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1163 def test_dynamic_icmp_errors_out2in_ttl_2(self):
# Mirror image of the in2out TTL=2 test: after sessions exist, outside
# packets with TTL=2 are forwarded to pg0, the test answers each with an
# ICMP type 11 error from the inside host, and the errors captured on pg1
# must carry the NAT address and the outside ports / id.
1164 """ NAT44 handling of error responses to server packets with TTL=2 """
1166 self.nat44_add_address(self.nat_addr)
1167 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1168 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1171 # Client side - create sessions
1172 pkts = self.create_stream_in(self.pg0, self.pg1)
1173 self.pg0.add_stream(pkts)
1174 self.pg_enable_capture(self.pg_interfaces)
1177 # Server side - generate traffic
1178 capture = self.pg1.get_capture(len(pkts))
1179 self.verify_capture_out(capture)
1180 pkts = self.create_stream_out(self.pg1, ttl=2)
1181 self.pg1.add_stream(pkts)
1182 self.pg_enable_capture(self.pg_interfaces)
1185 # Client side - simulate ICMP type 11 response
1186 capture = self.pg0.get_capture(len(pkts))
1187 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1188 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1189 ICMP(type=11) / packet[IP] for packet in capture]
1190 self.pg0.add_stream(pkts)
1191 self.pg_enable_capture(self.pg_interfaces)
1194 # Server side - verify ICMP type 11 packets
1195 capture = self.pg1.get_capture(len(pkts))
1196 self.verify_capture_out_with_icmp_errors(capture)
1198 def test_ping_out_interface_from_outside(self):
1199 """ Ping NAT44 out interface from outside network """
1201 self.nat44_add_address(self.nat_addr)
1202 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1203 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1206 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1207 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1208 ICMP(id=self.icmp_id_out, type='echo-request'))
1210 self.pg1.add_stream(pkts)
1211 self.pg_enable_capture(self.pg_interfaces)
1213 capture = self.pg1.get_capture(len(pkts))
1214 self.assertEqual(1, len(capture))
1217 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1218 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1219 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1220 self.assertEqual(packet[ICMP].type, 0) # echo reply
1222 self.logger.error(ppp("Unexpected or invalid packet "
1223 "(outside network):", packet))
1226 def test_ping_internal_host_from_outside(self):
1227 """ Ping internal host from outside network """
1229 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1230 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1231 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1235 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1236 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1237 ICMP(id=self.icmp_id_out, type='echo-request'))
1238 self.pg1.add_stream(pkt)
1239 self.pg_enable_capture(self.pg_interfaces)
1241 capture = self.pg0.get_capture(1)
1242 self.verify_capture_in(capture, self.pg0, packet_num=1)
1243 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1246 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1247 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1248 ICMP(id=self.icmp_id_in, type='echo-reply'))
1249 self.pg0.add_stream(pkt)
1250 self.pg_enable_capture(self.pg_interfaces)
1252 capture = self.pg1.get_capture(1)
1253 self.verify_capture_out(capture, same_port=True, packet_num=1)
1254 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1256 def test_forwarding(self):
1257 """ NAT44 forwarding test """
1259 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1260 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1262 self.vapi.nat44_forwarding_enable_disable(1)
1264 real_ip = self.pg0.remote_ip4n
1265 alias_ip = self.nat_addr_n
1266 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1267 external_ip=alias_ip)
1270 # static mapping match
1272 pkts = self.create_stream_out(self.pg1)
1273 self.pg1.add_stream(pkts)
1274 self.pg_enable_capture(self.pg_interfaces)
1276 capture = self.pg0.get_capture(len(pkts))
1277 self.verify_capture_in(capture, self.pg0)
1279 pkts = self.create_stream_in(self.pg0, self.pg1)
1280 self.pg0.add_stream(pkts)
1281 self.pg_enable_capture(self.pg_interfaces)
1283 capture = self.pg1.get_capture(len(pkts))
1284 self.verify_capture_out(capture, same_port=True)
1286 # no static mapping match
1288 host0 = self.pg0.remote_hosts[0]
1289 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1291 pkts = self.create_stream_out(self.pg1,
1292 dst_ip=self.pg0.remote_ip4,
1293 use_inside_ports=True)
1294 self.pg1.add_stream(pkts)
1295 self.pg_enable_capture(self.pg_interfaces)
1297 capture = self.pg0.get_capture(len(pkts))
1298 self.verify_capture_in(capture, self.pg0)
1300 pkts = self.create_stream_in(self.pg0, self.pg1)
1301 self.pg0.add_stream(pkts)
1302 self.pg_enable_capture(self.pg_interfaces)
1304 capture = self.pg1.get_capture(len(pkts))
1305 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1308 self.pg0.remote_hosts[0] = host0
1311 self.vapi.nat44_forwarding_enable_disable(0)
1312 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1313 external_ip=alias_ip,
1316 def test_static_in(self):
1317 """ 1:1 NAT initialized from inside network """
1319 nat_ip = "10.0.0.10"
1320 self.tcp_port_out = 6303
1321 self.udp_port_out = 6304
1322 self.icmp_id_out = 6305
1324 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1325 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1326 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1328 sm = self.vapi.nat44_static_mapping_dump()
1329 self.assertEqual(len(sm), 1)
1330 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1331 self.assertEqual(sm[0].protocol, 0)
1332 self.assertEqual(sm[0].local_port, 0)
1333 self.assertEqual(sm[0].external_port, 0)
1336 pkts = self.create_stream_in(self.pg0, self.pg1)
1337 self.pg0.add_stream(pkts)
1338 self.pg_enable_capture(self.pg_interfaces)
1340 capture = self.pg1.get_capture(len(pkts))
1341 self.verify_capture_out(capture, nat_ip, True)
1344 pkts = self.create_stream_out(self.pg1, nat_ip)
1345 self.pg1.add_stream(pkts)
1346 self.pg_enable_capture(self.pg_interfaces)
1348 capture = self.pg0.get_capture(len(pkts))
1349 self.verify_capture_in(capture, self.pg0)
1351 def test_static_out(self):
1352 """ 1:1 NAT initialized from outside network """
1354 nat_ip = "10.0.0.20"
1355 self.tcp_port_out = 6303
1356 self.udp_port_out = 6304
1357 self.icmp_id_out = 6305
1360 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1361 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1362 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1364 sm = self.vapi.nat44_static_mapping_dump()
1365 self.assertEqual(len(sm), 1)
1366 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1369 pkts = self.create_stream_out(self.pg1, nat_ip)
1370 self.pg1.add_stream(pkts)
1371 self.pg_enable_capture(self.pg_interfaces)
1373 capture = self.pg0.get_capture(len(pkts))
1374 self.verify_capture_in(capture, self.pg0)
1377 pkts = self.create_stream_in(self.pg0, self.pg1)
1378 self.pg0.add_stream(pkts)
1379 self.pg_enable_capture(self.pg_interfaces)
1381 capture = self.pg1.get_capture(len(pkts))
1382 self.verify_capture_out(capture, nat_ip, True)
1384 def test_static_with_port_in(self):
1385 """ 1:1 NAPT initialized from inside network """
1387 self.tcp_port_out = 3606
1388 self.udp_port_out = 3607
1389 self.icmp_id_out = 3608
1391 self.nat44_add_address(self.nat_addr)
1392 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1393 self.tcp_port_in, self.tcp_port_out,
1394 proto=IP_PROTOS.tcp)
1395 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1396 self.udp_port_in, self.udp_port_out,
1397 proto=IP_PROTOS.udp)
1398 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1399 self.icmp_id_in, self.icmp_id_out,
1400 proto=IP_PROTOS.icmp)
1401 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1402 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1406 pkts = self.create_stream_in(self.pg0, self.pg1)
1407 self.pg0.add_stream(pkts)
1408 self.pg_enable_capture(self.pg_interfaces)
1410 capture = self.pg1.get_capture(len(pkts))
1411 self.verify_capture_out(capture)
1414 pkts = self.create_stream_out(self.pg1)
1415 self.pg1.add_stream(pkts)
1416 self.pg_enable_capture(self.pg_interfaces)
1418 capture = self.pg0.get_capture(len(pkts))
1419 self.verify_capture_in(capture, self.pg0)
1421 def test_static_with_port_out(self):
1422 """ 1:1 NAPT initialized from outside network """
1424 self.tcp_port_out = 30606
1425 self.udp_port_out = 30607
1426 self.icmp_id_out = 30608
1428 self.nat44_add_address(self.nat_addr)
1429 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1430 self.tcp_port_in, self.tcp_port_out,
1431 proto=IP_PROTOS.tcp)
1432 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1433 self.udp_port_in, self.udp_port_out,
1434 proto=IP_PROTOS.udp)
1435 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1436 self.icmp_id_in, self.icmp_id_out,
1437 proto=IP_PROTOS.icmp)
1438 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1439 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1443 pkts = self.create_stream_out(self.pg1)
1444 self.pg1.add_stream(pkts)
1445 self.pg_enable_capture(self.pg_interfaces)
1447 capture = self.pg0.get_capture(len(pkts))
1448 self.verify_capture_in(capture, self.pg0)
1451 pkts = self.create_stream_in(self.pg0, self.pg1)
1452 self.pg0.add_stream(pkts)
1453 self.pg_enable_capture(self.pg_interfaces)
1455 capture = self.pg1.get_capture(len(pkts))
1456 self.verify_capture_out(capture)
1458 def test_static_vrf_aware(self):
1459 """ 1:1 NAT VRF awareness """
1461 nat_ip1 = "10.0.0.30"
1462 nat_ip2 = "10.0.0.40"
1463 self.tcp_port_out = 6303
1464 self.udp_port_out = 6304
1465 self.icmp_id_out = 6305
1467 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1469 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1471 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1473 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1474 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1476 # inside interface VRF match NAT44 static mapping VRF
1477 pkts = self.create_stream_in(self.pg4, self.pg3)
1478 self.pg4.add_stream(pkts)
1479 self.pg_enable_capture(self.pg_interfaces)
1481 capture = self.pg3.get_capture(len(pkts))
1482 self.verify_capture_out(capture, nat_ip1, True)
1484 # inside interface VRF don't match NAT44 static mapping VRF (packets
1486 pkts = self.create_stream_in(self.pg0, self.pg3)
1487 self.pg0.add_stream(pkts)
1488 self.pg_enable_capture(self.pg_interfaces)
1490 self.pg3.assert_nothing_captured()
1492 def test_dynamic_to_static(self):
1493 """ Switch from dynamic translation to 1:1NAT """
1494 nat_ip = "10.0.0.10"
1495 self.tcp_port_out = 6303
1496 self.udp_port_out = 6304
1497 self.icmp_id_out = 6305
1499 self.nat44_add_address(self.nat_addr)
1500 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1501 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1505 pkts = self.create_stream_in(self.pg0, self.pg1)
1506 self.pg0.add_stream(pkts)
1507 self.pg_enable_capture(self.pg_interfaces)
1509 capture = self.pg1.get_capture(len(pkts))
1510 self.verify_capture_out(capture)
1513 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1514 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1515 self.assertEqual(len(sessions), 0)
1516 pkts = self.create_stream_in(self.pg0, self.pg1)
1517 self.pg0.add_stream(pkts)
1518 self.pg_enable_capture(self.pg_interfaces)
1520 capture = self.pg1.get_capture(len(pkts))
1521 self.verify_capture_out(capture, nat_ip, True)
1523 def test_identity_nat(self):
1524 """ Identity NAT """
1526 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1527 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1528 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1531 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1532 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1533 TCP(sport=12345, dport=56789))
1534 self.pg1.add_stream(p)
1535 self.pg_enable_capture(self.pg_interfaces)
1537 capture = self.pg0.get_capture(1)
1542 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1543 self.assertEqual(ip.src, self.pg1.remote_ip4)
1544 self.assertEqual(tcp.dport, 56789)
1545 self.assertEqual(tcp.sport, 12345)
1546 self.assert_packet_checksums_valid(p)
1548 self.logger.error(ppp("Unexpected or invalid packet:", p))
1551 def test_multiple_inside_interfaces(self):
1552 """ NAT44 multiple non-overlapping address space inside interfaces """
1554 self.nat44_add_address(self.nat_addr)
1555 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1556 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1557 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1560 # between two NAT44 inside interfaces (no translation)
1561 pkts = self.create_stream_in(self.pg0, self.pg1)
1562 self.pg0.add_stream(pkts)
1563 self.pg_enable_capture(self.pg_interfaces)
1565 capture = self.pg1.get_capture(len(pkts))
1566 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1568 # from NAT44 inside to interface without NAT44 feature (no translation)
1569 pkts = self.create_stream_in(self.pg0, self.pg2)
1570 self.pg0.add_stream(pkts)
1571 self.pg_enable_capture(self.pg_interfaces)
1573 capture = self.pg2.get_capture(len(pkts))
1574 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1576 # in2out 1st interface
1577 pkts = self.create_stream_in(self.pg0, self.pg3)
1578 self.pg0.add_stream(pkts)
1579 self.pg_enable_capture(self.pg_interfaces)
1581 capture = self.pg3.get_capture(len(pkts))
1582 self.verify_capture_out(capture)
1584 # out2in 1st interface
1585 pkts = self.create_stream_out(self.pg3)
1586 self.pg3.add_stream(pkts)
1587 self.pg_enable_capture(self.pg_interfaces)
1589 capture = self.pg0.get_capture(len(pkts))
1590 self.verify_capture_in(capture, self.pg0)
1592 # in2out 2nd interface
1593 pkts = self.create_stream_in(self.pg1, self.pg3)
1594 self.pg1.add_stream(pkts)
1595 self.pg_enable_capture(self.pg_interfaces)
1597 capture = self.pg3.get_capture(len(pkts))
1598 self.verify_capture_out(capture)
1600 # out2in 2nd interface
1601 pkts = self.create_stream_out(self.pg3)
1602 self.pg3.add_stream(pkts)
1603 self.pg_enable_capture(self.pg_interfaces)
1605 capture = self.pg1.get_capture(len(pkts))
1606 self.verify_capture_in(capture, self.pg1)
1608 def test_inside_overlapping_interfaces(self):
1609 """ NAT44 multiple inside interfaces with overlapping address space """
1611 static_nat_ip = "10.0.0.10"
1612 self.nat44_add_address(self.nat_addr)
1613 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1615 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1616 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1617 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1618 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1621 # between NAT44 inside interfaces with same VRF (no translation)
1622 pkts = self.create_stream_in(self.pg4, self.pg5)
1623 self.pg4.add_stream(pkts)
1624 self.pg_enable_capture(self.pg_interfaces)
1626 capture = self.pg5.get_capture(len(pkts))
1627 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1629 # between NAT44 inside interfaces with different VRF (hairpinning)
1630 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1631 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1632 TCP(sport=1234, dport=5678))
1633 self.pg4.add_stream(p)
1634 self.pg_enable_capture(self.pg_interfaces)
1636 capture = self.pg6.get_capture(1)
1641 self.assertEqual(ip.src, self.nat_addr)
1642 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1643 self.assertNotEqual(tcp.sport, 1234)
1644 self.assertEqual(tcp.dport, 5678)
1646 self.logger.error(ppp("Unexpected or invalid packet:", p))
1649 # in2out 1st interface
1650 pkts = self.create_stream_in(self.pg4, self.pg3)
1651 self.pg4.add_stream(pkts)
1652 self.pg_enable_capture(self.pg_interfaces)
1654 capture = self.pg3.get_capture(len(pkts))
1655 self.verify_capture_out(capture)
1657 # out2in 1st interface
1658 pkts = self.create_stream_out(self.pg3)
1659 self.pg3.add_stream(pkts)
1660 self.pg_enable_capture(self.pg_interfaces)
1662 capture = self.pg4.get_capture(len(pkts))
1663 self.verify_capture_in(capture, self.pg4)
1665 # in2out 2nd interface
1666 pkts = self.create_stream_in(self.pg5, self.pg3)
1667 self.pg5.add_stream(pkts)
1668 self.pg_enable_capture(self.pg_interfaces)
1670 capture = self.pg3.get_capture(len(pkts))
1671 self.verify_capture_out(capture)
1673 # out2in 2nd interface
1674 pkts = self.create_stream_out(self.pg3)
1675 self.pg3.add_stream(pkts)
1676 self.pg_enable_capture(self.pg_interfaces)
1678 capture = self.pg5.get_capture(len(pkts))
1679 self.verify_capture_in(capture, self.pg5)
1682 addresses = self.vapi.nat44_address_dump()
1683 self.assertEqual(len(addresses), 1)
1684 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1685 self.assertEqual(len(sessions), 3)
1686 for session in sessions:
1687 self.assertFalse(session.is_static)
1688 self.assertEqual(session.inside_ip_address[0:4],
1689 self.pg5.remote_ip4n)
1690 self.assertEqual(session.outside_ip_address,
1691 addresses[0].ip_address)
1692 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1693 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1694 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1695 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1696 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1697 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1698 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1699 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1700 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1702 # in2out 3rd interface
1703 pkts = self.create_stream_in(self.pg6, self.pg3)
1704 self.pg6.add_stream(pkts)
1705 self.pg_enable_capture(self.pg_interfaces)
1707 capture = self.pg3.get_capture(len(pkts))
1708 self.verify_capture_out(capture, static_nat_ip, True)
1710 # out2in 3rd interface
1711 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1712 self.pg3.add_stream(pkts)
1713 self.pg_enable_capture(self.pg_interfaces)
1715 capture = self.pg6.get_capture(len(pkts))
1716 self.verify_capture_in(capture, self.pg6)
1718 # general user and session dump verifications
1719 users = self.vapi.nat44_user_dump()
1720 self.assertTrue(len(users) >= 3)
1721 addresses = self.vapi.nat44_address_dump()
1722 self.assertEqual(len(addresses), 1)
1724 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1726 for session in sessions:
1727 self.assertEqual(user.ip_address, session.inside_ip_address)
1728 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1729 self.assertTrue(session.protocol in
1730 [IP_PROTOS.tcp, IP_PROTOS.udp,
1732 self.assertFalse(session.ext_host_valid)
1735 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1736 self.assertTrue(len(sessions) >= 4)
1737 for session in sessions:
1738 self.assertFalse(session.is_static)
1739 self.assertEqual(session.inside_ip_address[0:4],
1740 self.pg4.remote_ip4n)
1741 self.assertEqual(session.outside_ip_address,
1742 addresses[0].ip_address)
1745 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1746 self.assertTrue(len(sessions) >= 3)
1747 for session in sessions:
1748 self.assertTrue(session.is_static)
1749 self.assertEqual(session.inside_ip_address[0:4],
1750 self.pg6.remote_ip4n)
1751 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1752 map(int, static_nat_ip.split('.')))
1753 self.assertTrue(session.inside_port in
1754 [self.tcp_port_in, self.udp_port_in,
1757 def test_hairpinning(self):
1758 """ NAT44 hairpinning - 1:1 NAPT """
1760 host = self.pg0.remote_hosts[0]
1761 server = self.pg0.remote_hosts[1]
1764 server_in_port = 5678
1765 server_out_port = 8765
1767 self.nat44_add_address(self.nat_addr)
1768 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1769 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1771 # add static mapping for server
1772 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1773 server_in_port, server_out_port,
1774 proto=IP_PROTOS.tcp)
1776 # send packet from host to server
1777 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1778 IP(src=host.ip4, dst=self.nat_addr) /
1779 TCP(sport=host_in_port, dport=server_out_port))
1780 self.pg0.add_stream(p)
1781 self.pg_enable_capture(self.pg_interfaces)
1783 capture = self.pg0.get_capture(1)
1788 self.assertEqual(ip.src, self.nat_addr)
1789 self.assertEqual(ip.dst, server.ip4)
1790 self.assertNotEqual(tcp.sport, host_in_port)
1791 self.assertEqual(tcp.dport, server_in_port)
1792 self.assert_packet_checksums_valid(p)
1793 host_out_port = tcp.sport
1795 self.logger.error(ppp("Unexpected or invalid packet:", p))
1798 # send reply from server to host
1799 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1800 IP(src=server.ip4, dst=self.nat_addr) /
1801 TCP(sport=server_in_port, dport=host_out_port))
1802 self.pg0.add_stream(p)
1803 self.pg_enable_capture(self.pg_interfaces)
1805 capture = self.pg0.get_capture(1)
1810 self.assertEqual(ip.src, self.nat_addr)
1811 self.assertEqual(ip.dst, host.ip4)
1812 self.assertEqual(tcp.sport, server_out_port)
1813 self.assertEqual(tcp.dport, host_in_port)
1814 self.assert_packet_checksums_valid(p)
1816 self.logger.error(ppp("Unexpected or invalid packet:", p))
1819 def test_hairpinning2(self):
1820 """ NAT44 hairpinning - 1:1 NAT"""
1822 server1_nat_ip = "10.0.0.10"
1823 server2_nat_ip = "10.0.0.11"
1824 host = self.pg0.remote_hosts[0]
1825 server1 = self.pg0.remote_hosts[1]
1826 server2 = self.pg0.remote_hosts[2]
1827 server_tcp_port = 22
1828 server_udp_port = 20
1830 self.nat44_add_address(self.nat_addr)
1831 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1832 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1835 # add static mapping for servers
1836 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1837 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1841 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1842 IP(src=host.ip4, dst=server1_nat_ip) /
1843 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1845 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1846 IP(src=host.ip4, dst=server1_nat_ip) /
1847 UDP(sport=self.udp_port_in, dport=server_udp_port))
1849 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1850 IP(src=host.ip4, dst=server1_nat_ip) /
1851 ICMP(id=self.icmp_id_in, type='echo-request'))
1853 self.pg0.add_stream(pkts)
1854 self.pg_enable_capture(self.pg_interfaces)
1856 capture = self.pg0.get_capture(len(pkts))
1857 for packet in capture:
1859 self.assertEqual(packet[IP].src, self.nat_addr)
1860 self.assertEqual(packet[IP].dst, server1.ip4)
1861 if packet.haslayer(TCP):
1862 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1863 self.assertEqual(packet[TCP].dport, server_tcp_port)
1864 self.tcp_port_out = packet[TCP].sport
1865 self.assert_packet_checksums_valid(packet)
1866 elif packet.haslayer(UDP):
1867 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1868 self.assertEqual(packet[UDP].dport, server_udp_port)
1869 self.udp_port_out = packet[UDP].sport
1871 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1872 self.icmp_id_out = packet[ICMP].id
1874 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1879 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1880 IP(src=server1.ip4, dst=self.nat_addr) /
1881 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1883 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1884 IP(src=server1.ip4, dst=self.nat_addr) /
1885 UDP(sport=server_udp_port, dport=self.udp_port_out))
1887 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1888 IP(src=server1.ip4, dst=self.nat_addr) /
1889 ICMP(id=self.icmp_id_out, type='echo-reply'))
1891 self.pg0.add_stream(pkts)
1892 self.pg_enable_capture(self.pg_interfaces)
1894 capture = self.pg0.get_capture(len(pkts))
1895 for packet in capture:
1897 self.assertEqual(packet[IP].src, server1_nat_ip)
1898 self.assertEqual(packet[IP].dst, host.ip4)
1899 if packet.haslayer(TCP):
1900 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1901 self.assertEqual(packet[TCP].sport, server_tcp_port)
1902 self.assert_packet_checksums_valid(packet)
1903 elif packet.haslayer(UDP):
1904 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1905 self.assertEqual(packet[UDP].sport, server_udp_port)
1907 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1909 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1912 # server2 to server1
1914 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1915 IP(src=server2.ip4, dst=server1_nat_ip) /
1916 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1918 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1919 IP(src=server2.ip4, dst=server1_nat_ip) /
1920 UDP(sport=self.udp_port_in, dport=server_udp_port))
1922 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1923 IP(src=server2.ip4, dst=server1_nat_ip) /
1924 ICMP(id=self.icmp_id_in, type='echo-request'))
1926 self.pg0.add_stream(pkts)
1927 self.pg_enable_capture(self.pg_interfaces)
1929 capture = self.pg0.get_capture(len(pkts))
1930 for packet in capture:
1932 self.assertEqual(packet[IP].src, server2_nat_ip)
1933 self.assertEqual(packet[IP].dst, server1.ip4)
1934 if packet.haslayer(TCP):
1935 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1936 self.assertEqual(packet[TCP].dport, server_tcp_port)
1937 self.tcp_port_out = packet[TCP].sport
1938 self.assert_packet_checksums_valid(packet)
1939 elif packet.haslayer(UDP):
1940 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1941 self.assertEqual(packet[UDP].dport, server_udp_port)
1942 self.udp_port_out = packet[UDP].sport
1944 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1945 self.icmp_id_out = packet[ICMP].id
1947 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1950 # server1 to server2
1952 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1953 IP(src=server1.ip4, dst=server2_nat_ip) /
1954 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1956 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1957 IP(src=server1.ip4, dst=server2_nat_ip) /
1958 UDP(sport=server_udp_port, dport=self.udp_port_out))
1960 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1961 IP(src=server1.ip4, dst=server2_nat_ip) /
1962 ICMP(id=self.icmp_id_out, type='echo-reply'))
1964 self.pg0.add_stream(pkts)
1965 self.pg_enable_capture(self.pg_interfaces)
1967 capture = self.pg0.get_capture(len(pkts))
1968 for packet in capture:
1970 self.assertEqual(packet[IP].src, server1_nat_ip)
1971 self.assertEqual(packet[IP].dst, server2.ip4)
1972 if packet.haslayer(TCP):
1973 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1974 self.assertEqual(packet[TCP].sport, server_tcp_port)
1975 self.assert_packet_checksums_valid(packet)
1976 elif packet.haslayer(UDP):
1977 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1978 self.assertEqual(packet[UDP].sport, server_udp_port)
1980 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1982 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1985 def test_max_translations_per_user(self):
1986 """ MAX translations per user - recycle the least recently used """
1988 self.nat44_add_address(self.nat_addr)
1989 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1990 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1993 # get maximum number of translations per user
1994 nat44_config = self.vapi.nat_show_config()
1996 # send more packets than the maximum number of translations per user
1997 pkts_num = nat44_config.max_translations_per_user + 5
1999 for port in range(0, pkts_num):
2000 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2001 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2002 TCP(sport=1025 + port))
2004 self.pg0.add_stream(pkts)
2005 self.pg_enable_capture(self.pg_interfaces)
2008 # verify number of translated packets
2009 self.pg1.get_capture(pkts_num)
2011 users = self.vapi.nat44_user_dump()
2013 if user.ip_address == self.pg0.remote_ip4n:
2014 self.assertEqual(user.nsessions,
2015 nat44_config.max_translations_per_user)
2016 self.assertEqual(user.nstaticsessions, 0)
2019 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2021 proto=IP_PROTOS.tcp)
2022 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2023 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2024 TCP(sport=tcp_port))
2025 self.pg0.add_stream(p)
2026 self.pg_enable_capture(self.pg_interfaces)
2028 self.pg1.get_capture(1)
2029 users = self.vapi.nat44_user_dump()
2031 if user.ip_address == self.pg0.remote_ip4n:
2032 self.assertEqual(user.nsessions,
2033 nat44_config.max_translations_per_user - 1)
2034 self.assertEqual(user.nstaticsessions, 1)
2036 def test_interface_addr(self):
2037 """ Acquire NAT44 addresses from interface """
2038 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2040 # no address in NAT pool
2041 adresses = self.vapi.nat44_address_dump()
2042 self.assertEqual(0, len(adresses))
2044 # configure interface address and check NAT address pool
2045 self.pg7.config_ip4()
2046 adresses = self.vapi.nat44_address_dump()
2047 self.assertEqual(1, len(adresses))
2048 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2050 # remove interface address and check NAT address pool
2051 self.pg7.unconfig_ip4()
2052 adresses = self.vapi.nat44_address_dump()
2053 self.assertEqual(0, len(adresses))
2055 def test_interface_addr_static_mapping(self):
2056 """ Static mapping with addresses from interface """
2059 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2060 self.nat44_add_static_mapping(
2062 external_sw_if_index=self.pg7.sw_if_index,
2065 # static mappings with external interface
2066 static_mappings = self.vapi.nat44_static_mapping_dump()
2067 self.assertEqual(1, len(static_mappings))
2068 self.assertEqual(self.pg7.sw_if_index,
2069 static_mappings[0].external_sw_if_index)
2070 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2072 # configure interface address and check static mappings
2073 self.pg7.config_ip4()
2074 static_mappings = self.vapi.nat44_static_mapping_dump()
2075 self.assertEqual(2, len(static_mappings))
2077 for sm in static_mappings:
2078 if sm.external_sw_if_index == 0xFFFFFFFF:
2079 self.assertEqual(sm.external_ip_address[0:4],
2080 self.pg7.local_ip4n)
2081 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2083 self.assertTrue(resolved)
2085 # remove interface address and check static mappings
2086 self.pg7.unconfig_ip4()
2087 static_mappings = self.vapi.nat44_static_mapping_dump()
2088 self.assertEqual(1, len(static_mappings))
2089 self.assertEqual(self.pg7.sw_if_index,
2090 static_mappings[0].external_sw_if_index)
2091 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2093 # configure interface address again and check static mappings
2094 self.pg7.config_ip4()
2095 static_mappings = self.vapi.nat44_static_mapping_dump()
2096 self.assertEqual(2, len(static_mappings))
2098 for sm in static_mappings:
2099 if sm.external_sw_if_index == 0xFFFFFFFF:
2100 self.assertEqual(sm.external_ip_address[0:4],
2101 self.pg7.local_ip4n)
2102 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2104 self.assertTrue(resolved)
2106 # remove static mapping
2107 self.nat44_add_static_mapping(
2109 external_sw_if_index=self.pg7.sw_if_index,
2112 static_mappings = self.vapi.nat44_static_mapping_dump()
2113 self.assertEqual(0, len(static_mappings))
def test_interface_addr_identity_nat(self):
    """ Identity NAT with addresses from interface

    Registers pg7 as a NAT44 address interface and adds a TCP identity
    mapping bound to pg7's sw_if_index.  The identity mapping dump is
    then checked three times: right after creation, after
    ``pg7.config_ip4()`` and after ``pg7.unconfig_ip4()``.
    """
    # NOTE(review): the port literal is not visible in the reviewed text;
    # any TCP port works for this check - confirm against the original.
    port = 53053
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    self.vapi.nat44_add_del_identity_mapping(
        sw_if_index=self.pg7.sw_if_index,
        port=port,
        protocol=IP_PROTOS.tcp,
        addr_only=0)

    # identity mappings with external interface
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)

    # configure interface address and check identity mappings
    self.pg7.config_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    resolved = False
    self.assertEqual(2, len(identity_mappings))
    for sm in identity_mappings:
        if sm.sw_if_index == 0xFFFFFFFF:
            # Verify the entry that matched, not whichever entry happens
            # to be first in the dump.
            self.assertEqual(sm.ip_address, self.pg7.local_ip4n)
            self.assertEqual(port, sm.port)
            self.assertEqual(IP_PROTOS.tcp, sm.protocol)
            resolved = True
    self.assertTrue(resolved)

    # remove interface address and check identity mappings
    self.pg7.unconfig_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)
def test_ipfix_nat44_sess(self):
    """ IPFIX logging NAT44 session created/deleted

    Enables NAT44 between pg0 (inside) and pg1 (outside) with IPFIX
    export towards pg3 on a non-default collector port, pushes a stream
    through the NAT, removes the NAT address, flushes IPFIX and checks
    the exported messages with ``verify_ipfix_nat44_ses``.
    """
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # Have scapy dissect UDP datagrams to the collector port as IPFIX.
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # NOTE(review): path_mtu value is not visible in the reviewed text -
    # confirm against the original.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(9)
    ipfix = IPFIXDecoder()
    # first load template
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ IPFIX logging NAT addresses exhausted

    No NAT pool address is added in this test.  One TCP packet is sent
    from pg0 towards pg1 and must not be forwarded; the IPFIX messages
    exported to pg3 are then checked with
    ``verify_ipfix_addr_exhausted``.
    """
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # NOTE(review): path_mtu value is not visible in the reviewed text -
    # confirm against the original.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    # NOTE(review): the TCP layer of this packet is not visible in the
    # reviewed text - confirm the source port against the original.
    trigger = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
               IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
               TCP(sport=3025))
    self.pg0.add_stream(trigger)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    # NOTE(review): presumably gives the event time to reach the exporter
    # before the flush - confirm this line against the original.
    sleep(1)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    exported = self.pg3.get_capture(9)
    decoder = IPFIXDecoder()
    # first load template
    for msg in exported:
        self.assertTrue(msg.haslayer(IPFIX))
        self.assertEqual(msg[IP].src, self.pg3.local_ip4)
        self.assertEqual(msg[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(msg[UDP].sport, self.ipfix_src_port)
        self.assertEqual(msg[UDP].dport, 4739)
        self.assertEqual(msg[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if msg.haslayer(Template):
            decoder.add_template(msg.getlayer(Template))
    # verify events in data set
    for msg in exported:
        if not msg.haslayer(Data):
            continue
        records = decoder.decode_data_set(msg.getlayer(Set))
        self.verify_ipfix_addr_exhausted(records)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_ipfix_max_sessions(self):
    """ IPFIX logging maximum session entries exceeded

    Sends ``10 * translation_buckets`` TCP packets (bucket count taken
    from ``nat_show_config``) with distinct 10.10.x.y sources and expects
    all of them on pg1.  IPFIX export is then enabled and one more packet
    is sent which must not be forwarded; the exported messages are
    checked with ``verify_ipfix_max_sessions``.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    max_sessions = 10 * self.vapi.nat_show_config().translation_buckets

    # NOTE(review): the TCP layer of these packets is not visible in the
    # reviewed text - confirm the source port against the original.
    flood = []
    for idx in range(0, max_sessions):
        src_ip = "10.10.%u.%u" % ((idx & 0xFF00) >> 8, idx & 0xFF)
        flood.append(
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=src_ip, dst=self.pg1.remote_ip4) /
            TCP(sport=1025))
    self.pg0.add_stream(flood)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    self.pg1.get_capture(max_sessions)
    # NOTE(review): path_mtu value is not visible in the reviewed text -
    # confirm against the original.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    one_more = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                TCP(sport=1025))
    self.pg0.add_stream(one_more)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    # NOTE(review): presumably gives the event time to reach the exporter
    # before the flush - confirm this line against the original.
    sleep(1)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    exported = self.pg3.get_capture(9)
    decoder = IPFIXDecoder()
    # first load template
    for msg in exported:
        self.assertTrue(msg.haslayer(IPFIX))
        self.assertEqual(msg[IP].src, self.pg3.local_ip4)
        self.assertEqual(msg[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(msg[UDP].sport, self.ipfix_src_port)
        self.assertEqual(msg[UDP].dport, 4739)
        self.assertEqual(msg[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if msg.haslayer(Template):
            decoder.add_template(msg.getlayer(Template))
    # verify events in data set
    for msg in exported:
        if not msg.haslayer(Data):
            continue
        records = decoder.decode_data_set(msg.getlayer(Set))
        self.verify_ipfix_max_sessions(records, max_sessions)
def test_pool_addr_fib(self):
    """ NAT44 add pool addresses to FIB

    Sends ARP who-has requests for the NAT pool address and for a 1:1
    static mapping address.  Requests arriving on pg1 (outside) must be
    answered with ARP is-at; a request arriving on pg2, and requests sent
    after both addresses are removed, must produce nothing on pg1.
    """
    static_addr = '10.0.0.10'

    def arp_who_has(intf, target_ip):
        # Broadcast ARP request for target_ip from intf's remote host.
        return (Ether(src=intf.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                ARP(op=ARP.who_has, pdst=target_ip,
                    psrc=intf.remote_ip4, hwsrc=intf.remote_mac))

    def send(intf, pkt):
        # Inject pkt on intf and run the packet generator.
        intf.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()

    def expect_arp_reply():
        # Exactly one packet on pg1, and it must be an ARP is-at.
        capture = self.pg1.get_capture(1)
        self.assertTrue(capture[0].haslayer(ARP))
        # assertEqual, not assertTrue: assertTrue's second argument is
        # only the failure message, so the opcode was never compared.
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # NAT44 address
    send(self.pg1, arp_who_has(self.pg1, self.nat_addr))
    expect_arp_reply()

    # 1:1 NAT address
    send(self.pg1, arp_who_has(self.pg1, static_addr))
    expect_arp_reply()

    # send ARP to non-NAT44 interface
    send(self.pg2, arp_who_has(self.pg2, self.nat_addr))
    self.pg1.assert_nothing_captured()

    # remove addresses and verify
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                  is_add=0)

    send(self.pg1, arp_who_has(self.pg1, self.nat_addr))
    self.pg1.assert_nothing_captured()

    send(self.pg1, arp_who_has(self.pg1, static_addr))
    self.pg1.assert_nothing_captured()
def test_vrf_mode(self):
    """ NAT44 tenant VRF aware address pool mode

    Moves pg0 and pg1 into two separate IPv4 tables, adds one NAT address
    per table and checks that traffic from each inside interface towards
    pg2 is translated to the address of its own table.  The interfaces
    are moved back to table 0 and the tables deleted afterwards.
    """
    # NOTE(review): the VRF id assignments are not visible in the
    # reviewed text - confirm the values against the original.
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"
    tenants = ((self.pg0, vrf_id1), (self.pg1, vrf_id2))

    for intf, _ in tenants:
        intf.unconfig_ip4()
    for _, table in tenants:
        self.vapi.ip_table_add_del(table, is_add=1)
    for intf, table in tenants:
        intf.set_table_ip4(table)
    for intf, _ in tenants:
        intf.config_ip4()
    for intf, _ in tenants:
        intf.resolve_arp()

    self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
    self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
                                              is_inside=0)

    try:
        # first VRF, then second VRF
        for inside, expected_ip in ((self.pg0, nat_ip1),
                                    (self.pg1, nat_ip2)):
            stream = self.create_stream_in(inside, self.pg2)
            inside.add_stream(stream)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            received = self.pg2.get_capture(len(stream))
            self.verify_capture_out(received, expected_ip)
    finally:
        for intf, _ in tenants:
            intf.unconfig_ip4()
        for intf, _ in tenants:
            intf.set_table_ip4(0)
        for intf, _ in tenants:
            intf.config_ip4()
        for intf, _ in tenants:
            intf.resolve_arp()
        for _, table in tenants:
            self.vapi.ip_table_add_del(table, is_add=0)
def test_vrf_feature_independent(self):
    """ NAT44 tenant VRF independent address pool mode

    Adds one NAT address without a VRF and one bound to VRF 99, then
    checks that traffic from both pg0 and pg1 towards pg2 is translated
    to the first address.
    """
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.nat44_add_address(nat_ip1)
    self.nat44_add_address(nat_ip2, vrf_id=99)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
                                              is_inside=0)

    # first VRF, then second VRF - both expect nat_ip1
    for inside in (self.pg0, self.pg1):
        stream = self.create_stream_in(inside, self.pg2)
        inside.add_stream(stream)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = self.pg2.get_capture(len(stream))
        self.verify_capture_out(received, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address

    Installs a neighbor entry and a /32 route for the remote hosts of
    pg7 and pg8, enables dynamic NAT44 with pg7 inside and pg8 outside,
    and verifies translation in both directions.
    """
    ipless = (self.pg7, self.pg8)

    # NOTE(review): the trailing is_static argument is not visible in
    # the reviewed text - confirm against the original.
    for intf in ipless:
        self.vapi.ip_neighbor_add_del(intf.sw_if_index,
                                      mactobinary(intf.remote_mac),
                                      intf.remote_ip4n,
                                      is_static=1)

    for intf in ipless:
        self.vapi.ip_add_del_route(dst_address=intf.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=intf.remote_ip4n,
                                   next_hop_sw_if_index=intf.sw_if_index)

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
                                              is_inside=0)

    # in2out
    outbound = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(outbound)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg8.get_capture(len(outbound))
    self.verify_capture_out(received)

    # out2in
    inbound = self.create_stream_out(self.pg8, self.nat_addr)
    self.pg8.add_stream(inbound)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg7.get_capture(len(inbound))
    self.verify_capture_in(received, self.pg7)
def test_static_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address - 1:1 NAT

    Installs a neighbor entry and a /32 route for the remote hosts of
    pg7 and pg8, maps pg7's remote host 1:1 onto the NAT address, and
    verifies translation out2in first and in2out second.
    """
    ipless = (self.pg7, self.pg8)

    # NOTE(review): the trailing is_static argument is not visible in
    # the reviewed text - confirm against the original.
    for intf in ipless:
        self.vapi.ip_neighbor_add_del(intf.sw_if_index,
                                      mactobinary(intf.remote_mac),
                                      intf.remote_ip4n,
                                      is_static=1)

    for intf in ipless:
        self.vapi.ip_add_del_route(dst_address=intf.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=intf.remote_ip4n,
                                   next_hop_sw_if_index=intf.sw_if_index)

    self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
                                              is_inside=0)

    # out2in
    inbound = self.create_stream_out(self.pg8)
    self.pg8.add_stream(inbound)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg7.get_capture(len(inbound))
    self.verify_capture_in(received, self.pg7)

    # in2out
    outbound = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(outbound)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg8.get_capture(len(outbound))
    self.verify_capture_out(received, self.nat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address - 1:1 NAPT

    Same topology as the 1:1 NAT variant, but pg7's remote host is
    mapped per protocol (TCP, UDP, ICMP) with explicit outside
    ports/identifier.  Translation is verified out2in, then in2out.
    """
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    ipless = (self.pg7, self.pg8)

    # NOTE(review): the trailing is_static argument is not visible in
    # the reviewed text - confirm against the original.
    for intf in ipless:
        self.vapi.ip_neighbor_add_del(intf.sw_if_index,
                                      mactobinary(intf.remote_mac),
                                      intf.remote_ip4n,
                                      is_static=1)

    for intf in ipless:
        self.vapi.ip_add_del_route(dst_address=intf.remote_ip4n,
                                   dst_address_length=32,
                                   next_hop_address=intf.remote_ip4n,
                                   next_hop_sw_if_index=intf.sw_if_index)

    self.nat44_add_address(self.nat_addr)
    per_proto = (
        (self.tcp_port_in, self.tcp_port_out, IP_PROTOS.tcp),
        (self.udp_port_in, self.udp_port_out, IP_PROTOS.udp),
        (self.icmp_id_in, self.icmp_id_out, IP_PROTOS.icmp),
    )
    for local_port, external_port, proto in per_proto:
        self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
                                      local_port, external_port,
                                      proto=proto)
    self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
                                              is_inside=0)

    # out2in
    inbound = self.create_stream_out(self.pg8)
    self.pg8.add_stream(inbound)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg7.get_capture(len(inbound))
    self.verify_capture_in(received, self.pg7)

    # in2out
    outbound = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(outbound)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg8.get_capture(len(outbound))
    self.verify_capture_out(received)
def test_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol

    Maps pg0's remote host 1:1 onto 10.0.0.10 and sends a GRE packet in
    each direction, checking that only the outer IP addresses are
    rewritten, the GRE layer survives and checksums are valid.
    """
    nat_ip = "10.0.0.10"
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    gre_out = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
               IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
               GRE() /
               IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
               TCP(sport=1234, dport=1234))
    self.pg0.add_stream(gre_out)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg1.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, nat_ip)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    gre_in = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
              GRE() /
              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
              TCP(sport=1234, dport=1234))
    self.pg1.add_stream(gre_in)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol - hairpinning

    Two hosts behind pg0 each get a 1:1 mapping.  A GRE packet is sent
    from the host to the server's NAT address and back, both entering
    and leaving through pg0, and the rewritten outer addresses are
    checked.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]

    host_nat_ip = "10.0.0.10"
    server_nat_ip = "10.0.0.11"

    self.nat44_add_static_mapping(host.ip4, host_nat_ip)
    self.nat44_add_static_mapping(server.ip4, server_nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # host to server
    request = (Ether(dst=self.pg0.local_mac, src=host.mac) /
               IP(src=host.ip4, dst=server_nat_ip) /
               GRE() /
               IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
               TCP(sport=1234, dport=1234))
    self.pg0.add_stream(request)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, host_nat_ip)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    reply = (Ether(dst=self.pg0.local_mac, src=server.mac) /
             IP(src=server.ip4, dst=host_nat_ip) /
             GRE() /
             IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
             TCP(sport=1234, dport=1234))
    self.pg0.add_stream(reply)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    packet = self.pg0.get_capture(1)[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.assert_packet_checksums_valid(packet)
    except:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_output_feature(self):
    """ NAT44 interface output feature (in2out postrouting)

    Enables the NAT44 output feature on pg0, pg1 and (as outside) pg3,
    then verifies in2out pg0->pg3, out2in pg3->pg0, and that traffic
    from pg2 to pg0 is delivered without translation.
    """
    self.nat44_add_address(self.nat_addr)
    for inside in (self.pg0, self.pg1):
        self.vapi.nat44_interface_add_del_output_feature(
            inside.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
                                                     is_inside=0)

    # in2out
    outbound = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(outbound)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg3.get_capture(len(outbound))
    self.verify_capture_out(received)

    # out2in
    inbound = self.create_stream_out(self.pg3)
    self.pg3.add_stream(inbound)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg0.get_capture(len(inbound))
    self.verify_capture_in(received, self.pg0)

    # from non-NAT interface to NAT inside interface
    plain = self.create_stream_in(self.pg2, self.pg0)
    self.pg2.add_stream(plain)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg0.get_capture(len(plain))
    self.verify_capture_no_translation(received, self.pg2, self.pg0)
def test_output_feature_vrf_aware(self):
    """ NAT44 interface output feature VRF aware (in2out postrouting)

    Adds a /32 route to pg3's remote host in two tables, one NAT address
    per VRF (10 and 20), and enables the output feature on pg4, pg6 and
    (as outside) pg3.  For each inside interface the in2out and out2in
    directions are verified against that VRF's NAT address.
    """
    nat_ip_vrf10 = "10.0.0.10"
    nat_ip_vrf20 = "10.0.0.20"

    # NOTE(review): the table_id arguments are not visible in the
    # reviewed text; 10 and 20 follow the VRF ids used below - confirm.
    for table in (10, 20):
        self.vapi.ip_add_del_route(
            dst_address=self.pg3.remote_ip4n,
            dst_address_length=32,
            next_hop_address=self.pg3.remote_ip4n,
            next_hop_sw_if_index=self.pg3.sw_if_index,
            table_id=table)

    self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
    self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
    self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
                                                     is_inside=0)

    # VRF 10 via pg4 first, then VRF 20 via pg6
    for inside, vrf_nat_ip in ((self.pg4, nat_ip_vrf10),
                               (self.pg6, nat_ip_vrf20)):
        # in2out
        outbound = self.create_stream_in(inside, self.pg3)
        inside.add_stream(outbound)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = self.pg3.get_capture(len(outbound))
        self.verify_capture_out(received, nat_ip=vrf_nat_ip)

        # out2in
        inbound = self.create_stream_out(self.pg3, dst_ip=vrf_nat_ip)
        self.pg3.add_stream(inbound)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = inside.get_capture(len(inbound))
        self.verify_capture_in(received, inside)
def test_output_feature_hairpinning(self):
    """ NAT44 interface output feature hairpinning (in2out postrouting)

    A host and a server both sit behind pg0.  The server is reachable
    through a static TCP mapping on the NAT address; the host sends to
    that mapping and the server replies, with every packet entering and
    leaving via pg0.
    """
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the host port assignments are not visible in the
    # reviewed text - confirm the values against the original.
    host_in_port = 1234
    host_out_port = 0
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
                                                     is_inside=0)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    request = (Ether(src=host.mac, dst=self.pg0.local_mac) /
               IP(src=host.ip4, dst=self.nat_addr) /
               TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(request)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg0.get_capture(1)[0]
    try:
        ip = received[IP]
        tcp = received[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.assert_packet_checksums_valid(received)
        # remember the translated source port for the reply
        host_out_port = tcp.sport
    except:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", received))
        raise

    # send reply from server to host
    reply = (Ether(src=server.mac, dst=self.pg0.local_mac) /
             IP(src=server.ip4, dst=self.nat_addr) /
             TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(reply)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg0.get_capture(1)[0]
    try:
        ip = received[IP]
        tcp = received[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.assert_packet_checksums_valid(received)
    except:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", received))
        raise
def test_one_armed_nat44(self):
    """ One armed NAT44

    pg9 is enabled both as inside and as outside NAT44 interface.  A TCP
    packet from the local host to the remote host is expected back on
    pg9 with the NAT address as source; the reply to the translated port
    is expected back on pg9 addressed to the local host.
    """
    remote_host = self.pg9.remote_hosts[0]
    local_host = self.pg9.remote_hosts[1]
    external_port = 0

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
                                              is_inside=0)

    # in2out
    request = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
               IP(src=local_host.ip4, dst=remote_host.ip4) /
               TCP(sport=12345, dport=80))
    self.pg9.add_stream(request)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg9.get_capture(1)[0]
    try:
        ip = received[IP]
        tcp = received[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, remote_host.ip4)
        self.assertNotEqual(tcp.sport, 12345)
        # remember the translated source port for the reply
        external_port = tcp.sport
        self.assertEqual(tcp.dport, 80)
        self.assert_packet_checksums_valid(received)
    except:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", received))
        raise

    # out2in
    reply = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
             IP(src=remote_host.ip4, dst=self.nat_addr) /
             TCP(sport=80, dport=external_port))
    self.pg9.add_stream(reply)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg9.get_capture(1)[0]
    try:
        ip = received[IP]
        tcp = received[TCP]
        self.assertEqual(ip.src, remote_host.ip4)
        self.assertEqual(ip.dst, local_host.ip4)
        self.assertEqual(tcp.sport, 80)
        self.assertEqual(tcp.dport, 12345)
        self.assert_packet_checksums_valid(received)
    except:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", received))
        raise
def test_del_session(self):
    """ Delete NAT44 session

    Creates sessions by sending a stream pg0->pg1, deletes one session
    by its inside key and one by its outside key, and checks that the
    user's session count dropped by two.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    stream = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(stream)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(len(stream))

    before = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
    count_before = len(before)

    # delete by inside address/port
    self.vapi.nat44_del_session(before[0].inside_ip_address,
                                before[0].inside_port,
                                before[0].protocol)
    # delete by outside address/port
    # NOTE(review): the trailing is_in argument is not visible in the
    # reviewed text - confirm against the original.
    self.vapi.nat44_del_session(before[1].outside_ip_address,
                                before[1].outside_port,
                                before[1].protocol,
                                is_in=0)

    after = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
    self.assertEqual(count_before - len(after), 2)
def test_set_get_reass(self):
    """ NAT44 set/get virtual fragmentation reassembly

    Reads the current reassembly settings, writes modified values for
    timeout, max_reass and max_frag, and checks they read back.  Then
    sets drop_frag and checks ``ip4_drop_frag`` is reported as truthy.
    """
    initial = self.vapi.nat_get_reass()

    self.vapi.nat_set_reass(timeout=initial.ip4_timeout + 5,
                            max_reass=initial.ip4_max_reass * 2,
                            max_frag=initial.ip4_max_frag * 2)

    updated = self.vapi.nat_get_reass()

    self.assertEqual(initial.ip4_timeout + 5, updated.ip4_timeout)
    self.assertEqual(initial.ip4_max_reass * 2, updated.ip4_max_reass)
    self.assertEqual(initial.ip4_max_frag * 2, updated.ip4_max_frag)

    self.vapi.nat_set_reass(drop_frag=1)
    drop_frag_state = self.vapi.nat_get_reass().ip4_drop_frag
    self.assertTrue(drop_frag_state)
def test_frag_in_order(self):
    """ NAT44 translate fragments arriving in order

    Sends a fragmented TCP payload pg0->pg1 and the fragmented reply
    pg1->pg0, reassembles what is captured and checks ports and payload.
    Finally checks that the reassembly dump grew by two entries.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    data = "A" * 4 + "B" * 16 + "C" * 3
    self.tcp_port_in = random.randint(1025, 65535)

    reass_n_start = len(self.vapi.nat_reass_dump())

    # in2out
    # NOTE(review): several short argument lines of the helper calls
    # below are not visible in the reviewed text - confirm the argument
    # lists against the original.
    outbound = self.create_stream_frag(self.pg0,
                                       self.pg1.remote_ip4,
                                       self.tcp_port_in,
                                       20,
                                       data)
    self.pg0.add_stream(outbound)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg1.get_capture(len(outbound))
    whole = self.reass_frags_and_verify(frags,
                                        self.nat_addr,
                                        self.pg1.remote_ip4)
    self.assertEqual(whole[TCP].dport, 20)
    self.assertNotEqual(whole[TCP].sport, self.tcp_port_in)
    self.tcp_port_out = whole[TCP].sport
    self.assertEqual(data, whole[Raw].load)

    # out2in
    inbound = self.create_stream_frag(self.pg1,
                                      self.nat_addr,
                                      20,
                                      self.tcp_port_out,
                                      data)
    self.pg1.add_stream(inbound)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg0.get_capture(len(inbound))
    whole = self.reass_frags_and_verify(frags,
                                        self.pg1.remote_ip4,
                                        self.pg0.remote_ip4)
    self.assertEqual(whole[TCP].sport, 20)
    self.assertEqual(whole[TCP].dport, self.tcp_port_in)
    self.assertEqual(data, whole[Raw].load)

    reass_n_end = len(self.vapi.nat_reass_dump())

    self.assertEqual(reass_n_end - reass_n_start, 2)
def test_reass_hairpinning(self):
    """ NAT44 fragments hairpinning

    Sends a fragmented TCP payload from pg0 to a static mapping that
    points at a server behind the same interface, and checks the
    reassembled packet captured on pg0.
    """
    server = self.pg0.remote_hosts[1]
    host_in_port = random.randint(1025, 65535)
    server_in_port = random.randint(1025, 65535)
    server_out_port = random.randint(1025, 65535)
    data = "A" * 4 + "B" * 16 + "C" * 3

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    # NOTE(review): the argument lines of the two helper calls below are
    # not visible in the reviewed text - confirm against the original.
    fragments = self.create_stream_frag(self.pg0,
                                        self.nat_addr,
                                        host_in_port,
                                        server_out_port,
                                        data)
    self.pg0.add_stream(fragments)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg0.get_capture(len(fragments))
    whole = self.reass_frags_and_verify(frags,
                                        self.nat_addr,
                                        server.ip4)
    self.assertNotEqual(whole[TCP].sport, host_in_port)
    self.assertEqual(whole[TCP].dport, server_in_port)
    self.assertEqual(data, whole[Raw].load)
def test_frag_out_of_order(self):
    """ NAT44 translate fragments arriving out of order

    Same exchange as the in-order fragment test, but each fragment list
    is reversed before it is sent.  The reassembled packets captured in
    both directions are checked for ports and payload.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    data = "A" * 4 + "B" * 16 + "C" * 3
    # The random port was previously generated and thrown away; store it
    # so the stream actually uses it, as the in-order test does.
    self.tcp_port_in = random.randint(1025, 65535)

    # in2out
    # NOTE(review): several short lines of this section (helper
    # arguments and the reversal of the fragment list) are not visible
    # in the reviewed text - confirm against the original.
    pkts = self.create_stream_frag(self.pg0,
                                   self.pg1.remote_ip4,
                                   self.tcp_port_in,
                                   20,
                                   data)
    pkts.reverse()
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg1.get_capture(len(pkts))
    p = self.reass_frags_and_verify(frags,
                                    self.nat_addr,
                                    self.pg1.remote_ip4)
    self.assertEqual(p[TCP].dport, 20)
    self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
    self.tcp_port_out = p[TCP].sport
    self.assertEqual(data, p[Raw].load)

    # out2in
    pkts = self.create_stream_frag(self.pg1,
                                   self.nat_addr,
                                   20,
                                   self.tcp_port_out,
                                   data)
    pkts.reverse()
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    frags = self.pg0.get_capture(len(pkts))
    p = self.reass_frags_and_verify(frags,
                                    self.pg1.remote_ip4,
                                    self.pg0.remote_ip4)
    self.assertEqual(p[TCP].sport, 20)
    self.assertEqual(p[TCP].dport, self.tcp_port_in)
    self.assertEqual(data, p[Raw].load)
def test_port_restricted(self):
    """ Port restricted NAT44 (MAP-E CE)

    Switches port assignment to the MAP-E algorithm with PSID 10
    (offset 6, length 6) via the CLI and checks that the translated
    source port of a TCP packet carries that PSID in bits 6..11.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
                  "psid-offset 6 psid-len 6")

    probe = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=4567, dport=22))
    self.pg0.add_stream(probe)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    received = self.pg1.get_capture(1)[0]
    try:
        ip = received[IP]
        tcp = received[TCP]
        self.assertEqual(ip.dst, self.pg1.remote_ip4)
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.dport, 22)
        self.assertNotEqual(tcp.sport, 4567)
        # six PSID bits starting at bit 6 of the port must equal 10
        self.assertEqual((tcp.sport >> 6) & 63, 10)
        self.assert_packet_checksums_valid(received)
    except:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", received))
        raise
def test_ipfix_max_frags(self):
    """ IPFIX logging maximum fragments pending reassembly exceeded

    Sets ``max_frag`` to 0, enables IPFIX export towards pg3 and sends
    only the last fragment of a fragmented TCP stream.  Nothing may be
    forwarded to pg1; the exported messages are checked with
    ``verify_ipfix_max_fragments_ip4``.
    """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.nat_set_reass(max_frag=0)
    # NOTE(review): path_mtu value is not visible in the reviewed text -
    # confirm against the original.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    payload = "A" * 4 + "B" * 16 + "C" * 3
    self.tcp_port_in = random.randint(1025, 65535)
    # NOTE(review): the trailing arguments of this call are not visible
    # in the reviewed text - confirm against the original.
    fragments = self.create_stream_frag(self.pg0,
                                        self.pg1.remote_ip4,
                                        self.tcp_port_in,
                                        20,
                                        payload)
    # only the last fragment is injected
    self.pg0.add_stream(fragments[-1])
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.assert_nothing_captured()
    # NOTE(review): presumably gives the event time to reach the exporter
    # before the flush - confirm this line against the original.
    sleep(1)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    exported = self.pg3.get_capture(9)
    decoder = IPFIXDecoder()
    # first load template
    for msg in exported:
        self.assertTrue(msg.haslayer(IPFIX))
        self.assertEqual(msg[IP].src, self.pg3.local_ip4)
        self.assertEqual(msg[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(msg[UDP].sport, self.ipfix_src_port)
        self.assertEqual(msg[UDP].dport, 4739)
        self.assertEqual(msg[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if msg.haslayer(Template):
            decoder.add_template(msg.getlayer(Template))
    # verify events in data set
    for msg in exported:
        if not msg.haslayer(Data):
            continue
        records = decoder.decode_data_set(msg.getlayer(Set))
        self.verify_ipfix_max_fragments_ip4(records, 0,
                                            self.pg0.remote_ip4n)
def test_multiple_outside_vrf(self):
    """ Multiple outside VRF

    Places the two outside interfaces pg1 and pg2 into separate IPv4
    tables and verifies in2out and out2in translation between pg0 and
    each of them, changing the inside ports/ICMP id between the two
    rounds.  The interfaces are moved back to table 0 afterwards.
    """
    # NOTE(review): the VRF id assignments are not visible in the
    # reviewed text - confirm the values against the original.
    vrf_id1 = 1
    vrf_id2 = 2
    outside = ((self.pg1, vrf_id1), (self.pg2, vrf_id2))

    for intf, _ in outside:
        intf.unconfig_ip4()
    for _, table in outside:
        self.vapi.ip_table_add_del(table, is_add=1)
    for intf, table in outside:
        intf.set_table_ip4(table)
    for intf, _ in outside:
        intf.config_ip4()
    for intf, _ in outside:
        intf.resolve_arp()

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    for intf, _ in outside:
        self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
                                                  is_inside=0)

    def round_trip(out_if):
        # in2out: pg0 -> out_if, translated to the NAT address
        outbound = self.create_stream_in(self.pg0, out_if)
        self.pg0.add_stream(outbound)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = out_if.get_capture(len(outbound))
        self.verify_capture_out(received, self.nat_addr)

        # out2in: out_if -> NAT address, delivered on pg0
        inbound = self.create_stream_out(out_if, self.nat_addr)
        out_if.add_stream(inbound)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        received = self.pg0.get_capture(len(inbound))
        self.verify_capture_in(received, self.pg0)

    try:
        # first VRF
        round_trip(self.pg1)

        self.tcp_port_in = 60303
        self.udp_port_in = 60304
        self.icmp_id_in = 60305

        # second VRF
        round_trip(self.pg2)
    finally:
        for intf, _ in outside:
            intf.unconfig_ip4()
        for intf, _ in outside:
            intf.set_table_ip4(0)
        for intf, _ in outside:
            intf.config_ip4()
        for intf, _ in outside:
            intf.resolve_arp()
# NOTE(review): the `def` line introducing these statements is not part of
# this extract; the super() call shows they belong to TestNAT44.tearDown.
3210 super(TestNAT44, self).tearDown()
# While VPP is still alive, record NAT state in the test log via CLI.
3211 if not self.vpp_dead:
3212 self.logger.info(self.vapi.cli("show nat44 addresses"))
3213 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3214 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3215 self.logger.info(self.vapi.cli("show nat44 interface address"))
3216 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3217 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3218 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
# Put the address/port assignment algorithm back to its default and clear
# the VPP log buffer.
3219 self.vapi.cli("nat addr-port-assignment-alg default")
3221 self.vapi.cli("clear logging")
3224 class TestNAT44EndpointDependent(MethodHolder):
3225 """ Endpoint-Dependent mapping and filtering test cases """
# Extends the parent's VPP command line with a `nat { endpoint-dependent }`
# section.
# NOTE(review): takes `cls`, so presumably decorated with @classmethod on a
# line that is not part of this extract.
3228 def setUpConstants(cls):
3229 super(TestNAT44EndpointDependent, cls).setUpConstants()
3230 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
# Class-level fixture: sets default ports/addresses used by the tests,
# creates seven pg interfaces and prepares pg0, pg4, pg5 and pg6 plus the
# routes the endpoint-dependent tests rely on.
# NOTE(review): many short original lines (try/except, loop body, some
# route arguments) are absent from this extract.
3233 def setUpClass(cls):
3234 super(TestNAT44EndpointDependent, cls).setUpClass()
3235 cls.vapi.cli("set log class nat level debug")
# Default inside/outside ports, ICMP ids and NAT address.
3237 cls.tcp_port_in = 6303
3238 cls.tcp_port_out = 6303
3239 cls.udp_port_in = 6304
3240 cls.udp_port_out = 6304
3241 cls.icmp_id_in = 6305
3242 cls.icmp_id_out = 6305
3243 cls.nat_addr = '10.0.0.3'
3244 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3245 cls.ipfix_src_port = 4739
3246 cls.ipfix_domain_id = 1
3247 cls.tcp_external_port = 80
# pg0..pg6 are created; the first three are kept in cls.interfaces.
3249 cls.create_pg_interfaces(range(7))
3250 cls.interfaces = list(cls.pg_interfaces[0:3])
3252 for i in cls.interfaces:
# pg0 gets three remote hosts with IPv4 neighbor entries.
3257 cls.pg0.generate_remote_hosts(3)
3258 cls.pg0.configure_ipv4_neighbors()
# pg4: two remote hosts; an extra address derived from 10.0.0.1 is added
# to the interface (remaining arguments not visible here).
3262 cls.pg4.generate_remote_hosts(2)
3263 cls.pg4.config_ip4()
3264 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3265 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3269 cls.pg4.resolve_arp()
# Second pg4 remote host is given the same IPv4 as the first, then ARP is
# resolved again.
3270 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3271 cls.pg4.resolve_arp()
3273 zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
3274 cls.vapi.ip_table_add_del(1, is_add=1)
# pg5: addresses overridden to 10.1.1.1 (local) / 10.1.1.2 (remote) and the
# interface is bound to IPv4 table 1.
3276 cls.pg5._local_ip4 = "10.1.1.1"
3277 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
3279 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
3280 cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
3281 socket.AF_INET, cls.pg5.remote_ip4)
3282 cls.pg5.set_table_ip4(1)
3283 cls.pg5.config_ip4()
# /32 route to the pg5 remote host via pg5.
3285 cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
3286 dst_address_length=32,
3288 next_hop_sw_if_index=cls.pg5.sw_if_index,
3289 next_hop_address=zero_ip4n)
# pg6: same treatment with 10.1.2.1 / 10.1.2.2, also in table 1.
3291 cls.pg6._local_ip4 = "10.1.2.1"
3292 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
3294 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
3295 cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
3296 socket.AF_INET, cls.pg6.remote_ip4)
3297 cls.pg6.set_table_ip4(1)
3298 cls.pg6.config_ip4()
# /32 route to the pg6 remote host via pg6.
3300 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3301 dst_address_length=32,
3303 next_hop_sw_if_index=cls.pg6.sw_if_index,
3304 next_hop_address=zero_ip4n)
# /16 route covering the pg6 remote address, resolved through table 1.
3306 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3307 dst_address_length=16,
3308 next_hop_address=zero_ip4n,
3310 next_hop_table_id=1)
# Default route resolved through table 0.
# NOTE(review): the table each of these routes is installed in is given on
# lines missing from this extract.
3311 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3312 dst_address_length=0,
3313 next_hop_address=zero_ip4n,
3315 next_hop_table_id=0)
# Default route via pg1.
3316 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3317 dst_address_length=0,
3319 next_hop_sw_if_index=cls.pg1.sw_if_index,
3320 next_hop_address=cls.pg1.local_ip4n)
3322 cls.pg5.resolve_arp()
3323 cls.pg6.resolve_arp()
# NOTE(review): presumably the error path (except clause not visible):
# tear the class down if setup failed - confirm against the full file.
3326 super(TestNAT44EndpointDependent, cls).tearDownClass()
# Basic dynamic translation: traffic from pg0 is verified on pg1, and
# return traffic from pg1 is verified on pg0.
# NOTE(review): call continuations and other short lines are absent from
# this extract.
3329 def test_dynamic(self):
3330 """ NAT44 dynamic translation test """
# NAT pool address and NAT44 feature on pg0 / pg1.
3332 self.nat44_add_address(self.nat_addr)
3333 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3334 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1
3338 pkts = self.create_stream_in(self.pg0, self.pg1)
3339 self.pg0.add_stream(pkts)
3340 self.pg_enable_capture(self.pg_interfaces)
3342 capture = self.pg1.get_capture(len(pkts))
3343 self.verify_capture_out(capture)
# pg1 -> pg0
3346 pkts = self.create_stream_out(self.pg1)
3347 self.pg1.add_stream(pkts)
3348 self.pg_enable_capture(self.pg_interfaces)
3350 capture = self.pg0.get_capture(len(pkts))
3351 self.verify_capture_in(capture, self.pg0)
# NAT44 with forwarding enabled: traffic matching a static mapping is
# translated, traffic from a host without a mapping is passed through
# untranslated, and one of the resulting sessions is deleted explicitly.
# NOTE(review): call continuations, try/finally and other short lines are
# absent from this extract.
3353 def test_forwarding(self):
3354 """ NAT44 forwarding test """
3356 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3357 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3359 self.vapi.nat44_forwarding_enable_disable(1)
# Static mapping: pg0's remote host <-> the NAT address.
3361 real_ip = self.pg0.remote_ip4n
3362 alias_ip = self.nat_addr_n
3363 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3364 external_ip=alias_ip)
3367 # in2out - static mapping match
# pg1 -> pg0 first, then pg0 -> pg1 with ports expected unchanged
# (same_port=True).
3369 pkts = self.create_stream_out(self.pg1)
3370 self.pg1.add_stream(pkts)
3371 self.pg_enable_capture(self.pg_interfaces)
3373 capture = self.pg0.get_capture(len(pkts))
3374 self.verify_capture_in(capture, self.pg0)
3376 pkts = self.create_stream_in(self.pg0, self.pg1)
3377 self.pg0.add_stream(pkts)
3378 self.pg_enable_capture(self.pg_interfaces)
3380 capture = self.pg1.get_capture(len(pkts))
3381 self.verify_capture_out(capture, same_port=True)
3383 # in2out - no static mapping match
# Temporarily make remote host 1 the primary pg0 host so that the stream
# helpers use an address that has no static mapping; host0 is restored
# further down.
3385 host0 = self.pg0.remote_hosts[0]
3386 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
3388 pkts = self.create_stream_out(self.pg1,
3389 dst_ip=self.pg0.remote_ip4,
3390 use_inside_ports=True)
3391 self.pg1.add_stream(pkts)
3392 self.pg_enable_capture(self.pg_interfaces)
3394 capture = self.pg0.get_capture(len(pkts))
3395 self.verify_capture_in(capture, self.pg0)
3397 pkts = self.create_stream_in(self.pg0, self.pg1)
3398 self.pg0.add_stream(pkts)
3399 self.pg_enable_capture(self.pg_interfaces)
3401 capture = self.pg1.get_capture(len(pkts))
# Source address is expected to stay the inside host's own address.
3402 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3405 self.pg0.remote_hosts[0] = host0
# Remote host 1 must own three sessions; deleting the first one by its
# inside and external-host tuple must leave two.
3407 user = self.pg0.remote_hosts[1]
3408 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3409 self.assertEqual(len(sessions), 3)
3410 self.assertTrue(sessions[0].ext_host_valid)
3411 self.vapi.nat44_del_session(
3412 sessions[0].inside_ip_address,
3413 sessions[0].inside_port,
3414 sessions[0].protocol,
3415 ext_host_address=sessions[0].ext_host_address,
3416 ext_host_port=sessions[0].ext_host_port)
3417 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3418 self.assertEqual(len(sessions), 2)
# Cleanup: forwarding off; the static-mapping call is continued on a line
# not visible here (presumably removing the mapping - confirm).
3421 self.vapi.nat44_forwarding_enable_disable(0)
3422 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3423 external_ip=alias_ip,
# Load-balanced static mapping: a client on pg1 reaches the NAT address,
# the request must land on one of two pg0 servers, the reply must come
# back from the NAT address, and the session can be deleted afterwards.
# NOTE(review): port assignments, try/except blocks and call continuations
# are absent from this extract.
3426 def test_static_lb(self):
3427 """ NAT44 local service load balancing """
3428 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3431 server1 = self.pg0.remote_hosts[0]
3432 server2 = self.pg0.remote_hosts[1]
# Back-end list for the load-balanced mapping (further keys not visible).
3434 locals = [{'addr': server1.ip4n,
3437 {'addr': server2.ip4n,
3441 self.nat44_add_address(self.nat_addr)
3442 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3445 local_num=len(locals),
3447 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3448 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3451 # from client to service
3452 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3453 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3454 TCP(sport=12345, dport=external_port))
3455 self.pg1.add_stream(p)
3456 self.pg_enable_capture(self.pg_interfaces)
3458 capture = self.pg0.get_capture(1)
# Destination must be one of the two servers, on the local port.
3464 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3465 if ip.dst == server1.ip4:
3469 self.assertEqual(tcp.dport, local_port)
3470 self.assert_packet_checksums_valid(p)
3472 self.logger.error(ppp("Unexpected or invalid packet:", p))
3475 # from service back to client
3476 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3477 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3478 TCP(sport=local_port, dport=12345))
3479 self.pg0.add_stream(p)
3480 self.pg_enable_capture(self.pg_interfaces)
3482 capture = self.pg1.get_capture(1)
# Reply must appear to come from the NAT address / external port.
3487 self.assertEqual(ip.src, self.nat_addr)
3488 self.assertEqual(tcp.sport, external_port)
3489 self.assert_packet_checksums_valid(p)
3491 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Exactly one session for the chosen server; deleting it leaves none.
3494 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3495 self.assertEqual(len(sessions), 1)
3496 self.assertTrue(sessions[0].ext_host_valid)
3497 self.vapi.nat44_del_session(
3498 sessions[0].inside_ip_address,
3499 sessions[0].inside_port,
3500 sessions[0].protocol,
3501 ext_host_address=sessions[0].ext_host_address,
3502 ext_host_port=sessions[0].ext_host_port)
3503 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3504 self.assertEqual(len(sessions), 0)
# Extended test: many clients hit a load-balanced static mapping and the
# distribution between the two servers is compared.
# NOTE(review): port/probability settings, counters and call continuations
# are absent from this extract.
3506 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3507 def test_static_lb_multi_clients(self):
3508 """ NAT44 local service load balancing - multiple clients"""
3510 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3513 server1 = self.pg0.remote_hosts[0]
3514 server2 = self.pg0.remote_hosts[1]
3516 locals = [{'addr': server1.ip4n,
3519 {'addr': server2.ip4n,
3523 self.nat44_add_address(self.nat_addr)
3524 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3527 local_num=len(locals),
3529 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3530 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Client source addresses are generated by ip4_range from pg1's remote
# address; one TCP packet to the NAT address is built per client.
3535 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3537 for client in clients:
3538 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3539 IP(src=client, dst=self.nat_addr) /
3540 TCP(sport=12345, dport=external_port))
3542 self.pg1.add_stream(pkts)
3543 self.pg_enable_capture(self.pg_interfaces)
3545 capture = self.pg0.get_capture(len(pkts))
# server1_n / server2_n are presumably per-server packet counts (their
# updates are not visible); server1 is expected to receive more.
3547 if p[IP].dst == server1.ip4:
3551 self.assertTrue(server1_n > server2_n)
# Load-balanced static mapping with forwarding enabled: traffic addressed
# to the NAT address is translated to a server, while traffic addressed
# directly to server1 is passed through untranslated in both directions.
# NOTE(review): port assignments, try/except blocks and call continuations
# are absent from this extract.
3553 def test_static_lb_2(self):
3554 """ NAT44 local service load balancing (asymmetrical rule) """
3555 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3558 server1 = self.pg0.remote_hosts[0]
3559 server2 = self.pg0.remote_hosts[1]
3561 locals = [{'addr': server1.ip4n,
3564 {'addr': server2.ip4n,
# No NAT pool address is added in the visible lines; forwarding is on.
3568 self.vapi.nat44_forwarding_enable_disable(1)
3569 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3573 local_num=len(locals),
3575 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3576 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3579 # from client to service
3580 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3581 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3582 TCP(sport=12345, dport=external_port))
3583 self.pg1.add_stream(p)
3584 self.pg_enable_capture(self.pg_interfaces)
3586 capture = self.pg0.get_capture(1)
3592 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3593 if ip.dst == server1.ip4:
3597 self.assertEqual(tcp.dport, local_port)
3598 self.assert_packet_checksums_valid(p)
3600 self.logger.error(ppp("Unexpected or invalid packet:", p))
3603 # from service back to client
3604 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3605 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3606 TCP(sport=local_port, dport=12345))
3607 self.pg0.add_stream(p)
3608 self.pg_enable_capture(self.pg_interfaces)
3610 capture = self.pg1.get_capture(1)
3615 self.assertEqual(ip.src, self.nat_addr)
3616 self.assertEqual(tcp.sport, external_port)
3617 self.assert_packet_checksums_valid(p)
3619 self.logger.error(ppp("Unexpected or invalid packet:", p))
3622 # from client to server (no translation)
# A different client port (12346) is used for the direct flow.
3623 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3624 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3625 TCP(sport=12346, dport=local_port))
3626 self.pg1.add_stream(p)
3627 self.pg_enable_capture(self.pg_interfaces)
3629 capture = self.pg0.get_capture(1)
3635 self.assertEqual(ip.dst, server1.ip4)
3636 self.assertEqual(tcp.dport, local_port)
3637 self.assert_packet_checksums_valid(p)
3639 self.logger.error(ppp("Unexpected or invalid packet:", p))
3642 # from service back to client (no translation)
3643 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3644 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3645 TCP(sport=local_port, dport=12346))
3646 self.pg0.add_stream(p)
3647 self.pg_enable_capture(self.pg_interfaces)
3649 capture = self.pg1.get_capture(1)
# Source address and port must be server1's own, i.e. untranslated.
3654 self.assertEqual(ip.src, server1.ip4)
3655 self.assertEqual(tcp.sport, local_port)
3656 self.assert_packet_checksums_valid(p)
3658 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Translation of a protocol other than TCP/UDP/ICMP: after a TCP packet
# from pg0 to pg1, an encapsulated packet (the asserts expect a GRE layer)
# is sent in each direction and its outer addresses are checked.
# NOTE(review): the GRE() layer lines, try/except blocks and other short
# lines are absent from this extract.
3661 def test_unknown_proto(self):
3662 """ NAT44 translate packet with unknown protocol """
3663 self.nat44_add_address(self.nat_addr)
3664 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3665 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Plain TCP packet pg0 -> pg1 first.
3669 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3670 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3671 TCP(sport=self.tcp_port_in, dport=20))
3672 self.pg0.add_stream(p)
3673 self.pg_enable_capture(self.pg_interfaces)
3675 p = self.pg1.get_capture(1)
# pg0 -> pg1 packet carrying an inner IP/TCP payload.
3677 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3678 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3680 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3681 TCP(sport=1234, dport=1234))
3682 self.pg0.add_stream(p)
3683 self.pg_enable_capture(self.pg_interfaces)
3685 p = self.pg1.get_capture(1)
# Outer source must be the NAT address; GRE layer must be preserved.
3688 self.assertEqual(packet[IP].src, self.nat_addr)
3689 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3690 self.assertTrue(packet.haslayer(GRE))
3691 self.assert_packet_checksums_valid(packet)
3693 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: pg1 -> NAT address, expected on pg0.
3697 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3698 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3700 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3701 TCP(sport=1234, dport=1234))
3702 self.pg1.add_stream(p)
3703 self.pg_enable_capture(self.pg_interfaces)
3705 p = self.pg0.get_capture(1)
# Outer destination must be rewritten to the inside host.
3708 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3709 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3710 self.assertTrue(packet.haslayer(GRE))
3711 self.assert_packet_checksums_valid(packet)
3713 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the unknown-protocol test: host and server are
# both behind pg0, and the server is reachable through a static mapping.
# All packets enter and leave on pg0.
# NOTE(review): the GRE() layer lines, some port assignments, try/except
# blocks and other short lines are absent from this extract.
3716 def test_hairpinning_unknown_proto(self):
3717 """ NAT44 translate packet with unknown protocol - hairpinning """
3718 host = self.pg0.remote_hosts[0]
3719 server = self.pg0.remote_hosts[1]
3721 server_out_port = 8765
3722 server_nat_ip = "10.0.0.11"
3724 self.nat44_add_address(self.nat_addr)
3725 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3726 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3729 # add static mapping for server
3730 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# TCP packet host -> server's external address first.
3733 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3734 IP(src=host.ip4, dst=server_nat_ip) /
3735 TCP(sport=host_in_port, dport=server_out_port))
3736 self.pg0.add_stream(p)
3737 self.pg_enable_capture(self.pg_interfaces)
3739 self.pg0.get_capture(1)
# host -> server's external address with an inner IP/TCP payload.
3741 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3742 IP(src=host.ip4, dst=server_nat_ip) /
3744 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3745 TCP(sport=1234, dport=1234))
3746 self.pg0.add_stream(p)
3747 self.pg_enable_capture(self.pg_interfaces)
3749 p = self.pg0.get_capture(1)
# Both addresses are rewritten: source to the NAT address, destination to
# the server's real address.
3752 self.assertEqual(packet[IP].src, self.nat_addr)
3753 self.assertEqual(packet[IP].dst, server.ip4)
3754 self.assertTrue(packet.haslayer(GRE))
3755 self.assert_packet_checksums_valid(packet)
3757 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: server -> NAT address.
3761 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3762 IP(src=server.ip4, dst=self.nat_addr) /
3764 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3765 TCP(sport=1234, dport=1234))
3766 self.pg0.add_stream(p)
3767 self.pg_enable_capture(self.pg_interfaces)
3769 p = self.pg0.get_capture(1)
# The host must see the server's external address as the source.
3772 self.assertEqual(packet[IP].src, server_nat_ip)
3773 self.assertEqual(packet[IP].dst, host.ip4)
3774 self.assertTrue(packet.haslayer(GRE))
3775 self.assert_packet_checksums_valid(packet)
3777 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Combines the NAT44 output feature on pg1 with an out2in-only static
# mapping for a service on pg0, then also checks ordinary dynamic
# translation for the same inside host.
# NOTE(review): port assignments, try/except blocks and call continuations
# are absent from this extract.
3780 def test_output_feature_and_service(self):
3781 """ NAT44 interface output feature and services """
3782 external_addr = '1.2.3.4'
3786 self.vapi.nat44_forwarding_enable_disable(1)
3787 self.nat44_add_address(self.nat_addr)
# Identity mapping for the pg1 remote host, plus a TCP static mapping
# (out2in_only) from external_addr to the pg0 remote host.
3788 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3789 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3790 local_port, external_port,
3791 proto=IP_PROTOS.tcp, out2in_only=1)
# The feature call is issued twice for pg0; the second call has an extra
# argument on a line not visible here.
3792 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3793 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3795 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3798 # from client to service
3799 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3800 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3801 TCP(sport=12345, dport=external_port))
3802 self.pg1.add_stream(p)
3803 self.pg_enable_capture(self.pg_interfaces)
3805 capture = self.pg0.get_capture(1)
3810 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3811 self.assertEqual(tcp.dport, local_port)
3812 self.assert_packet_checksums_valid(p)
3814 self.logger.error(ppp("Unexpected or invalid packet:", p))
3817 # from service back to client
3818 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3819 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3820 TCP(sport=local_port, dport=12345))
3821 self.pg0.add_stream(p)
3822 self.pg_enable_capture(self.pg_interfaces)
3824 capture = self.pg1.get_capture(1)
3829 self.assertEqual(ip.src, external_addr)
3830 self.assertEqual(tcp.sport, external_port)
3831 self.assert_packet_checksums_valid(p)
3833 self.logger.error(ppp("Unexpected or invalid packet:", p))
3836 # from local network host to external network
# The same inside stream is sent and verified twice in a row.
3837 pkts = self.create_stream_in(self.pg0, self.pg1)
3838 self.pg0.add_stream(pkts)
3839 self.pg_enable_capture(self.pg_interfaces)
3841 capture = self.pg1.get_capture(len(pkts))
3842 self.verify_capture_out(capture)
3843 pkts = self.create_stream_in(self.pg0, self.pg1)
3844 self.pg0.add_stream(pkts)
3845 self.pg_enable_capture(self.pg_interfaces)
3847 capture = self.pg1.get_capture(len(pkts))
3848 self.verify_capture_out(capture)
3850 # from external network back to local network host
3851 pkts = self.create_stream_out(self.pg1)
3852 self.pg1.add_stream(pkts)
3853 self.pg_enable_capture(self.pg_interfaces)
3855 capture = self.pg0.get_capture(len(pkts))
3856 self.verify_capture_in(capture, self.pg0)
# Output feature on pg1 with forwarding enabled: sessions started by the
# inside host are translated, sessions started by the remote host towards
# the inside host's real address are not.
# NOTE(review): call continuations and other short lines are absent from
# this extract.
3858 def test_output_feature_and_service2(self):
3859 """ NAT44 interface output feature and service host direct access """
3860 self.vapi.nat44_forwarding_enable_disable(1)
3861 self.nat44_add_address(self.nat_addr)
3862 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3865 # session initiated from service host - translate
3866 pkts = self.create_stream_in(self.pg0, self.pg1)
3867 self.pg0.add_stream(pkts)
3868 self.pg_enable_capture(self.pg_interfaces)
3870 capture = self.pg1.get_capture(len(pkts))
3871 self.verify_capture_out(capture)
3873 pkts = self.create_stream_out(self.pg1)
3874 self.pg1.add_stream(pkts)
3875 self.pg_enable_capture(self.pg_interfaces)
3877 capture = self.pg0.get_capture(len(pkts))
3878 self.verify_capture_in(capture, self.pg0)
3880 # session initiated from remote host - do not translate
# Different inside ports / ICMP id are used for the untranslated flows.
3881 self.tcp_port_in = 60303
3882 self.udp_port_in = 60304
3883 self.icmp_id_in = 60305
# Outside stream addressed straight to the inside host's real address.
3884 pkts = self.create_stream_out(self.pg1,
3885 self.pg0.remote_ip4,
3886 use_inside_ports=True)
3887 self.pg1.add_stream(pkts)
3888 self.pg_enable_capture(self.pg_interfaces)
3890 capture = self.pg0.get_capture(len(pkts))
3891 self.verify_capture_in(capture, self.pg0)
3893 pkts = self.create_stream_in(self.pg0, self.pg1)
3894 self.pg0.add_stream(pkts)
3895 self.pg_enable_capture(self.pg_interfaces)
3897 capture = self.pg1.get_capture(len(pkts))
# Replies must keep the inside host's own source address.
3898 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Output feature combined with destination NAT: an inside host on pg0
# addresses external_addr, which a static mapping resolves to the pg1
# remote host; only the destination is rewritten on the way out and only
# the source on the way back.
# NOTE(review): port assignments, try/except blocks and call continuations
# are absent from this extract.
3901 def test_output_feature_and_service3(self):
3902 """ NAT44 interface output feature and DST NAT """
3903 external_addr = '1.2.3.4'
3907 self.vapi.nat44_forwarding_enable_disable(1)
3908 self.nat44_add_address(self.nat_addr)
# Static TCP mapping (out2in_only): external_addr -> pg1 remote host.
3909 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3910 local_port, external_port,
3911 proto=IP_PROTOS.tcp, out2in_only=1)
3912 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3913 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3915 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0 host -> external_addr
3918 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3919 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3920 TCP(sport=12345, dport=external_port))
3921 self.pg0.add_stream(p)
3922 self.pg_enable_capture(self.pg_interfaces)
3924 capture = self.pg1.get_capture(1)
# Source stays untouched; destination becomes the pg1 host / local port.
3929 self.assertEqual(ip.src, self.pg0.remote_ip4)
3930 self.assertEqual(tcp.sport, 12345)
3931 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3932 self.assertEqual(tcp.dport, local_port)
3933 self.assert_packet_checksums_valid(p)
3935 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply from the pg1 host back to the pg0 host.
3938 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3939 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3940 TCP(sport=local_port, dport=12345))
3941 self.pg1.add_stream(p)
3942 self.pg_enable_capture(self.pg_interfaces)
3944 capture = self.pg0.get_capture(1)
# Source is mapped back to external_addr / external port.
3949 self.assertEqual(ip.src, external_addr)
3950 self.assertEqual(tcp.sport, external_port)
3951 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3952 self.assertEqual(tcp.dport, 12345)
3953 self.assert_packet_checksums_valid(p)
3955 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Shared driver for the twice-NAT tests. Visible parameters:
#   self_twice_nat - selects the self_twice_nat flag instead of twice_nat
#                    on the mapping
#   same_pg        - feeds into eh_translate (see below)
#   lb             - feeds into eh_translate; the visible code also builds
#                    a load-balanced mapping with two servers
# `client_id` is read below and is presumably a further parameter declared
# on the signature's continuation line, which is not in this extract.
# NOTE(review): many short lines (assignments, if/else headers, try/except,
# pg_start) are absent; comments describe the visible statements only.
3958 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3960 twice_nat_addr = '10.0.1.3'
3968 port_in1 = port_in+1
3969 port_in2 = port_in+2
3974 server1 = self.pg0.remote_hosts[0]
3975 server2 = self.pg0.remote_hosts[1]
# Whether the external host's address is expected to be translated; the
# rest of the expression is on lines not visible here.
3987 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
# Regular NAT pool address plus a separate twice-NAT pool address.
3990 self.nat44_add_address(self.nat_addr)
3991 self.nat44_add_address(twice_nat_addr, twice_nat=1)
# Plain static mapping; exactly one of twice_nat / self_twice_nat is set.
3993 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3995 proto=IP_PROTOS.tcp,
3996 twice_nat=int(not self_twice_nat),
3997 self_twice_nat=int(self_twice_nat))
# Load-balanced mapping over server1/server2 (presumably the lb branch).
3999 locals = [{'addr': server1.ip4n,
4002 {'addr': server2.ip4n,
4005 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
4006 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
4010 not self_twice_nat),
4013 local_num=len(locals),
4015 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
4016 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
# Client selection: a pg0 remote host chosen by client_id, or pg1's first
# remote host.
4023 assert client_id is not None
4025 client = self.pg0.remote_hosts[0]
4026 elif client_id == 2:
4027 client = self.pg0.remote_hosts[1]
4029 client = pg1.remote_hosts[0]
# Client -> NAT address / external port.
4030 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4031 IP(src=client.ip4, dst=self.nat_addr) /
4032 TCP(sport=eh_port_out, dport=port_out))
4034 self.pg_enable_capture(self.pg_interfaces)
4036 capture = pg0.get_capture(1)
4042 if ip.dst == server1.ip4:
4048 self.assertEqual(ip.dst, server.ip4)
4050 self.assertIn(tcp.dport, [port_in1, port_in2])
4052 self.assertEqual(tcp.dport, port_in)
# Two alternative expectations for the source: rewritten to the twice-NAT
# address with a new port, or left as the client's own address and port.
4054 self.assertEqual(ip.src, twice_nat_addr)
4055 self.assertNotEqual(tcp.sport, eh_port_out)
4057 self.assertEqual(ip.src, client.ip4)
4058 self.assertEqual(tcp.sport, eh_port_out)
# Remember the ports seen by the server for building the reply.
4060 eh_port_in = tcp.sport
4061 saved_port_in = tcp.dport
4062 self.assert_packet_checksums_valid(p)
4064 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Server -> external host as the server sees it.
4067 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4068 IP(src=server.ip4, dst=eh_addr_in) /
4069 TCP(sport=saved_port_in, dport=eh_port_in))
4071 self.pg_enable_capture(self.pg_interfaces)
4073 capture = pg1.get_capture(1)
# The client must see the reply coming from the NAT address / port_out.
4078 self.assertEqual(ip.dst, client.ip4)
4079 self.assertEqual(ip.src, self.nat_addr)
4080 self.assertEqual(tcp.dport, eh_port_out)
4081 self.assertEqual(tcp.sport, port_out)
4082 self.assert_packet_checksums_valid(p)
4084 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Session bookkeeping: one twice-NAT session for the server, deleted using
# the external host's NAT-side address/port (ext_host_nat_*), after which
# no session remains.
# NOTE(review): possibly guarded by a condition on a line not visible here.
4088 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4089 self.assertEqual(len(sessions), 1)
4090 self.assertTrue(sessions[0].ext_host_valid)
4091 self.assertTrue(sessions[0].is_twicenat)
4092 self.vapi.nat44_del_session(
4093 sessions[0].inside_ip_address,
4094 sessions[0].inside_port,
4095 sessions[0].protocol,
4096 ext_host_address=sessions[0].ext_host_nat_address,
4097 ext_host_port=sessions[0].ext_host_nat_port)
4098 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4099 self.assertEqual(len(sessions), 0)
# Runs the shared twice-NAT scenario with all default options.
# NOTE(review): one original line between the def and the call is not
# part of this extract.
4101 def test_twice_nat(self):
4103 self.twice_nat_common()
def test_self_twice_nat_positive(self):
    """ Self Twice NAT44 (positive test) """
    # Shared twice-NAT scenario with the self_twice_nat flag and same_pg
    # both switched on.
    scenario = dict(self_twice_nat=True, same_pg=True)
    self.twice_nat_common(**scenario)
def test_self_twice_nat_negative(self):
    """ Self Twice NAT44 (negative test) """
    # Shared twice-NAT scenario with only the self_twice_nat flag set;
    # same_pg keeps its default.
    scenario = dict(self_twice_nat=True)
    self.twice_nat_common(**scenario)
def test_twice_nat_lb(self):
    """ Twice NAT44 local service load balancing """
    # Shared twice-NAT scenario with only the load-balancing option on.
    scenario = dict(lb=True)
    self.twice_nat_common(**scenario)
# Shared twice-NAT scenario with load balancing, self_twice_nat and
# same_pg enabled.
# NOTE(review): the call continues on a line that is not part of this
# extract (a further argument is passed there).
4117 def test_self_twice_nat_lb_positive(self):
4118 """ Self Twice NAT44 local service load balancing (positive test) """
4119 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
# Shared twice-NAT scenario with load balancing, self_twice_nat and
# same_pg enabled; the visible arguments match the positive variant.
# NOTE(review): the call continues on a line that is not part of this
# extract - the argument that makes this the negative case is there.
4122 def test_self_twice_nat_lb_negative(self):
4123 """ Self Twice NAT44 local service load balancing (negative test) """
4124 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
def test_twice_nat_interface_addr(self):
    """ Acquire twice NAT44 addresses from interface """
    iface = self.pg3
    self.vapi.nat44_add_interface_addr(iface.sw_if_index, twice_nat=1)

    # the pool stays empty until the interface is given an address
    self.assertEqual(0, len(self.vapi.nat44_address_dump()))

    # once addressed, the interface address shows up as the single
    # twice-NAT pool entry
    iface.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    entry = pool[0]
    self.assertEqual(entry.ip_address[0:4], iface.local_ip4n)
    self.assertEqual(entry.twice_nat, 1)

    # dropping the interface address empties the pool again
    iface.unconfig_ip4()
    self.assertEqual(0, len(self.vapi.nat44_address_dump()))
# TCP teardown started by the inside host: FIN from pg0, ACK and FIN from
# pg1 (sent as one batch), final ACK from pg0; afterwards the number of
# sessions for the inside host must equal the number before the test.
# NOTE(review): static-mapping arguments, the list handling around `pkts`,
# pg_start calls and other short lines are absent from this extract.
4147 def test_tcp_session_close_in(self):
4148 """ Close TCP session from inside network """
4149 self.tcp_port_out = 10505
4150 self.nat44_add_address(self.nat_addr)
4151 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4155 proto=IP_PROTOS.tcp,
4157 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4158 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for the inside host.
4161 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4162 start_sessnum = len(sessions)
4164 self.initiate_tcp_session(self.pg0, self.pg1)
4166 # FIN packet in -> out
4167 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4168 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4169 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4170 flags="FA", seq=100, ack=300))
4171 self.pg0.add_stream(p)
4172 self.pg_enable_capture(self.pg_interfaces)
4174 self.pg1.get_capture(1)
4178 # ACK packet out -> in
4179 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4180 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4181 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4182 flags="A", seq=300, ack=101))
4185 # FIN packet out -> in
4186 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4187 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4188 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4189 flags="FA", seq=300, ack=101))
# Both outside packets go out together; two packets are expected on pg0.
4192 self.pg1.add_stream(pkts)
4193 self.pg_enable_capture(self.pg_interfaces)
4195 self.pg0.get_capture(2)
4197 # ACK packet in -> out
4198 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4199 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4200 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4201 flags="A", seq=101, ack=301))
4202 self.pg0.add_stream(p)
4203 self.pg_enable_capture(self.pg_interfaces)
4205 self.pg1.get_capture(1)
# Session count must be back at the baseline.
4207 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4209 self.assertEqual(len(sessions) - start_sessnum, 0)
# TCP teardown started by the outside host: FIN from pg1, FIN+ACK from
# pg0, final ACK from pg1; afterwards the number of sessions for the
# inside host must equal the number before the test.
# NOTE(review): static-mapping arguments, pg_start calls and other short
# lines are absent from this extract.
4211 def test_tcp_session_close_out(self):
4212 """ Close TCP session from outside network """
4213 self.tcp_port_out = 10505
4214 self.nat44_add_address(self.nat_addr)
4215 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4219 proto=IP_PROTOS.tcp,
4221 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4222 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for the inside host.
4225 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4226 start_sessnum = len(sessions)
4228 self.initiate_tcp_session(self.pg0, self.pg1)
4230 # FIN packet out -> in
4231 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4232 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4233 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4234 flags="FA", seq=100, ack=300))
4235 self.pg1.add_stream(p)
4236 self.pg_enable_capture(self.pg_interfaces)
4238 self.pg0.get_capture(1)
4240 # FIN+ACK packet in -> out
4241 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4242 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4243 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4244 flags="FA", seq=300, ack=101))
4246 self.pg0.add_stream(p)
4247 self.pg_enable_capture(self.pg_interfaces)
4249 self.pg1.get_capture(1)
4251 # ACK packet out -> in
4252 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4253 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4254 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4255 flags="A", seq=101, ack=301))
4256 self.pg1.add_stream(p)
4257 self.pg_enable_capture(self.pg_interfaces)
4259 self.pg0.get_capture(1)
# Session count must be back at the baseline.
4261 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4263 self.assertEqual(len(sessions) - start_sessnum, 0)
# TCP teardown where both sides send FIN before acknowledging the other:
# FIN from pg0, FIN from pg1, ACK from pg0, ACK from pg1; afterwards the
# number of sessions for the inside host must equal the baseline.
# NOTE(review): the docstring below is identical to the one on
# test_tcp_session_close_in although the name says "simultaneous" -
# consider updating it. Static-mapping arguments, pg_start calls and other
# short lines are absent from this extract.
4265 def test_tcp_session_close_simultaneous(self):
4266 """ Close TCP session from inside network """
4267 self.tcp_port_out = 10505
4268 self.nat44_add_address(self.nat_addr)
4269 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4273 proto=IP_PROTOS.tcp,
4275 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4276 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for the inside host.
4279 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4280 start_sessnum = len(sessions)
4282 self.initiate_tcp_session(self.pg0, self.pg1)
4284 # FIN packet in -> out
4285 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4286 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4287 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4288 flags="FA", seq=100, ack=300))
4289 self.pg0.add_stream(p)
4290 self.pg_enable_capture(self.pg_interfaces)
4292 self.pg1.get_capture(1)
4294 # FIN packet out -> in
# ack=100 here (not 101): this FIN does not acknowledge the inside FIN
# sent with seq=100.
4295 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4296 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4297 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4298 flags="FA", seq=300, ack=100))
4299 self.pg1.add_stream(p)
4300 self.pg_enable_capture(self.pg_interfaces)
4302 self.pg0.get_capture(1)
4304 # ACK packet in -> out
4305 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4306 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4307 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4308 flags="A", seq=101, ack=301))
4309 self.pg0.add_stream(p)
4310 self.pg_enable_capture(self.pg_interfaces)
4312 self.pg1.get_capture(1)
4314 # ACK packet out -> in
4315 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4316 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4317 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4318 flags="A", seq=301, ack=101))
4319 self.pg1.add_stream(p)
4320 self.pg_enable_capture(self.pg_interfaces)
4322 self.pg0.get_capture(1)
# Session count must be back at the baseline.
4324 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4326 self.assertEqual(len(sessions) - start_sessnum, 0)
4328 def test_one_armed_nat44_static(self):
4329 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
# "One armed": client (remote_host) and service (local_host) are both hosts
# behind the same interface, pg4, and every packet is sent and captured on
# pg4.
4330 remote_host = self.pg4.remote_hosts[0]
4331 local_host = self.pg4.remote_hosts[1]
# Forwarding is enabled, the NAT address is added with twice_nat=1 and the
# static mapping is out2in_only. The NAT44 feature call is issued twice for
# pg4 (presumably once per direction -- confirm against the second call's
# arguments).
4336 self.vapi.nat44_forwarding_enable_disable(1)
4337 self.nat44_add_address(self.nat_addr, twice_nat=1)
4338 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4339 local_port, external_port,
4340 proto=IP_PROTOS.tcp, out2in_only=1,
4342 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4343 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4346 # from client to service
4347 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4348 IP(src=remote_host.ip4, dst=self.nat_addr) /
4349 TCP(sport=12345, dport=external_port))
4350 self.pg4.add_stream(p)
4351 self.pg_enable_capture(self.pg_interfaces)
4353 capture = self.pg4.get_capture(1)
# Expected rewrite: destination becomes the service (local_host:local_port)
# and the source becomes the NAT address with a source port different from
# the client's 12345. That port is remembered for the reply below.
4358 self.assertEqual(ip.dst, local_host.ip4)
4359 self.assertEqual(ip.src, self.nat_addr)
4360 self.assertEqual(tcp.dport, local_port)
4361 self.assertNotEqual(tcp.sport, 12345)
4362 eh_port_in = tcp.sport
4363 self.assert_packet_checksums_valid(p)
4365 self.logger.error(ppp("Unexpected or invalid packet:", p))
4368 # from service back to client
4369 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4370 IP(src=local_host.ip4, dst=self.nat_addr) /
4371 TCP(sport=local_port, dport=eh_port_in))
4372 self.pg4.add_stream(p)
4373 self.pg_enable_capture(self.pg_interfaces)
4375 capture = self.pg4.get_capture(1)
# Expected reverse rewrite: the client sees the reply coming from
# nat_addr:external_port and addressed to its own address and port 12345.
4380 self.assertEqual(ip.src, self.nat_addr)
4381 self.assertEqual(ip.dst, remote_host.ip4)
4382 self.assertEqual(tcp.sport, external_port)
4383 self.assertEqual(tcp.dport, 12345)
4384 self.assert_packet_checksums_valid(p)
4386 self.logger.error(ppp("Unexpected or invalid packet:", p))
4389 def test_static_with_port_out2(self):
4390 """ 1:1 NAPT asymmetrical rule """
# Setup: forwarding enabled, one out2in_only static TCP mapping from
# nat_addr:external_port to pg0's remote host:local_port, NAT44 feature
# calls for pg0 and pg1. The test then checks translated TCP traffic,
# ICMP errors quoting that traffic, and untranslated traffic sent straight
# to pg0's remote host.
4395 self.vapi.nat44_forwarding_enable_disable(1)
4396 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4397 local_port, external_port,
4398 proto=IP_PROTOS.tcp, out2in_only=1)
4399 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4400 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4403 # from client to service
4404 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4405 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4406 TCP(sport=12345, dport=external_port))
4407 self.pg1.add_stream(p)
4408 self.pg_enable_capture(self.pg_interfaces)
4410 capture = self.pg0.get_capture(1)
4415 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4416 self.assertEqual(tcp.dport, local_port)
4417 self.assert_packet_checksums_valid(p)
4419 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 error sent from the service side, quoting the IP packet just
# captured on pg0. On pg1 the outer source and the quoted (inner)
# destination/port must be mapped back to nat_addr / external_port.
4423 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4424 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4425 ICMP(type=11) / capture[0][IP])
4426 self.pg0.add_stream(p)
4427 self.pg_enable_capture(self.pg_interfaces)
4429 capture = self.pg1.get_capture(1)
4432 self.assertEqual(p[IP].src, self.nat_addr)
4434 self.assertEqual(inner.dst, self.nat_addr)
4435 self.assertEqual(inner[TCPerror].dport, external_port)
4437 self.logger.error(ppp("Unexpected or invalid packet:", p))
4440 # from service back to client
4441 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4442 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4443 TCP(sport=local_port, dport=12345))
4444 self.pg0.add_stream(p)
4445 self.pg_enable_capture(self.pg_interfaces)
4447 capture = self.pg1.get_capture(1)
4452 self.assertEqual(ip.src, self.nat_addr)
4453 self.assertEqual(tcp.sport, external_port)
4454 self.assert_packet_checksums_valid(p)
4456 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 error sent from the client side, quoting the packet just
# captured on pg1. On pg0 the outer destination and the quoted (inner)
# source/port must be the service's real address and local_port.
4460 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4461 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4462 ICMP(type=11) / capture[0][IP])
4463 self.pg1.add_stream(p)
4464 self.pg_enable_capture(self.pg_interfaces)
4466 capture = self.pg0.get_capture(1)
4469 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4471 self.assertEqual(inner.src, self.pg0.remote_ip4)
4472 self.assertEqual(inner[TCPerror].sport, local_port)
4474 self.logger.error(ppp("Unexpected or invalid packet:", p))
4477 # from client to server (no translation)
# Addressed directly to pg0's remote host instead of nat_addr, so it does
# not hit the static mapping; the assertions expect the address and port to
# pass through unchanged.
4478 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4479 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4480 TCP(sport=12346, dport=local_port))
4481 self.pg1.add_stream(p)
4482 self.pg_enable_capture(self.pg_interfaces)
4484 capture = self.pg0.get_capture(1)
4489 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4490 self.assertEqual(tcp.dport, local_port)
4491 self.assert_packet_checksums_valid(p)
4493 self.logger.error(ppp("Unexpected or invalid packet:", p))
4496 # from service back to client (no translation)
4497 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4498 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4499 TCP(sport=local_port, dport=12346))
4500 self.pg0.add_stream(p)
4501 self.pg_enable_capture(self.pg_interfaces)
4503 capture = self.pg1.get_capture(1)
4508 self.assertEqual(ip.src, self.pg0.remote_ip4)
4509 self.assertEqual(tcp.sport, local_port)
4510 self.assert_packet_checksums_valid(p)
4512 self.logger.error(ppp("Unexpected or invalid packet:", p))
4515 def test_output_feature(self):
4516 """ NAT44 interface output feature (in2out postrouting) """
# Setup: forwarding enabled, NAT address added, a NAT44 feature call for
# pg0 and the *output* feature call for pg1.
4517 self.vapi.nat44_forwarding_enable_disable(1)
4518 self.nat44_add_address(self.nat_addr)
4519 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4521 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0 -> pg1 direction: send the stream on pg0 and verify what leaves pg1.
4525 pkts = self.create_stream_in(self.pg0, self.pg1)
4526 self.pg0.add_stream(pkts)
4527 self.pg_enable_capture(self.pg_interfaces)
4529 capture = self.pg1.get_capture(len(pkts))
4530 self.verify_capture_out(capture)
# pg1 -> pg0 direction: send the return stream on pg1 and verify on pg0.
4533 pkts = self.create_stream_out(self.pg1)
4534 self.pg1.add_stream(pkts)
4535 self.pg_enable_capture(self.pg_interfaces)
4537 capture = self.pg0.get_capture(len(pkts))
4538 self.verify_capture_in(capture, self.pg0)
4540 def test_multiple_vrf(self):
4541 """ Multiple VRF setup """
# Topology as used below (VRF labels taken from the existing comments):
# pg0 and pg1 are in VRF0, pg5 and pg6 are in VRF1.
# Two out2in_only static TCP mappings are installed:
#   * external_addr:external_port -> pg5 remote host:local_port (vrf_id=1)
#   * pg0 interface address:external_port -> pg0 remote host:local_port
#     (vrf_id=0, external address taken from external_sw_if_index)
# pg1 additionally gets the NAT44 output feature call, which the
# "dynamic NAT from VRF1 to VRF0" step relies on.
4542 external_addr = '1.2.3.4'
4547 self.vapi.nat44_forwarding_enable_disable(1)
4548 self.nat44_add_address(self.nat_addr)
4549 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4550 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4552 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4554 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
4555 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index,
4557 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4559 self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
4560 local_port, external_port, vrf_id=1,
4561 proto=IP_PROTOS.tcp, out2in_only=1)
4562 self.nat44_add_static_mapping(
4563 self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index,
4564 local_port=local_port, vrf_id=0, external_port=external_port,
4565 proto=IP_PROTOS.tcp, out2in_only=1)
4567 # from client to service (both VRF1)
4568 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4569 IP(src=self.pg6.remote_ip4, dst=external_addr) /
4570 TCP(sport=12345, dport=external_port))
4571 self.pg6.add_stream(p)
4572 self.pg_enable_capture(self.pg_interfaces)
4574 capture = self.pg5.get_capture(1)
4579 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4580 self.assertEqual(tcp.dport, local_port)
4581 self.assert_packet_checksums_valid(p)
4583 self.logger.error(ppp("Unexpected or invalid packet:", p))
4586 # from service back to client (both VRF1)
4587 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4588 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4589 TCP(sport=local_port, dport=12345))
4590 self.pg5.add_stream(p)
4591 self.pg_enable_capture(self.pg_interfaces)
4593 capture = self.pg6.get_capture(1)
4598 self.assertEqual(ip.src, external_addr)
4599 self.assertEqual(tcp.sport, external_port)
4600 self.assert_packet_checksums_valid(p)
4602 self.logger.error(ppp("Unexpected or invalid packet:", p))
4605 # dynamic NAT from VRF1 to VRF0 (output-feature)
# No static mapping matches this flow (dport 22): the source is expected
# to be rewritten to nat_addr with a port other than the original 2345.
4606 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4607 IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
4608 TCP(sport=2345, dport=22))
4609 self.pg5.add_stream(p)
4610 self.pg_enable_capture(self.pg_interfaces)
4612 capture = self.pg1.get_capture(1)
4617 self.assertEqual(ip.src, self.nat_addr)
4618 self.assertNotEqual(tcp.sport, 2345)
4619 self.assert_packet_checksums_valid(p)
4622 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply to the dynamic session; `port` is presumably the translated source
# port observed in the capture above -- confirm where it is assigned.
4625 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4626 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4627 TCP(sport=22, dport=port))
4628 self.pg1.add_stream(p)
4629 self.pg_enable_capture(self.pg_interfaces)
4631 capture = self.pg5.get_capture(1)
4636 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4637 self.assertEqual(tcp.dport, 2345)
4638 self.assert_packet_checksums_valid(p)
4640 self.logger.error(ppp("Unexpected or invalid packet:", p))
4643 # from client VRF1 to service VRF0
# The service is addressed through pg0's own interface address, matching
# the second static mapping (external_sw_if_index=pg0).
4644 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4645 IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
4646 TCP(sport=12346, dport=external_port))
4647 self.pg6.add_stream(p)
4648 self.pg_enable_capture(self.pg_interfaces)
4650 capture = self.pg0.get_capture(1)
4655 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4656 self.assertEqual(tcp.dport, local_port)
4657 self.assert_packet_checksums_valid(p)
4659 self.logger.error(ppp("Unexpected or invalid packet:", p))
4662 # from service VRF0 back to client VRF1
4663 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4664 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4665 TCP(sport=local_port, dport=12346))
4666 self.pg0.add_stream(p)
4667 self.pg_enable_capture(self.pg_interfaces)
4669 capture = self.pg6.get_capture(1)
4674 self.assertEqual(ip.src, self.pg0.local_ip4)
4675 self.assertEqual(tcp.sport, external_port)
4676 self.assert_packet_checksums_valid(p)
4678 self.logger.error(ppp("Unexpected or invalid packet:", p))
4681 # from client VRF0 to service VRF1
4682 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4683 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4684 TCP(sport=12347, dport=external_port))
4685 self.pg0.add_stream(p)
4686 self.pg_enable_capture(self.pg_interfaces)
4688 capture = self.pg5.get_capture(1)
4693 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4694 self.assertEqual(tcp.dport, local_port)
4695 self.assert_packet_checksums_valid(p)
4697 self.logger.error(ppp("Unexpected or invalid packet:", p))
4700 # from service VRF1 back to client VRF0
4701 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4702 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4703 TCP(sport=local_port, dport=12347))
4704 self.pg5.add_stream(p)
4705 self.pg_enable_capture(self.pg_interfaces)
4707 capture = self.pg0.get_capture(1)
4712 self.assertEqual(ip.src, external_addr)
4713 self.assertEqual(tcp.sport, external_port)
4714 self.assert_packet_checksums_valid(p)
4716 self.logger.error(ppp("Unexpected or invalid packet:", p))
4719 # from client to server (both VRF1, no translation)
# From here on the packets are addressed to the real host addresses rather
# than the mapped external ones; the assertions expect addresses and ports
# to pass through unchanged.
4720 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4721 IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
4722 TCP(sport=12348, dport=local_port))
4723 self.pg6.add_stream(p)
4724 self.pg_enable_capture(self.pg_interfaces)
4726 capture = self.pg5.get_capture(1)
4731 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4732 self.assertEqual(tcp.dport, local_port)
4733 self.assert_packet_checksums_valid(p)
4735 self.logger.error(ppp("Unexpected or invalid packet:", p))
4738 # from server back to client (both VRF1, no translation)
4739 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4740 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4741 TCP(sport=local_port, dport=12348))
4742 self.pg5.add_stream(p)
4743 self.pg_enable_capture(self.pg_interfaces)
4745 capture = self.pg6.get_capture(1)
4750 self.assertEqual(ip.src, self.pg5.remote_ip4)
4751 self.assertEqual(tcp.sport, local_port)
4752 self.assert_packet_checksums_valid(p)
4754 self.logger.error(ppp("Unexpected or invalid packet:", p))
4757 # from client VRF1 to server VRF0 (no translation)
# NOTE(review): this packet is identical to the "server VRF0 back to client
# VRF1" packet in the next step (sent on pg0 towards pg6 with
# sport=local_port, dport=12349, captured on pg6). The client -> server
# direction named in the comment above (pg6 -> pg0) does not appear to be
# exercised; this looks like a copy-paste slip. Confirm intent.
4758 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4759 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4760 TCP(sport=local_port, dport=12349))
4761 self.pg0.add_stream(p)
4762 self.pg_enable_capture(self.pg_interfaces)
4764 capture = self.pg6.get_capture(1)
4769 self.assertEqual(ip.src, self.pg0.remote_ip4)
4770 self.assertEqual(tcp.sport, local_port)
4771 self.assert_packet_checksums_valid(p)
4773 self.logger.error(ppp("Unexpected or invalid packet:", p))
4776 # from server VRF0 back to client VRF1 (no translation)
4777 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4778 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4779 TCP(sport=local_port, dport=12349))
4780 self.pg0.add_stream(p)
4781 self.pg_enable_capture(self.pg_interfaces)
4783 capture = self.pg6.get_capture(1)
4788 self.assertEqual(ip.src, self.pg0.remote_ip4)
4789 self.assertEqual(tcp.sport, local_port)
4790 self.assert_packet_checksums_valid(p)
4792 self.logger.error(ppp("Unexpected or invalid packet:", p))
4795 # from client VRF0 to server VRF1 (no translation)
4796 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4797 IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
4798 TCP(sport=12344, dport=local_port))
4799 self.pg0.add_stream(p)
4800 self.pg_enable_capture(self.pg_interfaces)
4802 capture = self.pg5.get_capture(1)
4807 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4808 self.assertEqual(tcp.dport, local_port)
4809 self.assert_packet_checksums_valid(p)
4811 self.logger.error(ppp("Unexpected or invalid packet:", p))
4814 # from server VRF1 back to client VRF0 (no translation)
4815 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4816 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4817 TCP(sport=local_port, dport=12344))
4818 self.pg5.add_stream(p)
4819 self.pg_enable_capture(self.pg_interfaces)
4821 capture = self.pg0.get_capture(1)
4826 self.assertEqual(ip.src, self.pg5.remote_ip4)
4827 self.assertEqual(tcp.sport, local_port)
4828 self.assert_packet_checksums_valid(p)
4830 self.logger.error(ppp("Unexpected or invalid packet:", p))
4834 super(TestNAT44EndpointDependent, self).tearDown()
# Per-test cleanup for TestNAT44EndpointDependent. While VPP is still
# alive, log the NAT44 state (addresses, interfaces, static mappings,
# interface addresses, sessions, hash tables) to help diagnose a failed
# test, then clear the VPP log.
4835 if not self.vpp_dead:
4836 self.logger.info(self.vapi.cli("show nat44 addresses"))
4837 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4838 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4839 self.logger.info(self.vapi.cli("show nat44 interface address"))
4840 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4841 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
4843 self.vapi.cli("clear logging")
4846 class TestNAT44Out2InDPO(MethodHolder):
4847 """ NAT44 Test Cases using out2in DPO """
# Two 464XLAT CE scenarios are covered: IPv4 traffic from pg0 is expected
# to leave pg1 as IPv6, with and without NAT44 in front of the translation.
4850 def setUpConstants(cls):
4851 super(TestNAT44Out2InDPO, cls).setUpConstants()
# Start VPP with the NAT plugin in "out2in dpo" mode.
4852 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
4855 def setUpClass(cls):
4856 super(TestNAT44Out2InDPO, cls).setUpClass()
4857 cls.vapi.cli("set log class nat level debug")
# Ports / ICMP ids and addresses shared by both tests. The "in" and "out"
# values start out equal.
4860 cls.tcp_port_in = 6303
4861 cls.tcp_port_out = 6303
4862 cls.udp_port_in = 6304
4863 cls.udp_port_out = 6304
4864 cls.icmp_id_in = 6305
4865 cls.icmp_id_out = 6305
4866 cls.nat_addr = '10.0.0.3'
4867 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4868 cls.dst_ip4 = '192.168.70.1'
# Two interfaces: pg0 is configured for IPv4, pg1 for IPv6.
4870 cls.create_pg_interfaces(range(2))
4873 cls.pg0.config_ip4()
4874 cls.pg0.resolve_arp()
4877 cls.pg1.config_ip6()
4878 cls.pg1.resolve_ndp()
# IPv6 default route (all-zero prefix, length 0) via pg1's remote host.
4880 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4881 dst_address_length=0,
4882 next_hop_address=cls.pg1.remote_ip6n,
4883 next_hop_sw_if_index=cls.pg1.sw_if_index)
4886 super(TestNAT44Out2InDPO, cls).tearDownClass()
4889 def configure_xlat(self):
# Stores the destination prefix 1:2:3::/96 and the source prefix 4:5:6::/96
# on the instance and adds a MAP domain with is_translation=1 using them.
4890 self.dst_ip6_pfx = '1:2:3::'
4891 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4893 self.dst_ip6_pfx_len = 96
4894 self.src_ip6_pfx = '4:5:6::'
4895 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4897 self.src_ip6_pfx_len = 96
4898 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4899 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4900 '\x00\x00\x00\x00', 0, is_translation=1,
4903 def test_464xlat_ce(self):
4904 """ Test 464XLAT CE with NAT44 """
4906 self.configure_xlat()
# NAT44 is switched on for pg0 with a one-address pool (nat_addr).
4908 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4909 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# IPv6 addresses composed from an IPv4 address and a prefix: the remote
# side is built from dst_ip4, the local side from the NAT address.
4911 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4912 self.dst_ip6_pfx_len)
4913 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4914 self.src_ip6_pfx_len)
# IPv4 in on pg0, verified as IPv6 on pg1.
4917 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4918 self.pg0.add_stream(pkts)
4919 self.pg_enable_capture(self.pg_interfaces)
4921 capture = self.pg1.get_capture(len(pkts))
4922 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
# IPv6 in on pg1, verified as IPv4 on pg0.
4925 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4927 self.pg1.add_stream(pkts)
4928 self.pg_enable_capture(self.pg_interfaces)
4930 capture = self.pg0.get_capture(len(pkts))
4931 self.verify_capture_in(capture, self.pg0)
# Cleanup done by the test itself: a second feature call for pg0
# (presumably the delete) and removal of the address range (is_add=0).
4933 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4935 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4936 self.nat_addr_n, is_add=0)
4938 def test_464xlat_ce_no_nat(self):
4939 """ Test 464XLAT CE without NAT44 """
4941 self.configure_xlat()
# No NAT44 here, so the local IPv6 address is composed from pg0's real
# remote IPv4 address and the ports are expected to stay the same
# (same_port=True in the verification below).
4943 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4944 self.dst_ip6_pfx_len)
4945 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4946 self.src_ip6_pfx_len)
4948 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4949 self.pg0.add_stream(pkts)
4950 self.pg_enable_capture(self.pg_interfaces)
4952 capture = self.pg1.get_capture(len(pkts))
4953 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4954 nat_ip=out_dst_ip6, same_port=True)
4956 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4957 self.pg1.add_stream(pkts)
4958 self.pg_enable_capture(self.pg_interfaces)
4960 capture = self.pg0.get_capture(len(pkts))
4961 self.verify_capture_in(capture, self.pg0)
4964 class TestDeterministicNAT(MethodHolder):
4965 """ Deterministic NAT Test Cases """
4968 def setUpConstants(cls):
4969 super(TestDeterministicNAT, cls).setUpConstants()
# Start VPP with the NAT plugin in deterministic mode.
4970 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4973 def setUpClass(cls):
4974 super(TestDeterministicNAT, cls).setUpClass()
4975 cls.vapi.cli("set log class nat level debug")
# Inside ports / ICMP id and the external peer's ports used by the stream
# helpers of this class. The translated "out" ports are not set here.
4978 cls.tcp_port_in = 6303
4979 cls.tcp_external_port = 6303
4980 cls.udp_port_in = 6304
4981 cls.udp_external_port = 6304
4982 cls.icmp_id_in = 6305
4983 cls.nat_addr = '10.0.0.3'
4985 cls.create_pg_interfaces(range(3))
4986 cls.interfaces = list(cls.pg_interfaces)
4988 for i in cls.interfaces:
# Two remote hosts on pg0; test_multiple_users uses them as separate users.
4993 cls.pg0.generate_remote_hosts(2)
4994 cls.pg0.configure_ipv4_neighbors()
4997 super(TestDeterministicNAT, cls).tearDownClass()
5000 def create_stream_in(self, in_if, out_if, ttl=64):
5002 Create packet stream for inside network
5004 :param in_if: Inside interface
5005 :param out_if: Outside interface
5006 :param ttl: TTL of generated packets
# One packet per protocol, each from in_if's remote host to out_if's remote
# host with the given TTL: first TCP (tcp_port_in -> tcp_external_port).
5010 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5011 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5012 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
# UDP (udp_port_in -> udp_external_port).
5016 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5017 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5018 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
# ICMP echo request carrying icmp_id_in.
5022 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5023 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5024 ICMP(id=self.icmp_id_in, type='echo-request'))
5029 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
5031 Create packet stream for outside network
5033 :param out_if: Outside interface
5034 :param dst_ip: Destination IP address (Default use global NAT address)
5035 :param ttl: TTL of generated packets
5038 dst_ip = self.nat_addr
# NOTE(review): tcp_port_out, udp_port_out and icmp_external_id appear to
# be assigned by verify_capture_out() of this class, so that method needs
# to have run before this one -- confirm.
# One packet per protocol from out_if's remote host to dst_ip: first TCP
# (tcp_external_port -> tcp_port_out).
5041 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5042 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5043 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
# UDP (udp_external_port -> udp_port_out).
5047 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5048 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5049 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
# ICMP echo reply carrying icmp_external_id.
5053 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5054 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5055 ICMP(id=self.icmp_external_id, type='echo-reply'))
5060 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
5062 Verify captured packets on outside network
5064 :param capture: Captured packets
5065 :param nat_ip: Translated IP address (Default use global NAT address)
5066 :param same_port: Sorce port number is not translated (Default False)
5067 :param packet_num: Expected number of packets (Default 3)
5070 nat_ip = self.nat_addr
# NOTE(review): the docstring lists a `same_port` parameter (with the typo
# "Sorce") that this signature does not have; it looks like a leftover from
# a similar helper and should be dropped from the docstring.
5071 self.assertEqual(packet_num, len(capture))
5072 for packet in capture:
# Every packet must carry the translated source address. The translated
# source port (TCP/UDP) or ICMP id is stored on the instance so that
# create_stream_out() and later assertions can use it.
5074 self.assertEqual(packet[IP].src, nat_ip)
5075 if packet.haslayer(TCP):
5076 self.tcp_port_out = packet[TCP].sport
5077 elif packet.haslayer(UDP):
5078 self.udp_port_out = packet[UDP].sport
5080 self.icmp_external_id = packet[ICMP].id
5082 self.logger.error(ppp("Unexpected or invalid packet "
5083 "(outside network):", packet))
5086 def verify_ipfix_max_entries_per_user(self, data):
5088 Verify IPFIX maximum entries per user exceeded event
5090 :param data: Decoded IPFIX data records
5092 self.assertEqual(1, len(data))
# `record` is indexed by IPFIX information element id. Presumably 230 is
# natEvent, 473 maxEntriesPerUser and 8 sourceIPv4Address as in the IANA
# registry -- confirm. The value of element 230 must be 13.
5095 self.assertEqual(ord(record[230]), 13)
5096 # natQuotaExceededEvent
5097 self.assertEqual('\x03\x00\x00\x00', record[466])
# '\xe8\x03\x00\x00' is 1000 when read as a little-endian 32-bit integer.
5099 self.assertEqual('\xe8\x03\x00\x00', record[473])
# Element 8 must hold pg0's remote host address (packed form).
5101 self.assertEqual(self.pg0.remote_ip4n, record[8])
5103 def test_deterministic_mode(self):
5104 """ NAT plugin run deterministic mode """
# API-only test (no packets): add one deterministic mapping, check the
# forward and reverse lookups and the mapping dump, then clear it.
5105 in_addr = '172.16.255.0'
5106 out_addr = '172.17.255.50'
5107 in_addr_t = '172.16.255.20'
5108 in_addr_n = socket.inet_aton(in_addr)
5109 out_addr_n = socket.inet_aton(out_addr)
5110 in_addr_t_n = socket.inet_aton(in_addr_t)
# The plugin must report that it runs in deterministic mode.
5114 nat_config = self.vapi.nat_show_config()
5115 self.assertEqual(1, nat_config.deterministic)
5117 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup of an inside address, then a reverse lookup using the
# returned high port; the reverse lookup must give the inside address back.
# Only the first 4 bytes of the returned address fields are compared.
5119 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
5120 self.assertEqual(rep1.out_addr[:4], out_addr_n)
5121 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
5122 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# The dump must contain exactly the mapping configured above.
5124 deterministic_mappings = self.vapi.nat_det_map_dump()
5125 self.assertEqual(len(deterministic_mappings), 1)
5126 dsm = deterministic_mappings[0]
5127 self.assertEqual(in_addr_n, dsm.in_addr[:4])
5128 self.assertEqual(in_plen, dsm.in_plen)
5129 self.assertEqual(out_addr_n, dsm.out_addr[:4])
5130 self.assertEqual(out_plen, dsm.out_plen)
# After clear_nat_det() the dump must be empty.
5132 self.clear_nat_det()
5133 deterministic_mappings = self.vapi.nat_det_map_dump()
5134 self.assertEqual(len(deterministic_mappings), 0)
5136 def test_set_timeouts(self):
5137 """ Set deterministic NAT timeouts """
# Read the current timeouts, set each one to its old value plus 10, read
# them back and check that all four values changed.
5138 timeouts_before = self.vapi.nat_det_get_timeouts()
5140 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
5141 timeouts_before.tcp_established + 10,
5142 timeouts_before.tcp_transitory + 10,
5143 timeouts_before.icmp + 10)
5145 timeouts_after = self.vapi.nat_det_get_timeouts()
# Only "different from before" is asserted, not the exact new value.
5147 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
5148 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
5149 self.assertNotEqual(timeouts_before.tcp_established,
5150 timeouts_after.tcp_established)
5151 self.assertNotEqual(timeouts_before.tcp_transitory,
5152 timeouts_after.tcp_transitory)
5154 def test_det_in(self):
5155 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
# Maps pg0's remote host to nat_ip, sends TCP/UDP/ICMP in both directions
# and then checks the three resulting deterministic NAT sessions.
5157 nat_ip = "10.0.0.10"
5159 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5161 socket.inet_aton(nat_ip),
5163 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5164 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: verify_capture_out() also records the translated ports / ICMP
# id that the return stream below is addressed to.
5168 pkts = self.create_stream_in(self.pg0, self.pg1)
5169 self.pg0.add_stream(pkts)
5170 self.pg_enable_capture(self.pg_interfaces)
5172 capture = self.pg1.get_capture(len(pkts))
5173 self.verify_capture_out(capture, nat_ip)
# pg1 -> pg0: return traffic towards nat_ip, verified on pg0.
5176 pkts = self.create_stream_out(self.pg1, nat_ip)
5177 self.pg1.add_stream(pkts)
5178 self.pg_enable_capture(self.pg_interfaces)
5180 capture = self.pg0.get_capture(len(pkts))
5181 self.verify_capture_in(capture, self.pg0)
# One session per protocol is expected for pg0's remote host.
5184 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
5185 self.assertEqual(len(sessions), 3)
# TCP session: external peer is pg1's remote host, with the ports used and
# recorded above.
5189 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5190 self.assertEqual(s.in_port, self.tcp_port_in)
5191 self.assertEqual(s.out_port, self.tcp_port_out)
5192 self.assertEqual(s.ext_port, self.tcp_external_port)
# UDP session.
5196 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5197 self.assertEqual(s.in_port, self.udp_port_in)
5198 self.assertEqual(s.out_port, self.udp_port_out)
5199 self.assertEqual(s.ext_port, self.udp_external_port)
# ICMP session: the ICMP ids are compared through the port fields.
5203 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5204 self.assertEqual(s.in_port, self.icmp_id_in)
5205 self.assertEqual(s.out_port, self.icmp_external_id)
5207 def test_multiple_users(self):
5208 """ Deterministic NAT multiple users """
# Two hosts behind pg0 open a TCP flow from the same inside port to the
# same external endpoint. Both must be translated to nat_ip, and return
# traffic to each translated port must reach the matching host.
5210 nat_ip = "10.0.0.10"
5212 external_port = 6303
5214 host0 = self.pg0.remote_hosts[0]
5215 host1 = self.pg0.remote_hosts[1]
5217 self.vapi.nat_det_add_del_map(host0.ip4n,
5219 socket.inet_aton(nat_ip),
5221 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5222 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> external endpoint; remember the translated source port.
5226 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
5227 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
5228 TCP(sport=port_in, dport=external_port))
5229 self.pg0.add_stream(p)
5230 self.pg_enable_capture(self.pg_interfaces)
5232 capture = self.pg1.get_capture(1)
5237 self.assertEqual(ip.src, nat_ip)
5238 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5239 self.assertEqual(tcp.dport, external_port)
5240 port_out0 = tcp.sport
5242 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> external endpoint; remember its translated source port too.
5246 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
5247 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
5248 TCP(sport=port_in, dport=external_port))
5249 self.pg0.add_stream(p)
5250 self.pg_enable_capture(self.pg_interfaces)
5252 capture = self.pg1.get_capture(1)
5257 self.assertEqual(ip.src, nat_ip)
5258 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5259 self.assertEqual(tcp.dport, external_port)
5260 port_out1 = tcp.sport
5262 self.logger.error(ppp("Unexpected or invalid packet:", p))
# A single mapping that now counts two sessions.
5265 dms = self.vapi.nat_det_map_dump()
5266 self.assertEqual(1, len(dms))
5267 self.assertEqual(2, dms[0].ses_num)
# Return traffic to host0's translated port must be delivered to host0.
5270 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5271 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5272 TCP(sport=external_port, dport=port_out0))
5273 self.pg1.add_stream(p)
5274 self.pg_enable_capture(self.pg_interfaces)
5276 capture = self.pg0.get_capture(1)
5281 self.assertEqual(ip.src, self.pg1.remote_ip4)
5282 self.assertEqual(ip.dst, host0.ip4)
5283 self.assertEqual(tcp.dport, port_in)
5284 self.assertEqual(tcp.sport, external_port)
5286 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Return traffic to host1's translated port must be delivered to host1.
5290 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5291 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5292 TCP(sport=external_port, dport=port_out1))
5293 self.pg1.add_stream(p)
5294 self.pg_enable_capture(self.pg_interfaces)
5296 capture = self.pg0.get_capture(1)
5301 self.assertEqual(ip.src, self.pg1.remote_ip4)
5302 self.assertEqual(ip.dst, host1.ip4)
5303 self.assertEqual(tcp.dport, port_in)
5304 self.assertEqual(tcp.sport, external_port)
5306 self.logger.error(ppp("Unexpected or invalid packet", p))
5309 # session close api test
# Close one session through the "out" API (keyed by the NAT address) and
# one through the "in" API (keyed by host0's address); the session counter
# must drop to 1 and then to 0.
5310 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
5312 self.pg1.remote_ip4n,
5314 dms = self.vapi.nat_det_map_dump()
5315 self.assertEqual(dms[0].ses_num, 1)
5317 self.vapi.nat_det_close_session_in(host0.ip4n,
5319 self.pg1.remote_ip4n,
5321 dms = self.vapi.nat_det_map_dump()
5322 self.assertEqual(dms[0].ses_num, 0)
5324 def test_tcp_session_close_detection_in(self):
5325 """ Deterministic NAT TCP session close from inside network """
# Maps pg0's remote host to nat_addr, opens a TCP session through
# initiate_tcp_session() (helper defined elsewhere), replays a close that
# starts on the inside and checks that the mapping's session counter is 0.
5326 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5328 socket.inet_aton(self.nat_addr),
5330 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5331 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5334 self.initiate_tcp_session(self.pg0, self.pg1)
5336 # close the session from inside
5338 # FIN packet in -> out
5339 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5340 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5341 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5343 self.pg0.add_stream(p)
5344 self.pg_enable_capture(self.pg_interfaces)
5346 self.pg1.get_capture(1)
# The outside ACK and the outside FIN are sent together as one stream
# (`pkts`), so two packets are expected on pg0.
5350 # ACK packet out -> in
5351 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5352 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5353 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5357 # FIN packet out -> in
5358 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5359 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5360 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5364 self.pg1.add_stream(pkts)
5365 self.pg_enable_capture(self.pg_interfaces)
5367 self.pg0.get_capture(2)
5369 # ACK packet in -> out
5370 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5371 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5372 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5374 self.pg0.add_stream(p)
5375 self.pg_enable_capture(self.pg_interfaces)
5377 self.pg1.get_capture(1)
5379 # Check if deterministic NAT44 closed the session
5380 dms = self.vapi.nat_det_map_dump()
5381 self.assertEqual(0, dms[0].ses_num)
5383 self.logger.error("TCP session termination failed")
5386 def test_tcp_session_close_detection_out(self):
5387 """ Deterministic NAT TCP session close from outside network """
# Scenario: same setup as the inside-initiated variant (deterministic
# mapping pg0 host -> self.nat_addr, NAT44 feature on pg0 and pg1, TCP
# session opened with initiate_tcp_session()), but the closing exchange
# is started by the pg1 side.
5388 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5390 socket.inet_aton(self.nat_addr),
5392 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5393 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5396 self.initiate_tcp_session(self.pg0, self.pg1)
5398 # close the session from outside
5400 # FIN packet out -> in
# Addressed to self.nat_addr / tcp_port_out, i.e. the translated endpoint.
5401 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5402 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5403 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5405 self.pg1.add_stream(p)
5406 self.pg_enable_capture(self.pg_interfaces)
# One packet is requested from the pg0 capture; its content is not inspected.
5408 self.pg0.get_capture(1)
5412 # ACK packet in -> out
5413 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5414 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5415 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5419 # ACK packet in -> out
# NOTE(review): this label repeats the previous step's label. By symmetry
# with test_tcp_session_close_detection_in (FIN, ACK, FIN, ACK) this packet
# is presumably the FIN in -> out - confirm against the TCP flags used here.
5420 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5421 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5422 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# The list `pkts` is injected on pg0 and two packets are requested from pg1.
5426 self.pg0.add_stream(pkts)
5427 self.pg_enable_capture(self.pg_interfaces)
5429 self.pg1.get_capture(2)
5431 # ACK packet out -> in
5432 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5433 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5434 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5436 self.pg1.add_stream(p)
5437 self.pg_enable_capture(self.pg_interfaces)
5439 self.pg0.get_capture(1)
5441 # Check if deterministic NAT44 closed the session
# The first entry of the mapping dump must report zero sessions.
5442 dms = self.vapi.nat_det_map_dump()
5443 self.assertEqual(0, dms[0].ses_num)
5445 self.logger.error("TCP session termination failed")
5448 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5449 def test_session_timeout(self):
5450 """ Deterministic NAT session timeouts """
# Skipped unless running_extended_tests() is true (see decorator).
# Scenario: configure the deterministic mapping and NAT44 features, open a
# TCP session, shorten the timeouts, push a stream from pg0 to pg1 and
# finally expect the mapping to report no sessions.
5451 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5453 socket.inet_aton(self.nat_addr),
5455 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5456 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5459 self.initiate_tcp_session(self.pg0, self.pg1)
# All four timeout arguments are set to 5.
# NOTE(review): unit and argument order are not visible here (presumably
# seconds) - confirm against nat_det_set_timeouts.
5460 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
5461 pkts = self.create_stream_in(self.pg0, self.pg1)
5462 self.pg0.add_stream(pkts)
5463 self.pg_enable_capture(self.pg_interfaces)
# As many packets as were sent are requested from pg1; `capture` is not
# examined further in the visible code.
5465 capture = self.pg1.get_capture(len(pkts))
# The first entry of the mapping dump must report zero sessions.
5468 dms = self.vapi.nat_det_map_dump()
5469 self.assertEqual(0, dms[0].ses_num)
5471 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5472 def test_session_limit_per_user(self):
5473 """ Deterministic NAT maximum sessions per user limit """
# Skipped unless running_extended_tests() is true (see decorator).
# Scenario: open 1000 UDP flows from one inside host, then send one more
# flow and expect it to be dropped with an ICMP error returned to the
# sender; finally check the session count and the IPFIX export on pg2.
5474 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5476 socket.inet_aton(self.nat_addr),
5478 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5479 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# IPFIX export: pg2's remote address is the collector, pg2's local address
# the source; nat_ipfix() is called with its default arguments.
5481 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5482 src_address=self.pg2.local_ip4n,
5484 template_interval=10)
5485 self.vapi.nat_ipfix()
# range(1025, 2025) yields 1000 ports; each UDP packet uses the same value
# for source and destination port.
5488 for port in range(1025, 2025):
5489 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5490 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5491 UDP(sport=port, dport=port))
5494 self.pg0.add_stream(pkts)
5495 self.pg_enable_capture(self.pg_interfaces)
5497 capture = self.pg1.get_capture(len(pkts))
# One additional flow (3001 -> 3002) from the same host.
5499 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5500 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5501 UDP(sport=3001, dport=3002))
5502 self.pg0.add_stream(p)
5503 self.pg_enable_capture(self.pg_interfaces)
# Nothing may be forwarded to pg1 for the extra flow.
5505 capture = self.pg1.assert_nothing_captured()
5507 # verify ICMP error packet
# Expected on pg0: ICMP type 3 (destination unreachable), code 1 (host
# unreachable), quoting the original UDP ports 3001/3002.
5508 capture = self.pg0.get_capture(1)
5510 self.assertTrue(p.haslayer(ICMP))
5512 self.assertEqual(icmp.type, 3)
5513 self.assertEqual(icmp.code, 1)
5514 self.assertTrue(icmp.haslayer(IPerror))
5515 inner_ip = icmp[IPerror]
5516 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5517 self.assertEqual(inner_ip[UDPerror].dport, 3002)
# The mapping must still report exactly the 1000 sessions created above.
5519 dms = self.vapi.nat_det_map_dump()
5521 self.assertEqual(1000, dms[0].ses_num)
5523 # verify IPFIX logging
5524 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Two packets are requested from pg2; templates are registered with the
# decoder first so that data sets can be decoded afterwards.
5526 capture = self.pg2.get_capture(2)
5527 ipfix = IPFIXDecoder()
5528 # first load template
5530 self.assertTrue(p.haslayer(IPFIX))
5531 if p.haslayer(Template):
5532 ipfix.add_template(p.getlayer(Template))
5533 # verify events in data set
5535 if p.haslayer(Data):
5536 data = ipfix.decode_data_set(p.getlayer(Set))
5537 self.verify_ipfix_max_entries_per_user(data)
5539 def clear_nat_det(self):
5541 Clear deterministic NAT configuration.
# Steps, in order: disable NAT IPFIX logging, call nat_det_set_timeouts()
# with no arguments, then walk the dumped deterministic mappings and the
# dumped NAT44 interfaces, issuing one add/del call per entry.
# NOTE(review): nat_det_set_timeouts() without arguments presumably restores
# the default timeouts - confirm against the API wrapper.
5543 self.vapi.nat_ipfix(enable=0)
5544 self.vapi.nat_det_set_timeouts()
5545 deterministic_mappings = self.vapi.nat_det_map_dump()
5546 for dsm in deterministic_mappings:
5547 self.vapi.nat_det_add_del_map(dsm.in_addr,
5553 interfaces = self.vapi.nat44_interface_dump()
5554 for intf in interfaces:
5555 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
5560 super(TestDeterministicNAT, self).tearDown()
# While VPP is still alive, query the NAT44 interface list and the
# deterministic mappings, timeouts and sessions through the CLI (the
# interface list is written to the log at info level), then remove the
# deterministic NAT configuration via clear_nat_det().
5561 if not self.vpp_dead:
5562 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5564 self.vapi.cli("show nat44 deterministic mappings"))
5566 self.vapi.cli("show nat44 deterministic timeouts"))
5568 self.vapi.cli("show nat44 deterministic sessions"))
5569 self.clear_nat_det()
5572 class TestNAT64(MethodHolder):
5573 """ NAT64 Test Cases """
5576 def setUpConstants(cls):
# Extends the VPP command line built by the parent class with a
# "nat { ... }" section that sets "nat64 bib hash buckets 128" and
# "nat64 st hash buckets 256".
5577 super(TestNAT64, cls).setUpConstants()
5578 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5579 "nat64 st hash buckets 256", "}"])
5582 def setUpClass(cls):
# Class-wide fixture for the NAT64 tests: default ports/identifiers, the
# NAT pool addresses (text and packed form), IPFIX defaults and the
# packet-generator interfaces used by the test methods.
5583 super(TestNAT64, cls).setUpClass()
# Default inside/outside values are identical for TCP, UDP and ICMP.
5586 cls.tcp_port_in = 6303
5587 cls.tcp_port_out = 6303
5588 cls.udp_port_in = 6304
5589 cls.udp_port_out = 6304
5590 cls.icmp_id_in = 6305
5591 cls.icmp_id_out = 6305
# Pool address for the default table, plus its packed 4-byte form.
5592 cls.nat_addr = '10.0.0.3'
5593 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
# Pool address used together with cls.vrf1_id.
5595 cls.vrf1_nat_addr = '10.0.10.3'
5596 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5598 cls.ipfix_src_port = 4739
5599 cls.ipfix_domain_id = 1
# Six pg interfaces (pg0..pg5): pg0 and pg2 go into ip6_interfaces,
# pg1 into ip4_interfaces.
5601 cls.create_pg_interfaces(range(6))
5602 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5603 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5604 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# Create the IPv6 table cls.vrf1_id and bind pg2 to it.
5606 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5608 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
# pg0 gets two remote hosts (used as client/server by the hairpinning tests).
5610 cls.pg0.generate_remote_hosts(2)
5612 for i in cls.ip6_interfaces:
5615 i.configure_ipv6_neighbors()
5617 for i in cls.ip4_interfaces:
# pg3 is configured for both IPv4 and IPv6; pg5 gets an IPv6 address only.
5623 cls.pg3.config_ip4()
5624 cls.pg3.resolve_arp()
5625 cls.pg3.config_ip6()
5626 cls.pg3.configure_ipv6_neighbors()
5629 cls.pg5.config_ip6()
5632 super(TestNAT64, cls).tearDownClass()
5635 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5636 """ NAT64 inside interface handles Neighbor Advertisement """
# Scenario on pg5 (NAT64 enabled with the default is_inside value): ping the
# interface's own IPv6 address, expect a Neighbor Solicitation back, answer
# it with a Neighbor Advertisement, ping again and expect an Echo Reply.
5638 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
5641 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5642 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5643 ICMPv6EchoRequest())
5645 self.pg5.add_stream(pkts)
5646 self.pg_enable_capture(self.pg_interfaces)
5649 # Wait for Neighbor Solicitation
5650 capture = self.pg5.get_capture(len(pkts))
5651 self.assertEqual(1, len(capture))
# The solicitation must come from pg5's local address; its target is kept
# so that the advertisement below can echo it.
5654 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5655 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
5656 tgt = packet[ICMPv6ND_NS].tgt
5658 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5661 # Send Neighbor Advertisement
# The advertisement carries pg5's remote MAC as link-layer address option.
5662 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5663 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5664 ICMPv6ND_NA(tgt=tgt) /
5665 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
5667 self.pg5.add_stream(pkts)
5668 self.pg_enable_capture(self.pg_interfaces)
5671 # Try to send ping again
5673 self.pg5.add_stream(pkts)
5674 self.pg_enable_capture(self.pg_interfaces)
5677 # Wait for ping reply
5678 capture = self.pg5.get_capture(len(pkts))
5679 self.assertEqual(1, len(capture))
# The reply must go from pg5's local address back to the remote host.
5682 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5683 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
5684 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
5686 self.logger.error(ppp("Unexpected or invalid packet:", packet))
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    # Packed form of the single address used as both range start and end.
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # Adding the one-address range must make exactly that address show up
    # in the pool dump.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Deleting the same range must leave the pool dump empty.
    self.vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)
    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
5704 def test_interface(self):
5705 """ Enable/disable NAT64 feature on the interface """
# Scenario: enable NAT64 on pg0 (default is_inside) and on pg1
# (is_inside=0), verify both the API dump and the CLI feature listing,
# then disable the feature on both and expect an empty dump.
5706 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5707 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5709 interfaces = self.vapi.nat64_interface_dump()
5710 self.assertEqual(len(interfaces), 2)
# pg0 must be reported with is_inside == 1 and pg1 with is_inside == 0;
# both entries must be present in the dump.
5713 for intf in interfaces:
5714 if intf.sw_if_index == self.pg0.sw_if_index:
5715 self.assertEqual(intf.is_inside, 1)
5717 elif intf.sw_if_index == self.pg1.sw_if_index:
5718 self.assertEqual(intf.is_inside, 0)
5720 self.assertTrue(pg0_found)
5721 self.assertTrue(pg1_found)
# str.find() returns -1 when the substring is absent, so these assertions
# require 'nat64-in2out' on pg0 and 'nat64-out2in' on pg1.
5723 features = self.vapi.cli("show interface features pg0")
5724 self.assertNotEqual(features.find('nat64-in2out'), -1)
5725 features = self.vapi.cli("show interface features pg1")
5726 self.assertNotEqual(features.find('nat64-out2in'), -1)
# Remove the feature again from both interfaces.
5728 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5729 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5731 interfaces = self.vapi.nat64_interface_dump()
5732 self.assertEqual(len(interfaces), 0)
5734 def test_static_bib(self):
5735 """ Add/delete static BIB entry """
# Scenario: add a static TCP BIB entry for an IPv6 inside address and an
# IPv4 outside address, verify it through the TCP BIB dump, delete it and
# verify that no static entry is left.
5736 in_addr = socket.inet_pton(socket.AF_INET6,
5737 '2001:db8:85a3::8a2e:370:7334')
5738 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5741 proto = IP_PROTOS.tcp
5743 self.vapi.nat64_add_del_static_bib(in_addr,
5748 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
# The dumped entry must carry exactly the configured addresses and ports,
# and exactly one static entry must have been counted.
5753 self.assertEqual(bibe.i_addr, in_addr)
5754 self.assertEqual(bibe.o_addr, out_addr)
5755 self.assertEqual(bibe.i_port, in_port)
5756 self.assertEqual(bibe.o_port, out_port)
5757 self.assertEqual(static_bib_num, 1)
5759 self.vapi.nat64_add_del_static_bib(in_addr,
5765 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
# After the second call no static entry may remain.
5770 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Field order matches the order in which the values are checked.
    timeout_fields = ('udp', 'icmp', 'tcp_trans', 'tcp_est',
                      'tcp_incoming_syn')

    # Before anything is changed the defaults must be reported.
    defaults = dict(zip(timeout_fields, (300, 60, 240, 7440, 6)))
    reported = self.vapi.nat64_get_timeouts()
    for field in timeout_fields:
        self.assertEqual(getattr(reported, field), defaults[field])

    # Override every timeout and read the values back.
    custom = dict(zip(timeout_fields, (200, 30, 250, 7450, 10)))
    self.vapi.nat64_set_timeouts(**custom)
    reported = self.vapi.nat64_get_timeouts()
    for field in timeout_fields:
        self.assertEqual(getattr(reported, field), custom[field])
5792 def test_dynamic(self):
5793 """ NAT64 dynamic translation test """
# Scenario: with a NAT64 pool address and pg0 (inside) / pg1 (outside)
# enabled, send IPv6 traffic in -> out and IPv4 replies out -> in twice,
# check the session count, then repeat one round trip for pg2 using the
# pool address bound to self.vrf1_id.
5794 self.tcp_port_in = 6303
5795 self.udp_port_in = 6304
5796 self.icmp_id_in = 6305
# Session count before any traffic, used as baseline below.
5798 ses_num_start = self.nat64_get_ses_num()
5800 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5802 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5803 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# First pass, in -> out: packets on pg1 must be sourced from self.nat_addr.
5806 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5807 self.pg0.add_stream(pkts)
5808 self.pg_enable_capture(self.pg_interfaces)
5810 capture = self.pg1.get_capture(len(pkts))
5811 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5812 dst_ip=self.pg1.remote_ip4)
# First pass, out -> in: the expected IPv6 source is pg1's IPv4 address
# appended to the 64:ff9b:: prefix.
5815 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5816 self.pg1.add_stream(pkts)
5817 self.pg_enable_capture(self.pg_interfaces)
5819 capture = self.pg0.get_capture(len(pkts))
5820 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5821 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Second pass, in -> out.
5824 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5825 self.pg0.add_stream(pkts)
5826 self.pg_enable_capture(self.pg_interfaces)
5828 capture = self.pg1.get_capture(len(pkts))
5829 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5830 dst_ip=self.pg1.remote_ip4)
# Second pass, out -> in.
5833 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5834 self.pg1.add_stream(pkts)
5835 self.pg_enable_capture(self.pg_interfaces)
5837 capture = self.pg0.get_capture(len(pkts))
5838 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Exactly three sessions must have been added by the traffic above.
# NOTE(review): presumably one each for TCP, UDP and ICMP - confirm against
# create_stream_in_ip6.
5840 ses_num_end = self.nat64_get_ses_num()
5842 self.assertEqual(ses_num_end - ses_num_start, 3)
5844 # tenant with specific VRF
5845 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5846 self.vrf1_nat_addr_n,
5847 vrf_id=self.vrf1_id)
5848 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Traffic from pg2 must be translated to self.vrf1_nat_addr.
5850 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5851 self.pg2.add_stream(pkts)
5852 self.pg_enable_capture(self.pg_interfaces)
5854 capture = self.pg1.get_capture(len(pkts))
5855 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5856 dst_ip=self.pg1.remote_ip4)
5858 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5859 self.pg1.add_stream(pkts)
5860 self.pg_enable_capture(self.pg_interfaces)
5862 capture = self.pg2.get_capture(len(pkts))
5863 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5865 def test_static(self):
5866 """ NAT64 static translation test """
# Scenario: install static BIB entries for the pg0 host, then verify one
# in -> out / out -> in round trip in which the outside ports equal the
# inside ports (same_port=True) and three sessions are created.
# Inside and outside ports/identifiers are set to the same values.
5867 self.tcp_port_in = 60303
5868 self.udp_port_in = 60304
5869 self.icmp_id_in = 60305
5870 self.tcp_port_out = 60303
5871 self.udp_port_out = 60304
5872 self.icmp_id_out = 60305
5874 ses_num_start = self.nat64_get_ses_num()
5876 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5878 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5879 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Three static BIB entries keyed on the pg0 host's IPv6 address.
5881 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5886 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5891 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
# in -> out
5898 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5899 self.pg0.add_stream(pkts)
5900 self.pg_enable_capture(self.pg_interfaces)
5902 capture = self.pg1.get_capture(len(pkts))
5903 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5904 dst_ip=self.pg1.remote_ip4, same_port=True)
# out -> in: expected IPv6 source is pg1's IPv4 address under 64:ff9b::.
5907 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5908 self.pg1.add_stream(pkts)
5909 self.pg_enable_capture(self.pg_interfaces)
5911 capture = self.pg0.get_capture(len(pkts))
5912 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5913 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Exactly three sessions must have been added since the baseline.
5915 ses_num_end = self.nat64_get_ses_num()
5917 self.assertEqual(ses_num_end - ses_num_start, 3)
5919 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5920 def test_session_timeout(self):
5921 """ NAT64 session timeout """
# Skipped unless running_extended_tests() is true (see decorator).
# Scenario: shorten the ICMP timeout, create sessions with in -> out
# traffic and compare the session count taken right after the traffic with
# the count taken later.
5922 self.icmp_id_in = 1234
5923 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5925 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5926 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Only the ICMP timeout is overridden.
# NOTE(review): unit presumably seconds - confirm against nat64_set_timeouts.
5927 self.vapi.nat64_set_timeouts(icmp=5)
5929 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5930 self.pg0.add_stream(pkts)
5931 self.pg_enable_capture(self.pg_interfaces)
5933 capture = self.pg1.get_capture(len(pkts))
5935 ses_num_before_timeout = self.nat64_get_ses_num()
5939 # ICMP session after timeout
# The two counts must differ, i.e. the session table changed in between.
5940 ses_num_after_timeout = self.nat64_get_ses_num()
5941 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5943 def test_icmp_error(self):
5944 """ NAT64 ICMP Error message translation """
# Scenario: create sessions with one round trip, then wrap previously
# captured packets in ICMP error messages and check that the error and
# the packet quoted inside it are translated in both directions
# (ICMPv6 Destination Unreachable code 1 <-> ICMPv4 type 3 code 13).
5945 self.tcp_port_in = 6303
5946 self.udp_port_in = 6304
5947 self.icmp_id_in = 6305
5949 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5951 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5952 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5954 # send some packets to create sessions
5955 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5956 self.pg0.add_stream(pkts)
5957 self.pg_enable_capture(self.pg_interfaces)
# The translated IPv4 packets are kept; they are quoted later in the
# ICMPv4 errors sent from pg1.
5959 capture_ip4 = self.pg1.get_capture(len(pkts))
5960 self.verify_capture_out(capture_ip4,
5961 nat_ip=self.nat_addr,
5962 dst_ip=self.pg1.remote_ip4)
5964 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5965 self.pg1.add_stream(pkts)
5966 self.pg_enable_capture(self.pg_interfaces)
# The translated IPv6 packets are kept as well; they are quoted in the
# ICMPv6 errors sent from pg0.
5968 capture_ip6 = self.pg0.get_capture(len(pkts))
5969 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5970 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5971 self.pg0.remote_ip6)
# in -> out: one ICMPv6 Destination Unreachable (code 1) per captured IPv6
# packet, each carrying that packet's IPv6 layer as payload.
5974 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5975 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5976 ICMPv6DestUnreach(code=1) /
5977 packet[IPv6] for packet in capture_ip6]
5978 self.pg0.add_stream(pkts)
5979 self.pg_enable_capture(self.pg_interfaces)
5981 capture = self.pg1.get_capture(len(pkts))
5982 for packet in capture:
# Outer header: NAT address -> pg1 host, ICMP type 3 code 13. The quoted
# packet must show the addresses as they appear on the outside
# (pg1 host -> NAT address) with the outside port or ICMP identifier.
5984 self.assertEqual(packet[IP].src, self.nat_addr)
5985 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5986 self.assertEqual(packet[ICMP].type, 3)
5987 self.assertEqual(packet[ICMP].code, 13)
5988 inner = packet[IPerror]
5989 self.assertEqual(inner.src, self.pg1.remote_ip4)
5990 self.assertEqual(inner.dst, self.nat_addr)
5991 self.assert_packet_checksums_valid(packet)
5992 if inner.haslayer(TCPerror):
5993 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5994 elif inner.haslayer(UDPerror):
5995 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5997 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5999 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# out -> in: one ICMPv4 error (type 3, code 13) per captured IPv4 packet,
# each carrying that packet's IP layer as payload.
6003 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6004 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6005 ICMP(type=3, code=13) /
6006 packet[IP] for packet in capture_ip4]
6007 self.pg1.add_stream(pkts)
6008 self.pg_enable_capture(self.pg_interfaces)
6010 capture = self.pg0.get_capture(len(pkts))
6011 for packet in capture:
# Outer header: 64:ff9b:: form of the pg1 host -> pg0 host, ICMPv6
# Destination Unreachable code 1. The quoted packet must show the inside
# view (pg0 host -> 64:ff9b:: address) with the inside port or identifier.
6013 self.assertEqual(packet[IPv6].src, ip.src)
6014 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6015 icmp = packet[ICMPv6DestUnreach]
6016 self.assertEqual(icmp.code, 1)
6017 inner = icmp[IPerror6]
6018 self.assertEqual(inner.src, self.pg0.remote_ip6)
6019 self.assertEqual(inner.dst, ip.src)
6020 self.assert_icmpv6_checksum_valid(packet)
6021 if inner.haslayer(TCPerror):
6022 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
6023 elif inner.haslayer(UDPerror):
6024 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
6026 self.assertEqual(inner[ICMPv6EchoRequest].id,
6029 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6032 def test_hairpinning(self):
6033 """ NAT64 hairpinning """
# Scenario: client and server are both hosts behind pg0. The client reaches
# the server through the NAT address (64:ff9b:: form of self.nat_addr) and
# the server's static outside ports; all traffic enters and leaves on pg0.
# Three phases: client -> server, server -> client, ICMP error.
6035 client = self.pg0.remote_hosts[0]
6036 server = self.pg0.remote_hosts[1]
6037 server_tcp_in_port = 22
6038 server_tcp_out_port = 4022
6039 server_udp_in_port = 23
6040 server_udp_out_port = 4023
6041 client_tcp_in_port = 1234
6042 client_udp_in_port = 1235
# The client's outside ports are learned from the first capture below.
6043 client_tcp_out_port = 0
6044 client_udp_out_port = 0
6045 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6046 nat_addr_ip6 = ip.src
6048 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6050 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6051 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries for the server's TCP and UDP services.
6053 self.vapi.nat64_add_del_static_bib(server.ip6n,
6056 server_tcp_out_port,
6058 self.vapi.nat64_add_del_static_bib(server.ip6n,
6061 server_udp_out_port,
# Phase 1, client -> server: addressed to the NAT address and the server's
# outside ports.
6066 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6067 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6068 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6070 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6071 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6072 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
6074 self.pg0.add_stream(pkts)
6075 self.pg_enable_capture(self.pg_interfaces)
6077 capture = self.pg0.get_capture(len(pkts))
6078 for packet in capture:
# Delivered packets must come from the NAT address, go to the server's real
# address and inside port, and use a source port different from the
# client's inside port; that port is remembered for phase 2.
6080 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6081 self.assertEqual(packet[IPv6].dst, server.ip6)
6082 self.assert_packet_checksums_valid(packet)
6083 if packet.haslayer(TCP):
6084 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
6085 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
6086 client_tcp_out_port = packet[TCP].sport
6088 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
6089 self.assertEqual(packet[UDP].dport, server_udp_in_port)
6090 client_udp_out_port = packet[UDP].sport
6092 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Phase 2, server -> client: replies target the NAT address and the
# client's learned outside ports.
6097 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6098 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6099 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
6101 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6102 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6103 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
6105 self.pg0.add_stream(pkts)
6106 self.pg_enable_capture(self.pg_interfaces)
6108 capture = self.pg0.get_capture(len(pkts))
6109 for packet in capture:
# The client must see the NAT address and the server's outside port as
# source, and its own address and inside port as destination.
6111 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6112 self.assertEqual(packet[IPv6].dst, client.ip6)
6113 self.assert_packet_checksums_valid(packet)
6114 if packet.haslayer(TCP):
6115 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
6116 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
6118 self.assertEqual(packet[UDP].sport, server_udp_out_port)
6119 self.assertEqual(packet[UDP].dport, client_udp_in_port)
6121 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Phase 3, ICMP error: the client answers each packet it just received with
# an ICMPv6 Destination Unreachable (code 1) quoting that packet.
6126 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6127 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6128 ICMPv6DestUnreach(code=1) /
6129 packet[IPv6] for packet in capture]
6130 self.pg0.add_stream(pkts)
6131 self.pg_enable_capture(self.pg_interfaces)
6133 capture = self.pg0.get_capture(len(pkts))
6134 for packet in capture:
# The error must reach the server from the NAT address, and the quoted
# packet must be rewritten to the server's own view
# (server inside port -> NAT address / client outside port).
6136 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6137 self.assertEqual(packet[IPv6].dst, server.ip6)
6138 icmp = packet[ICMPv6DestUnreach]
6139 self.assertEqual(icmp.code, 1)
6140 inner = icmp[IPerror6]
6141 self.assertEqual(inner.src, server.ip6)
6142 self.assertEqual(inner.dst, nat_addr_ip6)
6143 self.assert_packet_checksums_valid(packet)
6144 if inner.haslayer(TCPerror):
6145 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
6146 self.assertEqual(inner[TCPerror].dport,
6147 client_tcp_out_port)
6149 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
6150 self.assertEqual(inner[UDPerror].dport,
6151 client_udp_out_port)
6153 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6156 def test_prefix(self):
6157 """ NAT64 Network-Specific Prefix """
# Scenario: configure one NAT64 prefix for the default table and one for
# self.vrf1_id, then run a round trip per prefix and check that the IPv6
# addresses seen on the inside are composed with the matching prefix.
6159 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6161 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6162 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6163 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6164 self.vrf1_nat_addr_n,
6165 vrf_id=self.vrf1_id)
6166 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Global prefix 2001:db8::/32; the dump must then contain exactly this
# prefix with vrf_id 0.
6169 global_pref64 = "2001:db8::"
6170 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
6171 global_pref64_len = 32
6172 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
6174 prefix = self.vapi.nat64_prefix_dump()
6175 self.assertEqual(len(prefix), 1)
6176 self.assertEqual(prefix[0].prefix, global_pref64_n)
6177 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
6178 self.assertEqual(prefix[0].vrf_id, 0)
6180 # Add tenant specific prefix
# 2001:db8:122:300::/56 for self.vrf1_id; the dump must now hold two entries.
6181 vrf1_pref64 = "2001:db8:122:300::"
6182 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
6183 vrf1_pref64_len = 56
6184 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
6186 vrf_id=self.vrf1_id)
6187 prefix = self.vapi.nat64_prefix_dump()
6188 self.assertEqual(len(prefix), 2)
# Round trip pg0 <-> pg1 using the global prefix length.
6191 pkts = self.create_stream_in_ip6(self.pg0,
6194 plen=global_pref64_len)
6195 self.pg0.add_stream(pkts)
6196 self.pg_enable_capture(self.pg_interfaces)
6198 capture = self.pg1.get_capture(len(pkts))
6199 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6200 dst_ip=self.pg1.remote_ip4)
6202 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6203 self.pg1.add_stream(pkts)
6204 self.pg_enable_capture(self.pg_interfaces)
# The expected IPv6 source is composed from pg1's IPv4 address.
6206 capture = self.pg0.get_capture(len(pkts))
6207 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6210 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
6212 # Tenant specific prefix
# Round trip pg2 <-> pg1 using the tenant prefix length and the pool
# address self.vrf1_nat_addr.
6213 pkts = self.create_stream_in_ip6(self.pg2,
6216 plen=vrf1_pref64_len)
6217 self.pg2.add_stream(pkts)
6218 self.pg_enable_capture(self.pg_interfaces)
6220 capture = self.pg1.get_capture(len(pkts))
6221 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6222 dst_ip=self.pg1.remote_ip4)
6224 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6225 self.pg1.add_stream(pkts)
6226 self.pg_enable_capture(self.pg_interfaces)
6228 capture = self.pg2.get_capture(len(pkts))
6229 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6232 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
6234 def test_unknown_proto(self):
6235 """ NAT64 translate packet with unknown protocol """
# Scenario: after a TCP packet from pg0 to pg1, send packets whose next
# header is 47 (GRE) in both directions and check that only the outer
# IP header is translated.
6237 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6239 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6240 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 representation of pg1's IPv4 address under 64:ff9b::/96.
6241 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# in -> out, TCP first.
# NOTE(review): presumably needed so that a session for this host pair
# exists before the GRE packet is sent - confirm.
6244 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6245 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
6246 TCP(sport=self.tcp_port_in, dport=20))
6247 self.pg0.add_stream(p)
6248 self.pg_enable_capture(self.pg_interfaces)
6250 p = self.pg1.get_capture(1)
# in -> out, next header 47 with an inner IPv4/TCP payload.
6252 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6253 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
6255 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6256 TCP(sport=1234, dport=1234))
6257 self.pg0.add_stream(p)
6258 self.pg_enable_capture(self.pg_interfaces)
6260 p = self.pg1.get_capture(1)
# On pg1 the packet must be IPv4 from the NAT address to the pg1 host and
# still contain a GRE layer.
6263 self.assertEqual(packet[IP].src, self.nat_addr)
6264 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6265 self.assertTrue(packet.haslayer(GRE))
6266 self.assert_packet_checksums_valid(packet)
6268 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# out -> in, sent to the NAT address with an inner IPv4/TCP payload.
6272 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6273 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6275 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6276 TCP(sport=1234, dport=1234))
6277 self.pg1.add_stream(p)
6278 self.pg_enable_capture(self.pg_interfaces)
6280 p = self.pg0.get_capture(1)
# On pg0 the packet must be IPv6 from remote_ip6 to the pg0 host with
# next header 47.
6283 self.assertEqual(packet[IPv6].src, remote_ip6)
6284 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6285 self.assertEqual(packet[IPv6].nh, 47)
6287 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6290 def test_hairpinning_unknown_proto(self):
6291 """ NAT64 translate packet with unknown protocol - hairpinning """
# Scenario: client and server are both hosts behind pg0 and each has its
# own NAT address (10.0.0.110 and 10.0.0.100). After a TCP packet from
# client to server, GRE packets are exchanged in both directions and must
# arrive with the peer's NAT address as IPv6 source.
6293 client = self.pg0.remote_hosts[0]
6294 server = self.pg0.remote_hosts[1]
6295 server_tcp_in_port = 22
6296 server_tcp_out_port = 4022
6297 client_tcp_in_port = 1234
6298 client_tcp_out_port = 1235
6299 server_nat_ip = "10.0.0.100"
6300 client_nat_ip = "10.0.0.110"
# Packed IPv4 forms and 64:ff9b::/96 IPv6 forms of both NAT addresses.
6301 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
6302 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
6303 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
6304 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
6306 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
6308 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6309 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries: two keyed on the server's address, one on the client's.
6311 self.vapi.nat64_add_del_static_bib(server.ip6n,
6314 server_tcp_out_port,
6317 self.vapi.nat64_add_del_static_bib(server.ip6n,
6323 self.vapi.nat64_add_del_static_bib(client.ip6n,
6326 client_tcp_out_port,
# client -> server, TCP first.
6330 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6331 IPv6(src=client.ip6, dst=server_nat_ip6) /
6332 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6333 self.pg0.add_stream(p)
6334 self.pg_enable_capture(self.pg_interfaces)
6336 p = self.pg0.get_capture(1)
# client -> server, GRE with an inner IPv4/TCP payload.
6338 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6339 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
6341 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6342 TCP(sport=1234, dport=1234))
6343 self.pg0.add_stream(p)
6344 self.pg_enable_capture(self.pg_interfaces)
6346 p = self.pg0.get_capture(1)
# The server must receive it from the client's NAT address, still as GRE.
6349 self.assertEqual(packet[IPv6].src, client_nat_ip6)
6350 self.assertEqual(packet[IPv6].dst, server.ip6)
6351 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6353 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client, GRE addressed to the client's NAT address.
6357 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6358 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
6360 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6361 TCP(sport=1234, dport=1234))
6362 self.pg0.add_stream(p)
6363 self.pg_enable_capture(self.pg_interfaces)
6365 p = self.pg0.get_capture(1)
# The client must receive it from the server's NAT address, still as GRE.
6368 self.assertEqual(packet[IPv6].src, server_nat_ip6)
6369 self.assertEqual(packet[IPv6].dst, client.ip6)
6370 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6372 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6375 def test_one_armed_nat64(self):
6376 """ One armed NAT64 """
# Scenario: pg3 is registered with NAT64 twice, once with the default
# is_inside value and once with is_inside=0, so traffic enters and leaves
# on the same interface. One TCP packet is translated IPv6 -> IPv4 and the
# reply IPv4 -> IPv6.
6378 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
6382 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6384 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
6385 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 -> IPv4: TCP 12345 -> 80 towards the composed IPv6 address.
6388 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6389 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
6390 TCP(sport=12345, dport=80))
6391 self.pg3.add_stream(p)
6392 self.pg_enable_capture(self.pg_interfaces)
6394 capture = self.pg3.get_capture(1)
# The translated packet must come from the NAT address with a source port
# other than 12345; that port is reused as destination of the reply.
6399 self.assertEqual(ip.src, self.nat_addr)
6400 self.assertEqual(ip.dst, self.pg3.remote_ip4)
6401 self.assertNotEqual(tcp.sport, 12345)
6402 external_port = tcp.sport
6403 self.assertEqual(tcp.dport, 80)
6404 self.assert_packet_checksums_valid(p)
6406 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 -> IPv6: reply from port 80 to the NAT address / external port.
6410 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6411 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6412 TCP(sport=80, dport=external_port))
6413 self.pg3.add_stream(p)
6414 self.pg_enable_capture(self.pg_interfaces)
6416 capture = self.pg3.get_capture(1)
# The reply must arrive as IPv6 from the composed address with the original
# ports restored (80 -> 12345).
6421 self.assertEqual(ip.src, remote_host_ip6)
6422 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6423 self.assertEqual(tcp.sport, 80)
6424 self.assertEqual(tcp.dport, 12345)
6425 self.assert_packet_checksums_valid(p)
6427 self.logger.error(ppp("Unexpected or invalid packet:", p))
6430 def test_frag_in_order(self):
6431 """ NAT64 translate fragments arriving in order """
# Scenario: send a fragmented TCP payload in -> out and another one
# out -> in, reassemble what is captured on the far side and compare ports
# and payload; finally check how many reassembly entries were added.
# The inside TCP port is chosen at random for each run.
6432 self.tcp_port_in = random.randint(1025, 65535)
6434 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6436 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6437 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Number of reassembly entries before any traffic, used as baseline.
6439 reass = self.vapi.nat_reass_dump()
6440 reass_n_start = len(reass)
# in -> out: IPv6 fragments from pg0 towards pg1's IPv4 address, TCP port 20.
6444 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6445 self.tcp_port_in, 20, data)
6446 self.pg0.add_stream(pkts)
6447 self.pg_enable_capture(self.pg_interfaces)
6449 frags = self.pg1.get_capture(len(pkts))
6450 p = self.reass_frags_and_verify(frags,
6452 self.pg1.remote_ip4)
# The reassembled packet keeps destination port 20 and the payload but must
# use a different source port, which is stored as the outside port.
6453 self.assertEqual(p[TCP].dport, 20)
6454 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6455 self.tcp_port_out = p[TCP].sport
6456 self.assertEqual(data, p[Raw].load)
# out -> in: a 23-byte payload fragmented on the IPv4 side.
6459 data = "A" * 4 + "b" * 16 + "C" * 3
6460 pkts = self.create_stream_frag(self.pg1,
6465 self.pg1.add_stream(pkts)
6466 self.pg_enable_capture(self.pg_interfaces)
6468 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: pg1's IPv4 address under 64:ff9b::/96.
6469 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6470 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6471 self.assertEqual(p[TCP].sport, 20)
6472 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6473 self.assertEqual(data, p[Raw].load)
# Exactly two reassembly entries must have been added.
# NOTE(review): presumably one per direction - confirm.
6475 reass = self.vapi.nat_reass_dump()
6476 reass_n_end = len(reass)
6478 self.assertEqual(reass_n_end - reass_n_start, 2)
6480 def test_reass_hairpinning(self):
6481 """ NAT64 fragments hairpinning """
# Client and server are both reached through pg0: the fragmented stream is
# added to pg0 and the translated fragments are captured from pg0 again.
6483 server = self.pg0.remote_hosts[1]
6484 server_in_port = random.randint(1025, 65535)
6485 server_out_port = random.randint(1025, 65535)
6486 client_in_port = random.randint(1025, 65535)
# '64:ff9b::' + self.nat_addr is passed through scapy's IPv6 src field and
# read back, so nat_addr_ip6 holds the address text in scapy's form
6487 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6488 nat_addr_ip6 = ip.src
6490 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6492 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6493 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6495 # add static BIB entry for server
6496 self.vapi.nat64_add_del_static_bib(server.ip6n,
# NOTE(review): the remaining arguments of the static BIB call, the stream
# arguments and the `data` payload are not visible here — confirm them.
6502 # send packet from host to server
6503 pkts = self.create_stream_frag_ip6(self.pg0,
6508 self.pg0.add_stream(pkts)
6509 self.pg_enable_capture(self.pg_interfaces)
6511 frags = self.pg0.get_capture(len(pkts))
# the reassembled packet must go from the NAT address (IPv6 form) to the
# server, with a rewritten source port and the server's inside port
6512 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6513 self.assertNotEqual(p[TCP].sport, client_in_port)
6514 self.assertEqual(p[TCP].dport, server_in_port)
6515 self.assertEqual(data, p[Raw].load)
6517 def test_frag_out_of_order(self):
6518 """ NAT64 translate fragments arriving out of order """
# Same two-direction exchange as the in-order fragment test: IPv6 fragments
# enter on pg0 and are checked on pg1, the reply enters on pg1 and is
# checked on pg0.
6519 self.tcp_port_in = random.randint(1025, 65535)
6521 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6523 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6524 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# NOTE(review): neither the `data` payload nor any step that reorders `pkts`
# is visible before the streams are added — confirm the fragments really
# are sent out of order.
6528 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6529 self.tcp_port_in, 20, data)
6531 self.pg0.add_stream(pkts)
6532 self.pg_enable_capture(self.pg_interfaces)
6534 frags = self.pg1.get_capture(len(pkts))
6535 p = self.reass_frags_and_verify(frags,
6537 self.pg1.remote_ip4)
6538 self.assertEqual(p[TCP].dport, 20)
# the source port must have been rewritten; keep the translated value
6539 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6540 self.tcp_port_out = p[TCP].sport
6541 self.assertEqual(data, p[Raw].load)
# reply direction: fragments are injected on pg1 and captured on pg0
6544 data = "A" * 4 + "B" * 16 + "C" * 3
6545 pkts = self.create_stream_frag(self.pg1,
6551 self.pg1.add_stream(pkts)
6552 self.pg_enable_capture(self.pg_interfaces)
6554 frags = self.pg0.get_capture(len(pkts))
# expected IPv6 source of the reply, built by compose_ip6 from pg1's remote
# IPv4 address with prefix '64:ff9b::' and length 96
6555 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6556 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6557 self.assertEqual(p[TCP].sport, 20)
6558 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6559 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface

    Registers pg4 with nat64_add_interface_addr and checks that the
    NAT64 address pool follows pg4's IPv4 configuration: no entry
    before an address is configured, exactly pg4's local address while
    it is configured, and no entry again after it has been removed.
    """
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (dump the NAT64 pool here; the NAT44 address list says nothing
    # about the pool under test)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump; checking the result taken before the
    # address was configured would pass regardless of the removal)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
6580 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6581 def test_ipfix_max_bibs_sessions(self):
6582 """ IPFIX logging maximum session and BIB entries exceeded """
# Only runs when running_extended_tests() is true (see the decorator).
# NOTE(review): `max_bibs`, `max_sessions` and the `pkts` list are not
# assigned in the visible lines — confirm their definitions.
6585 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6589 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6591 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6592 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# one distinct IPv6 source per iteration (fd01:aa::<i in hex>); two TCP
# packets are built per source, to destination ports 80 and 22
6596 for i in range(0, max_bibs):
6597 src = "fd01:aa::%x" % (i)
6598 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6599 IPv6(src=src, dst=remote_host_ip6) /
6600 TCP(sport=12345, dport=80))
6602 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6603 IPv6(src=src, dst=remote_host_ip6) /
6604 TCP(sport=12345, dport=22))
6606 self.pg0.add_stream(pkts)
6607 self.pg_enable_capture(self.pg_interfaces)
# max_sessions packets are expected on pg1 for this first batch
6609 self.pg1.get_capture(max_sessions)
# IPFIX export is configured towards pg3 only after the first batch
6611 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6612 src_address=self.pg3.local_ip4n,
6614 template_interval=10)
6615 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6616 src_port=self.ipfix_src_port)
# one more flow (dport 25) from `src`, which still holds the last source
# generated by the loop above; nothing may be forwarded to pg1
6618 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6619 IPv6(src=src, dst=remote_host_ip6) /
6620 TCP(sport=12345, dport=25))
6621 self.pg0.add_stream(p)
6622 self.pg_enable_capture(self.pg_interfaces)
6624 self.pg1.assert_nothing_captured()
# 9 IPFIX packets are expected on pg3 after the flush
6626 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6627 capture = self.pg3.get_capture(9)
6628 ipfix = IPFIXDecoder()
# NOTE(review): the checks below use `p` as a single captured packet; the
# iteration over `capture` is not visible here — confirm.
6629 # first load template
6631 self.assertTrue(p.haslayer(IPFIX))
6632 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6633 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6634 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6635 self.assertEqual(p[UDP].dport, 4739)
6636 self.assertEqual(p[IPFIX].observationDomainID,
6637 self.ipfix_domain_id)
# templates are fed to the decoder so that data sets can be decoded below
6638 if p.haslayer(Template):
6639 ipfix.add_template(p.getlayer(Template))
6640 # verify events in data set
6642 if p.haslayer(Data):
6643 data = ipfix.decode_data_set(p.getlayer(Set))
6644 self.verify_ipfix_max_sessions(data, max_sessions)
# second scenario: a flow from pg0's own remote IPv6 address; again nothing
# may reach pg1 and a single IPFIX packet is expected on pg3
6646 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6647 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6648 TCP(sport=12345, dport=80))
6649 self.pg0.add_stream(p)
6650 self.pg_enable_capture(self.pg_interfaces)
6652 self.pg1.assert_nothing_captured()
6654 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6655 capture = self.pg3.get_capture(1)
6656 # verify events in data set
6658 self.assertTrue(p.haslayer(IPFIX))
6659 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6660 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6661 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6662 self.assertEqual(p[UDP].dport, 4739)
6663 self.assertEqual(p[IPFIX].observationDomainID,
6664 self.ipfix_domain_id)
# the decoder created above is reused, so it still holds the templates
# loaded from the first capture
6665 if p.haslayer(Data):
6666 data = ipfix.decode_data_set(p.getlayer(Set))
6667 self.verify_ipfix_max_bibs(data, max_bibs)
6669 def test_ipfix_max_frags(self):
6670 """ IPFIX logging maximum fragments pending reassembly exceeded """
6671 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6673 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6674 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# reassembly is configured with max_frag=0 for IPv6 — presumably so that
# the single fragment sent below already exceeds the limit; confirm
6675 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
# IPFIX export is directed to pg3
6676 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6677 src_address=self.pg3.local_ip4n,
6679 template_interval=10)
6680 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6681 src_port=self.ipfix_src_port)
# NOTE(review): `data` is not assigned in the visible lines — confirm.
6684 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6685 self.tcp_port_in, 20, data)
# only the last fragment of the stream is sent; nothing may reach pg1
6686 self.pg0.add_stream(pkts[-1])
6687 self.pg_enable_capture(self.pg_interfaces)
6689 self.pg1.assert_nothing_captured()
# 9 IPFIX packets are expected on pg3 after the flush
6691 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6692 capture = self.pg3.get_capture(9)
6693 ipfix = IPFIXDecoder()
# NOTE(review): the checks below use `p` as a single captured packet; the
# iteration over `capture` is not visible here — confirm.
6694 # first load template
6696 self.assertTrue(p.haslayer(IPFIX))
6697 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6698 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6699 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6700 self.assertEqual(p[UDP].dport, 4739)
6701 self.assertEqual(p[IPFIX].observationDomainID,
6702 self.ipfix_domain_id)
# templates are fed to the decoder so that data sets can be decoded below
6703 if p.haslayer(Template):
6704 ipfix.add_template(p.getlayer(Template))
6705 # verify events in data set
6707 if p.haslayer(Data):
6708 data = ipfix.decode_data_set(p.getlayer(Set))
# the verifier is given the configured limit (0) and pg0's remote IPv6
# address in packed form
6709 self.verify_ipfix_max_fragments_ip6(data, 0,
6710 self.pg0.remote_ip6n)
6712 def test_ipfix_bib_ses(self):
6713 """ IPFIX logging NAT64 BIB/session create and delete events """
6714 self.tcp_port_in = random.randint(1025, 65535)
6715 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6719 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6721 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6722 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPFIX export is directed to pg3
6723 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6724 src_address=self.pg3.local_ip4n,
6726 template_interval=10)
6727 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6728 src_port=self.ipfix_src_port)
# a single IPv6 TCP packet from pg0; one translated packet is expected on
# pg1 and its source port is stored as the outside port
6731 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6732 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6733 TCP(sport=self.tcp_port_in, dport=25))
6734 self.pg0.add_stream(p)
6735 self.pg_enable_capture(self.pg_interfaces)
# note: `p` is rebound here to the capture result (indexed with [0] below)
6737 p = self.pg1.get_capture(1)
6738 self.tcp_port_out = p[0][TCP].sport
# 10 IPFIX packets are expected on pg3 after the flush
6739 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6740 capture = self.pg3.get_capture(10)
6741 ipfix = IPFIXDecoder()
# NOTE(review): the checks below use `p` as a single IPFIX packet; the
# iteration over `capture` that rebinds it is not visible here — confirm.
6742 # first load template
6744 self.assertTrue(p.haslayer(IPFIX))
6745 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6746 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6747 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6748 self.assertEqual(p[UDP].dport, 4739)
6749 self.assertEqual(p[IPFIX].observationDomainID,
6750 self.ipfix_domain_id)
6751 if p.haslayer(Template):
6752 ipfix.add_template(p.getlayer(Template))
6753 # verify events in data set
6755 if p.haslayer(Data):
6756 data = ipfix.decode_data_set(p.getlayer(Set))
# field 230 of the first decoded record selects the verifier: value 10 goes
# to verify_ipfix_bib with second argument 1, value 6 to the session
# verifier (presumably IPFIX natEvent codes for the create events — confirm)
6757 if ord(data[0][230]) == 10:
6758 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6759 elif ord(data[0][230]) == 6:
6760 self.verify_ipfix_nat64_ses(data,
6762 self.pg0.remote_ip6n,
6763 self.pg1.remote_ip4,
6766 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# NOTE(review): the arguments of the pool call below are cut off; since the
# following checks expect values 11 and 7, it presumably removes the pool
# address to trigger the delete events — confirm.
6769 self.pg_enable_capture(self.pg_interfaces)
6770 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
# 2 further IPFIX packets are expected on pg3 after the second flush
6773 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6774 capture = self.pg3.get_capture(2)
6775 # verify events in data set
6777 self.assertTrue(p.haslayer(IPFIX))
6778 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6779 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6780 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6781 self.assertEqual(p[UDP].dport, 4739)
6782 self.assertEqual(p[IPFIX].observationDomainID,
6783 self.ipfix_domain_id)
6784 if p.haslayer(Data):
6785 data = ipfix.decode_data_set(p.getlayer(Set))
# same dispatch as above with values 11 (verify_ipfix_bib with second
# argument 0) and 7 (session verifier)
6786 if ord(data[0][230]) == 11:
6787 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6788 elif ord(data[0][230]) == 7:
6789 self.verify_ipfix_nat64_ses(data,
6791 self.pg0.remote_ip6n,
6792 self.pg1.remote_ip4,
6795 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6797 def nat64_get_ses_num(self):
6799 Return number of active NAT64 sessions.
# NOTE(review): only the dump call is visible below; the description above
# says a session count is returned — confirm that the number of dumped
# entries is what gets returned.
6801 st = self.vapi.nat64_st_dump()
6804 def clear_nat64(self):
6806 Clear NAT64 configuration.
# turn IPFIX logging off using the currently stored source port and domain
# id, then set the stored values to 4739 and 1
6808 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6809 domain_id=self.ipfix_domain_id)
6810 self.ipfix_src_port = 4739
6811 self.ipfix_domain_id = 1
# called without arguments — presumably restores the default timeouts; confirm
6813 self.vapi.nat64_set_timeouts()
# every dumped interface is passed to nat64_add_del_interface; interfaces
# whose is_inside value is greater than 1 get an additional call first
# NOTE(review): the arguments of these calls are cut off — presumably
# is_add=0 removals; confirm.
6815 interfaces = self.vapi.nat64_interface_dump()
6816 for intf in interfaces:
6817 if intf.is_inside > 1:
6818 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6821 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# NOTE(review): the loop that binds `bibe` and the condition under which a
# BIB entry is passed to nat64_add_del_static_bib are not visible — confirm.
6825 bib = self.vapi.nat64_bib_dump(255)
6828 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# every dumped pool address and every dumped prefix is passed back to its
# add/del call (remaining arguments cut off)
6836 adresses = self.vapi.nat64_pool_addr_dump()
6837 for addr in adresses:
6838 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6843 prefixes = self.vapi.nat64_prefix_dump()
6844 for prefix in prefixes:
6845 self.vapi.nat64_add_del_prefix(prefix.prefix,
6847 vrf_id=prefix.vrf_id,
6851 super(TestNAT64, self).tearDown()
# NOTE(review): the method header for these teardown statements is not
# visible here.
# After the parent teardown, and only while VPP is still alive, the NAT64
# state is written to the log through CLI show commands.
6852 if not self.vpp_dead:
6853 self.logger.info(self.vapi.cli("show nat64 pool"))
6854 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6855 self.logger.info(self.vapi.cli("show nat64 prefix"))
6856 self.logger.info(self.vapi.cli("show nat64 bib all"))
6857 self.logger.info(self.vapi.cli("show nat64 session table all"))
6858 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
6862 class TestDSlite(MethodHolder):
6863 """ DS-Lite Test Cases """
# Topology used by this class: pg0 is configured for IPv4, pg1 for IPv6
# with two generated remote hosts.
6866 def setUpClass(cls):
6867 super(TestDSlite, cls).setUpClass()
# NAT address in text form and in packed form (inet_pton)
6870 cls.nat_addr = '10.0.0.3'
6871 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6873 cls.create_pg_interfaces(range(2))
6875 cls.pg0.config_ip4()
6876 cls.pg0.resolve_arp()
6878 cls.pg1.config_ip6()
6879 cls.pg1.generate_remote_hosts(2)
6880 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): class-level teardown is invoked from within the setup
# sequence — presumably on a setup failure path that is not visible; confirm.
6883 super(TestDSlite, cls).tearDownClass()
6886 def test_dslite(self):
6887 """ Test DS-Lite """
6888 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# AFTR IPv4/IPv6 addresses, in text form and packed for the API call
6890 aftr_ip4 = '192.0.0.1'
6891 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6892 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6893 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6894 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP: IPv4-in-IPv6 packet from pg1 remote host 0 to the AFTR address; the
# inner IPv4 packet goes from 192.168.1.1 to pg0's remote address
6897 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6898 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6899 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6900 UDP(sport=20000, dport=10000))
6901 self.pg1.add_stream(p)
6902 self.pg_enable_capture(self.pg_interfaces)
# on pg0 the packet must be plain IPv4 from the NAT address with a
# rewritten source port
6904 capture = self.pg0.get_capture(1)
6905 capture = capture[0]
6906 self.assertFalse(capture.haslayer(IPv6))
6907 self.assertEqual(capture[IP].src, self.nat_addr)
6908 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6909 self.assertNotEqual(capture[UDP].sport, 20000)
6910 self.assertEqual(capture[UDP].dport, 10000)
6911 self.assert_packet_checksums_valid(capture)
# translated port, used as destination of the reply below
6912 out_port = capture[UDP].sport
# UDP reply from pg0 to the NAT address
6914 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6915 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6916 UDP(sport=10000, dport=out_port))
6917 self.pg0.add_stream(p)
6918 self.pg_enable_capture(self.pg_interfaces)
# on pg1 the reply must be wrapped in IPv6 from the AFTR address to host 0,
# with the original inner address and ports restored
6920 capture = self.pg1.get_capture(1)
6921 capture = capture[0]
6922 self.assertEqual(capture[IPv6].src, aftr_ip6)
6923 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6924 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6925 self.assertEqual(capture[IP].dst, '192.168.1.1')
6926 self.assertEqual(capture[UDP].sport, 10000)
6927 self.assertEqual(capture[UDP].dport, 20000)
6928 self.assert_packet_checksums_valid(capture)
# TCP: same exchange from pg1 remote host 1, which uses the same inner
# source address 192.168.1.1 as host 0 above
6931 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6932 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6933 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6934 TCP(sport=20001, dport=10001))
6935 self.pg1.add_stream(p)
6936 self.pg_enable_capture(self.pg_interfaces)
6938 capture = self.pg0.get_capture(1)
6939 capture = capture[0]
6940 self.assertFalse(capture.haslayer(IPv6))
6941 self.assertEqual(capture[IP].src, self.nat_addr)
6942 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6943 self.assertNotEqual(capture[TCP].sport, 20001)
6944 self.assertEqual(capture[TCP].dport, 10001)
6945 self.assert_packet_checksums_valid(capture)
6946 out_port = capture[TCP].sport
# TCP reply from pg0; it must come back to host 1 on pg1
6948 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6949 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6950 TCP(sport=10001, dport=out_port))
6951 self.pg0.add_stream(p)
6952 self.pg_enable_capture(self.pg_interfaces)
6954 capture = self.pg1.get_capture(1)
6955 capture = capture[0]
6956 self.assertEqual(capture[IPv6].src, aftr_ip6)
6957 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6958 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6959 self.assertEqual(capture[IP].dst, '192.168.1.1')
6960 self.assertEqual(capture[TCP].sport, 10001)
6961 self.assertEqual(capture[TCP].dport, 20001)
6962 self.assert_packet_checksums_valid(capture)
# ICMP: echo request with id 4000 from host 1; the id must be rewritten on
# the way out and restored to 4000 in the reply
6965 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6966 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6967 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6968 ICMP(id=4000, type='echo-request'))
6969 self.pg1.add_stream(p)
6970 self.pg_enable_capture(self.pg_interfaces)
6972 capture = self.pg0.get_capture(1)
6973 capture = capture[0]
6974 self.assertFalse(capture.haslayer(IPv6))
6975 self.assertEqual(capture[IP].src, self.nat_addr)
6976 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6977 self.assertNotEqual(capture[ICMP].id, 4000)
6978 self.assert_packet_checksums_valid(capture)
6979 out_id = capture[ICMP].id
6981 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6982 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6983 ICMP(id=out_id, type='echo-reply'))
6984 self.pg0.add_stream(p)
6985 self.pg_enable_capture(self.pg_interfaces)
6987 capture = self.pg1.get_capture(1)
6988 capture = capture[0]
6989 self.assertEqual(capture[IPv6].src, aftr_ip6)
6990 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6991 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6992 self.assertEqual(capture[IP].dst, '192.168.1.1')
6993 self.assertEqual(capture[ICMP].id, 4000)
6994 self.assert_packet_checksums_valid(capture)
6996 # ping DS-Lite AFTR tunnel endpoint address
# the echo reply is expected back on pg1, sourced from the AFTR address
6997 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6998 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6999 ICMPv6EchoRequest())
7000 self.pg1.add_stream(p)
7001 self.pg_enable_capture(self.pg_interfaces)
7003 capture = self.pg1.get_capture(1)
7004 self.assertEqual(1, len(capture))
7005 capture = capture[0]
7006 self.assertEqual(capture[IPv6].src, aftr_ip6)
7007 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7008 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# NOTE(review): the method header for the following teardown statements is
# not visible here. They log DS-Lite state while VPP is still alive.
7011 super(TestDSlite, self).tearDown()
7012 if not self.vpp_dead:
7013 self.logger.info(self.vapi.cli("show dslite pool"))
7015 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7016 self.logger.info(self.vapi.cli("show dslite sessions"))
7019 class TestDSliteCE(MethodHolder):
7020 """ DS-Lite CE Test Cases """
# VPP is started with the extra startup configuration "nat { dslite ce }"
7023 def setUpConstants(cls):
7024 super(TestDSliteCE, cls).setUpConstants()
7025 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
# Topology: pg0 is configured for IPv4, pg1 for IPv6 with one generated
# remote host.
7028 def setUpClass(cls):
7029 super(TestDSliteCE, cls).setUpClass()
7032 cls.create_pg_interfaces(range(2))
7034 cls.pg0.config_ip4()
7035 cls.pg0.resolve_arp()
7037 cls.pg1.config_ip6()
7038 cls.pg1.generate_remote_hosts(1)
7039 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): class-level teardown is invoked from within the setup
# sequence — presumably on a setup failure path that is not visible; confirm.
7042 super(TestDSliteCE, cls).tearDownClass()
7045 def test_dslite_ce(self):
7046 """ Test DS-Lite CE """
# B4 and AFTR addresses, in text form and packed for the API calls
7048 b4_ip4 = '192.0.0.2'
7049 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
7050 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
7051 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
7052 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
7054 aftr_ip4 = '192.0.0.1'
7055 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7056 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7057 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7058 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# host route (/128) for the AFTR IPv6 address via pg1's remote host
7060 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
7061 dst_address_length=128,
7062 next_hop_address=self.pg1.remote_ip6n,
7063 next_hop_sw_if_index=self.pg1.sw_if_index,
# plain IPv4 UDP packet sent on pg0; on pg1 it must appear wrapped in IPv6
# from the B4 address to the AFTR address with the inner packet unchanged
7067 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7068 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
7069 UDP(sport=10000, dport=20000))
7070 self.pg0.add_stream(p)
7071 self.pg_enable_capture(self.pg_interfaces)
7073 capture = self.pg1.get_capture(1)
7074 capture = capture[0]
7075 self.assertEqual(capture[IPv6].src, b4_ip6)
7076 self.assertEqual(capture[IPv6].dst, aftr_ip6)
7077 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7078 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
7079 self.assertEqual(capture[UDP].sport, 10000)
7080 self.assertEqual(capture[UDP].dport, 20000)
7081 self.assert_packet_checksums_valid(capture)
# IPv4-in-IPv6 packet from the AFTR address to the B4 address sent on pg1;
# on pg0 it must appear as plain IPv4 with addresses and ports unchanged
7084 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7085 IPv6(dst=b4_ip6, src=aftr_ip6) /
7086 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
7087 UDP(sport=20000, dport=10000))
7088 self.pg1.add_stream(p)
7089 self.pg_enable_capture(self.pg_interfaces)
7091 capture = self.pg0.get_capture(1)
7092 capture = capture[0]
7093 self.assertFalse(capture.haslayer(IPv6))
7094 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
7095 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7096 self.assertEqual(capture[UDP].sport, 20000)
7097 self.assertEqual(capture[UDP].dport, 10000)
7098 self.assert_packet_checksums_valid(capture)
7100 # ping DS-Lite B4 tunnel endpoint address
# the echo reply is expected back on pg1, sourced from the B4 address
7101 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7102 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
7103 ICMPv6EchoRequest())
7104 self.pg1.add_stream(p)
7105 self.pg_enable_capture(self.pg_interfaces)
7107 capture = self.pg1.get_capture(1)
7108 self.assertEqual(1, len(capture))
7109 capture = capture[0]
7110 self.assertEqual(capture[IPv6].src, b4_ip6)
7111 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7112 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# NOTE(review): the method header for the following teardown statements is
# not visible here. They query both tunnel endpoint addresses through the
# CLI while VPP is still alive.
7115 super(TestDSliteCE, self).tearDown()
7116 if not self.vpp_dead:
7118 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7120 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
7123 class TestNAT66(MethodHolder):
7124 """ NAT66 Test Cases """
7127 def setUpClass(cls):
7128 super(TestNAT66, cls).setUpClass()
# NAT address in text form and in packed form (inet_pton)
7131 cls.nat_addr = 'fd01:ff::2'
7132 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
7134 cls.create_pg_interfaces(range(2))
7135 cls.interfaces = list(cls.pg_interfaces)
# NOTE(review): only the neighbor configuration is visible inside this
# loop — confirm the remaining per-interface setup.
7137 for i in cls.interfaces:
7140 i.configure_ipv6_neighbors()
# NOTE(review): class-level teardown is invoked from within the setup
# sequence — presumably on a setup failure path that is not visible; confirm.
7143 super(TestNAT66, cls).tearDownClass()
7146 def test_static(self):
7147 """ 1:1 NAT66 test """
# pg0 is added with default arguments, pg1 with is_inside=0
7148 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7149 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7150 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# four packets from pg0's remote host to pg1's remote host
# NOTE(review): the upper layers of the first two packets and the `pkts`
# list handling are not visible — confirm.
7155 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7156 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7159 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7160 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7163 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7164 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7165 ICMPv6EchoRequest())
7167 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7168 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7169 GRE() / IP() / TCP())
7171 self.pg0.add_stream(pkts)
7172 self.pg_enable_capture(self.pg_interfaces)
# every packet captured on pg1 must have the NAT address as its source
7174 capture = self.pg1.get_capture(len(pkts))
7175 for packet in capture:
7177 self.assertEqual(packet[IPv6].src, self.nat_addr)
7178 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7179 self.assert_packet_checksums_valid(packet)
7181 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# reverse direction: four packets from pg1's remote host to the NAT address
7186 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7187 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7190 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7191 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7194 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7195 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7198 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7199 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7200 GRE() / IP() / TCP())
7202 self.pg1.add_stream(pkts)
7203 self.pg_enable_capture(self.pg_interfaces)
# every packet captured on pg0 must be addressed to pg0's remote host
7205 capture = self.pg0.get_capture(len(pkts))
7206 for packet in capture:
7208 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
7209 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
7210 self.assert_packet_checksums_valid(packet)
7212 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# a single static mapping must exist and report 8 packets in total
7215 sm = self.vapi.nat66_static_mapping_dump()
7216 self.assertEqual(len(sm), 1)
7217 self.assertEqual(sm[0].total_pkts, 8)
7219 def test_check_no_translate(self):
7220 """ NAT66 translate only when egress interface is outside interface """
# unlike test_static, pg1 is added with default arguments as well (no
# is_inside=0), so no interface is configured with is_inside=0 here
7221 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7222 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7223 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7227 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7228 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7230 self.pg0.add_stream([p])
7231 self.pg_enable_capture(self.pg_interfaces)
7233 capture = self.pg1.get_capture(1)
# the packet must arrive on pg1 with its original source address
# NOTE(review): `packet` is not bound in the visible lines — presumably the
# single captured packet; confirm.
7236 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7237 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7239 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7242 def clear_nat66(self):
7244 Clear NAT66 configuration.
# every dumped interface and every dumped static mapping is passed back to
# its add/del call
# NOTE(review): the trailing arguments are cut off — presumably is_add=0
# removals; confirm.
7246 interfaces = self.vapi.nat66_interface_dump()
7247 for intf in interfaces:
7248 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7252 static_mappings = self.vapi.nat66_static_mapping_dump()
7253 for sm in static_mappings:
7254 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7255 sm.external_ip_address,
# NOTE(review): the method header for the following teardown statements is
# not visible here. They log NAT66 state while VPP is still alive.
7260 super(TestNAT66, self).tearDown()
7261 if not self.vpp_dead:
7262 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7263 self.logger.info(self.vapi.cli("show nat66 static mappings"))
7267 if __name__ == '__main__':
# When executed as a script, run this module's tests through unittest with
# VppTestRunner as the test runner.
7268 unittest.main(testRunner=VppTestRunner)