9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
29 def clear_nat44(self):
31 Clear NAT44 configuration.
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
139 self.verify_no_nat44_user()
141 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
142 local_port=0, external_port=0, vrf_id=0,
143 is_add=1, external_sw_if_index=0xFFFFFFFF,
144 proto=0, twice_nat=0, self_twice_nat=0,
145 out2in_only=0, tag=""):
147 Add/delete NAT44 static mapping
149 :param local_ip: Local IP address
150 :param external_ip: External IP address
151 :param local_port: Local port number (Optional)
152 :param external_port: External port number (Optional)
153 :param vrf_id: VRF ID (Default 0)
154 :param is_add: 1 if add, 0 if delete (Default add)
155 :param external_sw_if_index: External interface instead of IP address
156 :param proto: IP protocol (Mandatory if port specified)
157 :param twice_nat: 1 if translate external host address and port
158 :param self_twice_nat: 1 if translate external host address and port
159 whenever external host address equals
160 local address of internal host
161 :param out2in_only: if 1 rule is matching only out2in direction
162 :param tag: Opaque string tag
165 if local_port and external_port:
167 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
168 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
169 self.vapi.nat44_add_del_static_mapping(
172 external_sw_if_index,
184 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
186 Add/delete NAT44 address
188 :param ip: IP address
189 :param is_add: 1 if add, 0 if delete (Default add)
190 :param twice_nat: twice NAT address for extenal hosts
192 nat_addr = socket.inet_pton(socket.AF_INET, ip)
193 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
197 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
199 Create packet stream for inside network
201 :param in_if: Inside interface
202 :param out_if: Outside interface
203 :param dst_ip: Destination address
204 :param ttl: TTL of generated packets
207 dst_ip = out_if.remote_ip4
211 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
212 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
213 TCP(sport=self.tcp_port_in, dport=20))
217 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
218 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
219 UDP(sport=self.udp_port_in, dport=20))
223 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
224 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
225 ICMP(id=self.icmp_id_in, type='echo-request'))
230 def compose_ip6(self, ip4, pref, plen):
232 Compose IPv4-embedded IPv6 addresses
234 :param ip4: IPv4 address
235 :param pref: IPv6 prefix
236 :param plen: IPv6 prefix length
237 :returns: IPv4-embedded IPv6 addresses
239 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
240 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
255 pref_n[10] = ip4_n[3]
259 pref_n[10] = ip4_n[2]
260 pref_n[11] = ip4_n[3]
263 pref_n[10] = ip4_n[1]
264 pref_n[11] = ip4_n[2]
265 pref_n[12] = ip4_n[3]
267 pref_n[12] = ip4_n[0]
268 pref_n[13] = ip4_n[1]
269 pref_n[14] = ip4_n[2]
270 pref_n[15] = ip4_n[3]
271 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
273 def extract_ip4(self, ip6, plen):
275 Extract IPv4 address embedded in IPv6 addresses
277 :param ip6: IPv6 address
278 :param plen: IPv6 prefix length
279 :returns: extracted IPv4 address
281 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
313 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
315 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
317 Create IPv6 packet stream for inside network
319 :param in_if: Inside interface
320 :param out_if: Outside interface
321 :param ttl: Hop Limit of generated packets
322 :param pref: NAT64 prefix
323 :param plen: NAT64 prefix length
327 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
329 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
332 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
333 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
334 TCP(sport=self.tcp_port_in, dport=20))
338 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
339 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
340 UDP(sport=self.udp_port_in, dport=20))
344 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
345 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
346 ICMPv6EchoRequest(id=self.icmp_id_in))
351 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
352 use_inside_ports=False):
354 Create packet stream for outside network
356 :param out_if: Outside interface
357 :param dst_ip: Destination IP address (Default use global NAT address)
358 :param ttl: TTL of generated packets
359 :param use_inside_ports: Use inside NAT ports as destination ports
360 instead of outside ports
363 dst_ip = self.nat_addr
364 if not use_inside_ports:
365 tcp_port = self.tcp_port_out
366 udp_port = self.udp_port_out
367 icmp_id = self.icmp_id_out
369 tcp_port = self.tcp_port_in
370 udp_port = self.udp_port_in
371 icmp_id = self.icmp_id_in
374 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
375 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
376 TCP(dport=tcp_port, sport=20))
380 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
381 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
382 UDP(dport=udp_port, sport=20))
386 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
387 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
388 ICMP(id=icmp_id, type='echo-reply'))
393 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
395 Create packet stream for outside network
397 :param out_if: Outside interface
398 :param dst_ip: Destination IP address (Default use global NAT address)
399 :param hl: HL of generated packets
403 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
404 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
405 TCP(dport=self.tcp_port_out, sport=20))
409 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
410 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
411 UDP(dport=self.udp_port_out, sport=20))
415 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
416 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
417 ICMPv6EchoReply(id=self.icmp_id_out))
422 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
423 packet_num=3, dst_ip=None, is_ip6=False):
425 Verify captured packets on outside network
427 :param capture: Captured packets
428 :param nat_ip: Translated IP address (Default use global NAT address)
429 :param same_port: Sorce port number is not translated (Default False)
430 :param packet_num: Expected number of packets (Default 3)
431 :param dst_ip: Destination IP address (Default do not verify)
432 :param is_ip6: If L3 protocol is IPv6 (Default False)
436 ICMP46 = ICMPv6EchoRequest
441 nat_ip = self.nat_addr
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
446 self.assert_packet_checksums_valid(packet)
447 self.assertEqual(packet[IP46].src, nat_ip)
448 if dst_ip is not None:
449 self.assertEqual(packet[IP46].dst, dst_ip)
450 if packet.haslayer(TCP):
452 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
455 packet[TCP].sport, self.tcp_port_in)
456 self.tcp_port_out = packet[TCP].sport
457 self.assert_packet_checksums_valid(packet)
458 elif packet.haslayer(UDP):
460 self.assertEqual(packet[UDP].sport, self.udp_port_in)
463 packet[UDP].sport, self.udp_port_in)
464 self.udp_port_out = packet[UDP].sport
467 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
469 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
470 self.icmp_id_out = packet[ICMP46].id
471 self.assert_packet_checksums_valid(packet)
473 self.logger.error(ppp("Unexpected or invalid packet "
474 "(outside network):", packet))
477 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
478 packet_num=3, dst_ip=None):
480 Verify captured packets on outside network
482 :param capture: Captured packets
483 :param nat_ip: Translated IP address
484 :param same_port: Sorce port number is not translated (Default False)
485 :param packet_num: Expected number of packets (Default 3)
486 :param dst_ip: Destination IP address (Default do not verify)
488 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
491 def verify_capture_in(self, capture, in_if, packet_num=3):
493 Verify captured packets on inside network
495 :param capture: Captured packets
496 :param in_if: Inside interface
497 :param packet_num: Expected number of packets (Default 3)
499 self.assertEqual(packet_num, len(capture))
500 for packet in capture:
502 self.assert_packet_checksums_valid(packet)
503 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
504 if packet.haslayer(TCP):
505 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
506 elif packet.haslayer(UDP):
507 self.assertEqual(packet[UDP].dport, self.udp_port_in)
509 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
511 self.logger.error(ppp("Unexpected or invalid packet "
512 "(inside network):", packet))
515 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
517 Verify captured IPv6 packets on inside network
519 :param capture: Captured packets
520 :param src_ip: Source IP
521 :param dst_ip: Destination IP address
522 :param packet_num: Expected number of packets (Default 3)
524 self.assertEqual(packet_num, len(capture))
525 for packet in capture:
527 self.assertEqual(packet[IPv6].src, src_ip)
528 self.assertEqual(packet[IPv6].dst, dst_ip)
529 self.assert_packet_checksums_valid(packet)
530 if packet.haslayer(TCP):
531 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
532 elif packet.haslayer(UDP):
533 self.assertEqual(packet[UDP].dport, self.udp_port_in)
535 self.assertEqual(packet[ICMPv6EchoReply].id,
538 self.logger.error(ppp("Unexpected or invalid packet "
539 "(inside network):", packet))
542 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
544 Verify captured packet that don't have to be translated
546 :param capture: Captured packets
547 :param ingress_if: Ingress interface
548 :param egress_if: Egress interface
550 for packet in capture:
552 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
553 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
554 if packet.haslayer(TCP):
555 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
556 elif packet.haslayer(UDP):
557 self.assertEqual(packet[UDP].sport, self.udp_port_in)
559 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
561 self.logger.error(ppp("Unexpected or invalid packet "
562 "(inside network):", packet))
565 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
566 packet_num=3, icmp_type=11):
568 Verify captured packets with ICMP errors on outside network
570 :param capture: Captured packets
571 :param src_ip: Translated IP address or IP address of VPP
572 (Default use global NAT address)
573 :param packet_num: Expected number of packets (Default 3)
574 :param icmp_type: Type of error ICMP packet
575 we are expecting (Default 11)
578 src_ip = self.nat_addr
579 self.assertEqual(packet_num, len(capture))
580 for packet in capture:
582 self.assertEqual(packet[IP].src, src_ip)
583 self.assertTrue(packet.haslayer(ICMP))
585 self.assertEqual(icmp.type, icmp_type)
586 self.assertTrue(icmp.haslayer(IPerror))
587 inner_ip = icmp[IPerror]
588 if inner_ip.haslayer(TCPerror):
589 self.assertEqual(inner_ip[TCPerror].dport,
591 elif inner_ip.haslayer(UDPerror):
592 self.assertEqual(inner_ip[UDPerror].dport,
595 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
597 self.logger.error(ppp("Unexpected or invalid packet "
598 "(outside network):", packet))
601 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
604 Verify captured packets with ICMP errors on inside network
606 :param capture: Captured packets
607 :param in_if: Inside interface
608 :param packet_num: Expected number of packets (Default 3)
609 :param icmp_type: Type of error ICMP packet
610 we are expecting (Default 11)
612 self.assertEqual(packet_num, len(capture))
613 for packet in capture:
615 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
616 self.assertTrue(packet.haslayer(ICMP))
618 self.assertEqual(icmp.type, icmp_type)
619 self.assertTrue(icmp.haslayer(IPerror))
620 inner_ip = icmp[IPerror]
621 if inner_ip.haslayer(TCPerror):
622 self.assertEqual(inner_ip[TCPerror].sport,
624 elif inner_ip.haslayer(UDPerror):
625 self.assertEqual(inner_ip[UDPerror].sport,
628 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
630 self.logger.error(ppp("Unexpected or invalid packet "
631 "(inside network):", packet))
634 def create_stream_frag(self, src_if, dst, sport, dport, data):
636 Create fragmented packet stream
638 :param src_if: Source interface
639 :param dst: Destination IPv4 address
640 :param sport: Source TCP port
641 :param dport: Destination TCP port
642 :param data: Payload data
645 id = random.randint(0, 65535)
646 p = (IP(src=src_if.remote_ip4, dst=dst) /
647 TCP(sport=sport, dport=dport) /
649 p = p.__class__(str(p))
650 chksum = p['TCP'].chksum
652 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
653 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
654 TCP(sport=sport, dport=dport, chksum=chksum) /
657 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
658 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
659 proto=IP_PROTOS.tcp) /
662 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
663 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
669 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
670 pref=None, plen=0, frag_size=128):
672 Create fragmented packet stream
674 :param src_if: Source interface
675 :param dst: Destination IPv4 address
676 :param sport: Source TCP port
677 :param dport: Destination TCP port
678 :param data: Payload data
679 :param pref: NAT64 prefix
680 :param plen: NAT64 prefix length
681 :param fragsize: size of fragments
685 dst_ip6 = ''.join(['64:ff9b::', dst])
687 dst_ip6 = self.compose_ip6(dst, pref, plen)
689 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
690 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
691 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
692 TCP(sport=sport, dport=dport) /
695 return fragment6(p, frag_size)
697 def reass_frags_and_verify(self, frags, src, dst):
699 Reassemble and verify fragmented packet
701 :param frags: Captured fragments
702 :param src: Source IPv4 address to verify
703 :param dst: Destination IPv4 address to verify
705 :returns: Reassembled IPv4 packet
707 buffer = StringIO.StringIO()
709 self.assertEqual(p[IP].src, src)
710 self.assertEqual(p[IP].dst, dst)
711 self.assert_ip_checksum_valid(p)
712 buffer.seek(p[IP].frag * 8)
713 buffer.write(p[IP].payload)
714 ip = frags[0].getlayer(IP)
715 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
716 proto=frags[0][IP].proto)
717 if ip.proto == IP_PROTOS.tcp:
718 p = (ip / TCP(buffer.getvalue()))
719 self.assert_tcp_checksum_valid(p)
720 elif ip.proto == IP_PROTOS.udp:
721 p = (ip / UDP(buffer.getvalue()))
724 def reass_frags_and_verify_ip6(self, frags, src, dst):
726 Reassemble and verify fragmented packet
728 :param frags: Captured fragments
729 :param src: Source IPv6 address to verify
730 :param dst: Destination IPv6 address to verify
732 :returns: Reassembled IPv6 packet
734 buffer = StringIO.StringIO()
736 self.assertEqual(p[IPv6].src, src)
737 self.assertEqual(p[IPv6].dst, dst)
738 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
739 buffer.write(p[IPv6ExtHdrFragment].payload)
740 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
741 nh=frags[0][IPv6ExtHdrFragment].nh)
742 if ip.nh == IP_PROTOS.tcp:
743 p = (ip / TCP(buffer.getvalue()))
744 elif ip.nh == IP_PROTOS.udp:
745 p = (ip / UDP(buffer.getvalue()))
746 self.assert_packet_checksums_valid(p)
749 def initiate_tcp_session(self, in_if, out_if):
751 Initiates TCP session
753 :param in_if: Inside interface
754 :param out_if: Outside interface
758 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
759 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
760 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
763 self.pg_enable_capture(self.pg_interfaces)
765 capture = out_if.get_capture(1)
767 self.tcp_port_out = p[TCP].sport
769 # SYN + ACK packet out->in
770 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
771 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
772 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
775 self.pg_enable_capture(self.pg_interfaces)
780 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
781 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
782 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
785 self.pg_enable_capture(self.pg_interfaces)
787 out_if.get_capture(1)
790 self.logger.error("TCP 3 way handshake failed")
793 def verify_ipfix_nat44_ses(self, data):
795 Verify IPFIX NAT44 session create/delete event
797 :param data: Decoded IPFIX data records
799 nat44_ses_create_num = 0
800 nat44_ses_delete_num = 0
801 self.assertEqual(6, len(data))
804 self.assertIn(ord(record[230]), [4, 5])
805 if ord(record[230]) == 4:
806 nat44_ses_create_num += 1
808 nat44_ses_delete_num += 1
810 self.assertEqual(self.pg0.remote_ip4n, record[8])
811 # postNATSourceIPv4Address
812 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
815 self.assertEqual(struct.pack("!I", 0), record[234])
816 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
817 if IP_PROTOS.icmp == ord(record[4]):
818 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
819 self.assertEqual(struct.pack("!H", self.icmp_id_out),
821 elif IP_PROTOS.tcp == ord(record[4]):
822 self.assertEqual(struct.pack("!H", self.tcp_port_in),
824 self.assertEqual(struct.pack("!H", self.tcp_port_out),
826 elif IP_PROTOS.udp == ord(record[4]):
827 self.assertEqual(struct.pack("!H", self.udp_port_in),
829 self.assertEqual(struct.pack("!H", self.udp_port_out),
832 self.fail("Invalid protocol")
833 self.assertEqual(3, nat44_ses_create_num)
834 self.assertEqual(3, nat44_ses_delete_num)
836 def verify_ipfix_addr_exhausted(self, data):
838 Verify IPFIX NAT addresses event
840 :param data: Decoded IPFIX data records
842 self.assertEqual(1, len(data))
845 self.assertEqual(ord(record[230]), 3)
847 self.assertEqual(struct.pack("!I", 0), record[283])
849 def verify_ipfix_max_sessions(self, data, limit):
851 Verify IPFIX maximum session entries exceeded event
853 :param data: Decoded IPFIX data records
854 :param limit: Number of maximum session entries that can be created.
856 self.assertEqual(1, len(data))
859 self.assertEqual(ord(record[230]), 13)
860 # natQuotaExceededEvent
861 self.assertEqual(struct.pack("I", 1), record[466])
863 self.assertEqual(struct.pack("I", limit), record[471])
865 def verify_ipfix_max_bibs(self, data, limit):
867 Verify IPFIX maximum BIB entries exceeded event
869 :param data: Decoded IPFIX data records
870 :param limit: Number of maximum BIB entries that can be created.
872 self.assertEqual(1, len(data))
875 self.assertEqual(ord(record[230]), 13)
876 # natQuotaExceededEvent
877 self.assertEqual(struct.pack("I", 2), record[466])
879 self.assertEqual(struct.pack("I", limit), record[472])
881 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
883 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
885 :param data: Decoded IPFIX data records
886 :param limit: Number of maximum fragments pending reassembly
887 :param src_addr: IPv6 source address
889 self.assertEqual(1, len(data))
892 self.assertEqual(ord(record[230]), 13)
893 # natQuotaExceededEvent
894 self.assertEqual(struct.pack("I", 5), record[466])
895 # maxFragmentsPendingReassembly
896 self.assertEqual(struct.pack("I", limit), record[475])
898 self.assertEqual(src_addr, record[27])
900 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
902 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
904 :param data: Decoded IPFIX data records
905 :param limit: Number of maximum fragments pending reassembly
906 :param src_addr: IPv4 source address
908 self.assertEqual(1, len(data))
911 self.assertEqual(ord(record[230]), 13)
912 # natQuotaExceededEvent
913 self.assertEqual(struct.pack("I", 5), record[466])
914 # maxFragmentsPendingReassembly
915 self.assertEqual(struct.pack("I", limit), record[475])
917 self.assertEqual(src_addr, record[8])
919 def verify_ipfix_bib(self, data, is_create, src_addr):
921 Verify IPFIX NAT64 BIB create and delete events
923 :param data: Decoded IPFIX data records
924 :param is_create: Create event if nonzero value otherwise delete event
925 :param src_addr: IPv6 source address
927 self.assertEqual(1, len(data))
931 self.assertEqual(ord(record[230]), 10)
933 self.assertEqual(ord(record[230]), 11)
935 self.assertEqual(src_addr, record[27])
936 # postNATSourceIPv4Address
937 self.assertEqual(self.nat_addr_n, record[225])
939 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
941 self.assertEqual(struct.pack("!I", 0), record[234])
942 # sourceTransportPort
943 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
944 # postNAPTSourceTransportPort
945 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
947 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
950 Verify IPFIX NAT64 session create and delete events
952 :param data: Decoded IPFIX data records
953 :param is_create: Create event if nonzero value otherwise delete event
954 :param src_addr: IPv6 source address
955 :param dst_addr: IPv4 destination address
956 :param dst_port: destination TCP port
958 self.assertEqual(1, len(data))
962 self.assertEqual(ord(record[230]), 6)
964 self.assertEqual(ord(record[230]), 7)
966 self.assertEqual(src_addr, record[27])
967 # destinationIPv6Address
968 self.assertEqual(socket.inet_pton(socket.AF_INET6,
969 self.compose_ip6(dst_addr,
973 # postNATSourceIPv4Address
974 self.assertEqual(self.nat_addr_n, record[225])
975 # postNATDestinationIPv4Address
976 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
979 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
981 self.assertEqual(struct.pack("!I", 0), record[234])
982 # sourceTransportPort
983 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
984 # postNAPTSourceTransportPort
985 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
986 # destinationTransportPort
987 self.assertEqual(struct.pack("!H", dst_port), record[11])
988 # postNAPTDestinationTransportPort
989 self.assertEqual(struct.pack("!H", dst_port), record[228])
991 def verify_no_nat44_user(self):
992 """ Verify that there is no NAT44 user """
993 users = self.vapi.nat44_user_dump()
994 self.assertEqual(len(users), 0)
997 class TestNAT44(MethodHolder):
998 """ NAT44 Test Cases """
1001 def setUpClass(cls):
1002 super(TestNAT44, cls).setUpClass()
1003 cls.vapi.cli("set log class nat level debug")
1006 cls.tcp_port_in = 6303
1007 cls.tcp_port_out = 6303
1008 cls.udp_port_in = 6304
1009 cls.udp_port_out = 6304
1010 cls.icmp_id_in = 6305
1011 cls.icmp_id_out = 6305
1012 cls.nat_addr = '10.0.0.3'
1013 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1014 cls.ipfix_src_port = 4739
1015 cls.ipfix_domain_id = 1
1016 cls.tcp_external_port = 80
1018 cls.create_pg_interfaces(range(10))
1019 cls.interfaces = list(cls.pg_interfaces[0:4])
1021 for i in cls.interfaces:
1026 cls.pg0.generate_remote_hosts(3)
1027 cls.pg0.configure_ipv4_neighbors()
1029 cls.pg1.generate_remote_hosts(1)
1030 cls.pg1.configure_ipv4_neighbors()
1032 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1033 cls.vapi.ip_table_add_del(10, is_add=1)
1034 cls.vapi.ip_table_add_del(20, is_add=1)
1036 cls.pg4._local_ip4 = "172.16.255.1"
1037 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1038 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1039 cls.pg4.set_table_ip4(10)
1040 cls.pg5._local_ip4 = "172.17.255.3"
1041 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1042 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1043 cls.pg5.set_table_ip4(10)
1044 cls.pg6._local_ip4 = "172.16.255.1"
1045 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1046 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1047 cls.pg6.set_table_ip4(20)
1048 for i in cls.overlapping_interfaces:
1056 cls.pg9.generate_remote_hosts(2)
1057 cls.pg9.config_ip4()
1058 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1059 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1063 cls.pg9.resolve_arp()
1064 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1065 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1066 cls.pg9.resolve_arp()
1069 super(TestNAT44, cls).tearDownClass()
1072 def test_dynamic(self):
1073 """ NAT44 dynamic translation test """
1075 self.nat44_add_address(self.nat_addr)
1076 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1077 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1081 pkts = self.create_stream_in(self.pg0, self.pg1)
1082 self.pg0.add_stream(pkts)
1083 self.pg_enable_capture(self.pg_interfaces)
1085 capture = self.pg1.get_capture(len(pkts))
1086 self.verify_capture_out(capture)
1089 pkts = self.create_stream_out(self.pg1)
1090 self.pg1.add_stream(pkts)
1091 self.pg_enable_capture(self.pg_interfaces)
1093 capture = self.pg0.get_capture(len(pkts))
1094 self.verify_capture_in(capture, self.pg0)
1096 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1097 """ NAT44 handling of client packets with TTL=1 """
1099 self.nat44_add_address(self.nat_addr)
1100 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1101 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1104 # Client side - generate traffic
1105 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1106 self.pg0.add_stream(pkts)
1107 self.pg_enable_capture(self.pg_interfaces)
1110 # Client side - verify ICMP type 11 packets
1111 capture = self.pg0.get_capture(len(pkts))
1112 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1114 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1115 """ NAT44 handling of server packets with TTL=1 """
1117 self.nat44_add_address(self.nat_addr)
1118 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1119 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1122 # Client side - create sessions
1123 pkts = self.create_stream_in(self.pg0, self.pg1)
1124 self.pg0.add_stream(pkts)
1125 self.pg_enable_capture(self.pg_interfaces)
1128 # Server side - generate traffic
1129 capture = self.pg1.get_capture(len(pkts))
1130 self.verify_capture_out(capture)
1131 pkts = self.create_stream_out(self.pg1, ttl=1)
1132 self.pg1.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1136 # Server side - verify ICMP type 11 packets
1137 capture = self.pg1.get_capture(len(pkts))
1138 self.verify_capture_out_with_icmp_errors(capture,
1139 src_ip=self.pg1.local_ip4)
1141 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1142 """ NAT44 handling of error responses to client packets with TTL=2 """
1144 self.nat44_add_address(self.nat_addr)
1145 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1146 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1149 # Client side - generate traffic
1150 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1151 self.pg0.add_stream(pkts)
1152 self.pg_enable_capture(self.pg_interfaces)
1155 # Server side - simulate ICMP type 11 response
1156 capture = self.pg1.get_capture(len(pkts))
1157 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1158 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1159 ICMP(type=11) / packet[IP] for packet in capture]
1160 self.pg1.add_stream(pkts)
1161 self.pg_enable_capture(self.pg_interfaces)
1164 # Client side - verify ICMP type 11 packets
1165 capture = self.pg0.get_capture(len(pkts))
1166 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1168 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1169 """ NAT44 handling of error responses to server packets with TTL=2 """
1171 self.nat44_add_address(self.nat_addr)
1172 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1173 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1176 # Client side - create sessions
1177 pkts = self.create_stream_in(self.pg0, self.pg1)
1178 self.pg0.add_stream(pkts)
1179 self.pg_enable_capture(self.pg_interfaces)
1182 # Server side - generate traffic
1183 capture = self.pg1.get_capture(len(pkts))
1184 self.verify_capture_out(capture)
1185 pkts = self.create_stream_out(self.pg1, ttl=2)
1186 self.pg1.add_stream(pkts)
1187 self.pg_enable_capture(self.pg_interfaces)
1190 # Client side - simulate ICMP type 11 response
1191 capture = self.pg0.get_capture(len(pkts))
1192 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1193 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1194 ICMP(type=11) / packet[IP] for packet in capture]
1195 self.pg0.add_stream(pkts)
1196 self.pg_enable_capture(self.pg_interfaces)
1199 # Server side - verify ICMP type 11 packets
1200 capture = self.pg1.get_capture(len(pkts))
1201 self.verify_capture_out_with_icmp_errors(capture)
1203 def test_ping_out_interface_from_outside(self):
1204 """ Ping NAT44 out interface from outside network """
1206 self.nat44_add_address(self.nat_addr)
1207 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1208 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1211 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1212 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1213 ICMP(id=self.icmp_id_out, type='echo-request'))
1215 self.pg1.add_stream(pkts)
1216 self.pg_enable_capture(self.pg_interfaces)
1218 capture = self.pg1.get_capture(len(pkts))
1219 self.assertEqual(1, len(capture))
1222 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1223 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1224 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1225 self.assertEqual(packet[ICMP].type, 0) # echo reply
1227 self.logger.error(ppp("Unexpected or invalid packet "
1228 "(outside network):", packet))
1231 def test_ping_internal_host_from_outside(self):
1232 """ Ping internal host from outside network """
1234 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1235 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1236 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1240 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1241 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1242 ICMP(id=self.icmp_id_out, type='echo-request'))
1243 self.pg1.add_stream(pkt)
1244 self.pg_enable_capture(self.pg_interfaces)
1246 capture = self.pg0.get_capture(1)
1247 self.verify_capture_in(capture, self.pg0, packet_num=1)
1248 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1251 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1252 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1253 ICMP(id=self.icmp_id_in, type='echo-reply'))
1254 self.pg0.add_stream(pkt)
1255 self.pg_enable_capture(self.pg_interfaces)
1257 capture = self.pg1.get_capture(1)
1258 self.verify_capture_out(capture, same_port=True, packet_num=1)
1259 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1261 def test_forwarding(self):
1262 """ NAT44 forwarding test """
1264 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1265 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1267 self.vapi.nat44_forwarding_enable_disable(1)
1269 real_ip = self.pg0.remote_ip4n
1270 alias_ip = self.nat_addr_n
1271 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1272 external_ip=alias_ip)
1275 # static mapping match
1277 pkts = self.create_stream_out(self.pg1)
1278 self.pg1.add_stream(pkts)
1279 self.pg_enable_capture(self.pg_interfaces)
1281 capture = self.pg0.get_capture(len(pkts))
1282 self.verify_capture_in(capture, self.pg0)
1284 pkts = self.create_stream_in(self.pg0, self.pg1)
1285 self.pg0.add_stream(pkts)
1286 self.pg_enable_capture(self.pg_interfaces)
1288 capture = self.pg1.get_capture(len(pkts))
1289 self.verify_capture_out(capture, same_port=True)
1291 # no static mapping match
1293 host0 = self.pg0.remote_hosts[0]
1294 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1296 pkts = self.create_stream_out(self.pg1,
1297 dst_ip=self.pg0.remote_ip4,
1298 use_inside_ports=True)
1299 self.pg1.add_stream(pkts)
1300 self.pg_enable_capture(self.pg_interfaces)
1302 capture = self.pg0.get_capture(len(pkts))
1303 self.verify_capture_in(capture, self.pg0)
1305 pkts = self.create_stream_in(self.pg0, self.pg1)
1306 self.pg0.add_stream(pkts)
1307 self.pg_enable_capture(self.pg_interfaces)
1309 capture = self.pg1.get_capture(len(pkts))
1310 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1313 self.pg0.remote_hosts[0] = host0
1316 self.vapi.nat44_forwarding_enable_disable(0)
1317 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1318 external_ip=alias_ip,
1321 def test_static_in(self):
1322 """ 1:1 NAT initialized from inside network """
1324 nat_ip = "10.0.0.10"
1325 self.tcp_port_out = 6303
1326 self.udp_port_out = 6304
1327 self.icmp_id_out = 6305
1329 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1330 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1331 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1333 sm = self.vapi.nat44_static_mapping_dump()
1334 self.assertEqual(len(sm), 1)
1335 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1336 self.assertEqual(sm[0].protocol, 0)
1337 self.assertEqual(sm[0].local_port, 0)
1338 self.assertEqual(sm[0].external_port, 0)
1341 pkts = self.create_stream_in(self.pg0, self.pg1)
1342 self.pg0.add_stream(pkts)
1343 self.pg_enable_capture(self.pg_interfaces)
1345 capture = self.pg1.get_capture(len(pkts))
1346 self.verify_capture_out(capture, nat_ip, True)
1349 pkts = self.create_stream_out(self.pg1, nat_ip)
1350 self.pg1.add_stream(pkts)
1351 self.pg_enable_capture(self.pg_interfaces)
1353 capture = self.pg0.get_capture(len(pkts))
1354 self.verify_capture_in(capture, self.pg0)
1356 def test_static_out(self):
1357 """ 1:1 NAT initialized from outside network """
1359 nat_ip = "10.0.0.20"
1360 self.tcp_port_out = 6303
1361 self.udp_port_out = 6304
1362 self.icmp_id_out = 6305
1365 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1366 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1367 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1369 sm = self.vapi.nat44_static_mapping_dump()
1370 self.assertEqual(len(sm), 1)
1371 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1374 pkts = self.create_stream_out(self.pg1, nat_ip)
1375 self.pg1.add_stream(pkts)
1376 self.pg_enable_capture(self.pg_interfaces)
1378 capture = self.pg0.get_capture(len(pkts))
1379 self.verify_capture_in(capture, self.pg0)
1382 pkts = self.create_stream_in(self.pg0, self.pg1)
1383 self.pg0.add_stream(pkts)
1384 self.pg_enable_capture(self.pg_interfaces)
1386 capture = self.pg1.get_capture(len(pkts))
1387 self.verify_capture_out(capture, nat_ip, True)
1389 def test_static_with_port_in(self):
1390 """ 1:1 NAPT initialized from inside network """
1392 self.tcp_port_out = 3606
1393 self.udp_port_out = 3607
1394 self.icmp_id_out = 3608
1396 self.nat44_add_address(self.nat_addr)
1397 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1398 self.tcp_port_in, self.tcp_port_out,
1399 proto=IP_PROTOS.tcp)
1400 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1401 self.udp_port_in, self.udp_port_out,
1402 proto=IP_PROTOS.udp)
1403 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1404 self.icmp_id_in, self.icmp_id_out,
1405 proto=IP_PROTOS.icmp)
1406 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1407 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1411 pkts = self.create_stream_in(self.pg0, self.pg1)
1412 self.pg0.add_stream(pkts)
1413 self.pg_enable_capture(self.pg_interfaces)
1415 capture = self.pg1.get_capture(len(pkts))
1416 self.verify_capture_out(capture)
1419 pkts = self.create_stream_out(self.pg1)
1420 self.pg1.add_stream(pkts)
1421 self.pg_enable_capture(self.pg_interfaces)
1423 capture = self.pg0.get_capture(len(pkts))
1424 self.verify_capture_in(capture, self.pg0)
1426 def test_static_with_port_out(self):
1427 """ 1:1 NAPT initialized from outside network """
1429 self.tcp_port_out = 30606
1430 self.udp_port_out = 30607
1431 self.icmp_id_out = 30608
1433 self.nat44_add_address(self.nat_addr)
1434 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1435 self.tcp_port_in, self.tcp_port_out,
1436 proto=IP_PROTOS.tcp)
1437 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1438 self.udp_port_in, self.udp_port_out,
1439 proto=IP_PROTOS.udp)
1440 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1441 self.icmp_id_in, self.icmp_id_out,
1442 proto=IP_PROTOS.icmp)
1443 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1444 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1448 pkts = self.create_stream_out(self.pg1)
1449 self.pg1.add_stream(pkts)
1450 self.pg_enable_capture(self.pg_interfaces)
1452 capture = self.pg0.get_capture(len(pkts))
1453 self.verify_capture_in(capture, self.pg0)
1456 pkts = self.create_stream_in(self.pg0, self.pg1)
1457 self.pg0.add_stream(pkts)
1458 self.pg_enable_capture(self.pg_interfaces)
1460 capture = self.pg1.get_capture(len(pkts))
1461 self.verify_capture_out(capture)
1463 def test_static_vrf_aware(self):
1464 """ 1:1 NAT VRF awareness """
1466 nat_ip1 = "10.0.0.30"
1467 nat_ip2 = "10.0.0.40"
1468 self.tcp_port_out = 6303
1469 self.udp_port_out = 6304
1470 self.icmp_id_out = 6305
1472 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1474 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1476 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1478 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1479 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1481 # inside interface VRF match NAT44 static mapping VRF
1482 pkts = self.create_stream_in(self.pg4, self.pg3)
1483 self.pg4.add_stream(pkts)
1484 self.pg_enable_capture(self.pg_interfaces)
1486 capture = self.pg3.get_capture(len(pkts))
1487 self.verify_capture_out(capture, nat_ip1, True)
1489 # inside interface VRF don't match NAT44 static mapping VRF (packets
1491 pkts = self.create_stream_in(self.pg0, self.pg3)
1492 self.pg0.add_stream(pkts)
1493 self.pg_enable_capture(self.pg_interfaces)
1495 self.pg3.assert_nothing_captured()
1497 def test_dynamic_to_static(self):
1498 """ Switch from dynamic translation to 1:1NAT """
1499 nat_ip = "10.0.0.10"
1500 self.tcp_port_out = 6303
1501 self.udp_port_out = 6304
1502 self.icmp_id_out = 6305
1504 self.nat44_add_address(self.nat_addr)
1505 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1506 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1510 pkts = self.create_stream_in(self.pg0, self.pg1)
1511 self.pg0.add_stream(pkts)
1512 self.pg_enable_capture(self.pg_interfaces)
1514 capture = self.pg1.get_capture(len(pkts))
1515 self.verify_capture_out(capture)
1518 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1519 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1520 self.assertEqual(len(sessions), 0)
1521 pkts = self.create_stream_in(self.pg0, self.pg1)
1522 self.pg0.add_stream(pkts)
1523 self.pg_enable_capture(self.pg_interfaces)
1525 capture = self.pg1.get_capture(len(pkts))
1526 self.verify_capture_out(capture, nat_ip, True)
1528 def test_identity_nat(self):
1529 """ Identity NAT """
1531 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1532 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1533 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1536 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1537 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1538 TCP(sport=12345, dport=56789))
1539 self.pg1.add_stream(p)
1540 self.pg_enable_capture(self.pg_interfaces)
1542 capture = self.pg0.get_capture(1)
1547 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1548 self.assertEqual(ip.src, self.pg1.remote_ip4)
1549 self.assertEqual(tcp.dport, 56789)
1550 self.assertEqual(tcp.sport, 12345)
1551 self.assert_packet_checksums_valid(p)
1553 self.logger.error(ppp("Unexpected or invalid packet:", p))
1556 def test_multiple_inside_interfaces(self):
1557 """ NAT44 multiple non-overlapping address space inside interfaces """
1559 self.nat44_add_address(self.nat_addr)
1560 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1561 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1562 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1565 # between two NAT44 inside interfaces (no translation)
1566 pkts = self.create_stream_in(self.pg0, self.pg1)
1567 self.pg0.add_stream(pkts)
1568 self.pg_enable_capture(self.pg_interfaces)
1570 capture = self.pg1.get_capture(len(pkts))
1571 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1573 # from NAT44 inside to interface without NAT44 feature (no translation)
1574 pkts = self.create_stream_in(self.pg0, self.pg2)
1575 self.pg0.add_stream(pkts)
1576 self.pg_enable_capture(self.pg_interfaces)
1578 capture = self.pg2.get_capture(len(pkts))
1579 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1581 # in2out 1st interface
1582 pkts = self.create_stream_in(self.pg0, self.pg3)
1583 self.pg0.add_stream(pkts)
1584 self.pg_enable_capture(self.pg_interfaces)
1586 capture = self.pg3.get_capture(len(pkts))
1587 self.verify_capture_out(capture)
1589 # out2in 1st interface
1590 pkts = self.create_stream_out(self.pg3)
1591 self.pg3.add_stream(pkts)
1592 self.pg_enable_capture(self.pg_interfaces)
1594 capture = self.pg0.get_capture(len(pkts))
1595 self.verify_capture_in(capture, self.pg0)
1597 # in2out 2nd interface
1598 pkts = self.create_stream_in(self.pg1, self.pg3)
1599 self.pg1.add_stream(pkts)
1600 self.pg_enable_capture(self.pg_interfaces)
1602 capture = self.pg3.get_capture(len(pkts))
1603 self.verify_capture_out(capture)
1605 # out2in 2nd interface
1606 pkts = self.create_stream_out(self.pg3)
1607 self.pg3.add_stream(pkts)
1608 self.pg_enable_capture(self.pg_interfaces)
1610 capture = self.pg1.get_capture(len(pkts))
1611 self.verify_capture_in(capture, self.pg1)
1613 def test_inside_overlapping_interfaces(self):
1614 """ NAT44 multiple inside interfaces with overlapping address space """
1616 static_nat_ip = "10.0.0.10"
1617 self.nat44_add_address(self.nat_addr)
1618 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1620 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1621 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1622 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1623 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1626 # between NAT44 inside interfaces with same VRF (no translation)
1627 pkts = self.create_stream_in(self.pg4, self.pg5)
1628 self.pg4.add_stream(pkts)
1629 self.pg_enable_capture(self.pg_interfaces)
1631 capture = self.pg5.get_capture(len(pkts))
1632 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1634 # between NAT44 inside interfaces with different VRF (hairpinning)
1635 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1636 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1637 TCP(sport=1234, dport=5678))
1638 self.pg4.add_stream(p)
1639 self.pg_enable_capture(self.pg_interfaces)
1641 capture = self.pg6.get_capture(1)
1646 self.assertEqual(ip.src, self.nat_addr)
1647 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1648 self.assertNotEqual(tcp.sport, 1234)
1649 self.assertEqual(tcp.dport, 5678)
1651 self.logger.error(ppp("Unexpected or invalid packet:", p))
1654 # in2out 1st interface
1655 pkts = self.create_stream_in(self.pg4, self.pg3)
1656 self.pg4.add_stream(pkts)
1657 self.pg_enable_capture(self.pg_interfaces)
1659 capture = self.pg3.get_capture(len(pkts))
1660 self.verify_capture_out(capture)
1662 # out2in 1st interface
1663 pkts = self.create_stream_out(self.pg3)
1664 self.pg3.add_stream(pkts)
1665 self.pg_enable_capture(self.pg_interfaces)
1667 capture = self.pg4.get_capture(len(pkts))
1668 self.verify_capture_in(capture, self.pg4)
1670 # in2out 2nd interface
1671 pkts = self.create_stream_in(self.pg5, self.pg3)
1672 self.pg5.add_stream(pkts)
1673 self.pg_enable_capture(self.pg_interfaces)
1675 capture = self.pg3.get_capture(len(pkts))
1676 self.verify_capture_out(capture)
1678 # out2in 2nd interface
1679 pkts = self.create_stream_out(self.pg3)
1680 self.pg3.add_stream(pkts)
1681 self.pg_enable_capture(self.pg_interfaces)
1683 capture = self.pg5.get_capture(len(pkts))
1684 self.verify_capture_in(capture, self.pg5)
1687 addresses = self.vapi.nat44_address_dump()
1688 self.assertEqual(len(addresses), 1)
1689 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1690 self.assertEqual(len(sessions), 3)
1691 for session in sessions:
1692 self.assertFalse(session.is_static)
1693 self.assertEqual(session.inside_ip_address[0:4],
1694 self.pg5.remote_ip4n)
1695 self.assertEqual(session.outside_ip_address,
1696 addresses[0].ip_address)
1697 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1698 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1699 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1700 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1701 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1702 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1703 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1704 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1705 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1707 # in2out 3rd interface
1708 pkts = self.create_stream_in(self.pg6, self.pg3)
1709 self.pg6.add_stream(pkts)
1710 self.pg_enable_capture(self.pg_interfaces)
1712 capture = self.pg3.get_capture(len(pkts))
1713 self.verify_capture_out(capture, static_nat_ip, True)
1715 # out2in 3rd interface
1716 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1717 self.pg3.add_stream(pkts)
1718 self.pg_enable_capture(self.pg_interfaces)
1720 capture = self.pg6.get_capture(len(pkts))
1721 self.verify_capture_in(capture, self.pg6)
1723 # general user and session dump verifications
1724 users = self.vapi.nat44_user_dump()
1725 self.assertTrue(len(users) >= 3)
1726 addresses = self.vapi.nat44_address_dump()
1727 self.assertEqual(len(addresses), 1)
1729 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1731 for session in sessions:
1732 self.assertEqual(user.ip_address, session.inside_ip_address)
1733 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1734 self.assertTrue(session.protocol in
1735 [IP_PROTOS.tcp, IP_PROTOS.udp,
1737 self.assertFalse(session.ext_host_valid)
1740 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1741 self.assertTrue(len(sessions) >= 4)
1742 for session in sessions:
1743 self.assertFalse(session.is_static)
1744 self.assertEqual(session.inside_ip_address[0:4],
1745 self.pg4.remote_ip4n)
1746 self.assertEqual(session.outside_ip_address,
1747 addresses[0].ip_address)
1750 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1751 self.assertTrue(len(sessions) >= 3)
1752 for session in sessions:
1753 self.assertTrue(session.is_static)
1754 self.assertEqual(session.inside_ip_address[0:4],
1755 self.pg6.remote_ip4n)
1756 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1757 map(int, static_nat_ip.split('.')))
1758 self.assertTrue(session.inside_port in
1759 [self.tcp_port_in, self.udp_port_in,
1762 def test_hairpinning(self):
1763 """ NAT44 hairpinning - 1:1 NAPT """
1765 host = self.pg0.remote_hosts[0]
1766 server = self.pg0.remote_hosts[1]
1769 server_in_port = 5678
1770 server_out_port = 8765
1772 self.nat44_add_address(self.nat_addr)
1773 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1774 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1776 # add static mapping for server
1777 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1778 server_in_port, server_out_port,
1779 proto=IP_PROTOS.tcp)
1781 # send packet from host to server
1782 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1783 IP(src=host.ip4, dst=self.nat_addr) /
1784 TCP(sport=host_in_port, dport=server_out_port))
1785 self.pg0.add_stream(p)
1786 self.pg_enable_capture(self.pg_interfaces)
1788 capture = self.pg0.get_capture(1)
1793 self.assertEqual(ip.src, self.nat_addr)
1794 self.assertEqual(ip.dst, server.ip4)
1795 self.assertNotEqual(tcp.sport, host_in_port)
1796 self.assertEqual(tcp.dport, server_in_port)
1797 self.assert_packet_checksums_valid(p)
1798 host_out_port = tcp.sport
1800 self.logger.error(ppp("Unexpected or invalid packet:", p))
1803 # send reply from server to host
1804 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1805 IP(src=server.ip4, dst=self.nat_addr) /
1806 TCP(sport=server_in_port, dport=host_out_port))
1807 self.pg0.add_stream(p)
1808 self.pg_enable_capture(self.pg_interfaces)
1810 capture = self.pg0.get_capture(1)
1815 self.assertEqual(ip.src, self.nat_addr)
1816 self.assertEqual(ip.dst, host.ip4)
1817 self.assertEqual(tcp.sport, server_out_port)
1818 self.assertEqual(tcp.dport, host_in_port)
1819 self.assert_packet_checksums_valid(p)
1821 self.logger.error(ppp("Unexpected or invalid packet:", p))
1824 def test_hairpinning2(self):
1825 """ NAT44 hairpinning - 1:1 NAT"""
1827 server1_nat_ip = "10.0.0.10"
1828 server2_nat_ip = "10.0.0.11"
1829 host = self.pg0.remote_hosts[0]
1830 server1 = self.pg0.remote_hosts[1]
1831 server2 = self.pg0.remote_hosts[2]
1832 server_tcp_port = 22
1833 server_udp_port = 20
1835 self.nat44_add_address(self.nat_addr)
1836 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1837 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1840 # add static mapping for servers
1841 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1842 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1846 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1847 IP(src=host.ip4, dst=server1_nat_ip) /
1848 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1850 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1851 IP(src=host.ip4, dst=server1_nat_ip) /
1852 UDP(sport=self.udp_port_in, dport=server_udp_port))
1854 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1855 IP(src=host.ip4, dst=server1_nat_ip) /
1856 ICMP(id=self.icmp_id_in, type='echo-request'))
1858 self.pg0.add_stream(pkts)
1859 self.pg_enable_capture(self.pg_interfaces)
1861 capture = self.pg0.get_capture(len(pkts))
1862 for packet in capture:
1864 self.assertEqual(packet[IP].src, self.nat_addr)
1865 self.assertEqual(packet[IP].dst, server1.ip4)
1866 if packet.haslayer(TCP):
1867 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1868 self.assertEqual(packet[TCP].dport, server_tcp_port)
1869 self.tcp_port_out = packet[TCP].sport
1870 self.assert_packet_checksums_valid(packet)
1871 elif packet.haslayer(UDP):
1872 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1873 self.assertEqual(packet[UDP].dport, server_udp_port)
1874 self.udp_port_out = packet[UDP].sport
1876 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1877 self.icmp_id_out = packet[ICMP].id
1879 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1884 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1885 IP(src=server1.ip4, dst=self.nat_addr) /
1886 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1888 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1889 IP(src=server1.ip4, dst=self.nat_addr) /
1890 UDP(sport=server_udp_port, dport=self.udp_port_out))
1892 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1893 IP(src=server1.ip4, dst=self.nat_addr) /
1894 ICMP(id=self.icmp_id_out, type='echo-reply'))
1896 self.pg0.add_stream(pkts)
1897 self.pg_enable_capture(self.pg_interfaces)
1899 capture = self.pg0.get_capture(len(pkts))
1900 for packet in capture:
1902 self.assertEqual(packet[IP].src, server1_nat_ip)
1903 self.assertEqual(packet[IP].dst, host.ip4)
1904 if packet.haslayer(TCP):
1905 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1906 self.assertEqual(packet[TCP].sport, server_tcp_port)
1907 self.assert_packet_checksums_valid(packet)
1908 elif packet.haslayer(UDP):
1909 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1910 self.assertEqual(packet[UDP].sport, server_udp_port)
1912 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1914 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1917 # server2 to server1
1919 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1920 IP(src=server2.ip4, dst=server1_nat_ip) /
1921 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1923 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1924 IP(src=server2.ip4, dst=server1_nat_ip) /
1925 UDP(sport=self.udp_port_in, dport=server_udp_port))
1927 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1928 IP(src=server2.ip4, dst=server1_nat_ip) /
1929 ICMP(id=self.icmp_id_in, type='echo-request'))
1931 self.pg0.add_stream(pkts)
1932 self.pg_enable_capture(self.pg_interfaces)
1934 capture = self.pg0.get_capture(len(pkts))
1935 for packet in capture:
1937 self.assertEqual(packet[IP].src, server2_nat_ip)
1938 self.assertEqual(packet[IP].dst, server1.ip4)
1939 if packet.haslayer(TCP):
1940 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1941 self.assertEqual(packet[TCP].dport, server_tcp_port)
1942 self.tcp_port_out = packet[TCP].sport
1943 self.assert_packet_checksums_valid(packet)
1944 elif packet.haslayer(UDP):
1945 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1946 self.assertEqual(packet[UDP].dport, server_udp_port)
1947 self.udp_port_out = packet[UDP].sport
1949 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1950 self.icmp_id_out = packet[ICMP].id
1952 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1955 # server1 to server2
1957 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1958 IP(src=server1.ip4, dst=server2_nat_ip) /
1959 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1961 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1962 IP(src=server1.ip4, dst=server2_nat_ip) /
1963 UDP(sport=server_udp_port, dport=self.udp_port_out))
1965 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1966 IP(src=server1.ip4, dst=server2_nat_ip) /
1967 ICMP(id=self.icmp_id_out, type='echo-reply'))
1969 self.pg0.add_stream(pkts)
1970 self.pg_enable_capture(self.pg_interfaces)
1972 capture = self.pg0.get_capture(len(pkts))
1973 for packet in capture:
1975 self.assertEqual(packet[IP].src, server1_nat_ip)
1976 self.assertEqual(packet[IP].dst, server2.ip4)
1977 if packet.haslayer(TCP):
1978 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1979 self.assertEqual(packet[TCP].sport, server_tcp_port)
1980 self.assert_packet_checksums_valid(packet)
1981 elif packet.haslayer(UDP):
1982 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1983 self.assertEqual(packet[UDP].sport, server_udp_port)
1985 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1987 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1990 def test_max_translations_per_user(self):
1991 """ MAX translations per user - recycle the least recently used """
1993 self.nat44_add_address(self.nat_addr)
1994 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1995 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1998 # get maximum number of translations per user
1999 nat44_config = self.vapi.nat_show_config()
2001 # send more than maximum number of translations per user packets
2002 pkts_num = nat44_config.max_translations_per_user + 5
2004 for port in range(0, pkts_num):
2005 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2006 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2007 TCP(sport=1025 + port))
2009 self.pg0.add_stream(pkts)
2010 self.pg_enable_capture(self.pg_interfaces)
2013 # verify number of translated packet
2014 self.pg1.get_capture(pkts_num)
2016 users = self.vapi.nat44_user_dump()
2018 if user.ip_address == self.pg0.remote_ip4n:
2019 self.assertEqual(user.nsessions,
2020 nat44_config.max_translations_per_user)
2021 self.assertEqual(user.nstaticsessions, 0)
2024 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2026 proto=IP_PROTOS.tcp)
2027 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2028 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2029 TCP(sport=tcp_port))
2030 self.pg0.add_stream(p)
2031 self.pg_enable_capture(self.pg_interfaces)
2033 self.pg1.get_capture(1)
2034 users = self.vapi.nat44_user_dump()
2036 if user.ip_address == self.pg0.remote_ip4n:
2037 self.assertEqual(user.nsessions,
2038 nat44_config.max_translations_per_user - 1)
2039 self.assertEqual(user.nstaticsessions, 1)
2041 def test_interface_addr(self):
2042 """ Acquire NAT44 addresses from interface """
2043 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2045 # no address in NAT pool
2046 adresses = self.vapi.nat44_address_dump()
2047 self.assertEqual(0, len(adresses))
2049 # configure interface address and check NAT address pool
2050 self.pg7.config_ip4()
2051 adresses = self.vapi.nat44_address_dump()
2052 self.assertEqual(1, len(adresses))
2053 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2055 # remove interface address and check NAT address pool
2056 self.pg7.unconfig_ip4()
2057 adresses = self.vapi.nat44_address_dump()
2058 self.assertEqual(0, len(adresses))
2060 def test_interface_addr_static_mapping(self):
2061 """ Static mapping with addresses from interface """
2064 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2065 self.nat44_add_static_mapping(
2067 external_sw_if_index=self.pg7.sw_if_index,
2070 # static mappings with external interface
2071 static_mappings = self.vapi.nat44_static_mapping_dump()
2072 self.assertEqual(1, len(static_mappings))
2073 self.assertEqual(self.pg7.sw_if_index,
2074 static_mappings[0].external_sw_if_index)
2075 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2077 # configure interface address and check static mappings
2078 self.pg7.config_ip4()
2079 static_mappings = self.vapi.nat44_static_mapping_dump()
2080 self.assertEqual(2, len(static_mappings))
2082 for sm in static_mappings:
2083 if sm.external_sw_if_index == 0xFFFFFFFF:
2084 self.assertEqual(sm.external_ip_address[0:4],
2085 self.pg7.local_ip4n)
2086 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2088 self.assertTrue(resolved)
2090 # remove interface address and check static mappings
2091 self.pg7.unconfig_ip4()
2092 static_mappings = self.vapi.nat44_static_mapping_dump()
2093 self.assertEqual(1, len(static_mappings))
2094 self.assertEqual(self.pg7.sw_if_index,
2095 static_mappings[0].external_sw_if_index)
2096 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2098 # configure interface address again and check static mappings
2099 self.pg7.config_ip4()
2100 static_mappings = self.vapi.nat44_static_mapping_dump()
2101 self.assertEqual(2, len(static_mappings))
2103 for sm in static_mappings:
2104 if sm.external_sw_if_index == 0xFFFFFFFF:
2105 self.assertEqual(sm.external_ip_address[0:4],
2106 self.pg7.local_ip4n)
2107 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2109 self.assertTrue(resolved)
2111 # remove static mapping
2112 self.nat44_add_static_mapping(
2114 external_sw_if_index=self.pg7.sw_if_index,
2117 static_mappings = self.vapi.nat44_static_mapping_dump()
2118 self.assertEqual(0, len(static_mappings))
2120 def test_interface_addr_identity_nat(self):
2121 """ Identity NAT with addresses from interface """
2124 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2125 self.vapi.nat44_add_del_identity_mapping(
2126 sw_if_index=self.pg7.sw_if_index,
2128 protocol=IP_PROTOS.tcp,
2131 # identity mappings with external interface
2132 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2133 self.assertEqual(1, len(identity_mappings))
2134 self.assertEqual(self.pg7.sw_if_index,
2135 identity_mappings[0].sw_if_index)
2137 # configure interface address and check identity mappings
2138 self.pg7.config_ip4()
2139 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2141 self.assertEqual(2, len(identity_mappings))
2142 for sm in identity_mappings:
2143 if sm.sw_if_index == 0xFFFFFFFF:
2144 self.assertEqual(identity_mappings[0].ip_address,
2145 self.pg7.local_ip4n)
2146 self.assertEqual(port, identity_mappings[0].port)
2147 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2149 self.assertTrue(resolved)
2151 # remove interface address and check identity mappings
2152 self.pg7.unconfig_ip4()
2153 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2154 self.assertEqual(1, len(identity_mappings))
2155 self.assertEqual(self.pg7.sw_if_index,
2156 identity_mappings[0].sw_if_index)
2158 def test_ipfix_nat44_sess(self):
2159 """ IPFIX logging NAT44 session created/delted """
2160 self.ipfix_domain_id = 10
2161 self.ipfix_src_port = 20202
2162 colector_port = 30303
2163 bind_layers(UDP, IPFIX, dport=30303)
2164 self.nat44_add_address(self.nat_addr)
2165 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2166 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2168 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2169 src_address=self.pg3.local_ip4n,
2171 template_interval=10,
2172 collector_port=colector_port)
2173 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2174 src_port=self.ipfix_src_port)
2176 pkts = self.create_stream_in(self.pg0, self.pg1)
2177 self.pg0.add_stream(pkts)
2178 self.pg_enable_capture(self.pg_interfaces)
2180 capture = self.pg1.get_capture(len(pkts))
2181 self.verify_capture_out(capture)
2182 self.nat44_add_address(self.nat_addr, is_add=0)
2183 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2184 capture = self.pg3.get_capture(9)
2185 ipfix = IPFIXDecoder()
2186 # first load template
2188 self.assertTrue(p.haslayer(IPFIX))
2189 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2190 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2191 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2192 self.assertEqual(p[UDP].dport, colector_port)
2193 self.assertEqual(p[IPFIX].observationDomainID,
2194 self.ipfix_domain_id)
2195 if p.haslayer(Template):
2196 ipfix.add_template(p.getlayer(Template))
2197 # verify events in data set
2199 if p.haslayer(Data):
2200 data = ipfix.decode_data_set(p.getlayer(Set))
2201 self.verify_ipfix_nat44_ses(data)
2203 def test_ipfix_addr_exhausted(self):
2204 """ IPFIX logging NAT addresses exhausted """
2205 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2206 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2208 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2209 src_address=self.pg3.local_ip4n,
2211 template_interval=10)
2212 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2213 src_port=self.ipfix_src_port)
2215 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2216 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2218 self.pg0.add_stream(p)
2219 self.pg_enable_capture(self.pg_interfaces)
2221 self.pg1.assert_nothing_captured()
2223 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2224 capture = self.pg3.get_capture(9)
2225 ipfix = IPFIXDecoder()
2226 # first load template
2228 self.assertTrue(p.haslayer(IPFIX))
2229 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2230 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2231 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2232 self.assertEqual(p[UDP].dport, 4739)
2233 self.assertEqual(p[IPFIX].observationDomainID,
2234 self.ipfix_domain_id)
2235 if p.haslayer(Template):
2236 ipfix.add_template(p.getlayer(Template))
2237 # verify events in data set
2239 if p.haslayer(Data):
2240 data = ipfix.decode_data_set(p.getlayer(Set))
2241 self.verify_ipfix_addr_exhausted(data)
2243 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2244 def test_ipfix_max_sessions(self):
2245 """ IPFIX logging maximum session entries exceeded """
2246 self.nat44_add_address(self.nat_addr)
2247 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2248 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2251 nat44_config = self.vapi.nat_show_config()
2252 max_sessions = 10 * nat44_config.translation_buckets
2255 for i in range(0, max_sessions):
2256 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2257 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2258 IP(src=src, dst=self.pg1.remote_ip4) /
2261 self.pg0.add_stream(pkts)
2262 self.pg_enable_capture(self.pg_interfaces)
2265 self.pg1.get_capture(max_sessions)
2266 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2267 src_address=self.pg3.local_ip4n,
2269 template_interval=10)
2270 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2271 src_port=self.ipfix_src_port)
2273 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2274 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2276 self.pg0.add_stream(p)
2277 self.pg_enable_capture(self.pg_interfaces)
2279 self.pg1.assert_nothing_captured()
2281 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2282 capture = self.pg3.get_capture(9)
2283 ipfix = IPFIXDecoder()
2284 # first load template
2286 self.assertTrue(p.haslayer(IPFIX))
2287 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2288 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2289 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2290 self.assertEqual(p[UDP].dport, 4739)
2291 self.assertEqual(p[IPFIX].observationDomainID,
2292 self.ipfix_domain_id)
2293 if p.haslayer(Template):
2294 ipfix.add_template(p.getlayer(Template))
2295 # verify events in data set
2297 if p.haslayer(Data):
2298 data = ipfix.decode_data_set(p.getlayer(Set))
2299 self.verify_ipfix_max_sessions(data, max_sessions)
2301 def test_pool_addr_fib(self):
2302 """ NAT44 add pool addresses to FIB """
2303 static_addr = '10.0.0.10'
2304 self.nat44_add_address(self.nat_addr)
2305 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2306 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2308 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2311 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2312 ARP(op=ARP.who_has, pdst=self.nat_addr,
2313 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2314 self.pg1.add_stream(p)
2315 self.pg_enable_capture(self.pg_interfaces)
2317 capture = self.pg1.get_capture(1)
2318 self.assertTrue(capture[0].haslayer(ARP))
2319 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2322 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2323 ARP(op=ARP.who_has, pdst=static_addr,
2324 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2325 self.pg1.add_stream(p)
2326 self.pg_enable_capture(self.pg_interfaces)
2328 capture = self.pg1.get_capture(1)
2329 self.assertTrue(capture[0].haslayer(ARP))
2330 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2332 # send ARP to non-NAT44 interface
2333 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2334 ARP(op=ARP.who_has, pdst=self.nat_addr,
2335 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2336 self.pg2.add_stream(p)
2337 self.pg_enable_capture(self.pg_interfaces)
2339 self.pg1.assert_nothing_captured()
2341 # remove addresses and verify
2342 self.nat44_add_address(self.nat_addr, is_add=0)
2343 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2346 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2347 ARP(op=ARP.who_has, pdst=self.nat_addr,
2348 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2349 self.pg1.add_stream(p)
2350 self.pg_enable_capture(self.pg_interfaces)
2352 self.pg1.assert_nothing_captured()
2354 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2355 ARP(op=ARP.who_has, pdst=static_addr,
2356 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2357 self.pg1.add_stream(p)
2358 self.pg_enable_capture(self.pg_interfaces)
2360 self.pg1.assert_nothing_captured()
2362 def test_vrf_mode(self):
2363 """ NAT44 tenant VRF aware address pool mode """
2367 nat_ip1 = "10.0.0.10"
2368 nat_ip2 = "10.0.0.11"
2370 self.pg0.unconfig_ip4()
2371 self.pg1.unconfig_ip4()
2372 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2373 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2374 self.pg0.set_table_ip4(vrf_id1)
2375 self.pg1.set_table_ip4(vrf_id2)
2376 self.pg0.config_ip4()
2377 self.pg1.config_ip4()
2378 self.pg0.resolve_arp()
2379 self.pg1.resolve_arp()
2381 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2382 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2383 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2384 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2385 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2390 pkts = self.create_stream_in(self.pg0, self.pg2)
2391 self.pg0.add_stream(pkts)
2392 self.pg_enable_capture(self.pg_interfaces)
2394 capture = self.pg2.get_capture(len(pkts))
2395 self.verify_capture_out(capture, nat_ip1)
2398 pkts = self.create_stream_in(self.pg1, self.pg2)
2399 self.pg1.add_stream(pkts)
2400 self.pg_enable_capture(self.pg_interfaces)
2402 capture = self.pg2.get_capture(len(pkts))
2403 self.verify_capture_out(capture, nat_ip2)
2406 self.pg0.unconfig_ip4()
2407 self.pg1.unconfig_ip4()
2408 self.pg0.set_table_ip4(0)
2409 self.pg1.set_table_ip4(0)
2410 self.pg0.config_ip4()
2411 self.pg1.config_ip4()
2412 self.pg0.resolve_arp()
2413 self.pg1.resolve_arp()
2414 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2415 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2417 def test_vrf_feature_independent(self):
2418 """ NAT44 tenant VRF independent address pool mode """
2420 nat_ip1 = "10.0.0.10"
2421 nat_ip2 = "10.0.0.11"
2423 self.nat44_add_address(nat_ip1)
2424 self.nat44_add_address(nat_ip2, vrf_id=99)
2425 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2426 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2427 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2431 pkts = self.create_stream_in(self.pg0, self.pg2)
2432 self.pg0.add_stream(pkts)
2433 self.pg_enable_capture(self.pg_interfaces)
2435 capture = self.pg2.get_capture(len(pkts))
2436 self.verify_capture_out(capture, nat_ip1)
2439 pkts = self.create_stream_in(self.pg1, self.pg2)
2440 self.pg1.add_stream(pkts)
2441 self.pg_enable_capture(self.pg_interfaces)
2443 capture = self.pg2.get_capture(len(pkts))
2444 self.verify_capture_out(capture, nat_ip1)
2446 def test_dynamic_ipless_interfaces(self):
2447 """ NAT44 interfaces without configured IP address """
2449 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2450 mactobinary(self.pg7.remote_mac),
2451 self.pg7.remote_ip4n,
2453 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2454 mactobinary(self.pg8.remote_mac),
2455 self.pg8.remote_ip4n,
2458 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2459 dst_address_length=32,
2460 next_hop_address=self.pg7.remote_ip4n,
2461 next_hop_sw_if_index=self.pg7.sw_if_index)
2462 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2463 dst_address_length=32,
2464 next_hop_address=self.pg8.remote_ip4n,
2465 next_hop_sw_if_index=self.pg8.sw_if_index)
2467 self.nat44_add_address(self.nat_addr)
2468 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2469 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2473 pkts = self.create_stream_in(self.pg7, self.pg8)
2474 self.pg7.add_stream(pkts)
2475 self.pg_enable_capture(self.pg_interfaces)
2477 capture = self.pg8.get_capture(len(pkts))
2478 self.verify_capture_out(capture)
2481 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2482 self.pg8.add_stream(pkts)
2483 self.pg_enable_capture(self.pg_interfaces)
2485 capture = self.pg7.get_capture(len(pkts))
2486 self.verify_capture_in(capture, self.pg7)
2488 def test_static_ipless_interfaces(self):
2489 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2491 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2492 mactobinary(self.pg7.remote_mac),
2493 self.pg7.remote_ip4n,
2495 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2496 mactobinary(self.pg8.remote_mac),
2497 self.pg8.remote_ip4n,
2500 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2501 dst_address_length=32,
2502 next_hop_address=self.pg7.remote_ip4n,
2503 next_hop_sw_if_index=self.pg7.sw_if_index)
2504 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2505 dst_address_length=32,
2506 next_hop_address=self.pg8.remote_ip4n,
2507 next_hop_sw_if_index=self.pg8.sw_if_index)
2509 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2510 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2511 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2515 pkts = self.create_stream_out(self.pg8)
2516 self.pg8.add_stream(pkts)
2517 self.pg_enable_capture(self.pg_interfaces)
2519 capture = self.pg7.get_capture(len(pkts))
2520 self.verify_capture_in(capture, self.pg7)
2523 pkts = self.create_stream_in(self.pg7, self.pg8)
2524 self.pg7.add_stream(pkts)
2525 self.pg_enable_capture(self.pg_interfaces)
2527 capture = self.pg8.get_capture(len(pkts))
2528 self.verify_capture_out(capture, self.nat_addr, True)
2530 def test_static_with_port_ipless_interfaces(self):
2531 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2533 self.tcp_port_out = 30606
2534 self.udp_port_out = 30607
2535 self.icmp_id_out = 30608
2537 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2538 mactobinary(self.pg7.remote_mac),
2539 self.pg7.remote_ip4n,
2541 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2542 mactobinary(self.pg8.remote_mac),
2543 self.pg8.remote_ip4n,
2546 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2547 dst_address_length=32,
2548 next_hop_address=self.pg7.remote_ip4n,
2549 next_hop_sw_if_index=self.pg7.sw_if_index)
2550 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2551 dst_address_length=32,
2552 next_hop_address=self.pg8.remote_ip4n,
2553 next_hop_sw_if_index=self.pg8.sw_if_index)
2555 self.nat44_add_address(self.nat_addr)
2556 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2557 self.tcp_port_in, self.tcp_port_out,
2558 proto=IP_PROTOS.tcp)
2559 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2560 self.udp_port_in, self.udp_port_out,
2561 proto=IP_PROTOS.udp)
2562 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2563 self.icmp_id_in, self.icmp_id_out,
2564 proto=IP_PROTOS.icmp)
2565 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2566 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2570 pkts = self.create_stream_out(self.pg8)
2571 self.pg8.add_stream(pkts)
2572 self.pg_enable_capture(self.pg_interfaces)
2574 capture = self.pg7.get_capture(len(pkts))
2575 self.verify_capture_in(capture, self.pg7)
2578 pkts = self.create_stream_in(self.pg7, self.pg8)
2579 self.pg7.add_stream(pkts)
2580 self.pg_enable_capture(self.pg_interfaces)
2582 capture = self.pg8.get_capture(len(pkts))
2583 self.verify_capture_out(capture)
2585 def test_static_unknown_proto(self):
2586 """ 1:1 NAT translate packet with unknown protocol """
2587 nat_ip = "10.0.0.10"
2588 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2589 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2590 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2594 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2595 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2597 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2598 TCP(sport=1234, dport=1234))
2599 self.pg0.add_stream(p)
2600 self.pg_enable_capture(self.pg_interfaces)
2602 p = self.pg1.get_capture(1)
2605 self.assertEqual(packet[IP].src, nat_ip)
2606 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2607 self.assertTrue(packet.haslayer(GRE))
2608 self.assert_packet_checksums_valid(packet)
2610 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2614 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2615 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2617 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2618 TCP(sport=1234, dport=1234))
2619 self.pg1.add_stream(p)
2620 self.pg_enable_capture(self.pg_interfaces)
2622 p = self.pg0.get_capture(1)
2625 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2626 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2627 self.assertTrue(packet.haslayer(GRE))
2628 self.assert_packet_checksums_valid(packet)
2630 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2633 def test_hairpinning_static_unknown_proto(self):
2634 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2636 host = self.pg0.remote_hosts[0]
2637 server = self.pg0.remote_hosts[1]
2639 host_nat_ip = "10.0.0.10"
2640 server_nat_ip = "10.0.0.11"
2642 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2643 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2644 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2645 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2649 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2650 IP(src=host.ip4, dst=server_nat_ip) /
2652 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2653 TCP(sport=1234, dport=1234))
2654 self.pg0.add_stream(p)
2655 self.pg_enable_capture(self.pg_interfaces)
2657 p = self.pg0.get_capture(1)
2660 self.assertEqual(packet[IP].src, host_nat_ip)
2661 self.assertEqual(packet[IP].dst, server.ip4)
2662 self.assertTrue(packet.haslayer(GRE))
2663 self.assert_packet_checksums_valid(packet)
2665 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2669 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2670 IP(src=server.ip4, dst=host_nat_ip) /
2672 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2673 TCP(sport=1234, dport=1234))
2674 self.pg0.add_stream(p)
2675 self.pg_enable_capture(self.pg_interfaces)
2677 p = self.pg0.get_capture(1)
2680 self.assertEqual(packet[IP].src, server_nat_ip)
2681 self.assertEqual(packet[IP].dst, host.ip4)
2682 self.assertTrue(packet.haslayer(GRE))
2683 self.assert_packet_checksums_valid(packet)
2685 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2688 def test_output_feature(self):
2689 """ NAT44 interface output feature (in2out postrouting) """
2690 self.nat44_add_address(self.nat_addr)
2691 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2692 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2693 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2697 pkts = self.create_stream_in(self.pg0, self.pg3)
2698 self.pg0.add_stream(pkts)
2699 self.pg_enable_capture(self.pg_interfaces)
2701 capture = self.pg3.get_capture(len(pkts))
2702 self.verify_capture_out(capture)
2705 pkts = self.create_stream_out(self.pg3)
2706 self.pg3.add_stream(pkts)
2707 self.pg_enable_capture(self.pg_interfaces)
2709 capture = self.pg0.get_capture(len(pkts))
2710 self.verify_capture_in(capture, self.pg0)
2712 # from non-NAT interface to NAT inside interface
2713 pkts = self.create_stream_in(self.pg2, self.pg0)
2714 self.pg2.add_stream(pkts)
2715 self.pg_enable_capture(self.pg_interfaces)
2717 capture = self.pg0.get_capture(len(pkts))
2718 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2720 def test_output_feature_vrf_aware(self):
2721 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2722 nat_ip_vrf10 = "10.0.0.10"
2723 nat_ip_vrf20 = "10.0.0.20"
2725 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2726 dst_address_length=32,
2727 next_hop_address=self.pg3.remote_ip4n,
2728 next_hop_sw_if_index=self.pg3.sw_if_index,
2730 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2731 dst_address_length=32,
2732 next_hop_address=self.pg3.remote_ip4n,
2733 next_hop_sw_if_index=self.pg3.sw_if_index,
2736 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2737 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2738 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2739 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2740 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2744 pkts = self.create_stream_in(self.pg4, self.pg3)
2745 self.pg4.add_stream(pkts)
2746 self.pg_enable_capture(self.pg_interfaces)
2748 capture = self.pg3.get_capture(len(pkts))
2749 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2752 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2753 self.pg3.add_stream(pkts)
2754 self.pg_enable_capture(self.pg_interfaces)
2756 capture = self.pg4.get_capture(len(pkts))
2757 self.verify_capture_in(capture, self.pg4)
2760 pkts = self.create_stream_in(self.pg6, self.pg3)
2761 self.pg6.add_stream(pkts)
2762 self.pg_enable_capture(self.pg_interfaces)
2764 capture = self.pg3.get_capture(len(pkts))
2765 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2768 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2769 self.pg3.add_stream(pkts)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 capture = self.pg6.get_capture(len(pkts))
2773 self.verify_capture_in(capture, self.pg6)
2775 def test_output_feature_hairpinning(self):
2776 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2777 host = self.pg0.remote_hosts[0]
2778 server = self.pg0.remote_hosts[1]
2781 server_in_port = 5678
2782 server_out_port = 8765
2784 self.nat44_add_address(self.nat_addr)
2785 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2786 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2789 # add static mapping for server
2790 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2791 server_in_port, server_out_port,
2792 proto=IP_PROTOS.tcp)
2794 # send packet from host to server
2795 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2796 IP(src=host.ip4, dst=self.nat_addr) /
2797 TCP(sport=host_in_port, dport=server_out_port))
2798 self.pg0.add_stream(p)
2799 self.pg_enable_capture(self.pg_interfaces)
2801 capture = self.pg0.get_capture(1)
2806 self.assertEqual(ip.src, self.nat_addr)
2807 self.assertEqual(ip.dst, server.ip4)
2808 self.assertNotEqual(tcp.sport, host_in_port)
2809 self.assertEqual(tcp.dport, server_in_port)
2810 self.assert_packet_checksums_valid(p)
2811 host_out_port = tcp.sport
2813 self.logger.error(ppp("Unexpected or invalid packet:", p))
2816 # send reply from server to host
2817 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2818 IP(src=server.ip4, dst=self.nat_addr) /
2819 TCP(sport=server_in_port, dport=host_out_port))
2820 self.pg0.add_stream(p)
2821 self.pg_enable_capture(self.pg_interfaces)
2823 capture = self.pg0.get_capture(1)
2828 self.assertEqual(ip.src, self.nat_addr)
2829 self.assertEqual(ip.dst, host.ip4)
2830 self.assertEqual(tcp.sport, server_out_port)
2831 self.assertEqual(tcp.dport, host_in_port)
2832 self.assert_packet_checksums_valid(p)
2834 self.logger.error(ppp("Unexpected or invalid packet:", p))
2837 def test_one_armed_nat44(self):
2838 """ One armed NAT44 """
2839 remote_host = self.pg9.remote_hosts[0]
2840 local_host = self.pg9.remote_hosts[1]
2843 self.nat44_add_address(self.nat_addr)
2844 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2845 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2849 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2850 IP(src=local_host.ip4, dst=remote_host.ip4) /
2851 TCP(sport=12345, dport=80))
2852 self.pg9.add_stream(p)
2853 self.pg_enable_capture(self.pg_interfaces)
2855 capture = self.pg9.get_capture(1)
2860 self.assertEqual(ip.src, self.nat_addr)
2861 self.assertEqual(ip.dst, remote_host.ip4)
2862 self.assertNotEqual(tcp.sport, 12345)
2863 external_port = tcp.sport
2864 self.assertEqual(tcp.dport, 80)
2865 self.assert_packet_checksums_valid(p)
2867 self.logger.error(ppp("Unexpected or invalid packet:", p))
2871 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2872 IP(src=remote_host.ip4, dst=self.nat_addr) /
2873 TCP(sport=80, dport=external_port))
2874 self.pg9.add_stream(p)
2875 self.pg_enable_capture(self.pg_interfaces)
2877 capture = self.pg9.get_capture(1)
2882 self.assertEqual(ip.src, remote_host.ip4)
2883 self.assertEqual(ip.dst, local_host.ip4)
2884 self.assertEqual(tcp.sport, 80)
2885 self.assertEqual(tcp.dport, 12345)
2886 self.assert_packet_checksums_valid(p)
2888 self.logger.error(ppp("Unexpected or invalid packet:", p))
2891 def test_del_session(self):
2892 """ Delete NAT44 session """
2893 self.nat44_add_address(self.nat_addr)
2894 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2895 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2898 pkts = self.create_stream_in(self.pg0, self.pg1)
2899 self.pg0.add_stream(pkts)
2900 self.pg_enable_capture(self.pg_interfaces)
2902 self.pg1.get_capture(len(pkts))
2904 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2905 nsessions = len(sessions)
2907 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2908 sessions[0].inside_port,
2909 sessions[0].protocol)
2910 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2911 sessions[1].outside_port,
2912 sessions[1].protocol,
2915 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2916 self.assertEqual(nsessions - len(sessions), 2)
2918 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2919 sessions[0].inside_port,
2920 sessions[0].protocol)
2922 self.verify_no_nat44_user()
2924 def test_set_get_reass(self):
2925 """ NAT44 set/get virtual fragmentation reassembly """
2926 reas_cfg1 = self.vapi.nat_get_reass()
2928 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2929 max_reass=reas_cfg1.ip4_max_reass * 2,
2930 max_frag=reas_cfg1.ip4_max_frag * 2)
2932 reas_cfg2 = self.vapi.nat_get_reass()
2934 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2935 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2936 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2938 self.vapi.nat_set_reass(drop_frag=1)
2939 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2941 def test_frag_in_order(self):
2942 """ NAT44 translate fragments arriving in order """
2943 self.nat44_add_address(self.nat_addr)
2944 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2945 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2948 data = "A" * 4 + "B" * 16 + "C" * 3
2949 self.tcp_port_in = random.randint(1025, 65535)
2951 reass = self.vapi.nat_reass_dump()
2952 reass_n_start = len(reass)
2955 pkts = self.create_stream_frag(self.pg0,
2956 self.pg1.remote_ip4,
2960 self.pg0.add_stream(pkts)
2961 self.pg_enable_capture(self.pg_interfaces)
2963 frags = self.pg1.get_capture(len(pkts))
2964 p = self.reass_frags_and_verify(frags,
2966 self.pg1.remote_ip4)
2967 self.assertEqual(p[TCP].dport, 20)
2968 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2969 self.tcp_port_out = p[TCP].sport
2970 self.assertEqual(data, p[Raw].load)
2973 pkts = self.create_stream_frag(self.pg1,
2978 self.pg1.add_stream(pkts)
2979 self.pg_enable_capture(self.pg_interfaces)
2981 frags = self.pg0.get_capture(len(pkts))
2982 p = self.reass_frags_and_verify(frags,
2983 self.pg1.remote_ip4,
2984 self.pg0.remote_ip4)
2985 self.assertEqual(p[TCP].sport, 20)
2986 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2987 self.assertEqual(data, p[Raw].load)
2989 reass = self.vapi.nat_reass_dump()
2990 reass_n_end = len(reass)
2992 self.assertEqual(reass_n_end - reass_n_start, 2)
2994 def test_reass_hairpinning(self):
2995 """ NAT44 fragments hairpinning """
2996 server = self.pg0.remote_hosts[1]
2997 host_in_port = random.randint(1025, 65535)
2998 server_in_port = random.randint(1025, 65535)
2999 server_out_port = random.randint(1025, 65535)
3000 data = "A" * 4 + "B" * 16 + "C" * 3
3002 self.nat44_add_address(self.nat_addr)
3003 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3004 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3006 # add static mapping for server
3007 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3008 server_in_port, server_out_port,
3009 proto=IP_PROTOS.tcp)
3011 # send packet from host to server
3012 pkts = self.create_stream_frag(self.pg0,
3017 self.pg0.add_stream(pkts)
3018 self.pg_enable_capture(self.pg_interfaces)
3020 frags = self.pg0.get_capture(len(pkts))
3021 p = self.reass_frags_and_verify(frags,
3024 self.assertNotEqual(p[TCP].sport, host_in_port)
3025 self.assertEqual(p[TCP].dport, server_in_port)
3026 self.assertEqual(data, p[Raw].load)
3028 def test_frag_out_of_order(self):
3029 """ NAT44 translate fragments arriving out of order """
3030 self.nat44_add_address(self.nat_addr)
3031 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3032 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3035 data = "A" * 4 + "B" * 16 + "C" * 3
3036 random.randint(1025, 65535)
3039 pkts = self.create_stream_frag(self.pg0,
3040 self.pg1.remote_ip4,
3045 self.pg0.add_stream(pkts)
3046 self.pg_enable_capture(self.pg_interfaces)
3048 frags = self.pg1.get_capture(len(pkts))
3049 p = self.reass_frags_and_verify(frags,
3051 self.pg1.remote_ip4)
3052 self.assertEqual(p[TCP].dport, 20)
3053 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3054 self.tcp_port_out = p[TCP].sport
3055 self.assertEqual(data, p[Raw].load)
3058 pkts = self.create_stream_frag(self.pg1,
3064 self.pg1.add_stream(pkts)
3065 self.pg_enable_capture(self.pg_interfaces)
3067 frags = self.pg0.get_capture(len(pkts))
3068 p = self.reass_frags_and_verify(frags,
3069 self.pg1.remote_ip4,
3070 self.pg0.remote_ip4)
3071 self.assertEqual(p[TCP].sport, 20)
3072 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3073 self.assertEqual(data, p[Raw].load)
3075 def test_port_restricted(self):
3076 """ Port restricted NAT44 (MAP-E CE) """
3077 self.nat44_add_address(self.nat_addr)
3078 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3079 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3081 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3082 "psid-offset 6 psid-len 6")
3084 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3085 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3086 TCP(sport=4567, dport=22))
3087 self.pg0.add_stream(p)
3088 self.pg_enable_capture(self.pg_interfaces)
3090 capture = self.pg1.get_capture(1)
3095 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3096 self.assertEqual(ip.src, self.nat_addr)
3097 self.assertEqual(tcp.dport, 22)
3098 self.assertNotEqual(tcp.sport, 4567)
3099 self.assertEqual((tcp.sport >> 6) & 63, 10)
3100 self.assert_packet_checksums_valid(p)
3102 self.logger.error(ppp("Unexpected or invalid packet:", p))
3105 def test_ipfix_max_frags(self):
3106 """ IPFIX logging maximum fragments pending reassembly exceeded """
3107 self.nat44_add_address(self.nat_addr)
3108 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3109 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3111 self.vapi.nat_set_reass(max_frag=0)
3112 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3113 src_address=self.pg3.local_ip4n,
3115 template_interval=10)
3116 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3117 src_port=self.ipfix_src_port)
3119 data = "A" * 4 + "B" * 16 + "C" * 3
3120 self.tcp_port_in = random.randint(1025, 65535)
3121 pkts = self.create_stream_frag(self.pg0,
3122 self.pg1.remote_ip4,
3126 self.pg0.add_stream(pkts[-1])
3127 self.pg_enable_capture(self.pg_interfaces)
3129 self.pg1.assert_nothing_captured()
3131 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3132 capture = self.pg3.get_capture(9)
3133 ipfix = IPFIXDecoder()
3134 # first load template
3136 self.assertTrue(p.haslayer(IPFIX))
3137 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3138 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3139 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3140 self.assertEqual(p[UDP].dport, 4739)
3141 self.assertEqual(p[IPFIX].observationDomainID,
3142 self.ipfix_domain_id)
3143 if p.haslayer(Template):
3144 ipfix.add_template(p.getlayer(Template))
3145 # verify events in data set
3147 if p.haslayer(Data):
3148 data = ipfix.decode_data_set(p.getlayer(Set))
3149 self.verify_ipfix_max_fragments_ip4(data, 0,
3150 self.pg0.remote_ip4n)
3152 def test_multiple_outside_vrf(self):
3153 """ Multiple outside VRF """
3157 self.pg1.unconfig_ip4()
3158 self.pg2.unconfig_ip4()
3159 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
3160 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
3161 self.pg1.set_table_ip4(vrf_id1)
3162 self.pg2.set_table_ip4(vrf_id2)
3163 self.pg1.config_ip4()
3164 self.pg2.config_ip4()
3165 self.pg1.resolve_arp()
3166 self.pg2.resolve_arp()
3168 self.nat44_add_address(self.nat_addr)
3169 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3170 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3172 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
3177 pkts = self.create_stream_in(self.pg0, self.pg1)
3178 self.pg0.add_stream(pkts)
3179 self.pg_enable_capture(self.pg_interfaces)
3181 capture = self.pg1.get_capture(len(pkts))
3182 self.verify_capture_out(capture, self.nat_addr)
3184 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3185 self.pg1.add_stream(pkts)
3186 self.pg_enable_capture(self.pg_interfaces)
3188 capture = self.pg0.get_capture(len(pkts))
3189 self.verify_capture_in(capture, self.pg0)
3191 self.tcp_port_in = 60303
3192 self.udp_port_in = 60304
3193 self.icmp_id_in = 60305
3196 pkts = self.create_stream_in(self.pg0, self.pg2)
3197 self.pg0.add_stream(pkts)
3198 self.pg_enable_capture(self.pg_interfaces)
3200 capture = self.pg2.get_capture(len(pkts))
3201 self.verify_capture_out(capture, self.nat_addr)
3203 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3204 self.pg2.add_stream(pkts)
3205 self.pg_enable_capture(self.pg_interfaces)
3207 capture = self.pg0.get_capture(len(pkts))
3208 self.verify_capture_in(capture, self.pg0)
3211 self.pg1.unconfig_ip4()
3212 self.pg2.unconfig_ip4()
3213 self.pg1.set_table_ip4(0)
3214 self.pg2.set_table_ip4(0)
3215 self.pg1.config_ip4()
3216 self.pg2.config_ip4()
3217 self.pg1.resolve_arp()
3218 self.pg2.resolve_arp()
3221 super(TestNAT44, self).tearDown()
3222 if not self.vpp_dead:
3223 self.logger.info(self.vapi.cli("show nat44 addresses"))
3224 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3225 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3226 self.logger.info(self.vapi.cli("show nat44 interface address"))
3227 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3228 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3229 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3230 self.vapi.cli("nat addr-port-assignment-alg default")
3232 self.vapi.cli("clear logging")
3235 class TestNAT44EndpointDependent(MethodHolder):
3236 """ Endpoint-Dependent mapping and filtering test cases """
3239 def setUpConstants(cls):
3240 super(TestNAT44EndpointDependent, cls).setUpConstants()
3241 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
3244 def setUpClass(cls):
3245 super(TestNAT44EndpointDependent, cls).setUpClass()
3246 cls.vapi.cli("set log class nat level debug")
3248 cls.tcp_port_in = 6303
3249 cls.tcp_port_out = 6303
3250 cls.udp_port_in = 6304
3251 cls.udp_port_out = 6304
3252 cls.icmp_id_in = 6305
3253 cls.icmp_id_out = 6305
3254 cls.nat_addr = '10.0.0.3'
3255 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3256 cls.ipfix_src_port = 4739
3257 cls.ipfix_domain_id = 1
3258 cls.tcp_external_port = 80
3260 cls.create_pg_interfaces(range(7))
3261 cls.interfaces = list(cls.pg_interfaces[0:3])
3263 for i in cls.interfaces:
3268 cls.pg0.generate_remote_hosts(3)
3269 cls.pg0.configure_ipv4_neighbors()
3273 cls.pg4.generate_remote_hosts(2)
3274 cls.pg4.config_ip4()
3275 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3276 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3280 cls.pg4.resolve_arp()
3281 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3282 cls.pg4.resolve_arp()
3284 zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
3285 cls.vapi.ip_table_add_del(1, is_add=1)
3287 cls.pg5._local_ip4 = "10.1.1.1"
3288 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
3290 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
3291 cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
3292 socket.AF_INET, cls.pg5.remote_ip4)
3293 cls.pg5.set_table_ip4(1)
3294 cls.pg5.config_ip4()
3296 cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
3297 dst_address_length=32,
3299 next_hop_sw_if_index=cls.pg5.sw_if_index,
3300 next_hop_address=zero_ip4n)
3302 cls.pg6._local_ip4 = "10.1.2.1"
3303 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
3305 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
3306 cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
3307 socket.AF_INET, cls.pg6.remote_ip4)
3308 cls.pg6.set_table_ip4(1)
3309 cls.pg6.config_ip4()
3311 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3312 dst_address_length=32,
3314 next_hop_sw_if_index=cls.pg6.sw_if_index,
3315 next_hop_address=zero_ip4n)
3317 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3318 dst_address_length=16,
3319 next_hop_address=zero_ip4n,
3321 next_hop_table_id=1)
3322 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3323 dst_address_length=0,
3324 next_hop_address=zero_ip4n,
3326 next_hop_table_id=0)
3327 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3328 dst_address_length=0,
3330 next_hop_sw_if_index=cls.pg1.sw_if_index,
3331 next_hop_address=cls.pg1.local_ip4n)
3333 cls.pg5.resolve_arp()
3334 cls.pg6.resolve_arp()
3337 super(TestNAT44EndpointDependent, cls).tearDownClass()
3340 def test_dynamic(self):
3341 """ NAT44 dynamic translation test """
3343 self.nat44_add_address(self.nat_addr)
3344 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3345 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3349 pkts = self.create_stream_in(self.pg0, self.pg1)
3350 self.pg0.add_stream(pkts)
3351 self.pg_enable_capture(self.pg_interfaces)
3353 capture = self.pg1.get_capture(len(pkts))
3354 self.verify_capture_out(capture)
3357 pkts = self.create_stream_out(self.pg1)
3358 self.pg1.add_stream(pkts)
3359 self.pg_enable_capture(self.pg_interfaces)
3361 capture = self.pg0.get_capture(len(pkts))
3362 self.verify_capture_in(capture, self.pg0)
3364 def test_forwarding(self):
3365 """ NAT44 forwarding test """
3367 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3368 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3370 self.vapi.nat44_forwarding_enable_disable(1)
3372 real_ip = self.pg0.remote_ip4n
3373 alias_ip = self.nat_addr_n
3374 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3375 external_ip=alias_ip)
3378 # in2out - static mapping match
3380 pkts = self.create_stream_out(self.pg1)
3381 self.pg1.add_stream(pkts)
3382 self.pg_enable_capture(self.pg_interfaces)
3384 capture = self.pg0.get_capture(len(pkts))
3385 self.verify_capture_in(capture, self.pg0)
3387 pkts = self.create_stream_in(self.pg0, self.pg1)
3388 self.pg0.add_stream(pkts)
3389 self.pg_enable_capture(self.pg_interfaces)
3391 capture = self.pg1.get_capture(len(pkts))
3392 self.verify_capture_out(capture, same_port=True)
3394 # in2out - no static mapping match
3396 host0 = self.pg0.remote_hosts[0]
3397 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
3399 pkts = self.create_stream_out(self.pg1,
3400 dst_ip=self.pg0.remote_ip4,
3401 use_inside_ports=True)
3402 self.pg1.add_stream(pkts)
3403 self.pg_enable_capture(self.pg_interfaces)
3405 capture = self.pg0.get_capture(len(pkts))
3406 self.verify_capture_in(capture, self.pg0)
3408 pkts = self.create_stream_in(self.pg0, self.pg1)
3409 self.pg0.add_stream(pkts)
3410 self.pg_enable_capture(self.pg_interfaces)
3412 capture = self.pg1.get_capture(len(pkts))
3413 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3416 self.pg0.remote_hosts[0] = host0
3418 user = self.pg0.remote_hosts[1]
3419 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3420 self.assertEqual(len(sessions), 3)
3421 self.assertTrue(sessions[0].ext_host_valid)
3422 self.vapi.nat44_del_session(
3423 sessions[0].inside_ip_address,
3424 sessions[0].inside_port,
3425 sessions[0].protocol,
3426 ext_host_address=sessions[0].ext_host_address,
3427 ext_host_port=sessions[0].ext_host_port)
3428 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3429 self.assertEqual(len(sessions), 2)
3432 self.vapi.nat44_forwarding_enable_disable(0)
3433 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3434 external_ip=alias_ip,
3437 def test_static_lb(self):
3438 """ NAT44 local service load balancing """
3439 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3442 server1 = self.pg0.remote_hosts[0]
3443 server2 = self.pg0.remote_hosts[1]
3445 locals = [{'addr': server1.ip4n,
3449 {'addr': server2.ip4n,
3454 self.nat44_add_address(self.nat_addr)
3455 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3458 local_num=len(locals),
3460 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3461 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3464 # from client to service
3465 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3466 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3467 TCP(sport=12345, dport=external_port))
3468 self.pg1.add_stream(p)
3469 self.pg_enable_capture(self.pg_interfaces)
3471 capture = self.pg0.get_capture(1)
3477 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3478 if ip.dst == server1.ip4:
3482 self.assertEqual(tcp.dport, local_port)
3483 self.assert_packet_checksums_valid(p)
3485 self.logger.error(ppp("Unexpected or invalid packet:", p))
3488 # from service back to client
3489 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3490 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3491 TCP(sport=local_port, dport=12345))
3492 self.pg0.add_stream(p)
3493 self.pg_enable_capture(self.pg_interfaces)
3495 capture = self.pg1.get_capture(1)
3500 self.assertEqual(ip.src, self.nat_addr)
3501 self.assertEqual(tcp.sport, external_port)
3502 self.assert_packet_checksums_valid(p)
3504 self.logger.error(ppp("Unexpected or invalid packet:", p))
3507 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3508 self.assertEqual(len(sessions), 1)
3509 self.assertTrue(sessions[0].ext_host_valid)
3510 self.vapi.nat44_del_session(
3511 sessions[0].inside_ip_address,
3512 sessions[0].inside_port,
3513 sessions[0].protocol,
3514 ext_host_address=sessions[0].ext_host_address,
3515 ext_host_port=sessions[0].ext_host_port)
3516 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3517 self.assertEqual(len(sessions), 0)
3519 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3520 def test_static_lb_multi_clients(self):
3521 """ NAT44 local service load balancing - multiple clients"""
3523 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3526 server1 = self.pg0.remote_hosts[0]
3527 server2 = self.pg0.remote_hosts[1]
3529 locals = [{'addr': server1.ip4n,
3533 {'addr': server2.ip4n,
3538 self.nat44_add_address(self.nat_addr)
3539 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3542 local_num=len(locals),
3544 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3545 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3550 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3552 for client in clients:
3553 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3554 IP(src=client, dst=self.nat_addr) /
3555 TCP(sport=12345, dport=external_port))
3557 self.pg1.add_stream(pkts)
3558 self.pg_enable_capture(self.pg_interfaces)
3560 capture = self.pg0.get_capture(len(pkts))
3562 if p[IP].dst == server1.ip4:
3566 self.assertTrue(server1_n > server2_n)
3568 def test_static_lb_2(self):
3569 """ NAT44 local service load balancing (asymmetrical rule) """
3570 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3573 server1 = self.pg0.remote_hosts[0]
3574 server2 = self.pg0.remote_hosts[1]
3576 locals = [{'addr': server1.ip4n,
3580 {'addr': server2.ip4n,
3585 self.vapi.nat44_forwarding_enable_disable(1)
3586 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3590 local_num=len(locals),
3592 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3593 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3596 # from client to service
3597 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3598 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3599 TCP(sport=12345, dport=external_port))
3600 self.pg1.add_stream(p)
3601 self.pg_enable_capture(self.pg_interfaces)
3603 capture = self.pg0.get_capture(1)
3609 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3610 if ip.dst == server1.ip4:
3614 self.assertEqual(tcp.dport, local_port)
3615 self.assert_packet_checksums_valid(p)
3617 self.logger.error(ppp("Unexpected or invalid packet:", p))
3620 # from service back to client
3621 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3622 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3623 TCP(sport=local_port, dport=12345))
3624 self.pg0.add_stream(p)
3625 self.pg_enable_capture(self.pg_interfaces)
3627 capture = self.pg1.get_capture(1)
3632 self.assertEqual(ip.src, self.nat_addr)
3633 self.assertEqual(tcp.sport, external_port)
3634 self.assert_packet_checksums_valid(p)
3636 self.logger.error(ppp("Unexpected or invalid packet:", p))
3639 # from client to server (no translation)
3640 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3641 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3642 TCP(sport=12346, dport=local_port))
3643 self.pg1.add_stream(p)
3644 self.pg_enable_capture(self.pg_interfaces)
3646 capture = self.pg0.get_capture(1)
3652 self.assertEqual(ip.dst, server1.ip4)
3653 self.assertEqual(tcp.dport, local_port)
3654 self.assert_packet_checksums_valid(p)
3656 self.logger.error(ppp("Unexpected or invalid packet:", p))
3659 # from service back to client (no translation)
3660 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3661 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3662 TCP(sport=local_port, dport=12346))
3663 self.pg0.add_stream(p)
3664 self.pg_enable_capture(self.pg_interfaces)
3666 capture = self.pg1.get_capture(1)
3671 self.assertEqual(ip.src, server1.ip4)
3672 self.assertEqual(tcp.sport, local_port)
3673 self.assert_packet_checksums_valid(p)
3675 self.logger.error(ppp("Unexpected or invalid packet:", p))
3678 def test_unknown_proto(self):
3679 """ NAT44 translate packet with unknown protocol """
3680 self.nat44_add_address(self.nat_addr)
3681 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3682 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3686 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3687 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3688 TCP(sport=self.tcp_port_in, dport=20))
3689 self.pg0.add_stream(p)
3690 self.pg_enable_capture(self.pg_interfaces)
3692 p = self.pg1.get_capture(1)
3694 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3695 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3697 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3698 TCP(sport=1234, dport=1234))
3699 self.pg0.add_stream(p)
3700 self.pg_enable_capture(self.pg_interfaces)
3702 p = self.pg1.get_capture(1)
3705 self.assertEqual(packet[IP].src, self.nat_addr)
3706 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3707 self.assertTrue(packet.haslayer(GRE))
3708 self.assert_packet_checksums_valid(packet)
3710 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3714 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3715 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3717 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3718 TCP(sport=1234, dport=1234))
3719 self.pg1.add_stream(p)
3720 self.pg_enable_capture(self.pg_interfaces)
3722 p = self.pg0.get_capture(1)
3725 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3726 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3727 self.assertTrue(packet.haslayer(GRE))
3728 self.assert_packet_checksums_valid(packet)
3730 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3733 def test_hairpinning_unknown_proto(self):
3734 """ NAT44 translate packet with unknown protocol - hairpinning """
3735 host = self.pg0.remote_hosts[0]
3736 server = self.pg0.remote_hosts[1]
3738 server_out_port = 8765
3739 server_nat_ip = "10.0.0.11"
3741 self.nat44_add_address(self.nat_addr)
3742 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3743 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3746 # add static mapping for server
3747 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3750 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3751 IP(src=host.ip4, dst=server_nat_ip) /
3752 TCP(sport=host_in_port, dport=server_out_port))
3753 self.pg0.add_stream(p)
3754 self.pg_enable_capture(self.pg_interfaces)
3756 self.pg0.get_capture(1)
3758 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3759 IP(src=host.ip4, dst=server_nat_ip) /
3761 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3762 TCP(sport=1234, dport=1234))
3763 self.pg0.add_stream(p)
3764 self.pg_enable_capture(self.pg_interfaces)
3766 p = self.pg0.get_capture(1)
3769 self.assertEqual(packet[IP].src, self.nat_addr)
3770 self.assertEqual(packet[IP].dst, server.ip4)
3771 self.assertTrue(packet.haslayer(GRE))
3772 self.assert_packet_checksums_valid(packet)
3774 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3778 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3779 IP(src=server.ip4, dst=self.nat_addr) /
3781 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3782 TCP(sport=1234, dport=1234))
3783 self.pg0.add_stream(p)
3784 self.pg_enable_capture(self.pg_interfaces)
3786 p = self.pg0.get_capture(1)
3789 self.assertEqual(packet[IP].src, server_nat_ip)
3790 self.assertEqual(packet[IP].dst, host.ip4)
3791 self.assertTrue(packet.haslayer(GRE))
3792 self.assert_packet_checksums_valid(packet)
3794 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3797 def test_output_feature_and_service(self):
3798 """ NAT44 interface output feature and services """
3799 external_addr = '1.2.3.4'
3803 self.vapi.nat44_forwarding_enable_disable(1)
3804 self.nat44_add_address(self.nat_addr)
3805 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3806 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3807 local_port, external_port,
3808 proto=IP_PROTOS.tcp, out2in_only=1)
3809 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3810 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3812 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3815 # from client to service
3816 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3817 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3818 TCP(sport=12345, dport=external_port))
3819 self.pg1.add_stream(p)
3820 self.pg_enable_capture(self.pg_interfaces)
3822 capture = self.pg0.get_capture(1)
3827 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3828 self.assertEqual(tcp.dport, local_port)
3829 self.assert_packet_checksums_valid(p)
3831 self.logger.error(ppp("Unexpected or invalid packet:", p))
3834 # from service back to client
3835 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3836 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3837 TCP(sport=local_port, dport=12345))
3838 self.pg0.add_stream(p)
3839 self.pg_enable_capture(self.pg_interfaces)
3841 capture = self.pg1.get_capture(1)
3846 self.assertEqual(ip.src, external_addr)
3847 self.assertEqual(tcp.sport, external_port)
3848 self.assert_packet_checksums_valid(p)
3850 self.logger.error(ppp("Unexpected or invalid packet:", p))
3853 # from local network host to external network
3854 pkts = self.create_stream_in(self.pg0, self.pg1)
3855 self.pg0.add_stream(pkts)
3856 self.pg_enable_capture(self.pg_interfaces)
3858 capture = self.pg1.get_capture(len(pkts))
3859 self.verify_capture_out(capture)
3860 pkts = self.create_stream_in(self.pg0, self.pg1)
3861 self.pg0.add_stream(pkts)
3862 self.pg_enable_capture(self.pg_interfaces)
3864 capture = self.pg1.get_capture(len(pkts))
3865 self.verify_capture_out(capture)
3867 # from external network back to local network host
3868 pkts = self.create_stream_out(self.pg1)
3869 self.pg1.add_stream(pkts)
3870 self.pg_enable_capture(self.pg_interfaces)
3872 capture = self.pg0.get_capture(len(pkts))
3873 self.verify_capture_in(capture, self.pg0)
3875 def test_output_feature_and_service2(self):
3876 """ NAT44 interface output feature and service host direct access """
3877 self.vapi.nat44_forwarding_enable_disable(1)
3878 self.nat44_add_address(self.nat_addr)
3879 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3882 # session initiaded from service host - translate
3883 pkts = self.create_stream_in(self.pg0, self.pg1)
3884 self.pg0.add_stream(pkts)
3885 self.pg_enable_capture(self.pg_interfaces)
3887 capture = self.pg1.get_capture(len(pkts))
3888 self.verify_capture_out(capture)
3890 pkts = self.create_stream_out(self.pg1)
3891 self.pg1.add_stream(pkts)
3892 self.pg_enable_capture(self.pg_interfaces)
3894 capture = self.pg0.get_capture(len(pkts))
3895 self.verify_capture_in(capture, self.pg0)
3897 # session initiaded from remote host - do not translate
3898 self.tcp_port_in = 60303
3899 self.udp_port_in = 60304
3900 self.icmp_id_in = 60305
3901 pkts = self.create_stream_out(self.pg1,
3902 self.pg0.remote_ip4,
3903 use_inside_ports=True)
3904 self.pg1.add_stream(pkts)
3905 self.pg_enable_capture(self.pg_interfaces)
3907 capture = self.pg0.get_capture(len(pkts))
3908 self.verify_capture_in(capture, self.pg0)
3910 pkts = self.create_stream_in(self.pg0, self.pg1)
3911 self.pg0.add_stream(pkts)
3912 self.pg_enable_capture(self.pg_interfaces)
3914 capture = self.pg1.get_capture(len(pkts))
3915 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3918 def test_output_feature_and_service3(self):
3919 """ NAT44 interface output feature and DST NAT """
3920 external_addr = '1.2.3.4'
3924 self.vapi.nat44_forwarding_enable_disable(1)
3925 self.nat44_add_address(self.nat_addr)
3926 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3927 local_port, external_port,
3928 proto=IP_PROTOS.tcp, out2in_only=1)
3929 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3930 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3932 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3935 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3936 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3937 TCP(sport=12345, dport=external_port))
3938 self.pg0.add_stream(p)
3939 self.pg_enable_capture(self.pg_interfaces)
3941 capture = self.pg1.get_capture(1)
3946 self.assertEqual(ip.src, self.pg0.remote_ip4)
3947 self.assertEqual(tcp.sport, 12345)
3948 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3949 self.assertEqual(tcp.dport, local_port)
3950 self.assert_packet_checksums_valid(p)
3952 self.logger.error(ppp("Unexpected or invalid packet:", p))
3955 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3956 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3957 TCP(sport=local_port, dport=12345))
3958 self.pg1.add_stream(p)
3959 self.pg_enable_capture(self.pg_interfaces)
3961 capture = self.pg0.get_capture(1)
3966 self.assertEqual(ip.src, external_addr)
3967 self.assertEqual(tcp.sport, external_port)
3968 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3969 self.assertEqual(tcp.dport, 12345)
3970 self.assert_packet_checksums_valid(p)
3972 self.logger.error(ppp("Unexpected or invalid packet:", p))
3975 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3977 twice_nat_addr = '10.0.1.3'
3985 port_in1 = port_in+1
3986 port_in2 = port_in+2
3991 server1 = self.pg0.remote_hosts[0]
3992 server2 = self.pg0.remote_hosts[1]
4004 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
4007 self.nat44_add_address(self.nat_addr)
4008 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4010 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
4012 proto=IP_PROTOS.tcp,
4013 twice_nat=int(not self_twice_nat),
4014 self_twice_nat=int(self_twice_nat))
4016 locals = [{'addr': server1.ip4n,
4020 {'addr': server2.ip4n,
4024 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
4025 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
4029 not self_twice_nat),
4032 local_num=len(locals),
4034 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
4035 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
4042 assert client_id is not None
4044 client = self.pg0.remote_hosts[0]
4045 elif client_id == 2:
4046 client = self.pg0.remote_hosts[1]
4048 client = pg1.remote_hosts[0]
4049 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4050 IP(src=client.ip4, dst=self.nat_addr) /
4051 TCP(sport=eh_port_out, dport=port_out))
4053 self.pg_enable_capture(self.pg_interfaces)
4055 capture = pg0.get_capture(1)
4061 if ip.dst == server1.ip4:
4067 self.assertEqual(ip.dst, server.ip4)
4069 self.assertIn(tcp.dport, [port_in1, port_in2])
4071 self.assertEqual(tcp.dport, port_in)
4073 self.assertEqual(ip.src, twice_nat_addr)
4074 self.assertNotEqual(tcp.sport, eh_port_out)
4076 self.assertEqual(ip.src, client.ip4)
4077 self.assertEqual(tcp.sport, eh_port_out)
4079 eh_port_in = tcp.sport
4080 saved_port_in = tcp.dport
4081 self.assert_packet_checksums_valid(p)
4083 self.logger.error(ppp("Unexpected or invalid packet:", p))
4086 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4087 IP(src=server.ip4, dst=eh_addr_in) /
4088 TCP(sport=saved_port_in, dport=eh_port_in))
4090 self.pg_enable_capture(self.pg_interfaces)
4092 capture = pg1.get_capture(1)
4097 self.assertEqual(ip.dst, client.ip4)
4098 self.assertEqual(ip.src, self.nat_addr)
4099 self.assertEqual(tcp.dport, eh_port_out)
4100 self.assertEqual(tcp.sport, port_out)
4101 self.assert_packet_checksums_valid(p)
4103 self.logger.error(ppp("Unexpected or invalid packet:", p))
4107 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4108 self.assertEqual(len(sessions), 1)
4109 self.assertTrue(sessions[0].ext_host_valid)
4110 self.assertTrue(sessions[0].is_twicenat)
4111 self.vapi.nat44_del_session(
4112 sessions[0].inside_ip_address,
4113 sessions[0].inside_port,
4114 sessions[0].protocol,
4115 ext_host_address=sessions[0].ext_host_nat_address,
4116 ext_host_port=sessions[0].ext_host_nat_port)
4117 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4118 self.assertEqual(len(sessions), 0)
4120 def test_twice_nat(self):
4122 self.twice_nat_common()
4124 def test_self_twice_nat_positive(self):
4125 """ Self Twice NAT44 (positive test) """
4126 self.twice_nat_common(self_twice_nat=True, same_pg=True)
4128 def test_self_twice_nat_negative(self):
4129 """ Self Twice NAT44 (negative test) """
4130 self.twice_nat_common(self_twice_nat=True)
4132 def test_twice_nat_lb(self):
4133 """ Twice NAT44 local service load balancing """
4134 self.twice_nat_common(lb=True)
4136 def test_self_twice_nat_lb_positive(self):
4137 """ Self Twice NAT44 local service load balancing (positive test) """
4138 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4141 def test_self_twice_nat_lb_negative(self):
4142 """ Self Twice NAT44 local service load balancing (negative test) """
4143 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4146 def test_twice_nat_interface_addr(self):
4147 """ Acquire twice NAT44 addresses from interface """
4148 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4150 # no address in NAT pool
4151 adresses = self.vapi.nat44_address_dump()
4152 self.assertEqual(0, len(adresses))
4154 # configure interface address and check NAT address pool
4155 self.pg3.config_ip4()
4156 adresses = self.vapi.nat44_address_dump()
4157 self.assertEqual(1, len(adresses))
4158 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4159 self.assertEqual(adresses[0].twice_nat, 1)
4161 # remove interface address and check NAT address pool
4162 self.pg3.unconfig_ip4()
4163 adresses = self.vapi.nat44_address_dump()
4164 self.assertEqual(0, len(adresses))
4166 def test_tcp_session_close_in(self):
4167 """ Close TCP session from inside network """
4168 self.tcp_port_out = 10505
4169 self.nat44_add_address(self.nat_addr)
4170 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4174 proto=IP_PROTOS.tcp,
4176 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4177 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4180 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4181 start_sessnum = len(sessions)
4183 self.initiate_tcp_session(self.pg0, self.pg1)
4185 # FIN packet in -> out
4186 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4187 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4188 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4189 flags="FA", seq=100, ack=300))
4190 self.pg0.add_stream(p)
4191 self.pg_enable_capture(self.pg_interfaces)
4193 self.pg1.get_capture(1)
4197 # ACK packet out -> in
4198 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4199 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4200 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4201 flags="A", seq=300, ack=101))
4204 # FIN packet out -> in
4205 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4206 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4207 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4208 flags="FA", seq=300, ack=101))
4211 self.pg1.add_stream(pkts)
4212 self.pg_enable_capture(self.pg_interfaces)
4214 self.pg0.get_capture(2)
4216 # ACK packet in -> out
4217 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4218 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4219 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4220 flags="A", seq=101, ack=301))
4221 self.pg0.add_stream(p)
4222 self.pg_enable_capture(self.pg_interfaces)
4224 self.pg1.get_capture(1)
4226 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4228 self.assertEqual(len(sessions) - start_sessnum, 0)
4230 def test_tcp_session_close_out(self):
4231 """ Close TCP session from outside network """
4232 self.tcp_port_out = 10505
4233 self.nat44_add_address(self.nat_addr)
4234 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4238 proto=IP_PROTOS.tcp,
4240 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4241 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4244 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4245 start_sessnum = len(sessions)
4247 self.initiate_tcp_session(self.pg0, self.pg1)
4249 # FIN packet out -> in
4250 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4251 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4252 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4253 flags="FA", seq=100, ack=300))
4254 self.pg1.add_stream(p)
4255 self.pg_enable_capture(self.pg_interfaces)
4257 self.pg0.get_capture(1)
4259 # FIN+ACK packet in -> out
4260 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4261 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4262 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4263 flags="FA", seq=300, ack=101))
4265 self.pg0.add_stream(p)
4266 self.pg_enable_capture(self.pg_interfaces)
4268 self.pg1.get_capture(1)
4270 # ACK packet out -> in
4271 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4272 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4273 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4274 flags="A", seq=101, ack=301))
4275 self.pg1.add_stream(p)
4276 self.pg_enable_capture(self.pg_interfaces)
4278 self.pg0.get_capture(1)
4280 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4282 self.assertEqual(len(sessions) - start_sessnum, 0)
4284 def test_tcp_session_close_simultaneous(self):
4285 """ Close TCP session from inside network """
4286 self.tcp_port_out = 10505
4287 self.nat44_add_address(self.nat_addr)
4288 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4292 proto=IP_PROTOS.tcp,
4294 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4295 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4298 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4299 start_sessnum = len(sessions)
4301 self.initiate_tcp_session(self.pg0, self.pg1)
4303 # FIN packet in -> out
4304 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4305 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4306 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4307 flags="FA", seq=100, ack=300))
4308 self.pg0.add_stream(p)
4309 self.pg_enable_capture(self.pg_interfaces)
4311 self.pg1.get_capture(1)
4313 # FIN packet out -> in
4314 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4315 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4316 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4317 flags="FA", seq=300, ack=100))
4318 self.pg1.add_stream(p)
4319 self.pg_enable_capture(self.pg_interfaces)
4321 self.pg0.get_capture(1)
4323 # ACK packet in -> out
4324 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4325 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4326 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4327 flags="A", seq=101, ack=301))
4328 self.pg0.add_stream(p)
4329 self.pg_enable_capture(self.pg_interfaces)
4331 self.pg1.get_capture(1)
4333 # ACK packet out -> in
4334 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4335 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4336 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4337 flags="A", seq=301, ack=101))
4338 self.pg1.add_stream(p)
4339 self.pg_enable_capture(self.pg_interfaces)
4341 self.pg0.get_capture(1)
4343 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4345 self.assertEqual(len(sessions) - start_sessnum, 0)
4347 def test_one_armed_nat44_static(self):
4348 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
4349 remote_host = self.pg4.remote_hosts[0]
4350 local_host = self.pg4.remote_hosts[1]
4355 self.vapi.nat44_forwarding_enable_disable(1)
4356 self.nat44_add_address(self.nat_addr, twice_nat=1)
4357 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4358 local_port, external_port,
4359 proto=IP_PROTOS.tcp, out2in_only=1,
4361 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4362 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4365 # from client to service
4366 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4367 IP(src=remote_host.ip4, dst=self.nat_addr) /
4368 TCP(sport=12345, dport=external_port))
4369 self.pg4.add_stream(p)
4370 self.pg_enable_capture(self.pg_interfaces)
4372 capture = self.pg4.get_capture(1)
4377 self.assertEqual(ip.dst, local_host.ip4)
4378 self.assertEqual(ip.src, self.nat_addr)
4379 self.assertEqual(tcp.dport, local_port)
4380 self.assertNotEqual(tcp.sport, 12345)
4381 eh_port_in = tcp.sport
4382 self.assert_packet_checksums_valid(p)
4384 self.logger.error(ppp("Unexpected or invalid packet:", p))
4387 # from service back to client
4388 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4389 IP(src=local_host.ip4, dst=self.nat_addr) /
4390 TCP(sport=local_port, dport=eh_port_in))
4391 self.pg4.add_stream(p)
4392 self.pg_enable_capture(self.pg_interfaces)
4394 capture = self.pg4.get_capture(1)
4399 self.assertEqual(ip.src, self.nat_addr)
4400 self.assertEqual(ip.dst, remote_host.ip4)
4401 self.assertEqual(tcp.sport, external_port)
4402 self.assertEqual(tcp.dport, 12345)
4403 self.assert_packet_checksums_valid(p)
4405 self.logger.error(ppp("Unexpected or invalid packet:", p))
4408 def test_static_with_port_out2(self):
4409 """ 1:1 NAPT asymmetrical rule """
4414 self.vapi.nat44_forwarding_enable_disable(1)
4415 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4416 local_port, external_port,
4417 proto=IP_PROTOS.tcp, out2in_only=1)
4418 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4419 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4422 # from client to service
4423 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4424 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4425 TCP(sport=12345, dport=external_port))
4426 self.pg1.add_stream(p)
4427 self.pg_enable_capture(self.pg_interfaces)
4429 capture = self.pg0.get_capture(1)
4434 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4435 self.assertEqual(tcp.dport, local_port)
4436 self.assert_packet_checksums_valid(p)
4438 self.logger.error(ppp("Unexpected or invalid packet:", p))
4442 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4443 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4444 ICMP(type=11) / capture[0][IP])
4445 self.pg0.add_stream(p)
4446 self.pg_enable_capture(self.pg_interfaces)
4448 capture = self.pg1.get_capture(1)
4451 self.assertEqual(p[IP].src, self.nat_addr)
4453 self.assertEqual(inner.dst, self.nat_addr)
4454 self.assertEqual(inner[TCPerror].dport, external_port)
4456 self.logger.error(ppp("Unexpected or invalid packet:", p))
4459 # from service back to client
4460 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4461 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4462 TCP(sport=local_port, dport=12345))
4463 self.pg0.add_stream(p)
4464 self.pg_enable_capture(self.pg_interfaces)
4466 capture = self.pg1.get_capture(1)
4471 self.assertEqual(ip.src, self.nat_addr)
4472 self.assertEqual(tcp.sport, external_port)
4473 self.assert_packet_checksums_valid(p)
4475 self.logger.error(ppp("Unexpected or invalid packet:", p))
4479 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4480 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4481 ICMP(type=11) / capture[0][IP])
4482 self.pg1.add_stream(p)
4483 self.pg_enable_capture(self.pg_interfaces)
4485 capture = self.pg0.get_capture(1)
4488 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4490 self.assertEqual(inner.src, self.pg0.remote_ip4)
4491 self.assertEqual(inner[TCPerror].sport, local_port)
4493 self.logger.error(ppp("Unexpected or invalid packet:", p))
4496 # from client to server (no translation)
4497 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4498 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4499 TCP(sport=12346, dport=local_port))
4500 self.pg1.add_stream(p)
4501 self.pg_enable_capture(self.pg_interfaces)
4503 capture = self.pg0.get_capture(1)
4508 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4509 self.assertEqual(tcp.dport, local_port)
4510 self.assert_packet_checksums_valid(p)
4512 self.logger.error(ppp("Unexpected or invalid packet:", p))
4515 # from service back to client (no translation)
4516 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4517 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4518 TCP(sport=local_port, dport=12346))
4519 self.pg0.add_stream(p)
4520 self.pg_enable_capture(self.pg_interfaces)
4522 capture = self.pg1.get_capture(1)
4527 self.assertEqual(ip.src, self.pg0.remote_ip4)
4528 self.assertEqual(tcp.sport, local_port)
4529 self.assert_packet_checksums_valid(p)
4531 self.logger.error(ppp("Unexpected or invalid packet:", p))
4534 def test_output_feature(self):
4535 """ NAT44 interface output feature (in2out postrouting) """
4536 self.vapi.nat44_forwarding_enable_disable(1)
4537 self.nat44_add_address(self.nat_addr)
4538 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4540 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4544 pkts = self.create_stream_in(self.pg0, self.pg1)
4545 self.pg0.add_stream(pkts)
4546 self.pg_enable_capture(self.pg_interfaces)
4548 capture = self.pg1.get_capture(len(pkts))
4549 self.verify_capture_out(capture)
4552 pkts = self.create_stream_out(self.pg1)
4553 self.pg1.add_stream(pkts)
4554 self.pg_enable_capture(self.pg_interfaces)
4556 capture = self.pg0.get_capture(len(pkts))
4557 self.verify_capture_in(capture, self.pg0)
4559 def test_multiple_vrf(self):
4560 """ Multiple VRF setup """
4561 external_addr = '1.2.3.4'
4566 self.vapi.nat44_forwarding_enable_disable(1)
4567 self.nat44_add_address(self.nat_addr)
4568 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4569 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4571 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4573 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
4574 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index,
4576 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4578 self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
4579 local_port, external_port, vrf_id=1,
4580 proto=IP_PROTOS.tcp, out2in_only=1)
4581 self.nat44_add_static_mapping(
4582 self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index,
4583 local_port=local_port, vrf_id=0, external_port=external_port,
4584 proto=IP_PROTOS.tcp, out2in_only=1)
4586 # from client to service (both VRF1)
4587 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4588 IP(src=self.pg6.remote_ip4, dst=external_addr) /
4589 TCP(sport=12345, dport=external_port))
4590 self.pg6.add_stream(p)
4591 self.pg_enable_capture(self.pg_interfaces)
4593 capture = self.pg5.get_capture(1)
4598 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4599 self.assertEqual(tcp.dport, local_port)
4600 self.assert_packet_checksums_valid(p)
4602 self.logger.error(ppp("Unexpected or invalid packet:", p))
4605 # from service back to client (both VRF1)
4606 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4607 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4608 TCP(sport=local_port, dport=12345))
4609 self.pg5.add_stream(p)
4610 self.pg_enable_capture(self.pg_interfaces)
4612 capture = self.pg6.get_capture(1)
4617 self.assertEqual(ip.src, external_addr)
4618 self.assertEqual(tcp.sport, external_port)
4619 self.assert_packet_checksums_valid(p)
4621 self.logger.error(ppp("Unexpected or invalid packet:", p))
4624 # dynamic NAT from VRF1 to VRF0 (output-feature)
4625 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4626 IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
4627 TCP(sport=2345, dport=22))
4628 self.pg5.add_stream(p)
4629 self.pg_enable_capture(self.pg_interfaces)
4631 capture = self.pg1.get_capture(1)
4636 self.assertEqual(ip.src, self.nat_addr)
4637 self.assertNotEqual(tcp.sport, 2345)
4638 self.assert_packet_checksums_valid(p)
4641 self.logger.error(ppp("Unexpected or invalid packet:", p))
4644 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4645 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4646 TCP(sport=22, dport=port))
4647 self.pg1.add_stream(p)
4648 self.pg_enable_capture(self.pg_interfaces)
4650 capture = self.pg5.get_capture(1)
4655 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4656 self.assertEqual(tcp.dport, 2345)
4657 self.assert_packet_checksums_valid(p)
4659 self.logger.error(ppp("Unexpected or invalid packet:", p))
4662 # from client VRF1 to service VRF0
4663 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4664 IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
4665 TCP(sport=12346, dport=external_port))
4666 self.pg6.add_stream(p)
4667 self.pg_enable_capture(self.pg_interfaces)
4669 capture = self.pg0.get_capture(1)
4674 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4675 self.assertEqual(tcp.dport, local_port)
4676 self.assert_packet_checksums_valid(p)
4678 self.logger.error(ppp("Unexpected or invalid packet:", p))
4681 # from service VRF0 back to client VRF1
4682 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4683 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4684 TCP(sport=local_port, dport=12346))
4685 self.pg0.add_stream(p)
4686 self.pg_enable_capture(self.pg_interfaces)
4688 capture = self.pg6.get_capture(1)
4693 self.assertEqual(ip.src, self.pg0.local_ip4)
4694 self.assertEqual(tcp.sport, external_port)
4695 self.assert_packet_checksums_valid(p)
4697 self.logger.error(ppp("Unexpected or invalid packet:", p))
4700 # from client VRF0 to service VRF1
4701 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4702 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4703 TCP(sport=12347, dport=external_port))
4704 self.pg0.add_stream(p)
4705 self.pg_enable_capture(self.pg_interfaces)
4707 capture = self.pg5.get_capture(1)
4712 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4713 self.assertEqual(tcp.dport, local_port)
4714 self.assert_packet_checksums_valid(p)
4716 self.logger.error(ppp("Unexpected or invalid packet:", p))
4719 # from service VRF1 back to client VRF0
4720 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4721 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4722 TCP(sport=local_port, dport=12347))
4723 self.pg5.add_stream(p)
4724 self.pg_enable_capture(self.pg_interfaces)
4726 capture = self.pg0.get_capture(1)
4731 self.assertEqual(ip.src, external_addr)
4732 self.assertEqual(tcp.sport, external_port)
4733 self.assert_packet_checksums_valid(p)
4735 self.logger.error(ppp("Unexpected or invalid packet:", p))
4738 # from client to server (both VRF1, no translation)
4739 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4740 IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
4741 TCP(sport=12348, dport=local_port))
4742 self.pg6.add_stream(p)
4743 self.pg_enable_capture(self.pg_interfaces)
4745 capture = self.pg5.get_capture(1)
4750 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4751 self.assertEqual(tcp.dport, local_port)
4752 self.assert_packet_checksums_valid(p)
4754 self.logger.error(ppp("Unexpected or invalid packet:", p))
4757 # from server back to client (both VRF1, no translation)
4758 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4759 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4760 TCP(sport=local_port, dport=12348))
4761 self.pg5.add_stream(p)
4762 self.pg_enable_capture(self.pg_interfaces)
4764 capture = self.pg6.get_capture(1)
4769 self.assertEqual(ip.src, self.pg5.remote_ip4)
4770 self.assertEqual(tcp.sport, local_port)
4771 self.assert_packet_checksums_valid(p)
4773 self.logger.error(ppp("Unexpected or invalid packet:", p))
4776 # from client VRF1 to server VRF0 (no translation)
4777 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4778 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4779 TCP(sport=local_port, dport=12349))
4780 self.pg0.add_stream(p)
4781 self.pg_enable_capture(self.pg_interfaces)
4783 capture = self.pg6.get_capture(1)
4788 self.assertEqual(ip.src, self.pg0.remote_ip4)
4789 self.assertEqual(tcp.sport, local_port)
4790 self.assert_packet_checksums_valid(p)
4792 self.logger.error(ppp("Unexpected or invalid packet:", p))
4795 # from server VRF0 back to client VRF1 (no translation)
4796 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4797 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4798 TCP(sport=local_port, dport=12349))
4799 self.pg0.add_stream(p)
4800 self.pg_enable_capture(self.pg_interfaces)
4802 capture = self.pg6.get_capture(1)
4807 self.assertEqual(ip.src, self.pg0.remote_ip4)
4808 self.assertEqual(tcp.sport, local_port)
4809 self.assert_packet_checksums_valid(p)
4811 self.logger.error(ppp("Unexpected or invalid packet:", p))
4814 # from client VRF0 to server VRF1 (no translation)
4815 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4816 IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
4817 TCP(sport=12344, dport=local_port))
4818 self.pg0.add_stream(p)
4819 self.pg_enable_capture(self.pg_interfaces)
4821 capture = self.pg5.get_capture(1)
4826 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4827 self.assertEqual(tcp.dport, local_port)
4828 self.assert_packet_checksums_valid(p)
4830 self.logger.error(ppp("Unexpected or invalid packet:", p))
4833 # from server VRF1 back to client VRF0 (no translation)
4834 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4835 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4836 TCP(sport=local_port, dport=12344))
4837 self.pg5.add_stream(p)
4838 self.pg_enable_capture(self.pg_interfaces)
4840 capture = self.pg0.get_capture(1)
4845 self.assertEqual(ip.src, self.pg5.remote_ip4)
4846 self.assertEqual(tcp.sport, local_port)
4847 self.assert_packet_checksums_valid(p)
4849 self.logger.error(ppp("Unexpected or invalid packet:", p))
4853 super(TestNAT44EndpointDependent, self).tearDown()
4854 if not self.vpp_dead:
4855 self.logger.info(self.vapi.cli("show nat44 addresses"))
4856 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4857 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4858 self.logger.info(self.vapi.cli("show nat44 interface address"))
4859 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4860 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
4862 self.vapi.cli("clear logging")
4865 class TestNAT44Out2InDPO(MethodHolder):
4866 """ NAT44 Test Cases using out2in DPO """
4869 def setUpConstants(cls):
4870 super(TestNAT44Out2InDPO, cls).setUpConstants()
4871 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
4874 def setUpClass(cls):
4875 super(TestNAT44Out2InDPO, cls).setUpClass()
4876 cls.vapi.cli("set log class nat level debug")
4879 cls.tcp_port_in = 6303
4880 cls.tcp_port_out = 6303
4881 cls.udp_port_in = 6304
4882 cls.udp_port_out = 6304
4883 cls.icmp_id_in = 6305
4884 cls.icmp_id_out = 6305
4885 cls.nat_addr = '10.0.0.3'
4886 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4887 cls.dst_ip4 = '192.168.70.1'
4889 cls.create_pg_interfaces(range(2))
4892 cls.pg0.config_ip4()
4893 cls.pg0.resolve_arp()
4896 cls.pg1.config_ip6()
4897 cls.pg1.resolve_ndp()
4899 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4900 dst_address_length=0,
4901 next_hop_address=cls.pg1.remote_ip6n,
4902 next_hop_sw_if_index=cls.pg1.sw_if_index)
4905 super(TestNAT44Out2InDPO, cls).tearDownClass()
4908 def configure_xlat(self):
4909 self.dst_ip6_pfx = '1:2:3::'
4910 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4912 self.dst_ip6_pfx_len = 96
4913 self.src_ip6_pfx = '4:5:6::'
4914 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4916 self.src_ip6_pfx_len = 96
4917 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4918 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4919 '\x00\x00\x00\x00', 0, is_translation=1,
4922 def test_464xlat_ce(self):
4923 """ Test 464XLAT CE with NAT44 """
4925 self.configure_xlat()
4927 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4928 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4930 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4931 self.dst_ip6_pfx_len)
4932 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4933 self.src_ip6_pfx_len)
4936 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4937 self.pg0.add_stream(pkts)
4938 self.pg_enable_capture(self.pg_interfaces)
4940 capture = self.pg1.get_capture(len(pkts))
4941 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4944 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4946 self.pg1.add_stream(pkts)
4947 self.pg_enable_capture(self.pg_interfaces)
4949 capture = self.pg0.get_capture(len(pkts))
4950 self.verify_capture_in(capture, self.pg0)
4952 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4954 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4955 self.nat_addr_n, is_add=0)
4957 def test_464xlat_ce_no_nat(self):
4958 """ Test 464XLAT CE without NAT44 """
4960 self.configure_xlat()
4962 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4963 self.dst_ip6_pfx_len)
4964 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4965 self.src_ip6_pfx_len)
4967 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4968 self.pg0.add_stream(pkts)
4969 self.pg_enable_capture(self.pg_interfaces)
4971 capture = self.pg1.get_capture(len(pkts))
4972 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4973 nat_ip=out_dst_ip6, same_port=True)
4975 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4976 self.pg1.add_stream(pkts)
4977 self.pg_enable_capture(self.pg_interfaces)
4979 capture = self.pg0.get_capture(len(pkts))
4980 self.verify_capture_in(capture, self.pg0)
4983 class TestDeterministicNAT(MethodHolder):
4984 """ Deterministic NAT Test Cases """
4987 def setUpConstants(cls):
4988 super(TestDeterministicNAT, cls).setUpConstants()
4989 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4992 def setUpClass(cls):
4993 super(TestDeterministicNAT, cls).setUpClass()
4994 cls.vapi.cli("set log class nat level debug")
4997 cls.tcp_port_in = 6303
4998 cls.tcp_external_port = 6303
4999 cls.udp_port_in = 6304
5000 cls.udp_external_port = 6304
5001 cls.icmp_id_in = 6305
5002 cls.nat_addr = '10.0.0.3'
5004 cls.create_pg_interfaces(range(3))
5005 cls.interfaces = list(cls.pg_interfaces)
5007 for i in cls.interfaces:
5012 cls.pg0.generate_remote_hosts(2)
5013 cls.pg0.configure_ipv4_neighbors()
5016 super(TestDeterministicNAT, cls).tearDownClass()
5019 def create_stream_in(self, in_if, out_if, ttl=64):
5021 Create packet stream for inside network
5023 :param in_if: Inside interface
5024 :param out_if: Outside interface
5025 :param ttl: TTL of generated packets
5029 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5030 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5031 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
5035 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5036 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5037 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
5041 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5042 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5043 ICMP(id=self.icmp_id_in, type='echo-request'))
5048 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
5050 Create packet stream for outside network
5052 :param out_if: Outside interface
5053 :param dst_ip: Destination IP address (Default use global NAT address)
5054 :param ttl: TTL of generated packets
5057 dst_ip = self.nat_addr
5060 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5061 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5062 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
5066 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5067 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5068 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
5072 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5073 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5074 ICMP(id=self.icmp_external_id, type='echo-reply'))
5079 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
5081 Verify captured packets on outside network
5083 :param capture: Captured packets
5084 :param nat_ip: Translated IP address (Default use global NAT address)
5085 :param same_port: Sorce port number is not translated (Default False)
5086 :param packet_num: Expected number of packets (Default 3)
5089 nat_ip = self.nat_addr
5090 self.assertEqual(packet_num, len(capture))
5091 for packet in capture:
5093 self.assertEqual(packet[IP].src, nat_ip)
5094 if packet.haslayer(TCP):
5095 self.tcp_port_out = packet[TCP].sport
5096 elif packet.haslayer(UDP):
5097 self.udp_port_out = packet[UDP].sport
5099 self.icmp_external_id = packet[ICMP].id
5101 self.logger.error(ppp("Unexpected or invalid packet "
5102 "(outside network):", packet))
5105 def verify_ipfix_max_entries_per_user(self, data):
5107 Verify IPFIX maximum entries per user exceeded event
5109 :param data: Decoded IPFIX data records
5111 self.assertEqual(1, len(data))
5114 self.assertEqual(ord(record[230]), 13)
5115 # natQuotaExceededEvent
5116 self.assertEqual('\x03\x00\x00\x00', record[466])
5118 self.assertEqual('\xe8\x03\x00\x00', record[473])
5120 self.assertEqual(self.pg0.remote_ip4n, record[8])
5122 def test_deterministic_mode(self):
5123 """ NAT plugin run deterministic mode """
5124 in_addr = '172.16.255.0'
5125 out_addr = '172.17.255.50'
5126 in_addr_t = '172.16.255.20'
5127 in_addr_n = socket.inet_aton(in_addr)
5128 out_addr_n = socket.inet_aton(out_addr)
5129 in_addr_t_n = socket.inet_aton(in_addr_t)
5133 nat_config = self.vapi.nat_show_config()
5134 self.assertEqual(1, nat_config.deterministic)
5136 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
5138 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
5139 self.assertEqual(rep1.out_addr[:4], out_addr_n)
5140 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
5141 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
5143 deterministic_mappings = self.vapi.nat_det_map_dump()
5144 self.assertEqual(len(deterministic_mappings), 1)
5145 dsm = deterministic_mappings[0]
5146 self.assertEqual(in_addr_n, dsm.in_addr[:4])
5147 self.assertEqual(in_plen, dsm.in_plen)
5148 self.assertEqual(out_addr_n, dsm.out_addr[:4])
5149 self.assertEqual(out_plen, dsm.out_plen)
5151 self.clear_nat_det()
5152 deterministic_mappings = self.vapi.nat_det_map_dump()
5153 self.assertEqual(len(deterministic_mappings), 0)
5155 def test_set_timeouts(self):
5156 """ Set deterministic NAT timeouts """
5157 timeouts_before = self.vapi.nat_det_get_timeouts()
5159 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
5160 timeouts_before.tcp_established + 10,
5161 timeouts_before.tcp_transitory + 10,
5162 timeouts_before.icmp + 10)
5164 timeouts_after = self.vapi.nat_det_get_timeouts()
5166 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
5167 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
5168 self.assertNotEqual(timeouts_before.tcp_established,
5169 timeouts_after.tcp_established)
5170 self.assertNotEqual(timeouts_before.tcp_transitory,
5171 timeouts_after.tcp_transitory)
5173 def test_det_in(self):
5174 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
5176 nat_ip = "10.0.0.10"
5178 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5180 socket.inet_aton(nat_ip),
5182 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5183 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5187 pkts = self.create_stream_in(self.pg0, self.pg1)
5188 self.pg0.add_stream(pkts)
5189 self.pg_enable_capture(self.pg_interfaces)
5191 capture = self.pg1.get_capture(len(pkts))
5192 self.verify_capture_out(capture, nat_ip)
5195 pkts = self.create_stream_out(self.pg1, nat_ip)
5196 self.pg1.add_stream(pkts)
5197 self.pg_enable_capture(self.pg_interfaces)
5199 capture = self.pg0.get_capture(len(pkts))
5200 self.verify_capture_in(capture, self.pg0)
5203 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
5204 self.assertEqual(len(sessions), 3)
5208 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5209 self.assertEqual(s.in_port, self.tcp_port_in)
5210 self.assertEqual(s.out_port, self.tcp_port_out)
5211 self.assertEqual(s.ext_port, self.tcp_external_port)
5215 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5216 self.assertEqual(s.in_port, self.udp_port_in)
5217 self.assertEqual(s.out_port, self.udp_port_out)
5218 self.assertEqual(s.ext_port, self.udp_external_port)
5222 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5223 self.assertEqual(s.in_port, self.icmp_id_in)
5224 self.assertEqual(s.out_port, self.icmp_external_id)
5226 def test_multiple_users(self):
5227 """ Deterministic NAT multiple users """
5229 nat_ip = "10.0.0.10"
5231 external_port = 6303
5233 host0 = self.pg0.remote_hosts[0]
5234 host1 = self.pg0.remote_hosts[1]
5236 self.vapi.nat_det_add_del_map(host0.ip4n,
5238 socket.inet_aton(nat_ip),
5240 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5241 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5245 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
5246 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
5247 TCP(sport=port_in, dport=external_port))
5248 self.pg0.add_stream(p)
5249 self.pg_enable_capture(self.pg_interfaces)
5251 capture = self.pg1.get_capture(1)
5256 self.assertEqual(ip.src, nat_ip)
5257 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5258 self.assertEqual(tcp.dport, external_port)
5259 port_out0 = tcp.sport
5261 self.logger.error(ppp("Unexpected or invalid packet:", p))
5265 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
5266 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
5267 TCP(sport=port_in, dport=external_port))
5268 self.pg0.add_stream(p)
5269 self.pg_enable_capture(self.pg_interfaces)
5271 capture = self.pg1.get_capture(1)
5276 self.assertEqual(ip.src, nat_ip)
5277 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5278 self.assertEqual(tcp.dport, external_port)
5279 port_out1 = tcp.sport
5281 self.logger.error(ppp("Unexpected or invalid packet:", p))
5284 dms = self.vapi.nat_det_map_dump()
5285 self.assertEqual(1, len(dms))
5286 self.assertEqual(2, dms[0].ses_num)
5289 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5290 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5291 TCP(sport=external_port, dport=port_out0))
5292 self.pg1.add_stream(p)
5293 self.pg_enable_capture(self.pg_interfaces)
5295 capture = self.pg0.get_capture(1)
5300 self.assertEqual(ip.src, self.pg1.remote_ip4)
5301 self.assertEqual(ip.dst, host0.ip4)
5302 self.assertEqual(tcp.dport, port_in)
5303 self.assertEqual(tcp.sport, external_port)
5305 self.logger.error(ppp("Unexpected or invalid packet:", p))
5309 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5310 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5311 TCP(sport=external_port, dport=port_out1))
5312 self.pg1.add_stream(p)
5313 self.pg_enable_capture(self.pg_interfaces)
5315 capture = self.pg0.get_capture(1)
5320 self.assertEqual(ip.src, self.pg1.remote_ip4)
5321 self.assertEqual(ip.dst, host1.ip4)
5322 self.assertEqual(tcp.dport, port_in)
5323 self.assertEqual(tcp.sport, external_port)
5325 self.logger.error(ppp("Unexpected or invalid packet", p))
5328 # session close api test
5329 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
5331 self.pg1.remote_ip4n,
5333 dms = self.vapi.nat_det_map_dump()
5334 self.assertEqual(dms[0].ses_num, 1)
5336 self.vapi.nat_det_close_session_in(host0.ip4n,
5338 self.pg1.remote_ip4n,
5340 dms = self.vapi.nat_det_map_dump()
5341 self.assertEqual(dms[0].ses_num, 0)
5343 def test_tcp_session_close_detection_in(self):
5344 """ Deterministic NAT TCP session close from inside network """
5345 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5347 socket.inet_aton(self.nat_addr),
5349 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5350 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5353 self.initiate_tcp_session(self.pg0, self.pg1)
5355 # close the session from inside
5357 # FIN packet in -> out
5358 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5359 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5360 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5362 self.pg0.add_stream(p)
5363 self.pg_enable_capture(self.pg_interfaces)
5365 self.pg1.get_capture(1)
5369 # ACK packet out -> in
5370 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5371 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5372 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5376 # FIN packet out -> in
5377 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5378 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5379 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5383 self.pg1.add_stream(pkts)
5384 self.pg_enable_capture(self.pg_interfaces)
5386 self.pg0.get_capture(2)
5388 # ACK packet in -> out
5389 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5390 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5391 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5393 self.pg0.add_stream(p)
5394 self.pg_enable_capture(self.pg_interfaces)
5396 self.pg1.get_capture(1)
5398 # Check if deterministic NAT44 closed the session
5399 dms = self.vapi.nat_det_map_dump()
5400 self.assertEqual(0, dms[0].ses_num)
5402 self.logger.error("TCP session termination failed")
5405 def test_tcp_session_close_detection_out(self):
5406 """ Deterministic NAT TCP session close from outside network """
5407 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5409 socket.inet_aton(self.nat_addr),
5411 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5412 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5415 self.initiate_tcp_session(self.pg0, self.pg1)
5417 # close the session from outside
5419 # FIN packet out -> in
5420 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5421 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5422 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5424 self.pg1.add_stream(p)
5425 self.pg_enable_capture(self.pg_interfaces)
5427 self.pg0.get_capture(1)
5431 # ACK packet in -> out
5432 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5433 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5434 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5438 # ACK packet in -> out
5439 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5440 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5441 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5445 self.pg0.add_stream(pkts)
5446 self.pg_enable_capture(self.pg_interfaces)
5448 self.pg1.get_capture(2)
5450 # ACK packet out -> in
5451 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5452 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5453 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5455 self.pg1.add_stream(p)
5456 self.pg_enable_capture(self.pg_interfaces)
5458 self.pg0.get_capture(1)
5460 # Check if deterministic NAT44 closed the session
5461 dms = self.vapi.nat_det_map_dump()
5462 self.assertEqual(0, dms[0].ses_num)
5464 self.logger.error("TCP session termination failed")
5467 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5468 def test_session_timeout(self):
5469 """ Deterministic NAT session timeouts """
5470 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5472 socket.inet_aton(self.nat_addr),
5474 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5475 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5478 self.initiate_tcp_session(self.pg0, self.pg1)
5479 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
5480 pkts = self.create_stream_in(self.pg0, self.pg1)
5481 self.pg0.add_stream(pkts)
5482 self.pg_enable_capture(self.pg_interfaces)
5484 capture = self.pg1.get_capture(len(pkts))
5487 dms = self.vapi.nat_det_map_dump()
5488 self.assertEqual(0, dms[0].ses_num)
5490 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5491 def test_session_limit_per_user(self):
5492 """ Deterministic NAT maximum sessions per user limit """
5493 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5495 socket.inet_aton(self.nat_addr),
5497 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5498 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5500 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5501 src_address=self.pg2.local_ip4n,
5503 template_interval=10)
5504 self.vapi.nat_ipfix()
5507 for port in range(1025, 2025):
5508 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5509 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5510 UDP(sport=port, dport=port))
5513 self.pg0.add_stream(pkts)
5514 self.pg_enable_capture(self.pg_interfaces)
5516 capture = self.pg1.get_capture(len(pkts))
5518 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5519 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5520 UDP(sport=3001, dport=3002))
5521 self.pg0.add_stream(p)
5522 self.pg_enable_capture(self.pg_interfaces)
5524 capture = self.pg1.assert_nothing_captured()
5526 # verify ICMP error packet
5527 capture = self.pg0.get_capture(1)
5529 self.assertTrue(p.haslayer(ICMP))
5531 self.assertEqual(icmp.type, 3)
5532 self.assertEqual(icmp.code, 1)
5533 self.assertTrue(icmp.haslayer(IPerror))
5534 inner_ip = icmp[IPerror]
5535 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5536 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5538 dms = self.vapi.nat_det_map_dump()
5540 self.assertEqual(1000, dms[0].ses_num)
5542 # verify IPFIX logging
5543 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5545 capture = self.pg2.get_capture(2)
5546 ipfix = IPFIXDecoder()
5547 # first load template
5549 self.assertTrue(p.haslayer(IPFIX))
5550 if p.haslayer(Template):
5551 ipfix.add_template(p.getlayer(Template))
5552 # verify events in data set
5554 if p.haslayer(Data):
5555 data = ipfix.decode_data_set(p.getlayer(Set))
5556 self.verify_ipfix_max_entries_per_user(data)
5558 def clear_nat_det(self):
5560 Clear deterministic NAT configuration.
5562 self.vapi.nat_ipfix(enable=0)
5563 self.vapi.nat_det_set_timeouts()
5564 deterministic_mappings = self.vapi.nat_det_map_dump()
5565 for dsm in deterministic_mappings:
5566 self.vapi.nat_det_add_del_map(dsm.in_addr,
5572 interfaces = self.vapi.nat44_interface_dump()
5573 for intf in interfaces:
5574 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
5579 super(TestDeterministicNAT, self).tearDown()
5580 if not self.vpp_dead:
5581 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5583 self.vapi.cli("show nat44 deterministic mappings"))
5585 self.vapi.cli("show nat44 deterministic timeouts"))
5587 self.vapi.cli("show nat44 deterministic sessions"))
5588 self.clear_nat_det()
5591 class TestNAT64(MethodHolder):
5592 """ NAT64 Test Cases """
5595 def setUpConstants(cls):
5596 super(TestNAT64, cls).setUpConstants()
5597 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5598 "nat64 st hash buckets 256", "}"])
5601 def setUpClass(cls):
5602 super(TestNAT64, cls).setUpClass()
5605 cls.tcp_port_in = 6303
5606 cls.tcp_port_out = 6303
5607 cls.udp_port_in = 6304
5608 cls.udp_port_out = 6304
5609 cls.icmp_id_in = 6305
5610 cls.icmp_id_out = 6305
5611 cls.nat_addr = '10.0.0.3'
5612 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5614 cls.vrf1_nat_addr = '10.0.10.3'
5615 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5617 cls.ipfix_src_port = 4739
5618 cls.ipfix_domain_id = 1
5620 cls.create_pg_interfaces(range(6))
5621 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5622 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5623 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5625 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5627 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5629 cls.pg0.generate_remote_hosts(2)
5631 for i in cls.ip6_interfaces:
5634 i.configure_ipv6_neighbors()
5636 for i in cls.ip4_interfaces:
5642 cls.pg3.config_ip4()
5643 cls.pg3.resolve_arp()
5644 cls.pg3.config_ip6()
5645 cls.pg3.configure_ipv6_neighbors()
5648 cls.pg5.config_ip6()
5651 super(TestNAT64, cls).tearDownClass()
5654 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5655 """ NAT64 inside interface handles Neighbor Advertisement """
5657 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
5660 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5661 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5662 ICMPv6EchoRequest())
5664 self.pg5.add_stream(pkts)
5665 self.pg_enable_capture(self.pg_interfaces)
5668 # Wait for Neighbor Solicitation
5669 capture = self.pg5.get_capture(len(pkts))
5670 self.assertEqual(1, len(capture))
5673 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5674 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
5675 tgt = packet[ICMPv6ND_NS].tgt
5677 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5680 # Send Neighbor Advertisement
5681 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5682 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5683 ICMPv6ND_NA(tgt=tgt) /
5684 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
5686 self.pg5.add_stream(pkts)
5687 self.pg_enable_capture(self.pg_interfaces)
5690 # Try to send ping again
5692 self.pg5.add_stream(pkts)
5693 self.pg_enable_capture(self.pg_interfaces)
5696 # Wait for ping reply
5697 capture = self.pg5.get_capture(len(pkts))
5698 self.assertEqual(1, len(capture))
5701 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5702 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
5703 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
5705 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5708 def test_pool(self):
5709 """ Add/delete address to NAT64 pool """
5710 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
5712 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
5714 addresses = self.vapi.nat64_pool_addr_dump()
5715 self.assertEqual(len(addresses), 1)
5716 self.assertEqual(addresses[0].address, nat_addr)
5718 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
5720 addresses = self.vapi.nat64_pool_addr_dump()
5721 self.assertEqual(len(addresses), 0)
5723 def test_interface(self):
5724 """ Enable/disable NAT64 feature on the interface """
5725 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5726 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5728 interfaces = self.vapi.nat64_interface_dump()
5729 self.assertEqual(len(interfaces), 2)
5732 for intf in interfaces:
5733 if intf.sw_if_index == self.pg0.sw_if_index:
5734 self.assertEqual(intf.is_inside, 1)
5736 elif intf.sw_if_index == self.pg1.sw_if_index:
5737 self.assertEqual(intf.is_inside, 0)
5739 self.assertTrue(pg0_found)
5740 self.assertTrue(pg1_found)
5742 features = self.vapi.cli("show interface features pg0")
5743 self.assertNotEqual(features.find('nat64-in2out'), -1)
5744 features = self.vapi.cli("show interface features pg1")
5745 self.assertNotEqual(features.find('nat64-out2in'), -1)
5747 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5748 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5750 interfaces = self.vapi.nat64_interface_dump()
5751 self.assertEqual(len(interfaces), 0)
5753 def test_static_bib(self):
5754 """ Add/delete static BIB entry """
5755 in_addr = socket.inet_pton(socket.AF_INET6,
5756 '2001:db8:85a3::8a2e:370:7334')
5757 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5760 proto = IP_PROTOS.tcp
5762 self.vapi.nat64_add_del_static_bib(in_addr,
5767 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5772 self.assertEqual(bibe.i_addr, in_addr)
5773 self.assertEqual(bibe.o_addr, out_addr)
5774 self.assertEqual(bibe.i_port, in_port)
5775 self.assertEqual(bibe.o_port, out_port)
5776 self.assertEqual(static_bib_num, 1)
5778 self.vapi.nat64_add_del_static_bib(in_addr,
5784 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5789 self.assertEqual(static_bib_num, 0)
5791 def test_set_timeouts(self):
5792 """ Set NAT64 timeouts """
5793 # verify default values
5794 timeouts = self.vapi.nat64_get_timeouts()
5795 self.assertEqual(timeouts.udp, 300)
5796 self.assertEqual(timeouts.icmp, 60)
5797 self.assertEqual(timeouts.tcp_trans, 240)
5798 self.assertEqual(timeouts.tcp_est, 7440)
5799 self.assertEqual(timeouts.tcp_incoming_syn, 6)
5801 # set and verify custom values
5802 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
5803 tcp_est=7450, tcp_incoming_syn=10)
5804 timeouts = self.vapi.nat64_get_timeouts()
5805 self.assertEqual(timeouts.udp, 200)
5806 self.assertEqual(timeouts.icmp, 30)
5807 self.assertEqual(timeouts.tcp_trans, 250)
5808 self.assertEqual(timeouts.tcp_est, 7450)
5809 self.assertEqual(timeouts.tcp_incoming_syn, 10)
5811 def test_dynamic(self):
5812 """ NAT64 dynamic translation test """
5813 self.tcp_port_in = 6303
5814 self.udp_port_in = 6304
5815 self.icmp_id_in = 6305
5817 ses_num_start = self.nat64_get_ses_num()
5819 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5821 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5822 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5825 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5826 self.pg0.add_stream(pkts)
5827 self.pg_enable_capture(self.pg_interfaces)
5829 capture = self.pg1.get_capture(len(pkts))
5830 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5831 dst_ip=self.pg1.remote_ip4)
5834 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5835 self.pg1.add_stream(pkts)
5836 self.pg_enable_capture(self.pg_interfaces)
5838 capture = self.pg0.get_capture(len(pkts))
5839 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5840 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5843 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5844 self.pg0.add_stream(pkts)
5845 self.pg_enable_capture(self.pg_interfaces)
5847 capture = self.pg1.get_capture(len(pkts))
5848 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5849 dst_ip=self.pg1.remote_ip4)
5852 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5853 self.pg1.add_stream(pkts)
5854 self.pg_enable_capture(self.pg_interfaces)
5856 capture = self.pg0.get_capture(len(pkts))
5857 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5859 ses_num_end = self.nat64_get_ses_num()
5861 self.assertEqual(ses_num_end - ses_num_start, 3)
5863 # tenant with specific VRF
5864 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5865 self.vrf1_nat_addr_n,
5866 vrf_id=self.vrf1_id)
5867 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5869 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5870 self.pg2.add_stream(pkts)
5871 self.pg_enable_capture(self.pg_interfaces)
5873 capture = self.pg1.get_capture(len(pkts))
5874 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5875 dst_ip=self.pg1.remote_ip4)
5877 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5878 self.pg1.add_stream(pkts)
5879 self.pg_enable_capture(self.pg_interfaces)
5881 capture = self.pg2.get_capture(len(pkts))
5882 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5884 def test_static(self):
5885 """ NAT64 static translation test """
5886 self.tcp_port_in = 60303
5887 self.udp_port_in = 60304
5888 self.icmp_id_in = 60305
5889 self.tcp_port_out = 60303
5890 self.udp_port_out = 60304
5891 self.icmp_id_out = 60305
5893 ses_num_start = self.nat64_get_ses_num()
5895 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5897 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5898 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5900 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5905 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5910 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5917 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5918 self.pg0.add_stream(pkts)
5919 self.pg_enable_capture(self.pg_interfaces)
5921 capture = self.pg1.get_capture(len(pkts))
5922 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5923 dst_ip=self.pg1.remote_ip4, same_port=True)
5926 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5927 self.pg1.add_stream(pkts)
5928 self.pg_enable_capture(self.pg_interfaces)
5930 capture = self.pg0.get_capture(len(pkts))
5931 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5932 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5934 ses_num_end = self.nat64_get_ses_num()
5936 self.assertEqual(ses_num_end - ses_num_start, 3)
5938 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5939 def test_session_timeout(self):
5940 """ NAT64 session timeout """
5941 self.icmp_id_in = 1234
5942 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5944 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5945 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5946 self.vapi.nat64_set_timeouts(icmp=5)
5948 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5949 self.pg0.add_stream(pkts)
5950 self.pg_enable_capture(self.pg_interfaces)
5952 capture = self.pg1.get_capture(len(pkts))
5954 ses_num_before_timeout = self.nat64_get_ses_num()
5958 # ICMP session after timeout
5959 ses_num_after_timeout = self.nat64_get_ses_num()
5960 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5962 def test_icmp_error(self):
5963 """ NAT64 ICMP Error message translation """
5964 self.tcp_port_in = 6303
5965 self.udp_port_in = 6304
5966 self.icmp_id_in = 6305
5968 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5970 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5971 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5973 # send some packets to create sessions
5974 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5975 self.pg0.add_stream(pkts)
5976 self.pg_enable_capture(self.pg_interfaces)
5978 capture_ip4 = self.pg1.get_capture(len(pkts))
5979 self.verify_capture_out(capture_ip4,
5980 nat_ip=self.nat_addr,
5981 dst_ip=self.pg1.remote_ip4)
5983 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5984 self.pg1.add_stream(pkts)
5985 self.pg_enable_capture(self.pg_interfaces)
5987 capture_ip6 = self.pg0.get_capture(len(pkts))
5988 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5989 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5990 self.pg0.remote_ip6)
5993 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5994 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5995 ICMPv6DestUnreach(code=1) /
5996 packet[IPv6] for packet in capture_ip6]
5997 self.pg0.add_stream(pkts)
5998 self.pg_enable_capture(self.pg_interfaces)
6000 capture = self.pg1.get_capture(len(pkts))
6001 for packet in capture:
6003 self.assertEqual(packet[IP].src, self.nat_addr)
6004 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6005 self.assertEqual(packet[ICMP].type, 3)
6006 self.assertEqual(packet[ICMP].code, 13)
6007 inner = packet[IPerror]
6008 self.assertEqual(inner.src, self.pg1.remote_ip4)
6009 self.assertEqual(inner.dst, self.nat_addr)
6010 self.assert_packet_checksums_valid(packet)
6011 if inner.haslayer(TCPerror):
6012 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
6013 elif inner.haslayer(UDPerror):
6014 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
6016 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
6018 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6022 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6023 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6024 ICMP(type=3, code=13) /
6025 packet[IP] for packet in capture_ip4]
6026 self.pg1.add_stream(pkts)
6027 self.pg_enable_capture(self.pg_interfaces)
6029 capture = self.pg0.get_capture(len(pkts))
6030 for packet in capture:
6032 self.assertEqual(packet[IPv6].src, ip.src)
6033 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6034 icmp = packet[ICMPv6DestUnreach]
6035 self.assertEqual(icmp.code, 1)
6036 inner = icmp[IPerror6]
6037 self.assertEqual(inner.src, self.pg0.remote_ip6)
6038 self.assertEqual(inner.dst, ip.src)
6039 self.assert_icmpv6_checksum_valid(packet)
6040 if inner.haslayer(TCPerror):
6041 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
6042 elif inner.haslayer(UDPerror):
6043 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
6045 self.assertEqual(inner[ICMPv6EchoRequest].id,
6048 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6051 def test_hairpinning(self):
6052 """ NAT64 hairpinning """
6054 client = self.pg0.remote_hosts[0]
6055 server = self.pg0.remote_hosts[1]
6056 server_tcp_in_port = 22
6057 server_tcp_out_port = 4022
6058 server_udp_in_port = 23
6059 server_udp_out_port = 4023
6060 client_tcp_in_port = 1234
6061 client_udp_in_port = 1235
6062 client_tcp_out_port = 0
6063 client_udp_out_port = 0
6064 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6065 nat_addr_ip6 = ip.src
6067 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6069 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6070 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6072 self.vapi.nat64_add_del_static_bib(server.ip6n,
6075 server_tcp_out_port,
6077 self.vapi.nat64_add_del_static_bib(server.ip6n,
6080 server_udp_out_port,
6085 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6086 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6087 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6089 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6090 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6091 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
6093 self.pg0.add_stream(pkts)
6094 self.pg_enable_capture(self.pg_interfaces)
6096 capture = self.pg0.get_capture(len(pkts))
6097 for packet in capture:
6099 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6100 self.assertEqual(packet[IPv6].dst, server.ip6)
6101 self.assert_packet_checksums_valid(packet)
6102 if packet.haslayer(TCP):
6103 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
6104 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
6105 client_tcp_out_port = packet[TCP].sport
6107 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
6108 self.assertEqual(packet[UDP].dport, server_udp_in_port)
6109 client_udp_out_port = packet[UDP].sport
6111 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6116 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6117 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6118 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
6120 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6121 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6122 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
6124 self.pg0.add_stream(pkts)
6125 self.pg_enable_capture(self.pg_interfaces)
6127 capture = self.pg0.get_capture(len(pkts))
6128 for packet in capture:
6130 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6131 self.assertEqual(packet[IPv6].dst, client.ip6)
6132 self.assert_packet_checksums_valid(packet)
6133 if packet.haslayer(TCP):
6134 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
6135 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
6137 self.assertEqual(packet[UDP].sport, server_udp_out_port)
6138 self.assertEqual(packet[UDP].dport, client_udp_in_port)
6140 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6145 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6146 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6147 ICMPv6DestUnreach(code=1) /
6148 packet[IPv6] for packet in capture]
6149 self.pg0.add_stream(pkts)
6150 self.pg_enable_capture(self.pg_interfaces)
6152 capture = self.pg0.get_capture(len(pkts))
6153 for packet in capture:
6155 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6156 self.assertEqual(packet[IPv6].dst, server.ip6)
6157 icmp = packet[ICMPv6DestUnreach]
6158 self.assertEqual(icmp.code, 1)
6159 inner = icmp[IPerror6]
6160 self.assertEqual(inner.src, server.ip6)
6161 self.assertEqual(inner.dst, nat_addr_ip6)
6162 self.assert_packet_checksums_valid(packet)
6163 if inner.haslayer(TCPerror):
6164 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
6165 self.assertEqual(inner[TCPerror].dport,
6166 client_tcp_out_port)
6168 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
6169 self.assertEqual(inner[UDPerror].dport,
6170 client_udp_out_port)
6172 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6175 def test_prefix(self):
6176 """ NAT64 Network-Specific Prefix """
6178 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6180 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6181 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6182 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6183 self.vrf1_nat_addr_n,
6184 vrf_id=self.vrf1_id)
6185 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6188 global_pref64 = "2001:db8::"
6189 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
6190 global_pref64_len = 32
6191 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
6193 prefix = self.vapi.nat64_prefix_dump()
6194 self.assertEqual(len(prefix), 1)
6195 self.assertEqual(prefix[0].prefix, global_pref64_n)
6196 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
6197 self.assertEqual(prefix[0].vrf_id, 0)
6199 # Add tenant specific prefix
6200 vrf1_pref64 = "2001:db8:122:300::"
6201 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
6202 vrf1_pref64_len = 56
6203 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
6205 vrf_id=self.vrf1_id)
6206 prefix = self.vapi.nat64_prefix_dump()
6207 self.assertEqual(len(prefix), 2)
6210 pkts = self.create_stream_in_ip6(self.pg0,
6213 plen=global_pref64_len)
6214 self.pg0.add_stream(pkts)
6215 self.pg_enable_capture(self.pg_interfaces)
6217 capture = self.pg1.get_capture(len(pkts))
6218 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6219 dst_ip=self.pg1.remote_ip4)
6221 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6222 self.pg1.add_stream(pkts)
6223 self.pg_enable_capture(self.pg_interfaces)
6225 capture = self.pg0.get_capture(len(pkts))
6226 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6229 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
6231 # Tenant specific prefix
6232 pkts = self.create_stream_in_ip6(self.pg2,
6235 plen=vrf1_pref64_len)
6236 self.pg2.add_stream(pkts)
6237 self.pg_enable_capture(self.pg_interfaces)
6239 capture = self.pg1.get_capture(len(pkts))
6240 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6241 dst_ip=self.pg1.remote_ip4)
6243 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6244 self.pg1.add_stream(pkts)
6245 self.pg_enable_capture(self.pg_interfaces)
6247 capture = self.pg2.get_capture(len(pkts))
6248 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6251 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
6253 def test_unknown_proto(self):
6254 """ NAT64 translate packet with unknown protocol """
6256 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6258 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6259 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6260 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6263 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6264 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
6265 TCP(sport=self.tcp_port_in, dport=20))
6266 self.pg0.add_stream(p)
6267 self.pg_enable_capture(self.pg_interfaces)
6269 p = self.pg1.get_capture(1)
6271 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6272 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
6274 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6275 TCP(sport=1234, dport=1234))
6276 self.pg0.add_stream(p)
6277 self.pg_enable_capture(self.pg_interfaces)
6279 p = self.pg1.get_capture(1)
6282 self.assertEqual(packet[IP].src, self.nat_addr)
6283 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6284 self.assertTrue(packet.haslayer(GRE))
6285 self.assert_packet_checksums_valid(packet)
6287 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6291 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6292 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6294 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6295 TCP(sport=1234, dport=1234))
6296 self.pg1.add_stream(p)
6297 self.pg_enable_capture(self.pg_interfaces)
6299 p = self.pg0.get_capture(1)
6302 self.assertEqual(packet[IPv6].src, remote_ip6)
6303 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6304 self.assertEqual(packet[IPv6].nh, 47)
6306 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6309 def test_hairpinning_unknown_proto(self):
6310 """ NAT64 translate packet with unknown protocol - hairpinning """
6312 client = self.pg0.remote_hosts[0]
6313 server = self.pg0.remote_hosts[1]
6314 server_tcp_in_port = 22
6315 server_tcp_out_port = 4022
6316 client_tcp_in_port = 1234
6317 client_tcp_out_port = 1235
6318 server_nat_ip = "10.0.0.100"
6319 client_nat_ip = "10.0.0.110"
6320 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
6321 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
6322 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
6323 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
6325 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
6327 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6328 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6330 self.vapi.nat64_add_del_static_bib(server.ip6n,
6333 server_tcp_out_port,
6336 self.vapi.nat64_add_del_static_bib(server.ip6n,
6342 self.vapi.nat64_add_del_static_bib(client.ip6n,
6345 client_tcp_out_port,
6349 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6350 IPv6(src=client.ip6, dst=server_nat_ip6) /
6351 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6352 self.pg0.add_stream(p)
6353 self.pg_enable_capture(self.pg_interfaces)
6355 p = self.pg0.get_capture(1)
6357 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6358 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
6360 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6361 TCP(sport=1234, dport=1234))
6362 self.pg0.add_stream(p)
6363 self.pg_enable_capture(self.pg_interfaces)
6365 p = self.pg0.get_capture(1)
6368 self.assertEqual(packet[IPv6].src, client_nat_ip6)
6369 self.assertEqual(packet[IPv6].dst, server.ip6)
6370 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6372 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6376 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6377 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
6379 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6380 TCP(sport=1234, dport=1234))
6381 self.pg0.add_stream(p)
6382 self.pg_enable_capture(self.pg_interfaces)
6384 p = self.pg0.get_capture(1)
6387 self.assertEqual(packet[IPv6].src, server_nat_ip6)
6388 self.assertEqual(packet[IPv6].dst, client.ip6)
6389 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6391 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6394 def test_one_armed_nat64(self):
6395 """ One armed NAT64 """
6397 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
6401 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6403 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
6404 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
6407 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6408 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
6409 TCP(sport=12345, dport=80))
6410 self.pg3.add_stream(p)
6411 self.pg_enable_capture(self.pg_interfaces)
6413 capture = self.pg3.get_capture(1)
6418 self.assertEqual(ip.src, self.nat_addr)
6419 self.assertEqual(ip.dst, self.pg3.remote_ip4)
6420 self.assertNotEqual(tcp.sport, 12345)
6421 external_port = tcp.sport
6422 self.assertEqual(tcp.dport, 80)
6423 self.assert_packet_checksums_valid(p)
6425 self.logger.error(ppp("Unexpected or invalid packet:", p))
6429 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6430 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6431 TCP(sport=80, dport=external_port))
6432 self.pg3.add_stream(p)
6433 self.pg_enable_capture(self.pg_interfaces)
6435 capture = self.pg3.get_capture(1)
6440 self.assertEqual(ip.src, remote_host_ip6)
6441 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6442 self.assertEqual(tcp.sport, 80)
6443 self.assertEqual(tcp.dport, 12345)
6444 self.assert_packet_checksums_valid(p)
6446 self.logger.error(ppp("Unexpected or invalid packet:", p))
6449 def test_frag_in_order(self):
6450 """ NAT64 translate fragments arriving in order """
6451 self.tcp_port_in = random.randint(1025, 65535)
6453 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6455 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6456 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6458 reass = self.vapi.nat_reass_dump()
6459 reass_n_start = len(reass)
6463 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6464 self.tcp_port_in, 20, data)
6465 self.pg0.add_stream(pkts)
6466 self.pg_enable_capture(self.pg_interfaces)
6468 frags = self.pg1.get_capture(len(pkts))
6469 p = self.reass_frags_and_verify(frags,
6471 self.pg1.remote_ip4)
6472 self.assertEqual(p[TCP].dport, 20)
6473 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6474 self.tcp_port_out = p[TCP].sport
6475 self.assertEqual(data, p[Raw].load)
6478 data = "A" * 4 + "b" * 16 + "C" * 3
6479 pkts = self.create_stream_frag(self.pg1,
6484 self.pg1.add_stream(pkts)
6485 self.pg_enable_capture(self.pg_interfaces)
6487 frags = self.pg0.get_capture(len(pkts))
6488 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6489 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6490 self.assertEqual(p[TCP].sport, 20)
6491 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6492 self.assertEqual(data, p[Raw].load)
6494 reass = self.vapi.nat_reass_dump()
6495 reass_n_end = len(reass)
6497 self.assertEqual(reass_n_end - reass_n_start, 2)
6499 def test_reass_hairpinning(self):
6500 """ NAT64 fragments hairpinning """
6502 server = self.pg0.remote_hosts[1]
6503 server_in_port = random.randint(1025, 65535)
6504 server_out_port = random.randint(1025, 65535)
6505 client_in_port = random.randint(1025, 65535)
6506 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6507 nat_addr_ip6 = ip.src
6509 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6511 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6512 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6514 # add static BIB entry for server
6515 self.vapi.nat64_add_del_static_bib(server.ip6n,
6521 # send packet from host to server
6522 pkts = self.create_stream_frag_ip6(self.pg0,
6527 self.pg0.add_stream(pkts)
6528 self.pg_enable_capture(self.pg_interfaces)
6530 frags = self.pg0.get_capture(len(pkts))
6531 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6532 self.assertNotEqual(p[TCP].sport, client_in_port)
6533 self.assertEqual(p[TCP].dport, server_in_port)
6534 self.assertEqual(data, p[Raw].load)
6536 def test_frag_out_of_order(self):
6537 """ NAT64 translate fragments arriving out of order """
6538 self.tcp_port_in = random.randint(1025, 65535)
6540 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6542 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6543 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6547 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6548 self.tcp_port_in, 20, data)
6550 self.pg0.add_stream(pkts)
6551 self.pg_enable_capture(self.pg_interfaces)
6553 frags = self.pg1.get_capture(len(pkts))
6554 p = self.reass_frags_and_verify(frags,
6556 self.pg1.remote_ip4)
6557 self.assertEqual(p[TCP].dport, 20)
6558 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6559 self.tcp_port_out = p[TCP].sport
6560 self.assertEqual(data, p[Raw].load)
6563 data = "A" * 4 + "B" * 16 + "C" * 3
6564 pkts = self.create_stream_frag(self.pg1,
6570 self.pg1.add_stream(pkts)
6571 self.pg_enable_capture(self.pg_interfaces)
6573 frags = self.pg0.get_capture(len(pkts))
6574 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6575 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6576 self.assertEqual(p[TCP].sport, 20)
6577 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6578 self.assertEqual(data, p[Raw].load)
6580 def test_interface_addr(self):
6581 """ Acquire NAT64 pool addresses from interface """
6582 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
6584 # no address in NAT64 pool
6585 adresses = self.vapi.nat44_address_dump()
6586 self.assertEqual(0, len(adresses))
6588 # configure interface address and check NAT64 address pool
6589 self.pg4.config_ip4()
6590 addresses = self.vapi.nat64_pool_addr_dump()
6591 self.assertEqual(len(addresses), 1)
6592 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
6594 # remove interface address and check NAT64 address pool
6595 self.pg4.unconfig_ip4()
6596 addresses = self.vapi.nat64_pool_addr_dump()
6597 self.assertEqual(0, len(adresses))
6599 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6600 def test_ipfix_max_bibs_sessions(self):
6601 """ IPFIX logging maximum session and BIB entries exceeded """
6604 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6608 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6610 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6611 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6615 for i in range(0, max_bibs):
6616 src = "fd01:aa::%x" % (i)
6617 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6618 IPv6(src=src, dst=remote_host_ip6) /
6619 TCP(sport=12345, dport=80))
6621 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6622 IPv6(src=src, dst=remote_host_ip6) /
6623 TCP(sport=12345, dport=22))
6625 self.pg0.add_stream(pkts)
6626 self.pg_enable_capture(self.pg_interfaces)
6628 self.pg1.get_capture(max_sessions)
6630 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6631 src_address=self.pg3.local_ip4n,
6633 template_interval=10)
6634 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6635 src_port=self.ipfix_src_port)
6637 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6638 IPv6(src=src, dst=remote_host_ip6) /
6639 TCP(sport=12345, dport=25))
6640 self.pg0.add_stream(p)
6641 self.pg_enable_capture(self.pg_interfaces)
6643 self.pg1.assert_nothing_captured()
6645 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6646 capture = self.pg3.get_capture(9)
6647 ipfix = IPFIXDecoder()
6648 # first load template
6650 self.assertTrue(p.haslayer(IPFIX))
6651 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6652 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6653 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6654 self.assertEqual(p[UDP].dport, 4739)
6655 self.assertEqual(p[IPFIX].observationDomainID,
6656 self.ipfix_domain_id)
6657 if p.haslayer(Template):
6658 ipfix.add_template(p.getlayer(Template))
6659 # verify events in data set
6661 if p.haslayer(Data):
6662 data = ipfix.decode_data_set(p.getlayer(Set))
6663 self.verify_ipfix_max_sessions(data, max_sessions)
6665 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6666 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6667 TCP(sport=12345, dport=80))
6668 self.pg0.add_stream(p)
6669 self.pg_enable_capture(self.pg_interfaces)
6671 self.pg1.assert_nothing_captured()
6673 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6674 capture = self.pg3.get_capture(1)
6675 # verify events in data set
6677 self.assertTrue(p.haslayer(IPFIX))
6678 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6679 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6680 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6681 self.assertEqual(p[UDP].dport, 4739)
6682 self.assertEqual(p[IPFIX].observationDomainID,
6683 self.ipfix_domain_id)
6684 if p.haslayer(Data):
6685 data = ipfix.decode_data_set(p.getlayer(Set))
6686 self.verify_ipfix_max_bibs(data, max_bibs)
6688 def test_ipfix_max_frags(self):
6689 """ IPFIX logging maximum fragments pending reassembly exceeded """
6690 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6692 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6693 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6694 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6695 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6696 src_address=self.pg3.local_ip4n,
6698 template_interval=10)
6699 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6700 src_port=self.ipfix_src_port)
6703 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6704 self.tcp_port_in, 20, data)
6705 self.pg0.add_stream(pkts[-1])
6706 self.pg_enable_capture(self.pg_interfaces)
6708 self.pg1.assert_nothing_captured()
6710 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6711 capture = self.pg3.get_capture(9)
6712 ipfix = IPFIXDecoder()
6713 # first load template
6715 self.assertTrue(p.haslayer(IPFIX))
6716 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6717 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6718 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6719 self.assertEqual(p[UDP].dport, 4739)
6720 self.assertEqual(p[IPFIX].observationDomainID,
6721 self.ipfix_domain_id)
6722 if p.haslayer(Template):
6723 ipfix.add_template(p.getlayer(Template))
6724 # verify events in data set
6726 if p.haslayer(Data):
6727 data = ipfix.decode_data_set(p.getlayer(Set))
6728 self.verify_ipfix_max_fragments_ip6(data, 0,
6729 self.pg0.remote_ip6n)
6731 def test_ipfix_bib_ses(self):
6732 """ IPFIX logging NAT64 BIB/session create and delete events """
6733 self.tcp_port_in = random.randint(1025, 65535)
6734 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6738 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6740 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6741 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6742 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6743 src_address=self.pg3.local_ip4n,
6745 template_interval=10)
6746 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6747 src_port=self.ipfix_src_port)
6750 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6751 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6752 TCP(sport=self.tcp_port_in, dport=25))
6753 self.pg0.add_stream(p)
6754 self.pg_enable_capture(self.pg_interfaces)
6756 p = self.pg1.get_capture(1)
6757 self.tcp_port_out = p[0][TCP].sport
6758 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6759 capture = self.pg3.get_capture(10)
6760 ipfix = IPFIXDecoder()
6761 # first load template
6763 self.assertTrue(p.haslayer(IPFIX))
6764 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6765 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6766 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6767 self.assertEqual(p[UDP].dport, 4739)
6768 self.assertEqual(p[IPFIX].observationDomainID,
6769 self.ipfix_domain_id)
6770 if p.haslayer(Template):
6771 ipfix.add_template(p.getlayer(Template))
6772 # verify events in data set
6774 if p.haslayer(Data):
6775 data = ipfix.decode_data_set(p.getlayer(Set))
6776 if ord(data[0][230]) == 10:
6777 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6778 elif ord(data[0][230]) == 6:
6779 self.verify_ipfix_nat64_ses(data,
6781 self.pg0.remote_ip6n,
6782 self.pg1.remote_ip4,
6785 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6788 self.pg_enable_capture(self.pg_interfaces)
6789 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6792 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6793 capture = self.pg3.get_capture(2)
6794 # verify events in data set
6796 self.assertTrue(p.haslayer(IPFIX))
6797 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6798 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6799 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6800 self.assertEqual(p[UDP].dport, 4739)
6801 self.assertEqual(p[IPFIX].observationDomainID,
6802 self.ipfix_domain_id)
6803 if p.haslayer(Data):
6804 data = ipfix.decode_data_set(p.getlayer(Set))
6805 if ord(data[0][230]) == 11:
6806 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6807 elif ord(data[0][230]) == 7:
6808 self.verify_ipfix_nat64_ses(data,
6810 self.pg0.remote_ip6n,
6811 self.pg1.remote_ip4,
6814 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6816 def nat64_get_ses_num(self):
6818 Return number of active NAT64 sessions.
6820 st = self.vapi.nat64_st_dump()
6823 def clear_nat64(self):
6825 Clear NAT64 configuration.
6827 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6828 domain_id=self.ipfix_domain_id)
6829 self.ipfix_src_port = 4739
6830 self.ipfix_domain_id = 1
6832 self.vapi.nat64_set_timeouts()
6834 interfaces = self.vapi.nat64_interface_dump()
6835 for intf in interfaces:
6836 if intf.is_inside > 1:
6837 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6840 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6844 bib = self.vapi.nat64_bib_dump(255)
6847 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6855 adresses = self.vapi.nat64_pool_addr_dump()
6856 for addr in adresses:
6857 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6862 prefixes = self.vapi.nat64_prefix_dump()
6863 for prefix in prefixes:
6864 self.vapi.nat64_add_del_prefix(prefix.prefix,
6866 vrf_id=prefix.vrf_id,
6870 super(TestNAT64, self).tearDown()
6871 if not self.vpp_dead:
6872 self.logger.info(self.vapi.cli("show nat64 pool"))
6873 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6874 self.logger.info(self.vapi.cli("show nat64 prefix"))
6875 self.logger.info(self.vapi.cli("show nat64 bib all"))
6876 self.logger.info(self.vapi.cli("show nat64 session table all"))
6877 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
6881 class TestDSlite(MethodHolder):
6882 """ DS-Lite Test Cases """
6885 def setUpClass(cls):
6886 super(TestDSlite, cls).setUpClass()
6889 cls.nat_addr = '10.0.0.3'
6890 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6892 cls.create_pg_interfaces(range(2))
6894 cls.pg0.config_ip4()
6895 cls.pg0.resolve_arp()
6897 cls.pg1.config_ip6()
6898 cls.pg1.generate_remote_hosts(2)
6899 cls.pg1.configure_ipv6_neighbors()
6902 super(TestDSlite, cls).tearDownClass()
6905 def test_dslite(self):
6906 """ Test DS-Lite """
6907 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6909 aftr_ip4 = '192.0.0.1'
6910 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6911 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6912 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6913 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6916 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6917 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6918 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6919 UDP(sport=20000, dport=10000))
6920 self.pg1.add_stream(p)
6921 self.pg_enable_capture(self.pg_interfaces)
6923 capture = self.pg0.get_capture(1)
6924 capture = capture[0]
6925 self.assertFalse(capture.haslayer(IPv6))
6926 self.assertEqual(capture[IP].src, self.nat_addr)
6927 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6928 self.assertNotEqual(capture[UDP].sport, 20000)
6929 self.assertEqual(capture[UDP].dport, 10000)
6930 self.assert_packet_checksums_valid(capture)
6931 out_port = capture[UDP].sport
6933 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6934 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6935 UDP(sport=10000, dport=out_port))
6936 self.pg0.add_stream(p)
6937 self.pg_enable_capture(self.pg_interfaces)
6939 capture = self.pg1.get_capture(1)
6940 capture = capture[0]
6941 self.assertEqual(capture[IPv6].src, aftr_ip6)
6942 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6943 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6944 self.assertEqual(capture[IP].dst, '192.168.1.1')
6945 self.assertEqual(capture[UDP].sport, 10000)
6946 self.assertEqual(capture[UDP].dport, 20000)
6947 self.assert_packet_checksums_valid(capture)
6950 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6951 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6952 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6953 TCP(sport=20001, dport=10001))
6954 self.pg1.add_stream(p)
6955 self.pg_enable_capture(self.pg_interfaces)
6957 capture = self.pg0.get_capture(1)
6958 capture = capture[0]
6959 self.assertFalse(capture.haslayer(IPv6))
6960 self.assertEqual(capture[IP].src, self.nat_addr)
6961 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6962 self.assertNotEqual(capture[TCP].sport, 20001)
6963 self.assertEqual(capture[TCP].dport, 10001)
6964 self.assert_packet_checksums_valid(capture)
6965 out_port = capture[TCP].sport
6967 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6968 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6969 TCP(sport=10001, dport=out_port))
6970 self.pg0.add_stream(p)
6971 self.pg_enable_capture(self.pg_interfaces)
6973 capture = self.pg1.get_capture(1)
6974 capture = capture[0]
6975 self.assertEqual(capture[IPv6].src, aftr_ip6)
6976 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6977 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6978 self.assertEqual(capture[IP].dst, '192.168.1.1')
6979 self.assertEqual(capture[TCP].sport, 10001)
6980 self.assertEqual(capture[TCP].dport, 20001)
6981 self.assert_packet_checksums_valid(capture)
6984 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6985 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6986 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6987 ICMP(id=4000, type='echo-request'))
6988 self.pg1.add_stream(p)
6989 self.pg_enable_capture(self.pg_interfaces)
6991 capture = self.pg0.get_capture(1)
6992 capture = capture[0]
6993 self.assertFalse(capture.haslayer(IPv6))
6994 self.assertEqual(capture[IP].src, self.nat_addr)
6995 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6996 self.assertNotEqual(capture[ICMP].id, 4000)
6997 self.assert_packet_checksums_valid(capture)
6998 out_id = capture[ICMP].id
7000 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7001 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7002 ICMP(id=out_id, type='echo-reply'))
7003 self.pg0.add_stream(p)
7004 self.pg_enable_capture(self.pg_interfaces)
7006 capture = self.pg1.get_capture(1)
7007 capture = capture[0]
7008 self.assertEqual(capture[IPv6].src, aftr_ip6)
7009 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7010 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7011 self.assertEqual(capture[IP].dst, '192.168.1.1')
7012 self.assertEqual(capture[ICMP].id, 4000)
7013 self.assert_packet_checksums_valid(capture)
7015 # ping DS-Lite AFTR tunnel endpoint address
7016 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7017 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
7018 ICMPv6EchoRequest())
7019 self.pg1.add_stream(p)
7020 self.pg_enable_capture(self.pg_interfaces)
7022 capture = self.pg1.get_capture(1)
7023 self.assertEqual(1, len(capture))
7024 capture = capture[0]
7025 self.assertEqual(capture[IPv6].src, aftr_ip6)
7026 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7027 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7030 super(TestDSlite, self).tearDown()
7031 if not self.vpp_dead:
7032 self.logger.info(self.vapi.cli("show dslite pool"))
7034 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7035 self.logger.info(self.vapi.cli("show dslite sessions"))
7038 class TestDSliteCE(MethodHolder):
7039 """ DS-Lite CE Test Cases """
7042 def setUpConstants(cls):
7043 super(TestDSliteCE, cls).setUpConstants()
7044 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
7047 def setUpClass(cls):
7048 super(TestDSliteCE, cls).setUpClass()
7051 cls.create_pg_interfaces(range(2))
7053 cls.pg0.config_ip4()
7054 cls.pg0.resolve_arp()
7056 cls.pg1.config_ip6()
7057 cls.pg1.generate_remote_hosts(1)
7058 cls.pg1.configure_ipv6_neighbors()
7061 super(TestDSliteCE, cls).tearDownClass()
7064 def test_dslite_ce(self):
7065 """ Test DS-Lite CE """
7067 b4_ip4 = '192.0.0.2'
7068 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
7069 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
7070 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
7071 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
7073 aftr_ip4 = '192.0.0.1'
7074 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7075 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7076 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7077 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7079 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
7080 dst_address_length=128,
7081 next_hop_address=self.pg1.remote_ip6n,
7082 next_hop_sw_if_index=self.pg1.sw_if_index,
7086 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7087 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
7088 UDP(sport=10000, dport=20000))
7089 self.pg0.add_stream(p)
7090 self.pg_enable_capture(self.pg_interfaces)
7092 capture = self.pg1.get_capture(1)
7093 capture = capture[0]
7094 self.assertEqual(capture[IPv6].src, b4_ip6)
7095 self.assertEqual(capture[IPv6].dst, aftr_ip6)
7096 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7097 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
7098 self.assertEqual(capture[UDP].sport, 10000)
7099 self.assertEqual(capture[UDP].dport, 20000)
7100 self.assert_packet_checksums_valid(capture)
7103 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7104 IPv6(dst=b4_ip6, src=aftr_ip6) /
7105 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
7106 UDP(sport=20000, dport=10000))
7107 self.pg1.add_stream(p)
7108 self.pg_enable_capture(self.pg_interfaces)
7110 capture = self.pg0.get_capture(1)
7111 capture = capture[0]
7112 self.assertFalse(capture.haslayer(IPv6))
7113 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
7114 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7115 self.assertEqual(capture[UDP].sport, 20000)
7116 self.assertEqual(capture[UDP].dport, 10000)
7117 self.assert_packet_checksums_valid(capture)
7119 # ping DS-Lite B4 tunnel endpoint address
7120 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7121 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
7122 ICMPv6EchoRequest())
7123 self.pg1.add_stream(p)
7124 self.pg_enable_capture(self.pg_interfaces)
7126 capture = self.pg1.get_capture(1)
7127 self.assertEqual(1, len(capture))
7128 capture = capture[0]
7129 self.assertEqual(capture[IPv6].src, b4_ip6)
7130 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7131 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7134 super(TestDSliteCE, self).tearDown()
7135 if not self.vpp_dead:
7137 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7139 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
7142 class TestNAT66(MethodHolder):
7143 """ NAT66 Test Cases """
7146 def setUpClass(cls):
7147 super(TestNAT66, cls).setUpClass()
7150 cls.nat_addr = 'fd01:ff::2'
7151 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
7153 cls.create_pg_interfaces(range(2))
7154 cls.interfaces = list(cls.pg_interfaces)
7156 for i in cls.interfaces:
7159 i.configure_ipv6_neighbors()
7162 super(TestNAT66, cls).tearDownClass()
7165 def test_static(self):
7166 """ 1:1 NAT66 test """
7167 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7168 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7169 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7174 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7175 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7178 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7179 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7182 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7183 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7184 ICMPv6EchoRequest())
7186 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7187 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7188 GRE() / IP() / TCP())
7190 self.pg0.add_stream(pkts)
7191 self.pg_enable_capture(self.pg_interfaces)
7193 capture = self.pg1.get_capture(len(pkts))
7194 for packet in capture:
7196 self.assertEqual(packet[IPv6].src, self.nat_addr)
7197 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7198 self.assert_packet_checksums_valid(packet)
7200 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7205 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7206 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7209 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7210 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7213 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7214 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7217 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7218 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7219 GRE() / IP() / TCP())
7221 self.pg1.add_stream(pkts)
7222 self.pg_enable_capture(self.pg_interfaces)
7224 capture = self.pg0.get_capture(len(pkts))
7225 for packet in capture:
7227 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
7228 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
7229 self.assert_packet_checksums_valid(packet)
7231 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7234 sm = self.vapi.nat66_static_mapping_dump()
7235 self.assertEqual(len(sm), 1)
7236 self.assertEqual(sm[0].total_pkts, 8)
7238 def test_check_no_translate(self):
7239 """ NAT66 translate only when egress interface is outside interface """
7240 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7241 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7242 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7246 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7247 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7249 self.pg0.add_stream([p])
7250 self.pg_enable_capture(self.pg_interfaces)
7252 capture = self.pg1.get_capture(1)
7255 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7256 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7258 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7261 def clear_nat66(self):
7263 Clear NAT66 configuration.
7265 interfaces = self.vapi.nat66_interface_dump()
7266 for intf in interfaces:
7267 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7271 static_mappings = self.vapi.nat66_static_mapping_dump()
7272 for sm in static_mappings:
7273 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7274 sm.external_ip_address,
7279 super(TestNAT66, self).tearDown()
7280 if not self.vpp_dead:
7281 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7282 self.logger.info(self.vapi.cli("show nat66 static mappings"))
7286 if __name__ == '__main__':
7287 unittest.main(testRunner=VppTestRunner)