9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
29 def clear_nat44(self):
31 Clear NAT44 configuration.
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
140 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
141 local_port=0, external_port=0, vrf_id=0,
142 is_add=1, external_sw_if_index=0xFFFFFFFF,
143 proto=0, twice_nat=0, self_twice_nat=0,
144 out2in_only=0, tag=""):
146 Add/delete NAT44 static mapping
148 :param local_ip: Local IP address
149 :param external_ip: External IP address
150 :param local_port: Local port number (Optional)
151 :param external_port: External port number (Optional)
152 :param vrf_id: VRF ID (Default 0)
153 :param is_add: 1 if add, 0 if delete (Default add)
154 :param external_sw_if_index: External interface instead of IP address
155 :param proto: IP protocol (Mandatory if port specified)
156 :param twice_nat: 1 if translate external host address and port
157 :param self_twice_nat: 1 if translate external host address and port
158 whenever external host address equals
159 local address of internal host
160 :param out2in_only: if 1 rule is matching only out2in direction
161 :param tag: Opaque string tag
164 if local_port and external_port:
166 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
167 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
168 self.vapi.nat44_add_del_static_mapping(
171 external_sw_if_index,
183 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
185 Add/delete NAT44 address
187 :param ip: IP address
188 :param is_add: 1 if add, 0 if delete (Default add)
189 :param twice_nat: twice NAT address for external hosts
191 nat_addr = socket.inet_pton(socket.AF_INET, ip)
192 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
196 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
198 Create packet stream for inside network
200 :param in_if: Inside interface
201 :param out_if: Outside interface
202 :param dst_ip: Destination address
203 :param ttl: TTL of generated packets
206 dst_ip = out_if.remote_ip4
210 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
211 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
212 TCP(sport=self.tcp_port_in, dport=20))
216 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
217 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
218 UDP(sport=self.udp_port_in, dport=20))
222 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
223 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
224 ICMP(id=self.icmp_id_in, type='echo-request'))
229 def compose_ip6(self, ip4, pref, plen):
231 Compose IPv4-embedded IPv6 addresses
233 :param ip4: IPv4 address
234 :param pref: IPv6 prefix
235 :param plen: IPv6 prefix length
236 :returns: IPv4-embedded IPv6 address
238 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
239 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
254 pref_n[10] = ip4_n[3]
258 pref_n[10] = ip4_n[2]
259 pref_n[11] = ip4_n[3]
262 pref_n[10] = ip4_n[1]
263 pref_n[11] = ip4_n[2]
264 pref_n[12] = ip4_n[3]
266 pref_n[12] = ip4_n[0]
267 pref_n[13] = ip4_n[1]
268 pref_n[14] = ip4_n[2]
269 pref_n[15] = ip4_n[3]
270 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
272 def extract_ip4(self, ip6, plen):
274 Extract IPv4 address embedded in an IPv6 address
276 :param ip6: IPv6 address
277 :param plen: IPv6 prefix length
278 :returns: extracted IPv4 address
280 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
312 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
314 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
316 Create IPv6 packet stream for inside network
318 :param in_if: Inside interface
319 :param out_if: Outside interface
320 :param hlim: Hop Limit of generated packets
321 :param pref: NAT64 prefix
322 :param plen: NAT64 prefix length
326 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
328 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
331 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
332 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
333 TCP(sport=self.tcp_port_in, dport=20))
337 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
338 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
339 UDP(sport=self.udp_port_in, dport=20))
343 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
344 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
345 ICMPv6EchoRequest(id=self.icmp_id_in))
350 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
351 use_inside_ports=False):
353 Create packet stream for outside network
355 :param out_if: Outside interface
356 :param dst_ip: Destination IP address (Default use global NAT address)
357 :param ttl: TTL of generated packets
358 :param use_inside_ports: Use inside NAT ports as destination ports
359 instead of outside ports
362 dst_ip = self.nat_addr
363 if not use_inside_ports:
364 tcp_port = self.tcp_port_out
365 udp_port = self.udp_port_out
366 icmp_id = self.icmp_id_out
368 tcp_port = self.tcp_port_in
369 udp_port = self.udp_port_in
370 icmp_id = self.icmp_id_in
373 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
374 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
375 TCP(dport=tcp_port, sport=20))
379 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
380 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
381 UDP(dport=udp_port, sport=20))
385 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
386 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
387 ICMP(id=icmp_id, type='echo-reply'))
392 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
394 Create packet stream for outside network
396 :param out_if: Outside interface
397 :param dst_ip: Destination IP address
398 :param hl: HL of generated packets
402 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
403 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
404 TCP(dport=self.tcp_port_out, sport=20))
408 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
409 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
410 UDP(dport=self.udp_port_out, sport=20))
414 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
415 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
416 ICMPv6EchoReply(id=self.icmp_id_out))
421 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
422 packet_num=3, dst_ip=None, is_ip6=False):
424 Verify captured packets on outside network
426 :param capture: Captured packets
427 :param nat_ip: Translated IP address (Default use global NAT address)
428 :param same_port: Source port number is not translated (Default False)
429 :param packet_num: Expected number of packets (Default 3)
430 :param dst_ip: Destination IP address (Default do not verify)
431 :param is_ip6: If L3 protocol is IPv6 (Default False)
435 ICMP46 = ICMPv6EchoRequest
440 nat_ip = self.nat_addr
441 self.assertEqual(packet_num, len(capture))
442 for packet in capture:
445 self.assert_packet_checksums_valid(packet)
446 self.assertEqual(packet[IP46].src, nat_ip)
447 if dst_ip is not None:
448 self.assertEqual(packet[IP46].dst, dst_ip)
449 if packet.haslayer(TCP):
451 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
454 packet[TCP].sport, self.tcp_port_in)
455 self.tcp_port_out = packet[TCP].sport
456 self.assert_packet_checksums_valid(packet)
457 elif packet.haslayer(UDP):
459 self.assertEqual(packet[UDP].sport, self.udp_port_in)
462 packet[UDP].sport, self.udp_port_in)
463 self.udp_port_out = packet[UDP].sport
466 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
468 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
469 self.icmp_id_out = packet[ICMP46].id
470 self.assert_packet_checksums_valid(packet)
472 self.logger.error(ppp("Unexpected or invalid packet "
473 "(outside network):", packet))
476 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
477 packet_num=3, dst_ip=None):
479 Verify captured packets on outside network
481 :param capture: Captured packets
482 :param nat_ip: Translated IP address
483 :param same_port: Source port number is not translated (Default False)
484 :param packet_num: Expected number of packets (Default 3)
485 :param dst_ip: Destination IP address (Default do not verify)
487 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
490 def verify_capture_in(self, capture, in_if, packet_num=3):
492 Verify captured packets on inside network
494 :param capture: Captured packets
495 :param in_if: Inside interface
496 :param packet_num: Expected number of packets (Default 3)
498 self.assertEqual(packet_num, len(capture))
499 for packet in capture:
501 self.assert_packet_checksums_valid(packet)
502 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
503 if packet.haslayer(TCP):
504 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
505 elif packet.haslayer(UDP):
506 self.assertEqual(packet[UDP].dport, self.udp_port_in)
508 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
510 self.logger.error(ppp("Unexpected or invalid packet "
511 "(inside network):", packet))
514 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
516 Verify captured IPv6 packets on inside network
518 :param capture: Captured packets
519 :param src_ip: Source IP
520 :param dst_ip: Destination IP address
521 :param packet_num: Expected number of packets (Default 3)
523 self.assertEqual(packet_num, len(capture))
524 for packet in capture:
526 self.assertEqual(packet[IPv6].src, src_ip)
527 self.assertEqual(packet[IPv6].dst, dst_ip)
528 self.assert_packet_checksums_valid(packet)
529 if packet.haslayer(TCP):
530 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
531 elif packet.haslayer(UDP):
532 self.assertEqual(packet[UDP].dport, self.udp_port_in)
534 self.assertEqual(packet[ICMPv6EchoReply].id,
537 self.logger.error(ppp("Unexpected or invalid packet "
538 "(inside network):", packet))
541 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
543 Verify captured packets that don't have to be translated
545 :param capture: Captured packets
546 :param ingress_if: Ingress interface
547 :param egress_if: Egress interface
549 for packet in capture:
551 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
552 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
553 if packet.haslayer(TCP):
554 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
555 elif packet.haslayer(UDP):
556 self.assertEqual(packet[UDP].sport, self.udp_port_in)
558 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
560 self.logger.error(ppp("Unexpected or invalid packet "
561 "(inside network):", packet))
564 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
565 packet_num=3, icmp_type=11):
567 Verify captured packets with ICMP errors on outside network
569 :param capture: Captured packets
570 :param src_ip: Translated IP address or IP address of VPP
571 (Default use global NAT address)
572 :param packet_num: Expected number of packets (Default 3)
573 :param icmp_type: Type of error ICMP packet
574 we are expecting (Default 11)
577 src_ip = self.nat_addr
578 self.assertEqual(packet_num, len(capture))
579 for packet in capture:
581 self.assertEqual(packet[IP].src, src_ip)
582 self.assertTrue(packet.haslayer(ICMP))
584 self.assertEqual(icmp.type, icmp_type)
585 self.assertTrue(icmp.haslayer(IPerror))
586 inner_ip = icmp[IPerror]
587 if inner_ip.haslayer(TCPerror):
588 self.assertEqual(inner_ip[TCPerror].dport,
590 elif inner_ip.haslayer(UDPerror):
591 self.assertEqual(inner_ip[UDPerror].dport,
594 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
596 self.logger.error(ppp("Unexpected or invalid packet "
597 "(outside network):", packet))
600 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
603 Verify captured packets with ICMP errors on inside network
605 :param capture: Captured packets
606 :param in_if: Inside interface
607 :param packet_num: Expected number of packets (Default 3)
608 :param icmp_type: Type of error ICMP packet
609 we are expecting (Default 11)
611 self.assertEqual(packet_num, len(capture))
612 for packet in capture:
614 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
615 self.assertTrue(packet.haslayer(ICMP))
617 self.assertEqual(icmp.type, icmp_type)
618 self.assertTrue(icmp.haslayer(IPerror))
619 inner_ip = icmp[IPerror]
620 if inner_ip.haslayer(TCPerror):
621 self.assertEqual(inner_ip[TCPerror].sport,
623 elif inner_ip.haslayer(UDPerror):
624 self.assertEqual(inner_ip[UDPerror].sport,
627 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
629 self.logger.error(ppp("Unexpected or invalid packet "
630 "(inside network):", packet))
633 def create_stream_frag(self, src_if, dst, sport, dport, data):
635 Create fragmented packet stream
637 :param src_if: Source interface
638 :param dst: Destination IPv4 address
639 :param sport: Source TCP port
640 :param dport: Destination TCP port
641 :param data: Payload data
644 id = random.randint(0, 65535)
645 p = (IP(src=src_if.remote_ip4, dst=dst) /
646 TCP(sport=sport, dport=dport) /
648 p = p.__class__(str(p))
649 chksum = p['TCP'].chksum
651 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
652 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
653 TCP(sport=sport, dport=dport, chksum=chksum) /
656 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
657 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
658 proto=IP_PROTOS.tcp) /
661 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
662 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
668 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
669 pref=None, plen=0, frag_size=128):
671 Create fragmented packet stream
673 :param src_if: Source interface
674 :param dst: Destination IPv4 address
675 :param sport: Source TCP port
676 :param dport: Destination TCP port
677 :param data: Payload data
678 :param pref: NAT64 prefix
679 :param plen: NAT64 prefix length
680 :param frag_size: size of fragments
684 dst_ip6 = ''.join(['64:ff9b::', dst])
686 dst_ip6 = self.compose_ip6(dst, pref, plen)
688 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
689 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
690 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
691 TCP(sport=sport, dport=dport) /
694 return fragment6(p, frag_size)
696 def reass_frags_and_verify(self, frags, src, dst):
698 Reassemble and verify fragmented packet
700 :param frags: Captured fragments
701 :param src: Source IPv4 address to verify
702 :param dst: Destination IPv4 address to verify
704 :returns: Reassembled IPv4 packet
706 buffer = StringIO.StringIO()
708 self.assertEqual(p[IP].src, src)
709 self.assertEqual(p[IP].dst, dst)
710 self.assert_ip_checksum_valid(p)
711 buffer.seek(p[IP].frag * 8)
712 buffer.write(p[IP].payload)
713 ip = frags[0].getlayer(IP)
714 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
715 proto=frags[0][IP].proto)
716 if ip.proto == IP_PROTOS.tcp:
717 p = (ip / TCP(buffer.getvalue()))
718 self.assert_tcp_checksum_valid(p)
719 elif ip.proto == IP_PROTOS.udp:
720 p = (ip / UDP(buffer.getvalue()))
723 def reass_frags_and_verify_ip6(self, frags, src, dst):
725 Reassemble and verify fragmented packet
727 :param frags: Captured fragments
728 :param src: Source IPv6 address to verify
729 :param dst: Destination IPv6 address to verify
731 :returns: Reassembled IPv6 packet
733 buffer = StringIO.StringIO()
735 self.assertEqual(p[IPv6].src, src)
736 self.assertEqual(p[IPv6].dst, dst)
737 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
738 buffer.write(p[IPv6ExtHdrFragment].payload)
739 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
740 nh=frags[0][IPv6ExtHdrFragment].nh)
741 if ip.nh == IP_PROTOS.tcp:
742 p = (ip / TCP(buffer.getvalue()))
743 elif ip.nh == IP_PROTOS.udp:
744 p = (ip / UDP(buffer.getvalue()))
745 self.assert_packet_checksums_valid(p)
748 def initiate_tcp_session(self, in_if, out_if):
750 Initiates TCP session
752 :param in_if: Inside interface
753 :param out_if: Outside interface
757 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
758 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
759 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
762 self.pg_enable_capture(self.pg_interfaces)
764 capture = out_if.get_capture(1)
766 self.tcp_port_out = p[TCP].sport
768 # SYN + ACK packet out->in
769 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
770 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
771 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
774 self.pg_enable_capture(self.pg_interfaces)
779 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
780 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
781 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
784 self.pg_enable_capture(self.pg_interfaces)
786 out_if.get_capture(1)
789 self.logger.error("TCP 3 way handshake failed")
792 def verify_ipfix_nat44_ses(self, data):
794 Verify IPFIX NAT44 session create/delete event
796 :param data: Decoded IPFIX data records
798 nat44_ses_create_num = 0
799 nat44_ses_delete_num = 0
800 self.assertEqual(6, len(data))
803 self.assertIn(ord(record[230]), [4, 5])
804 if ord(record[230]) == 4:
805 nat44_ses_create_num += 1
807 nat44_ses_delete_num += 1
809 self.assertEqual(self.pg0.remote_ip4n, record[8])
810 # postNATSourceIPv4Address
811 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
814 self.assertEqual(struct.pack("!I", 0), record[234])
815 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
816 if IP_PROTOS.icmp == ord(record[4]):
817 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
818 self.assertEqual(struct.pack("!H", self.icmp_id_out),
820 elif IP_PROTOS.tcp == ord(record[4]):
821 self.assertEqual(struct.pack("!H", self.tcp_port_in),
823 self.assertEqual(struct.pack("!H", self.tcp_port_out),
825 elif IP_PROTOS.udp == ord(record[4]):
826 self.assertEqual(struct.pack("!H", self.udp_port_in),
828 self.assertEqual(struct.pack("!H", self.udp_port_out),
831 self.fail("Invalid protocol")
832 self.assertEqual(3, nat44_ses_create_num)
833 self.assertEqual(3, nat44_ses_delete_num)
835 def verify_ipfix_addr_exhausted(self, data):
837 Verify IPFIX NAT addresses event
839 :param data: Decoded IPFIX data records
841 self.assertEqual(1, len(data))
844 self.assertEqual(ord(record[230]), 3)
846 self.assertEqual(struct.pack("!I", 0), record[283])
848 def verify_ipfix_max_sessions(self, data, limit):
850 Verify IPFIX maximum session entries exceeded event
852 :param data: Decoded IPFIX data records
853 :param limit: Number of maximum session entries that can be created.
855 self.assertEqual(1, len(data))
858 self.assertEqual(ord(record[230]), 13)
859 # natQuotaExceededEvent
860 self.assertEqual(struct.pack("I", 1), record[466])
862 self.assertEqual(struct.pack("I", limit), record[471])
864 def verify_ipfix_max_bibs(self, data, limit):
866 Verify IPFIX maximum BIB entries exceeded event
868 :param data: Decoded IPFIX data records
869 :param limit: Number of maximum BIB entries that can be created.
871 self.assertEqual(1, len(data))
874 self.assertEqual(ord(record[230]), 13)
875 # natQuotaExceededEvent
876 self.assertEqual(struct.pack("I", 2), record[466])
878 self.assertEqual(struct.pack("I", limit), record[472])
880 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
882 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
884 :param data: Decoded IPFIX data records
885 :param limit: Number of maximum fragments pending reassembly
886 :param src_addr: IPv6 source address
888 self.assertEqual(1, len(data))
891 self.assertEqual(ord(record[230]), 13)
892 # natQuotaExceededEvent
893 self.assertEqual(struct.pack("I", 5), record[466])
894 # maxFragmentsPendingReassembly
895 self.assertEqual(struct.pack("I", limit), record[475])
897 self.assertEqual(src_addr, record[27])
899 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
901 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
903 :param data: Decoded IPFIX data records
904 :param limit: Number of maximum fragments pending reassembly
905 :param src_addr: IPv4 source address
907 self.assertEqual(1, len(data))
910 self.assertEqual(ord(record[230]), 13)
911 # natQuotaExceededEvent
912 self.assertEqual(struct.pack("I", 5), record[466])
913 # maxFragmentsPendingReassembly
914 self.assertEqual(struct.pack("I", limit), record[475])
916 self.assertEqual(src_addr, record[8])
918 def verify_ipfix_bib(self, data, is_create, src_addr):
920 Verify IPFIX NAT64 BIB create and delete events
922 :param data: Decoded IPFIX data records
923 :param is_create: Create event if nonzero value otherwise delete event
924 :param src_addr: IPv6 source address
926 self.assertEqual(1, len(data))
930 self.assertEqual(ord(record[230]), 10)
932 self.assertEqual(ord(record[230]), 11)
934 self.assertEqual(src_addr, record[27])
935 # postNATSourceIPv4Address
936 self.assertEqual(self.nat_addr_n, record[225])
938 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
940 self.assertEqual(struct.pack("!I", 0), record[234])
941 # sourceTransportPort
942 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
943 # postNAPTSourceTransportPort
944 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
946 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
949 Verify IPFIX NAT64 session create and delete events
951 :param data: Decoded IPFIX data records
952 :param is_create: Create event if nonzero value otherwise delete event
953 :param src_addr: IPv6 source address
954 :param dst_addr: IPv4 destination address
955 :param dst_port: destination TCP port
957 self.assertEqual(1, len(data))
961 self.assertEqual(ord(record[230]), 6)
963 self.assertEqual(ord(record[230]), 7)
965 self.assertEqual(src_addr, record[27])
966 # destinationIPv6Address
967 self.assertEqual(socket.inet_pton(socket.AF_INET6,
968 self.compose_ip6(dst_addr,
972 # postNATSourceIPv4Address
973 self.assertEqual(self.nat_addr_n, record[225])
974 # postNATDestinationIPv4Address
975 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
978 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
980 self.assertEqual(struct.pack("!I", 0), record[234])
981 # sourceTransportPort
982 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
983 # postNAPTSourceTransportPort
984 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
985 # destinationTransportPort
986 self.assertEqual(struct.pack("!H", dst_port), record[11])
987 # postNAPTDestinationTransportPort
988 self.assertEqual(struct.pack("!H", dst_port), record[228])
991 class TestNAT44(MethodHolder):
992 """ NAT44 Test Cases """
996 super(TestNAT44, cls).setUpClass()
997 cls.vapi.cli("set log class nat level debug")
1000 cls.tcp_port_in = 6303
1001 cls.tcp_port_out = 6303
1002 cls.udp_port_in = 6304
1003 cls.udp_port_out = 6304
1004 cls.icmp_id_in = 6305
1005 cls.icmp_id_out = 6305
1006 cls.nat_addr = '10.0.0.3'
1007 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1008 cls.ipfix_src_port = 4739
1009 cls.ipfix_domain_id = 1
1010 cls.tcp_external_port = 80
1012 cls.create_pg_interfaces(range(10))
1013 cls.interfaces = list(cls.pg_interfaces[0:4])
1015 for i in cls.interfaces:
1020 cls.pg0.generate_remote_hosts(3)
1021 cls.pg0.configure_ipv4_neighbors()
1023 cls.pg1.generate_remote_hosts(1)
1024 cls.pg1.configure_ipv4_neighbors()
1026 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1027 cls.vapi.ip_table_add_del(10, is_add=1)
1028 cls.vapi.ip_table_add_del(20, is_add=1)
1030 cls.pg4._local_ip4 = "172.16.255.1"
1031 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1032 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1033 cls.pg4.set_table_ip4(10)
1034 cls.pg5._local_ip4 = "172.17.255.3"
1035 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1036 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1037 cls.pg5.set_table_ip4(10)
1038 cls.pg6._local_ip4 = "172.16.255.1"
1039 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1040 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1041 cls.pg6.set_table_ip4(20)
1042 for i in cls.overlapping_interfaces:
1050 cls.pg9.generate_remote_hosts(2)
1051 cls.pg9.config_ip4()
1052 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1053 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1057 cls.pg9.resolve_arp()
1058 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1059 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1060 cls.pg9.resolve_arp()
1063 super(TestNAT44, cls).tearDownClass()
1066 def test_dynamic(self):
1067 """ NAT44 dynamic translation test """
1069 self.nat44_add_address(self.nat_addr)
1070 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1071 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1075 pkts = self.create_stream_in(self.pg0, self.pg1)
1076 self.pg0.add_stream(pkts)
1077 self.pg_enable_capture(self.pg_interfaces)
1079 capture = self.pg1.get_capture(len(pkts))
1080 self.verify_capture_out(capture)
1083 pkts = self.create_stream_out(self.pg1)
1084 self.pg1.add_stream(pkts)
1085 self.pg_enable_capture(self.pg_interfaces)
1087 capture = self.pg0.get_capture(len(pkts))
1088 self.verify_capture_in(capture, self.pg0)
1090 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1091 """ NAT44 handling of client packets with TTL=1 """
1093 self.nat44_add_address(self.nat_addr)
1094 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1095 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1098 # Client side - generate traffic
1099 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1100 self.pg0.add_stream(pkts)
1101 self.pg_enable_capture(self.pg_interfaces)
1104 # Client side - verify ICMP type 11 packets
1105 capture = self.pg0.get_capture(len(pkts))
1106 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1108 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1109 """ NAT44 handling of server packets with TTL=1 """
1111 self.nat44_add_address(self.nat_addr)
1112 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1113 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1116 # Client side - create sessions
1117 pkts = self.create_stream_in(self.pg0, self.pg1)
1118 self.pg0.add_stream(pkts)
1119 self.pg_enable_capture(self.pg_interfaces)
1122 # Server side - generate traffic
1123 capture = self.pg1.get_capture(len(pkts))
1124 self.verify_capture_out(capture)
1125 pkts = self.create_stream_out(self.pg1, ttl=1)
1126 self.pg1.add_stream(pkts)
1127 self.pg_enable_capture(self.pg_interfaces)
1130 # Server side - verify ICMP type 11 packets
1131 capture = self.pg1.get_capture(len(pkts))
1132 self.verify_capture_out_with_icmp_errors(capture,
1133 src_ip=self.pg1.local_ip4)
1135 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1136 """ NAT44 handling of error responses to client packets with TTL=2 """
1138 self.nat44_add_address(self.nat_addr)
1139 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1140 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1143 # Client side - generate traffic
1144 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1145 self.pg0.add_stream(pkts)
1146 self.pg_enable_capture(self.pg_interfaces)
1149 # Server side - simulate ICMP type 11 response
1150 capture = self.pg1.get_capture(len(pkts))
1151 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1152 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1153 ICMP(type=11) / packet[IP] for packet in capture]
1154 self.pg1.add_stream(pkts)
1155 self.pg_enable_capture(self.pg_interfaces)
1158 # Client side - verify ICMP type 11 packets
1159 capture = self.pg0.get_capture(len(pkts))
1160 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1162 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1163 """ NAT44 handling of error responses to server packets with TTL=2 """
# Flow: an in2out stream from pg0 is sent first and checked with
# verify_capture_out().  A TTL=2 stream from create_stream_out() is then
# queued on pg1 and captured on pg0; each captured IP packet is wrapped in
# an ICMP type 11 message from pg0.remote_ip4 to pg1.remote_ip4 and queued
# on pg0.  The packets arriving on pg1 are checked by
# verify_capture_out_with_icmp_errors().
# NOTE(review): the embedded listing numbers are not contiguous
# (e.g. 1167 -> 1170, 1173 -> 1176), so some original lines are not
# visible in this view -- presumably the tail of the second
# nat44_interface_add_del_feature() call and the packet-generator start.
# Confirm against the complete file.
1165 self.nat44_add_address(self.nat_addr)
1166 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1167 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1170 # Client side - create sessions
1171 pkts = self.create_stream_in(self.pg0, self.pg1)
1172 self.pg0.add_stream(pkts)
1173 self.pg_enable_capture(self.pg_interfaces)
1176 # Server side - generate traffic
1177 capture = self.pg1.get_capture(len(pkts))
1178 self.verify_capture_out(capture)
1179 pkts = self.create_stream_out(self.pg1, ttl=2)
1180 self.pg1.add_stream(pkts)
1181 self.pg_enable_capture(self.pg_interfaces)
1184 # Client side - simulate ICMP type 11 response
1185 capture = self.pg0.get_capture(len(pkts))
# One reply per captured packet; the captured IP layer becomes the payload
# of the ICMP message.
1186 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1187 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1188 ICMP(type=11) / packet[IP] for packet in capture]
1189 self.pg0.add_stream(pkts)
1190 self.pg_enable_capture(self.pg_interfaces)
1193 # Server side - verify ICMP type 11 packets
1194 capture = self.pg1.get_capture(len(pkts))
1195 self.verify_capture_out_with_icmp_errors(capture)
1197 def test_ping_out_interface_from_outside(self):
1198 """ Ping NAT44 out interface from outside network """
# Flow: an ICMP echo-request from pg1.remote_ip4 to pg1.local_ip4 (id
# self.icmp_id_out) is built, a stream is queued on pg1, and exactly one
# packet is expected back on pg1.  That packet must go from pg1.local_ip4
# to pg1.remote_ip4 with ICMP type 0 and id self.icmp_id_in.
# NOTE(review): `pkts` and `packet` are used below but their assignments
# are not visible (listing numbers 1208 and 1214-1215 are absent), and the
# logger.error() call presumably sits in an exception handler whose
# try/except lines are also not shown.  Confirm against the complete file.
1200 self.nat44_add_address(self.nat_addr)
1201 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1202 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1205 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1206 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1207 ICMP(id=self.icmp_id_out, type='echo-request'))
1209 self.pg1.add_stream(pkts)
1210 self.pg_enable_capture(self.pg_interfaces)
1212 capture = self.pg1.get_capture(len(pkts))
1213 self.assertEqual(1, len(capture))
# The reply is expected on the same interface with source and destination
# swapped relative to the request.
1216 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1217 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1218 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1219 self.assertEqual(packet[ICMP].type, 0) # echo reply
1221 self.logger.error(ppp("Unexpected or invalid packet "
1222 "(outside network):", packet))
1225 def test_ping_internal_host_from_outside(self):
1226 """ Ping internal host from outside network """
# Flow: a static mapping pg0.remote_ip4 <-> self.nat_addr is added.  An
# echo-request sent on pg1 to self.nat_addr must show up as one packet on
# pg0 (checked with verify_capture_in); an echo-reply sent on pg0 towards
# pg1.remote_ip4 must show up as one packet on pg1 (checked with
# verify_capture_out, same_port=True).  Both captures are also asserted to
# carry IP protocol ICMP.
# NOTE(review): the embedded listing numbers are not contiguous
# (e.g. 1230 -> 1234, 1238 -> 1240), so some original lines are not
# visible in this view -- presumably the tail of the second
# nat44_interface_add_del_feature() call and the packet-generator start.
# Confirm against the complete file.
1228 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1229 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1230 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside: request addressed to the mapped external address.
1234 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1235 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1236 ICMP(id=self.icmp_id_out, type='echo-request'))
1237 self.pg1.add_stream(pkt)
1238 self.pg_enable_capture(self.pg_interfaces)
1240 capture = self.pg0.get_capture(1)
1241 self.verify_capture_in(capture, self.pg0, packet_num=1)
1242 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Inside -> outside: reply from the internal host.
1245 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1246 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1247 ICMP(id=self.icmp_id_in, type='echo-reply'))
1248 self.pg0.add_stream(pkt)
1249 self.pg_enable_capture(self.pg_interfaces)
1251 capture = self.pg1.get_capture(1)
1252 self.verify_capture_out(capture, same_port=True, packet_num=1)
1253 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1255 def test_forwarding(self):
1256 """ NAT44 forwarding test """
# Flow: forwarding is switched on with nat44_forwarding_enable_disable(1)
# and a static mapping pg0.remote_ip4n <-> self.nat_addr_n is added
# directly through the API.  Traffic is then checked in both directions
# twice: first for the host that matches the static mapping, then with
# pg0.remote_hosts[0] temporarily replaced by remote_hosts[1] so that no
# static mapping matches.  The original host entry is put back and
# forwarding is switched off again near the end.
# NOTE(review): the embedded listing numbers are not contiguous
# (e.g. 1259 -> 1261, 1304 -> 1307, 1307 -> 1310, 1312 -> 1315), so some
# original lines are not visible -- including the tails of several calls
# and possibly a try/finally around the second half.  Confirm against the
# complete file.
1258 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1259 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1261 self.vapi.nat44_forwarding_enable_disable(1)
1263 real_ip = self.pg0.remote_ip4n
1264 alias_ip = self.nat_addr_n
1265 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1266 external_ip=alias_ip)
1269 # static mapping match
1271 pkts = self.create_stream_out(self.pg1)
1272 self.pg1.add_stream(pkts)
1273 self.pg_enable_capture(self.pg_interfaces)
1275 capture = self.pg0.get_capture(len(pkts))
1276 self.verify_capture_in(capture, self.pg0)
1278 pkts = self.create_stream_in(self.pg0, self.pg1)
1279 self.pg0.add_stream(pkts)
1280 self.pg_enable_capture(self.pg_interfaces)
1282 capture = self.pg1.get_capture(len(pkts))
1283 self.verify_capture_out(capture, same_port=True)
1285 # no static mapping match
# Remember the first remote host so it can be restored below.
1287 host0 = self.pg0.remote_hosts[0]
1288 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1290 pkts = self.create_stream_out(self.pg1,
1291 dst_ip=self.pg0.remote_ip4,
1292 use_inside_ports=True)
1293 self.pg1.add_stream(pkts)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 capture = self.pg0.get_capture(len(pkts))
1297 self.verify_capture_in(capture, self.pg0)
1299 pkts = self.create_stream_in(self.pg0, self.pg1)
1300 self.pg0.add_stream(pkts)
1301 self.pg_enable_capture(self.pg_interfaces)
1303 capture = self.pg1.get_capture(len(pkts))
1304 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1307 self.pg0.remote_hosts[0] = host0
# Undo the configuration made at the top of this test.
1310 self.vapi.nat44_forwarding_enable_disable(0)
1311 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1312 external_ip=alias_ip,
1315 def test_static_in(self):
1316 """ 1:1 NAT initialized from inside network """
# Flow: a static mapping pg0.remote_ip4 <-> "10.0.0.10" is added and the
# static-mapping dump is asserted to contain exactly one entry with an
# empty tag and zero protocol/local_port/external_port.  An in2out stream
# is then verified against nat_ip, followed by an out2in stream addressed
# to nat_ip.
# NOTE(review): the embedded listing numbers are not contiguous
# (e.g. 1325 -> 1327, 1337 -> 1339), so some original lines are not
# visible in this view -- presumably the tail of the second
# nat44_interface_add_del_feature() call and the packet-generator start.
# Confirm against the complete file.
1318 nat_ip = "10.0.0.10"
# Expected outside ports/id are stored on self; presumably read by the
# verify_* helpers -- confirm in the helper definitions.
1319 self.tcp_port_out = 6303
1320 self.udp_port_out = 6304
1321 self.icmp_id_out = 6305
1323 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1324 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1325 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1327 sm = self.vapi.nat44_static_mapping_dump()
1328 self.assertEqual(len(sm), 1)
# Only the part of the tag before the first NUL byte is compared.
1329 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1330 self.assertEqual(sm[0].protocol, 0)
1331 self.assertEqual(sm[0].local_port, 0)
1332 self.assertEqual(sm[0].external_port, 0)
1335 pkts = self.create_stream_in(self.pg0, self.pg1)
1336 self.pg0.add_stream(pkts)
1337 self.pg_enable_capture(self.pg_interfaces)
1339 capture = self.pg1.get_capture(len(pkts))
1340 self.verify_capture_out(capture, nat_ip, True)
1343 pkts = self.create_stream_out(self.pg1, nat_ip)
1344 self.pg1.add_stream(pkts)
1345 self.pg_enable_capture(self.pg_interfaces)
1347 capture = self.pg0.get_capture(len(pkts))
1348 self.verify_capture_in(capture, self.pg0)
1350 def test_static_out(self):
1351 """ 1:1 NAT initialized from outside network """
# Flow: a tagged static mapping pg0.remote_ip4 <-> "10.0.0.20" is added
# and the dump is asserted to contain one entry carrying that tag.  An
# out2in stream addressed to nat_ip is verified first, then an in2out
# stream is verified against nat_ip.
# NOTE(review): `tag` is used below but its assignment is not visible
# (listing number 1358 is absent); other gaps (e.g. 1361 -> 1363,
# 1370 -> 1372) presumably hide the tail of the second
# nat44_interface_add_del_feature() call and the packet-generator start.
# Confirm against the complete file.
1353 nat_ip = "10.0.0.20"
1354 self.tcp_port_out = 6303
1355 self.udp_port_out = 6304
1356 self.icmp_id_out = 6305
1359 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1360 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1361 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1363 sm = self.vapi.nat44_static_mapping_dump()
1364 self.assertEqual(len(sm), 1)
# Only the part of the tag before the first NUL byte is compared.
1365 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1368 pkts = self.create_stream_out(self.pg1, nat_ip)
1369 self.pg1.add_stream(pkts)
1370 self.pg_enable_capture(self.pg_interfaces)
1372 capture = self.pg0.get_capture(len(pkts))
1373 self.verify_capture_in(capture, self.pg0)
1376 pkts = self.create_stream_in(self.pg0, self.pg1)
1377 self.pg0.add_stream(pkts)
1378 self.pg_enable_capture(self.pg_interfaces)
1380 capture = self.pg1.get_capture(len(pkts))
1381 self.verify_capture_out(capture, nat_ip, True)
1383 def test_static_with_port_in(self):
1384 """ 1:1 NAPT initialized from inside network """
# Flow: self.nat_addr is added as a NAT address and three port-level
# static mappings (TCP, UDP, ICMP) are created from the *_in port/id
# attributes to the *_out values set below.  An in2out stream is verified
# first, then an out2in stream.
# NOTE(review): the embedded listing numbers are not contiguous
# (e.g. 1401 -> 1405, 1407 -> 1409), so some original lines are not
# visible in this view -- presumably the tail of the second
# nat44_interface_add_del_feature() call and the packet-generator start.
# Confirm against the complete file.
1386 self.tcp_port_out = 3606
1387 self.udp_port_out = 3607
1388 self.icmp_id_out = 3608
1390 self.nat44_add_address(self.nat_addr)
1391 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1392 self.tcp_port_in, self.tcp_port_out,
1393 proto=IP_PROTOS.tcp)
1394 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1395 self.udp_port_in, self.udp_port_out,
1396 proto=IP_PROTOS.udp)
1397 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1398 self.icmp_id_in, self.icmp_id_out,
1399 proto=IP_PROTOS.icmp)
1400 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1401 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1405 pkts = self.create_stream_in(self.pg0, self.pg1)
1406 self.pg0.add_stream(pkts)
1407 self.pg_enable_capture(self.pg_interfaces)
1409 capture = self.pg1.get_capture(len(pkts))
1410 self.verify_capture_out(capture)
1413 pkts = self.create_stream_out(self.pg1)
1414 self.pg1.add_stream(pkts)
1415 self.pg_enable_capture(self.pg_interfaces)
1417 capture = self.pg0.get_capture(len(pkts))
1418 self.verify_capture_in(capture, self.pg0)
1420 def test_static_with_port_out(self):
1421 """ 1:1 NAPT initialized from outside network """
# Flow: same configuration as the inside-initiated port-mapping test but
# with different *_out values, and with the directions reversed: the
# out2in stream is sent and verified first, then the in2out stream.
# NOTE(review): the embedded listing numbers are not contiguous
# (e.g. 1438 -> 1442, 1444 -> 1446), so some original lines are not
# visible in this view -- presumably the tail of the second
# nat44_interface_add_del_feature() call and the packet-generator start.
# Confirm against the complete file.
1423 self.tcp_port_out = 30606
1424 self.udp_port_out = 30607
1425 self.icmp_id_out = 30608
1427 self.nat44_add_address(self.nat_addr)
1428 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1429 self.tcp_port_in, self.tcp_port_out,
1430 proto=IP_PROTOS.tcp)
1431 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1432 self.udp_port_in, self.udp_port_out,
1433 proto=IP_PROTOS.udp)
1434 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1435 self.icmp_id_in, self.icmp_id_out,
1436 proto=IP_PROTOS.icmp)
1437 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1438 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1442 pkts = self.create_stream_out(self.pg1)
1443 self.pg1.add_stream(pkts)
1444 self.pg_enable_capture(self.pg_interfaces)
1446 capture = self.pg0.get_capture(len(pkts))
1447 self.verify_capture_in(capture, self.pg0)
1450 pkts = self.create_stream_in(self.pg0, self.pg1)
1451 self.pg0.add_stream(pkts)
1452 self.pg_enable_capture(self.pg_interfaces)
1454 capture = self.pg1.get_capture(len(pkts))
1455 self.verify_capture_out(capture)
1457 def test_static_vrf_aware(self):
1458 """ 1:1 NAT VRF awareness """
# Flow: static mappings are added for pg4.remote_ip4 -> "10.0.0.30" and
# pg0.remote_ip4 -> "10.0.0.40"; the NAT44 feature is configured on pg3,
# pg0 and pg4.  A stream from pg4 to pg3 must be captured on pg3 and is
# verified against nat_ip1; a stream from pg0 to pg3 must produce no
# capture on pg3.
# NOTE(review): the embedded listing numbers are not contiguous
# (1466 -> 1468 -> 1470 -> 1472, 1483 -> 1485), so the trailing arguments
# of both nat44_add_static_mapping() calls and of the pg3 feature call are
# not visible -- presumably VRF ids and the outside flag -- and the
# second half of the comment at 1483 is cut off.  Confirm against the
# complete file.
1460 nat_ip1 = "10.0.0.30"
1461 nat_ip2 = "10.0.0.40"
1462 self.tcp_port_out = 6303
1463 self.udp_port_out = 6304
1464 self.icmp_id_out = 6305
1466 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1468 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1470 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1472 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1473 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1475 # inside interface VRF match NAT44 static mapping VRF
1476 pkts = self.create_stream_in(self.pg4, self.pg3)
1477 self.pg4.add_stream(pkts)
1478 self.pg_enable_capture(self.pg_interfaces)
1480 capture = self.pg3.get_capture(len(pkts))
1481 self.verify_capture_out(capture, nat_ip1, True)
1483 # inside interface VRF don't match NAT44 static mapping VRF (packets
1485 pkts = self.create_stream_in(self.pg0, self.pg3)
1486 self.pg0.add_stream(pkts)
1487 self.pg_enable_capture(self.pg_interfaces)
1489 self.pg3.assert_nothing_captured()
1491 def test_dynamic_to_static(self):
1492 """ Switch from dynamic translation to 1:1NAT """
# Flow: with only the NAT address configured, an in2out stream is sent
# and verified (dynamic translation).  A static mapping
# pg0.remote_ip4 -> "10.0.0.10" is then added; the session dump for
# pg0.remote_ip4n is asserted to be empty at that point, and a second
# in2out stream is verified against nat_ip.
# NOTE(review): the embedded listing numbers are not contiguous
# (e.g. 1500 -> 1504, 1506 -> 1508), so some original lines are not
# visible in this view -- presumably the tail of the second
# nat44_interface_add_del_feature() call and the packet-generator start.
# Confirm against the complete file.
1493 nat_ip = "10.0.0.10"
1494 self.tcp_port_out = 6303
1495 self.udp_port_out = 6304
1496 self.icmp_id_out = 6305
1498 self.nat44_add_address(self.nat_addr)
1499 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1500 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1504 pkts = self.create_stream_in(self.pg0, self.pg1)
1505 self.pg0.add_stream(pkts)
1506 self.pg_enable_capture(self.pg_interfaces)
1508 capture = self.pg1.get_capture(len(pkts))
1509 self.verify_capture_out(capture)
1512 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
# No sessions are expected for this user right after the static mapping
# has been added.
1513 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1514 self.assertEqual(len(sessions), 0)
1515 pkts = self.create_stream_in(self.pg0, self.pg1)
1516 self.pg0.add_stream(pkts)
1517 self.pg_enable_capture(self.pg_interfaces)
1519 capture = self.pg1.get_capture(len(pkts))
1520 self.verify_capture_out(capture, nat_ip, True)
1522 def test_identity_nat(self):
1523 """ Identity NAT """
# Flow: an identity mapping is added for pg0.remote_ip4n.  A TCP packet
# (12345 -> 56789) from pg1.remote_ip4 to pg0.remote_ip4 is queued on pg1
# and one packet is expected on pg0 with the same addresses and ports,
# plus valid checksums.
# NOTE(review): `ip` and `tcp` are used below but their assignments are
# not visible (listing numbers 1537-1540 are absent), and the
# logger.error() call presumably belongs to an exception handler whose
# try/except lines are also not shown.  `p` may be rebound to the captured
# packet on one of those hidden lines.  Confirm against the complete file.
1525 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1526 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1527 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1530 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1531 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1532 TCP(sport=12345, dport=56789))
1533 self.pg1.add_stream(p)
1534 self.pg_enable_capture(self.pg_interfaces)
1536 capture = self.pg0.get_capture(1)
# Addresses and ports must match the values used when building the packet.
1541 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1542 self.assertEqual(ip.src, self.pg1.remote_ip4)
1543 self.assertEqual(tcp.dport, 56789)
1544 self.assertEqual(tcp.sport, 12345)
1545 self.assert_packet_checksums_valid(p)
1547 self.logger.error(ppp("Unexpected or invalid packet:", p))
1550 def test_multiple_inside_interfaces(self):
1551 """ NAT44 multiple non-overlapping address space inside interfaces """
# Flow: the NAT44 feature is configured on pg0, pg1 and pg3 (the pg3 call
# has extra arguments that are not visible here).  Six traffic checks
# follow: pg0 -> pg1 and pg0 -> pg2 are verified with
# verify_capture_no_translation(); pg0 <-> pg3 and pg1 <-> pg3 are
# verified with verify_capture_out() / verify_capture_in().
# NOTE(review): the embedded listing numbers are not contiguous
# (e.g. 1556 -> 1559, 1562 -> 1564), so some original lines are not
# visible in this view -- presumably the tail of the pg3
# nat44_interface_add_del_feature() call and the packet-generator start
# after each pg_enable_capture().  Confirm against the complete file.
1553 self.nat44_add_address(self.nat_addr)
1554 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1555 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1556 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1559 # between two NAT44 inside interfaces (no translation)
1560 pkts = self.create_stream_in(self.pg0, self.pg1)
1561 self.pg0.add_stream(pkts)
1562 self.pg_enable_capture(self.pg_interfaces)
1564 capture = self.pg1.get_capture(len(pkts))
1565 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1567 # from NAT44 inside to interface without NAT44 feature (no translation)
1568 pkts = self.create_stream_in(self.pg0, self.pg2)
1569 self.pg0.add_stream(pkts)
1570 self.pg_enable_capture(self.pg_interfaces)
1572 capture = self.pg2.get_capture(len(pkts))
1573 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1575 # in2out 1st interface
1576 pkts = self.create_stream_in(self.pg0, self.pg3)
1577 self.pg0.add_stream(pkts)
1578 self.pg_enable_capture(self.pg_interfaces)
1580 capture = self.pg3.get_capture(len(pkts))
1581 self.verify_capture_out(capture)
1583 # out2in 1st interface
1584 pkts = self.create_stream_out(self.pg3)
1585 self.pg3.add_stream(pkts)
1586 self.pg_enable_capture(self.pg_interfaces)
1588 capture = self.pg0.get_capture(len(pkts))
1589 self.verify_capture_in(capture, self.pg0)
1591 # in2out 2nd interface
1592 pkts = self.create_stream_in(self.pg1, self.pg3)
1593 self.pg1.add_stream(pkts)
1594 self.pg_enable_capture(self.pg_interfaces)
1596 capture = self.pg3.get_capture(len(pkts))
1597 self.verify_capture_out(capture)
1599 # out2in 2nd interface
1600 pkts = self.create_stream_out(self.pg3)
1601 self.pg3.add_stream(pkts)
1602 self.pg_enable_capture(self.pg_interfaces)
1604 capture = self.pg1.get_capture(len(pkts))
1605 self.verify_capture_in(capture, self.pg1)
1607 def test_inside_overlapping_interfaces(self):
1608 """ NAT44 multiple inside interfaces with overlapping address space """
# Flow: the NAT44 feature is configured on pg3 (extra arguments not
# visible), pg4, pg5 and pg6, and a static mapping is added for
# pg6.remote_ip4 -> "10.0.0.10".  The test then checks, in order:
#   * pg4 -> pg5 traffic without translation,
#   * a TCP packet from pg4 to the static address arriving on pg6,
#   * in2out/out2in between pg4 and pg3, and between pg5 and pg3,
#   * the session dump for pg5.remote_ip4n (second argument 10),
#   * in2out/out2in between pg6 and pg3 using the static address,
#   * user and session dumps for all users, pg4 and pg6.
# NOTE(review): the embedded listing numbers are not contiguous
# (e.g. 1612 -> 1614, 1617 -> 1620, 1635 -> 1640, 1721 -> 1723), so
# several original lines are not visible: call tails, the assignments of
# `ip`, `tcp` and `user`, try/except lines, and the packet-generator
# start.  Confirm against the complete file.
1610 static_nat_ip = "10.0.0.10"
1611 self.nat44_add_address(self.nat_addr)
1612 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1614 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1615 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1616 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1617 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1620 # between NAT44 inside interfaces with same VRF (no translation)
1621 pkts = self.create_stream_in(self.pg4, self.pg5)
1622 self.pg4.add_stream(pkts)
1623 self.pg_enable_capture(self.pg_interfaces)
1625 capture = self.pg5.get_capture(len(pkts))
1626 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1628 # between NAT44 inside interfaces with different VRF (hairpinning)
1629 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1630 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1631 TCP(sport=1234, dport=5678))
1632 self.pg4.add_stream(p)
1633 self.pg_enable_capture(self.pg_interfaces)
1635 capture = self.pg6.get_capture(1)
# Source must be rewritten to the NAT address with a different source
# port; destination must be the real pg6 host with the port unchanged.
1640 self.assertEqual(ip.src, self.nat_addr)
1641 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1642 self.assertNotEqual(tcp.sport, 1234)
1643 self.assertEqual(tcp.dport, 5678)
1645 self.logger.error(ppp("Unexpected or invalid packet:", p))
1648 # in2out 1st interface
1649 pkts = self.create_stream_in(self.pg4, self.pg3)
1650 self.pg4.add_stream(pkts)
1651 self.pg_enable_capture(self.pg_interfaces)
1653 capture = self.pg3.get_capture(len(pkts))
1654 self.verify_capture_out(capture)
1656 # out2in 1st interface
1657 pkts = self.create_stream_out(self.pg3)
1658 self.pg3.add_stream(pkts)
1659 self.pg_enable_capture(self.pg_interfaces)
1661 capture = self.pg4.get_capture(len(pkts))
1662 self.verify_capture_in(capture, self.pg4)
1664 # in2out 2nd interface
1665 pkts = self.create_stream_in(self.pg5, self.pg3)
1666 self.pg5.add_stream(pkts)
1667 self.pg_enable_capture(self.pg_interfaces)
1669 capture = self.pg3.get_capture(len(pkts))
1670 self.verify_capture_out(capture)
1672 # out2in 2nd interface
1673 pkts = self.create_stream_out(self.pg3)
1674 self.pg3.add_stream(pkts)
1675 self.pg_enable_capture(self.pg_interfaces)
1677 capture = self.pg5.get_capture(len(pkts))
1678 self.verify_capture_in(capture, self.pg5)
# Session dump for the pg5 user: exactly three non-static sessions are
# expected, and the assertions below rely on them being returned in the
# order TCP, UDP, ICMP.
1681 addresses = self.vapi.nat44_address_dump()
1682 self.assertEqual(len(addresses), 1)
1683 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1684 self.assertEqual(len(sessions), 3)
1685 for session in sessions:
1686 self.assertFalse(session.is_static)
1687 self.assertEqual(session.inside_ip_address[0:4],
1688 self.pg5.remote_ip4n)
1689 self.assertEqual(session.outside_ip_address,
1690 addresses[0].ip_address)
1691 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1692 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1693 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1694 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1695 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1696 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1697 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1698 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1699 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1701 # in2out 3rd interface
1702 pkts = self.create_stream_in(self.pg6, self.pg3)
1703 self.pg6.add_stream(pkts)
1704 self.pg_enable_capture(self.pg_interfaces)
1706 capture = self.pg3.get_capture(len(pkts))
1707 self.verify_capture_out(capture, static_nat_ip, True)
1709 # out2in 3rd interface
1710 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1711 self.pg3.add_stream(pkts)
1712 self.pg_enable_capture(self.pg_interfaces)
1714 capture = self.pg6.get_capture(len(pkts))
1715 self.verify_capture_in(capture, self.pg6)
1717 # general user and session dump verifications
1718 users = self.vapi.nat44_user_dump()
1719 self.assertTrue(len(users) >= 3)
1720 addresses = self.vapi.nat44_address_dump()
1721 self.assertEqual(len(addresses), 1)
1723 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1725 for session in sessions:
1726 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_pkts must be positive and total_bytes must
# exceed total_pkts.
1727 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1728 self.assertTrue(session.protocol in
1729 [IP_PROTOS.tcp, IP_PROTOS.udp,
1731 self.assertFalse(session.ext_host_valid)
# Sessions of the pg4 user: at least four, none static.
1734 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1735 self.assertTrue(len(sessions) >= 4)
1736 for session in sessions:
1737 self.assertFalse(session.is_static)
1738 self.assertEqual(session.inside_ip_address[0:4],
1739 self.pg4.remote_ip4n)
1740 self.assertEqual(session.outside_ip_address,
1741 addresses[0].ip_address)
# Sessions of the pg6 user: at least three, all static, with the static
# NAT address as outside address.
1744 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1745 self.assertTrue(len(sessions) >= 3)
1746 for session in sessions:
1747 self.assertTrue(session.is_static)
1748 self.assertEqual(session.inside_ip_address[0:4],
1749 self.pg6.remote_ip4n)
# NOTE(review): this compares two map() results.  That is an element-wise
# list comparison only where map() returns a list (Python 2); under
# Python 3 both sides are distinct iterator objects.  Confirm the target
# interpreter before porting.
1750 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1751 map(int, static_nat_ip.split('.')))
1752 self.assertTrue(session.inside_port in
1753 [self.tcp_port_in, self.udp_port_in,
1756 def test_hairpinning(self):
1757 """ NAT44 hairpinning - 1:1 NAPT """
# Flow: host and server are both remote hosts on pg0.  A TCP static
# mapping server.ip4:5678 <-> self.nat_addr:8765 is added.  The host sends
# a packet to the external address/port on pg0; the packet captured on pg0
# must come from self.nat_addr with a changed source port and be
# addressed to the server's real address/port.  The server's reply to
# that translated port must come back on pg0 from self.nat_addr:8765 to
# the host's original address/port.
# NOTE(review): `host_in_port`, `ip` and `tcp` are used below but their
# assignments are not visible (e.g. listing numbers 1762 and 1783-1786 are
# absent), and the logger.error() calls presumably belong to exception
# handlers whose try/except lines are not shown.  `p` may be rebound to
# the captured packet on a hidden line.  Confirm against the complete
# file.
1759 host = self.pg0.remote_hosts[0]
1760 server = self.pg0.remote_hosts[1]
1763 server_in_port = 5678
1764 server_out_port = 8765
1766 self.nat44_add_address(self.nat_addr)
1767 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1768 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1770 # add static mapping for server
1771 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1772 server_in_port, server_out_port,
1773 proto=IP_PROTOS.tcp)
1775 # send packet from host to server
1776 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1777 IP(src=host.ip4, dst=self.nat_addr) /
1778 TCP(sport=host_in_port, dport=server_out_port))
1779 self.pg0.add_stream(p)
1780 self.pg_enable_capture(self.pg_interfaces)
1782 capture = self.pg0.get_capture(1)
1787 self.assertEqual(ip.src, self.nat_addr)
1788 self.assertEqual(ip.dst, server.ip4)
1789 self.assertNotEqual(tcp.sport, host_in_port)
1790 self.assertEqual(tcp.dport, server_in_port)
1791 self.assert_packet_checksums_valid(p)
# Remember the translated source port; the server's reply is sent to it.
1792 host_out_port = tcp.sport
1794 self.logger.error(ppp("Unexpected or invalid packet:", p))
1797 # send reply from server to host
1798 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1799 IP(src=server.ip4, dst=self.nat_addr) /
1800 TCP(sport=server_in_port, dport=host_out_port))
1801 self.pg0.add_stream(p)
1802 self.pg_enable_capture(self.pg_interfaces)
1804 capture = self.pg0.get_capture(1)
1809 self.assertEqual(ip.src, self.nat_addr)
1810 self.assertEqual(ip.dst, host.ip4)
1811 self.assertEqual(tcp.sport, server_out_port)
1812 self.assertEqual(tcp.dport, host_in_port)
1813 self.assert_packet_checksums_valid(p)
1815 self.logger.error(ppp("Unexpected or invalid packet:", p))
1818 def test_hairpinning2(self):
1819 """ NAT44 hairpinning - 1:1 NAT"""
# Flow: host, server1 and server2 are all remote hosts on pg0; server1
# and server2 get address-only static mappings to "10.0.0.10" and
# "10.0.0.11".  Four exchanges are sent on pg0 and captured on pg0, each
# with a TCP, a UDP and an ICMP packet:
#   1. host -> server1's NAT address: source becomes self.nat_addr with
#      changed ports/id, destination becomes server1.ip4.
#   2. server1 -> self.nat_addr (reply): source becomes server1's NAT
#      address, destination becomes host.ip4 with the original ports/id.
#   3. server2 -> server1's NAT address: source becomes server2's NAT
#      address with unchanged ports/id.
#   4. server1 -> server2's NAT address (reply): source becomes server1's
#      NAT address, destination becomes server2.ip4.
# NOTE(review): the embedded listing numbers are not contiguous
# (e.g. 1836 -> 1840, 1842 -> 1844, 1850 -> 1852, 1868 -> 1870), so
# several original lines are not visible: the creation and filling of
# `pkts`, try/except/else lines around the per-packet assertions, section
# comments, and the packet-generator start.  Confirm against the complete
# file.
1821 server1_nat_ip = "10.0.0.10"
1822 server2_nat_ip = "10.0.0.11"
1823 host = self.pg0.remote_hosts[0]
1824 server1 = self.pg0.remote_hosts[1]
1825 server2 = self.pg0.remote_hosts[2]
1826 server_tcp_port = 22
1827 server_udp_port = 20
1829 self.nat44_add_address(self.nat_addr)
1830 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1831 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1834 # add static mapping for servers
1835 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1836 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# Exchange 1: host -> server1 (via server1's NAT address).
1840 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1841 IP(src=host.ip4, dst=server1_nat_ip) /
1842 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1844 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1845 IP(src=host.ip4, dst=server1_nat_ip) /
1846 UDP(sport=self.udp_port_in, dport=server_udp_port))
1848 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1849 IP(src=host.ip4, dst=server1_nat_ip) /
1850 ICMP(id=self.icmp_id_in, type='echo-request'))
1852 self.pg0.add_stream(pkts)
1853 self.pg_enable_capture(self.pg_interfaces)
1855 capture = self.pg0.get_capture(len(pkts))
1856 for packet in capture:
1858 self.assertEqual(packet[IP].src, self.nat_addr)
1859 self.assertEqual(packet[IP].dst, server1.ip4)
1860 if packet.haslayer(TCP):
1861 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1862 self.assertEqual(packet[TCP].dport, server_tcp_port)
# Translated ports/id are stored for building the replies in exchange 2.
1863 self.tcp_port_out = packet[TCP].sport
1864 self.assert_packet_checksums_valid(packet)
1865 elif packet.haslayer(UDP):
1866 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1867 self.assertEqual(packet[UDP].dport, server_udp_port)
1868 self.udp_port_out = packet[UDP].sport
1870 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1871 self.icmp_id_out = packet[ICMP].id
1873 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exchange 2: server1 -> host (replies addressed to self.nat_addr).
1878 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1879 IP(src=server1.ip4, dst=self.nat_addr) /
1880 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1882 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1883 IP(src=server1.ip4, dst=self.nat_addr) /
1884 UDP(sport=server_udp_port, dport=self.udp_port_out))
1886 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1887 IP(src=server1.ip4, dst=self.nat_addr) /
1888 ICMP(id=self.icmp_id_out, type='echo-reply'))
1890 self.pg0.add_stream(pkts)
1891 self.pg_enable_capture(self.pg_interfaces)
1893 capture = self.pg0.get_capture(len(pkts))
1894 for packet in capture:
1896 self.assertEqual(packet[IP].src, server1_nat_ip)
1897 self.assertEqual(packet[IP].dst, host.ip4)
1898 if packet.haslayer(TCP):
1899 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1900 self.assertEqual(packet[TCP].sport, server_tcp_port)
1901 self.assert_packet_checksums_valid(packet)
1902 elif packet.haslayer(UDP):
1903 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1904 self.assertEqual(packet[UDP].sport, server_udp_port)
1906 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1908 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1911 # server2 to server1
1913 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1914 IP(src=server2.ip4, dst=server1_nat_ip) /
1915 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1917 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1918 IP(src=server2.ip4, dst=server1_nat_ip) /
1919 UDP(sport=self.udp_port_in, dport=server_udp_port))
1921 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1922 IP(src=server2.ip4, dst=server1_nat_ip) /
1923 ICMP(id=self.icmp_id_in, type='echo-request'))
1925 self.pg0.add_stream(pkts)
1926 self.pg_enable_capture(self.pg_interfaces)
1928 capture = self.pg0.get_capture(len(pkts))
1929 for packet in capture:
# Here the source ports/id are asserted to be unchanged, unlike exchange 1.
1931 self.assertEqual(packet[IP].src, server2_nat_ip)
1932 self.assertEqual(packet[IP].dst, server1.ip4)
1933 if packet.haslayer(TCP):
1934 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1935 self.assertEqual(packet[TCP].dport, server_tcp_port)
1936 self.tcp_port_out = packet[TCP].sport
1937 self.assert_packet_checksums_valid(packet)
1938 elif packet.haslayer(UDP):
1939 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1940 self.assertEqual(packet[UDP].dport, server_udp_port)
1941 self.udp_port_out = packet[UDP].sport
1943 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1944 self.icmp_id_out = packet[ICMP].id
1946 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1949 # server1 to server2
1951 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1952 IP(src=server1.ip4, dst=server2_nat_ip) /
1953 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1955 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1956 IP(src=server1.ip4, dst=server2_nat_ip) /
1957 UDP(sport=server_udp_port, dport=self.udp_port_out))
1959 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1960 IP(src=server1.ip4, dst=server2_nat_ip) /
1961 ICMP(id=self.icmp_id_out, type='echo-reply'))
1963 self.pg0.add_stream(pkts)
1964 self.pg_enable_capture(self.pg_interfaces)
1966 capture = self.pg0.get_capture(len(pkts))
1967 for packet in capture:
1969 self.assertEqual(packet[IP].src, server1_nat_ip)
1970 self.assertEqual(packet[IP].dst, server2.ip4)
1971 if packet.haslayer(TCP):
1972 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1973 self.assertEqual(packet[TCP].sport, server_tcp_port)
1974 self.assert_packet_checksums_valid(packet)
1975 elif packet.haslayer(UDP):
1976 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1977 self.assertEqual(packet[UDP].sport, server_udp_port)
1979 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1981 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1984 def test_max_translations_per_user(self):
1985 """ MAX translations per user - recycle the least recently used """
# Flow: the per-user limit is read from nat_show_config().  limit + 5 TCP
# packets with distinct source ports (1025 + n) are sent from pg0 and all
# are expected on pg1; the user entry for pg0.remote_ip4n must then show
# nsessions == limit and no static sessions.  After a TCP static mapping
# is added and one more packet is sent from `tcp_port`, the same user must
# show limit - 1 dynamic sessions and one static session.
# NOTE(review): `pkts`, `user` and `tcp_port` are used below but their
# assignments / loop headers are not visible (e.g. listing numbers 1997,
# 2002, 2011, 2017, 2019 and 2029 are absent), as are the tail of the
# second nat44_interface_add_del_feature() call and the packet-generator
# start.  Confirm against the complete file.
1987 self.nat44_add_address(self.nat_addr)
1988 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1989 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1992 # get maximum number of translations per user
1993 nat44_config = self.vapi.nat_show_config()
1995 # send more than maximum number of translations per user packets
1996 pkts_num = nat44_config.max_translations_per_user + 5
1998 for port in range(0, pkts_num):
1999 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2000 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2001 TCP(sport=1025 + port))
2003 self.pg0.add_stream(pkts)
2004 self.pg_enable_capture(self.pg_interfaces)
2007 # verify number of translated packet
2008 self.pg1.get_capture(pkts_num)
2010 users = self.vapi.nat44_user_dump()
2012 if user.ip_address == self.pg0.remote_ip4n:
2013 self.assertEqual(user.nsessions,
2014 nat44_config.max_translations_per_user)
2015 self.assertEqual(user.nstaticsessions, 0)
2018 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2020 proto=IP_PROTOS.tcp)
2021 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2022 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2023 TCP(sport=tcp_port))
2024 self.pg0.add_stream(p)
2025 self.pg_enable_capture(self.pg_interfaces)
2027 self.pg1.get_capture(1)
2028 users = self.vapi.nat44_user_dump()
2030 if user.ip_address == self.pg0.remote_ip4n:
# One dynamic session is expected to have made room for the static one.
2031 self.assertEqual(user.nsessions,
2032 nat44_config.max_translations_per_user - 1)
2033 self.assertEqual(user.nstaticsessions, 1)
2035 def test_interface_addr(self):
2036 """ Acquire NAT44 addresses from interface """
# Flow: pg7 is registered with nat44_add_interface_addr() while it has no
# IPv4 address, and the NAT address dump is expected to be empty.  After
# pg7.config_ip4() the dump must contain exactly one entry whose first
# four bytes equal pg7.local_ip4n; after pg7.unconfig_ip4() the dump must
# be empty again.
2037 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2039 # no address in NAT pool
2040 adresses = self.vapi.nat44_address_dump()
2041 self.assertEqual(0, len(adresses))
2043 # configure interface address and check NAT address pool
2044 self.pg7.config_ip4()
2045 adresses = self.vapi.nat44_address_dump()
2046 self.assertEqual(1, len(adresses))
2047 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2049 # remove interface address and check NAT address pool
2050 self.pg7.unconfig_ip4()
2051 adresses = self.vapi.nat44_address_dump()
2052 self.assertEqual(0, len(adresses))
2054 def test_interface_addr_static_mapping(self):
2055 """ Static mapping with addresses from interface """
# Flow: pg7 is registered with nat44_add_interface_addr() and a tagged
# static mapping bound to pg7's sw_if_index is added.  The static-mapping
# dump is then checked through four states:
#   * no address on pg7: one entry, carrying pg7.sw_if_index and the tag;
#   * after config_ip4(): two entries, where any entry with
#     external_sw_if_index == 0xFFFFFFFF must hold pg7.local_ip4n and the
#     tag;
#   * after unconfig_ip4(): back to the single interface-bound entry;
#   * after config_ip4() again: two entries, same checks as before.
# Finally the static mapping is removed and the dump must be empty.
# NOTE(review): `tag` and `resolved` are used below but their assignments
# are not visible (e.g. listing numbers 2057, 2075, 2081, 2096 and 2102
# are absent), and the argument lists of both nat44_add_static_mapping()
# calls are only partly shown.  Confirm against the complete file.
2058 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2059 self.nat44_add_static_mapping(
2061 external_sw_if_index=self.pg7.sw_if_index,
2064 # static mappings with external interface
2065 static_mappings = self.vapi.nat44_static_mapping_dump()
2066 self.assertEqual(1, len(static_mappings))
2067 self.assertEqual(self.pg7.sw_if_index,
2068 static_mappings[0].external_sw_if_index)
2069 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2071 # configure interface address and check static mappings
2072 self.pg7.config_ip4()
2073 static_mappings = self.vapi.nat44_static_mapping_dump()
2074 self.assertEqual(2, len(static_mappings))
2076 for sm in static_mappings:
# 0xFFFFFFFF presumably marks an entry with no interface binding, i.e. the
# one resolved to a concrete address -- TODO confirm against the API.
2077 if sm.external_sw_if_index == 0xFFFFFFFF:
2078 self.assertEqual(sm.external_ip_address[0:4],
2079 self.pg7.local_ip4n)
2080 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2082 self.assertTrue(resolved)
2084 # remove interface address and check static mappings
2085 self.pg7.unconfig_ip4()
2086 static_mappings = self.vapi.nat44_static_mapping_dump()
2087 self.assertEqual(1, len(static_mappings))
2088 self.assertEqual(self.pg7.sw_if_index,
2089 static_mappings[0].external_sw_if_index)
2090 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2092 # configure interface address again and check static mappings
2093 self.pg7.config_ip4()
2094 static_mappings = self.vapi.nat44_static_mapping_dump()
2095 self.assertEqual(2, len(static_mappings))
2097 for sm in static_mappings:
2098 if sm.external_sw_if_index == 0xFFFFFFFF:
2099 self.assertEqual(sm.external_ip_address[0:4],
2100 self.pg7.local_ip4n)
2101 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2103 self.assertTrue(resolved)
2105 # remove static mapping
2106 self.nat44_add_static_mapping(
2108 external_sw_if_index=self.pg7.sw_if_index,
2111 static_mappings = self.vapi.nat44_static_mapping_dump()
2112 self.assertEqual(0, len(static_mappings))
2114 def test_interface_addr_identity_nat(self):
2115 """ Identity NAT with addresses from interface """
# Flow: pg7 is registered with nat44_add_interface_addr() and a TCP
# identity mapping bound to pg7's sw_if_index is added.  With no address
# on pg7 the identity-mapping dump must hold one entry carrying
# pg7.sw_if_index.  After config_ip4() it must hold two entries and the
# checks under `sm.sw_if_index == 0xFFFFFFFF` run; after unconfig_ip4() it
# must be back to the single interface-bound entry.
# NOTE(review): `port` and `resolved` are used below but their assignments
# are not visible (e.g. listing numbers 2117, 2134 and 2142 are absent),
# and the nat44_add_del_identity_mapping() argument list is only partly
# shown.  Confirm against the complete file.
2118 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2119 self.vapi.nat44_add_del_identity_mapping(
2120 sw_if_index=self.pg7.sw_if_index,
2122 protocol=IP_PROTOS.tcp,
2125 # identity mappings with external interface
2126 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2127 self.assertEqual(1, len(identity_mappings))
2128 self.assertEqual(self.pg7.sw_if_index,
2129 identity_mappings[0].sw_if_index)
2131 # configure interface address and check identity mappings
2132 self.pg7.config_ip4()
2133 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2135 self.assertEqual(2, len(identity_mappings))
2136 for sm in identity_mappings:
2137 if sm.sw_if_index == 0xFFFFFFFF:
# NOTE(review): the condition tests the loop variable `sm`, but the three
# assertions below read identity_mappings[0].  If the matching entry is
# not first in the dump these check a different element than the one that
# matched -- possibly `sm` was intended; confirm.
2138 self.assertEqual(identity_mappings[0].ip_address,
2139 self.pg7.local_ip4n)
2140 self.assertEqual(port, identity_mappings[0].port)
2141 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2143 self.assertTrue(resolved)
2145 # remove interface address and check identity mappings
2146 self.pg7.unconfig_ip4()
2147 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2148 self.assertEqual(1, len(identity_mappings))
2149 self.assertEqual(self.pg7.sw_if_index,
2150 identity_mappings[0].sw_if_index)
2152 def test_ipfix_nat44_sess(self):
2153 """ IPFIX logging NAT44 session created/delted """
2154 self.ipfix_domain_id = 10
2155 self.ipfix_src_port = 20202
2156 colector_port = 30303
2157 bind_layers(UDP, IPFIX, dport=30303)
2158 self.nat44_add_address(self.nat_addr)
2159 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2160 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2162 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2163 src_address=self.pg3.local_ip4n,
2165 template_interval=10,
2166 collector_port=colector_port)
2167 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2168 src_port=self.ipfix_src_port)
2170 pkts = self.create_stream_in(self.pg0, self.pg1)
2171 self.pg0.add_stream(pkts)
2172 self.pg_enable_capture(self.pg_interfaces)
2174 capture = self.pg1.get_capture(len(pkts))
2175 self.verify_capture_out(capture)
2176 self.nat44_add_address(self.nat_addr, is_add=0)
2177 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2178 capture = self.pg3.get_capture(9)
2179 ipfix = IPFIXDecoder()
2180 # first load template
2182 self.assertTrue(p.haslayer(IPFIX))
2183 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2184 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2185 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2186 self.assertEqual(p[UDP].dport, colector_port)
2187 self.assertEqual(p[IPFIX].observationDomainID,
2188 self.ipfix_domain_id)
2189 if p.haslayer(Template):
2190 ipfix.add_template(p.getlayer(Template))
2191 # verify events in data set
2193 if p.haslayer(Data):
2194 data = ipfix.decode_data_set(p.getlayer(Set))
2195 self.verify_ipfix_nat44_ses(data)
2197 def test_ipfix_addr_exhausted(self):
2198 """ IPFIX logging NAT addresses exhausted """
# Enables the NAT44 feature on pg0 and pg1 and configures the IPFIX exporter
# towards pg3. No nat44_add_address call is visible in this test, and the
# packet sent from pg0 is asserted NOT to show up on pg1. After an
# "ipfix flush" the 9 packets captured on pg3 are checked and decoded.
# NOTE(review): the embedded numbering has gaps (unclosed calls, a packet
# expression without its last layer, per-packet assertions without a visible
# loop header), so statements appear to be missing from this listing --
# confirm against the upstream file.
2199 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2200 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2202 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2203 src_address=self.pg3.local_ip4n,
2205 template_interval=10)
2206 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2207 src_port=self.ipfix_src_port)
2209 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2210 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2212 self.pg0.add_stream(p)
2213 self.pg_enable_capture(self.pg_interfaces)
2215 self.pg1.assert_nothing_captured()
2217 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2218 capture = self.pg3.get_capture(9)
2219 ipfix = IPFIXDecoder()
2220 # first load template
# Each export packet must come from pg3's local address to pg3's remote
# address, use the configured IPFIX source port, target UDP port 4739 and
# carry the configured observation domain id.
2222 self.assertTrue(p.haslayer(IPFIX))
2223 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2224 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2225 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2226 self.assertEqual(p[UDP].dport, 4739)
2227 self.assertEqual(p[IPFIX].observationDomainID,
2228 self.ipfix_domain_id)
2229 if p.haslayer(Template):
2230 ipfix.add_template(p.getlayer(Template))
2231 # verify events in data set
2233 if p.haslayer(Data):
2234 data = ipfix.decode_data_set(p.getlayer(Set))
2235 self.verify_ipfix_addr_exhausted(data)
# Only runs when running_extended_tests() is true.
2237 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2238 def test_ipfix_max_sessions(self):
2239 """ IPFIX logging maximum session entries exceeded """
# Sends one packet per distinct source address to create `max_sessions`
# sessions, then enables IPFIX export towards pg3, sends one more packet from
# pg0 that is asserted not to reach pg1, flushes IPFIX and checks the 9
# packets captured on pg3.
# NOTE(review): `pkts` is used but not initialised or appended to in the
# visible lines, and several calls/expressions are left unclosed; the
# embedded numbering has gaps, so statements appear to be missing from this
# listing -- confirm against the upstream file.
2240 self.nat44_add_address(self.nat_addr)
2241 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2242 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# The session limit used by the test is derived from the running config:
# ten times the reported translation_buckets value.
2245 nat44_config = self.vapi.nat_show_config()
2246 max_sessions = 10 * nat44_config.translation_buckets
2249 for i in range(0, max_sessions):
# Source address 10.10.<bits 8-15 of i>.<bits 0-7 of i>.
2250 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2251 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2252 IP(src=src, dst=self.pg1.remote_ip4) /
2255 self.pg0.add_stream(pkts)
2256 self.pg_enable_capture(self.pg_interfaces)
2259 self.pg1.get_capture(max_sessions)
2260 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2261 src_address=self.pg3.local_ip4n,
2263 template_interval=10)
2264 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2265 src_port=self.ipfix_src_port)
2267 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2268 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2270 self.pg0.add_stream(p)
2271 self.pg_enable_capture(self.pg_interfaces)
2273 self.pg1.assert_nothing_captured()
2275 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2276 capture = self.pg3.get_capture(9)
2277 ipfix = IPFIXDecoder()
2278 # first load template
2280 self.assertTrue(p.haslayer(IPFIX))
2281 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2282 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2283 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2284 self.assertEqual(p[UDP].dport, 4739)
2285 self.assertEqual(p[IPFIX].observationDomainID,
2286 self.ipfix_domain_id)
2287 if p.haslayer(Template):
2288 ipfix.add_template(p.getlayer(Template))
2289 # verify events in data set
2291 if p.haslayer(Data):
2292 data = ipfix.decode_data_set(p.getlayer(Set))
2293 self.verify_ipfix_max_sessions(data, max_sessions)
2295 def test_pool_addr_fib(self):
2296 """ NAT44 add pool addresses to FIB """
2297 static_addr = '10.0.0.10'
2298 self.nat44_add_address(self.nat_addr)
2299 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2300 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2302 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2305 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2306 ARP(op=ARP.who_has, pdst=self.nat_addr,
2307 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2308 self.pg1.add_stream(p)
2309 self.pg_enable_capture(self.pg_interfaces)
2311 capture = self.pg1.get_capture(1)
2312 self.assertTrue(capture[0].haslayer(ARP))
2313 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2316 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2317 ARP(op=ARP.who_has, pdst=static_addr,
2318 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2319 self.pg1.add_stream(p)
2320 self.pg_enable_capture(self.pg_interfaces)
2322 capture = self.pg1.get_capture(1)
2323 self.assertTrue(capture[0].haslayer(ARP))
2324 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2326 # send ARP to non-NAT44 interface
2327 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2328 ARP(op=ARP.who_has, pdst=self.nat_addr,
2329 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2330 self.pg2.add_stream(p)
2331 self.pg_enable_capture(self.pg_interfaces)
2333 self.pg1.assert_nothing_captured()
2335 # remove addresses and verify
2336 self.nat44_add_address(self.nat_addr, is_add=0)
2337 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2340 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2341 ARP(op=ARP.who_has, pdst=self.nat_addr,
2342 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2343 self.pg1.add_stream(p)
2344 self.pg_enable_capture(self.pg_interfaces)
2346 self.pg1.assert_nothing_captured()
2348 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2349 ARP(op=ARP.who_has, pdst=static_addr,
2350 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2351 self.pg1.add_stream(p)
2352 self.pg_enable_capture(self.pg_interfaces)
2354 self.pg1.assert_nothing_captured()
2356 def test_vrf_mode(self):
2357 """ NAT44 tenant VRF aware address pool mode """
# Moves pg0 and pg1 into two separate IP tables, adds one NAT address per
# table (nat_ip1 for vrf_id1, nat_ip2 for vrf_id2) and checks that traffic
# from pg0 leaves pg2 translated to nat_ip1 while traffic from pg1 leaves
# pg2 translated to nat_ip2. The interfaces are moved back to table 0 and
# both tables are deleted at the end.
# NOTE(review): `vrf_id1` and `vrf_id2` are read but not assigned in the
# visible lines, and one feature call is not closed; the embedded numbering
# has gaps, so statements appear to be missing from this listing -- confirm
# against the upstream file.
2361 nat_ip1 = "10.0.0.10"
2362 nat_ip2 = "10.0.0.11"
# Addresses are removed before the table change and configured again after.
2364 self.pg0.unconfig_ip4()
2365 self.pg1.unconfig_ip4()
2366 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2367 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2368 self.pg0.set_table_ip4(vrf_id1)
2369 self.pg1.set_table_ip4(vrf_id2)
2370 self.pg0.config_ip4()
2371 self.pg1.config_ip4()
2372 self.pg0.resolve_arp()
2373 self.pg1.resolve_arp()
2375 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2376 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2377 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2378 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2379 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2384 pkts = self.create_stream_in(self.pg0, self.pg2)
2385 self.pg0.add_stream(pkts)
2386 self.pg_enable_capture(self.pg_interfaces)
2388 capture = self.pg2.get_capture(len(pkts))
2389 self.verify_capture_out(capture, nat_ip1)
2392 pkts = self.create_stream_in(self.pg1, self.pg2)
2393 self.pg1.add_stream(pkts)
2394 self.pg_enable_capture(self.pg_interfaces)
2396 capture = self.pg2.get_capture(len(pkts))
2397 self.verify_capture_out(capture, nat_ip2)
# Restore: put pg0/pg1 back into table 0 and delete the two test tables.
2400 self.pg0.unconfig_ip4()
2401 self.pg1.unconfig_ip4()
2402 self.pg0.set_table_ip4(0)
2403 self.pg1.set_table_ip4(0)
2404 self.pg0.config_ip4()
2405 self.pg1.config_ip4()
2406 self.pg0.resolve_arp()
2407 self.pg1.resolve_arp()
2408 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2409 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2411 def test_vrf_feature_independent(self):
2412 """ NAT44 tenant VRF independent address pool mode """
# Adds nat_ip1 without a vrf_id and nat_ip2 with vrf_id=99, then checks that
# traffic from both pg0 and pg1 towards pg2 is verified against nat_ip1.
# NOTE(review): one feature call below is not closed and nothing between
# enabling capture and reading it is visible; the embedded numbering has
# gaps, so statements appear to be missing from this listing -- confirm
# against the upstream file.
2414 nat_ip1 = "10.0.0.10"
2415 nat_ip2 = "10.0.0.11"
2417 self.nat44_add_address(nat_ip1)
2418 self.nat44_add_address(nat_ip2, vrf_id=99)
2419 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2420 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2421 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2425 pkts = self.create_stream_in(self.pg0, self.pg2)
2426 self.pg0.add_stream(pkts)
2427 self.pg_enable_capture(self.pg_interfaces)
2429 capture = self.pg2.get_capture(len(pkts))
2430 self.verify_capture_out(capture, nat_ip1)
2433 pkts = self.create_stream_in(self.pg1, self.pg2)
2434 self.pg1.add_stream(pkts)
2435 self.pg_enable_capture(self.pg_interfaces)
2437 capture = self.pg2.get_capture(len(pkts))
# Same expected address as for pg0: nat_ip1, not nat_ip2.
2438 self.verify_capture_out(capture, nat_ip1)
2440 def test_dynamic_ipless_interfaces(self):
2441 """ NAT44 interfaces without configured IP address """
# Installs neighbor entries and /32 routes for the remote hosts of pg7 and
# pg8, adds the NAT pool address, enables the NAT44 feature on both
# interfaces, then verifies translation pg7 -> pg8 and the return direction
# pg8 -> pg7 (sent to self.nat_addr).
# NOTE(review): the neighbor calls and one feature call below are not
# closed; the embedded numbering has gaps, so statements appear to be
# missing from this listing -- confirm against the upstream file.
2443 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2444 mactobinary(self.pg7.remote_mac),
2445 self.pg7.remote_ip4n,
2447 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2448 mactobinary(self.pg8.remote_mac),
2449 self.pg8.remote_ip4n,
# Host routes: each remote address is reachable via itself on its interface.
2452 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2453 dst_address_length=32,
2454 next_hop_address=self.pg7.remote_ip4n,
2455 next_hop_sw_if_index=self.pg7.sw_if_index)
2456 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2457 dst_address_length=32,
2458 next_hop_address=self.pg8.remote_ip4n,
2459 next_hop_sw_if_index=self.pg8.sw_if_index)
2461 self.nat44_add_address(self.nat_addr)
2462 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2463 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2467 pkts = self.create_stream_in(self.pg7, self.pg8)
2468 self.pg7.add_stream(pkts)
2469 self.pg_enable_capture(self.pg_interfaces)
2471 capture = self.pg8.get_capture(len(pkts))
2472 self.verify_capture_out(capture)
2475 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2476 self.pg8.add_stream(pkts)
2477 self.pg_enable_capture(self.pg_interfaces)
2479 capture = self.pg7.get_capture(len(pkts))
2480 self.verify_capture_in(capture, self.pg7)
2482 def test_static_ipless_interfaces(self):
2483 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Installs neighbor entries and /32 routes for the remote hosts of pg7 and
# pg8, adds a static mapping from pg7's remote address to self.nat_addr
# (no pool address is added in the visible lines), then verifies the
# pg8 -> pg7 direction first and the pg7 -> pg8 direction second.
# NOTE(review): the neighbor calls and one feature call below are not
# closed; the embedded numbering has gaps, so statements appear to be
# missing from this listing -- confirm against the upstream file.
2485 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2486 mactobinary(self.pg7.remote_mac),
2487 self.pg7.remote_ip4n,
2489 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2490 mactobinary(self.pg8.remote_mac),
2491 self.pg8.remote_ip4n,
2494 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2495 dst_address_length=32,
2496 next_hop_address=self.pg7.remote_ip4n,
2497 next_hop_sw_if_index=self.pg7.sw_if_index)
2498 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2499 dst_address_length=32,
2500 next_hop_address=self.pg8.remote_ip4n,
2501 next_hop_sw_if_index=self.pg8.sw_if_index)
2503 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2504 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2505 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2509 pkts = self.create_stream_out(self.pg8)
2510 self.pg8.add_stream(pkts)
2511 self.pg_enable_capture(self.pg_interfaces)
2513 capture = self.pg7.get_capture(len(pkts))
2514 self.verify_capture_in(capture, self.pg7)
2517 pkts = self.create_stream_in(self.pg7, self.pg8)
2518 self.pg7.add_stream(pkts)
2519 self.pg_enable_capture(self.pg_interfaces)
2521 capture = self.pg8.get_capture(len(pkts))
# NOTE(review): the third positional argument (True) is defined by
# verify_capture_out, which is outside this view -- check its signature.
2522 self.verify_capture_out(capture, self.nat_addr, True)
2524 def test_static_with_port_ipless_interfaces(self):
2525 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Same topology setup as the other "ipless" tests (neighbor entries and /32
# routes for pg7/pg8), but with per-protocol static mappings (TCP, UDP,
# ICMP) from pg7's remote address to self.nat_addr using fixed outside
# ports/ids. Verifies pg8 -> pg7 first, then pg7 -> pg8.
# NOTE(review): the neighbor calls and one feature call below are not
# closed; the embedded numbering has gaps, so statements appear to be
# missing from this listing -- confirm against the upstream file.
# Fixed outside port / ICMP id values used by the static mappings below.
2527 self.tcp_port_out = 30606
2528 self.udp_port_out = 30607
2529 self.icmp_id_out = 30608
2531 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2532 mactobinary(self.pg7.remote_mac),
2533 self.pg7.remote_ip4n,
2535 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2536 mactobinary(self.pg8.remote_mac),
2537 self.pg8.remote_ip4n,
2540 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2541 dst_address_length=32,
2542 next_hop_address=self.pg7.remote_ip4n,
2543 next_hop_sw_if_index=self.pg7.sw_if_index)
2544 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2545 dst_address_length=32,
2546 next_hop_address=self.pg8.remote_ip4n,
2547 next_hop_sw_if_index=self.pg8.sw_if_index)
2549 self.nat44_add_address(self.nat_addr)
2550 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2551 self.tcp_port_in, self.tcp_port_out,
2552 proto=IP_PROTOS.tcp)
2553 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2554 self.udp_port_in, self.udp_port_out,
2555 proto=IP_PROTOS.udp)
2556 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2557 self.icmp_id_in, self.icmp_id_out,
2558 proto=IP_PROTOS.icmp)
2559 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2560 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2564 pkts = self.create_stream_out(self.pg8)
2565 self.pg8.add_stream(pkts)
2566 self.pg_enable_capture(self.pg_interfaces)
2568 capture = self.pg7.get_capture(len(pkts))
2569 self.verify_capture_in(capture, self.pg7)
2572 pkts = self.create_stream_in(self.pg7, self.pg8)
2573 self.pg7.add_stream(pkts)
2574 self.pg_enable_capture(self.pg_interfaces)
2576 capture = self.pg8.get_capture(len(pkts))
2577 self.verify_capture_out(capture)
2579 def test_static_unknown_proto(self):
2580 """ 1:1 NAT translate packet with unknown protocol """
# Adds a static mapping pg0.remote_ip4 <-> nat_ip and sends one packet in
# each direction (pg0 -> pg1, then pg1 -> nat_ip). The received packet is
# expected to carry a GRE layer with translated outer addresses and valid
# checksums.
# NOTE(review): `packet` is read but not assigned in the visible lines, the
# packet expressions lack a layer between the outer and inner IP headers,
# and the logger.error lines have no visible enclosing try/except; the
# embedded numbering has gaps, so statements appear to be missing from this
# listing -- confirm against the upstream file.
2581 nat_ip = "10.0.0.10"
2582 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2583 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2584 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2588 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2589 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2591 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2592 TCP(sport=1234, dport=1234))
2593 self.pg0.add_stream(p)
2594 self.pg_enable_capture(self.pg_interfaces)
2596 p = self.pg1.get_capture(1)
# Outer source must be rewritten to nat_ip; destination is unchanged.
2599 self.assertEqual(packet[IP].src, nat_ip)
2600 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2601 self.assertTrue(packet.haslayer(GRE))
2602 self.assert_packet_checksums_valid(packet)
2604 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2608 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2609 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2611 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2612 TCP(sport=1234, dport=1234))
2613 self.pg1.add_stream(p)
2614 self.pg_enable_capture(self.pg_interfaces)
2616 p = self.pg0.get_capture(1)
# Reverse direction: destination nat_ip must be rewritten to pg0's host.
2619 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2620 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2621 self.assertTrue(packet.haslayer(GRE))
2622 self.assert_packet_checksums_valid(packet)
2624 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2627 def test_hairpinning_static_unknown_proto(self):
2628 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Two hosts behind pg0 each get a 1:1 static mapping. The host sends to the
# server's NAT address and the server replies to the host's NAT address;
# both packets are sent on pg0 and captured on pg0, and both addresses are
# expected to be rewritten.
# NOTE(review): `packet` is read but not assigned in the visible lines, the
# packet expressions lack a layer between the outer and inner IP headers,
# and the logger.error lines have no visible enclosing try/except; the
# embedded numbering has gaps, so statements appear to be missing from this
# listing -- confirm against the upstream file.
2630 host = self.pg0.remote_hosts[0]
2631 server = self.pg0.remote_hosts[1]
2633 host_nat_ip = "10.0.0.10"
2634 server_nat_ip = "10.0.0.11"
2636 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2637 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2638 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2639 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2643 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2644 IP(src=host.ip4, dst=server_nat_ip) /
2646 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2647 TCP(sport=1234, dport=1234))
2648 self.pg0.add_stream(p)
2649 self.pg_enable_capture(self.pg_interfaces)
2651 p = self.pg0.get_capture(1)
# host -> server: source becomes host_nat_ip, destination the real server.
2654 self.assertEqual(packet[IP].src, host_nat_ip)
2655 self.assertEqual(packet[IP].dst, server.ip4)
2656 self.assertTrue(packet.haslayer(GRE))
2657 self.assert_packet_checksums_valid(packet)
2659 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2663 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2664 IP(src=server.ip4, dst=host_nat_ip) /
2666 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2667 TCP(sport=1234, dport=1234))
2668 self.pg0.add_stream(p)
2669 self.pg_enable_capture(self.pg_interfaces)
2671 p = self.pg0.get_capture(1)
# server -> host: source becomes server_nat_ip, destination the real host.
2674 self.assertEqual(packet[IP].src, server_nat_ip)
2675 self.assertEqual(packet[IP].dst, host.ip4)
2676 self.assertTrue(packet.haslayer(GRE))
2677 self.assert_packet_checksums_valid(packet)
2679 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2682 def test_output_feature(self):
2683 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3, then checks three
# flows: pg0 -> pg3 (translated), pg3 -> pg0 (translated back) and
# pg2 -> pg0, which is verified with verify_capture_no_translation.
# NOTE(review): the pg3 feature call below is not closed and nothing between
# enabling capture and reading it is visible; the embedded numbering has
# gaps, so statements appear to be missing from this listing -- confirm
# against the upstream file.
2684 self.nat44_add_address(self.nat_addr)
2685 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2686 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2687 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2691 pkts = self.create_stream_in(self.pg0, self.pg3)
2692 self.pg0.add_stream(pkts)
2693 self.pg_enable_capture(self.pg_interfaces)
2695 capture = self.pg3.get_capture(len(pkts))
2696 self.verify_capture_out(capture)
2699 pkts = self.create_stream_out(self.pg3)
2700 self.pg3.add_stream(pkts)
2701 self.pg_enable_capture(self.pg_interfaces)
2703 capture = self.pg0.get_capture(len(pkts))
2704 self.verify_capture_in(capture, self.pg0)
2706 # from non-NAT interface to NAT inside interface
2707 pkts = self.create_stream_in(self.pg2, self.pg0)
2708 self.pg2.add_stream(pkts)
2709 self.pg_enable_capture(self.pg_interfaces)
2711 capture = self.pg0.get_capture(len(pkts))
2712 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2714 def test_output_feature_vrf_aware(self):
2715 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Adds one NAT address for vrf_id=10 and one for vrf_id=20, enables the
# output feature on pg4, pg6 and pg3, then checks both directions for
# pg4 <-> pg3 (expects nat_ip_vrf10) and pg6 <-> pg3 (expects nat_ip_vrf20).
# NOTE(review): both route calls and the pg3 feature call below are not
# closed, so their trailing arguments are not visible; the embedded
# numbering has gaps, so statements appear to be missing from this listing
# -- confirm against the upstream file.
2716 nat_ip_vrf10 = "10.0.0.10"
2717 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3's remote host; the visible arguments are identical,
# whatever distinguishes them is on the missing lines.
2719 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2720 dst_address_length=32,
2721 next_hop_address=self.pg3.remote_ip4n,
2722 next_hop_sw_if_index=self.pg3.sw_if_index,
2724 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2725 dst_address_length=32,
2726 next_hop_address=self.pg3.remote_ip4n,
2727 next_hop_sw_if_index=self.pg3.sw_if_index,
2730 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2731 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2732 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2733 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2734 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2738 pkts = self.create_stream_in(self.pg4, self.pg3)
2739 self.pg4.add_stream(pkts)
2740 self.pg_enable_capture(self.pg_interfaces)
2742 capture = self.pg3.get_capture(len(pkts))
2743 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2746 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2747 self.pg3.add_stream(pkts)
2748 self.pg_enable_capture(self.pg_interfaces)
2750 capture = self.pg4.get_capture(len(pkts))
2751 self.verify_capture_in(capture, self.pg4)
2754 pkts = self.create_stream_in(self.pg6, self.pg3)
2755 self.pg6.add_stream(pkts)
2756 self.pg_enable_capture(self.pg_interfaces)
2758 capture = self.pg3.get_capture(len(pkts))
2759 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2762 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2763 self.pg3.add_stream(pkts)
2764 self.pg_enable_capture(self.pg_interfaces)
2766 capture = self.pg6.get_capture(len(pkts))
2767 self.verify_capture_in(capture, self.pg6)
2769 def test_output_feature_hairpinning(self):
2770 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# A host and a server both sit behind pg0. The server is published through
# a TCP static mapping on self.nat_addr. The host sends to the NAT address;
# the packet is captured back on pg0 with a translated source port, which is
# then used as the destination port of the server's reply.
# NOTE(review): `host_in_port`, `ip` and `tcp` are read but not assigned in
# the visible lines, and the logger.error lines have no visible enclosing
# try/except; the embedded numbering has gaps, so statements appear to be
# missing from this listing -- confirm against the upstream file.
2771 host = self.pg0.remote_hosts[0]
2772 server = self.pg0.remote_hosts[1]
2775 server_in_port = 5678
2776 server_out_port = 8765
2778 self.nat44_add_address(self.nat_addr)
2779 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2780 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2783 # add static mapping for server
2784 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2785 server_in_port, server_out_port,
2786 proto=IP_PROTOS.tcp)
2788 # send packet from host to server
2789 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2790 IP(src=host.ip4, dst=self.nat_addr) /
2791 TCP(sport=host_in_port, dport=server_out_port))
2792 self.pg0.add_stream(p)
2793 self.pg_enable_capture(self.pg_interfaces)
2795 capture = self.pg0.get_capture(1)
2800 self.assertEqual(ip.src, self.nat_addr)
2801 self.assertEqual(ip.dst, server.ip4)
2802 self.assertNotEqual(tcp.sport, host_in_port)
2803 self.assertEqual(tcp.dport, server_in_port)
2804 self.assert_packet_checksums_valid(p)
# Remember the translated source port; the reply below is addressed to it.
2805 host_out_port = tcp.sport
2807 self.logger.error(ppp("Unexpected or invalid packet:", p))
2810 # send reply from server to host
2811 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2812 IP(src=server.ip4, dst=self.nat_addr) /
2813 TCP(sport=server_in_port, dport=host_out_port))
2814 self.pg0.add_stream(p)
2815 self.pg_enable_capture(self.pg_interfaces)
2817 capture = self.pg0.get_capture(1)
2822 self.assertEqual(ip.src, self.nat_addr)
2823 self.assertEqual(ip.dst, host.ip4)
2824 self.assertEqual(tcp.sport, server_out_port)
2825 self.assertEqual(tcp.dport, host_in_port)
2826 self.assert_packet_checksums_valid(p)
2828 self.logger.error(ppp("Unexpected or invalid packet:", p))
2831 def test_one_armed_nat44(self):
2832 """ One armed NAT44 """
# Uses a single interface (pg9) for both NAT feature calls. A TCP packet
# from local_host to remote_host port 80 is sent and captured on pg9 and
# must come back with source self.nat_addr and a changed source port; the
# reply to that port must be delivered to local_host port 12345.
# NOTE(review): `ip` and `tcp` are read but not assigned in the visible
# lines, the second feature call is not closed, and the logger.error lines
# have no visible enclosing try/except; the embedded numbering has gaps, so
# statements appear to be missing from this listing -- confirm against the
# upstream file.
2833 remote_host = self.pg9.remote_hosts[0]
2834 local_host = self.pg9.remote_hosts[1]
2837 self.nat44_add_address(self.nat_addr)
2838 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2839 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2843 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2844 IP(src=local_host.ip4, dst=remote_host.ip4) /
2845 TCP(sport=12345, dport=80))
2846 self.pg9.add_stream(p)
2847 self.pg_enable_capture(self.pg_interfaces)
2849 capture = self.pg9.get_capture(1)
2854 self.assertEqual(ip.src, self.nat_addr)
2855 self.assertEqual(ip.dst, remote_host.ip4)
2856 self.assertNotEqual(tcp.sport, 12345)
# Translated source port, reused as destination port of the reply below.
2857 external_port = tcp.sport
2858 self.assertEqual(tcp.dport, 80)
2859 self.assert_packet_checksums_valid(p)
2861 self.logger.error(ppp("Unexpected or invalid packet:", p))
2865 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2866 IP(src=remote_host.ip4, dst=self.nat_addr) /
2867 TCP(sport=80, dport=external_port))
2868 self.pg9.add_stream(p)
2869 self.pg_enable_capture(self.pg_interfaces)
2871 capture = self.pg9.get_capture(1)
2876 self.assertEqual(ip.src, remote_host.ip4)
2877 self.assertEqual(ip.dst, local_host.ip4)
2878 self.assertEqual(tcp.sport, 80)
2879 self.assertEqual(tcp.dport, 12345)
2880 self.assert_packet_checksums_valid(p)
2882 self.logger.error(ppp("Unexpected or invalid packet:", p))
2885 def test_del_session(self):
2886 """ Delete NAT44 session """
# Creates sessions by sending the in2out stream pg0 -> pg1, dumps the
# sessions of user pg0.remote_ip4n, deletes two of them (the first by its
# inside address/port, the second by its outside address/port) and checks
# that the dump shrank by exactly 2.
# NOTE(review): the second nat44_del_session call and one feature call are
# not closed, so their trailing arguments are not visible; the embedded
# numbering has gaps -- confirm against the upstream file.
2887 self.nat44_add_address(self.nat_addr)
2888 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2889 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2892 pkts = self.create_stream_in(self.pg0, self.pg1)
2893 self.pg0.add_stream(pkts)
2894 self.pg_enable_capture(self.pg_interfaces)
2896 self.pg1.get_capture(len(pkts))
2898 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2899 nsessions = len(sessions)
2901 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2902 sessions[0].inside_port,
2903 sessions[0].protocol)
2904 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2905 sessions[1].outside_port,
2906 sessions[1].protocol,
2909 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2910 self.assertEqual(nsessions - len(sessions), 2)
2912 def test_set_get_reass(self):
2913 """ NAT44 set/get virtual fragmentation reassembly """
# Reads the current reassembly settings, writes back modified values
# (timeout + 5, max_reass * 2, max_frag * 2), reads them again and checks
# the IPv4 fields match what was written. Finally sets drop_frag=1 and
# checks that ip4_drop_frag reads back truthy.
# NOTE(review): the embedded numbering has small gaps between statements;
# the visible statements are all complete, but confirm against the upstream
# file that nothing is missing.
2914 reas_cfg1 = self.vapi.nat_get_reass()
2916 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2917 max_reass=reas_cfg1.ip4_max_reass * 2,
2918 max_frag=reas_cfg1.ip4_max_frag * 2)
2920 reas_cfg2 = self.vapi.nat_get_reass()
2922 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2923 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2924 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2926 self.vapi.nat_set_reass(drop_frag=1)
2927 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2929 def test_frag_in_order(self):
2930 """ NAT44 translate fragments arriving in order """
# Sends a fragmented TCP payload pg0 -> pg1 from a randomly chosen inside
# port, reassembles what arrives on pg1 and checks port translation and
# payload; then sends fragments back pg1 -> pg0 and checks they are
# translated to the original inside port. Finally the number of entries
# returned by nat_reass_dump() must have grown by 2.
# NOTE(review): the create_stream_frag / reass_frags_and_verify calls below
# are missing arguments and closing parentheses in the visible lines; the
# embedded numbering has gaps, so statements appear to be missing from this
# listing -- confirm against the upstream file.
2931 self.nat44_add_address(self.nat_addr)
2932 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2933 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload built from three distinct runs.
2936 data = "A" * 4 + "B" * 16 + "C" * 3
2937 self.tcp_port_in = random.randint(1025, 65535)
# Baseline count of reassembly entries, compared against at the end.
2939 reass = self.vapi.nat_reass_dump()
2940 reass_n_start = len(reass)
2943 pkts = self.create_stream_frag(self.pg0,
2944 self.pg1.remote_ip4,
2948 self.pg0.add_stream(pkts)
2949 self.pg_enable_capture(self.pg_interfaces)
2951 frags = self.pg1.get_capture(len(pkts))
2952 p = self.reass_frags_and_verify(frags,
2954 self.pg1.remote_ip4)
2955 self.assertEqual(p[TCP].dport, 20)
2956 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated port for the return direction.
2957 self.tcp_port_out = p[TCP].sport
2958 self.assertEqual(data, p[Raw].load)
2961 pkts = self.create_stream_frag(self.pg1,
2966 self.pg1.add_stream(pkts)
2967 self.pg_enable_capture(self.pg_interfaces)
2969 frags = self.pg0.get_capture(len(pkts))
2970 p = self.reass_frags_and_verify(frags,
2971 self.pg1.remote_ip4,
2972 self.pg0.remote_ip4)
2973 self.assertEqual(p[TCP].sport, 20)
2974 self.assertEqual(p[TCP].dport, self.tcp_port_in)
2975 self.assertEqual(data, p[Raw].load)
2977 reass = self.vapi.nat_reass_dump()
2978 reass_n_end = len(reass)
2980 self.assertEqual(reass_n_end - reass_n_start, 2)
2982 def test_reass_hairpinning(self):
2983 """ NAT44 fragments hairpinning """
# Publishes a server behind pg0 through a TCP static mapping with random
# inside/outside ports, sends a fragmented payload on pg0, captures the
# fragments back on pg0, reassembles them and checks that the source port
# changed, the destination port is the server's inside port and the payload
# is intact.
# NOTE(review): the create_stream_frag / reass_frags_and_verify calls below
# show only their first argument, and one feature call is not closed; the
# embedded numbering has gaps, so statements appear to be missing from this
# listing -- confirm against the upstream file.
2984 server = self.pg0.remote_hosts[1]
2985 host_in_port = random.randint(1025, 65535)
2986 server_in_port = random.randint(1025, 65535)
2987 server_out_port = random.randint(1025, 65535)
2988 data = "A" * 4 + "B" * 16 + "C" * 3
2990 self.nat44_add_address(self.nat_addr)
2991 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2992 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2994 # add static mapping for server
2995 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2996 server_in_port, server_out_port,
2997 proto=IP_PROTOS.tcp)
2999 # send packet from host to server
3000 pkts = self.create_stream_frag(self.pg0,
3005 self.pg0.add_stream(pkts)
3006 self.pg_enable_capture(self.pg_interfaces)
3008 frags = self.pg0.get_capture(len(pkts))
3009 p = self.reass_frags_and_verify(frags,
3012 self.assertNotEqual(p[TCP].sport, host_in_port)
3013 self.assertEqual(p[TCP].dport, server_in_port)
3014 self.assertEqual(data, p[Raw].load)
3016 def test_frag_out_of_order(self):
3017 """ NAT44 translate fragments arriving out of order """
3018 self.nat44_add_address(self.nat_addr)
3019 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3020 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3023 data = "A" * 4 + "B" * 16 + "C" * 3
3024 random.randint(1025, 65535)
3027 pkts = self.create_stream_frag(self.pg0,
3028 self.pg1.remote_ip4,
3033 self.pg0.add_stream(pkts)
3034 self.pg_enable_capture(self.pg_interfaces)
3036 frags = self.pg1.get_capture(len(pkts))
3037 p = self.reass_frags_and_verify(frags,
3039 self.pg1.remote_ip4)
3040 self.assertEqual(p[TCP].dport, 20)
3041 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3042 self.tcp_port_out = p[TCP].sport
3043 self.assertEqual(data, p[Raw].load)
3046 pkts = self.create_stream_frag(self.pg1,
3052 self.pg1.add_stream(pkts)
3053 self.pg_enable_capture(self.pg_interfaces)
3055 frags = self.pg0.get_capture(len(pkts))
3056 p = self.reass_frags_and_verify(frags,
3057 self.pg1.remote_ip4,
3058 self.pg0.remote_ip4)
3059 self.assertEqual(p[TCP].sport, 20)
3060 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3061 self.assertEqual(data, p[Raw].load)
3063 def test_port_restricted(self):
3064 """ Port restricted NAT44 (MAP-E CE) """
# Switches the address/port assignment algorithm through the CLI, sends one
# TCP packet pg0 -> pg1 and checks the translated source port on pg1.
# NOTE(review): `ip` and `tcp` are read but not assigned in the visible
# lines, one feature call is not closed, and the logger.error line has no
# visible enclosing try/except; the embedded numbering has gaps, so
# statements appear to be missing from this listing -- confirm against the
# upstream file.
3065 self.nat44_add_address(self.nat_addr)
3066 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3067 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3069 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3070 "psid-offset 6 psid-len 6")
3072 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3073 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3074 TCP(sport=4567, dport=22))
3075 self.pg0.add_stream(p)
3076 self.pg_enable_capture(self.pg_interfaces)
3078 capture = self.pg1.get_capture(1)
3083 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3084 self.assertEqual(ip.src, self.nat_addr)
3085 self.assertEqual(tcp.dport, 22)
3086 self.assertNotEqual(tcp.sport, 4567)
# Shift the source port right by 6 and keep the low 6 bits; the result must
# be 10. Presumably this mirrors the psid / psid-len values in the CLI
# command above -- confirm against the NAT plugin documentation.
3087 self.assertEqual((tcp.sport >> 6) & 63, 10)
3088 self.assert_packet_checksums_valid(p)
3090 self.logger.error(ppp("Unexpected or invalid packet:", p))
3093 def test_ipfix_max_frags(self):
3094 """ IPFIX logging maximum fragments pending reassembly exceeded """
# Sets max_frag to 0 via nat_set_reass, configures the IPFIX exporter
# towards pg3, then sends only the LAST fragment of a fragmented stream from
# pg0. Nothing may appear on pg1. After an "ipfix flush" the 9 packets
# captured on pg3 are checked and decoded.
# NOTE(review): several calls below are not closed and the per-packet
# assertions have no visible loop header; the embedded numbering has gaps,
# so statements appear to be missing from this listing -- confirm against
# the upstream file.
3095 self.nat44_add_address(self.nat_addr)
3096 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3097 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3099 self.vapi.nat_set_reass(max_frag=0)
3100 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3101 src_address=self.pg3.local_ip4n,
3103 template_interval=10)
3104 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3105 src_port=self.ipfix_src_port)
3107 data = "A" * 4 + "B" * 16 + "C" * 3
3108 self.tcp_port_in = random.randint(1025, 65535)
3109 pkts = self.create_stream_frag(self.pg0,
3110 self.pg1.remote_ip4,
# Only the final element of the fragment list is injected.
3114 self.pg0.add_stream(pkts[-1])
3115 self.pg_enable_capture(self.pg_interfaces)
3117 self.pg1.assert_nothing_captured()
3119 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3120 capture = self.pg3.get_capture(9)
3121 ipfix = IPFIXDecoder()
3122 # first load template
3124 self.assertTrue(p.haslayer(IPFIX))
3125 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3126 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3127 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3128 self.assertEqual(p[UDP].dport, 4739)
3129 self.assertEqual(p[IPFIX].observationDomainID,
3130 self.ipfix_domain_id)
3131 if p.haslayer(Template):
3132 ipfix.add_template(p.getlayer(Template))
3133 # verify events in data set
3135 if p.haslayer(Data):
# Note: `data` is rebound here from the payload string to the decoded set.
3136 data = ipfix.decode_data_set(p.getlayer(Set))
3137 self.verify_ipfix_max_fragments_ip4(data, 0,
3138 self.pg0.remote_ip4n)
3140 def test_multiple_outside_vrf(self):
3141 """ Multiple outside VRF """
# Moves pg1 and pg2 into two separate IP tables, enables NAT44 on pg0, pg1
# and pg2, and verifies translation in both directions for pg0 <-> pg1 and,
# after changing the inside ports/ICMP id, for pg0 <-> pg2. pg1 and pg2 are
# moved back to table 0 at the end.
# NOTE(review): `vrf_id1` and `vrf_id2` are read but not assigned in the
# visible lines, and the pg1/pg2 feature calls are not closed; the embedded
# numbering has gaps, so statements appear to be missing from this listing
# -- confirm against the upstream file.
3145 self.pg1.unconfig_ip4()
3146 self.pg2.unconfig_ip4()
3147 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
3148 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
3149 self.pg1.set_table_ip4(vrf_id1)
3150 self.pg2.set_table_ip4(vrf_id2)
3151 self.pg1.config_ip4()
3152 self.pg2.config_ip4()
3153 self.pg1.resolve_arp()
3154 self.pg2.resolve_arp()
3156 self.nat44_add_address(self.nat_addr)
3157 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3158 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3160 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
3165 pkts = self.create_stream_in(self.pg0, self.pg1)
3166 self.pg0.add_stream(pkts)
3167 self.pg_enable_capture(self.pg_interfaces)
3169 capture = self.pg1.get_capture(len(pkts))
3170 self.verify_capture_out(capture, self.nat_addr)
3172 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3173 self.pg1.add_stream(pkts)
3174 self.pg_enable_capture(self.pg_interfaces)
3176 capture = self.pg0.get_capture(len(pkts))
3177 self.verify_capture_in(capture, self.pg0)
# New inside ports / ICMP id for the second half of the test (pg0 <-> pg2).
3179 self.tcp_port_in = 60303
3180 self.udp_port_in = 60304
3181 self.icmp_id_in = 60305
3184 pkts = self.create_stream_in(self.pg0, self.pg2)
3185 self.pg0.add_stream(pkts)
3186 self.pg_enable_capture(self.pg_interfaces)
3188 capture = self.pg2.get_capture(len(pkts))
3189 self.verify_capture_out(capture, self.nat_addr)
3191 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3192 self.pg2.add_stream(pkts)
3193 self.pg_enable_capture(self.pg_interfaces)
3195 capture = self.pg0.get_capture(len(pkts))
3196 self.verify_capture_in(capture, self.pg0)
# Restore: put pg1/pg2 back into table 0. No ip_table_add_del(..., is_add=0)
# call is visible in this test.
3199 self.pg1.unconfig_ip4()
3200 self.pg2.unconfig_ip4()
3201 self.pg1.set_table_ip4(0)
3202 self.pg2.set_table_ip4(0)
3203 self.pg1.config_ip4()
3204 self.pg2.config_ip4()
3205 self.pg1.resolve_arp()
3206 self.pg2.resolve_arp()
# Per-test cleanup of TestNAT44; the `def` line itself is not visible in
# this listing (presumably `def tearDown(self):` -- TODO confirm).
# Calls the parent tearDown, then checks `self.vpp_dead` and logs the output
# of several NAT "show" CLI commands, resets the address/port assignment
# algorithm to default and clears logging.
# NOTE(review): listing numbers 3219 and 3221 are absent, so further
# statements may exist between the lines below.
3209 super(TestNAT44, self).tearDown()
3210 if not self.vpp_dead:
3211 self.logger.info(self.vapi.cli("show nat44 addresses"))
3212 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3213 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3214 self.logger.info(self.vapi.cli("show nat44 interface address"))
3215 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3216 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3217 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3218 self.vapi.cli("nat addr-port-assignment-alg default")
3220 self.vapi.cli("clear logging")
3223 class TestNAT44EndpointDependent(MethodHolder):
3224 """ Endpoint-Dependent mapping and filtering test cases """
# Extends the inherited VPP command line with `nat { endpoint-dependent }`.
# Takes `cls`; the decorator line is not visible in this listing.
3227 def setUpConstants(cls):
3228 super(TestNAT44EndpointDependent, cls).setUpConstants()
3229 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
# Class-level fixture: sets default port/id values and the NAT address,
# creates seven pg interfaces and prepares pg0, pg4, pg5 and pg6 plus a set
# of routes used by the tests of this class.
# NOTE(review): many lines are absent from this listing (loop bodies, call
# arguments, try/except framing), so the notes below describe only what is
# visible.
3232 def setUpClass(cls):
3233 super(TestNAT44EndpointDependent, cls).setUpClass()
3234 cls.vapi.cli("set log class nat level debug")
# Default inside/outside ports and ICMP ids used by the stream helpers.
3236 cls.tcp_port_in = 6303
3237 cls.tcp_port_out = 6303
3238 cls.udp_port_in = 6304
3239 cls.udp_port_out = 6304
3240 cls.icmp_id_in = 6305
3241 cls.icmp_id_out = 6305
3242 cls.nat_addr = '10.0.0.3'
# Packed binary form of nat_addr.
3243 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3244 cls.ipfix_src_port = 4739
3245 cls.ipfix_domain_id = 1
3246 cls.tcp_external_port = 80
# pg0..pg6 are created; the first three are kept in cls.interfaces.
3248 cls.create_pg_interfaces(range(7))
3249 cls.interfaces = list(cls.pg_interfaces[0:3])
3251 for i in cls.interfaces:
3256 cls.pg0.generate_remote_hosts(3)
3257 cls.pg0.configure_ipv4_neighbors()
# pg4: two remote hosts, an additional address derived from 10.0.0.1
# (remaining call arguments not visible), and the second remote host is
# given the same IPv4 address as the first before ARP is resolved again.
3261 cls.pg4.generate_remote_hosts(2)
3262 cls.pg4.config_ip4()
3263 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3264 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3268 cls.pg4.resolve_arp()
3269 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3270 cls.pg4.resolve_arp()
3272 zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
3273 cls.vapi.ip_table_add_del(1, is_add=1)
# pg5: addresses overridden to 10.1.1.1 (local) / 10.1.1.2 (remote) through
# the private attributes, then bound to IP table 1.
3275 cls.pg5._local_ip4 = "10.1.1.1"
3276 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
3278 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
3279 cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
3280 socket.AF_INET, cls.pg5.remote_ip4)
3281 cls.pg5.set_table_ip4(1)
3282 cls.pg5.config_ip4()
3284 cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
3285 dst_address_length=32,
3287 next_hop_sw_if_index=cls.pg5.sw_if_index,
3288 next_hop_address=zero_ip4n)
# pg6: same treatment with 10.1.2.1 (local) / 10.1.2.2 (remote), table 1.
3290 cls.pg6._local_ip4 = "10.1.2.1"
3291 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
3293 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
3294 cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
3295 socket.AF_INET, cls.pg6.remote_ip4)
3296 cls.pg6.set_table_ip4(1)
3297 cls.pg6.config_ip4()
3299 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3300 dst_address_length=32,
3302 next_hop_sw_if_index=cls.pg6.sw_if_index,
3303 next_hop_address=zero_ip4n)
# Three further routes: a /16 around pg6's remote address resolved through
# table 1, a default route resolved through table 0, and a default route
# via pg1. Which table each route is installed in is set by arguments that
# are not visible here.
3305 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3306 dst_address_length=16,
3307 next_hop_address=zero_ip4n,
3309 next_hop_table_id=1)
3310 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3311 dst_address_length=0,
3312 next_hop_address=zero_ip4n,
3314 next_hop_table_id=0)
3315 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3316 dst_address_length=0,
3318 next_hop_sw_if_index=cls.pg1.sw_if_index,
3319 next_hop_address=cls.pg1.local_ip4n)
3321 cls.pg5.resolve_arp()
3322 cls.pg6.resolve_arp()
# Presumably the error path of a surrounding try/except (framing lines are
# not visible): class teardown is run if setup fails -- TODO confirm.
3325 super(TestNAT44EndpointDependent, cls).tearDownClass()
# Basic dynamic translation check: NAT address added, feature enabled on
# pg0 and pg1, then a stream pg0 -> pg1 is verified with
# verify_capture_out and a stream injected on pg1 is verified on pg0 with
# verify_capture_in.
# NOTE(review): some lines are absent from this listing (e.g. the tail of
# the second feature call and whatever triggers packet transmission).
3328 def test_dynamic(self):
3329 """ NAT44 dynamic translation test """
3331 self.nat44_add_address(self.nat_addr)
3332 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3333 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Direction pg0 -> pg1.
3337 pkts = self.create_stream_in(self.pg0, self.pg1)
3338 self.pg0.add_stream(pkts)
3339 self.pg_enable_capture(self.pg_interfaces)
3341 capture = self.pg1.get_capture(len(pkts))
3342 self.verify_capture_out(capture)
# Direction pg1 -> pg0.
3345 pkts = self.create_stream_out(self.pg1)
3346 self.pg1.add_stream(pkts)
3347 self.pg_enable_capture(self.pg_interfaces)
3349 capture = self.pg0.get_capture(len(pkts))
3350 self.verify_capture_in(capture, self.pg0)
# Forwarding-enabled scenario. A static mapping pg0.remote_ip4n ->
# nat_addr_n is installed with forwarding turned on. Traffic is checked in
# both directions for the mapped host, then for a host without a mapping
# (pg0.remote_hosts[0] is temporarily replaced by remote_hosts[1]). Finally
# the sessions of remote_hosts[1] are dumped (3 expected), one is deleted
# and 2 are expected to remain.
# NOTE(review): lines are absent from this listing; the last statements
# (forwarding disabled, static-mapping call with a cut-off argument list)
# look like cleanup, presumably in a `finally:` clause -- TODO confirm.
3352 def test_forwarding(self):
3353 """ NAT44 forwarding test """
3355 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3356 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3358 self.vapi.nat44_forwarding_enable_disable(1)
3360 real_ip = self.pg0.remote_ip4n
3361 alias_ip = self.nat_addr_n
3362 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3363 external_ip=alias_ip)
# NOTE(review): the label below says in2out, yet the first stream that
# follows is built with create_stream_out and injected on pg1.
3366 # in2out - static mapping match
3368 pkts = self.create_stream_out(self.pg1)
3369 self.pg1.add_stream(pkts)
3370 self.pg_enable_capture(self.pg_interfaces)
3372 capture = self.pg0.get_capture(len(pkts))
3373 self.verify_capture_in(capture, self.pg0)
3375 pkts = self.create_stream_in(self.pg0, self.pg1)
3376 self.pg0.add_stream(pkts)
3377 self.pg_enable_capture(self.pg_interfaces)
3379 capture = self.pg1.get_capture(len(pkts))
3380 self.verify_capture_out(capture, same_port=True)
3382 # in2out - no static mapping match
# Swap in the second remote host as pg0's first host so that pg0.remote_ip4
# refers to an address without a static mapping; restored further below.
3384 host0 = self.pg0.remote_hosts[0]
3385 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
3387 pkts = self.create_stream_out(self.pg1,
3388 dst_ip=self.pg0.remote_ip4,
3389 use_inside_ports=True)
3390 self.pg1.add_stream(pkts)
3391 self.pg_enable_capture(self.pg_interfaces)
3393 capture = self.pg0.get_capture(len(pkts))
3394 self.verify_capture_in(capture, self.pg0)
3396 pkts = self.create_stream_in(self.pg0, self.pg1)
3397 self.pg0.add_stream(pkts)
3398 self.pg_enable_capture(self.pg_interfaces)
3400 capture = self.pg1.get_capture(len(pkts))
3401 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3404 self.pg0.remote_hosts[0] = host0
# Session bookkeeping for the unmapped host: 3 sessions, delete the first
# one by its full key (including external host), 2 remain.
3406 user = self.pg0.remote_hosts[1]
3407 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3408 self.assertEqual(len(sessions), 3)
3409 self.assertTrue(sessions[0].ext_host_valid)
3410 self.vapi.nat44_del_session(
3411 sessions[0].inside_ip_address,
3412 sessions[0].inside_port,
3413 sessions[0].protocol,
3414 ext_host_address=sessions[0].ext_host_address,
3415 ext_host_port=sessions[0].ext_host_port)
3416 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3417 self.assertEqual(len(sessions), 2)
3420 self.vapi.nat44_forwarding_enable_disable(0)
3421 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3422 external_ip=alias_ip,
# Load-balanced static mapping: the NAT address is mapped to two local
# servers (pg0.remote_hosts[0] and [1]). A TCP packet from pg1 to the NAT
# address must arrive at one of the servers on local_port; the reply from
# that server must leave pg1 with source nat_addr / external_port. Finally
# the single session of the chosen server is deleted and none may remain.
# NOTE(review): external_port, local_port, the dict contents of `locals`,
# and the assignments of ip/tcp/server are on lines absent from this
# listing. `locals` also shadows the Python builtin of the same name.
3425 def test_static_lb(self):
3426 """ NAT44 local service load balancing """
3427 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3430 server1 = self.pg0.remote_hosts[0]
3431 server2 = self.pg0.remote_hosts[1]
3433 locals = [{'addr': server1.ip4n,
3437 {'addr': server2.ip4n,
3442 self.nat44_add_address(self.nat_addr)
3443 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3446 local_num=len(locals),
3448 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3449 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3452 # from client to service
3453 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3454 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3455 TCP(sport=12345, dport=external_port))
3456 self.pg1.add_stream(p)
3457 self.pg_enable_capture(self.pg_interfaces)
3459 capture = self.pg0.get_capture(1)
# Either server is an acceptable destination.
3465 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3466 if ip.dst == server1.ip4:
3470 self.assertEqual(tcp.dport, local_port)
3471 self.assert_packet_checksums_valid(p)
3473 self.logger.error(ppp("Unexpected or invalid packet:", p))
3476 # from service back to client
3477 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3478 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3479 TCP(sport=local_port, dport=12345))
3480 self.pg0.add_stream(p)
3481 self.pg_enable_capture(self.pg_interfaces)
3483 capture = self.pg1.get_capture(1)
3488 self.assertEqual(ip.src, self.nat_addr)
3489 self.assertEqual(tcp.sport, external_port)
3490 self.assert_packet_checksums_valid(p)
3492 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Exactly one session for the selected server; delete it and re-check.
3495 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3496 self.assertEqual(len(sessions), 1)
3497 self.assertTrue(sessions[0].ext_host_valid)
3498 self.vapi.nat44_del_session(
3499 sessions[0].inside_ip_address,
3500 sessions[0].inside_port,
3501 sessions[0].protocol,
3502 ext_host_address=sessions[0].ext_host_address,
3503 ext_host_port=sessions[0].ext_host_port)
3504 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3505 self.assertEqual(len(sessions), 0)
# Extended-tests-only variant of the load-balancing test: one TCP packet
# per client address from ip4_range(pg1.remote_ip4, 10, 50) is sent to the
# NAT address, the packets captured on pg0 are tallied per destination
# server, and server1 must have received more than server2.
# NOTE(review): the per-server settings in `locals`, external_port, the
# construction of `pkts` and the counters are on lines absent from this
# listing. `locals` shadows the Python builtin of the same name.
3507 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3508 def test_static_lb_multi_clients(self):
3509 """ NAT44 local service load balancing - multiple clients"""
3511 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3514 server1 = self.pg0.remote_hosts[0]
3515 server2 = self.pg0.remote_hosts[1]
3517 locals = [{'addr': server1.ip4n,
3521 {'addr': server2.ip4n,
3526 self.nat44_add_address(self.nat_addr)
3527 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3530 local_num=len(locals),
3532 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3533 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3538 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3540 for client in clients:
3541 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3542 IP(src=client, dst=self.nat_addr) /
3543 TCP(sport=12345, dport=external_port))
3545 self.pg1.add_stream(pkts)
3546 self.pg_enable_capture(self.pg_interfaces)
3548 capture = self.pg0.get_capture(len(pkts))
3550 if p[IP].dst == server1.ip4:
3554 self.assertTrue(server1_n > server2_n)
# Load-balanced static mapping combined with forwarding enabled and no NAT
# pool address added in the visible lines. Four exchanges are verified:
#   1. client (pg1) -> NAT address: delivered to one of the two servers on
#      local_port;
#   2. that server -> client: leaves pg1 as nat_addr / external_port;
#   3. client -> server1 directly (sport 12346): destination unchanged;
#   4. server1 -> client (dport 12346): source unchanged.
# NOTE(review): external_port, local_port, the dict contents of `locals`
# and the ip/tcp/server assignments are on lines absent from this listing.
# `locals` shadows the Python builtin of the same name.
3556 def test_static_lb_2(self):
3557 """ NAT44 local service load balancing (asymmetrical rule) """
3558 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3561 server1 = self.pg0.remote_hosts[0]
3562 server2 = self.pg0.remote_hosts[1]
3564 locals = [{'addr': server1.ip4n,
3568 {'addr': server2.ip4n,
3573 self.vapi.nat44_forwarding_enable_disable(1)
3574 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3578 local_num=len(locals),
3580 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3581 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3584 # from client to service
3585 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3586 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3587 TCP(sport=12345, dport=external_port))
3588 self.pg1.add_stream(p)
3589 self.pg_enable_capture(self.pg_interfaces)
3591 capture = self.pg0.get_capture(1)
3597 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3598 if ip.dst == server1.ip4:
3602 self.assertEqual(tcp.dport, local_port)
3603 self.assert_packet_checksums_valid(p)
3605 self.logger.error(ppp("Unexpected or invalid packet:", p))
3608 # from service back to client
3609 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3610 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3611 TCP(sport=local_port, dport=12345))
3612 self.pg0.add_stream(p)
3613 self.pg_enable_capture(self.pg_interfaces)
3615 capture = self.pg1.get_capture(1)
3620 self.assertEqual(ip.src, self.nat_addr)
3621 self.assertEqual(tcp.sport, external_port)
3622 self.assert_packet_checksums_valid(p)
3624 self.logger.error(ppp("Unexpected or invalid packet:", p))
3627 # from client to server (no translation)
3628 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3629 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3630 TCP(sport=12346, dport=local_port))
3631 self.pg1.add_stream(p)
3632 self.pg_enable_capture(self.pg_interfaces)
3634 capture = self.pg0.get_capture(1)
3640 self.assertEqual(ip.dst, server1.ip4)
3641 self.assertEqual(tcp.dport, local_port)
3642 self.assert_packet_checksums_valid(p)
3644 self.logger.error(ppp("Unexpected or invalid packet:", p))
3647 # from service back to client (no translation)
3648 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3649 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3650 TCP(sport=local_port, dport=12346))
3651 self.pg0.add_stream(p)
3652 self.pg_enable_capture(self.pg_interfaces)
3654 capture = self.pg1.get_capture(1)
3659 self.assertEqual(ip.src, server1.ip4)
3660 self.assertEqual(tcp.sport, local_port)
3661 self.assert_packet_checksums_valid(p)
3663 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Translation of a packet carrying a protocol NAT does not track by port.
# A TCP packet pg0 -> pg1 is sent first, then an encapsulated packet (outer
# IP, inner IP + TCP) is sent pg0 -> pg1 and must come out with source
# nat_addr and a GRE layer; the reverse packet addressed to nat_addr must
# reach pg0 with destination pg0.remote_ip4, again with a GRE layer.
# NOTE(review): the layer placed between the two IP headers is on lines
# absent from this listing (the assertions suggest GRE), as is the
# assignment of `packet`.
3666 def test_unknown_proto(self):
3667 """ NAT44 translate packet with unknown protocol """
3668 self.nat44_add_address(self.nat_addr)
3669 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3670 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Plain TCP packet pg0 -> pg1 sent ahead of the encapsulated one.
3674 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3675 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3676 TCP(sport=self.tcp_port_in, dport=20))
3677 self.pg0.add_stream(p)
3678 self.pg_enable_capture(self.pg_interfaces)
3680 p = self.pg1.get_capture(1)
# Encapsulated packet, inside -> outside.
3682 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3683 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3685 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3686 TCP(sport=1234, dport=1234))
3687 self.pg0.add_stream(p)
3688 self.pg_enable_capture(self.pg_interfaces)
3690 p = self.pg1.get_capture(1)
3693 self.assertEqual(packet[IP].src, self.nat_addr)
3694 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3695 self.assertTrue(packet.haslayer(GRE))
3696 self.assert_packet_checksums_valid(packet)
3698 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Encapsulated packet, outside -> inside (addressed to the NAT address).
3702 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3703 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3705 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3706 TCP(sport=1234, dport=1234))
3707 self.pg1.add_stream(p)
3708 self.pg_enable_capture(self.pg_interfaces)
3710 p = self.pg0.get_capture(1)
3713 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3714 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3715 self.assertTrue(packet.haslayer(GRE))
3716 self.assert_packet_checksums_valid(packet)
3718 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the unknown-protocol test: host and server are
# both behind pg0 (remote_hosts[0] and [1]); the server has a static
# mapping to 10.0.0.11. After a TCP packet host -> server_nat_ip, an
# encapsulated packet host -> server_nat_ip must reappear on pg0 with
# source nat_addr and destination server.ip4, and the server's
# encapsulated reply to nat_addr must reappear with source server_nat_ip
# and destination host.ip4. Both must contain a GRE layer.
# NOTE(review): host_in_port, the layer between the two IP headers and the
# assignment of `packet` are on lines absent from this listing.
3721 def test_hairpinning_unknown_proto(self):
3722 """ NAT44 translate packet with unknown protocol - hairpinning """
3723 host = self.pg0.remote_hosts[0]
3724 server = self.pg0.remote_hosts[1]
3726 server_out_port = 8765
3727 server_nat_ip = "10.0.0.11"
3729 self.nat44_add_address(self.nat_addr)
3730 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3731 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3734 # add static mapping for server
3735 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Plain TCP packet host -> server's NAT address, expected back on pg0.
3738 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3739 IP(src=host.ip4, dst=server_nat_ip) /
3740 TCP(sport=host_in_port, dport=server_out_port))
3741 self.pg0.add_stream(p)
3742 self.pg_enable_capture(self.pg_interfaces)
3744 self.pg0.get_capture(1)
# Encapsulated packet host -> server.
3746 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3747 IP(src=host.ip4, dst=server_nat_ip) /
3749 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3750 TCP(sport=1234, dport=1234))
3751 self.pg0.add_stream(p)
3752 self.pg_enable_capture(self.pg_interfaces)
3754 p = self.pg0.get_capture(1)
3757 self.assertEqual(packet[IP].src, self.nat_addr)
3758 self.assertEqual(packet[IP].dst, server.ip4)
3759 self.assertTrue(packet.haslayer(GRE))
3760 self.assert_packet_checksums_valid(packet)
3762 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Encapsulated reply server -> host (addressed to the NAT address).
3766 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3767 IP(src=server.ip4, dst=self.nat_addr) /
3769 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3770 TCP(sport=1234, dport=1234))
3771 self.pg0.add_stream(p)
3772 self.pg_enable_capture(self.pg_interfaces)
3774 p = self.pg0.get_capture(1)
3777 self.assertEqual(packet[IP].src, server_nat_ip)
3778 self.assertEqual(packet[IP].dst, host.ip4)
3779 self.assertTrue(packet.haslayer(GRE))
3780 self.assert_packet_checksums_valid(packet)
3782 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Output-feature NAT on pg1 combined with a service behind pg0. With
# forwarding enabled, an identity mapping for pg1.remote_ip4n and an
# out2in-only TCP static mapping pg0.remote_ip4 <-> 1.2.3.4 are configured.
# Verified: client -> service (destination rewritten to pg0.remote_ip4 /
# local_port), service -> client (source rewritten to external_addr /
# external_port), then ordinary streams pg0 -> pg1 (twice) and pg1 -> pg0.
# NOTE(review): external_port, local_port, the ip/tcp assignments and the
# trailing arguments of some feature calls are on lines absent from this
# listing. Both visible nat44_interface_add_del_feature calls name pg0.
3785 def test_output_feature_and_service(self):
3786 """ NAT44 interface output feature and services """
3787 external_addr = '1.2.3.4'
3791 self.vapi.nat44_forwarding_enable_disable(1)
3792 self.nat44_add_address(self.nat_addr)
3793 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3794 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3795 local_port, external_port,
3796 proto=IP_PROTOS.tcp, out2in_only=1)
3797 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3798 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3800 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3803 # from client to service
3804 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3805 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3806 TCP(sport=12345, dport=external_port))
3807 self.pg1.add_stream(p)
3808 self.pg_enable_capture(self.pg_interfaces)
3810 capture = self.pg0.get_capture(1)
3815 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3816 self.assertEqual(tcp.dport, local_port)
3817 self.assert_packet_checksums_valid(p)
3819 self.logger.error(ppp("Unexpected or invalid packet:", p))
3822 # from service back to client
3823 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3824 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3825 TCP(sport=local_port, dport=12345))
3826 self.pg0.add_stream(p)
3827 self.pg_enable_capture(self.pg_interfaces)
3829 capture = self.pg1.get_capture(1)
3834 self.assertEqual(ip.src, external_addr)
3835 self.assertEqual(tcp.sport, external_port)
3836 self.assert_packet_checksums_valid(p)
3838 self.logger.error(ppp("Unexpected or invalid packet:", p))
3841 # from local network host to external network
3842 pkts = self.create_stream_in(self.pg0, self.pg1)
3843 self.pg0.add_stream(pkts)
3844 self.pg_enable_capture(self.pg_interfaces)
3846 capture = self.pg1.get_capture(len(pkts))
3847 self.verify_capture_out(capture)
# The same inside -> outside stream is sent and verified a second time.
3848 pkts = self.create_stream_in(self.pg0, self.pg1)
3849 self.pg0.add_stream(pkts)
3850 self.pg_enable_capture(self.pg_interfaces)
3852 capture = self.pg1.get_capture(len(pkts))
3853 self.verify_capture_out(capture)
3855 # from external network back to local network host
3856 pkts = self.create_stream_out(self.pg1)
3857 self.pg1.add_stream(pkts)
3858 self.pg_enable_capture(self.pg_interfaces)
3860 capture = self.pg0.get_capture(len(pkts))
3861 self.verify_capture_in(capture, self.pg0)
# Output-feature NAT on pg1 with forwarding enabled, covering two cases:
# sessions started from the pg0 host are translated (verified both ways),
# while sessions started from the pg1 side directly towards pg0.remote_ip4
# pass through and the replies are expected with nat_ip equal to
# pg0.remote_ip4, i.e. untranslated.
# NOTE(review): trailing call arguments on the output-feature call and the
# final verify_capture_out call are on lines absent from this listing.
3863 def test_output_feature_and_service2(self):
3864 """ NAT44 interface output feature and service host direct access """
3865 self.vapi.nat44_forwarding_enable_disable(1)
3866 self.nat44_add_address(self.nat_addr)
3867 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3870 # session initiaded from service host - translate
3871 pkts = self.create_stream_in(self.pg0, self.pg1)
3872 self.pg0.add_stream(pkts)
3873 self.pg_enable_capture(self.pg_interfaces)
3875 capture = self.pg1.get_capture(len(pkts))
3876 self.verify_capture_out(capture)
3878 pkts = self.create_stream_out(self.pg1)
3879 self.pg1.add_stream(pkts)
3880 self.pg_enable_capture(self.pg_interfaces)
3882 capture = self.pg0.get_capture(len(pkts))
3883 self.verify_capture_in(capture, self.pg0)
3885 # session initiaded from remote host - do not translate
# Different inside port/ICMP id values are used for this second exchange.
3886 self.tcp_port_in = 60303
3887 self.udp_port_in = 60304
3888 self.icmp_id_in = 60305
3889 pkts = self.create_stream_out(self.pg1,
3890 self.pg0.remote_ip4,
3891 use_inside_ports=True)
3892 self.pg1.add_stream(pkts)
3893 self.pg_enable_capture(self.pg_interfaces)
3895 capture = self.pg0.get_capture(len(pkts))
3896 self.verify_capture_in(capture, self.pg0)
3898 pkts = self.create_stream_in(self.pg0, self.pg1)
3899 self.pg0.add_stream(pkts)
3900 self.pg_enable_capture(self.pg_interfaces)
3902 capture = self.pg1.get_capture(len(pkts))
3903 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Output-feature NAT on pg1 together with destination NAT: an out2in-only
# TCP static mapping pg1.remote_ip4 <-> 1.2.3.4 is configured. A packet
# from the pg0 host to external_addr must leave pg1 with the source
# untouched (pg0.remote_ip4 / 12345) and the destination rewritten to
# pg1.remote_ip4 / local_port; the reply must arrive on pg0 with source
# external_addr / external_port and destination pg0.remote_ip4 / 12345.
# NOTE(review): external_port, local_port, the ip/tcp assignments and the
# trailing arguments of some feature calls are on lines absent from this
# listing. Both visible nat44_interface_add_del_feature calls name pg0.
3906 def test_output_feature_and_service3(self):
3907 """ NAT44 interface output feature and DST NAT """
3908 external_addr = '1.2.3.4'
3912 self.vapi.nat44_forwarding_enable_disable(1)
3913 self.nat44_add_address(self.nat_addr)
3914 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3915 local_port, external_port,
3916 proto=IP_PROTOS.tcp, out2in_only=1)
3917 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3918 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3920 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# Inside host -> external address of the mapped service.
3923 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3924 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3925 TCP(sport=12345, dport=external_port))
3926 self.pg0.add_stream(p)
3927 self.pg_enable_capture(self.pg_interfaces)
3929 capture = self.pg1.get_capture(1)
3934 self.assertEqual(ip.src, self.pg0.remote_ip4)
3935 self.assertEqual(tcp.sport, 12345)
3936 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3937 self.assertEqual(tcp.dport, local_port)
3938 self.assert_packet_checksums_valid(p)
3940 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply from the pg1 side back to the inside host.
3943 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3944 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3945 TCP(sport=local_port, dport=12345))
3946 self.pg1.add_stream(p)
3947 self.pg_enable_capture(self.pg_interfaces)
3949 capture = self.pg0.get_capture(1)
3954 self.assertEqual(ip.src, external_addr)
3955 self.assertEqual(tcp.sport, external_port)
3956 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3957 self.assertEqual(tcp.dport, 12345)
3958 self.assert_packet_checksums_valid(p)
3960 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Shared driver for the twice-NAT test cases.
# Parameters visible in the signature: self_twice_nat, same_pg and lb (all
# default False); later lines also use `client_id`, so the signature
# presumably continues with that parameter on a line absent from this
# listing -- TODO confirm.
# Visible flow: add nat_addr and a twice-NAT pool address (10.0.1.3),
# install either a plain static mapping or a load-balanced one, send a TCP
# packet from the client to nat_addr, check how source and destination
# were rewritten, send the reply and check it, and finally (when the
# external host was translated, presumably) inspect and delete the
# resulting session.
# NOTE(review): many branch headers (if/else/try) and assignments such as
# port_in, port_out, eh_port_out, eh_addr_in, pg0, pg1 and server are on
# lines absent here; which assertions belong to which branch cannot be
# determined from this listing. `locals` shadows the Python builtin.
3963 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3965 twice_nat_addr = '10.0.1.3'
# Two further local ports derived from port_in, used by the load-balanced
# variant (see the assertIn on tcp.dport below).
3973 port_in1 = port_in+1
3974 port_in2 = port_in+2
3979 server1 = self.pg0.remote_hosts[0]
3980 server2 = self.pg0.remote_hosts[1]
3992 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
3995 self.nat44_add_address(self.nat_addr)
3996 self.nat44_add_address(twice_nat_addr, twice_nat=1)
# twice_nat and self_twice_nat are passed as mutually exclusive 0/1 flags.
3998 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
4000 proto=IP_PROTOS.tcp,
4001 twice_nat=int(not self_twice_nat),
4002 self_twice_nat=int(self_twice_nat))
4004 locals = [{'addr': server1.ip4n,
4008 {'addr': server2.ip4n,
4012 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
4013 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
4017 not self_twice_nat),
4020 local_num=len(locals),
4022 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
4023 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
# Client selection: a client_id must be supplied on this path; ids select
# one of pg0's remote hosts, otherwise pg1's first remote host is used.
4030 assert client_id is not None
4032 client = self.pg0.remote_hosts[0]
4033 elif client_id == 2:
4034 client = self.pg0.remote_hosts[1]
4036 client = pg1.remote_hosts[0]
4037 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4038 IP(src=client.ip4, dst=self.nat_addr) /
4039 TCP(sport=eh_port_out, dport=port_out))
4041 self.pg_enable_capture(self.pg_interfaces)
4043 capture = pg0.get_capture(1)
4049 if ip.dst == server1.ip4:
4055 self.assertEqual(ip.dst, server.ip4)
4057 self.assertIn(tcp.dport, [port_in1, port_in2])
4059 self.assertEqual(tcp.dport, port_in)
# Two alternative expectations for the source side: rewritten to the
# twice-NAT address with a new port, or left as the client's own
# address/port.
4061 self.assertEqual(ip.src, twice_nat_addr)
4062 self.assertNotEqual(tcp.sport, eh_port_out)
4064 self.assertEqual(ip.src, client.ip4)
4065 self.assertEqual(tcp.sport, eh_port_out)
# Remember the observed ports so the reply can mirror them.
4067 eh_port_in = tcp.sport
4068 saved_port_in = tcp.dport
4069 self.assert_packet_checksums_valid(p)
4071 self.logger.error(ppp("Unexpected or invalid packet:", p))
4074 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4075 IP(src=server.ip4, dst=eh_addr_in) /
4076 TCP(sport=saved_port_in, dport=eh_port_in))
4078 self.pg_enable_capture(self.pg_interfaces)
4080 capture = pg1.get_capture(1)
4085 self.assertEqual(ip.dst, client.ip4)
4086 self.assertEqual(ip.src, self.nat_addr)
4087 self.assertEqual(tcp.dport, eh_port_out)
4088 self.assertEqual(tcp.sport, port_out)
4089 self.assert_packet_checksums_valid(p)
4091 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Session check: exactly one twice-NAT session for the server; it is
# deleted using the external host's NAT address/port and must be gone.
4095 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4096 self.assertEqual(len(sessions), 1)
4097 self.assertTrue(sessions[0].ext_host_valid)
4098 self.assertTrue(sessions[0].is_twicenat)
4099 self.vapi.nat44_del_session(
4100 sessions[0].inside_ip_address,
4101 sessions[0].inside_port,
4102 sessions[0].protocol,
4103 ext_host_address=sessions[0].ext_host_nat_address,
4104 ext_host_port=sessions[0].ext_host_nat_port)
4105 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4106 self.assertEqual(len(sessions), 0)
# Runs twice_nat_common with all of its default arguments.
# NOTE(review): listing number 4109 is absent between the two lines below
# (possibly a docstring).
4108 def test_twice_nat(self):
4110 self.twice_nat_common()
# Runs twice_nat_common with self_twice_nat=True and same_pg=True.
4112 def test_self_twice_nat_positive(self):
4113 """ Self Twice NAT44 (positive test) """
4114 self.twice_nat_common(self_twice_nat=True, same_pg=True)
# Runs twice_nat_common with self_twice_nat=True only (same_pg keeps its
# default of False).
4116 def test_self_twice_nat_negative(self):
4117 """ Self Twice NAT44 (negative test) """
4118 self.twice_nat_common(self_twice_nat=True)
# Runs twice_nat_common with lb=True, i.e. through the load-balanced
# static-mapping path.
4120 def test_twice_nat_lb(self):
4121 """ Twice NAT44 local service load balancing """
4122 self.twice_nat_common(lb=True)
# Runs twice_nat_common with lb=True, self_twice_nat=True and same_pg=True.
# NOTE(review): the call continues on a line absent from this listing
# (presumably a client_id argument -- TODO confirm).
4124 def test_self_twice_nat_lb_positive(self):
4125 """ Self Twice NAT44 local service load balancing (positive test) """
4126 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
# Runs twice_nat_common with lb=True, self_twice_nat=True and same_pg=True.
# NOTE(review): the call continues on a line absent from this listing;
# whatever distinguishes this from the positive variant is in that missing
# argument (presumably a different client_id -- TODO confirm).
4129 def test_self_twice_nat_lb_negative(self):
4130 """ Self Twice NAT44 local service load balancing (negative test) """
4131 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
# Checks that the twice-NAT address pool follows pg3's interface address:
# after registering pg3 with twice_nat=1 the pool is empty, configuring
# pg3's IPv4 address yields exactly one pool entry equal to pg3.local_ip4n
# and flagged twice_nat, and unconfiguring the address empties the pool.
4134 def test_twice_nat_interface_addr(self):
4135 """ Acquire twice NAT44 addresses from interface """
4136 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4138 # no address in NAT pool
4139 adresses = self.vapi.nat44_address_dump()
4140 self.assertEqual(0, len(adresses))
4142 # configure interface address and check NAT address pool
4143 self.pg3.config_ip4()
4144 adresses = self.vapi.nat44_address_dump()
4145 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address field are compared.
4146 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4147 self.assertEqual(adresses[0].twice_nat, 1)
4149 # remove interface address and check NAT address pool
4150 self.pg3.unconfig_ip4()
4151 adresses = self.vapi.nat44_address_dump()
4152 self.assertEqual(0, len(adresses))
# TCP teardown started by the inside host. After a session is opened with
# initiate_tcp_session, the exchange is: FIN+ACK in -> out, then ACK and
# FIN+ACK out -> in (two packets captured on pg0), then the final ACK
# in -> out. The number of sessions for pg0.remote_ip4n must then equal
# the number recorded before the session was opened.
# NOTE(review): the remaining static-mapping arguments and the lines that
# build the `pkts` list for the out -> in step are absent from this
# listing.
4154 def test_tcp_session_close_in(self):
4155 """ Close TCP session from inside network """
4156 self.tcp_port_out = 10505
4157 self.nat44_add_address(self.nat_addr)
4158 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4162 proto=IP_PROTOS.tcp,
4164 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4165 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count, compared against at the end.
4168 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4169 start_sessnum = len(sessions)
4171 self.initiate_tcp_session(self.pg0, self.pg1)
4173 # FIN packet in -> out
4174 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4175 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4176 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4177 flags="FA", seq=100, ack=300))
4178 self.pg0.add_stream(p)
4179 self.pg_enable_capture(self.pg_interfaces)
4181 self.pg1.get_capture(1)
4185 # ACK packet out -> in
4186 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4187 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4188 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4189 flags="A", seq=300, ack=101))
4192 # FIN packet out -> in
4193 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4194 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4195 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4196 flags="FA", seq=300, ack=101))
# Both out -> in packets are sent together and both must reach pg0.
4199 self.pg1.add_stream(pkts)
4200 self.pg_enable_capture(self.pg_interfaces)
4202 self.pg0.get_capture(2)
4204 # ACK packet in -> out
4205 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4206 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4207 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4208 flags="A", seq=101, ack=301))
4209 self.pg0.add_stream(p)
4210 self.pg_enable_capture(self.pg_interfaces)
4212 self.pg1.get_capture(1)
4214 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4216 self.assertEqual(len(sessions) - start_sessnum, 0)
# TCP teardown started by the outside host. After a session is opened with
# initiate_tcp_session, the exchange is: FIN+ACK out -> in, FIN+ACK
# in -> out, then the final ACK out -> in, each sent and captured
# individually. The number of sessions for pg0.remote_ip4n must then equal
# the number recorded before the session was opened.
# NOTE(review): the remaining static-mapping arguments and some call
# arguments are on lines absent from this listing.
4218 def test_tcp_session_close_out(self):
4219 """ Close TCP session from outside network """
4220 self.tcp_port_out = 10505
4221 self.nat44_add_address(self.nat_addr)
4222 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4226 proto=IP_PROTOS.tcp,
4228 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4229 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count, compared against at the end.
4232 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4233 start_sessnum = len(sessions)
4235 self.initiate_tcp_session(self.pg0, self.pg1)
4237 # FIN packet out -> in
4238 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4239 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4240 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4241 flags="FA", seq=100, ack=300))
4242 self.pg1.add_stream(p)
4243 self.pg_enable_capture(self.pg_interfaces)
4245 self.pg0.get_capture(1)
4247 # FIN+ACK packet in -> out
4248 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4249 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4250 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4251 flags="FA", seq=300, ack=101))
4253 self.pg0.add_stream(p)
4254 self.pg_enable_capture(self.pg_interfaces)
4256 self.pg1.get_capture(1)
4258 # ACK packet out -> in
4259 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4260 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4261 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4262 flags="A", seq=101, ack=301))
4263 self.pg1.add_stream(p)
4264 self.pg_enable_capture(self.pg_interfaces)
4266 self.pg0.get_capture(1)
4268 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4270 self.assertEqual(len(sessions) - start_sessnum, 0)
# Simultaneous TCP close: both sides send FIN+ACK before either FIN is
# acknowledged (in -> out, then out -> in), followed by an ACK in each
# direction. The number of sessions for pg0.remote_ip4n must then equal
# the number recorded before the session was opened.
# NOTE(review): the docstring below is identical to the one on
# test_tcp_session_close_in and does not mention the simultaneous close;
# consider correcting it in the full file.
# NOTE(review): the remaining static-mapping arguments and some call
# arguments are on lines absent from this listing.
4272 def test_tcp_session_close_simultaneous(self):
4273 """ Close TCP session from inside network """
4274 self.tcp_port_out = 10505
4275 self.nat44_add_address(self.nat_addr)
4276 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4280 proto=IP_PROTOS.tcp,
4282 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4283 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count, compared against at the end.
4286 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4287 start_sessnum = len(sessions)
4289 self.initiate_tcp_session(self.pg0, self.pg1)
4291 # FIN packet in -> out
4292 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4293 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4294 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4295 flags="FA", seq=100, ack=300))
4296 self.pg0.add_stream(p)
4297 self.pg_enable_capture(self.pg_interfaces)
4299 self.pg1.get_capture(1)
4301 # FIN packet out -> in
# ack=100 here: the peer's FIN (seq=100) has not been acknowledged yet.
4302 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4303 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4304 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4305 flags="FA", seq=300, ack=100))
4306 self.pg1.add_stream(p)
4307 self.pg_enable_capture(self.pg_interfaces)
4309 self.pg0.get_capture(1)
4311 # ACK packet in -> out
4312 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4313 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4314 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4315 flags="A", seq=101, ack=301))
4316 self.pg0.add_stream(p)
4317 self.pg_enable_capture(self.pg_interfaces)
4319 self.pg1.get_capture(1)
4321 # ACK packet out -> in
4322 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4323 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4324 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4325 flags="A", seq=301, ack=101))
4326 self.pg1.add_stream(p)
4327 self.pg_enable_capture(self.pg_interfaces)
4329 self.pg0.get_capture(1)
4331 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4333 self.assertEqual(len(sessions) - start_sessnum, 0)
4335 def test_one_armed_nat44_static(self):
4336 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
4337 remote_host = self.pg4.remote_hosts[0]
4338 local_host = self.pg4.remote_hosts[1]
4343 self.vapi.nat44_forwarding_enable_disable(1)
4344 self.nat44_add_address(self.nat_addr, twice_nat=1)
4345 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4346 local_port, external_port,
4347 proto=IP_PROTOS.tcp, out2in_only=1,
4349 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4350 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4353 # from client to service
4354 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4355 IP(src=remote_host.ip4, dst=self.nat_addr) /
4356 TCP(sport=12345, dport=external_port))
4357 self.pg4.add_stream(p)
4358 self.pg_enable_capture(self.pg_interfaces)
4360 capture = self.pg4.get_capture(1)
4365 self.assertEqual(ip.dst, local_host.ip4)
4366 self.assertEqual(ip.src, self.nat_addr)
4367 self.assertEqual(tcp.dport, local_port)
4368 self.assertNotEqual(tcp.sport, 12345)
4369 eh_port_in = tcp.sport
4370 self.assert_packet_checksums_valid(p)
4372 self.logger.error(ppp("Unexpected or invalid packet:", p))
4375 # from service back to client
4376 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4377 IP(src=local_host.ip4, dst=self.nat_addr) /
4378 TCP(sport=local_port, dport=eh_port_in))
4379 self.pg4.add_stream(p)
4380 self.pg_enable_capture(self.pg_interfaces)
4382 capture = self.pg4.get_capture(1)
4387 self.assertEqual(ip.src, self.nat_addr)
4388 self.assertEqual(ip.dst, remote_host.ip4)
4389 self.assertEqual(tcp.sport, external_port)
4390 self.assertEqual(tcp.dport, 12345)
4391 self.assert_packet_checksums_valid(p)
4393 self.logger.error(ppp("Unexpected or invalid packet:", p))
4396 def test_static_with_port_out2(self):
4397 """ 1:1 NAPT asymmetrical rule """
4402 self.vapi.nat44_forwarding_enable_disable(1)
4403 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4404 local_port, external_port,
4405 proto=IP_PROTOS.tcp, out2in_only=1)
4406 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4407 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4410 # from client to service
4411 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4412 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4413 TCP(sport=12345, dport=external_port))
4414 self.pg1.add_stream(p)
4415 self.pg_enable_capture(self.pg_interfaces)
4417 capture = self.pg0.get_capture(1)
4422 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4423 self.assertEqual(tcp.dport, local_port)
4424 self.assert_packet_checksums_valid(p)
4426 self.logger.error(ppp("Unexpected or invalid packet:", p))
4430 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4431 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4432 ICMP(type=11) / capture[0][IP])
4433 self.pg0.add_stream(p)
4434 self.pg_enable_capture(self.pg_interfaces)
4436 capture = self.pg1.get_capture(1)
4439 self.assertEqual(p[IP].src, self.nat_addr)
4441 self.assertEqual(inner.dst, self.nat_addr)
4442 self.assertEqual(inner[TCPerror].dport, external_port)
4444 self.logger.error(ppp("Unexpected or invalid packet:", p))
4447 # from service back to client
4448 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4449 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4450 TCP(sport=local_port, dport=12345))
4451 self.pg0.add_stream(p)
4452 self.pg_enable_capture(self.pg_interfaces)
4454 capture = self.pg1.get_capture(1)
4459 self.assertEqual(ip.src, self.nat_addr)
4460 self.assertEqual(tcp.sport, external_port)
4461 self.assert_packet_checksums_valid(p)
4463 self.logger.error(ppp("Unexpected or invalid packet:", p))
4467 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4468 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4469 ICMP(type=11) / capture[0][IP])
4470 self.pg1.add_stream(p)
4471 self.pg_enable_capture(self.pg_interfaces)
4473 capture = self.pg0.get_capture(1)
4476 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4478 self.assertEqual(inner.src, self.pg0.remote_ip4)
4479 self.assertEqual(inner[TCPerror].sport, local_port)
4481 self.logger.error(ppp("Unexpected or invalid packet:", p))
4484 # from client to server (no translation)
4485 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4486 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4487 TCP(sport=12346, dport=local_port))
4488 self.pg1.add_stream(p)
4489 self.pg_enable_capture(self.pg_interfaces)
4491 capture = self.pg0.get_capture(1)
4496 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4497 self.assertEqual(tcp.dport, local_port)
4498 self.assert_packet_checksums_valid(p)
4500 self.logger.error(ppp("Unexpected or invalid packet:", p))
4503 # from service back to client (no translation)
4504 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4505 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4506 TCP(sport=local_port, dport=12346))
4507 self.pg0.add_stream(p)
4508 self.pg_enable_capture(self.pg_interfaces)
4510 capture = self.pg1.get_capture(1)
4515 self.assertEqual(ip.src, self.pg0.remote_ip4)
4516 self.assertEqual(tcp.sport, local_port)
4517 self.assert_packet_checksums_valid(p)
4519 self.logger.error(ppp("Unexpected or invalid packet:", p))
4522 def test_output_feature(self):
4523 """ NAT44 interface output feature (in2out postrouting) """
4524 self.vapi.nat44_forwarding_enable_disable(1)
4525 self.nat44_add_address(self.nat_addr)
4526 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4528 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4532 pkts = self.create_stream_in(self.pg0, self.pg1)
4533 self.pg0.add_stream(pkts)
4534 self.pg_enable_capture(self.pg_interfaces)
4536 capture = self.pg1.get_capture(len(pkts))
4537 self.verify_capture_out(capture)
4540 pkts = self.create_stream_out(self.pg1)
4541 self.pg1.add_stream(pkts)
4542 self.pg_enable_capture(self.pg_interfaces)
4544 capture = self.pg0.get_capture(len(pkts))
4545 self.verify_capture_in(capture, self.pg0)
4547 def test_multiple_vrf(self):
4548 """ Multiple VRF setup """
4549 external_addr = '1.2.3.4'
4554 self.vapi.nat44_forwarding_enable_disable(1)
4555 self.nat44_add_address(self.nat_addr)
4556 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4557 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4559 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4561 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
4562 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index,
4564 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4566 self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
4567 local_port, external_port, vrf_id=1,
4568 proto=IP_PROTOS.tcp, out2in_only=1)
4569 self.nat44_add_static_mapping(
4570 self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index,
4571 local_port=local_port, vrf_id=0, external_port=external_port,
4572 proto=IP_PROTOS.tcp, out2in_only=1)
4574 # from client to service (both VRF1)
4575 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4576 IP(src=self.pg6.remote_ip4, dst=external_addr) /
4577 TCP(sport=12345, dport=external_port))
4578 self.pg6.add_stream(p)
4579 self.pg_enable_capture(self.pg_interfaces)
4581 capture = self.pg5.get_capture(1)
4586 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4587 self.assertEqual(tcp.dport, local_port)
4588 self.assert_packet_checksums_valid(p)
4590 self.logger.error(ppp("Unexpected or invalid packet:", p))
4593 # from service back to client (both VRF1)
4594 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4595 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4596 TCP(sport=local_port, dport=12345))
4597 self.pg5.add_stream(p)
4598 self.pg_enable_capture(self.pg_interfaces)
4600 capture = self.pg6.get_capture(1)
4605 self.assertEqual(ip.src, external_addr)
4606 self.assertEqual(tcp.sport, external_port)
4607 self.assert_packet_checksums_valid(p)
4609 self.logger.error(ppp("Unexpected or invalid packet:", p))
4612 # dynamic NAT from VRF1 to VRF0 (output-feature)
4613 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4614 IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
4615 TCP(sport=2345, dport=22))
4616 self.pg5.add_stream(p)
4617 self.pg_enable_capture(self.pg_interfaces)
4619 capture = self.pg1.get_capture(1)
4624 self.assertEqual(ip.src, self.nat_addr)
4625 self.assertNotEqual(tcp.sport, 2345)
4626 self.assert_packet_checksums_valid(p)
4629 self.logger.error(ppp("Unexpected or invalid packet:", p))
4632 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4633 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4634 TCP(sport=22, dport=port))
4635 self.pg1.add_stream(p)
4636 self.pg_enable_capture(self.pg_interfaces)
4638 capture = self.pg5.get_capture(1)
4643 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4644 self.assertEqual(tcp.dport, 2345)
4645 self.assert_packet_checksums_valid(p)
4647 self.logger.error(ppp("Unexpected or invalid packet:", p))
4650 # from client VRF1 to service VRF0
4651 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4652 IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
4653 TCP(sport=12346, dport=external_port))
4654 self.pg6.add_stream(p)
4655 self.pg_enable_capture(self.pg_interfaces)
4657 capture = self.pg0.get_capture(1)
4662 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4663 self.assertEqual(tcp.dport, local_port)
4664 self.assert_packet_checksums_valid(p)
4666 self.logger.error(ppp("Unexpected or invalid packet:", p))
4669 # from service VRF0 back to client VRF1
4670 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4671 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4672 TCP(sport=local_port, dport=12346))
4673 self.pg0.add_stream(p)
4674 self.pg_enable_capture(self.pg_interfaces)
4676 capture = self.pg6.get_capture(1)
4681 self.assertEqual(ip.src, self.pg0.local_ip4)
4682 self.assertEqual(tcp.sport, external_port)
4683 self.assert_packet_checksums_valid(p)
4685 self.logger.error(ppp("Unexpected or invalid packet:", p))
4688 # from client VRF0 to service VRF1
4689 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4690 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4691 TCP(sport=12347, dport=external_port))
4692 self.pg0.add_stream(p)
4693 self.pg_enable_capture(self.pg_interfaces)
4695 capture = self.pg5.get_capture(1)
4700 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4701 self.assertEqual(tcp.dport, local_port)
4702 self.assert_packet_checksums_valid(p)
4704 self.logger.error(ppp("Unexpected or invalid packet:", p))
4707 # from service VRF1 back to client VRF0
4708 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4709 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4710 TCP(sport=local_port, dport=12347))
4711 self.pg5.add_stream(p)
4712 self.pg_enable_capture(self.pg_interfaces)
4714 capture = self.pg0.get_capture(1)
4719 self.assertEqual(ip.src, external_addr)
4720 self.assertEqual(tcp.sport, external_port)
4721 self.assert_packet_checksums_valid(p)
4723 self.logger.error(ppp("Unexpected or invalid packet:", p))
4726 # from client to server (both VRF1, no translation)
4727 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4728 IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
4729 TCP(sport=12348, dport=local_port))
4730 self.pg6.add_stream(p)
4731 self.pg_enable_capture(self.pg_interfaces)
4733 capture = self.pg5.get_capture(1)
4738 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4739 self.assertEqual(tcp.dport, local_port)
4740 self.assert_packet_checksums_valid(p)
4742 self.logger.error(ppp("Unexpected or invalid packet:", p))
4745 # from server back to client (both VRF1, no translation)
4746 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4747 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4748 TCP(sport=local_port, dport=12348))
4749 self.pg5.add_stream(p)
4750 self.pg_enable_capture(self.pg_interfaces)
4752 capture = self.pg6.get_capture(1)
4757 self.assertEqual(ip.src, self.pg5.remote_ip4)
4758 self.assertEqual(tcp.sport, local_port)
4759 self.assert_packet_checksums_valid(p)
4761 self.logger.error(ppp("Unexpected or invalid packet:", p))
4764 # from client VRF1 to server VRF0 (no translation)
4765 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4766 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4767 TCP(sport=local_port, dport=12349))
4768 self.pg0.add_stream(p)
4769 self.pg_enable_capture(self.pg_interfaces)
4771 capture = self.pg6.get_capture(1)
4776 self.assertEqual(ip.src, self.pg0.remote_ip4)
4777 self.assertEqual(tcp.sport, local_port)
4778 self.assert_packet_checksums_valid(p)
4780 self.logger.error(ppp("Unexpected or invalid packet:", p))
4783 # from server VRF0 back to client VRF1 (no translation)
4784 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4785 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4786 TCP(sport=local_port, dport=12349))
4787 self.pg0.add_stream(p)
4788 self.pg_enable_capture(self.pg_interfaces)
4790 capture = self.pg6.get_capture(1)
4795 self.assertEqual(ip.src, self.pg0.remote_ip4)
4796 self.assertEqual(tcp.sport, local_port)
4797 self.assert_packet_checksums_valid(p)
4799 self.logger.error(ppp("Unexpected or invalid packet:", p))
4802 # from client VRF0 to server VRF1 (no translation)
4803 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4804 IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
4805 TCP(sport=12344, dport=local_port))
4806 self.pg0.add_stream(p)
4807 self.pg_enable_capture(self.pg_interfaces)
4809 capture = self.pg5.get_capture(1)
4814 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4815 self.assertEqual(tcp.dport, local_port)
4816 self.assert_packet_checksums_valid(p)
4818 self.logger.error(ppp("Unexpected or invalid packet:", p))
4821 # from server VRF1 back to client VRF0 (no translation)
4822 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4823 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4824 TCP(sport=local_port, dport=12344))
4825 self.pg5.add_stream(p)
4826 self.pg_enable_capture(self.pg_interfaces)
4828 capture = self.pg0.get_capture(1)
4833 self.assertEqual(ip.src, self.pg5.remote_ip4)
4834 self.assertEqual(tcp.sport, local_port)
4835 self.assert_packet_checksums_valid(p)
4837 self.logger.error(ppp("Unexpected or invalid packet:", p))
4841 super(TestNAT44EndpointDependent, self).tearDown()
4842 if not self.vpp_dead:
4843 self.logger.info(self.vapi.cli("show nat44 addresses"))
4844 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4845 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4846 self.logger.info(self.vapi.cli("show nat44 interface address"))
4847 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4848 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
4850 self.vapi.cli("clear logging")
4853 class TestNAT44Out2InDPO(MethodHolder):
4854 """ NAT44 Test Cases using out2in DPO """
def setUpConstants(cls):
    """ Append the NAT "out2in dpo" section to the VPP command line """
    # Let the parent class prepare its constants first.
    super(TestNAT44Out2InDPO, cls).setUpConstants()
    # Tokens appended to vpp_cmdline, in order.
    nat_section = ["nat", "{", "out2in dpo", "}"]
    cls.vpp_cmdline.extend(nat_section)
4862 def setUpClass(cls):
4863 super(TestNAT44Out2InDPO, cls).setUpClass()
4864 cls.vapi.cli("set log class nat level debug")
4867 cls.tcp_port_in = 6303
4868 cls.tcp_port_out = 6303
4869 cls.udp_port_in = 6304
4870 cls.udp_port_out = 6304
4871 cls.icmp_id_in = 6305
4872 cls.icmp_id_out = 6305
4873 cls.nat_addr = '10.0.0.3'
4874 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4875 cls.dst_ip4 = '192.168.70.1'
4877 cls.create_pg_interfaces(range(2))
4880 cls.pg0.config_ip4()
4881 cls.pg0.resolve_arp()
4884 cls.pg1.config_ip6()
4885 cls.pg1.resolve_ndp()
4887 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4888 dst_address_length=0,
4889 next_hop_address=cls.pg1.remote_ip6n,
4890 next_hop_sw_if_index=cls.pg1.sw_if_index)
4893 super(TestNAT44Out2InDPO, cls).tearDownClass()
4896 def configure_xlat(self):
4897 self.dst_ip6_pfx = '1:2:3::'
4898 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4900 self.dst_ip6_pfx_len = 96
4901 self.src_ip6_pfx = '4:5:6::'
4902 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4904 self.src_ip6_pfx_len = 96
4905 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4906 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4907 '\x00\x00\x00\x00', 0, is_translation=1,
4910 def test_464xlat_ce(self):
4911 """ Test 464XLAT CE with NAT44 """
4913 self.configure_xlat()
4915 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4916 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4918 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4919 self.dst_ip6_pfx_len)
4920 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4921 self.src_ip6_pfx_len)
4924 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4925 self.pg0.add_stream(pkts)
4926 self.pg_enable_capture(self.pg_interfaces)
4928 capture = self.pg1.get_capture(len(pkts))
4929 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4932 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4934 self.pg1.add_stream(pkts)
4935 self.pg_enable_capture(self.pg_interfaces)
4937 capture = self.pg0.get_capture(len(pkts))
4938 self.verify_capture_in(capture, self.pg0)
4940 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4942 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4943 self.nat_addr_n, is_add=0)
4945 def test_464xlat_ce_no_nat(self):
4946 """ Test 464XLAT CE without NAT44 """
4948 self.configure_xlat()
4950 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4951 self.dst_ip6_pfx_len)
4952 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4953 self.src_ip6_pfx_len)
4955 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4956 self.pg0.add_stream(pkts)
4957 self.pg_enable_capture(self.pg_interfaces)
4959 capture = self.pg1.get_capture(len(pkts))
4960 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4961 nat_ip=out_dst_ip6, same_port=True)
4963 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4964 self.pg1.add_stream(pkts)
4965 self.pg_enable_capture(self.pg_interfaces)
4967 capture = self.pg0.get_capture(len(pkts))
4968 self.verify_capture_in(capture, self.pg0)
4971 class TestDeterministicNAT(MethodHolder):
4972 """ Deterministic NAT Test Cases """
def setUpConstants(cls):
    """ Append the NAT "deterministic" section to the VPP command line """
    # Let the parent class prepare its constants first.
    super(TestDeterministicNAT, cls).setUpConstants()
    # Tokens appended to vpp_cmdline, in order.
    nat_section = ["nat", "{", "deterministic", "}"]
    cls.vpp_cmdline.extend(nat_section)
4980 def setUpClass(cls):
4981 super(TestDeterministicNAT, cls).setUpClass()
4982 cls.vapi.cli("set log class nat level debug")
4985 cls.tcp_port_in = 6303
4986 cls.tcp_external_port = 6303
4987 cls.udp_port_in = 6304
4988 cls.udp_external_port = 6304
4989 cls.icmp_id_in = 6305
4990 cls.nat_addr = '10.0.0.3'
4992 cls.create_pg_interfaces(range(3))
4993 cls.interfaces = list(cls.pg_interfaces)
4995 for i in cls.interfaces:
5000 cls.pg0.generate_remote_hosts(2)
5001 cls.pg0.configure_ipv4_neighbors()
5004 super(TestDeterministicNAT, cls).tearDownClass()
5007 def create_stream_in(self, in_if, out_if, ttl=64):
5009 Create packet stream for inside network
5011 :param in_if: Inside interface
5012 :param out_if: Outside interface
5013 :param ttl: TTL of generated packets
5017 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5018 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5019 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
5023 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5024 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5025 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
5029 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5030 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5031 ICMP(id=self.icmp_id_in, type='echo-request'))
5036 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
5038 Create packet stream for outside network
5040 :param out_if: Outside interface
5041 :param dst_ip: Destination IP address (Default use global NAT address)
5042 :param ttl: TTL of generated packets
5045 dst_ip = self.nat_addr
5048 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5049 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5050 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
5054 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5055 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5056 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
5060 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5061 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5062 ICMP(id=self.icmp_external_id, type='echo-reply'))
5067 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
5069 Verify captured packets on outside network
5071 :param capture: Captured packets
5072 :param nat_ip: Translated IP address (Default use global NAT address)
5073 :param same_port: Source port number is not translated (Default False)
5074 :param packet_num: Expected number of packets (Default 3)
5077 nat_ip = self.nat_addr
5078 self.assertEqual(packet_num, len(capture))
5079 for packet in capture:
5081 self.assertEqual(packet[IP].src, nat_ip)
5082 if packet.haslayer(TCP):
5083 self.tcp_port_out = packet[TCP].sport
5084 elif packet.haslayer(UDP):
5085 self.udp_port_out = packet[UDP].sport
5087 self.icmp_external_id = packet[ICMP].id
5089 self.logger.error(ppp("Unexpected or invalid packet "
5090 "(outside network):", packet))
5093 def verify_ipfix_max_entries_per_user(self, data):
5095 Verify IPFIX maximum entries per user exceeded event
5097 :param data: Decoded IPFIX data records
5099 self.assertEqual(1, len(data))
5102 self.assertEqual(ord(record[230]), 13)
5103 # natQuotaExceededEvent
5104 self.assertEqual('\x03\x00\x00\x00', record[466])
5106 self.assertEqual('\xe8\x03\x00\x00', record[473])
5108 self.assertEqual(self.pg0.remote_ip4n, record[8])
5110 def test_deterministic_mode(self):
5111 """ NAT plugin run deterministic mode """
5112 in_addr = '172.16.255.0'
5113 out_addr = '172.17.255.50'
5114 in_addr_t = '172.16.255.20'
5115 in_addr_n = socket.inet_aton(in_addr)
5116 out_addr_n = socket.inet_aton(out_addr)
5117 in_addr_t_n = socket.inet_aton(in_addr_t)
5121 nat_config = self.vapi.nat_show_config()
5122 self.assertEqual(1, nat_config.deterministic)
5124 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
5126 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
5127 self.assertEqual(rep1.out_addr[:4], out_addr_n)
5128 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
5129 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
5131 deterministic_mappings = self.vapi.nat_det_map_dump()
5132 self.assertEqual(len(deterministic_mappings), 1)
5133 dsm = deterministic_mappings[0]
5134 self.assertEqual(in_addr_n, dsm.in_addr[:4])
5135 self.assertEqual(in_plen, dsm.in_plen)
5136 self.assertEqual(out_addr_n, dsm.out_addr[:4])
5137 self.assertEqual(out_plen, dsm.out_plen)
5139 self.clear_nat_det()
5140 deterministic_mappings = self.vapi.nat_det_map_dump()
5141 self.assertEqual(len(deterministic_mappings), 0)
5143 def test_set_timeouts(self):
5144 """ Set deterministic NAT timeouts """
5145 timeouts_before = self.vapi.nat_det_get_timeouts()
5147 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
5148 timeouts_before.tcp_established + 10,
5149 timeouts_before.tcp_transitory + 10,
5150 timeouts_before.icmp + 10)
5152 timeouts_after = self.vapi.nat_det_get_timeouts()
5154 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
5155 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
5156 self.assertNotEqual(timeouts_before.tcp_established,
5157 timeouts_after.tcp_established)
5158 self.assertNotEqual(timeouts_before.tcp_transitory,
5159 timeouts_after.tcp_transitory)
5161 def test_det_in(self):
5162 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
5164 nat_ip = "10.0.0.10"
5166 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5168 socket.inet_aton(nat_ip),
5170 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5171 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5175 pkts = self.create_stream_in(self.pg0, self.pg1)
5176 self.pg0.add_stream(pkts)
5177 self.pg_enable_capture(self.pg_interfaces)
5179 capture = self.pg1.get_capture(len(pkts))
5180 self.verify_capture_out(capture, nat_ip)
5183 pkts = self.create_stream_out(self.pg1, nat_ip)
5184 self.pg1.add_stream(pkts)
5185 self.pg_enable_capture(self.pg_interfaces)
5187 capture = self.pg0.get_capture(len(pkts))
5188 self.verify_capture_in(capture, self.pg0)
5191 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
5192 self.assertEqual(len(sessions), 3)
5196 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5197 self.assertEqual(s.in_port, self.tcp_port_in)
5198 self.assertEqual(s.out_port, self.tcp_port_out)
5199 self.assertEqual(s.ext_port, self.tcp_external_port)
5203 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5204 self.assertEqual(s.in_port, self.udp_port_in)
5205 self.assertEqual(s.out_port, self.udp_port_out)
5206 self.assertEqual(s.ext_port, self.udp_external_port)
5210 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5211 self.assertEqual(s.in_port, self.icmp_id_in)
5212 self.assertEqual(s.out_port, self.icmp_external_id)
5214 def test_multiple_users(self):
5215 """ Deterministic NAT multiple users """
5217 nat_ip = "10.0.0.10"
5219 external_port = 6303
5221 host0 = self.pg0.remote_hosts[0]
5222 host1 = self.pg0.remote_hosts[1]
5224 self.vapi.nat_det_add_del_map(host0.ip4n,
5226 socket.inet_aton(nat_ip),
5228 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5229 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5233 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
5234 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
5235 TCP(sport=port_in, dport=external_port))
5236 self.pg0.add_stream(p)
5237 self.pg_enable_capture(self.pg_interfaces)
5239 capture = self.pg1.get_capture(1)
5244 self.assertEqual(ip.src, nat_ip)
5245 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5246 self.assertEqual(tcp.dport, external_port)
5247 port_out0 = tcp.sport
5249 self.logger.error(ppp("Unexpected or invalid packet:", p))
5253 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
5254 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
5255 TCP(sport=port_in, dport=external_port))
5256 self.pg0.add_stream(p)
5257 self.pg_enable_capture(self.pg_interfaces)
5259 capture = self.pg1.get_capture(1)
5264 self.assertEqual(ip.src, nat_ip)
5265 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5266 self.assertEqual(tcp.dport, external_port)
5267 port_out1 = tcp.sport
5269 self.logger.error(ppp("Unexpected or invalid packet:", p))
5272 dms = self.vapi.nat_det_map_dump()
5273 self.assertEqual(1, len(dms))
5274 self.assertEqual(2, dms[0].ses_num)
5277 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5278 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5279 TCP(sport=external_port, dport=port_out0))
5280 self.pg1.add_stream(p)
5281 self.pg_enable_capture(self.pg_interfaces)
5283 capture = self.pg0.get_capture(1)
5288 self.assertEqual(ip.src, self.pg1.remote_ip4)
5289 self.assertEqual(ip.dst, host0.ip4)
5290 self.assertEqual(tcp.dport, port_in)
5291 self.assertEqual(tcp.sport, external_port)
5293 self.logger.error(ppp("Unexpected or invalid packet:", p))
5297 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5298 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5299 TCP(sport=external_port, dport=port_out1))
5300 self.pg1.add_stream(p)
5301 self.pg_enable_capture(self.pg_interfaces)
5303 capture = self.pg0.get_capture(1)
5308 self.assertEqual(ip.src, self.pg1.remote_ip4)
5309 self.assertEqual(ip.dst, host1.ip4)
5310 self.assertEqual(tcp.dport, port_in)
5311 self.assertEqual(tcp.sport, external_port)
5313 self.logger.error(ppp("Unexpected or invalid packet", p))
5316 # session close api test
5317 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
5319 self.pg1.remote_ip4n,
5321 dms = self.vapi.nat_det_map_dump()
5322 self.assertEqual(dms[0].ses_num, 1)
5324 self.vapi.nat_det_close_session_in(host0.ip4n,
5326 self.pg1.remote_ip4n,
5328 dms = self.vapi.nat_det_map_dump()
5329 self.assertEqual(dms[0].ses_num, 0)
# Deterministic NAT: verify that a TCP session is removed once the inside
# host closes it. Following the inline comments below, the exchange is
# FIN in->out, then ACK and FIN out->in, then ACK in->out (pg0 carries the
# "in" side, pg1 the "out" side); the test then expects the first entry of
# nat_det_map_dump() to report ses_num == 0.
# NOTE(review): this listing looks truncated - several calls never close
# their parentheses and `pkts` is used without a visible assignment.
# Confirm the missing lines against the original file before editing.
5331 def test_tcp_session_close_detection_in(self):
5332 """ Deterministic NAT TCP session close from inside network """
5333 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5335 socket.inet_aton(self.nat_addr),
5337 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5338 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# presumably establishes the session that is torn down below - confirm
# against initiate_tcp_session()
5341 self.initiate_tcp_session(self.pg0, self.pg1)
5343 # close the session from inside
5345 # FIN packet in -> out
5346 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5347 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5348 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5350 self.pg0.add_stream(p)
5351 self.pg_enable_capture(self.pg_interfaces)
# exactly one translated packet is expected on the outside interface
5353 self.pg1.get_capture(1)
5357 # ACK packet out -> in
5358 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5359 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5360 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5364 # FIN packet out -> in
5365 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5366 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5367 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# the outside -> inside packets are injected as one stream and two packets
# are expected back on pg0
5371 self.pg1.add_stream(pkts)
5372 self.pg_enable_capture(self.pg_interfaces)
5374 self.pg0.get_capture(2)
5376 # ACK packet in -> out
5377 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5378 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5379 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5381 self.pg0.add_stream(p)
5382 self.pg_enable_capture(self.pg_interfaces)
5384 self.pg1.get_capture(1)
5386 # Check if deterministic NAT44 closed the session
5387 dms = self.vapi.nat_det_map_dump()
5388 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): presumably the failure path of an enclosing try/except that
# is not visible in this listing - confirm
5390 self.logger.error("TCP session termination failed")
# Deterministic NAT: verify that a TCP session is removed once the outside
# host closes it. This mirrors the inside-initiated test with the
# directions swapped: the first packet is sent from pg1 towards nat_addr,
# the reply stream comes from pg0, and the final packet again from pg1.
# Afterwards the first entry of nat_det_map_dump() must report ses_num == 0.
# NOTE(review): this listing looks truncated - several calls never close
# their parentheses and `pkts` is used without a visible assignment.
# Confirm the missing lines against the original file before editing.
5393 def test_tcp_session_close_detection_out(self):
5394 """ Deterministic NAT TCP session close from outside network """
5395 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5397 socket.inet_aton(self.nat_addr),
5399 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5400 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5403 self.initiate_tcp_session(self.pg0, self.pg1)
5405 # close the session from outside
5407 # FIN packet out -> in
5408 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5409 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5410 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5412 self.pg1.add_stream(p)
5413 self.pg_enable_capture(self.pg_interfaces)
# exactly one translated packet is expected on the inside interface
5415 self.pg0.get_capture(1)
5419 # ACK packet in -> out
5420 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5421 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5422 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the comment below repeats the previous one; by symmetry
# with test_tcp_session_close_detection_in this second packet is presumably
# the FIN - the TCP flags are not visible here, so confirm
5426 # ACK packet in -> out
5427 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5428 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5429 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# the inside -> outside packets are injected as one stream and two packets
# are expected on pg1
5433 self.pg0.add_stream(pkts)
5434 self.pg_enable_capture(self.pg_interfaces)
5436 self.pg1.get_capture(2)
5438 # ACK packet out -> in
5439 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5440 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5441 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5443 self.pg1.add_stream(p)
5444 self.pg_enable_capture(self.pg_interfaces)
5446 self.pg0.get_capture(1)
5448 # Check if deterministic NAT44 closed the session
5449 dms = self.vapi.nat_det_map_dump()
5450 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): presumably the failure path of an enclosing try/except that
# is not visible in this listing - confirm
5452 self.logger.error("TCP session termination failed")
# Deterministic NAT: sessions must disappear once their timeout elapses.
# Runs only when running_extended_tests() is true. The test opens a TCP
# session, sets every deterministic NAT timeout to 5 via
# nat_det_set_timeouts(5, 5, 5, 5), pushes an inside stream through pg0 and
# finally expects the first mapping to report ses_num == 0.
# NOTE(review): no wait is visible between the capture and the final dump;
# a sleep longer than the 5 unit timeout is presumably missing from this
# listing (the file imports `sleep`) - confirm against the original file.
5455 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5456 def test_session_timeout(self):
5457 """ Deterministic NAT session timeouts """
5458 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5460 socket.inet_aton(self.nat_addr),
5462 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5463 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5466 self.initiate_tcp_session(self.pg0, self.pg1)
# shorten all four timeouts so the sessions can expire within the test
5467 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
5468 pkts = self.create_stream_in(self.pg0, self.pg1)
5469 self.pg0.add_stream(pkts)
5470 self.pg_enable_capture(self.pg_interfaces)
# every injected packet is expected to be forwarded to pg1
5472 capture = self.pg1.get_capture(len(pkts))
5475 dms = self.vapi.nat_det_map_dump()
5476 self.assertEqual(0, dms[0].ses_num)
# Deterministic NAT: enforce the per-user session limit.
# Runs only when running_extended_tests() is true. Steps visible below:
#   1. configure the mapping, the NAT interfaces and IPFIX export via pg2;
#   2. send one UDP packet per port in range(1025, 2025), i.e. 1000
#      distinct flows, all of which must be forwarded to pg1;
#   3. send one more flow (3001 -> 3002): nothing may reach pg1 and an ICMP
#      type 3 / code 1 error quoting that UDP header must come back on pg0;
#   4. the mapping must still report exactly 1000 sessions;
#   5. after "ipfix flush" two packets are expected on pg2; templates are
#      loaded first, then data sets are decoded and checked with
#      verify_ipfix_max_entries_per_user().
# NOTE(review): this listing looks truncated - `pkts`, `icmp` and the loop
# headers over `capture` are not visible. Confirm against the original.
5478 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5479 def test_session_limit_per_user(self):
5480 """ Deterministic NAT maximum sessions per user limit """
5481 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5483 socket.inet_aton(self.nat_addr),
5485 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5486 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5488 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5489 src_address=self.pg2.local_ip4n,
5491 template_interval=10)
5492 self.vapi.nat_ipfix()
# one UDP flow per source port; source and destination port are equal
5495 for port in range(1025, 2025):
5496 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5497 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5498 UDP(sport=port, dport=port))
5501 self.pg0.add_stream(pkts)
5502 self.pg_enable_capture(self.pg_interfaces)
5504 capture = self.pg1.get_capture(len(pkts))
# one additional flow that is expected to be refused
5506 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5507 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5508 UDP(sport=3001, dport=3002))
5509 self.pg0.add_stream(p)
5510 self.pg_enable_capture(self.pg_interfaces)
5512 capture = self.pg1.assert_nothing_captured()
5514 # verify ICMP error packet
5515 capture = self.pg0.get_capture(1)
# NOTE(review): `p` presumably gets rebound to the captured packet on a
# line missing from this listing; as shown it still names the sent packet
5517 self.assertTrue(p.haslayer(ICMP))
5519 self.assertEqual(icmp.type, 3)
5520 self.assertEqual(icmp.code, 1)
5521 self.assertTrue(icmp.haslayer(IPerror))
5522 inner_ip = icmp[IPerror]
# the quoted header must be the one of the refused flow
5523 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5524 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5526 dms = self.vapi.nat_det_map_dump()
5528 self.assertEqual(1000, dms[0].ses_num)
5530 # verify IPFIX logging
5531 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5533 capture = self.pg2.get_capture(2)
5534 ipfix = IPFIXDecoder()
5535 # first load template
5537 self.assertTrue(p.haslayer(IPFIX))
5538 if p.haslayer(Template):
5539 ipfix.add_template(p.getlayer(Template))
5540 # verify events in data set
5542 if p.haslayer(Data):
5543 data = ipfix.decode_data_set(p.getlayer(Set))
5544 self.verify_ipfix_max_entries_per_user(data)
# Undo the deterministic NAT configuration left behind by a test: IPFIX
# logging is switched off (enable=0), nat_det_set_timeouts() is called
# without arguments, and the dumped mappings and NAT44 interfaces are each
# passed back to their add/del API call.
# NOTE(review): the docstring delimiters and the remaining arguments of the
# two add/del calls are not visible in this listing; presumably they carry
# the delete flag - confirm against the original file.
5546 def clear_nat_det(self):
5548 Clear deterministic NAT configuration.
5550 self.vapi.nat_ipfix(enable=0)
5551 self.vapi.nat_det_set_timeouts()
5552 deterministic_mappings = self.vapi.nat_det_map_dump()
5553 for dsm in deterministic_mappings:
5554 self.vapi.nat_det_add_del_map(dsm.in_addr,
5560 interfaces = self.vapi.nat44_interface_dump()
5561 for intf in interfaces:
5562 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# Per-test cleanup for TestDeterministicNAT: after the parent tearDown, and
# only while VPP is still alive, the NAT44 interface list and the
# deterministic mappings/timeouts/sessions are fetched through the CLI and
# the configuration is removed with clear_nat_det().
# NOTE(review): the `def` line of this method is not visible in this
# listing (presumably `def tearDown(self):`), and only the first CLI result
# is visibly passed to self.logger.info() - confirm against the original.
5567 super(TestDeterministicNAT, self).tearDown()
5568 if not self.vpp_dead:
5569 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5571 self.vapi.cli("show nat44 deterministic mappings"))
5573 self.vapi.cli("show nat44 deterministic timeouts"))
5575 self.vapi.cli("show nat44 deterministic sessions"))
5576 self.clear_nat_det()
5579 class TestNAT64(MethodHolder):
5580 """ NAT64 Test Cases """
def setUpConstants(cls):
    """ Extend the VPP command line with the NAT64 hash table sizes

    The parent constants are set up first; afterwards a ``nat { ... }``
    section requesting 128 BIB hash buckets and 256 session table hash
    buckets is appended to ``cls.vpp_cmdline``.
    """
    super(TestNAT64, cls).setUpConstants()
    nat_section = [
        "nat",
        "{",
        "nat64 bib hash buckets 128",
        "nat64 st hash buckets 256",
        "}",
    ]
    cls.vpp_cmdline.extend(nat_section)
# Class-level fixture for the NAT64 tests. It stores the default ports,
# ICMP ids, NAT pool addresses (text and packed form) and IPFIX settings on
# the class, creates six packet-generator interfaces and configures them:
#   - pg0 and pg2 form cls.ip6_interfaces, pg1 forms cls.ip4_interfaces;
#   - an IPv6 table cls.vrf1_id is added and pg2 is moved into it;
#   - pg0 gets two remote hosts;
#   - pg3 is configured with both IPv4 and IPv6, pg5 with IPv6.
# NOTE(review): this listing looks truncated - `cls.vrf1_id` is used without
# a visible assignment and the bodies of both interface loops are mostly
# missing. Confirm against the original file.
5589 def setUpClass(cls):
5590 super(TestNAT64, cls).setUpClass()
5593 cls.tcp_port_in = 6303
5594 cls.tcp_port_out = 6303
5595 cls.udp_port_in = 6304
5596 cls.udp_port_out = 6304
5597 cls.icmp_id_in = 6305
5598 cls.icmp_id_out = 6305
5599 cls.nat_addr = '10.0.0.3'
# packed 4-byte form of the pool address, as passed to the vapi calls
5600 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5602 cls.vrf1_nat_addr = '10.0.10.3'
5603 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5605 cls.ipfix_src_port = 4739
5606 cls.ipfix_domain_id = 1
5608 cls.create_pg_interfaces(range(6))
# pg0 and pg2 are collected as IPv6 interfaces, pg1 as the IPv4 interface
5609 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5610 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5611 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5613 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5615 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5617 cls.pg0.generate_remote_hosts(2)
5619 for i in cls.ip6_interfaces:
5622 i.configure_ipv6_neighbors()
5624 for i in cls.ip4_interfaces:
# pg3 is dual-stack: it receives both an IPv4 and an IPv6 configuration
5630 cls.pg3.config_ip4()
5631 cls.pg3.resolve_arp()
5632 cls.pg3.config_ip6()
5633 cls.pg3.configure_ipv6_neighbors()
5636 cls.pg5.config_ip6()
# NOTE(review): presumably the cleanup of an error path whose `except`
# line is not visible in this listing - confirm
5639 super(TestNAT64, cls).tearDownClass()
# NAT64 enabled on an inside interface must not break IPv6 neighbor
# discovery. pg5 is made a NAT64 interface and an ICMPv6 echo request is
# sent to pg5.local_ip6. The first capture is expected to be a single
# Neighbor Solicitation sourced from pg5.local_ip6; the test answers it
# with a Neighbor Advertisement for the solicited target carrying
# pg5.remote_mac, resends the ping and then expects a single Echo Reply
# from pg5.local_ip6 to pg5.remote_ip6.
# NOTE(review): this listing looks truncated - `pkts` and `packet` are used
# without visible assignments and the try/except around the checks is not
# shown. Confirm against the original file.
5642 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5643 """ NAT64 inside interface handles Neighbor Advertisement """
5645 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
5648 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5649 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5650 ICMPv6EchoRequest())
5652 self.pg5.add_stream(pkts)
5653 self.pg_enable_capture(self.pg_interfaces)
5656 # Wait for Neighbor Solicitation
5657 capture = self.pg5.get_capture(len(pkts))
5658 self.assertEqual(1, len(capture))
5661 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5662 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
# remember the solicited target so the advertisement can answer it
5663 tgt = packet[ICMPv6ND_NS].tgt
5665 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5668 # Send Neighbor Advertisement
5669 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5670 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
5671 ICMPv6ND_NA(tgt=tgt) /
5672 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
5674 self.pg5.add_stream(pkts)
5675 self.pg_enable_capture(self.pg_interfaces)
5678 # Try to send ping again
5680 self.pg5.add_stream(pkts)
5681 self.pg_enable_capture(self.pg_interfaces)
5684 # Wait for ping reply
5685 capture = self.pg5.get_capture(len(pkts))
5686 self.assertEqual(1, len(capture))
5689 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
5690 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
5691 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
5693 self.logger.error(ppp("Unexpected or invalid packet:", packet))
def test_pool(self):
    """ Add/delete address to NAT64 pool

    A single-address range (1.2.3.4 - 1.2.3.4) is added to the NAT64 pool
    and must show up as the only dumped entry; after deleting the same
    range the dump must be empty again.
    """
    pool_ip = socket.inet_pton(socket.AF_INET, '1.2.3.4')
    vapi = self.vapi

    # add the one-address range and check it is reported back
    vapi.nat64_add_del_pool_addr_range(pool_ip, pool_ip)
    dumped = vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_ip)

    # remove it again; the pool must end up empty
    vapi.nat64_add_del_pool_addr_range(pool_ip, pool_ip, is_add=0)
    dumped = vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
def test_interface(self):
    """ Enable/disable NAT64 feature on the interface

    pg0 is enabled as NAT64 inside interface and pg1 as outside
    interface. The interface dump must then list exactly these two with
    the matching ``is_inside`` value, and the per-interface feature
    listing must contain the nat64-in2out / nat64-out2in nodes. After
    disabling the feature on both interfaces the dump must be empty.
    """
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 2)

    # Both flags have to exist before the loop: they are asserted below
    # even when the dump contains neither interface.
    pg0_found = False
    pg1_found = False
    for intf in interfaces:
        if intf.sw_if_index == self.pg0.sw_if_index:
            self.assertEqual(intf.is_inside, 1)
            pg0_found = True
        elif intf.sw_if_index == self.pg1.sw_if_index:
            self.assertEqual(intf.is_inside, 0)
            pg1_found = True
    self.assertTrue(pg0_found)
    self.assertTrue(pg1_found)

    # assertIn reports the searched text and the CLI output on failure
    features = self.vapi.cli("show interface features pg0")
    self.assertIn('nat64-in2out', features)
    features = self.vapi.cli("show interface features pg1")
    self.assertIn('nat64-out2in', features)

    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 0)
# NAT64 static BIB entry life cycle: an entry for the IPv6 address
# 2001:db8:85a3::8a2e:370:7334 and the IPv4 address 10.1.1.3 (protocol TCP)
# is added, the TCP BIB dump is checked for exactly one static entry with
# the same addresses and ports, and after the second add/del call the
# number of static entries must be zero.
# NOTE(review): this listing looks truncated - `in_port`, `out_port`,
# `bibe` and `static_bib_num` are used without visible assignments and the
# arguments of both add/del calls are cut off. Confirm against the original.
5741 def test_static_bib(self):
5742 """ Add/delete static BIB entry """
# addresses are kept in packed binary form, as returned by inet_pton
5743 in_addr = socket.inet_pton(socket.AF_INET6,
5744 '2001:db8:85a3::8a2e:370:7334')
5745 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5748 proto = IP_PROTOS.tcp
5750 self.vapi.nat64_add_del_static_bib(in_addr,
5755 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5760 self.assertEqual(bibe.i_addr, in_addr)
5761 self.assertEqual(bibe.o_addr, out_addr)
5762 self.assertEqual(bibe.i_port, in_port)
5763 self.assertEqual(bibe.o_port, out_port)
5764 self.assertEqual(static_bib_num, 1)
# NOTE(review): presumably the delete call (is_add=0) - arguments not visible
5766 self.vapi.nat64_add_del_static_bib(in_addr,
5772 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5777 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts

    The values reported by ``nat64_get_timeouts()`` are first compared
    with the expected defaults, then custom values are written with
    ``nat64_set_timeouts()`` and read back.
    """
    default_values = (
        ("udp", 300),
        ("icmp", 60),
        ("tcp_trans", 240),
        ("tcp_est", 7440),
        ("tcp_incoming_syn", 6),
    )
    custom_values = (
        ("udp", 200),
        ("icmp", 30),
        ("tcp_trans", 250),
        ("tcp_est", 7450),
        ("tcp_incoming_syn", 10),
    )

    # verify default values
    reported = self.vapi.nat64_get_timeouts()
    for field, expected in default_values:
        self.assertEqual(getattr(reported, field), expected)

    # set and verify custom values
    self.vapi.nat64_set_timeouts(**dict(custom_values))
    reported = self.vapi.nat64_get_timeouts()
    for field, expected in custom_values:
        self.assertEqual(getattr(reported, field), expected)
# NAT64 dynamic translation, first in the default table and then for a
# tenant in VRF cls.vrf1_id. pg0 is the NAT64 inside interface and pg1 the
# outside one. Streams are sent in both directions twice; IPv4 captures on
# pg1 are checked against nat_addr, IPv6 captures on pg0 against the
# 64:ff9b:: form of pg1.remote_ip4. Across these four exchanges the NAT64
# session count must grow by exactly 3. The same in/out exchange is then
# repeated once from pg2 using the VRF specific pool address.
# NOTE(review): this listing looks truncated - the first pool call is cut
# off and no call that starts the packet generator is visible between
# enabling capture and reading it. Confirm against the original file.
5799 def test_dynamic(self):
5800 """ NAT64 dynamic translation test """
5801 self.tcp_port_in = 6303
5802 self.udp_port_in = 6304
5803 self.icmp_id_in = 6305
# baseline, so the assertion below only counts sessions created here
5805 ses_num_start = self.nat64_get_ses_num()
5807 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5809 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5810 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5813 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5814 self.pg0.add_stream(pkts)
5815 self.pg_enable_capture(self.pg_interfaces)
5817 capture = self.pg1.get_capture(len(pkts))
5818 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5819 dst_ip=self.pg1.remote_ip4)
5822 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5823 self.pg1.add_stream(pkts)
5824 self.pg_enable_capture(self.pg_interfaces)
5826 capture = self.pg0.get_capture(len(pkts))
# expected IPv6 source: pg1's IPv4 address appended to the 64:ff9b::
# prefix; it is read back from a scapy IPv6 object (presumably so the text
# form matches what scapy reports for captured packets - confirm)
5827 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5828 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# second round over the same flows
5831 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5832 self.pg0.add_stream(pkts)
5833 self.pg_enable_capture(self.pg_interfaces)
5835 capture = self.pg1.get_capture(len(pkts))
5836 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5837 dst_ip=self.pg1.remote_ip4)
5840 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5841 self.pg1.add_stream(pkts)
5842 self.pg_enable_capture(self.pg_interfaces)
5844 capture = self.pg0.get_capture(len(pkts))
5845 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5847 ses_num_end = self.nat64_get_ses_num()
# two rounds of traffic must still account for only three new sessions
5849 self.assertEqual(ses_num_end - ses_num_start, 3)
5851 # tenant with specific VRF
5852 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5853 self.vrf1_nat_addr_n,
5854 vrf_id=self.vrf1_id)
5855 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5857 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5858 self.pg2.add_stream(pkts)
5859 self.pg_enable_capture(self.pg_interfaces)
5861 capture = self.pg1.get_capture(len(pkts))
5862 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5863 dst_ip=self.pg1.remote_ip4)
5865 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5866 self.pg1.add_stream(pkts)
5867 self.pg_enable_capture(self.pg_interfaces)
5869 capture = self.pg2.get_capture(len(pkts))
5870 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# NAT64 static translation: with three static BIB entries for
# pg0.remote_ip6 in place, inside traffic must leave pg1 from nat_addr with
# unchanged ports (same_port=True) and the return traffic must reach pg0
# from the 64:ff9b:: form of pg1.remote_ip4. The NAT64 session count must
# grow by exactly 3.
# NOTE(review): the arguments of the pool call and of the three static BIB
# calls are cut off in this listing (presumably one entry each for the
# TCP, UDP and ICMP values set below) - confirm against the original file.
5872 def test_static(self):
5873 """ NAT64 static translation test """
# inside and outside values are identical on purpose: the capture check
# below asks for unchanged ports
5874 self.tcp_port_in = 60303
5875 self.udp_port_in = 60304
5876 self.icmp_id_in = 60305
5877 self.tcp_port_out = 60303
5878 self.udp_port_out = 60304
5879 self.icmp_id_out = 60305
5881 ses_num_start = self.nat64_get_ses_num()
5883 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5885 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5886 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5888 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5893 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5898 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5905 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5906 self.pg0.add_stream(pkts)
5907 self.pg_enable_capture(self.pg_interfaces)
5909 capture = self.pg1.get_capture(len(pkts))
5910 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5911 dst_ip=self.pg1.remote_ip4, same_port=True)
5914 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5915 self.pg1.add_stream(pkts)
5916 self.pg_enable_capture(self.pg_interfaces)
5918 capture = self.pg0.get_capture(len(pkts))
# expected IPv6 source: pg1's IPv4 address appended to the 64:ff9b:: prefix
5919 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5920 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5922 ses_num_end = self.nat64_get_ses_num()
5924 self.assertEqual(ses_num_end - ses_num_start, 3)
# NAT64: sessions must be removed after their timeout.
# Runs only when running_extended_tests() is true. The ICMP timeout is set
# to 5, an inside stream is pushed through pg0, and the session count taken
# right after the capture must differ from the one taken later.
# NOTE(review): no wait is visible between the two session counts; a sleep
# is presumably missing from this listing (the file imports `sleep`), and
# the pool call is cut off - confirm against the original file.
5926 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5927 def test_session_timeout(self):
5928 """ NAT64 session timeout """
5929 self.icmp_id_in = 1234
5930 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5932 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5933 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# only the ICMP timeout is shortened
5934 self.vapi.nat64_set_timeouts(icmp=5)
5936 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5937 self.pg0.add_stream(pkts)
5938 self.pg_enable_capture(self.pg_interfaces)
5940 capture = self.pg1.get_capture(len(pkts))
5942 ses_num_before_timeout = self.nat64_get_ses_num()
5946 # ICMP session after timeout
5947 ses_num_after_timeout = self.nat64_get_ses_num()
# the check only requires the count to change, not a specific value
5948 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# NAT64 translation of ICMP error messages in both directions.
# Sessions are created first by sending a stream in each direction; the
# captured packets are kept (capture_ip4 / capture_ip6) because they become
# the payload quoted inside the error messages:
#   - inside -> outside: ICMPv6 Destination Unreachable (code 1) quoting the
#     captured IPv6 packets must leave pg1 as ICMP type 3 / code 13 from
#     nat_addr, with the quoted header showing pg1.remote_ip4 -> nat_addr
#     and the translated outside port or id;
#   - outside -> inside: ICMP type 3 / code 13 quoting the captured IPv4
#     packets must reach pg0 as ICMPv6 Destination Unreachable (code 1),
#     with the quoted header showing pg0.remote_ip6 -> the 64:ff9b:: address
#     and the inside port or id.
# NOTE(review): this listing looks truncated - the try/except/else lines
# around the per-packet checks and the pool call arguments are not
# visible. Confirm against the original file.
5950 def test_icmp_error(self):
5951 """ NAT64 ICMP Error message translation """
5952 self.tcp_port_in = 6303
5953 self.udp_port_in = 6304
5954 self.icmp_id_in = 6305
5956 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5958 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5959 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5961 # send some packets to create sessions
5962 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5963 self.pg0.add_stream(pkts)
5964 self.pg_enable_capture(self.pg_interfaces)
5966 capture_ip4 = self.pg1.get_capture(len(pkts))
5967 self.verify_capture_out(capture_ip4,
5968 nat_ip=self.nat_addr,
5969 dst_ip=self.pg1.remote_ip4)
5971 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5972 self.pg1.add_stream(pkts)
5973 self.pg_enable_capture(self.pg_interfaces)
5975 capture_ip6 = self.pg0.get_capture(len(pkts))
5976 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5977 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5978 self.pg0.remote_ip6)
# one ICMPv6 error per captured IPv6 packet, quoting that packet
5981 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5982 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5983 ICMPv6DestUnreach(code=1) /
5984 packet[IPv6] for packet in capture_ip6]
5985 self.pg0.add_stream(pkts)
5986 self.pg_enable_capture(self.pg_interfaces)
5988 capture = self.pg1.get_capture(len(pkts))
5989 for packet in capture:
5991 self.assertEqual(packet[IP].src, self.nat_addr)
5992 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5993 self.assertEqual(packet[ICMP].type, 3)
5994 self.assertEqual(packet[ICMP].code, 13)
5995 inner = packet[IPerror]
5996 self.assertEqual(inner.src, self.pg1.remote_ip4)
5997 self.assertEqual(inner.dst, self.nat_addr)
5998 self.assert_packet_checksums_valid(packet)
5999 if inner.haslayer(TCPerror):
6000 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
6001 elif inner.haslayer(UDPerror):
6002 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
6004 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
6006 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# one ICMP error per captured IPv4 packet, quoting that packet
6010 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6011 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6012 ICMP(type=3, code=13) /
6013 packet[IP] for packet in capture_ip4]
6014 self.pg1.add_stream(pkts)
6015 self.pg_enable_capture(self.pg_interfaces)
6017 capture = self.pg0.get_capture(len(pkts))
6018 for packet in capture:
6020 self.assertEqual(packet[IPv6].src, ip.src)
6021 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6022 icmp = packet[ICMPv6DestUnreach]
6023 self.assertEqual(icmp.code, 1)
6024 inner = icmp[IPerror6]
6025 self.assertEqual(inner.src, self.pg0.remote_ip6)
6026 self.assertEqual(inner.dst, ip.src)
6027 self.assert_icmpv6_checksum_valid(packet)
6028 if inner.haslayer(TCPerror):
6029 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
6030 elif inner.haslayer(UDPerror):
6031 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
6033 self.assertEqual(inner[ICMPv6EchoRequest].id,
6036 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# NAT64 hairpinning: client and server are both remote hosts behind pg0 and
# talk to each other through the NAT address.
#   1. client -> nat_addr_ip6:<server out port> (TCP and UDP) must come back
#      on pg0 from nat_addr_ip6 to the server's inside port, with a source
#      port different from the client's; that port is recorded.
#   2. server -> nat_addr_ip6:<recorded client out port> must come back on
#      pg0 from nat_addr_ip6 to the client, restoring the server's outside
#      port and the client's inside port.
#   3. ICMPv6 Destination Unreachable (code 1) sent by the client and
#      quoting the packets captured in step 2 must reach the server with a
#      quoted header server.ip6 -> nat_addr_ip6 and the step 1 ports.
# NOTE(review): this listing looks truncated - `pkts`, the try/except/else
# lines and most static BIB arguments are not visible. Confirm against the
# original file.
6039 def test_hairpinning(self):
6040 """ NAT64 hairpinning """
6042 client = self.pg0.remote_hosts[0]
6043 server = self.pg0.remote_hosts[1]
6044 server_tcp_in_port = 22
6045 server_tcp_out_port = 4022
6046 server_udp_in_port = 23
6047 server_udp_out_port = 4023
6048 client_tcp_in_port = 1234
6049 client_udp_in_port = 1235
# placeholders; overwritten with the ports seen in the first capture
6050 client_tcp_out_port = 0
6051 client_udp_out_port = 0
# IPv6 form of the NAT pool address, used as destination by both hosts
6052 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6053 nat_addr_ip6 = ip.src
6055 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6057 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6058 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6060 self.vapi.nat64_add_del_static_bib(server.ip6n,
6063 server_tcp_out_port,
6065 self.vapi.nat64_add_del_static_bib(server.ip6n,
6068 server_udp_out_port,
6073 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6074 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6075 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6077 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6078 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6079 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
# traffic is sent and captured on the same interface (pg0)
6081 self.pg0.add_stream(pkts)
6082 self.pg_enable_capture(self.pg_interfaces)
6084 capture = self.pg0.get_capture(len(pkts))
6085 for packet in capture:
6087 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6088 self.assertEqual(packet[IPv6].dst, server.ip6)
6089 self.assert_packet_checksums_valid(packet)
6090 if packet.haslayer(TCP):
6091 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
6092 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
6093 client_tcp_out_port = packet[TCP].sport
6095 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
6096 self.assertEqual(packet[UDP].dport, server_udp_in_port)
6097 client_udp_out_port = packet[UDP].sport
6099 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# replies from the server, addressed to the ports recorded above
6104 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6105 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6106 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
6108 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6109 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6110 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
6112 self.pg0.add_stream(pkts)
6113 self.pg_enable_capture(self.pg_interfaces)
6115 capture = self.pg0.get_capture(len(pkts))
6116 for packet in capture:
6118 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6119 self.assertEqual(packet[IPv6].dst, client.ip6)
6120 self.assert_packet_checksums_valid(packet)
6121 if packet.haslayer(TCP):
6122 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
6123 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
6125 self.assertEqual(packet[UDP].sport, server_udp_out_port)
6126 self.assertEqual(packet[UDP].dport, client_udp_in_port)
6128 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMPv6 errors from the client, each quoting one packet of the previous
# capture
6133 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6134 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6135 ICMPv6DestUnreach(code=1) /
6136 packet[IPv6] for packet in capture]
6137 self.pg0.add_stream(pkts)
6138 self.pg_enable_capture(self.pg_interfaces)
6140 capture = self.pg0.get_capture(len(pkts))
6141 for packet in capture:
6143 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6144 self.assertEqual(packet[IPv6].dst, server.ip6)
6145 icmp = packet[ICMPv6DestUnreach]
6146 self.assertEqual(icmp.code, 1)
6147 inner = icmp[IPerror6]
6148 self.assertEqual(inner.src, server.ip6)
6149 self.assertEqual(inner.dst, nat_addr_ip6)
6150 self.assert_packet_checksums_valid(packet)
6151 if inner.haslayer(TCPerror):
6152 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
6153 self.assertEqual(inner[TCPerror].dport,
6154 client_tcp_out_port)
6156 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
6157 self.assertEqual(inner[UDPerror].dport,
6158 client_udp_out_port)
6160 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# NAT64 with network-specific prefixes instead of the default one.
# A /32 prefix 2001:db8:: is added for the default table (the dump must
# then show one entry with vrf_id 0) and a /56 prefix 2001:db8:122:300::
# for VRF cls.vrf1_id (the dump must then show two entries). Traffic is
# sent in both directions for pg0 (global prefix) and for pg2 (tenant
# prefix); the IPv6 source expected on the inside is built with
# compose_ip6() from pg1.remote_ip4.
# NOTE(review): several calls are cut off in this listing (pool range,
# create_stream_in_ip6, compose_ip6, nat64_add_del_prefix) - confirm the
# missing arguments against the original file.
6163 def test_prefix(self):
6164 """ NAT64 Network-Specific Prefix """
6166 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6168 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6169 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6170 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6171 self.vrf1_nat_addr_n,
6172 vrf_id=self.vrf1_id)
6173 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# prefix for the default table
6176 global_pref64 = "2001:db8::"
6177 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
6178 global_pref64_len = 32
6179 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
6181 prefix = self.vapi.nat64_prefix_dump()
6182 self.assertEqual(len(prefix), 1)
6183 self.assertEqual(prefix[0].prefix, global_pref64_n)
6184 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
6185 self.assertEqual(prefix[0].vrf_id, 0)
6187 # Add tenant specific prefix
6188 vrf1_pref64 = "2001:db8:122:300::"
6189 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
6190 vrf1_pref64_len = 56
6191 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
6193 vrf_id=self.vrf1_id)
6194 prefix = self.vapi.nat64_prefix_dump()
6195 self.assertEqual(len(prefix), 2)
# traffic using the global prefix
6198 pkts = self.create_stream_in_ip6(self.pg0,
6201 plen=global_pref64_len)
6202 self.pg0.add_stream(pkts)
6203 self.pg_enable_capture(self.pg_interfaces)
6205 capture = self.pg1.get_capture(len(pkts))
6206 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6207 dst_ip=self.pg1.remote_ip4)
6209 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6210 self.pg1.add_stream(pkts)
6211 self.pg_enable_capture(self.pg_interfaces)
6213 capture = self.pg0.get_capture(len(pkts))
6214 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6217 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
6219 # Tenant specific prefix
6220 pkts = self.create_stream_in_ip6(self.pg2,
6223 plen=vrf1_pref64_len)
6224 self.pg2.add_stream(pkts)
6225 self.pg_enable_capture(self.pg_interfaces)
6227 capture = self.pg1.get_capture(len(pkts))
6228 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6229 dst_ip=self.pg1.remote_ip4)
6231 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6232 self.pg1.add_stream(pkts)
6233 self.pg_enable_capture(self.pg_interfaces)
6235 capture = self.pg2.get_capture(len(pkts))
6236 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6239 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# NAT64 handling of a payload protocol it has no port translation for.
# A TCP packet is sent first from pg0 and captured on pg1. Then an IPv6
# packet with next header 47 is sent from pg0; on pg1 it must appear from
# nat_addr to pg1.remote_ip4, carry a GRE layer and have valid checksums.
# The reverse packet from pg1 to nat_addr must arrive on pg0 from the
# 64:ff9b::/96 form of pg1.remote_ip4 with next header 47.
# NOTE(review): this listing looks truncated - the layer between the outer
# IP/IPv6 header and the inner IP header, the `packet` assignments and the
# try/except lines are not visible. Confirm against the original file.
6241 def test_unknown_proto(self):
6242 """ NAT64 translate packet with unknown protocol """
6244 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6246 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6247 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 address that stands for the IPv4 host behind pg1
6248 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# plain TCP packet first (presumably to create a session that the
# following packet can reuse - confirm)
6251 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6252 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
6253 TCP(sport=self.tcp_port_in, dport=20))
6254 self.pg0.add_stream(p)
6255 self.pg_enable_capture(self.pg_interfaces)
6257 p = self.pg1.get_capture(1)
# inside -> outside with next header 47; a GRE layer is expected below
6259 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6260 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
6262 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6263 TCP(sport=1234, dport=1234))
6264 self.pg0.add_stream(p)
6265 self.pg_enable_capture(self.pg_interfaces)
6267 p = self.pg1.get_capture(1)
6270 self.assertEqual(packet[IP].src, self.nat_addr)
6271 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6272 self.assertTrue(packet.haslayer(GRE))
6273 self.assert_packet_checksums_valid(packet)
6275 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# outside -> inside
6279 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6280 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6282 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6283 TCP(sport=1234, dport=1234))
6284 self.pg1.add_stream(p)
6285 self.pg_enable_capture(self.pg_interfaces)
6287 p = self.pg0.get_capture(1)
6290 self.assertEqual(packet[IPv6].src, remote_ip6)
6291 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6292 self.assertEqual(packet[IPv6].nh, 47)
6294 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# NAT64 hairpinning for a payload protocol without ports (GRE).
# Client and server are both remote hosts behind pg0; each has its own NAT
# IPv4 address (10.0.0.110 and 10.0.0.100), whose 64:ff9b::/96 forms are
# used as IPv6 destinations. After a TCP packet from the client to the
# server's NAT address, a GRE packet client -> server must come back on pg0
# from client_nat_ip6 to server.ip6, and the GRE packet server -> client
# must come back from server_nat_ip6 to client.ip6; the next header must
# stay IP_PROTOS.gre in both cases.
# NOTE(review): this listing looks truncated - the static BIB and pool call
# arguments, the layer between the IPv6 and inner IP headers, the `packet`
# assignments and the try/except lines are not visible. Confirm against
# the original file.
6297 def test_hairpinning_unknown_proto(self):
6298 """ NAT64 translate packet with unknown protocol - hairpinning """
6300 client = self.pg0.remote_hosts[0]
6301 server = self.pg0.remote_hosts[1]
6302 server_tcp_in_port = 22
6303 server_tcp_out_port = 4022
6304 client_tcp_in_port = 1234
6305 client_tcp_out_port = 1235
6306 server_nat_ip = "10.0.0.100"
6307 client_nat_ip = "10.0.0.110"
# packed IPv4 form and IPv6 form of both NAT addresses
6308 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
6309 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
6310 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
6311 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
6313 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
6315 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6316 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6318 self.vapi.nat64_add_del_static_bib(server.ip6n,
6321 server_tcp_out_port,
6324 self.vapi.nat64_add_del_static_bib(server.ip6n,
6330 self.vapi.nat64_add_del_static_bib(client.ip6n,
6333 client_tcp_out_port,
# plain TCP packet client -> server first
6337 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6338 IPv6(src=client.ip6, dst=server_nat_ip6) /
6339 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6340 self.pg0.add_stream(p)
6341 self.pg_enable_capture(self.pg_interfaces)
6343 p = self.pg0.get_capture(1)
# GRE client -> server
6345 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6346 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
6348 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6349 TCP(sport=1234, dport=1234))
6350 self.pg0.add_stream(p)
6351 self.pg_enable_capture(self.pg_interfaces)
6353 p = self.pg0.get_capture(1)
6356 self.assertEqual(packet[IPv6].src, client_nat_ip6)
6357 self.assertEqual(packet[IPv6].dst, server.ip6)
6358 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6360 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# GRE server -> client
6364 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6365 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
6367 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6368 TCP(sport=1234, dport=1234))
6369 self.pg0.add_stream(p)
6370 self.pg_enable_capture(self.pg_interfaces)
6372 p = self.pg0.get_capture(1)
6375 self.assertEqual(packet[IPv6].src, server_nat_ip6)
6376 self.assertEqual(packet[IPv6].dst, client.ip6)
6377 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6379 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# One-armed NAT64: pg3 is registered both as inside and as outside NAT64
# interface, so translated traffic leaves through the interface it came in
# on. An IPv6 TCP packet (12345 -> 80) sent into pg3 must come back out of
# pg3 as IPv4 from nat_addr to pg3.remote_ip4 with a different source
# port; the IPv4 reply to that port must come back as IPv6 from
# remote_host_ip6 to pg3.remote_ip6 with the original ports restored.
# NOTE(review): this listing looks truncated - `ip`, `tcp`, the rebinding
# of `p` to the captured packet, the try/except lines and the remaining
# compose_ip6/pool arguments are not visible. Confirm against the original.
6382 def test_one_armed_nat64(self):
6383 """ One armed NAT64 """
6385 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
6389 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
# the same interface is enabled once per direction
6391 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
6392 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
6395 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6396 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
6397 TCP(sport=12345, dport=80))
6398 self.pg3.add_stream(p)
6399 self.pg_enable_capture(self.pg_interfaces)
6401 capture = self.pg3.get_capture(1)
6406 self.assertEqual(ip.src, self.nat_addr)
6407 self.assertEqual(ip.dst, self.pg3.remote_ip4)
6408 self.assertNotEqual(tcp.sport, 12345)
# remember the translated port; the reply below is addressed to it
6409 external_port = tcp.sport
6410 self.assertEqual(tcp.dport, 80)
6411 self.assert_packet_checksums_valid(p)
6413 self.logger.error(ppp("Unexpected or invalid packet:", p))
6417 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6418 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6419 TCP(sport=80, dport=external_port))
6420 self.pg3.add_stream(p)
6421 self.pg_enable_capture(self.pg_interfaces)
6423 capture = self.pg3.get_capture(1)
6428 self.assertEqual(ip.src, remote_host_ip6)
6429 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6430 self.assertEqual(tcp.sport, 80)
6431 self.assertEqual(tcp.dport, 12345)
6432 self.assert_packet_checksums_valid(p)
6434 self.logger.error(ppp("Unexpected or invalid packet:", p))
# NAT64 translation of fragmented TCP packets that arrive in order.
# A fragmented IPv6 stream from pg0 (random inside port, destination port
# 20) must be reassemblable on pg1 with the payload intact and a source
# port different from the inside one; that port is stored as tcp_port_out.
# A fragmented stream in the opposite direction must be reassemblable on
# pg0 from the 64:ff9b::/96 form of pg1.remote_ip4 with source port 20, the
# original inside port as destination and the payload intact. The number
# of entries returned by nat_reass_dump() must grow by exactly 2.
# NOTE(review): this listing looks truncated - the first `data` assignment
# and the arguments of create_stream_frag()/reass_frags_and_verify() are
# not fully visible. Confirm against the original file.
6437 def test_frag_in_order(self):
6438 """ NAT64 translate fragments arriving in order """
6439 self.tcp_port_in = random.randint(1025, 65535)
6441 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6443 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6444 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# baseline, so the final assertion only counts reassemblies created here
6446 reass = self.vapi.nat_reass_dump()
6447 reass_n_start = len(reass)
6451 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6452 self.tcp_port_in, 20, data)
6453 self.pg0.add_stream(pkts)
6454 self.pg_enable_capture(self.pg_interfaces)
6456 frags = self.pg1.get_capture(len(pkts))
6457 p = self.reass_frags_and_verify(frags,
6459 self.pg1.remote_ip4)
6460 self.assertEqual(p[TCP].dport, 20)
6461 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6462 self.tcp_port_out = p[TCP].sport
6463 self.assertEqual(data, p[Raw].load)
# payload for the opposite direction
6466 data = "A" * 4 + "b" * 16 + "C" * 3
6467 pkts = self.create_stream_frag(self.pg1,
6472 self.pg1.add_stream(pkts)
6473 self.pg_enable_capture(self.pg_interfaces)
6475 frags = self.pg0.get_capture(len(pkts))
6476 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6477 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6478 self.assertEqual(p[TCP].sport, 20)
6479 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6480 self.assertEqual(data, p[Raw].load)
6482 reass = self.vapi.nat_reass_dump()
6483 reass_n_end = len(reass)
6485 self.assertEqual(reass_n_end - reass_n_start, 2)
# Fragmented hairpinning test: fragments are sent into pg0 and the translated
# fragments are captured on pg0 again, addressed to a server that is one of
# pg0's remote hosts and has a static BIB entry.
# NOTE(review): the embedded line numbers skip, so some original lines
# (e.g. the definition of `data`, the arguments of the static BIB call and of
# create_stream_frag_ip6) are not visible in this listing.
6487 def test_reass_hairpinning(self):
6488 """ NAT64 fragments hairpinning """
6490 server = self.pg0.remote_hosts[1]
6491 server_in_port = random.randint(1025, 65535)
6492 server_out_port = random.randint(1025, 65535)
6493 client_in_port = random.randint(1025, 65535)
# Build an IPv6 packet from the concatenated string and read the source back
# from scapy; presumably this yields scapy's normalised form of the
# address - TODO confirm.
6494 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6495 nat_addr_ip6 = ip.src
6497 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6499 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6500 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6502 # add static BIB entry for server
6503 self.vapi.nat64_add_del_static_bib(server.ip6n,
6509 # send packet from host to server
6510 pkts = self.create_stream_frag_ip6(self.pg0,
6515 self.pg0.add_stream(pkts)
6516 self.pg_enable_capture(self.pg_interfaces)
6518 frags = self.pg0.get_capture(len(pkts))
# The reassembled packet must come from the NAT address (IPv6 form) and go
# to the server's IPv6 address and inside port, with a rewritten source port.
6519 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6520 self.assertNotEqual(p[TCP].sport, client_in_port)
6521 self.assertEqual(p[TCP].dport, server_in_port)
6522 self.assertEqual(data, p[Raw].load)
# Same two-direction fragment exchange as the in-order test, per the docstring
# with fragments arriving out of order.
# NOTE(review): the embedded line numbers skip, so the statements that
# reorder `pkts` (and the first assignment of `data`) are not visible in
# this listing.
6524 def test_frag_out_of_order(self):
6525 """ NAT64 translate fragments arriving out of order """
# Random source port for the pg0-side host.
6526 self.tcp_port_in = random.randint(1025, 65535)
6528 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6530 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6531 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# pg0 -> pg1: IPv6 fragments towards pg1's IPv4 remote host, dport 20.
6535 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6536 self.tcp_port_in, 20, data)
6538 self.pg0.add_stream(pkts)
6539 self.pg_enable_capture(self.pg_interfaces)
6541 frags = self.pg1.get_capture(len(pkts))
6542 p = self.reass_frags_and_verify(frags,
6544 self.pg1.remote_ip4)
6545 self.assertEqual(p[TCP].dport, 20)
6546 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Keep the translated source port on the instance.
6547 self.tcp_port_out = p[TCP].sport
6548 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0: fragmented reply carrying a different payload.
6551 data = "A" * 4 + "B" * 16 + "C" * 3
6552 pkts = self.create_stream_frag(self.pg1,
6558 self.pg1.add_stream(pkts)
6559 self.pg_enable_capture(self.pg_interfaces)
6561 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: pg1's IPv4 remote host composed with 64:ff9b::/96.
6562 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6563 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6564 self.assertEqual(p[TCP].sport, 20)
6565 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6566 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface

    Registers pg4 as a source of NAT64 pool addresses, then checks that
    the NAT64 pool follows the interface's IPv4 configuration: empty
    before an address is configured, exactly the interface address while
    it is configured, and empty again after it is removed.
    """
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (query the NAT64 pool here; the NAT44 address dump says nothing
    # about the pool this test is exercising)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump, not on the result of the first query,
    # otherwise the removal is never actually verified)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
# Extended-only test (skipped unless running_extended_tests() is true).
# Fills NAT64 with traffic from many IPv6 sources, then checks that further
# packets are not forwarded to pg1 and that IPFIX packets captured on pg3
# carry the events checked by verify_ipfix_max_sessions / verify_ipfix_max_bibs.
# NOTE(review): the embedded line numbers skip, so some original lines
# (definitions of `max_bibs`, `max_sessions`, `pkts`, the loops over
# `capture`, argument continuations) are not visible in this listing.
6587 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6588 def test_ipfix_max_bibs_sessions(self):
6589 """ IPFIX logging maximum session and BIB entries exceeded """
6592 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6596 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6598 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6599 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# One distinct IPv6 source (fd01:aa::<i>) per iteration, all using TCP
# source port 12345 towards destination port 80.
6603 for i in range(0, max_bibs):
6604 src = "fd01:aa::%x" % (i)
6605 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6606 IPv6(src=src, dst=remote_host_ip6) /
6607 TCP(sport=12345, dport=80))
# Same source/port but destination port 22.
# NOTE(review): whether this packet is built inside the loop above cannot
# be told from this listing (indentation is lost).
6609 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6610 IPv6(src=src, dst=remote_host_ip6) /
6611 TCP(sport=12345, dport=22))
6613 self.pg0.add_stream(pkts)
6614 self.pg_enable_capture(self.pg_interfaces)
# max_sessions packets are expected to be forwarded to pg1.
6616 self.pg1.get_capture(max_sessions)
# Point the IPFIX exporter at pg3 and enable NAT IPFIX logging.
6618 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6619 src_address=self.pg3.local_ip4n,
6621 template_interval=10)
6622 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6623 src_port=self.ipfix_src_port)
# One more packet (destination port 25): nothing may reach pg1.
6625 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6626 IPv6(src=src, dst=remote_host_ip6) /
6627 TCP(sport=12345, dport=25))
6628 self.pg0.add_stream(p)
6629 self.pg_enable_capture(self.pg_interfaces)
6631 self.pg1.assert_nothing_captured()
6633 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Nine IPFIX packets are expected on pg3 after the flush.
6634 capture = self.pg3.get_capture(9)
6635 ipfix = IPFIXDecoder()
6636 # first load template
# Every IPFIX packet must come from pg3's local address and the configured
# source port, go to UDP port 4739 and carry the configured domain id.
6638 self.assertTrue(p.haslayer(IPFIX))
6639 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6640 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6641 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6642 self.assertEqual(p[UDP].dport, 4739)
6643 self.assertEqual(p[IPFIX].observationDomainID,
6644 self.ipfix_domain_id)
6645 if p.haslayer(Template):
6646 ipfix.add_template(p.getlayer(Template))
6647 # verify events in data set
6649 if p.haslayer(Data):
6650 data = ipfix.decode_data_set(p.getlayer(Set))
6651 self.verify_ipfix_max_sessions(data, max_sessions)
# Packet from pg0's regular remote address: again nothing may reach pg1.
6653 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6654 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6655 TCP(sport=12345, dport=80))
6656 self.pg0.add_stream(p)
6657 self.pg_enable_capture(self.pg_interfaces)
6659 self.pg1.assert_nothing_captured()
6661 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# A single IPFIX packet is expected this time; the decoder created above is
# reused for the data set.
6662 capture = self.pg3.get_capture(1)
6663 # verify events in data set
6665 self.assertTrue(p.haslayer(IPFIX))
6666 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6667 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6668 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6669 self.assertEqual(p[UDP].dport, 4739)
6670 self.assertEqual(p[IPFIX].observationDomainID,
6671 self.ipfix_domain_id)
6672 if p.haslayer(Data):
6673 data = ipfix.decode_data_set(p.getlayer(Set))
6674 self.verify_ipfix_max_bibs(data, max_bibs)
# Sets the IPv6 reassembly limit to max_frag=0, sends a single fragment and
# checks that it is not forwarded and that the IPFIX packets captured on pg3
# pass verify_ipfix_max_fragments_ip6.
# NOTE(review): the embedded line numbers skip, so some original lines
# (definition of `data`, the loops over `capture`, argument continuations)
# are not visible in this listing. `self.tcp_port_in` is not assigned in the
# visible lines of this method - presumably set elsewhere; verify.
6676 def test_ipfix_max_frags(self):
6677 """ IPFIX logging maximum fragments pending reassembly exceeded """
6678 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6680 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6681 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6682 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
# Point the IPFIX exporter at pg3 and enable NAT IPFIX logging.
6683 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6684 src_address=self.pg3.local_ip4n,
6686 template_interval=10)
6687 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6688 src_port=self.ipfix_src_port)
6691 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6692 self.tcp_port_in, 20, data)
# Only the last fragment of the stream is sent.
6693 self.pg0.add_stream(pkts[-1])
6694 self.pg_enable_capture(self.pg_interfaces)
6696 self.pg1.assert_nothing_captured()
6698 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Nine IPFIX packets are expected on pg3 after the flush.
6699 capture = self.pg3.get_capture(9)
6700 ipfix = IPFIXDecoder()
6701 # first load template
# Every IPFIX packet must come from pg3's local address and the configured
# source port, go to UDP port 4739 and carry the configured domain id.
6703 self.assertTrue(p.haslayer(IPFIX))
6704 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6705 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6706 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6707 self.assertEqual(p[UDP].dport, 4739)
6708 self.assertEqual(p[IPFIX].observationDomainID,
6709 self.ipfix_domain_id)
6710 if p.haslayer(Template):
6711 ipfix.add_template(p.getlayer(Template))
6712 # verify events in data set
6714 if p.haslayer(Data):
6715 data = ipfix.decode_data_set(p.getlayer(Set))
6716 self.verify_ipfix_max_fragments_ip6(data, 0,
6717 self.pg0.remote_ip6n)
# Creates one NAT64 session with a single TCP packet, checks the IPFIX
# packets captured on pg3, then calls nat64_add_del_pool_addr_range again
# and checks a second batch of IPFIX packets.
# NOTE(review): the embedded line numbers skip, so some original lines
# (loops over `capture`, else branches, argument continuations - including
# the arguments of the second pool call) are not visible in this listing.
6719 def test_ipfix_bib_ses(self):
6720 """ IPFIX logging NAT64 BIB/session create and delete events """
6721 self.tcp_port_in = random.randint(1025, 65535)
6722 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6726 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6728 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6729 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Point the IPFIX exporter at pg3 and enable NAT IPFIX logging.
6730 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6731 src_address=self.pg3.local_ip4n,
6733 template_interval=10)
6734 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6735 src_port=self.ipfix_src_port)
# One TCP packet from pg0 to destination port 25.
6738 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6739 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6740 TCP(sport=self.tcp_port_in, dport=25))
6741 self.pg0.add_stream(p)
6742 self.pg_enable_capture(self.pg_interfaces)
# `p` is rebound to the capture list here; the translated source port is
# read from its first packet.
6744 p = self.pg1.get_capture(1)
6745 self.tcp_port_out = p[0][TCP].sport
6746 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Ten IPFIX packets are expected on pg3 after the flush.
6747 capture = self.pg3.get_capture(10)
6748 ipfix = IPFIXDecoder()
6749 # first load template
6751 self.assertTrue(p.haslayer(IPFIX))
6752 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6753 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6754 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6755 self.assertEqual(p[UDP].dport, 4739)
6756 self.assertEqual(p[IPFIX].observationDomainID,
6757 self.ipfix_domain_id)
6758 if p.haslayer(Template):
6759 ipfix.add_template(p.getlayer(Template))
6760 # verify events in data set
6762 if p.haslayer(Data):
6763 data = ipfix.decode_data_set(p.getlayer(Set))
# Dispatch on field 230 of the first record: value 10 goes to the BIB
# verifier, value 6 to the session verifier.
# NOTE(review): 230 is presumably the IPFIX natEvent element, with 10/6
# being the NAT64 BIB/session create events - confirm against RFC 8158.
6764 if ord(data[0][230]) == 10:
6765 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6766 elif ord(data[0][230]) == 6:
6767 self.verify_ipfix_nat64_ses(data,
6769 self.pg0.remote_ip6n,
6770 self.pg1.remote_ip4,
6773 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6776 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): presumably this removes the pool address to trigger the
# delete events; the is_add argument is not visible here - verify.
6777 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6780 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Two IPFIX packets are expected on pg3 this time.
6781 capture = self.pg3.get_capture(2)
6782 # verify events in data set
6784 self.assertTrue(p.haslayer(IPFIX))
6785 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6786 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6787 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6788 self.assertEqual(p[UDP].dport, 4739)
6789 self.assertEqual(p[IPFIX].observationDomainID,
6790 self.ipfix_domain_id)
6791 if p.haslayer(Data):
6792 data = ipfix.decode_data_set(p.getlayer(Set))
# Same dispatch as above with values 11 (BIB verifier, second argument 0)
# and 7 (session verifier); presumably the matching delete events - confirm.
6793 if ord(data[0][230]) == 11:
6794 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6795 elif ord(data[0][230]) == 7:
6796 self.verify_ipfix_nat64_ses(data,
6798 self.pg0.remote_ip6n,
6799 self.pg1.remote_ip4,
6802 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Helper that queries the NAT64 session table through the API.
# NOTE(review): the embedded line numbers skip after the dump call, so the
# statement that turns `st` into the returned count is not visible in this
# listing.
6804 def nat64_get_ses_num(self):
6806 Return number of active NAT64 sessions.
6808 st = self.vapi.nat64_st_dump()
# Resets NAT64 state: disables NAT IPFIX logging, restores the IPFIX defaults
# kept on the instance, resets timeouts, then walks the interface, BIB, pool
# address and prefix dumps and issues an add/del call for every entry.
# NOTE(review): the embedded line numbers skip, so the remaining arguments of
# those add/del calls (presumably the delete flags) and the loop over `bib`
# are not visible in this listing.
6811 def clear_nat64(self):
6813 Clear NAT64 configuration.
# Disable logging with the currently configured values before resetting them.
6815 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6816 domain_id=self.ipfix_domain_id)
6817 self.ipfix_src_port = 4739
6818 self.ipfix_domain_id = 1
# Called without arguments, i.e. with the wrapper's default timeouts.
6820 self.vapi.nat64_set_timeouts()
6822 interfaces = self.vapi.nat64_interface_dump()
6823 for intf in interfaces:
# Interfaces reporting is_inside > 1 get an extra add/del call.
# NOTE(review): presumably a value above 1 marks an interface that is both
# inside and outside - confirm against the API definition.
6824 if intf.is_inside > 1:
6825 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6828 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# 255 is passed as the only argument of the BIB dump.
# NOTE(review): presumably a protocol wildcard - verify.
6832 bib = self.vapi.nat64_bib_dump(255)
6835 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6843 adresses = self.vapi.nat64_pool_addr_dump()
6844 for addr in adresses:
6845 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6850 prefixes = self.vapi.nat64_prefix_dump()
6851 for prefix in prefixes:
6852 self.vapi.nat64_add_del_prefix(prefix.prefix,
6854 vrf_id=prefix.vrf_id,
# Per-test teardown for TestNAT64: chains to the parent tearDown and, while
# VPP is still alive, logs the output of the NAT64 "show" CLI commands.
# NOTE(review): the `def` header of this method and any statement after the
# last logging call are not visible in this listing (the embedded line
# numbers skip before and after this span).
6858 super(TestNAT64, self).tearDown()
6859 if not self.vpp_dead:
6860 self.logger.info(self.vapi.cli("show nat64 pool"))
6861 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6862 self.logger.info(self.vapi.cli("show nat64 prefix"))
6863 self.logger.info(self.vapi.cli("show nat64 bib all"))
6864 self.logger.info(self.vapi.cli("show nat64 session table all"))
6865 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite test case. Topology set up below: pg0 is the IPv4 side (NAT pool
# address 10.0.0.3), pg1 is the IPv6 side with two remote hosts.
# NOTE(review): the embedded line numbers skip throughout this class, so
# some original lines (decorators, some `def` headers, try/except blocks,
# argument continuations, short statements) are not visible in this listing.
6869 class TestDSlite(MethodHolder):
6870 """ DS-Lite Test Cases """
# Class-level fixture: NAT address, two pg interfaces, IPv4 on pg0 and IPv6
# with two generated remote hosts on pg1.
6873 def setUpClass(cls):
6874 super(TestDSlite, cls).setUpClass()
6877 cls.nat_addr = '10.0.0.3'
6878 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6880 cls.create_pg_interfaces(range(2))
6882 cls.pg0.config_ip4()
6883 cls.pg0.resolve_arp()
6885 cls.pg1.config_ip6()
6886 cls.pg1.generate_remote_hosts(2)
6887 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): the header of the method containing the next call is not
# visible; presumably tearDownClass.
6890 super(TestDSlite, cls).tearDownClass()
# Exercises UDP, TCP and ICMP through the tunnel in both directions, then
# pings the AFTR tunnel endpoint address itself.
6893 def test_dslite(self):
6894 """ Test DS-Lite """
6895 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# AFTR endpoint addresses in text and packed form; the packed forms are
# passed to the API.
6897 aftr_ip4 = '192.0.0.1'
6898 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6899 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6900 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6901 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, pg1 -> pg0: IPv4 (src 192.168.1.1) inside IPv6 from remote host 0
# to the AFTR address.
6904 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6905 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6906 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6907 UDP(sport=20000, dport=10000))
6908 self.pg1.add_stream(p)
6909 self.pg_enable_capture(self.pg_interfaces)
6911 capture = self.pg0.get_capture(1)
6912 capture = capture[0]
# Expected on pg0: no IPv6 layer, source is the NAT address, source port
# rewritten, destination port unchanged.
6913 self.assertFalse(capture.haslayer(IPv6))
6914 self.assertEqual(capture[IP].src, self.nat_addr)
6915 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6916 self.assertNotEqual(capture[UDP].sport, 20000)
6917 self.assertEqual(capture[UDP].dport, 10000)
6918 self.assert_packet_checksums_valid(capture)
6919 out_port = capture[UDP].sport
# UDP, pg0 -> pg1: reply to the NAT address and the translated port.
6921 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6922 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6923 UDP(sport=10000, dport=out_port))
6924 self.pg0.add_stream(p)
6925 self.pg_enable_capture(self.pg_interfaces)
6927 capture = self.pg1.get_capture(1)
6928 capture = capture[0]
# Expected on pg1: IPv6 from the AFTR address to remote host 0, inner IPv4
# addressed to 192.168.1.1 with the original ports restored.
6929 self.assertEqual(capture[IPv6].src, aftr_ip6)
6930 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6931 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6932 self.assertEqual(capture[IP].dst, '192.168.1.1')
6933 self.assertEqual(capture[UDP].sport, 10000)
6934 self.assertEqual(capture[UDP].dport, 20000)
6935 self.assert_packet_checksums_valid(capture)
# TCP, pg1 -> pg0: same inner source address, sent from remote host 1.
6938 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6939 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6940 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6941 TCP(sport=20001, dport=10001))
6942 self.pg1.add_stream(p)
6943 self.pg_enable_capture(self.pg_interfaces)
6945 capture = self.pg0.get_capture(1)
6946 capture = capture[0]
6947 self.assertFalse(capture.haslayer(IPv6))
6948 self.assertEqual(capture[IP].src, self.nat_addr)
6949 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6950 self.assertNotEqual(capture[TCP].sport, 20001)
6951 self.assertEqual(capture[TCP].dport, 10001)
6952 self.assert_packet_checksums_valid(capture)
6953 out_port = capture[TCP].sport
# TCP, pg0 -> pg1: reply to the translated port; must reach remote host 1.
6955 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6956 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6957 TCP(sport=10001, dport=out_port))
6958 self.pg0.add_stream(p)
6959 self.pg_enable_capture(self.pg_interfaces)
6961 capture = self.pg1.get_capture(1)
6962 capture = capture[0]
6963 self.assertEqual(capture[IPv6].src, aftr_ip6)
6964 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6965 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6966 self.assertEqual(capture[IP].dst, '192.168.1.1')
6967 self.assertEqual(capture[TCP].sport, 10001)
6968 self.assertEqual(capture[TCP].dport, 20001)
6969 self.assert_packet_checksums_valid(capture)
# ICMP, pg1 -> pg0: echo request with id 4000; the id must be rewritten.
6972 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6973 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6974 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6975 ICMP(id=4000, type='echo-request'))
6976 self.pg1.add_stream(p)
6977 self.pg_enable_capture(self.pg_interfaces)
6979 capture = self.pg0.get_capture(1)
6980 capture = capture[0]
6981 self.assertFalse(capture.haslayer(IPv6))
6982 self.assertEqual(capture[IP].src, self.nat_addr)
6983 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6984 self.assertNotEqual(capture[ICMP].id, 4000)
6985 self.assert_packet_checksums_valid(capture)
6986 out_id = capture[ICMP].id
# ICMP, pg0 -> pg1: echo reply with the translated id; id 4000 is expected
# back on the IPv6 side.
6988 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6989 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6990 ICMP(id=out_id, type='echo-reply'))
6991 self.pg0.add_stream(p)
6992 self.pg_enable_capture(self.pg_interfaces)
6994 capture = self.pg1.get_capture(1)
6995 capture = capture[0]
6996 self.assertEqual(capture[IPv6].src, aftr_ip6)
6997 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6998 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6999 self.assertEqual(capture[IP].dst, '192.168.1.1')
7000 self.assertEqual(capture[ICMP].id, 4000)
7001 self.assert_packet_checksums_valid(capture)
7003 # ping DS-Lite AFTR tunnel endpoint address
7004 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7005 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
7006 ICMPv6EchoRequest())
7007 self.pg1.add_stream(p)
7008 self.pg_enable_capture(self.pg_interfaces)
# The echo reply is expected back on pg1, sourced from the AFTR address.
7010 capture = self.pg1.get_capture(1)
7011 self.assertEqual(1, len(capture))
7012 capture = capture[0]
7013 self.assertEqual(capture[IPv6].src, aftr_ip6)
7014 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7015 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown: chain to the parent and log DS-Lite state while VPP is
# alive. NOTE(review): the `def` header is not visible in this listing.
7018 super(TestDSlite, self).tearDown()
7019 if not self.vpp_dead:
7020 self.logger.info(self.vapi.cli("show dslite pool"))
7022 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7023 self.logger.info(self.vapi.cli("show dslite sessions"))
# DS-Lite CE test case. VPP is started with "nat { dslite ce }" appended to
# its command line; pg0 is the IPv4 side, pg1 the IPv6 side with one remote
# host.
# NOTE(review): the embedded line numbers skip throughout this class, so
# some original lines (decorators, some `def` headers, argument
# continuations, short statements) are not visible in this listing.
7026 class TestDSliteCE(MethodHolder):
7027 """ DS-Lite CE Test Cases """
# Extends the VPP command line built by the parent class.
7030 def setUpConstants(cls):
7031 super(TestDSliteCE, cls).setUpConstants()
7032 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
# Class-level fixture: two pg interfaces, IPv4 on pg0, IPv6 with one
# generated remote host on pg1.
7035 def setUpClass(cls):
7036 super(TestDSliteCE, cls).setUpClass()
7039 cls.create_pg_interfaces(range(2))
7041 cls.pg0.config_ip4()
7042 cls.pg0.resolve_arp()
7044 cls.pg1.config_ip6()
7045 cls.pg1.generate_remote_hosts(1)
7046 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): the header of the method containing the next call is not
# visible; presumably tearDownClass.
7049 super(TestDSliteCE, cls).tearDownClass()
# Checks encapsulation (pg0 -> pg1), decapsulation (pg1 -> pg0) and a ping
# to the B4 tunnel endpoint address.
7052 def test_dslite_ce(self):
7053 """ Test DS-Lite CE """
# B4 and AFTR endpoint addresses in text and packed form; the packed forms
# are passed to the API.
7055 b4_ip4 = '192.0.0.2'
7056 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
7057 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
7058 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
7059 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
7061 aftr_ip4 = '192.0.0.1'
7062 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7063 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7064 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7065 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# Host route (/128) for the AFTR address via pg1's remote IPv6 address.
7067 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
7068 dst_address_length=128,
7069 next_hop_address=self.pg1.remote_ip6n,
7070 next_hop_sw_if_index=self.pg1.sw_if_index,
# Plain IPv4 UDP sent into pg0.
7074 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7075 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
7076 UDP(sport=10000, dport=20000))
7077 self.pg0.add_stream(p)
7078 self.pg_enable_capture(self.pg_interfaces)
7080 capture = self.pg1.get_capture(1)
7081 capture = capture[0]
# Expected on pg1: IPv6 from the B4 address to the AFTR address, with the
# inner IPv4 addresses and UDP ports unchanged.
7082 self.assertEqual(capture[IPv6].src, b4_ip6)
7083 self.assertEqual(capture[IPv6].dst, aftr_ip6)
7084 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7085 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
7086 self.assertEqual(capture[UDP].sport, 10000)
7087 self.assertEqual(capture[UDP].dport, 20000)
7088 self.assert_packet_checksums_valid(capture)
# IPv4-in-IPv6 UDP from the AFTR address to the B4 address, sent into pg1.
7091 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7092 IPv6(dst=b4_ip6, src=aftr_ip6) /
7093 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
7094 UDP(sport=20000, dport=10000))
7095 self.pg1.add_stream(p)
7096 self.pg_enable_capture(self.pg_interfaces)
7098 capture = self.pg0.get_capture(1)
7099 capture = capture[0]
# Expected on pg0: the inner IPv4 packet without any IPv6 layer.
7100 self.assertFalse(capture.haslayer(IPv6))
7101 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
7102 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7103 self.assertEqual(capture[UDP].sport, 20000)
7104 self.assertEqual(capture[UDP].dport, 10000)
7105 self.assert_packet_checksums_valid(capture)
7107 # ping DS-Lite B4 tunnel endpoint address
7108 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7109 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
7110 ICMPv6EchoRequest())
7111 self.pg1.add_stream(p)
7112 self.pg_enable_capture(self.pg_interfaces)
# The echo reply is expected back on pg1, sourced from the B4 address.
7114 capture = self.pg1.get_capture(1)
7115 self.assertEqual(1, len(capture))
7116 capture = capture[0]
7117 self.assertEqual(capture[IPv6].src, b4_ip6)
7118 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7119 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown: chain to the parent and query the tunnel endpoint
# addresses while VPP is alive. NOTE(review): the `def` header and the
# opening of the two logging calls are not visible in this listing.
7122 super(TestDSliteCE, self).tearDown()
7123 if not self.vpp_dead:
7125 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7127 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# NAT66 test case: two pg interfaces, NAT address fd01:ff::2.
# NOTE(review): the embedded line numbers skip throughout this class, so
# some original lines (decorators, some `def` headers, try/except blocks,
# `pkts` handling, the upper-layer part of several packets, argument
# continuations) are not visible in this listing.
7130 class TestNAT66(MethodHolder):
7131 """ NAT66 Test Cases """
# Class-level fixture: NAT address in text and packed form, two pg
# interfaces, IPv6 neighbors configured on each.
7134 def setUpClass(cls):
7135 super(TestNAT66, cls).setUpClass()
7138 cls.nat_addr = 'fd01:ff::2'
7139 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
7141 cls.create_pg_interfaces(range(2))
7142 cls.interfaces = list(cls.pg_interfaces)
7144 for i in cls.interfaces:
7147 i.configure_ipv6_neighbors()
# NOTE(review): the header of the method containing the next call is not
# visible; presumably tearDownClass.
7150 super(TestNAT66, cls).tearDownClass()
# Static 1:1 mapping test: pg0 is added with default arguments, pg1 with
# is_inside=0, and four packets are sent in each direction.
7153 def test_static(self):
7154 """ 1:1 NAT66 test """
7155 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7156 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7157 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# pg0 -> pg1 packets. The upper layer of the first two is not visible in
# this listing; the last two are an ICMPv6 echo request and GRE/IP/TCP.
7162 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7163 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7166 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7167 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7170 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7171 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7172 ICMPv6EchoRequest())
7174 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7175 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7176 GRE() / IP() / TCP())
7178 self.pg0.add_stream(pkts)
7179 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg1 must carry the NAT address as source.
7181 capture = self.pg1.get_capture(len(pkts))
7182 for packet in capture:
7184 self.assertEqual(packet[IPv6].src, self.nat_addr)
7185 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7186 self.assert_packet_checksums_valid(packet)
7188 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0 packets addressed to the NAT address. The upper layer of the
# first three is not visible in this listing; the last is GRE/IP/TCP.
7193 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7194 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7197 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7198 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7201 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7202 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7205 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7206 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7207 GRE() / IP() / TCP())
7209 self.pg1.add_stream(pkts)
7210 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg0 must be addressed to pg0's remote host.
7212 capture = self.pg0.get_capture(len(pkts))
7213 for packet in capture:
7215 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
7216 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
7217 self.assert_packet_checksums_valid(packet)
7219 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# The single static mapping must report 8 packets in total.
# NOTE(review): presumably the four packets per direction above - confirm.
7222 sm = self.vapi.nat66_static_mapping_dump()
7223 self.assertEqual(len(sm), 1)
7224 self.assertEqual(sm[0].total_pkts, 8)
# Both interfaces are added with default arguments (no is_inside=0), and the
# packet captured on pg1 must keep pg0's remote address as its source.
7226 def test_check_no_translate(self):
7227 """ NAT66 translate only when egress interface is outside interface """
7228 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7229 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7230 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7234 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7235 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7237 self.pg0.add_stream([p])
7238 self.pg_enable_capture(self.pg_interfaces)
7240 capture = self.pg1.get_capture(1)
7243 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7244 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7246 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Walks the interface and static-mapping dumps and issues an add/del call
# for each entry. NOTE(review): the trailing arguments (presumably the
# delete flags) are not visible in this listing.
7249 def clear_nat66(self):
7251 Clear NAT66 configuration.
7253 interfaces = self.vapi.nat66_interface_dump()
7254 for intf in interfaces:
7255 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7259 static_mappings = self.vapi.nat66_static_mapping_dump()
7260 for sm in static_mappings:
7261 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7262 sm.external_ip_address,
# Per-test teardown: chain to the parent and log NAT66 state while VPP is
# alive. NOTE(review): the `def` header is not visible in this listing.
7267 super(TestNAT66, self).tearDown()
7268 if not self.vpp_dead:
7269 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7270 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: when this module is executed directly, run its test
# cases through unittest using the VPP-specific test runner.
7274 if __name__ == '__main__':
7275 unittest.main(testRunner=VppTestRunner)