9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
13 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
29 def clear_nat44(self):
31 Clear NAT44 configuration.
33 if hasattr(self, 'pg7') and hasattr(self, 'pg8'):
34 # I found no elegant way to do this
35 self.vapi.ip_add_del_route(
36 dst_address=self.pg7.remote_ip4n,
37 dst_address_length=32,
38 next_hop_address=self.pg7.remote_ip4n,
39 next_hop_sw_if_index=self.pg7.sw_if_index,
41 self.vapi.ip_add_del_route(
42 dst_address=self.pg8.remote_ip4n,
43 dst_address_length=32,
44 next_hop_address=self.pg8.remote_ip4n,
45 next_hop_sw_if_index=self.pg8.sw_if_index,
48 for intf in [self.pg7, self.pg8]:
49 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
51 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
56 if self.pg7.has_ip4_config:
57 self.pg7.unconfig_ip4()
59 self.vapi.nat44_forwarding_enable_disable(0)
61 interfaces = self.vapi.nat44_interface_addr_dump()
62 for intf in interfaces:
63 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
64 twice_nat=intf.twice_nat,
67 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
68 domain_id=self.ipfix_domain_id)
69 self.ipfix_src_port = 4739
70 self.ipfix_domain_id = 1
72 interfaces = self.vapi.nat44_interface_dump()
73 for intf in interfaces:
74 if intf.is_inside > 1:
75 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
78 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
82 interfaces = self.vapi.nat44_interface_output_feature_dump()
83 for intf in interfaces:
84 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
88 static_mappings = self.vapi.nat44_static_mapping_dump()
89 for sm in static_mappings:
90 self.vapi.nat44_add_del_static_mapping(
92 sm.external_ip_address,
93 local_port=sm.local_port,
94 external_port=sm.external_port,
95 addr_only=sm.addr_only,
98 twice_nat=sm.twice_nat,
99 self_twice_nat=sm.self_twice_nat,
100 out2in_only=sm.out2in_only,
102 external_sw_if_index=sm.external_sw_if_index,
105 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
106 for lb_sm in lb_static_mappings:
107 self.vapi.nat44_add_del_lb_static_mapping(
111 twice_nat=lb_sm.twice_nat,
112 self_twice_nat=lb_sm.self_twice_nat,
113 out2in_only=lb_sm.out2in_only,
119 identity_mappings = self.vapi.nat44_identity_mapping_dump()
120 for id_m in identity_mappings:
121 self.vapi.nat44_add_del_identity_mapping(
122 addr_only=id_m.addr_only,
125 sw_if_index=id_m.sw_if_index,
127 protocol=id_m.protocol,
130 adresses = self.vapi.nat44_address_dump()
131 for addr in adresses:
132 self.vapi.nat44_add_del_address_range(addr.ip_address,
134 twice_nat=addr.twice_nat,
137 self.vapi.nat_set_reass()
138 self.vapi.nat_set_reass(is_ip6=1)
139 self.verify_no_nat44_user()
140 self.vapi.nat_set_timeouts()
141 self.vapi.nat_set_addr_and_port_alloc_alg()
142 self.vapi.nat_set_mss_clamping()
144 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
145 local_port=0, external_port=0, vrf_id=0,
146 is_add=1, external_sw_if_index=0xFFFFFFFF,
147 proto=0, twice_nat=0, self_twice_nat=0,
148 out2in_only=0, tag=""):
150 Add/delete NAT44 static mapping
152 :param local_ip: Local IP address
153 :param external_ip: External IP address
154 :param local_port: Local port number (Optional)
155 :param external_port: External port number (Optional)
156 :param vrf_id: VRF ID (Default 0)
157 :param is_add: 1 if add, 0 if delete (Default add)
158 :param external_sw_if_index: External interface instead of IP address
159 :param proto: IP protocol (Mandatory if port specified)
160 :param twice_nat: 1 if translate external host address and port
161 :param self_twice_nat: 1 if translate external host address and port
162 whenever external host address equals
163 local address of internal host
164 :param out2in_only: if 1 rule is matching only out2in direction
165 :param tag: Opaque string tag
168 if local_port and external_port:
170 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
171 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
172 self.vapi.nat44_add_del_static_mapping(
175 external_sw_if_index,
187 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
189 Add/delete NAT44 address
191 :param ip: IP address
192 :param is_add: 1 if add, 0 if delete (Default add)
193 :param twice_nat: twice NAT address for extenal hosts
195 nat_addr = socket.inet_pton(socket.AF_INET, ip)
196 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
200 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
202 Create packet stream for inside network
204 :param in_if: Inside interface
205 :param out_if: Outside interface
206 :param dst_ip: Destination address
207 :param ttl: TTL of generated packets
210 dst_ip = out_if.remote_ip4
214 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
215 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
216 TCP(sport=self.tcp_port_in, dport=20))
220 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
221 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
222 UDP(sport=self.udp_port_in, dport=20))
226 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
227 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
228 ICMP(id=self.icmp_id_in, type='echo-request'))
233 def compose_ip6(self, ip4, pref, plen):
235 Compose IPv4-embedded IPv6 addresses
237 :param ip4: IPv4 address
238 :param pref: IPv6 prefix
239 :param plen: IPv6 prefix length
240 :returns: IPv4-embedded IPv6 addresses
242 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
243 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
258 pref_n[10] = ip4_n[3]
262 pref_n[10] = ip4_n[2]
263 pref_n[11] = ip4_n[3]
266 pref_n[10] = ip4_n[1]
267 pref_n[11] = ip4_n[2]
268 pref_n[12] = ip4_n[3]
270 pref_n[12] = ip4_n[0]
271 pref_n[13] = ip4_n[1]
272 pref_n[14] = ip4_n[2]
273 pref_n[15] = ip4_n[3]
274 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
276 def extract_ip4(self, ip6, plen):
278 Extract IPv4 address embedded in IPv6 addresses
280 :param ip6: IPv6 address
281 :param plen: IPv6 prefix length
282 :returns: extracted IPv4 address
284 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
316 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
318 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
320 Create IPv6 packet stream for inside network
322 :param in_if: Inside interface
323 :param out_if: Outside interface
324 :param ttl: Hop Limit of generated packets
325 :param pref: NAT64 prefix
326 :param plen: NAT64 prefix length
330 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
332 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
335 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
336 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
337 TCP(sport=self.tcp_port_in, dport=20))
341 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
342 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
343 UDP(sport=self.udp_port_in, dport=20))
347 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
348 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
349 ICMPv6EchoRequest(id=self.icmp_id_in))
354 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
355 use_inside_ports=False):
357 Create packet stream for outside network
359 :param out_if: Outside interface
360 :param dst_ip: Destination IP address (Default use global NAT address)
361 :param ttl: TTL of generated packets
362 :param use_inside_ports: Use inside NAT ports as destination ports
363 instead of outside ports
366 dst_ip = self.nat_addr
367 if not use_inside_ports:
368 tcp_port = self.tcp_port_out
369 udp_port = self.udp_port_out
370 icmp_id = self.icmp_id_out
372 tcp_port = self.tcp_port_in
373 udp_port = self.udp_port_in
374 icmp_id = self.icmp_id_in
377 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
378 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
379 TCP(dport=tcp_port, sport=20))
383 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
384 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
385 UDP(dport=udp_port, sport=20))
389 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
390 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
391 ICMP(id=icmp_id, type='echo-reply'))
396 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
398 Create packet stream for outside network
400 :param out_if: Outside interface
401 :param dst_ip: Destination IP address (Default use global NAT address)
402 :param hl: HL of generated packets
406 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
407 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
408 TCP(dport=self.tcp_port_out, sport=20))
412 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
413 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
414 UDP(dport=self.udp_port_out, sport=20))
418 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
419 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
420 ICMPv6EchoReply(id=self.icmp_id_out))
425 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
426 dst_ip=None, is_ip6=False):
428 Verify captured packets on outside network
430 :param capture: Captured packets
431 :param nat_ip: Translated IP address (Default use global NAT address)
432 :param same_port: Sorce port number is not translated (Default False)
433 :param dst_ip: Destination IP address (Default do not verify)
434 :param is_ip6: If L3 protocol is IPv6 (Default False)
438 ICMP46 = ICMPv6EchoRequest
443 nat_ip = self.nat_addr
444 for packet in capture:
447 self.assert_packet_checksums_valid(packet)
448 self.assertEqual(packet[IP46].src, nat_ip)
449 if dst_ip is not None:
450 self.assertEqual(packet[IP46].dst, dst_ip)
451 if packet.haslayer(TCP):
453 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
456 packet[TCP].sport, self.tcp_port_in)
457 self.tcp_port_out = packet[TCP].sport
458 self.assert_packet_checksums_valid(packet)
459 elif packet.haslayer(UDP):
461 self.assertEqual(packet[UDP].sport, self.udp_port_in)
464 packet[UDP].sport, self.udp_port_in)
465 self.udp_port_out = packet[UDP].sport
468 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
470 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
471 self.icmp_id_out = packet[ICMP46].id
472 self.assert_packet_checksums_valid(packet)
474 self.logger.error(ppp("Unexpected or invalid packet "
475 "(outside network):", packet))
478 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
481 Verify captured packets on outside network
483 :param capture: Captured packets
484 :param nat_ip: Translated IP address
485 :param same_port: Sorce port number is not translated (Default False)
486 :param dst_ip: Destination IP address (Default do not verify)
488 return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
491 def verify_capture_in(self, capture, in_if):
493 Verify captured packets on inside network
495 :param capture: Captured packets
496 :param in_if: Inside interface
498 for packet in capture:
500 self.assert_packet_checksums_valid(packet)
501 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
502 if packet.haslayer(TCP):
503 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
504 elif packet.haslayer(UDP):
505 self.assertEqual(packet[UDP].dport, self.udp_port_in)
507 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
509 self.logger.error(ppp("Unexpected or invalid packet "
510 "(inside network):", packet))
513 def verify_capture_in_ip6(self, capture, src_ip, dst_ip):
515 Verify captured IPv6 packets on inside network
517 :param capture: Captured packets
518 :param src_ip: Source IP
519 :param dst_ip: Destination IP address
521 for packet in capture:
523 self.assertEqual(packet[IPv6].src, src_ip)
524 self.assertEqual(packet[IPv6].dst, dst_ip)
525 self.assert_packet_checksums_valid(packet)
526 if packet.haslayer(TCP):
527 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
528 elif packet.haslayer(UDP):
529 self.assertEqual(packet[UDP].dport, self.udp_port_in)
531 self.assertEqual(packet[ICMPv6EchoReply].id,
534 self.logger.error(ppp("Unexpected or invalid packet "
535 "(inside network):", packet))
538 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
540 Verify captured packet that don't have to be translated
542 :param capture: Captured packets
543 :param ingress_if: Ingress interface
544 :param egress_if: Egress interface
546 for packet in capture:
548 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
549 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
550 if packet.haslayer(TCP):
551 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
552 elif packet.haslayer(UDP):
553 self.assertEqual(packet[UDP].sport, self.udp_port_in)
555 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
557 self.logger.error(ppp("Unexpected or invalid packet "
558 "(inside network):", packet))
561 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
564 Verify captured packets with ICMP errors on outside network
566 :param capture: Captured packets
567 :param src_ip: Translated IP address or IP address of VPP
568 (Default use global NAT address)
569 :param icmp_type: Type of error ICMP packet
570 we are expecting (Default 11)
573 src_ip = self.nat_addr
574 for packet in capture:
576 self.assertEqual(packet[IP].src, src_ip)
577 self.assertTrue(packet.haslayer(ICMP))
579 self.assertEqual(icmp.type, icmp_type)
580 self.assertTrue(icmp.haslayer(IPerror))
581 inner_ip = icmp[IPerror]
582 if inner_ip.haslayer(TCPerror):
583 self.assertEqual(inner_ip[TCPerror].dport,
585 elif inner_ip.haslayer(UDPerror):
586 self.assertEqual(inner_ip[UDPerror].dport,
589 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
591 self.logger.error(ppp("Unexpected or invalid packet "
592 "(outside network):", packet))
595 def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
597 Verify captured packets with ICMP errors on inside network
599 :param capture: Captured packets
600 :param in_if: Inside interface
601 :param icmp_type: Type of error ICMP packet
602 we are expecting (Default 11)
604 for packet in capture:
606 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
607 self.assertTrue(packet.haslayer(ICMP))
609 self.assertEqual(icmp.type, icmp_type)
610 self.assertTrue(icmp.haslayer(IPerror))
611 inner_ip = icmp[IPerror]
612 if inner_ip.haslayer(TCPerror):
613 self.assertEqual(inner_ip[TCPerror].sport,
615 elif inner_ip.haslayer(UDPerror):
616 self.assertEqual(inner_ip[UDPerror].sport,
619 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
621 self.logger.error(ppp("Unexpected or invalid packet "
622 "(inside network):", packet))
625 def create_stream_frag(self, src_if, dst, sport, dport, data):
627 Create fragmented packet stream
629 :param src_if: Source interface
630 :param dst: Destination IPv4 address
631 :param sport: Source TCP port
632 :param dport: Destination TCP port
633 :param data: Payload data
636 id = random.randint(0, 65535)
637 p = (IP(src=src_if.remote_ip4, dst=dst) /
638 TCP(sport=sport, dport=dport) /
640 p = p.__class__(str(p))
641 chksum = p['TCP'].chksum
643 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
644 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
645 TCP(sport=sport, dport=dport, chksum=chksum) /
648 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
649 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
650 proto=IP_PROTOS.tcp) /
653 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
654 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
660 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
661 pref=None, plen=0, frag_size=128):
663 Create fragmented packet stream
665 :param src_if: Source interface
666 :param dst: Destination IPv4 address
667 :param sport: Source TCP port
668 :param dport: Destination TCP port
669 :param data: Payload data
670 :param pref: NAT64 prefix
671 :param plen: NAT64 prefix length
672 :param fragsize: size of fragments
676 dst_ip6 = ''.join(['64:ff9b::', dst])
678 dst_ip6 = self.compose_ip6(dst, pref, plen)
680 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
681 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
682 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
683 TCP(sport=sport, dport=dport) /
686 return fragment6(p, frag_size)
688 def reass_frags_and_verify(self, frags, src, dst):
690 Reassemble and verify fragmented packet
692 :param frags: Captured fragments
693 :param src: Source IPv4 address to verify
694 :param dst: Destination IPv4 address to verify
696 :returns: Reassembled IPv4 packet
698 buffer = StringIO.StringIO()
700 self.assertEqual(p[IP].src, src)
701 self.assertEqual(p[IP].dst, dst)
702 self.assert_ip_checksum_valid(p)
703 buffer.seek(p[IP].frag * 8)
704 buffer.write(p[IP].payload)
705 ip = frags[0].getlayer(IP)
706 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
707 proto=frags[0][IP].proto)
708 if ip.proto == IP_PROTOS.tcp:
709 p = (ip / TCP(buffer.getvalue()))
710 self.assert_tcp_checksum_valid(p)
711 elif ip.proto == IP_PROTOS.udp:
712 p = (ip / UDP(buffer.getvalue()))
715 def reass_frags_and_verify_ip6(self, frags, src, dst):
717 Reassemble and verify fragmented packet
719 :param frags: Captured fragments
720 :param src: Source IPv6 address to verify
721 :param dst: Destination IPv6 address to verify
723 :returns: Reassembled IPv6 packet
725 buffer = StringIO.StringIO()
727 self.assertEqual(p[IPv6].src, src)
728 self.assertEqual(p[IPv6].dst, dst)
729 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
730 buffer.write(p[IPv6ExtHdrFragment].payload)
731 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
732 nh=frags[0][IPv6ExtHdrFragment].nh)
733 if ip.nh == IP_PROTOS.tcp:
734 p = (ip / TCP(buffer.getvalue()))
735 elif ip.nh == IP_PROTOS.udp:
736 p = (ip / UDP(buffer.getvalue()))
737 self.assert_packet_checksums_valid(p)
740 def initiate_tcp_session(self, in_if, out_if):
742 Initiates TCP session
744 :param in_if: Inside interface
745 :param out_if: Outside interface
749 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
750 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
751 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
754 self.pg_enable_capture(self.pg_interfaces)
756 capture = out_if.get_capture(1)
758 self.tcp_port_out = p[TCP].sport
760 # SYN + ACK packet out->in
761 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
762 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
763 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
766 self.pg_enable_capture(self.pg_interfaces)
771 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
772 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
773 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
776 self.pg_enable_capture(self.pg_interfaces)
778 out_if.get_capture(1)
781 self.logger.error("TCP 3 way handshake failed")
784 def verify_ipfix_nat44_ses(self, data):
786 Verify IPFIX NAT44 session create/delete event
788 :param data: Decoded IPFIX data records
790 nat44_ses_create_num = 0
791 nat44_ses_delete_num = 0
792 self.assertEqual(6, len(data))
795 self.assertIn(ord(record[230]), [4, 5])
796 if ord(record[230]) == 4:
797 nat44_ses_create_num += 1
799 nat44_ses_delete_num += 1
801 self.assertEqual(self.pg0.remote_ip4n, record[8])
802 # postNATSourceIPv4Address
803 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
806 self.assertEqual(struct.pack("!I", 0), record[234])
807 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
808 if IP_PROTOS.icmp == ord(record[4]):
809 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
810 self.assertEqual(struct.pack("!H", self.icmp_id_out),
812 elif IP_PROTOS.tcp == ord(record[4]):
813 self.assertEqual(struct.pack("!H", self.tcp_port_in),
815 self.assertEqual(struct.pack("!H", self.tcp_port_out),
817 elif IP_PROTOS.udp == ord(record[4]):
818 self.assertEqual(struct.pack("!H", self.udp_port_in),
820 self.assertEqual(struct.pack("!H", self.udp_port_out),
823 self.fail("Invalid protocol")
824 self.assertEqual(3, nat44_ses_create_num)
825 self.assertEqual(3, nat44_ses_delete_num)
827 def verify_ipfix_addr_exhausted(self, data):
829 Verify IPFIX NAT addresses event
831 :param data: Decoded IPFIX data records
833 self.assertEqual(1, len(data))
836 self.assertEqual(ord(record[230]), 3)
838 self.assertEqual(struct.pack("!I", 0), record[283])
840 def verify_ipfix_max_sessions(self, data, limit):
842 Verify IPFIX maximum session entries exceeded event
844 :param data: Decoded IPFIX data records
845 :param limit: Number of maximum session entries that can be created.
847 self.assertEqual(1, len(data))
850 self.assertEqual(ord(record[230]), 13)
851 # natQuotaExceededEvent
852 self.assertEqual(struct.pack("I", 1), record[466])
854 self.assertEqual(struct.pack("I", limit), record[471])
856 def verify_ipfix_max_bibs(self, data, limit):
858 Verify IPFIX maximum BIB entries exceeded event
860 :param data: Decoded IPFIX data records
861 :param limit: Number of maximum BIB entries that can be created.
863 self.assertEqual(1, len(data))
866 self.assertEqual(ord(record[230]), 13)
867 # natQuotaExceededEvent
868 self.assertEqual(struct.pack("I", 2), record[466])
870 self.assertEqual(struct.pack("I", limit), record[472])
872 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
874 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
876 :param data: Decoded IPFIX data records
877 :param limit: Number of maximum fragments pending reassembly
878 :param src_addr: IPv6 source address
880 self.assertEqual(1, len(data))
883 self.assertEqual(ord(record[230]), 13)
884 # natQuotaExceededEvent
885 self.assertEqual(struct.pack("I", 5), record[466])
886 # maxFragmentsPendingReassembly
887 self.assertEqual(struct.pack("I", limit), record[475])
889 self.assertEqual(src_addr, record[27])
891 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
893 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
895 :param data: Decoded IPFIX data records
896 :param limit: Number of maximum fragments pending reassembly
897 :param src_addr: IPv4 source address
899 self.assertEqual(1, len(data))
902 self.assertEqual(ord(record[230]), 13)
903 # natQuotaExceededEvent
904 self.assertEqual(struct.pack("I", 5), record[466])
905 # maxFragmentsPendingReassembly
906 self.assertEqual(struct.pack("I", limit), record[475])
908 self.assertEqual(src_addr, record[8])
910 def verify_ipfix_bib(self, data, is_create, src_addr):
912 Verify IPFIX NAT64 BIB create and delete events
914 :param data: Decoded IPFIX data records
915 :param is_create: Create event if nonzero value otherwise delete event
916 :param src_addr: IPv6 source address
918 self.assertEqual(1, len(data))
922 self.assertEqual(ord(record[230]), 10)
924 self.assertEqual(ord(record[230]), 11)
926 self.assertEqual(src_addr, record[27])
927 # postNATSourceIPv4Address
928 self.assertEqual(self.nat_addr_n, record[225])
930 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
932 self.assertEqual(struct.pack("!I", 0), record[234])
933 # sourceTransportPort
934 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
935 # postNAPTSourceTransportPort
936 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
938 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
941 Verify IPFIX NAT64 session create and delete events
943 :param data: Decoded IPFIX data records
944 :param is_create: Create event if nonzero value otherwise delete event
945 :param src_addr: IPv6 source address
946 :param dst_addr: IPv4 destination address
947 :param dst_port: destination TCP port
949 self.assertEqual(1, len(data))
953 self.assertEqual(ord(record[230]), 6)
955 self.assertEqual(ord(record[230]), 7)
957 self.assertEqual(src_addr, record[27])
958 # destinationIPv6Address
959 self.assertEqual(socket.inet_pton(socket.AF_INET6,
960 self.compose_ip6(dst_addr,
964 # postNATSourceIPv4Address
965 self.assertEqual(self.nat_addr_n, record[225])
966 # postNATDestinationIPv4Address
967 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
970 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
972 self.assertEqual(struct.pack("!I", 0), record[234])
973 # sourceTransportPort
974 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
975 # postNAPTSourceTransportPort
976 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
977 # destinationTransportPort
978 self.assertEqual(struct.pack("!H", dst_port), record[11])
979 # postNAPTDestinationTransportPort
980 self.assertEqual(struct.pack("!H", dst_port), record[228])
982 def verify_no_nat44_user(self):
983 """ Verify that there is no NAT44 user """
984 users = self.vapi.nat44_user_dump()
985 self.assertEqual(len(users), 0)
987 def verify_ipfix_max_entries_per_user(self, data, limit, src_addr):
989 Verify IPFIX maximum entries per user exceeded event
991 :param data: Decoded IPFIX data records
992 :param limit: Number of maximum entries per user
993 :param src_addr: IPv4 source address
995 self.assertEqual(1, len(data))
998 self.assertEqual(ord(record[230]), 13)
999 # natQuotaExceededEvent
1000 self.assertEqual(struct.pack("I", 3), record[466])
1002 self.assertEqual(struct.pack("I", limit), record[473])
1004 self.assertEqual(src_addr, record[8])
1006 def verify_mss_value(self, pkt, mss):
1008 Verify TCP MSS value
1013 if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
1014 raise TypeError("Not a TCP/IP packet")
1016 for option in pkt[TCP].options:
1017 if option[0] == 'MSS':
1018 self.assertEqual(option[1], mss)
1019 self.assert_tcp_checksum_valid(pkt)
1022 class TestNAT44(MethodHolder):
1023 """ NAT44 Test Cases """
1026 def setUpClass(cls):
1027 super(TestNAT44, cls).setUpClass()
1028 cls.vapi.cli("set log class nat level debug")
1031 cls.tcp_port_in = 6303
1032 cls.tcp_port_out = 6303
1033 cls.udp_port_in = 6304
1034 cls.udp_port_out = 6304
1035 cls.icmp_id_in = 6305
1036 cls.icmp_id_out = 6305
1037 cls.nat_addr = '10.0.0.3'
1038 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
1039 cls.ipfix_src_port = 4739
1040 cls.ipfix_domain_id = 1
1041 cls.tcp_external_port = 80
1043 cls.create_pg_interfaces(range(10))
1044 cls.interfaces = list(cls.pg_interfaces[0:4])
1046 for i in cls.interfaces:
1051 cls.pg0.generate_remote_hosts(3)
1052 cls.pg0.configure_ipv4_neighbors()
1054 cls.pg1.generate_remote_hosts(1)
1055 cls.pg1.configure_ipv4_neighbors()
1057 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
1058 cls.vapi.ip_table_add_del(10, is_add=1)
1059 cls.vapi.ip_table_add_del(20, is_add=1)
1061 cls.pg4._local_ip4 = "172.16.255.1"
1062 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1063 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
1064 cls.pg4.set_table_ip4(10)
1065 cls.pg5._local_ip4 = "172.17.255.3"
1066 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1067 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
1068 cls.pg5.set_table_ip4(10)
1069 cls.pg6._local_ip4 = "172.16.255.1"
1070 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
1071 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
1072 cls.pg6.set_table_ip4(20)
1073 for i in cls.overlapping_interfaces:
1081 cls.pg9.generate_remote_hosts(2)
1082 cls.pg9.config_ip4()
1083 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1084 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1088 cls.pg9.resolve_arp()
1089 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1090 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1091 cls.pg9.resolve_arp()
1094 super(TestNAT44, cls).tearDownClass()
1097 def test_dynamic(self):
1098 """ NAT44 dynamic translation test """
1100 self.nat44_add_address(self.nat_addr)
1101 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1102 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1106 pkts = self.create_stream_in(self.pg0, self.pg1)
1107 self.pg0.add_stream(pkts)
1108 self.pg_enable_capture(self.pg_interfaces)
1110 capture = self.pg1.get_capture(len(pkts))
1111 self.verify_capture_out(capture)
1114 pkts = self.create_stream_out(self.pg1)
1115 self.pg1.add_stream(pkts)
1116 self.pg_enable_capture(self.pg_interfaces)
1118 capture = self.pg0.get_capture(len(pkts))
1119 self.verify_capture_in(capture, self.pg0)
1121 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1122 """ NAT44 handling of client packets with TTL=1 """
1124 self.nat44_add_address(self.nat_addr)
1125 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1126 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1129 # Client side - generate traffic
1130 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1131 self.pg0.add_stream(pkts)
1132 self.pg_enable_capture(self.pg_interfaces)
1135 # Client side - verify ICMP type 11 packets
1136 capture = self.pg0.get_capture(len(pkts))
1137 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1139 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1140 """ NAT44 handling of server packets with TTL=1 """
1142 self.nat44_add_address(self.nat_addr)
1143 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1144 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1147 # Client side - create sessions
1148 pkts = self.create_stream_in(self.pg0, self.pg1)
1149 self.pg0.add_stream(pkts)
1150 self.pg_enable_capture(self.pg_interfaces)
1153 # Server side - generate traffic
1154 capture = self.pg1.get_capture(len(pkts))
1155 self.verify_capture_out(capture)
1156 pkts = self.create_stream_out(self.pg1, ttl=1)
1157 self.pg1.add_stream(pkts)
1158 self.pg_enable_capture(self.pg_interfaces)
1161 # Server side - verify ICMP type 11 packets
1162 capture = self.pg1.get_capture(len(pkts))
1163 self.verify_capture_out_with_icmp_errors(capture,
1164 src_ip=self.pg1.local_ip4)
1166 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1167 """ NAT44 handling of error responses to client packets with TTL=2 """
1169 self.nat44_add_address(self.nat_addr)
1170 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1171 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1174 # Client side - generate traffic
1175 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1176 self.pg0.add_stream(pkts)
1177 self.pg_enable_capture(self.pg_interfaces)
1180 # Server side - simulate ICMP type 11 response
1181 capture = self.pg1.get_capture(len(pkts))
1182 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1183 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1184 ICMP(type=11) / packet[IP] for packet in capture]
1185 self.pg1.add_stream(pkts)
1186 self.pg_enable_capture(self.pg_interfaces)
1189 # Client side - verify ICMP type 11 packets
1190 capture = self.pg0.get_capture(len(pkts))
1191 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1193 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1194 """ NAT44 handling of error responses to server packets with TTL=2 """
1196 self.nat44_add_address(self.nat_addr)
1197 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1198 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1201 # Client side - create sessions
1202 pkts = self.create_stream_in(self.pg0, self.pg1)
1203 self.pg0.add_stream(pkts)
1204 self.pg_enable_capture(self.pg_interfaces)
1207 # Server side - generate traffic
1208 capture = self.pg1.get_capture(len(pkts))
1209 self.verify_capture_out(capture)
1210 pkts = self.create_stream_out(self.pg1, ttl=2)
1211 self.pg1.add_stream(pkts)
1212 self.pg_enable_capture(self.pg_interfaces)
1215 # Client side - simulate ICMP type 11 response
1216 capture = self.pg0.get_capture(len(pkts))
1217 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1218 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1219 ICMP(type=11) / packet[IP] for packet in capture]
1220 self.pg0.add_stream(pkts)
1221 self.pg_enable_capture(self.pg_interfaces)
1224 # Server side - verify ICMP type 11 packets
1225 capture = self.pg1.get_capture(len(pkts))
1226 self.verify_capture_out_with_icmp_errors(capture)
1228 def test_ping_out_interface_from_outside(self):
1229 """ Ping NAT44 out interface from outside network """
1231 self.nat44_add_address(self.nat_addr)
1232 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1233 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1236 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1237 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1238 ICMP(id=self.icmp_id_out, type='echo-request'))
1240 self.pg1.add_stream(pkts)
1241 self.pg_enable_capture(self.pg_interfaces)
1243 capture = self.pg1.get_capture(len(pkts))
1246 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1247 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1248 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1249 self.assertEqual(packet[ICMP].type, 0) # echo reply
1251 self.logger.error(ppp("Unexpected or invalid packet "
1252 "(outside network):", packet))
1255 def test_ping_internal_host_from_outside(self):
1256 """ Ping internal host from outside network """
1258 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1259 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1260 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1264 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1265 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1266 ICMP(id=self.icmp_id_out, type='echo-request'))
1267 self.pg1.add_stream(pkt)
1268 self.pg_enable_capture(self.pg_interfaces)
1270 capture = self.pg0.get_capture(1)
1271 self.verify_capture_in(capture, self.pg0)
1272 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1275 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1276 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1277 ICMP(id=self.icmp_id_in, type='echo-reply'))
1278 self.pg0.add_stream(pkt)
1279 self.pg_enable_capture(self.pg_interfaces)
1281 capture = self.pg1.get_capture(1)
1282 self.verify_capture_out(capture, same_port=True)
1283 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1285 def test_forwarding(self):
1286 """ NAT44 forwarding test """
1288 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1289 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1291 self.vapi.nat44_forwarding_enable_disable(1)
1293 real_ip = self.pg0.remote_ip4n
1294 alias_ip = self.nat_addr_n
1295 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1296 external_ip=alias_ip)
1299 # static mapping match
1301 pkts = self.create_stream_out(self.pg1)
1302 self.pg1.add_stream(pkts)
1303 self.pg_enable_capture(self.pg_interfaces)
1305 capture = self.pg0.get_capture(len(pkts))
1306 self.verify_capture_in(capture, self.pg0)
1308 pkts = self.create_stream_in(self.pg0, self.pg1)
1309 self.pg0.add_stream(pkts)
1310 self.pg_enable_capture(self.pg_interfaces)
1312 capture = self.pg1.get_capture(len(pkts))
1313 self.verify_capture_out(capture, same_port=True)
1315 # no static mapping match
1317 host0 = self.pg0.remote_hosts[0]
1318 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1320 pkts = self.create_stream_out(self.pg1,
1321 dst_ip=self.pg0.remote_ip4,
1322 use_inside_ports=True)
1323 self.pg1.add_stream(pkts)
1324 self.pg_enable_capture(self.pg_interfaces)
1326 capture = self.pg0.get_capture(len(pkts))
1327 self.verify_capture_in(capture, self.pg0)
1329 pkts = self.create_stream_in(self.pg0, self.pg1)
1330 self.pg0.add_stream(pkts)
1331 self.pg_enable_capture(self.pg_interfaces)
1333 capture = self.pg1.get_capture(len(pkts))
1334 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1337 self.pg0.remote_hosts[0] = host0
1340 self.vapi.nat44_forwarding_enable_disable(0)
1341 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1342 external_ip=alias_ip,
1345 def test_static_in(self):
1346 """ 1:1 NAT initialized from inside network """
1348 nat_ip = "10.0.0.10"
1349 self.tcp_port_out = 6303
1350 self.udp_port_out = 6304
1351 self.icmp_id_out = 6305
1353 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1354 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1355 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1357 sm = self.vapi.nat44_static_mapping_dump()
1358 self.assertEqual(len(sm), 1)
1359 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1360 self.assertEqual(sm[0].protocol, 0)
1361 self.assertEqual(sm[0].local_port, 0)
1362 self.assertEqual(sm[0].external_port, 0)
1365 pkts = self.create_stream_in(self.pg0, self.pg1)
1366 self.pg0.add_stream(pkts)
1367 self.pg_enable_capture(self.pg_interfaces)
1369 capture = self.pg1.get_capture(len(pkts))
1370 self.verify_capture_out(capture, nat_ip, True)
1373 pkts = self.create_stream_out(self.pg1, nat_ip)
1374 self.pg1.add_stream(pkts)
1375 self.pg_enable_capture(self.pg_interfaces)
1377 capture = self.pg0.get_capture(len(pkts))
1378 self.verify_capture_in(capture, self.pg0)
1380 def test_static_out(self):
1381 """ 1:1 NAT initialized from outside network """
1383 nat_ip = "10.0.0.20"
1384 self.tcp_port_out = 6303
1385 self.udp_port_out = 6304
1386 self.icmp_id_out = 6305
1389 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1390 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1391 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1393 sm = self.vapi.nat44_static_mapping_dump()
1394 self.assertEqual(len(sm), 1)
1395 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1398 pkts = self.create_stream_out(self.pg1, nat_ip)
1399 self.pg1.add_stream(pkts)
1400 self.pg_enable_capture(self.pg_interfaces)
1402 capture = self.pg0.get_capture(len(pkts))
1403 self.verify_capture_in(capture, self.pg0)
1406 pkts = self.create_stream_in(self.pg0, self.pg1)
1407 self.pg0.add_stream(pkts)
1408 self.pg_enable_capture(self.pg_interfaces)
1410 capture = self.pg1.get_capture(len(pkts))
1411 self.verify_capture_out(capture, nat_ip, True)
1413 def test_static_with_port_in(self):
1414 """ 1:1 NAPT initialized from inside network """
1416 self.tcp_port_out = 3606
1417 self.udp_port_out = 3607
1418 self.icmp_id_out = 3608
1420 self.nat44_add_address(self.nat_addr)
1421 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1422 self.tcp_port_in, self.tcp_port_out,
1423 proto=IP_PROTOS.tcp)
1424 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1425 self.udp_port_in, self.udp_port_out,
1426 proto=IP_PROTOS.udp)
1427 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1428 self.icmp_id_in, self.icmp_id_out,
1429 proto=IP_PROTOS.icmp)
1430 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1431 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1435 pkts = self.create_stream_in(self.pg0, self.pg1)
1436 self.pg0.add_stream(pkts)
1437 self.pg_enable_capture(self.pg_interfaces)
1439 capture = self.pg1.get_capture(len(pkts))
1440 self.verify_capture_out(capture)
1443 pkts = self.create_stream_out(self.pg1)
1444 self.pg1.add_stream(pkts)
1445 self.pg_enable_capture(self.pg_interfaces)
1447 capture = self.pg0.get_capture(len(pkts))
1448 self.verify_capture_in(capture, self.pg0)
1450 def test_static_with_port_out(self):
1451 """ 1:1 NAPT initialized from outside network """
1453 self.tcp_port_out = 30606
1454 self.udp_port_out = 30607
1455 self.icmp_id_out = 30608
1457 self.nat44_add_address(self.nat_addr)
1458 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1459 self.tcp_port_in, self.tcp_port_out,
1460 proto=IP_PROTOS.tcp)
1461 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1462 self.udp_port_in, self.udp_port_out,
1463 proto=IP_PROTOS.udp)
1464 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1465 self.icmp_id_in, self.icmp_id_out,
1466 proto=IP_PROTOS.icmp)
1467 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1468 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1472 pkts = self.create_stream_out(self.pg1)
1473 self.pg1.add_stream(pkts)
1474 self.pg_enable_capture(self.pg_interfaces)
1476 capture = self.pg0.get_capture(len(pkts))
1477 self.verify_capture_in(capture, self.pg0)
1480 pkts = self.create_stream_in(self.pg0, self.pg1)
1481 self.pg0.add_stream(pkts)
1482 self.pg_enable_capture(self.pg_interfaces)
1484 capture = self.pg1.get_capture(len(pkts))
1485 self.verify_capture_out(capture)
1487 def test_static_vrf_aware(self):
1488 """ 1:1 NAT VRF awareness """
1490 nat_ip1 = "10.0.0.30"
1491 nat_ip2 = "10.0.0.40"
1492 self.tcp_port_out = 6303
1493 self.udp_port_out = 6304
1494 self.icmp_id_out = 6305
1496 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1498 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1500 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1502 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1503 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1505 # inside interface VRF match NAT44 static mapping VRF
1506 pkts = self.create_stream_in(self.pg4, self.pg3)
1507 self.pg4.add_stream(pkts)
1508 self.pg_enable_capture(self.pg_interfaces)
1510 capture = self.pg3.get_capture(len(pkts))
1511 self.verify_capture_out(capture, nat_ip1, True)
1513 # inside interface VRF don't match NAT44 static mapping VRF (packets
1515 pkts = self.create_stream_in(self.pg0, self.pg3)
1516 self.pg0.add_stream(pkts)
1517 self.pg_enable_capture(self.pg_interfaces)
1519 self.pg3.assert_nothing_captured()
1521 def test_dynamic_to_static(self):
1522 """ Switch from dynamic translation to 1:1NAT """
1523 nat_ip = "10.0.0.10"
1524 self.tcp_port_out = 6303
1525 self.udp_port_out = 6304
1526 self.icmp_id_out = 6305
1528 self.nat44_add_address(self.nat_addr)
1529 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1530 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1534 pkts = self.create_stream_in(self.pg0, self.pg1)
1535 self.pg0.add_stream(pkts)
1536 self.pg_enable_capture(self.pg_interfaces)
1538 capture = self.pg1.get_capture(len(pkts))
1539 self.verify_capture_out(capture)
1542 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1543 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1544 self.assertEqual(len(sessions), 0)
1545 pkts = self.create_stream_in(self.pg0, self.pg1)
1546 self.pg0.add_stream(pkts)
1547 self.pg_enable_capture(self.pg_interfaces)
1549 capture = self.pg1.get_capture(len(pkts))
1550 self.verify_capture_out(capture, nat_ip, True)
1552 def test_identity_nat(self):
1553 """ Identity NAT """
1555 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1556 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1557 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1560 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1561 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1562 TCP(sport=12345, dport=56789))
1563 self.pg1.add_stream(p)
1564 self.pg_enable_capture(self.pg_interfaces)
1566 capture = self.pg0.get_capture(1)
1571 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1572 self.assertEqual(ip.src, self.pg1.remote_ip4)
1573 self.assertEqual(tcp.dport, 56789)
1574 self.assertEqual(tcp.sport, 12345)
1575 self.assert_packet_checksums_valid(p)
1577 self.logger.error(ppp("Unexpected or invalid packet:", p))
1580 def test_multiple_inside_interfaces(self):
1581 """ NAT44 multiple non-overlapping address space inside interfaces """
1583 self.nat44_add_address(self.nat_addr)
1584 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1585 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1586 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1589 # between two NAT44 inside interfaces (no translation)
1590 pkts = self.create_stream_in(self.pg0, self.pg1)
1591 self.pg0.add_stream(pkts)
1592 self.pg_enable_capture(self.pg_interfaces)
1594 capture = self.pg1.get_capture(len(pkts))
1595 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1597 # from NAT44 inside to interface without NAT44 feature (no translation)
1598 pkts = self.create_stream_in(self.pg0, self.pg2)
1599 self.pg0.add_stream(pkts)
1600 self.pg_enable_capture(self.pg_interfaces)
1602 capture = self.pg2.get_capture(len(pkts))
1603 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1605 # in2out 1st interface
1606 pkts = self.create_stream_in(self.pg0, self.pg3)
1607 self.pg0.add_stream(pkts)
1608 self.pg_enable_capture(self.pg_interfaces)
1610 capture = self.pg3.get_capture(len(pkts))
1611 self.verify_capture_out(capture)
1613 # out2in 1st interface
1614 pkts = self.create_stream_out(self.pg3)
1615 self.pg3.add_stream(pkts)
1616 self.pg_enable_capture(self.pg_interfaces)
1618 capture = self.pg0.get_capture(len(pkts))
1619 self.verify_capture_in(capture, self.pg0)
1621 # in2out 2nd interface
1622 pkts = self.create_stream_in(self.pg1, self.pg3)
1623 self.pg1.add_stream(pkts)
1624 self.pg_enable_capture(self.pg_interfaces)
1626 capture = self.pg3.get_capture(len(pkts))
1627 self.verify_capture_out(capture)
1629 # out2in 2nd interface
1630 pkts = self.create_stream_out(self.pg3)
1631 self.pg3.add_stream(pkts)
1632 self.pg_enable_capture(self.pg_interfaces)
1634 capture = self.pg1.get_capture(len(pkts))
1635 self.verify_capture_in(capture, self.pg1)
1637 def test_inside_overlapping_interfaces(self):
1638 """ NAT44 multiple inside interfaces with overlapping address space """
1640 static_nat_ip = "10.0.0.10"
1641 self.nat44_add_address(self.nat_addr)
1642 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1644 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1645 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1646 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1647 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1650 # between NAT44 inside interfaces with same VRF (no translation)
1651 pkts = self.create_stream_in(self.pg4, self.pg5)
1652 self.pg4.add_stream(pkts)
1653 self.pg_enable_capture(self.pg_interfaces)
1655 capture = self.pg5.get_capture(len(pkts))
1656 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1658 # between NAT44 inside interfaces with different VRF (hairpinning)
1659 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1660 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1661 TCP(sport=1234, dport=5678))
1662 self.pg4.add_stream(p)
1663 self.pg_enable_capture(self.pg_interfaces)
1665 capture = self.pg6.get_capture(1)
1670 self.assertEqual(ip.src, self.nat_addr)
1671 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1672 self.assertNotEqual(tcp.sport, 1234)
1673 self.assertEqual(tcp.dport, 5678)
1675 self.logger.error(ppp("Unexpected or invalid packet:", p))
1678 # in2out 1st interface
1679 pkts = self.create_stream_in(self.pg4, self.pg3)
1680 self.pg4.add_stream(pkts)
1681 self.pg_enable_capture(self.pg_interfaces)
1683 capture = self.pg3.get_capture(len(pkts))
1684 self.verify_capture_out(capture)
1686 # out2in 1st interface
1687 pkts = self.create_stream_out(self.pg3)
1688 self.pg3.add_stream(pkts)
1689 self.pg_enable_capture(self.pg_interfaces)
1691 capture = self.pg4.get_capture(len(pkts))
1692 self.verify_capture_in(capture, self.pg4)
1694 # in2out 2nd interface
1695 pkts = self.create_stream_in(self.pg5, self.pg3)
1696 self.pg5.add_stream(pkts)
1697 self.pg_enable_capture(self.pg_interfaces)
1699 capture = self.pg3.get_capture(len(pkts))
1700 self.verify_capture_out(capture)
1702 # out2in 2nd interface
1703 pkts = self.create_stream_out(self.pg3)
1704 self.pg3.add_stream(pkts)
1705 self.pg_enable_capture(self.pg_interfaces)
1707 capture = self.pg5.get_capture(len(pkts))
1708 self.verify_capture_in(capture, self.pg5)
1711 addresses = self.vapi.nat44_address_dump()
1712 self.assertEqual(len(addresses), 1)
1713 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1714 self.assertEqual(len(sessions), 3)
1715 for session in sessions:
1716 self.assertFalse(session.is_static)
1717 self.assertEqual(session.inside_ip_address[0:4],
1718 self.pg5.remote_ip4n)
1719 self.assertEqual(session.outside_ip_address,
1720 addresses[0].ip_address)
1721 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1722 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1723 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1724 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1725 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1726 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1727 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1728 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1729 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1731 # in2out 3rd interface
1732 pkts = self.create_stream_in(self.pg6, self.pg3)
1733 self.pg6.add_stream(pkts)
1734 self.pg_enable_capture(self.pg_interfaces)
1736 capture = self.pg3.get_capture(len(pkts))
1737 self.verify_capture_out(capture, static_nat_ip, True)
1739 # out2in 3rd interface
1740 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1741 self.pg3.add_stream(pkts)
1742 self.pg_enable_capture(self.pg_interfaces)
1744 capture = self.pg6.get_capture(len(pkts))
1745 self.verify_capture_in(capture, self.pg6)
1747 # general user and session dump verifications
1748 users = self.vapi.nat44_user_dump()
1749 self.assertTrue(len(users) >= 3)
1750 addresses = self.vapi.nat44_address_dump()
1751 self.assertEqual(len(addresses), 1)
1753 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1755 for session in sessions:
1756 self.assertEqual(user.ip_address, session.inside_ip_address)
1757 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1758 self.assertTrue(session.protocol in
1759 [IP_PROTOS.tcp, IP_PROTOS.udp,
1761 self.assertFalse(session.ext_host_valid)
1764 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1765 self.assertTrue(len(sessions) >= 4)
1766 for session in sessions:
1767 self.assertFalse(session.is_static)
1768 self.assertEqual(session.inside_ip_address[0:4],
1769 self.pg4.remote_ip4n)
1770 self.assertEqual(session.outside_ip_address,
1771 addresses[0].ip_address)
1774 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1775 self.assertTrue(len(sessions) >= 3)
1776 for session in sessions:
1777 self.assertTrue(session.is_static)
1778 self.assertEqual(session.inside_ip_address[0:4],
1779 self.pg6.remote_ip4n)
1780 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1781 map(int, static_nat_ip.split('.')))
1782 self.assertTrue(session.inside_port in
1783 [self.tcp_port_in, self.udp_port_in,
1786 def test_hairpinning(self):
1787 """ NAT44 hairpinning - 1:1 NAPT """
1789 host = self.pg0.remote_hosts[0]
1790 server = self.pg0.remote_hosts[1]
1793 server_in_port = 5678
1794 server_out_port = 8765
1796 self.nat44_add_address(self.nat_addr)
1797 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1798 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1800 # add static mapping for server
1801 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1802 server_in_port, server_out_port,
1803 proto=IP_PROTOS.tcp)
1805 # send packet from host to server
1806 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1807 IP(src=host.ip4, dst=self.nat_addr) /
1808 TCP(sport=host_in_port, dport=server_out_port))
1809 self.pg0.add_stream(p)
1810 self.pg_enable_capture(self.pg_interfaces)
1812 capture = self.pg0.get_capture(1)
1817 self.assertEqual(ip.src, self.nat_addr)
1818 self.assertEqual(ip.dst, server.ip4)
1819 self.assertNotEqual(tcp.sport, host_in_port)
1820 self.assertEqual(tcp.dport, server_in_port)
1821 self.assert_packet_checksums_valid(p)
1822 host_out_port = tcp.sport
1824 self.logger.error(ppp("Unexpected or invalid packet:", p))
1827 # send reply from server to host
1828 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1829 IP(src=server.ip4, dst=self.nat_addr) /
1830 TCP(sport=server_in_port, dport=host_out_port))
1831 self.pg0.add_stream(p)
1832 self.pg_enable_capture(self.pg_interfaces)
1834 capture = self.pg0.get_capture(1)
1839 self.assertEqual(ip.src, self.nat_addr)
1840 self.assertEqual(ip.dst, host.ip4)
1841 self.assertEqual(tcp.sport, server_out_port)
1842 self.assertEqual(tcp.dport, host_in_port)
1843 self.assert_packet_checksums_valid(p)
1845 self.logger.error(ppp("Unexpected or invalid packet:", p))
1848 def test_hairpinning2(self):
1849 """ NAT44 hairpinning - 1:1 NAT"""
1851 server1_nat_ip = "10.0.0.10"
1852 server2_nat_ip = "10.0.0.11"
1853 host = self.pg0.remote_hosts[0]
1854 server1 = self.pg0.remote_hosts[1]
1855 server2 = self.pg0.remote_hosts[2]
1856 server_tcp_port = 22
1857 server_udp_port = 20
1859 self.nat44_add_address(self.nat_addr)
1860 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1861 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1864 # add static mapping for servers
1865 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1866 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1870 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1871 IP(src=host.ip4, dst=server1_nat_ip) /
1872 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1874 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1875 IP(src=host.ip4, dst=server1_nat_ip) /
1876 UDP(sport=self.udp_port_in, dport=server_udp_port))
1878 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1879 IP(src=host.ip4, dst=server1_nat_ip) /
1880 ICMP(id=self.icmp_id_in, type='echo-request'))
1882 self.pg0.add_stream(pkts)
1883 self.pg_enable_capture(self.pg_interfaces)
1885 capture = self.pg0.get_capture(len(pkts))
1886 for packet in capture:
1888 self.assertEqual(packet[IP].src, self.nat_addr)
1889 self.assertEqual(packet[IP].dst, server1.ip4)
1890 if packet.haslayer(TCP):
1891 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1892 self.assertEqual(packet[TCP].dport, server_tcp_port)
1893 self.tcp_port_out = packet[TCP].sport
1894 self.assert_packet_checksums_valid(packet)
1895 elif packet.haslayer(UDP):
1896 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1897 self.assertEqual(packet[UDP].dport, server_udp_port)
1898 self.udp_port_out = packet[UDP].sport
1900 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1901 self.icmp_id_out = packet[ICMP].id
1903 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1908 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1909 IP(src=server1.ip4, dst=self.nat_addr) /
1910 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1912 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1913 IP(src=server1.ip4, dst=self.nat_addr) /
1914 UDP(sport=server_udp_port, dport=self.udp_port_out))
1916 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1917 IP(src=server1.ip4, dst=self.nat_addr) /
1918 ICMP(id=self.icmp_id_out, type='echo-reply'))
1920 self.pg0.add_stream(pkts)
1921 self.pg_enable_capture(self.pg_interfaces)
1923 capture = self.pg0.get_capture(len(pkts))
1924 for packet in capture:
1926 self.assertEqual(packet[IP].src, server1_nat_ip)
1927 self.assertEqual(packet[IP].dst, host.ip4)
1928 if packet.haslayer(TCP):
1929 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1930 self.assertEqual(packet[TCP].sport, server_tcp_port)
1931 self.assert_packet_checksums_valid(packet)
1932 elif packet.haslayer(UDP):
1933 self.assertEqual(packet[UDP].dport, self.udp_port_in)
1934 self.assertEqual(packet[UDP].sport, server_udp_port)
1936 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1938 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1941 # server2 to server1
1943 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1944 IP(src=server2.ip4, dst=server1_nat_ip) /
1945 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1947 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1948 IP(src=server2.ip4, dst=server1_nat_ip) /
1949 UDP(sport=self.udp_port_in, dport=server_udp_port))
1951 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1952 IP(src=server2.ip4, dst=server1_nat_ip) /
1953 ICMP(id=self.icmp_id_in, type='echo-request'))
1955 self.pg0.add_stream(pkts)
1956 self.pg_enable_capture(self.pg_interfaces)
1958 capture = self.pg0.get_capture(len(pkts))
1959 for packet in capture:
1961 self.assertEqual(packet[IP].src, server2_nat_ip)
1962 self.assertEqual(packet[IP].dst, server1.ip4)
1963 if packet.haslayer(TCP):
1964 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1965 self.assertEqual(packet[TCP].dport, server_tcp_port)
1966 self.tcp_port_out = packet[TCP].sport
1967 self.assert_packet_checksums_valid(packet)
1968 elif packet.haslayer(UDP):
1969 self.assertEqual(packet[UDP].sport, self.udp_port_in)
1970 self.assertEqual(packet[UDP].dport, server_udp_port)
1971 self.udp_port_out = packet[UDP].sport
1973 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1974 self.icmp_id_out = packet[ICMP].id
1976 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1979 # server1 to server2
1981 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1982 IP(src=server1.ip4, dst=server2_nat_ip) /
1983 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1985 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1986 IP(src=server1.ip4, dst=server2_nat_ip) /
1987 UDP(sport=server_udp_port, dport=self.udp_port_out))
1989 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1990 IP(src=server1.ip4, dst=server2_nat_ip) /
1991 ICMP(id=self.icmp_id_out, type='echo-reply'))
1993 self.pg0.add_stream(pkts)
1994 self.pg_enable_capture(self.pg_interfaces)
1996 capture = self.pg0.get_capture(len(pkts))
1997 for packet in capture:
1999 self.assertEqual(packet[IP].src, server1_nat_ip)
2000 self.assertEqual(packet[IP].dst, server2.ip4)
2001 if packet.haslayer(TCP):
2002 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2003 self.assertEqual(packet[TCP].sport, server_tcp_port)
2004 self.assert_packet_checksums_valid(packet)
2005 elif packet.haslayer(UDP):
2006 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2007 self.assertEqual(packet[UDP].sport, server_udp_port)
2009 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2011 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2014 def test_max_translations_per_user(self):
2015 """ MAX translations per user - recycle the least recently used """
2017 self.nat44_add_address(self.nat_addr)
2018 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2019 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2022 # get maximum number of translations per user
2023 nat44_config = self.vapi.nat_show_config()
2025 # send more than maximum number of translations per user packets
2026 pkts_num = nat44_config.max_translations_per_user + 5
2028 for port in range(0, pkts_num):
2029 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2030 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2031 TCP(sport=1025 + port))
2033 self.pg0.add_stream(pkts)
2034 self.pg_enable_capture(self.pg_interfaces)
2037 # verify number of translated packet
2038 self.pg1.get_capture(pkts_num)
2040 users = self.vapi.nat44_user_dump()
2042 if user.ip_address == self.pg0.remote_ip4n:
2043 self.assertEqual(user.nsessions,
2044 nat44_config.max_translations_per_user)
2045 self.assertEqual(user.nstaticsessions, 0)
2048 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2050 proto=IP_PROTOS.tcp)
2051 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2052 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2053 TCP(sport=tcp_port))
2054 self.pg0.add_stream(p)
2055 self.pg_enable_capture(self.pg_interfaces)
2057 self.pg1.get_capture(1)
2058 users = self.vapi.nat44_user_dump()
2060 if user.ip_address == self.pg0.remote_ip4n:
2061 self.assertEqual(user.nsessions,
2062 nat44_config.max_translations_per_user - 1)
2063 self.assertEqual(user.nstaticsessions, 1)
2065 def test_interface_addr(self):
2066 """ Acquire NAT44 addresses from interface """
2067 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2069 # no address in NAT pool
2070 adresses = self.vapi.nat44_address_dump()
2071 self.assertEqual(0, len(adresses))
2073 # configure interface address and check NAT address pool
2074 self.pg7.config_ip4()
2075 adresses = self.vapi.nat44_address_dump()
2076 self.assertEqual(1, len(adresses))
2077 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2079 # remove interface address and check NAT address pool
2080 self.pg7.unconfig_ip4()
2081 adresses = self.vapi.nat44_address_dump()
2082 self.assertEqual(0, len(adresses))
2084 def test_interface_addr_static_mapping(self):
2085 """ Static mapping with addresses from interface """
2088 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2089 self.nat44_add_static_mapping(
2091 external_sw_if_index=self.pg7.sw_if_index,
2094 # static mappings with external interface
2095 static_mappings = self.vapi.nat44_static_mapping_dump()
2096 self.assertEqual(1, len(static_mappings))
2097 self.assertEqual(self.pg7.sw_if_index,
2098 static_mappings[0].external_sw_if_index)
2099 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2101 # configure interface address and check static mappings
2102 self.pg7.config_ip4()
2103 static_mappings = self.vapi.nat44_static_mapping_dump()
2104 self.assertEqual(2, len(static_mappings))
2106 for sm in static_mappings:
2107 if sm.external_sw_if_index == 0xFFFFFFFF:
2108 self.assertEqual(sm.external_ip_address[0:4],
2109 self.pg7.local_ip4n)
2110 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2112 self.assertTrue(resolved)
2114 # remove interface address and check static mappings
2115 self.pg7.unconfig_ip4()
2116 static_mappings = self.vapi.nat44_static_mapping_dump()
2117 self.assertEqual(1, len(static_mappings))
2118 self.assertEqual(self.pg7.sw_if_index,
2119 static_mappings[0].external_sw_if_index)
2120 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2122 # configure interface address again and check static mappings
2123 self.pg7.config_ip4()
2124 static_mappings = self.vapi.nat44_static_mapping_dump()
2125 self.assertEqual(2, len(static_mappings))
2127 for sm in static_mappings:
2128 if sm.external_sw_if_index == 0xFFFFFFFF:
2129 self.assertEqual(sm.external_ip_address[0:4],
2130 self.pg7.local_ip4n)
2131 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2133 self.assertTrue(resolved)
2135 # remove static mapping
2136 self.nat44_add_static_mapping(
2138 external_sw_if_index=self.pg7.sw_if_index,
2141 static_mappings = self.vapi.nat44_static_mapping_dump()
2142 self.assertEqual(0, len(static_mappings))
2144 def test_interface_addr_identity_nat(self):
2145 """ Identity NAT with addresses from interface """
2148 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2149 self.vapi.nat44_add_del_identity_mapping(
2150 sw_if_index=self.pg7.sw_if_index,
2152 protocol=IP_PROTOS.tcp,
2155 # identity mappings with external interface
2156 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2157 self.assertEqual(1, len(identity_mappings))
2158 self.assertEqual(self.pg7.sw_if_index,
2159 identity_mappings[0].sw_if_index)
2161 # configure interface address and check identity mappings
2162 self.pg7.config_ip4()
2163 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2165 self.assertEqual(2, len(identity_mappings))
2166 for sm in identity_mappings:
2167 if sm.sw_if_index == 0xFFFFFFFF:
2168 self.assertEqual(identity_mappings[0].ip_address,
2169 self.pg7.local_ip4n)
2170 self.assertEqual(port, identity_mappings[0].port)
2171 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2173 self.assertTrue(resolved)
2175 # remove interface address and check identity mappings
2176 self.pg7.unconfig_ip4()
2177 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2178 self.assertEqual(1, len(identity_mappings))
2179 self.assertEqual(self.pg7.sw_if_index,
2180 identity_mappings[0].sw_if_index)
2182 def test_ipfix_nat44_sess(self):
2183 """ IPFIX logging NAT44 session created/delted """
2184 self.ipfix_domain_id = 10
2185 self.ipfix_src_port = 20202
2186 colector_port = 30303
2187 bind_layers(UDP, IPFIX, dport=30303)
2188 self.nat44_add_address(self.nat_addr)
2189 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2190 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2192 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2193 src_address=self.pg3.local_ip4n,
2195 template_interval=10,
2196 collector_port=colector_port)
2197 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2198 src_port=self.ipfix_src_port)
2200 pkts = self.create_stream_in(self.pg0, self.pg1)
2201 self.pg0.add_stream(pkts)
2202 self.pg_enable_capture(self.pg_interfaces)
2204 capture = self.pg1.get_capture(len(pkts))
2205 self.verify_capture_out(capture)
2206 self.nat44_add_address(self.nat_addr, is_add=0)
2207 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2208 capture = self.pg3.get_capture(9)
2209 ipfix = IPFIXDecoder()
2210 # first load template
2212 self.assertTrue(p.haslayer(IPFIX))
2213 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2214 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2215 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2216 self.assertEqual(p[UDP].dport, colector_port)
2217 self.assertEqual(p[IPFIX].observationDomainID,
2218 self.ipfix_domain_id)
2219 if p.haslayer(Template):
2220 ipfix.add_template(p.getlayer(Template))
2221 # verify events in data set
2223 if p.haslayer(Data):
2224 data = ipfix.decode_data_set(p.getlayer(Set))
2225 self.verify_ipfix_nat44_ses(data)
2227 def test_ipfix_addr_exhausted(self):
2228 """ IPFIX logging NAT addresses exhausted """
2229 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2230 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2232 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2233 src_address=self.pg3.local_ip4n,
2235 template_interval=10)
2236 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2237 src_port=self.ipfix_src_port)
2239 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2240 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2242 self.pg0.add_stream(p)
2243 self.pg_enable_capture(self.pg_interfaces)
2245 self.pg1.assert_nothing_captured()
2247 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2248 capture = self.pg3.get_capture(9)
2249 ipfix = IPFIXDecoder()
2250 # first load template
2252 self.assertTrue(p.haslayer(IPFIX))
2253 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2254 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2255 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2256 self.assertEqual(p[UDP].dport, 4739)
2257 self.assertEqual(p[IPFIX].observationDomainID,
2258 self.ipfix_domain_id)
2259 if p.haslayer(Template):
2260 ipfix.add_template(p.getlayer(Template))
2261 # verify events in data set
2263 if p.haslayer(Data):
2264 data = ipfix.decode_data_set(p.getlayer(Set))
2265 self.verify_ipfix_addr_exhausted(data)
2267 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2268 def test_ipfix_max_sessions(self):
2269 """ IPFIX logging maximum session entries exceeded """
2270 self.nat44_add_address(self.nat_addr)
2271 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2272 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2275 nat44_config = self.vapi.nat_show_config()
2276 max_sessions = 10 * nat44_config.translation_buckets
2279 for i in range(0, max_sessions):
2280 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2281 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2282 IP(src=src, dst=self.pg1.remote_ip4) /
2285 self.pg0.add_stream(pkts)
2286 self.pg_enable_capture(self.pg_interfaces)
2289 self.pg1.get_capture(max_sessions)
2290 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2291 src_address=self.pg3.local_ip4n,
2293 template_interval=10)
2294 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2295 src_port=self.ipfix_src_port)
2297 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2298 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2300 self.pg0.add_stream(p)
2301 self.pg_enable_capture(self.pg_interfaces)
2303 self.pg1.assert_nothing_captured()
2305 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2306 capture = self.pg3.get_capture(9)
2307 ipfix = IPFIXDecoder()
2308 # first load template
2310 self.assertTrue(p.haslayer(IPFIX))
2311 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2312 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2313 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2314 self.assertEqual(p[UDP].dport, 4739)
2315 self.assertEqual(p[IPFIX].observationDomainID,
2316 self.ipfix_domain_id)
2317 if p.haslayer(Template):
2318 ipfix.add_template(p.getlayer(Template))
2319 # verify events in data set
2321 if p.haslayer(Data):
2322 data = ipfix.decode_data_set(p.getlayer(Set))
2323 self.verify_ipfix_max_sessions(data, max_sessions)
2325 def test_pool_addr_fib(self):
2326 """ NAT44 add pool addresses to FIB """
2327 static_addr = '10.0.0.10'
2328 self.nat44_add_address(self.nat_addr)
2329 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2330 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2332 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2335 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2336 ARP(op=ARP.who_has, pdst=self.nat_addr,
2337 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2338 self.pg1.add_stream(p)
2339 self.pg_enable_capture(self.pg_interfaces)
2341 capture = self.pg1.get_capture(1)
2342 self.assertTrue(capture[0].haslayer(ARP))
2343 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2346 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2347 ARP(op=ARP.who_has, pdst=static_addr,
2348 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2349 self.pg1.add_stream(p)
2350 self.pg_enable_capture(self.pg_interfaces)
2352 capture = self.pg1.get_capture(1)
2353 self.assertTrue(capture[0].haslayer(ARP))
2354 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2356 # send ARP to non-NAT44 interface
2357 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2358 ARP(op=ARP.who_has, pdst=self.nat_addr,
2359 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2360 self.pg2.add_stream(p)
2361 self.pg_enable_capture(self.pg_interfaces)
2363 self.pg1.assert_nothing_captured()
2365 # remove addresses and verify
2366 self.nat44_add_address(self.nat_addr, is_add=0)
2367 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2370 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2371 ARP(op=ARP.who_has, pdst=self.nat_addr,
2372 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2373 self.pg1.add_stream(p)
2374 self.pg_enable_capture(self.pg_interfaces)
2376 self.pg1.assert_nothing_captured()
2378 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2379 ARP(op=ARP.who_has, pdst=static_addr,
2380 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2381 self.pg1.add_stream(p)
2382 self.pg_enable_capture(self.pg_interfaces)
2384 self.pg1.assert_nothing_captured()
2386 def test_vrf_mode(self):
2387 """ NAT44 tenant VRF aware address pool mode """
2391 nat_ip1 = "10.0.0.10"
2392 nat_ip2 = "10.0.0.11"
2394 self.pg0.unconfig_ip4()
2395 self.pg1.unconfig_ip4()
2396 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2397 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2398 self.pg0.set_table_ip4(vrf_id1)
2399 self.pg1.set_table_ip4(vrf_id2)
2400 self.pg0.config_ip4()
2401 self.pg1.config_ip4()
2402 self.pg0.resolve_arp()
2403 self.pg1.resolve_arp()
2405 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2406 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2407 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2408 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2409 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2414 pkts = self.create_stream_in(self.pg0, self.pg2)
2415 self.pg0.add_stream(pkts)
2416 self.pg_enable_capture(self.pg_interfaces)
2418 capture = self.pg2.get_capture(len(pkts))
2419 self.verify_capture_out(capture, nat_ip1)
2422 pkts = self.create_stream_in(self.pg1, self.pg2)
2423 self.pg1.add_stream(pkts)
2424 self.pg_enable_capture(self.pg_interfaces)
2426 capture = self.pg2.get_capture(len(pkts))
2427 self.verify_capture_out(capture, nat_ip2)
2430 self.pg0.unconfig_ip4()
2431 self.pg1.unconfig_ip4()
2432 self.pg0.set_table_ip4(0)
2433 self.pg1.set_table_ip4(0)
2434 self.pg0.config_ip4()
2435 self.pg1.config_ip4()
2436 self.pg0.resolve_arp()
2437 self.pg1.resolve_arp()
2438 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2439 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2441 def test_vrf_feature_independent(self):
2442 """ NAT44 tenant VRF independent address pool mode """
2444 nat_ip1 = "10.0.0.10"
2445 nat_ip2 = "10.0.0.11"
2447 self.nat44_add_address(nat_ip1)
2448 self.nat44_add_address(nat_ip2, vrf_id=99)
2449 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2450 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2451 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2455 pkts = self.create_stream_in(self.pg0, self.pg2)
2456 self.pg0.add_stream(pkts)
2457 self.pg_enable_capture(self.pg_interfaces)
2459 capture = self.pg2.get_capture(len(pkts))
2460 self.verify_capture_out(capture, nat_ip1)
2463 pkts = self.create_stream_in(self.pg1, self.pg2)
2464 self.pg1.add_stream(pkts)
2465 self.pg_enable_capture(self.pg_interfaces)
2467 capture = self.pg2.get_capture(len(pkts))
2468 self.verify_capture_out(capture, nat_ip1)
2470 def test_dynamic_ipless_interfaces(self):
2471 """ NAT44 interfaces without configured IP address """
2473 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2474 mactobinary(self.pg7.remote_mac),
2475 self.pg7.remote_ip4n,
2477 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2478 mactobinary(self.pg8.remote_mac),
2479 self.pg8.remote_ip4n,
2482 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2483 dst_address_length=32,
2484 next_hop_address=self.pg7.remote_ip4n,
2485 next_hop_sw_if_index=self.pg7.sw_if_index)
2486 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2487 dst_address_length=32,
2488 next_hop_address=self.pg8.remote_ip4n,
2489 next_hop_sw_if_index=self.pg8.sw_if_index)
2491 self.nat44_add_address(self.nat_addr)
2492 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2493 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2497 pkts = self.create_stream_in(self.pg7, self.pg8)
2498 self.pg7.add_stream(pkts)
2499 self.pg_enable_capture(self.pg_interfaces)
2501 capture = self.pg8.get_capture(len(pkts))
2502 self.verify_capture_out(capture)
2505 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2506 self.pg8.add_stream(pkts)
2507 self.pg_enable_capture(self.pg_interfaces)
2509 capture = self.pg7.get_capture(len(pkts))
2510 self.verify_capture_in(capture, self.pg7)
2512 def test_static_ipless_interfaces(self):
2513 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2515 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2516 mactobinary(self.pg7.remote_mac),
2517 self.pg7.remote_ip4n,
2519 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2520 mactobinary(self.pg8.remote_mac),
2521 self.pg8.remote_ip4n,
2524 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2525 dst_address_length=32,
2526 next_hop_address=self.pg7.remote_ip4n,
2527 next_hop_sw_if_index=self.pg7.sw_if_index)
2528 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2529 dst_address_length=32,
2530 next_hop_address=self.pg8.remote_ip4n,
2531 next_hop_sw_if_index=self.pg8.sw_if_index)
2533 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2534 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2535 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2539 pkts = self.create_stream_out(self.pg8)
2540 self.pg8.add_stream(pkts)
2541 self.pg_enable_capture(self.pg_interfaces)
2543 capture = self.pg7.get_capture(len(pkts))
2544 self.verify_capture_in(capture, self.pg7)
2547 pkts = self.create_stream_in(self.pg7, self.pg8)
2548 self.pg7.add_stream(pkts)
2549 self.pg_enable_capture(self.pg_interfaces)
2551 capture = self.pg8.get_capture(len(pkts))
2552 self.verify_capture_out(capture, self.nat_addr, True)
2554 def test_static_with_port_ipless_interfaces(self):
2555 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2557 self.tcp_port_out = 30606
2558 self.udp_port_out = 30607
2559 self.icmp_id_out = 30608
2561 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2562 mactobinary(self.pg7.remote_mac),
2563 self.pg7.remote_ip4n,
2565 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2566 mactobinary(self.pg8.remote_mac),
2567 self.pg8.remote_ip4n,
2570 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2571 dst_address_length=32,
2572 next_hop_address=self.pg7.remote_ip4n,
2573 next_hop_sw_if_index=self.pg7.sw_if_index)
2574 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2575 dst_address_length=32,
2576 next_hop_address=self.pg8.remote_ip4n,
2577 next_hop_sw_if_index=self.pg8.sw_if_index)
2579 self.nat44_add_address(self.nat_addr)
2580 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2581 self.tcp_port_in, self.tcp_port_out,
2582 proto=IP_PROTOS.tcp)
2583 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2584 self.udp_port_in, self.udp_port_out,
2585 proto=IP_PROTOS.udp)
2586 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2587 self.icmp_id_in, self.icmp_id_out,
2588 proto=IP_PROTOS.icmp)
2589 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2590 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2594 pkts = self.create_stream_out(self.pg8)
2595 self.pg8.add_stream(pkts)
2596 self.pg_enable_capture(self.pg_interfaces)
2598 capture = self.pg7.get_capture(len(pkts))
2599 self.verify_capture_in(capture, self.pg7)
2602 pkts = self.create_stream_in(self.pg7, self.pg8)
2603 self.pg7.add_stream(pkts)
2604 self.pg_enable_capture(self.pg_interfaces)
2606 capture = self.pg8.get_capture(len(pkts))
2607 self.verify_capture_out(capture)
2609 def test_static_unknown_proto(self):
2610 """ 1:1 NAT translate packet with unknown protocol """
2611 nat_ip = "10.0.0.10"
2612 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2613 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2614 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2618 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2619 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2621 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2622 TCP(sport=1234, dport=1234))
2623 self.pg0.add_stream(p)
2624 self.pg_enable_capture(self.pg_interfaces)
2626 p = self.pg1.get_capture(1)
2629 self.assertEqual(packet[IP].src, nat_ip)
2630 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2631 self.assertTrue(packet.haslayer(GRE))
2632 self.assert_packet_checksums_valid(packet)
2634 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2638 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2639 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2641 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2642 TCP(sport=1234, dport=1234))
2643 self.pg1.add_stream(p)
2644 self.pg_enable_capture(self.pg_interfaces)
2646 p = self.pg0.get_capture(1)
2649 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2650 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2651 self.assertTrue(packet.haslayer(GRE))
2652 self.assert_packet_checksums_valid(packet)
2654 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2657 def test_hairpinning_static_unknown_proto(self):
2658 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2660 host = self.pg0.remote_hosts[0]
2661 server = self.pg0.remote_hosts[1]
2663 host_nat_ip = "10.0.0.10"
2664 server_nat_ip = "10.0.0.11"
2666 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2667 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2668 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2669 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2673 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2674 IP(src=host.ip4, dst=server_nat_ip) /
2676 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2677 TCP(sport=1234, dport=1234))
2678 self.pg0.add_stream(p)
2679 self.pg_enable_capture(self.pg_interfaces)
2681 p = self.pg0.get_capture(1)
2684 self.assertEqual(packet[IP].src, host_nat_ip)
2685 self.assertEqual(packet[IP].dst, server.ip4)
2686 self.assertTrue(packet.haslayer(GRE))
2687 self.assert_packet_checksums_valid(packet)
2689 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2693 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2694 IP(src=server.ip4, dst=host_nat_ip) /
2696 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2697 TCP(sport=1234, dport=1234))
2698 self.pg0.add_stream(p)
2699 self.pg_enable_capture(self.pg_interfaces)
2701 p = self.pg0.get_capture(1)
2704 self.assertEqual(packet[IP].src, server_nat_ip)
2705 self.assertEqual(packet[IP].dst, host.ip4)
2706 self.assertTrue(packet.haslayer(GRE))
2707 self.assert_packet_checksums_valid(packet)
2709 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2712 def test_output_feature(self):
2713 """ NAT44 interface output feature (in2out postrouting) """
2714 self.nat44_add_address(self.nat_addr)
2715 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2716 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2717 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2721 pkts = self.create_stream_in(self.pg0, self.pg3)
2722 self.pg0.add_stream(pkts)
2723 self.pg_enable_capture(self.pg_interfaces)
2725 capture = self.pg3.get_capture(len(pkts))
2726 self.verify_capture_out(capture)
2729 pkts = self.create_stream_out(self.pg3)
2730 self.pg3.add_stream(pkts)
2731 self.pg_enable_capture(self.pg_interfaces)
2733 capture = self.pg0.get_capture(len(pkts))
2734 self.verify_capture_in(capture, self.pg0)
2736 # from non-NAT interface to NAT inside interface
2737 pkts = self.create_stream_in(self.pg2, self.pg0)
2738 self.pg2.add_stream(pkts)
2739 self.pg_enable_capture(self.pg_interfaces)
2741 capture = self.pg0.get_capture(len(pkts))
2742 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2744 def test_output_feature_vrf_aware(self):
2745 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2746 nat_ip_vrf10 = "10.0.0.10"
2747 nat_ip_vrf20 = "10.0.0.20"
2749 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2750 dst_address_length=32,
2751 next_hop_address=self.pg3.remote_ip4n,
2752 next_hop_sw_if_index=self.pg3.sw_if_index,
2754 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2755 dst_address_length=32,
2756 next_hop_address=self.pg3.remote_ip4n,
2757 next_hop_sw_if_index=self.pg3.sw_if_index,
2760 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2761 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2762 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2763 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2764 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2768 pkts = self.create_stream_in(self.pg4, self.pg3)
2769 self.pg4.add_stream(pkts)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 capture = self.pg3.get_capture(len(pkts))
2773 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2776 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2777 self.pg3.add_stream(pkts)
2778 self.pg_enable_capture(self.pg_interfaces)
2780 capture = self.pg4.get_capture(len(pkts))
2781 self.verify_capture_in(capture, self.pg4)
2784 pkts = self.create_stream_in(self.pg6, self.pg3)
2785 self.pg6.add_stream(pkts)
2786 self.pg_enable_capture(self.pg_interfaces)
2788 capture = self.pg3.get_capture(len(pkts))
2789 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2792 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2793 self.pg3.add_stream(pkts)
2794 self.pg_enable_capture(self.pg_interfaces)
2796 capture = self.pg6.get_capture(len(pkts))
2797 self.verify_capture_in(capture, self.pg6)
2799 def test_output_feature_hairpinning(self):
2800 """ NAT44 interface output feature hairpinning (in2out postrouting) """
2801 host = self.pg0.remote_hosts[0]
2802 server = self.pg0.remote_hosts[1]
2805 server_in_port = 5678
2806 server_out_port = 8765
2808 self.nat44_add_address(self.nat_addr)
2809 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2810 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2813 # add static mapping for server
2814 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2815 server_in_port, server_out_port,
2816 proto=IP_PROTOS.tcp)
2818 # send packet from host to server
2819 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2820 IP(src=host.ip4, dst=self.nat_addr) /
2821 TCP(sport=host_in_port, dport=server_out_port))
2822 self.pg0.add_stream(p)
2823 self.pg_enable_capture(self.pg_interfaces)
2825 capture = self.pg0.get_capture(1)
2830 self.assertEqual(ip.src, self.nat_addr)
2831 self.assertEqual(ip.dst, server.ip4)
2832 self.assertNotEqual(tcp.sport, host_in_port)
2833 self.assertEqual(tcp.dport, server_in_port)
2834 self.assert_packet_checksums_valid(p)
2835 host_out_port = tcp.sport
2837 self.logger.error(ppp("Unexpected or invalid packet:", p))
2840 # send reply from server to host
2841 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2842 IP(src=server.ip4, dst=self.nat_addr) /
2843 TCP(sport=server_in_port, dport=host_out_port))
2844 self.pg0.add_stream(p)
2845 self.pg_enable_capture(self.pg_interfaces)
2847 capture = self.pg0.get_capture(1)
2852 self.assertEqual(ip.src, self.nat_addr)
2853 self.assertEqual(ip.dst, host.ip4)
2854 self.assertEqual(tcp.sport, server_out_port)
2855 self.assertEqual(tcp.dport, host_in_port)
2856 self.assert_packet_checksums_valid(p)
2858 self.logger.error(ppp("Unexpected or invalid packet:", p))
2861 def test_one_armed_nat44(self):
2862 """ One armed NAT44 """
2863 remote_host = self.pg9.remote_hosts[0]
2864 local_host = self.pg9.remote_hosts[1]
2867 self.nat44_add_address(self.nat_addr)
2868 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
2869 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
2873 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2874 IP(src=local_host.ip4, dst=remote_host.ip4) /
2875 TCP(sport=12345, dport=80))
2876 self.pg9.add_stream(p)
2877 self.pg_enable_capture(self.pg_interfaces)
2879 capture = self.pg9.get_capture(1)
2884 self.assertEqual(ip.src, self.nat_addr)
2885 self.assertEqual(ip.dst, remote_host.ip4)
2886 self.assertNotEqual(tcp.sport, 12345)
2887 external_port = tcp.sport
2888 self.assertEqual(tcp.dport, 80)
2889 self.assert_packet_checksums_valid(p)
2891 self.logger.error(ppp("Unexpected or invalid packet:", p))
2895 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2896 IP(src=remote_host.ip4, dst=self.nat_addr) /
2897 TCP(sport=80, dport=external_port))
2898 self.pg9.add_stream(p)
2899 self.pg_enable_capture(self.pg_interfaces)
2901 capture = self.pg9.get_capture(1)
2906 self.assertEqual(ip.src, remote_host.ip4)
2907 self.assertEqual(ip.dst, local_host.ip4)
2908 self.assertEqual(tcp.sport, 80)
2909 self.assertEqual(tcp.dport, 12345)
2910 self.assert_packet_checksums_valid(p)
2912 self.logger.error(ppp("Unexpected or invalid packet:", p))
2915 def test_del_session(self):
2916 """ Delete NAT44 session """
2917 self.nat44_add_address(self.nat_addr)
2918 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2919 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2922 pkts = self.create_stream_in(self.pg0, self.pg1)
2923 self.pg0.add_stream(pkts)
2924 self.pg_enable_capture(self.pg_interfaces)
2926 self.pg1.get_capture(len(pkts))
2928 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2929 nsessions = len(sessions)
2931 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2932 sessions[0].inside_port,
2933 sessions[0].protocol)
2934 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
2935 sessions[1].outside_port,
2936 sessions[1].protocol,
2939 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
2940 self.assertEqual(nsessions - len(sessions), 2)
2942 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
2943 sessions[0].inside_port,
2944 sessions[0].protocol)
2946 self.verify_no_nat44_user()
2948 def test_set_get_reass(self):
2949 """ NAT44 set/get virtual fragmentation reassembly """
2950 reas_cfg1 = self.vapi.nat_get_reass()
2952 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
2953 max_reass=reas_cfg1.ip4_max_reass * 2,
2954 max_frag=reas_cfg1.ip4_max_frag * 2)
2956 reas_cfg2 = self.vapi.nat_get_reass()
2958 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
2959 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
2960 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
2962 self.vapi.nat_set_reass(drop_frag=1)
2963 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
2965 def test_frag_in_order(self):
2966 """ NAT44 translate fragments arriving in order """
2967 self.nat44_add_address(self.nat_addr)
2968 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2969 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2972 data = "A" * 4 + "B" * 16 + "C" * 3
2973 self.tcp_port_in = random.randint(1025, 65535)
2975 reass = self.vapi.nat_reass_dump()
2976 reass_n_start = len(reass)
2979 pkts = self.create_stream_frag(self.pg0,
2980 self.pg1.remote_ip4,
2984 self.pg0.add_stream(pkts)
2985 self.pg_enable_capture(self.pg_interfaces)
2987 frags = self.pg1.get_capture(len(pkts))
2988 p = self.reass_frags_and_verify(frags,
2990 self.pg1.remote_ip4)
2991 self.assertEqual(p[TCP].dport, 20)
2992 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
2993 self.tcp_port_out = p[TCP].sport
2994 self.assertEqual(data, p[Raw].load)
2997 pkts = self.create_stream_frag(self.pg1,
3002 self.pg1.add_stream(pkts)
3003 self.pg_enable_capture(self.pg_interfaces)
3005 frags = self.pg0.get_capture(len(pkts))
3006 p = self.reass_frags_and_verify(frags,
3007 self.pg1.remote_ip4,
3008 self.pg0.remote_ip4)
3009 self.assertEqual(p[TCP].sport, 20)
3010 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3011 self.assertEqual(data, p[Raw].load)
3013 reass = self.vapi.nat_reass_dump()
3014 reass_n_end = len(reass)
3016 self.assertEqual(reass_n_end - reass_n_start, 2)
3018 def test_reass_hairpinning(self):
3019 """ NAT44 fragments hairpinning """
3020 server = self.pg0.remote_hosts[1]
3021 host_in_port = random.randint(1025, 65535)
3022 server_in_port = random.randint(1025, 65535)
3023 server_out_port = random.randint(1025, 65535)
3024 data = "A" * 4 + "B" * 16 + "C" * 3
3026 self.nat44_add_address(self.nat_addr)
3027 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3028 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3030 # add static mapping for server
3031 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3032 server_in_port, server_out_port,
3033 proto=IP_PROTOS.tcp)
3035 # send packet from host to server
3036 pkts = self.create_stream_frag(self.pg0,
3041 self.pg0.add_stream(pkts)
3042 self.pg_enable_capture(self.pg_interfaces)
3044 frags = self.pg0.get_capture(len(pkts))
3045 p = self.reass_frags_and_verify(frags,
3048 self.assertNotEqual(p[TCP].sport, host_in_port)
3049 self.assertEqual(p[TCP].dport, server_in_port)
3050 self.assertEqual(data, p[Raw].load)
3052 def test_frag_out_of_order(self):
3053 """ NAT44 translate fragments arriving out of order """
3054 self.nat44_add_address(self.nat_addr)
3055 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3056 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3059 data = "A" * 4 + "B" * 16 + "C" * 3
3060 random.randint(1025, 65535)
3063 pkts = self.create_stream_frag(self.pg0,
3064 self.pg1.remote_ip4,
3069 self.pg0.add_stream(pkts)
3070 self.pg_enable_capture(self.pg_interfaces)
3072 frags = self.pg1.get_capture(len(pkts))
3073 p = self.reass_frags_and_verify(frags,
3075 self.pg1.remote_ip4)
3076 self.assertEqual(p[TCP].dport, 20)
3077 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3078 self.tcp_port_out = p[TCP].sport
3079 self.assertEqual(data, p[Raw].load)
3082 pkts = self.create_stream_frag(self.pg1,
3088 self.pg1.add_stream(pkts)
3089 self.pg_enable_capture(self.pg_interfaces)
3091 frags = self.pg0.get_capture(len(pkts))
3092 p = self.reass_frags_and_verify(frags,
3093 self.pg1.remote_ip4,
3094 self.pg0.remote_ip4)
3095 self.assertEqual(p[TCP].sport, 20)
3096 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3097 self.assertEqual(data, p[Raw].load)
3099 def test_port_restricted(self):
3100 """ Port restricted NAT44 (MAP-E CE) """
3101 self.nat44_add_address(self.nat_addr)
3102 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3103 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3105 self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
3110 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3111 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3112 TCP(sport=4567, dport=22))
3113 self.pg0.add_stream(p)
3114 self.pg_enable_capture(self.pg_interfaces)
3116 capture = self.pg1.get_capture(1)
3121 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3122 self.assertEqual(ip.src, self.nat_addr)
3123 self.assertEqual(tcp.dport, 22)
3124 self.assertNotEqual(tcp.sport, 4567)
3125 self.assertEqual((tcp.sport >> 6) & 63, 10)
3126 self.assert_packet_checksums_valid(p)
3128 self.logger.error(ppp("Unexpected or invalid packet:", p))
3131 def test_port_range(self):
3132 """ External address port range """
3133 self.nat44_add_address(self.nat_addr)
3134 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3135 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3137 self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
3142 for port in range(0, 5):
3143 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3144 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3145 TCP(sport=1125 + port))
3147 self.pg0.add_stream(pkts)
3148 self.pg_enable_capture(self.pg_interfaces)
3150 capture = self.pg1.get_capture(3)
3153 self.assertGreaterEqual(tcp.sport, 1025)
3154 self.assertLessEqual(tcp.sport, 1027)
3156 def test_ipfix_max_frags(self):
3157 """ IPFIX logging maximum fragments pending reassembly exceeded """
3158 self.nat44_add_address(self.nat_addr)
3159 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3160 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3162 self.vapi.nat_set_reass(max_frag=1)
3163 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3164 src_address=self.pg3.local_ip4n,
3166 template_interval=10)
3167 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3168 src_port=self.ipfix_src_port)
3170 data = "A" * 4 + "B" * 16 + "C" * 3
3171 self.tcp_port_in = random.randint(1025, 65535)
3172 pkts = self.create_stream_frag(self.pg0,
3173 self.pg1.remote_ip4,
3178 self.pg0.add_stream(pkts)
3179 self.pg_enable_capture(self.pg_interfaces)
3181 self.pg1.assert_nothing_captured()
3183 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3184 capture = self.pg3.get_capture(9)
3185 ipfix = IPFIXDecoder()
3186 # first load template
3188 self.assertTrue(p.haslayer(IPFIX))
3189 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3190 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3191 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3192 self.assertEqual(p[UDP].dport, 4739)
3193 self.assertEqual(p[IPFIX].observationDomainID,
3194 self.ipfix_domain_id)
3195 if p.haslayer(Template):
3196 ipfix.add_template(p.getlayer(Template))
3197 # verify events in data set
3199 if p.haslayer(Data):
3200 data = ipfix.decode_data_set(p.getlayer(Set))
3201 self.verify_ipfix_max_fragments_ip4(data, 1,
3202 self.pg0.remote_ip4n)
3204 def test_multiple_outside_vrf(self):
3205 """ Multiple outside VRF """
3209 self.pg1.unconfig_ip4()
3210 self.pg2.unconfig_ip4()
3211 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
3212 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
3213 self.pg1.set_table_ip4(vrf_id1)
3214 self.pg2.set_table_ip4(vrf_id2)
3215 self.pg1.config_ip4()
3216 self.pg2.config_ip4()
3217 self.pg1.resolve_arp()
3218 self.pg2.resolve_arp()
3220 self.nat44_add_address(self.nat_addr)
3221 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3222 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3224 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
3229 pkts = self.create_stream_in(self.pg0, self.pg1)
3230 self.pg0.add_stream(pkts)
3231 self.pg_enable_capture(self.pg_interfaces)
3233 capture = self.pg1.get_capture(len(pkts))
3234 self.verify_capture_out(capture, self.nat_addr)
3236 pkts = self.create_stream_out(self.pg1, self.nat_addr)
3237 self.pg1.add_stream(pkts)
3238 self.pg_enable_capture(self.pg_interfaces)
3240 capture = self.pg0.get_capture(len(pkts))
3241 self.verify_capture_in(capture, self.pg0)
3243 self.tcp_port_in = 60303
3244 self.udp_port_in = 60304
3245 self.icmp_id_in = 60305
3248 pkts = self.create_stream_in(self.pg0, self.pg2)
3249 self.pg0.add_stream(pkts)
3250 self.pg_enable_capture(self.pg_interfaces)
3252 capture = self.pg2.get_capture(len(pkts))
3253 self.verify_capture_out(capture, self.nat_addr)
3255 pkts = self.create_stream_out(self.pg2, self.nat_addr)
3256 self.pg2.add_stream(pkts)
3257 self.pg_enable_capture(self.pg_interfaces)
3259 capture = self.pg0.get_capture(len(pkts))
3260 self.verify_capture_in(capture, self.pg0)
3263 self.pg1.unconfig_ip4()
3264 self.pg2.unconfig_ip4()
3265 self.pg1.set_table_ip4(0)
3266 self.pg2.set_table_ip4(0)
3267 self.pg1.config_ip4()
3268 self.pg2.config_ip4()
3269 self.pg1.resolve_arp()
3270 self.pg2.resolve_arp()
3272 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3273 def test_session_timeout(self):
3274 """ NAT44 session timeouts """
3275 self.nat44_add_address(self.nat_addr)
3276 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3277 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3279 self.vapi.nat_set_timeouts(udp=5)
3283 for i in range(0, max_sessions):
3284 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3285 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3286 IP(src=src, dst=self.pg1.remote_ip4) /
3287 UDP(sport=1025, dport=53))
3289 self.pg0.add_stream(pkts)
3290 self.pg_enable_capture(self.pg_interfaces)
3292 self.pg1.get_capture(max_sessions)
3297 for i in range(0, max_sessions):
3298 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
3299 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3300 IP(src=src, dst=self.pg1.remote_ip4) /
3301 UDP(sport=1026, dport=53))
3303 self.pg0.add_stream(pkts)
3304 self.pg_enable_capture(self.pg_interfaces)
3306 self.pg1.get_capture(max_sessions)
3309 users = self.vapi.nat44_user_dump()
3311 nsessions = nsessions + user.nsessions
3312 self.assertLess(nsessions, 2 * max_sessions)
3314 def test_mss_clamping(self):
3315 """ TCP MSS clamping """
3316 self.nat44_add_address(self.nat_addr)
3317 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3318 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3321 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3322 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3323 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3324 flags="S", options=[('MSS', 1400)]))
3326 self.vapi.nat_set_mss_clamping(enable=1, mss_value=1000)
3327 self.pg0.add_stream(p)
3328 self.pg_enable_capture(self.pg_interfaces)
3330 capture = self.pg1.get_capture(1)
3331 # Negotiated MSS value greater than configured - changed
3332 self.verify_mss_value(capture[0], 1000)
3334 self.vapi.nat_set_mss_clamping(enable=0)
3335 self.pg0.add_stream(p)
3336 self.pg_enable_capture(self.pg_interfaces)
3338 capture = self.pg1.get_capture(1)
3339 # MSS clamping disabled - negotiated MSS unchanged
3340 self.verify_mss_value(capture[0], 1400)
3342 self.vapi.nat_set_mss_clamping(enable=1, mss_value=1500)
3343 self.pg0.add_stream(p)
3344 self.pg_enable_capture(self.pg_interfaces)
3346 capture = self.pg1.get_capture(1)
3347 # Negotiated MSS value smaller than configured - unchanged
3348 self.verify_mss_value(capture[0], 1400)
3351 super(TestNAT44, self).tearDown()
3352 if not self.vpp_dead:
3353 self.logger.info(self.vapi.cli("show nat44 addresses"))
3354 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3355 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3356 self.logger.info(self.vapi.cli("show nat44 interface address"))
3357 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3358 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3359 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3360 self.logger.info(self.vapi.cli("show nat timeouts"))
3362 self.vapi.cli("show nat addr-port-assignment-alg"))
3364 self.vapi.cli("clear logging")
3367 class TestNAT44EndpointDependent(MethodHolder):
3368 """ Endpoint-Dependent mapping and filtering test cases """
3371 def setUpConstants(cls):
3372 super(TestNAT44EndpointDependent, cls).setUpConstants()
3373 cls.vpp_cmdline.extend(["nat", "{", "endpoint-dependent", "}"])
3376 def setUpClass(cls):
3377 super(TestNAT44EndpointDependent, cls).setUpClass()
3378 cls.vapi.cli("set log class nat level debug")
3380 cls.tcp_port_in = 6303
3381 cls.tcp_port_out = 6303
3382 cls.udp_port_in = 6304
3383 cls.udp_port_out = 6304
3384 cls.icmp_id_in = 6305
3385 cls.icmp_id_out = 6305
3386 cls.nat_addr = '10.0.0.3'
3387 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3388 cls.ipfix_src_port = 4739
3389 cls.ipfix_domain_id = 1
3390 cls.tcp_external_port = 80
3392 cls.create_pg_interfaces(range(7))
3393 cls.interfaces = list(cls.pg_interfaces[0:3])
3395 for i in cls.interfaces:
3400 cls.pg0.generate_remote_hosts(3)
3401 cls.pg0.configure_ipv4_neighbors()
3405 cls.pg4.generate_remote_hosts(2)
3406 cls.pg4.config_ip4()
3407 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
3408 cls.vapi.sw_interface_add_del_address(cls.pg4.sw_if_index,
3412 cls.pg4.resolve_arp()
3413 cls.pg4._remote_hosts[1]._ip4 = cls.pg4._remote_hosts[0]._ip4
3414 cls.pg4.resolve_arp()
3416 zero_ip4n = socket.inet_pton(socket.AF_INET, "0.0.0.0")
3417 cls.vapi.ip_table_add_del(1, is_add=1)
3419 cls.pg5._local_ip4 = "10.1.1.1"
3420 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
3422 cls.pg5._remote_hosts[0]._ip4 = "10.1.1.2"
3423 cls.pg5._remote_hosts[0]._ip4n = socket.inet_pton(
3424 socket.AF_INET, cls.pg5.remote_ip4)
3425 cls.pg5.set_table_ip4(1)
3426 cls.pg5.config_ip4()
3428 cls.vapi.ip_add_del_route(dst_address=cls.pg5.remote_ip4n,
3429 dst_address_length=32,
3431 next_hop_sw_if_index=cls.pg5.sw_if_index,
3432 next_hop_address=zero_ip4n)
3434 cls.pg6._local_ip4 = "10.1.2.1"
3435 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
3437 cls.pg6._remote_hosts[0]._ip4 = "10.1.2.2"
3438 cls.pg6._remote_hosts[0]._ip4n = socket.inet_pton(
3439 socket.AF_INET, cls.pg6.remote_ip4)
3440 cls.pg6.set_table_ip4(1)
3441 cls.pg6.config_ip4()
3443 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3444 dst_address_length=32,
3446 next_hop_sw_if_index=cls.pg6.sw_if_index,
3447 next_hop_address=zero_ip4n)
3449 cls.vapi.ip_add_del_route(dst_address=cls.pg6.remote_ip4n,
3450 dst_address_length=16,
3451 next_hop_address=zero_ip4n,
3453 next_hop_table_id=1)
3454 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3455 dst_address_length=0,
3456 next_hop_address=zero_ip4n,
3458 next_hop_table_id=0)
3459 cls.vapi.ip_add_del_route(dst_address=zero_ip4n,
3460 dst_address_length=0,
3462 next_hop_sw_if_index=cls.pg1.sw_if_index,
3463 next_hop_address=cls.pg1.local_ip4n)
3465 cls.pg5.resolve_arp()
3466 cls.pg6.resolve_arp()
3469 super(TestNAT44EndpointDependent, cls).tearDownClass()
3472 def test_dynamic(self):
3473 """ NAT44 dynamic translation test """
3475 self.nat44_add_address(self.nat_addr)
3476 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3477 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3480 nat_config = self.vapi.nat_show_config()
3481 self.assertEqual(1, nat_config.endpoint_dependent)
3484 pkts = self.create_stream_in(self.pg0, self.pg1)
3485 self.pg0.add_stream(pkts)
3486 self.pg_enable_capture(self.pg_interfaces)
3488 capture = self.pg1.get_capture(len(pkts))
3489 self.verify_capture_out(capture)
3492 pkts = self.create_stream_out(self.pg1)
3493 self.pg1.add_stream(pkts)
3494 self.pg_enable_capture(self.pg_interfaces)
3496 capture = self.pg0.get_capture(len(pkts))
3497 self.verify_capture_in(capture, self.pg0)
3499 def test_forwarding(self):
3500 """ NAT44 forwarding test """
3502 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3503 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3505 self.vapi.nat44_forwarding_enable_disable(1)
3507 real_ip = self.pg0.remote_ip4n
3508 alias_ip = self.nat_addr_n
3509 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3510 external_ip=alias_ip)
3513 # in2out - static mapping match
3515 pkts = self.create_stream_out(self.pg1)
3516 self.pg1.add_stream(pkts)
3517 self.pg_enable_capture(self.pg_interfaces)
3519 capture = self.pg0.get_capture(len(pkts))
3520 self.verify_capture_in(capture, self.pg0)
3522 pkts = self.create_stream_in(self.pg0, self.pg1)
3523 self.pg0.add_stream(pkts)
3524 self.pg_enable_capture(self.pg_interfaces)
3526 capture = self.pg1.get_capture(len(pkts))
3527 self.verify_capture_out(capture, same_port=True)
3529 # in2out - no static mapping match
3531 host0 = self.pg0.remote_hosts[0]
3532 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
3534 pkts = self.create_stream_out(self.pg1,
3535 dst_ip=self.pg0.remote_ip4,
3536 use_inside_ports=True)
3537 self.pg1.add_stream(pkts)
3538 self.pg_enable_capture(self.pg_interfaces)
3540 capture = self.pg0.get_capture(len(pkts))
3541 self.verify_capture_in(capture, self.pg0)
3543 pkts = self.create_stream_in(self.pg0, self.pg1)
3544 self.pg0.add_stream(pkts)
3545 self.pg_enable_capture(self.pg_interfaces)
3547 capture = self.pg1.get_capture(len(pkts))
3548 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3551 self.pg0.remote_hosts[0] = host0
3553 user = self.pg0.remote_hosts[1]
3554 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3555 self.assertEqual(len(sessions), 3)
3556 self.assertTrue(sessions[0].ext_host_valid)
3557 self.vapi.nat44_del_session(
3558 sessions[0].inside_ip_address,
3559 sessions[0].inside_port,
3560 sessions[0].protocol,
3561 ext_host_address=sessions[0].ext_host_address,
3562 ext_host_port=sessions[0].ext_host_port)
3563 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
3564 self.assertEqual(len(sessions), 2)
3567 self.vapi.nat44_forwarding_enable_disable(0)
3568 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
3569 external_ip=alias_ip,
3572 def test_static_lb(self):
3573 """ NAT44 local service load balancing """
3574 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3577 server1 = self.pg0.remote_hosts[0]
3578 server2 = self.pg0.remote_hosts[1]
3580 locals = [{'addr': server1.ip4n,
3584 {'addr': server2.ip4n,
3589 self.nat44_add_address(self.nat_addr)
3590 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3593 local_num=len(locals),
3595 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3596 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3599 # from client to service
3600 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3601 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3602 TCP(sport=12345, dport=external_port))
3603 self.pg1.add_stream(p)
3604 self.pg_enable_capture(self.pg_interfaces)
3606 capture = self.pg0.get_capture(1)
3612 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3613 if ip.dst == server1.ip4:
3617 self.assertEqual(tcp.dport, local_port)
3618 self.assert_packet_checksums_valid(p)
3620 self.logger.error(ppp("Unexpected or invalid packet:", p))
3623 # from service back to client
3624 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3625 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3626 TCP(sport=local_port, dport=12345))
3627 self.pg0.add_stream(p)
3628 self.pg_enable_capture(self.pg_interfaces)
3630 capture = self.pg1.get_capture(1)
3635 self.assertEqual(ip.src, self.nat_addr)
3636 self.assertEqual(tcp.sport, external_port)
3637 self.assert_packet_checksums_valid(p)
3639 self.logger.error(ppp("Unexpected or invalid packet:", p))
3642 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3643 self.assertEqual(len(sessions), 1)
3644 self.assertTrue(sessions[0].ext_host_valid)
3645 self.vapi.nat44_del_session(
3646 sessions[0].inside_ip_address,
3647 sessions[0].inside_port,
3648 sessions[0].protocol,
3649 ext_host_address=sessions[0].ext_host_address,
3650 ext_host_port=sessions[0].ext_host_port)
3651 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3652 self.assertEqual(len(sessions), 0)
3654 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3655 def test_static_lb_multi_clients(self):
3656 """ NAT44 local service load balancing - multiple clients"""
3658 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3661 server1 = self.pg0.remote_hosts[0]
3662 server2 = self.pg0.remote_hosts[1]
3664 locals = [{'addr': server1.ip4n,
3668 {'addr': server2.ip4n,
3673 self.nat44_add_address(self.nat_addr)
3674 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3677 local_num=len(locals),
3679 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3680 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3685 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
3687 for client in clients:
3688 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3689 IP(src=client, dst=self.nat_addr) /
3690 TCP(sport=12345, dport=external_port))
3692 self.pg1.add_stream(pkts)
3693 self.pg_enable_capture(self.pg_interfaces)
3695 capture = self.pg0.get_capture(len(pkts))
3697 if p[IP].dst == server1.ip4:
3701 self.assertTrue(server1_n > server2_n)
3703 def test_static_lb_2(self):
3704 """ NAT44 local service load balancing (asymmetrical rule) """
3705 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3708 server1 = self.pg0.remote_hosts[0]
3709 server2 = self.pg0.remote_hosts[1]
3711 locals = [{'addr': server1.ip4n,
3715 {'addr': server2.ip4n,
3720 self.vapi.nat44_forwarding_enable_disable(1)
3721 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3725 local_num=len(locals),
3727 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3728 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3731 # from client to service
3732 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3733 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3734 TCP(sport=12345, dport=external_port))
3735 self.pg1.add_stream(p)
3736 self.pg_enable_capture(self.pg_interfaces)
3738 capture = self.pg0.get_capture(1)
3744 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3745 if ip.dst == server1.ip4:
3749 self.assertEqual(tcp.dport, local_port)
3750 self.assert_packet_checksums_valid(p)
3752 self.logger.error(ppp("Unexpected or invalid packet:", p))
3755 # from service back to client
3756 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3757 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
3758 TCP(sport=local_port, dport=12345))
3759 self.pg0.add_stream(p)
3760 self.pg_enable_capture(self.pg_interfaces)
3762 capture = self.pg1.get_capture(1)
3767 self.assertEqual(ip.src, self.nat_addr)
3768 self.assertEqual(tcp.sport, external_port)
3769 self.assert_packet_checksums_valid(p)
3771 self.logger.error(ppp("Unexpected or invalid packet:", p))
3774 # from client to server (no translation)
3775 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3776 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
3777 TCP(sport=12346, dport=local_port))
3778 self.pg1.add_stream(p)
3779 self.pg_enable_capture(self.pg_interfaces)
3781 capture = self.pg0.get_capture(1)
3787 self.assertEqual(ip.dst, server1.ip4)
3788 self.assertEqual(tcp.dport, local_port)
3789 self.assert_packet_checksums_valid(p)
3791 self.logger.error(ppp("Unexpected or invalid packet:", p))
3794 # from service back to client (no translation)
3795 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
3796 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
3797 TCP(sport=local_port, dport=12346))
3798 self.pg0.add_stream(p)
3799 self.pg_enable_capture(self.pg_interfaces)
3801 capture = self.pg1.get_capture(1)
3806 self.assertEqual(ip.src, server1.ip4)
3807 self.assertEqual(tcp.sport, local_port)
3808 self.assert_packet_checksums_valid(p)
3810 self.logger.error(ppp("Unexpected or invalid packet:", p))
3813 def test_lb_affinity(self):
3814 """ NAT44 local service load balancing affinity """
3815 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3818 server1 = self.pg0.remote_hosts[0]
3819 server2 = self.pg0.remote_hosts[1]
3821 locals = [{'addr': server1.ip4n,
3825 {'addr': server2.ip4n,
3830 self.nat44_add_address(self.nat_addr)
3831 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3835 local_num=len(locals),
3837 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3838 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3841 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3842 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3843 TCP(sport=1025, dport=external_port))
3844 self.pg1.add_stream(p)
3845 self.pg_enable_capture(self.pg_interfaces)
3847 capture = self.pg0.get_capture(1)
3848 backend = capture[0][IP].dst
3850 sessions = self.vapi.nat44_user_session_dump(
3851 socket.inet_pton(socket.AF_INET, backend), 0)
3852 self.assertEqual(len(sessions), 1)
3853 self.assertTrue(sessions[0].ext_host_valid)
3854 self.vapi.nat44_del_session(
3855 sessions[0].inside_ip_address,
3856 sessions[0].inside_port,
3857 sessions[0].protocol,
3858 ext_host_address=sessions[0].ext_host_address,
3859 ext_host_port=sessions[0].ext_host_port)
3862 for port in range(1030, 1100):
3863 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3864 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3865 TCP(sport=port, dport=external_port))
3867 self.pg1.add_stream(pkts)
3868 self.pg_enable_capture(self.pg_interfaces)
3870 capture = self.pg0.get_capture(len(pkts))
3872 self.assertEqual(p[IP].dst, backend)
3874 def test_unknown_proto(self):
3875 """ NAT44 translate packet with unknown protocol """
3876 self.nat44_add_address(self.nat_addr)
3877 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3878 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3882 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3883 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3884 TCP(sport=self.tcp_port_in, dport=20))
3885 self.pg0.add_stream(p)
3886 self.pg_enable_capture(self.pg_interfaces)
3888 p = self.pg1.get_capture(1)
3890 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3891 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3893 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3894 TCP(sport=1234, dport=1234))
3895 self.pg0.add_stream(p)
3896 self.pg_enable_capture(self.pg_interfaces)
3898 p = self.pg1.get_capture(1)
3901 self.assertEqual(packet[IP].src, self.nat_addr)
3902 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3903 self.assertTrue(packet.haslayer(GRE))
3904 self.assert_packet_checksums_valid(packet)
3906 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3910 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3911 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3913 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3914 TCP(sport=1234, dport=1234))
3915 self.pg1.add_stream(p)
3916 self.pg_enable_capture(self.pg_interfaces)
3918 p = self.pg0.get_capture(1)
3921 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3922 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3923 self.assertTrue(packet.haslayer(GRE))
3924 self.assert_packet_checksums_valid(packet)
3926 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3929 def test_hairpinning_unknown_proto(self):
3930 """ NAT44 translate packet with unknown protocol - hairpinning """
3931 host = self.pg0.remote_hosts[0]
3932 server = self.pg0.remote_hosts[1]
3934 server_out_port = 8765
3935 server_nat_ip = "10.0.0.11"
3937 self.nat44_add_address(self.nat_addr)
3938 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3939 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3942 # add static mapping for server
3943 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3946 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3947 IP(src=host.ip4, dst=server_nat_ip) /
3948 TCP(sport=host_in_port, dport=server_out_port))
3949 self.pg0.add_stream(p)
3950 self.pg_enable_capture(self.pg_interfaces)
3952 self.pg0.get_capture(1)
3954 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3955 IP(src=host.ip4, dst=server_nat_ip) /
3957 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3958 TCP(sport=1234, dport=1234))
3959 self.pg0.add_stream(p)
3960 self.pg_enable_capture(self.pg_interfaces)
3962 p = self.pg0.get_capture(1)
3965 self.assertEqual(packet[IP].src, self.nat_addr)
3966 self.assertEqual(packet[IP].dst, server.ip4)
3967 self.assertTrue(packet.haslayer(GRE))
3968 self.assert_packet_checksums_valid(packet)
3970 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3974 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3975 IP(src=server.ip4, dst=self.nat_addr) /
3977 IP(src=self.pg2.remote_ip4, dst=self.pg2.remote_ip4) /
3978 TCP(sport=1234, dport=1234))
3979 self.pg0.add_stream(p)
3980 self.pg_enable_capture(self.pg_interfaces)
3982 p = self.pg0.get_capture(1)
3985 self.assertEqual(packet[IP].src, server_nat_ip)
3986 self.assertEqual(packet[IP].dst, host.ip4)
3987 self.assertTrue(packet.haslayer(GRE))
3988 self.assert_packet_checksums_valid(packet)
3990 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3993 def test_output_feature_and_service(self):
3994 """ NAT44 interface output feature and services """
3995 external_addr = '1.2.3.4'
3999 self.vapi.nat44_forwarding_enable_disable(1)
4000 self.nat44_add_address(self.nat_addr)
4001 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
4002 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
4003 local_port, external_port,
4004 proto=IP_PROTOS.tcp, out2in_only=1)
4005 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4006 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4008 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4011 # from client to service
4012 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4013 IP(src=self.pg1.remote_ip4, dst=external_addr) /
4014 TCP(sport=12345, dport=external_port))
4015 self.pg1.add_stream(p)
4016 self.pg_enable_capture(self.pg_interfaces)
4018 capture = self.pg0.get_capture(1)
4023 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4024 self.assertEqual(tcp.dport, local_port)
4025 self.assert_packet_checksums_valid(p)
4027 self.logger.error(ppp("Unexpected or invalid packet:", p))
4030 # from service back to client
4031 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4032 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4033 TCP(sport=local_port, dport=12345))
4034 self.pg0.add_stream(p)
4035 self.pg_enable_capture(self.pg_interfaces)
4037 capture = self.pg1.get_capture(1)
4042 self.assertEqual(ip.src, external_addr)
4043 self.assertEqual(tcp.sport, external_port)
4044 self.assert_packet_checksums_valid(p)
4046 self.logger.error(ppp("Unexpected or invalid packet:", p))
4049 # from local network host to external network
4050 pkts = self.create_stream_in(self.pg0, self.pg1)
4051 self.pg0.add_stream(pkts)
4052 self.pg_enable_capture(self.pg_interfaces)
4054 capture = self.pg1.get_capture(len(pkts))
4055 self.verify_capture_out(capture)
4056 pkts = self.create_stream_in(self.pg0, self.pg1)
4057 self.pg0.add_stream(pkts)
4058 self.pg_enable_capture(self.pg_interfaces)
4060 capture = self.pg1.get_capture(len(pkts))
4061 self.verify_capture_out(capture)
4063 # from external network back to local network host
4064 pkts = self.create_stream_out(self.pg1)
4065 self.pg1.add_stream(pkts)
4066 self.pg_enable_capture(self.pg_interfaces)
4068 capture = self.pg0.get_capture(len(pkts))
4069 self.verify_capture_in(capture, self.pg0)
4071 def test_output_feature_and_service2(self):
4072 """ NAT44 interface output feature and service host direct access """
4073 self.vapi.nat44_forwarding_enable_disable(1)
4074 self.nat44_add_address(self.nat_addr)
4075 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4078 # session initiaded from service host - translate
4079 pkts = self.create_stream_in(self.pg0, self.pg1)
4080 self.pg0.add_stream(pkts)
4081 self.pg_enable_capture(self.pg_interfaces)
4083 capture = self.pg1.get_capture(len(pkts))
4084 self.verify_capture_out(capture)
4086 pkts = self.create_stream_out(self.pg1)
4087 self.pg1.add_stream(pkts)
4088 self.pg_enable_capture(self.pg_interfaces)
4090 capture = self.pg0.get_capture(len(pkts))
4091 self.verify_capture_in(capture, self.pg0)
4093 # session initiaded from remote host - do not translate
4094 self.tcp_port_in = 60303
4095 self.udp_port_in = 60304
4096 self.icmp_id_in = 60305
4097 pkts = self.create_stream_out(self.pg1,
4098 self.pg0.remote_ip4,
4099 use_inside_ports=True)
4100 self.pg1.add_stream(pkts)
4101 self.pg_enable_capture(self.pg_interfaces)
4103 capture = self.pg0.get_capture(len(pkts))
4104 self.verify_capture_in(capture, self.pg0)
4106 pkts = self.create_stream_in(self.pg0, self.pg1)
4107 self.pg0.add_stream(pkts)
4108 self.pg_enable_capture(self.pg_interfaces)
4110 capture = self.pg1.get_capture(len(pkts))
4111 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
4114 def test_output_feature_and_service3(self):
4115 """ NAT44 interface output feature and DST NAT """
4116 external_addr = '1.2.3.4'
4120 self.vapi.nat44_forwarding_enable_disable(1)
4121 self.nat44_add_address(self.nat_addr)
4122 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
4123 local_port, external_port,
4124 proto=IP_PROTOS.tcp, out2in_only=1)
4125 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4126 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4128 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4131 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4132 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4133 TCP(sport=12345, dport=external_port))
4134 self.pg0.add_stream(p)
4135 self.pg_enable_capture(self.pg_interfaces)
4137 capture = self.pg1.get_capture(1)
4142 self.assertEqual(ip.src, self.pg0.remote_ip4)
4143 self.assertEqual(tcp.sport, 12345)
4144 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4145 self.assertEqual(tcp.dport, local_port)
4146 self.assert_packet_checksums_valid(p)
4148 self.logger.error(ppp("Unexpected or invalid packet:", p))
4151 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4152 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4153 TCP(sport=local_port, dport=12345))
4154 self.pg1.add_stream(p)
4155 self.pg_enable_capture(self.pg_interfaces)
4157 capture = self.pg0.get_capture(1)
4162 self.assertEqual(ip.src, external_addr)
4163 self.assertEqual(tcp.sport, external_port)
4164 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4165 self.assertEqual(tcp.dport, 12345)
4166 self.assert_packet_checksums_valid(p)
4168 self.logger.error(ppp("Unexpected or invalid packet:", p))
4171 def test_next_src_nat(self):
4172 """ On way back forward packet to nat44-in2out node. """
4173 twice_nat_addr = '10.0.1.3'
4176 post_twice_nat_port = 0
4178 self.vapi.nat44_forwarding_enable_disable(1)
4179 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4180 self.nat44_add_static_mapping(self.pg6.remote_ip4, self.pg1.remote_ip4,
4181 local_port, external_port,
4182 proto=IP_PROTOS.tcp, out2in_only=1,
4183 self_twice_nat=1, vrf_id=1)
4184 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4187 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4188 IP(src=self.pg6.remote_ip4, dst=self.pg1.remote_ip4) /
4189 TCP(sport=12345, dport=external_port))
4190 self.pg6.add_stream(p)
4191 self.pg_enable_capture(self.pg_interfaces)
4193 capture = self.pg6.get_capture(1)
4198 self.assertEqual(ip.src, twice_nat_addr)
4199 self.assertNotEqual(tcp.sport, 12345)
4200 post_twice_nat_port = tcp.sport
4201 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4202 self.assertEqual(tcp.dport, local_port)
4203 self.assert_packet_checksums_valid(p)
4205 self.logger.error(ppp("Unexpected or invalid packet:", p))
4208 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4209 IP(src=self.pg6.remote_ip4, dst=twice_nat_addr) /
4210 TCP(sport=local_port, dport=post_twice_nat_port))
4211 self.pg6.add_stream(p)
4212 self.pg_enable_capture(self.pg_interfaces)
4214 capture = self.pg6.get_capture(1)
4219 self.assertEqual(ip.src, self.pg1.remote_ip4)
4220 self.assertEqual(tcp.sport, external_port)
4221 self.assertEqual(ip.dst, self.pg6.remote_ip4)
4222 self.assertEqual(tcp.dport, 12345)
4223 self.assert_packet_checksums_valid(p)
4225 self.logger.error(ppp("Unexpected or invalid packet:", p))
4228 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
4230 twice_nat_addr = '10.0.1.3'
4238 port_in1 = port_in+1
4239 port_in2 = port_in+2
4244 server1 = self.pg0.remote_hosts[0]
4245 server2 = self.pg0.remote_hosts[1]
4257 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
4260 self.nat44_add_address(self.nat_addr)
4261 self.nat44_add_address(twice_nat_addr, twice_nat=1)
4263 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
4265 proto=IP_PROTOS.tcp,
4266 twice_nat=int(not self_twice_nat),
4267 self_twice_nat=int(self_twice_nat))
4269 locals = [{'addr': server1.ip4n,
4273 {'addr': server2.ip4n,
4277 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
4278 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
4282 not self_twice_nat),
4285 local_num=len(locals),
4287 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
4288 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
4295 assert client_id is not None
4297 client = self.pg0.remote_hosts[0]
4298 elif client_id == 2:
4299 client = self.pg0.remote_hosts[1]
4301 client = pg1.remote_hosts[0]
4302 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4303 IP(src=client.ip4, dst=self.nat_addr) /
4304 TCP(sport=eh_port_out, dport=port_out))
4306 self.pg_enable_capture(self.pg_interfaces)
4308 capture = pg0.get_capture(1)
4314 if ip.dst == server1.ip4:
4320 self.assertEqual(ip.dst, server.ip4)
4322 self.assertIn(tcp.dport, [port_in1, port_in2])
4324 self.assertEqual(tcp.dport, port_in)
4326 self.assertEqual(ip.src, twice_nat_addr)
4327 self.assertNotEqual(tcp.sport, eh_port_out)
4329 self.assertEqual(ip.src, client.ip4)
4330 self.assertEqual(tcp.sport, eh_port_out)
4332 eh_port_in = tcp.sport
4333 saved_port_in = tcp.dport
4334 self.assert_packet_checksums_valid(p)
4336 self.logger.error(ppp("Unexpected or invalid packet:", p))
4339 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4340 IP(src=server.ip4, dst=eh_addr_in) /
4341 TCP(sport=saved_port_in, dport=eh_port_in))
4343 self.pg_enable_capture(self.pg_interfaces)
4345 capture = pg1.get_capture(1)
4350 self.assertEqual(ip.dst, client.ip4)
4351 self.assertEqual(ip.src, self.nat_addr)
4352 self.assertEqual(tcp.dport, eh_port_out)
4353 self.assertEqual(tcp.sport, port_out)
4354 self.assert_packet_checksums_valid(p)
4356 self.logger.error(ppp("Unexpected or invalid packet:", p))
4360 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4361 self.assertEqual(len(sessions), 1)
4362 self.assertTrue(sessions[0].ext_host_valid)
4363 self.assertTrue(sessions[0].is_twicenat)
4364 self.vapi.nat44_del_session(
4365 sessions[0].inside_ip_address,
4366 sessions[0].inside_port,
4367 sessions[0].protocol,
4368 ext_host_address=sessions[0].ext_host_nat_address,
4369 ext_host_port=sessions[0].ext_host_nat_port)
4370 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
4371 self.assertEqual(len(sessions), 0)
4373 def test_twice_nat(self):
4375 self.twice_nat_common()
4377 def test_self_twice_nat_positive(self):
4378 """ Self Twice NAT44 (positive test) """
4379 self.twice_nat_common(self_twice_nat=True, same_pg=True)
4381 def test_self_twice_nat_negative(self):
4382 """ Self Twice NAT44 (negative test) """
4383 self.twice_nat_common(self_twice_nat=True)
4385 def test_twice_nat_lb(self):
4386 """ Twice NAT44 local service load balancing """
4387 self.twice_nat_common(lb=True)
4389 def test_self_twice_nat_lb_positive(self):
4390 """ Self Twice NAT44 local service load balancing (positive test) """
4391 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4394 def test_self_twice_nat_lb_negative(self):
4395 """ Self Twice NAT44 local service load balancing (negative test) """
4396 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4399 def test_twice_nat_interface_addr(self):
4400 """ Acquire twice NAT44 addresses from interface """
4401 self.vapi.nat44_add_interface_addr(self.pg3.sw_if_index, twice_nat=1)
4403 # no address in NAT pool
4404 adresses = self.vapi.nat44_address_dump()
4405 self.assertEqual(0, len(adresses))
4407 # configure interface address and check NAT address pool
4408 self.pg3.config_ip4()
4409 adresses = self.vapi.nat44_address_dump()
4410 self.assertEqual(1, len(adresses))
4411 self.assertEqual(adresses[0].ip_address[0:4], self.pg3.local_ip4n)
4412 self.assertEqual(adresses[0].twice_nat, 1)
4414 # remove interface address and check NAT address pool
4415 self.pg3.unconfig_ip4()
4416 adresses = self.vapi.nat44_address_dump()
4417 self.assertEqual(0, len(adresses))
4419 def test_tcp_session_close_in(self):
4420 """ Close TCP session from inside network """
4421 self.tcp_port_out = 10505
4422 self.nat44_add_address(self.nat_addr)
4423 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4427 proto=IP_PROTOS.tcp,
4429 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4430 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4433 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4434 start_sessnum = len(sessions)
4436 self.initiate_tcp_session(self.pg0, self.pg1)
4438 # FIN packet in -> out
4439 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4440 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4441 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4442 flags="FA", seq=100, ack=300))
4443 self.pg0.add_stream(p)
4444 self.pg_enable_capture(self.pg_interfaces)
4446 self.pg1.get_capture(1)
4450 # ACK packet out -> in
4451 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4452 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4453 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4454 flags="A", seq=300, ack=101))
4457 # FIN packet out -> in
4458 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4459 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4460 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4461 flags="FA", seq=300, ack=101))
4464 self.pg1.add_stream(pkts)
4465 self.pg_enable_capture(self.pg_interfaces)
4467 self.pg0.get_capture(2)
4469 # ACK packet in -> out
4470 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4471 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4472 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4473 flags="A", seq=101, ack=301))
4474 self.pg0.add_stream(p)
4475 self.pg_enable_capture(self.pg_interfaces)
4477 self.pg1.get_capture(1)
4479 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4481 self.assertEqual(len(sessions) - start_sessnum, 0)
4483 def test_tcp_session_close_out(self):
4484 """ Close TCP session from outside network """
4485 self.tcp_port_out = 10505
4486 self.nat44_add_address(self.nat_addr)
4487 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4491 proto=IP_PROTOS.tcp,
4493 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4494 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4497 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4498 start_sessnum = len(sessions)
4500 self.initiate_tcp_session(self.pg0, self.pg1)
4502 # FIN packet out -> in
4503 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4504 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4505 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4506 flags="FA", seq=100, ack=300))
4507 self.pg1.add_stream(p)
4508 self.pg_enable_capture(self.pg_interfaces)
4510 self.pg0.get_capture(1)
4512 # FIN+ACK packet in -> out
4513 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4514 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4515 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4516 flags="FA", seq=300, ack=101))
4518 self.pg0.add_stream(p)
4519 self.pg_enable_capture(self.pg_interfaces)
4521 self.pg1.get_capture(1)
4523 # ACK packet out -> in
4524 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4525 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4526 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4527 flags="A", seq=101, ack=301))
4528 self.pg1.add_stream(p)
4529 self.pg_enable_capture(self.pg_interfaces)
4531 self.pg0.get_capture(1)
4533 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4535 self.assertEqual(len(sessions) - start_sessnum, 0)
4537 def test_tcp_session_close_simultaneous(self):
4538 """ Close TCP session from inside network """
4539 self.tcp_port_out = 10505
4540 self.nat44_add_address(self.nat_addr)
4541 self.nat44_add_static_mapping(self.pg0.remote_ip4,
4545 proto=IP_PROTOS.tcp,
4547 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4548 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4551 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4552 start_sessnum = len(sessions)
4554 self.initiate_tcp_session(self.pg0, self.pg1)
4556 # FIN packet in -> out
4557 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4558 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4559 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4560 flags="FA", seq=100, ack=300))
4561 self.pg0.add_stream(p)
4562 self.pg_enable_capture(self.pg_interfaces)
4564 self.pg1.get_capture(1)
4566 # FIN packet out -> in
4567 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4568 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4569 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4570 flags="FA", seq=300, ack=100))
4571 self.pg1.add_stream(p)
4572 self.pg_enable_capture(self.pg_interfaces)
4574 self.pg0.get_capture(1)
4576 # ACK packet in -> out
4577 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4578 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4579 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4580 flags="A", seq=101, ack=301))
4581 self.pg0.add_stream(p)
4582 self.pg_enable_capture(self.pg_interfaces)
4584 self.pg1.get_capture(1)
4586 # ACK packet out -> in
4587 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4588 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4589 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4590 flags="A", seq=301, ack=101))
4591 self.pg1.add_stream(p)
4592 self.pg_enable_capture(self.pg_interfaces)
4594 self.pg0.get_capture(1)
4596 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4598 self.assertEqual(len(sessions) - start_sessnum, 0)
4600 def test_one_armed_nat44_static(self):
4601 """ One armed NAT44 and 1:1 NAPT asymmetrical rule """
4602 remote_host = self.pg4.remote_hosts[0]
4603 local_host = self.pg4.remote_hosts[1]
4608 self.vapi.nat44_forwarding_enable_disable(1)
4609 self.nat44_add_address(self.nat_addr, twice_nat=1)
4610 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
4611 local_port, external_port,
4612 proto=IP_PROTOS.tcp, out2in_only=1,
4614 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
4615 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index,
4618 # from client to service
4619 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4620 IP(src=remote_host.ip4, dst=self.nat_addr) /
4621 TCP(sport=12345, dport=external_port))
4622 self.pg4.add_stream(p)
4623 self.pg_enable_capture(self.pg_interfaces)
4625 capture = self.pg4.get_capture(1)
4630 self.assertEqual(ip.dst, local_host.ip4)
4631 self.assertEqual(ip.src, self.nat_addr)
4632 self.assertEqual(tcp.dport, local_port)
4633 self.assertNotEqual(tcp.sport, 12345)
4634 eh_port_in = tcp.sport
4635 self.assert_packet_checksums_valid(p)
4637 self.logger.error(ppp("Unexpected or invalid packet:", p))
4640 # from service back to client
4641 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
4642 IP(src=local_host.ip4, dst=self.nat_addr) /
4643 TCP(sport=local_port, dport=eh_port_in))
4644 self.pg4.add_stream(p)
4645 self.pg_enable_capture(self.pg_interfaces)
4647 capture = self.pg4.get_capture(1)
4652 self.assertEqual(ip.src, self.nat_addr)
4653 self.assertEqual(ip.dst, remote_host.ip4)
4654 self.assertEqual(tcp.sport, external_port)
4655 self.assertEqual(tcp.dport, 12345)
4656 self.assert_packet_checksums_valid(p)
4658 self.logger.error(ppp("Unexpected or invalid packet:", p))
4661 def test_static_with_port_out2(self):
4662 """ 1:1 NAPT asymmetrical rule """
4667 self.vapi.nat44_forwarding_enable_disable(1)
4668 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
4669 local_port, external_port,
4670 proto=IP_PROTOS.tcp, out2in_only=1)
4671 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4672 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4675 # from client to service
4676 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4677 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4678 TCP(sport=12345, dport=external_port))
4679 self.pg1.add_stream(p)
4680 self.pg_enable_capture(self.pg_interfaces)
4682 capture = self.pg0.get_capture(1)
4687 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4688 self.assertEqual(tcp.dport, local_port)
4689 self.assert_packet_checksums_valid(p)
4691 self.logger.error(ppp("Unexpected or invalid packet:", p))
4695 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4696 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4697 ICMP(type=11) / capture[0][IP])
4698 self.pg0.add_stream(p)
4699 self.pg_enable_capture(self.pg_interfaces)
4701 capture = self.pg1.get_capture(1)
4704 self.assertEqual(p[IP].src, self.nat_addr)
4706 self.assertEqual(inner.dst, self.nat_addr)
4707 self.assertEqual(inner[TCPerror].dport, external_port)
4709 self.logger.error(ppp("Unexpected or invalid packet:", p))
4712 # from service back to client
4713 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4714 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4715 TCP(sport=local_port, dport=12345))
4716 self.pg0.add_stream(p)
4717 self.pg_enable_capture(self.pg_interfaces)
4719 capture = self.pg1.get_capture(1)
4724 self.assertEqual(ip.src, self.nat_addr)
4725 self.assertEqual(tcp.sport, external_port)
4726 self.assert_packet_checksums_valid(p)
4728 self.logger.error(ppp("Unexpected or invalid packet:", p))
4732 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4733 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4734 ICMP(type=11) / capture[0][IP])
4735 self.pg1.add_stream(p)
4736 self.pg_enable_capture(self.pg_interfaces)
4738 capture = self.pg0.get_capture(1)
4741 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
4743 self.assertEqual(inner.src, self.pg0.remote_ip4)
4744 self.assertEqual(inner[TCPerror].sport, local_port)
4746 self.logger.error(ppp("Unexpected or invalid packet:", p))
4749 # from client to server (no translation)
4750 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4751 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
4752 TCP(sport=12346, dport=local_port))
4753 self.pg1.add_stream(p)
4754 self.pg_enable_capture(self.pg_interfaces)
4756 capture = self.pg0.get_capture(1)
4761 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4762 self.assertEqual(tcp.dport, local_port)
4763 self.assert_packet_checksums_valid(p)
4765 self.logger.error(ppp("Unexpected or invalid packet:", p))
4768 # from service back to client (no translation)
4769 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4770 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4771 TCP(sport=local_port, dport=12346))
4772 self.pg0.add_stream(p)
4773 self.pg_enable_capture(self.pg_interfaces)
4775 capture = self.pg1.get_capture(1)
4780 self.assertEqual(ip.src, self.pg0.remote_ip4)
4781 self.assertEqual(tcp.sport, local_port)
4782 self.assert_packet_checksums_valid(p)
4784 self.logger.error(ppp("Unexpected or invalid packet:", p))
4787 def test_output_feature(self):
4788 """ NAT44 interface output feature (in2out postrouting) """
4789 self.vapi.nat44_forwarding_enable_disable(1)
4790 self.nat44_add_address(self.nat_addr)
4791 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4793 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4797 pkts = self.create_stream_in(self.pg0, self.pg1)
4798 self.pg0.add_stream(pkts)
4799 self.pg_enable_capture(self.pg_interfaces)
4801 capture = self.pg1.get_capture(len(pkts))
4802 self.verify_capture_out(capture)
4805 pkts = self.create_stream_out(self.pg1)
4806 self.pg1.add_stream(pkts)
4807 self.pg_enable_capture(self.pg_interfaces)
4809 capture = self.pg0.get_capture(len(pkts))
4810 self.verify_capture_in(capture, self.pg0)
4812 def test_multiple_vrf(self):
4813 """ Multiple VRF setup """
4814 external_addr = '1.2.3.4'
4819 self.vapi.nat44_forwarding_enable_disable(1)
4820 self.nat44_add_address(self.nat_addr)
4821 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4822 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4824 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
4826 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
4827 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index,
4829 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index,
4831 self.nat44_add_static_mapping(self.pg5.remote_ip4, external_addr,
4832 local_port, external_port, vrf_id=1,
4833 proto=IP_PROTOS.tcp, out2in_only=1)
4834 self.nat44_add_static_mapping(
4835 self.pg0.remote_ip4, external_sw_if_index=self.pg0.sw_if_index,
4836 local_port=local_port, vrf_id=0, external_port=external_port,
4837 proto=IP_PROTOS.tcp, out2in_only=1)
4839 # from client to service (both VRF1)
4840 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4841 IP(src=self.pg6.remote_ip4, dst=external_addr) /
4842 TCP(sport=12345, dport=external_port))
4843 self.pg6.add_stream(p)
4844 self.pg_enable_capture(self.pg_interfaces)
4846 capture = self.pg5.get_capture(1)
4851 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4852 self.assertEqual(tcp.dport, local_port)
4853 self.assert_packet_checksums_valid(p)
4855 self.logger.error(ppp("Unexpected or invalid packet:", p))
4858 # from service back to client (both VRF1)
4859 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4860 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
4861 TCP(sport=local_port, dport=12345))
4862 self.pg5.add_stream(p)
4863 self.pg_enable_capture(self.pg_interfaces)
4865 capture = self.pg6.get_capture(1)
4870 self.assertEqual(ip.src, external_addr)
4871 self.assertEqual(tcp.sport, external_port)
4872 self.assert_packet_checksums_valid(p)
4874 self.logger.error(ppp("Unexpected or invalid packet:", p))
4877 # dynamic NAT from VRF1 to VRF0 (output-feature)
4878 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4879 IP(src=self.pg5.remote_ip4, dst=self.pg1.remote_ip4) /
4880 TCP(sport=2345, dport=22))
4881 self.pg5.add_stream(p)
4882 self.pg_enable_capture(self.pg_interfaces)
4884 capture = self.pg1.get_capture(1)
4889 self.assertEqual(ip.src, self.nat_addr)
4890 self.assertNotEqual(tcp.sport, 2345)
4891 self.assert_packet_checksums_valid(p)
4894 self.logger.error(ppp("Unexpected or invalid packet:", p))
4897 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4898 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4899 TCP(sport=22, dport=port))
4900 self.pg1.add_stream(p)
4901 self.pg_enable_capture(self.pg_interfaces)
4903 capture = self.pg5.get_capture(1)
4908 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4909 self.assertEqual(tcp.dport, 2345)
4910 self.assert_packet_checksums_valid(p)
4912 self.logger.error(ppp("Unexpected or invalid packet:", p))
4915 # from client VRF1 to service VRF0
4916 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4917 IP(src=self.pg6.remote_ip4, dst=self.pg0.local_ip4) /
4918 TCP(sport=12346, dport=external_port))
4919 self.pg6.add_stream(p)
4920 self.pg_enable_capture(self.pg_interfaces)
4922 capture = self.pg0.get_capture(1)
4927 self.assertEqual(ip.dst, self.pg0.remote_ip4)
4928 self.assertEqual(tcp.dport, local_port)
4929 self.assert_packet_checksums_valid(p)
4931 self.logger.error(ppp("Unexpected or invalid packet:", p))
4934 # from service VRF0 back to client VRF1
4935 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4936 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
4937 TCP(sport=local_port, dport=12346))
4938 self.pg0.add_stream(p)
4939 self.pg_enable_capture(self.pg_interfaces)
4941 capture = self.pg6.get_capture(1)
4946 self.assertEqual(ip.src, self.pg0.local_ip4)
4947 self.assertEqual(tcp.sport, external_port)
4948 self.assert_packet_checksums_valid(p)
4950 self.logger.error(ppp("Unexpected or invalid packet:", p))
4953 # from client VRF0 to service VRF1
4954 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4955 IP(src=self.pg0.remote_ip4, dst=external_addr) /
4956 TCP(sport=12347, dport=external_port))
4957 self.pg0.add_stream(p)
4958 self.pg_enable_capture(self.pg_interfaces)
4960 capture = self.pg5.get_capture(1)
4965 self.assertEqual(ip.dst, self.pg5.remote_ip4)
4966 self.assertEqual(tcp.dport, local_port)
4967 self.assert_packet_checksums_valid(p)
4969 self.logger.error(ppp("Unexpected or invalid packet:", p))
4972 # from service VRF1 back to client VRF0
4973 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
4974 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
4975 TCP(sport=local_port, dport=12347))
4976 self.pg5.add_stream(p)
4977 self.pg_enable_capture(self.pg_interfaces)
4979 capture = self.pg0.get_capture(1)
4984 self.assertEqual(ip.src, external_addr)
4985 self.assertEqual(tcp.sport, external_port)
4986 self.assert_packet_checksums_valid(p)
4988 self.logger.error(ppp("Unexpected or invalid packet:", p))
4991 # from client to server (both VRF1, no translation)
4992 p = (Ether(src=self.pg6.remote_mac, dst=self.pg6.local_mac) /
4993 IP(src=self.pg6.remote_ip4, dst=self.pg5.remote_ip4) /
4994 TCP(sport=12348, dport=local_port))
4995 self.pg6.add_stream(p)
4996 self.pg_enable_capture(self.pg_interfaces)
4998 capture = self.pg5.get_capture(1)
5003 self.assertEqual(ip.dst, self.pg5.remote_ip4)
5004 self.assertEqual(tcp.dport, local_port)
5005 self.assert_packet_checksums_valid(p)
5007 self.logger.error(ppp("Unexpected or invalid packet:", p))
5010 # from server back to client (both VRF1, no translation)
5011 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
5012 IP(src=self.pg5.remote_ip4, dst=self.pg6.remote_ip4) /
5013 TCP(sport=local_port, dport=12348))
5014 self.pg5.add_stream(p)
5015 self.pg_enable_capture(self.pg_interfaces)
5017 capture = self.pg6.get_capture(1)
5022 self.assertEqual(ip.src, self.pg5.remote_ip4)
5023 self.assertEqual(tcp.sport, local_port)
5024 self.assert_packet_checksums_valid(p)
5026 self.logger.error(ppp("Unexpected or invalid packet:", p))
5029 # from client VRF1 to server VRF0 (no translation)
5030 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5031 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
5032 TCP(sport=local_port, dport=12349))
5033 self.pg0.add_stream(p)
5034 self.pg_enable_capture(self.pg_interfaces)
5036 capture = self.pg6.get_capture(1)
5041 self.assertEqual(ip.src, self.pg0.remote_ip4)
5042 self.assertEqual(tcp.sport, local_port)
5043 self.assert_packet_checksums_valid(p)
5045 self.logger.error(ppp("Unexpected or invalid packet:", p))
5048 # from server VRF0 back to client VRF1 (no translation)
5049 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5050 IP(src=self.pg0.remote_ip4, dst=self.pg6.remote_ip4) /
5051 TCP(sport=local_port, dport=12349))
5052 self.pg0.add_stream(p)
5053 self.pg_enable_capture(self.pg_interfaces)
5055 capture = self.pg6.get_capture(1)
5060 self.assertEqual(ip.src, self.pg0.remote_ip4)
5061 self.assertEqual(tcp.sport, local_port)
5062 self.assert_packet_checksums_valid(p)
5064 self.logger.error(ppp("Unexpected or invalid packet:", p))
5067 # from client VRF0 to server VRF1 (no translation)
5068 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5069 IP(src=self.pg0.remote_ip4, dst=self.pg5.remote_ip4) /
5070 TCP(sport=12344, dport=local_port))
5071 self.pg0.add_stream(p)
5072 self.pg_enable_capture(self.pg_interfaces)
5074 capture = self.pg5.get_capture(1)
5079 self.assertEqual(ip.dst, self.pg5.remote_ip4)
5080 self.assertEqual(tcp.dport, local_port)
5081 self.assert_packet_checksums_valid(p)
5083 self.logger.error(ppp("Unexpected or invalid packet:", p))
5086 # from server VRF1 back to client VRF0 (no translation)
5087 p = (Ether(src=self.pg5.remote_mac, dst=self.pg5.local_mac) /
5088 IP(src=self.pg5.remote_ip4, dst=self.pg0.remote_ip4) /
5089 TCP(sport=local_port, dport=12344))
5090 self.pg5.add_stream(p)
5091 self.pg_enable_capture(self.pg_interfaces)
5093 capture = self.pg0.get_capture(1)
5098 self.assertEqual(ip.src, self.pg5.remote_ip4)
5099 self.assertEqual(tcp.sport, local_port)
5100 self.assert_packet_checksums_valid(p)
5102 self.logger.error(ppp("Unexpected or invalid packet:", p))
5105 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5106 def test_session_timeout(self):
5107 """ NAT44 session timeouts """
5108 self.nat44_add_address(self.nat_addr)
5109 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5110 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5112 self.vapi.nat_set_timeouts(icmp=5)
5116 for i in range(0, max_sessions):
5117 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
5118 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5119 IP(src=src, dst=self.pg1.remote_ip4) /
5120 ICMP(id=1025, type='echo-request'))
5122 self.pg0.add_stream(pkts)
5123 self.pg_enable_capture(self.pg_interfaces)
5125 self.pg1.get_capture(max_sessions)
5130 for i in range(0, max_sessions):
5131 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
5132 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5133 IP(src=src, dst=self.pg1.remote_ip4) /
5134 ICMP(id=1026, type='echo-request'))
5136 self.pg0.add_stream(pkts)
5137 self.pg_enable_capture(self.pg_interfaces)
5139 self.pg1.get_capture(max_sessions)
5142 users = self.vapi.nat44_user_dump()
5144 nsessions = nsessions + user.nsessions
5145 self.assertLess(nsessions, 2 * max_sessions)
5147 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5148 def test_session_limit_per_user(self):
5149 """ Maximum sessions per user limit """
5150 self.nat44_add_address(self.nat_addr)
5151 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5152 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5154 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5155 src_address=self.pg2.local_ip4n,
5157 template_interval=10)
5159 # get maximum number of translations per user
5160 nat44_config = self.vapi.nat_show_config()
5163 for port in range(0, nat44_config.max_translations_per_user):
5164 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5165 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5166 UDP(sport=1025 + port, dport=1025 + port))
5169 self.pg0.add_stream(pkts)
5170 self.pg_enable_capture(self.pg_interfaces)
5172 capture = self.pg1.get_capture(len(pkts))
5174 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5175 src_port=self.ipfix_src_port)
5177 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5178 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5179 UDP(sport=3001, dport=3002))
5180 self.pg0.add_stream(p)
5181 self.pg_enable_capture(self.pg_interfaces)
5183 capture = self.pg1.assert_nothing_captured()
5185 # verify IPFIX logging
5186 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5188 capture = self.pg2.get_capture(10)
5189 ipfix = IPFIXDecoder()
5190 # first load template
5192 self.assertTrue(p.haslayer(IPFIX))
5193 if p.haslayer(Template):
5194 ipfix.add_template(p.getlayer(Template))
5195 # verify events in data set
5197 if p.haslayer(Data):
5198 data = ipfix.decode_data_set(p.getlayer(Set))
5199 self.verify_ipfix_max_entries_per_user(
5201 nat44_config.max_translations_per_user,
5202 self.pg0.remote_ip4n)
5205 super(TestNAT44EndpointDependent, self).tearDown()
5206 if not self.vpp_dead:
5207 self.logger.info(self.vapi.cli("show nat44 addresses"))
5208 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5209 self.logger.info(self.vapi.cli("show nat44 static mappings"))
5210 self.logger.info(self.vapi.cli("show nat44 interface address"))
5211 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
5212 self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
5213 self.logger.info(self.vapi.cli("show nat timeouts"))
5215 self.vapi.cli("clear logging")
5218 class TestNAT44Out2InDPO(MethodHolder):
5219 """ NAT44 Test Cases using out2in DPO """
5222 def setUpConstants(cls):
5223 super(TestNAT44Out2InDPO, cls).setUpConstants()
5224 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
5227 def setUpClass(cls):
5228 super(TestNAT44Out2InDPO, cls).setUpClass()
5229 cls.vapi.cli("set log class nat level debug")
5232 cls.tcp_port_in = 6303
5233 cls.tcp_port_out = 6303
5234 cls.udp_port_in = 6304
5235 cls.udp_port_out = 6304
5236 cls.icmp_id_in = 6305
5237 cls.icmp_id_out = 6305
5238 cls.nat_addr = '10.0.0.3'
5239 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5240 cls.dst_ip4 = '192.168.70.1'
5242 cls.create_pg_interfaces(range(2))
5245 cls.pg0.config_ip4()
5246 cls.pg0.resolve_arp()
5249 cls.pg1.config_ip6()
5250 cls.pg1.resolve_ndp()
5252 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
5253 dst_address_length=0,
5254 next_hop_address=cls.pg1.remote_ip6n,
5255 next_hop_sw_if_index=cls.pg1.sw_if_index)
5258 super(TestNAT44Out2InDPO, cls).tearDownClass()
5261 def configure_xlat(self):
5262 self.dst_ip6_pfx = '1:2:3::'
5263 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
5265 self.dst_ip6_pfx_len = 96
5266 self.src_ip6_pfx = '4:5:6::'
5267 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
5269 self.src_ip6_pfx_len = 96
5270 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
5271 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
5272 '\x00\x00\x00\x00', 0, is_translation=1,
5275 def test_464xlat_ce(self):
5276 """ Test 464XLAT CE with NAT44 """
5278 nat_config = self.vapi.nat_show_config()
5279 self.assertEqual(1, nat_config.out2in_dpo)
5281 self.configure_xlat()
5283 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5284 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
5286 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5287 self.dst_ip6_pfx_len)
5288 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
5289 self.src_ip6_pfx_len)
5292 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5293 self.pg0.add_stream(pkts)
5294 self.pg_enable_capture(self.pg_interfaces)
5296 capture = self.pg1.get_capture(len(pkts))
5297 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
5300 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
5302 self.pg1.add_stream(pkts)
5303 self.pg_enable_capture(self.pg_interfaces)
5305 capture = self.pg0.get_capture(len(pkts))
5306 self.verify_capture_in(capture, self.pg0)
5308 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
5310 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
5311 self.nat_addr_n, is_add=0)
5313 def test_464xlat_ce_no_nat(self):
5314 """ Test 464XLAT CE without NAT44 """
5316 self.configure_xlat()
5318 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
5319 self.dst_ip6_pfx_len)
5320 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
5321 self.src_ip6_pfx_len)
5323 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
5324 self.pg0.add_stream(pkts)
5325 self.pg_enable_capture(self.pg_interfaces)
5327 capture = self.pg1.get_capture(len(pkts))
5328 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
5329 nat_ip=out_dst_ip6, same_port=True)
5331 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
5332 self.pg1.add_stream(pkts)
5333 self.pg_enable_capture(self.pg_interfaces)
5335 capture = self.pg0.get_capture(len(pkts))
5336 self.verify_capture_in(capture, self.pg0)
5339 class TestDeterministicNAT(MethodHolder):
5340 """ Deterministic NAT Test Cases """
5343 def setUpConstants(cls):
5344 super(TestDeterministicNAT, cls).setUpConstants()
5345 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
5348 def setUpClass(cls):
5349 super(TestDeterministicNAT, cls).setUpClass()
5350 cls.vapi.cli("set log class nat level debug")
5353 cls.tcp_port_in = 6303
5354 cls.tcp_external_port = 6303
5355 cls.udp_port_in = 6304
5356 cls.udp_external_port = 6304
5357 cls.icmp_id_in = 6305
5358 cls.nat_addr = '10.0.0.3'
5360 cls.create_pg_interfaces(range(3))
5361 cls.interfaces = list(cls.pg_interfaces)
5363 for i in cls.interfaces:
5368 cls.pg0.generate_remote_hosts(2)
5369 cls.pg0.configure_ipv4_neighbors()
5372 super(TestDeterministicNAT, cls).tearDownClass()
5375 def create_stream_in(self, in_if, out_if, ttl=64):
5377 Create packet stream for inside network
5379 :param in_if: Inside interface
5380 :param out_if: Outside interface
5381 :param ttl: TTL of generated packets
5385 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5386 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5387 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
5391 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5392 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5393 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
5397 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
5398 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
5399 ICMP(id=self.icmp_id_in, type='echo-request'))
5404 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
5406 Create packet stream for outside network
5408 :param out_if: Outside interface
5409 :param dst_ip: Destination IP address (Default use global NAT address)
5410 :param ttl: TTL of generated packets
5413 dst_ip = self.nat_addr
5416 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5417 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5418 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
5422 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5423 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5424 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
5428 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
5429 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
5430 ICMP(id=self.icmp_external_id, type='echo-reply'))
5435 def verify_capture_out(self, capture, nat_ip=None):
5437 Verify captured packets on outside network
5439 :param capture: Captured packets
5440 :param nat_ip: Translated IP address (Default use global NAT address)
5441 :param same_port: Sorce port number is not translated (Default False)
5444 nat_ip = self.nat_addr
5445 for packet in capture:
5447 self.assertEqual(packet[IP].src, nat_ip)
5448 if packet.haslayer(TCP):
5449 self.tcp_port_out = packet[TCP].sport
5450 elif packet.haslayer(UDP):
5451 self.udp_port_out = packet[UDP].sport
5453 self.icmp_external_id = packet[ICMP].id
5455 self.logger.error(ppp("Unexpected or invalid packet "
5456 "(outside network):", packet))
5459 def test_deterministic_mode(self):
5460 """ NAT plugin run deterministic mode """
5461 in_addr = '172.16.255.0'
5462 out_addr = '172.17.255.50'
5463 in_addr_t = '172.16.255.20'
5464 in_addr_n = socket.inet_aton(in_addr)
5465 out_addr_n = socket.inet_aton(out_addr)
5466 in_addr_t_n = socket.inet_aton(in_addr_t)
5470 nat_config = self.vapi.nat_show_config()
5471 self.assertEqual(1, nat_config.deterministic)
5473 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
5475 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
5476 self.assertEqual(rep1.out_addr[:4], out_addr_n)
5477 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
5478 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
5480 deterministic_mappings = self.vapi.nat_det_map_dump()
5481 self.assertEqual(len(deterministic_mappings), 1)
5482 dsm = deterministic_mappings[0]
5483 self.assertEqual(in_addr_n, dsm.in_addr[:4])
5484 self.assertEqual(in_plen, dsm.in_plen)
5485 self.assertEqual(out_addr_n, dsm.out_addr[:4])
5486 self.assertEqual(out_plen, dsm.out_plen)
5488 self.clear_nat_det()
5489 deterministic_mappings = self.vapi.nat_det_map_dump()
5490 self.assertEqual(len(deterministic_mappings), 0)
5492 def test_set_timeouts(self):
5493 """ Set deterministic NAT timeouts """
5494 timeouts_before = self.vapi.nat_get_timeouts()
5496 self.vapi.nat_set_timeouts(timeouts_before.udp + 10,
5497 timeouts_before.tcp_established + 10,
5498 timeouts_before.tcp_transitory + 10,
5499 timeouts_before.icmp + 10)
5501 timeouts_after = self.vapi.nat_get_timeouts()
5503 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
5504 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
5505 self.assertNotEqual(timeouts_before.tcp_established,
5506 timeouts_after.tcp_established)
5507 self.assertNotEqual(timeouts_before.tcp_transitory,
5508 timeouts_after.tcp_transitory)
5510 def test_det_in(self):
5511 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
5513 nat_ip = "10.0.0.10"
5515 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5517 socket.inet_aton(nat_ip),
5519 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5520 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5524 pkts = self.create_stream_in(self.pg0, self.pg1)
5525 self.pg0.add_stream(pkts)
5526 self.pg_enable_capture(self.pg_interfaces)
5528 capture = self.pg1.get_capture(len(pkts))
5529 self.verify_capture_out(capture, nat_ip)
5532 pkts = self.create_stream_out(self.pg1, nat_ip)
5533 self.pg1.add_stream(pkts)
5534 self.pg_enable_capture(self.pg_interfaces)
5536 capture = self.pg0.get_capture(len(pkts))
5537 self.verify_capture_in(capture, self.pg0)
5540 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
5541 self.assertEqual(len(sessions), 3)
5545 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5546 self.assertEqual(s.in_port, self.tcp_port_in)
5547 self.assertEqual(s.out_port, self.tcp_port_out)
5548 self.assertEqual(s.ext_port, self.tcp_external_port)
5552 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5553 self.assertEqual(s.in_port, self.udp_port_in)
5554 self.assertEqual(s.out_port, self.udp_port_out)
5555 self.assertEqual(s.ext_port, self.udp_external_port)
5559 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
5560 self.assertEqual(s.in_port, self.icmp_id_in)
5561 self.assertEqual(s.out_port, self.icmp_external_id)
5563 def test_multiple_users(self):
5564 """ Deterministic NAT multiple users """
5566 nat_ip = "10.0.0.10"
5568 external_port = 6303
5570 host0 = self.pg0.remote_hosts[0]
5571 host1 = self.pg0.remote_hosts[1]
5573 self.vapi.nat_det_add_del_map(host0.ip4n,
5575 socket.inet_aton(nat_ip),
5577 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5578 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5582 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
5583 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
5584 TCP(sport=port_in, dport=external_port))
5585 self.pg0.add_stream(p)
5586 self.pg_enable_capture(self.pg_interfaces)
5588 capture = self.pg1.get_capture(1)
5593 self.assertEqual(ip.src, nat_ip)
5594 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5595 self.assertEqual(tcp.dport, external_port)
5596 port_out0 = tcp.sport
5598 self.logger.error(ppp("Unexpected or invalid packet:", p))
5602 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
5603 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
5604 TCP(sport=port_in, dport=external_port))
5605 self.pg0.add_stream(p)
5606 self.pg_enable_capture(self.pg_interfaces)
5608 capture = self.pg1.get_capture(1)
5613 self.assertEqual(ip.src, nat_ip)
5614 self.assertEqual(ip.dst, self.pg1.remote_ip4)
5615 self.assertEqual(tcp.dport, external_port)
5616 port_out1 = tcp.sport
5618 self.logger.error(ppp("Unexpected or invalid packet:", p))
5621 dms = self.vapi.nat_det_map_dump()
5622 self.assertEqual(1, len(dms))
5623 self.assertEqual(2, dms[0].ses_num)
5626 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5627 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5628 TCP(sport=external_port, dport=port_out0))
5629 self.pg1.add_stream(p)
5630 self.pg_enable_capture(self.pg_interfaces)
5632 capture = self.pg0.get_capture(1)
5637 self.assertEqual(ip.src, self.pg1.remote_ip4)
5638 self.assertEqual(ip.dst, host0.ip4)
5639 self.assertEqual(tcp.dport, port_in)
5640 self.assertEqual(tcp.sport, external_port)
5642 self.logger.error(ppp("Unexpected or invalid packet:", p))
5646 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5647 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
5648 TCP(sport=external_port, dport=port_out1))
5649 self.pg1.add_stream(p)
5650 self.pg_enable_capture(self.pg_interfaces)
5652 capture = self.pg0.get_capture(1)
5657 self.assertEqual(ip.src, self.pg1.remote_ip4)
5658 self.assertEqual(ip.dst, host1.ip4)
5659 self.assertEqual(tcp.dport, port_in)
5660 self.assertEqual(tcp.sport, external_port)
5662 self.logger.error(ppp("Unexpected or invalid packet", p))
5665 # session close api test
5666 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
5668 self.pg1.remote_ip4n,
5670 dms = self.vapi.nat_det_map_dump()
5671 self.assertEqual(dms[0].ses_num, 1)
5673 self.vapi.nat_det_close_session_in(host0.ip4n,
5675 self.pg1.remote_ip4n,
5677 dms = self.vapi.nat_det_map_dump()
5678 self.assertEqual(dms[0].ses_num, 0)
5680 def test_tcp_session_close_detection_in(self):
5681 """ Deterministic NAT TCP session close from inside network """
5682 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5684 socket.inet_aton(self.nat_addr),
5686 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5687 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5690 self.initiate_tcp_session(self.pg0, self.pg1)
5692 # close the session from inside
5694 # FIN packet in -> out
5695 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5696 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5697 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5699 self.pg0.add_stream(p)
5700 self.pg_enable_capture(self.pg_interfaces)
5702 self.pg1.get_capture(1)
5706 # ACK packet out -> in
5707 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5708 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5709 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5713 # FIN packet out -> in
5714 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5715 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5716 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5720 self.pg1.add_stream(pkts)
5721 self.pg_enable_capture(self.pg_interfaces)
5723 self.pg0.get_capture(2)
5725 # ACK packet in -> out
5726 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5727 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5728 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5730 self.pg0.add_stream(p)
5731 self.pg_enable_capture(self.pg_interfaces)
5733 self.pg1.get_capture(1)
5735 # Check if deterministic NAT44 closed the session
5736 dms = self.vapi.nat_det_map_dump()
5737 self.assertEqual(0, dms[0].ses_num)
5739 self.logger.error("TCP session termination failed")
5742 def test_tcp_session_close_detection_out(self):
5743 """ Deterministic NAT TCP session close from outside network """
5744 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5746 socket.inet_aton(self.nat_addr),
5748 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5749 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5752 self.initiate_tcp_session(self.pg0, self.pg1)
5754 # close the session from outside
5756 # FIN packet out -> in
5757 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5758 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5759 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5761 self.pg1.add_stream(p)
5762 self.pg_enable_capture(self.pg_interfaces)
5764 self.pg0.get_capture(1)
5768 # ACK packet in -> out
5769 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5770 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5771 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5775 # ACK packet in -> out
5776 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5777 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5778 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
5782 self.pg0.add_stream(pkts)
5783 self.pg_enable_capture(self.pg_interfaces)
5785 self.pg1.get_capture(2)
5787 # ACK packet out -> in
5788 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
5789 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5790 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
5792 self.pg1.add_stream(p)
5793 self.pg_enable_capture(self.pg_interfaces)
5795 self.pg0.get_capture(1)
5797 # Check if deterministic NAT44 closed the session
5798 dms = self.vapi.nat_det_map_dump()
5799 self.assertEqual(0, dms[0].ses_num)
5801 self.logger.error("TCP session termination failed")
5804 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5805 def test_session_timeout(self):
5806 """ Deterministic NAT session timeouts """
5807 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5809 socket.inet_aton(self.nat_addr),
5811 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5812 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5815 self.initiate_tcp_session(self.pg0, self.pg1)
5816 self.vapi.nat_set_timeouts(5, 5, 5, 5)
5817 pkts = self.create_stream_in(self.pg0, self.pg1)
5818 self.pg0.add_stream(pkts)
5819 self.pg_enable_capture(self.pg_interfaces)
5821 capture = self.pg1.get_capture(len(pkts))
5824 dms = self.vapi.nat_det_map_dump()
5825 self.assertEqual(0, dms[0].ses_num)
5827 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5828 def test_session_limit_per_user(self):
5829 """ Deterministic NAT maximum sessions per user limit """
5830 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
5832 socket.inet_aton(self.nat_addr),
5834 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
5835 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
5837 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
5838 src_address=self.pg2.local_ip4n,
5840 template_interval=10)
5841 self.vapi.nat_ipfix()
5844 for port in range(1025, 2025):
5845 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5846 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5847 UDP(sport=port, dport=port))
5850 self.pg0.add_stream(pkts)
5851 self.pg_enable_capture(self.pg_interfaces)
5853 capture = self.pg1.get_capture(len(pkts))
5855 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5856 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5857 UDP(sport=3001, dport=3002))
5858 self.pg0.add_stream(p)
5859 self.pg_enable_capture(self.pg_interfaces)
5861 capture = self.pg1.assert_nothing_captured()
5863 # verify ICMP error packet
5864 capture = self.pg0.get_capture(1)
5866 self.assertTrue(p.haslayer(ICMP))
5868 self.assertEqual(icmp.type, 3)
5869 self.assertEqual(icmp.code, 1)
5870 self.assertTrue(icmp.haslayer(IPerror))
5871 inner_ip = icmp[IPerror]
5872 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5873 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5875 dms = self.vapi.nat_det_map_dump()
5877 self.assertEqual(1000, dms[0].ses_num)
5879 # verify IPFIX logging
5880 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5882 capture = self.pg2.get_capture(2)
5883 ipfix = IPFIXDecoder()
5884 # first load template
5886 self.assertTrue(p.haslayer(IPFIX))
5887 if p.haslayer(Template):
5888 ipfix.add_template(p.getlayer(Template))
5889 # verify events in data set
5891 if p.haslayer(Data):
5892 data = ipfix.decode_data_set(p.getlayer(Set))
5893 self.verify_ipfix_max_entries_per_user(data,
5895 self.pg0.remote_ip4n)
5897 def clear_nat_det(self):
5899 Clear deterministic NAT configuration.
5901 self.vapi.nat_ipfix(enable=0)
5902 self.vapi.nat_set_timeouts()
5903 deterministic_mappings = self.vapi.nat_det_map_dump()
5904 for dsm in deterministic_mappings:
5905 self.vapi.nat_det_add_del_map(dsm.in_addr,
5911 interfaces = self.vapi.nat44_interface_dump()
5912 for intf in interfaces:
5913 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
5918 super(TestDeterministicNAT, self).tearDown()
5919 if not self.vpp_dead:
5920 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5921 self.logger.info(self.vapi.cli("show nat timeouts"))
5923 self.vapi.cli("show nat44 deterministic mappings"))
5925 self.vapi.cli("show nat44 deterministic sessions"))
5926 self.clear_nat_det()
5929 class TestNAT64(MethodHolder):
5930 """ NAT64 Test Cases """
5933 def setUpConstants(cls):
5934 super(TestNAT64, cls).setUpConstants()
5935 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5936 "nat64 st hash buckets 256", "}"])
5939 def setUpClass(cls):
5940 super(TestNAT64, cls).setUpClass()
5943 cls.tcp_port_in = 6303
5944 cls.tcp_port_out = 6303
5945 cls.udp_port_in = 6304
5946 cls.udp_port_out = 6304
5947 cls.icmp_id_in = 6305
5948 cls.icmp_id_out = 6305
5949 cls.nat_addr = '10.0.0.3'
5950 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5952 cls.vrf1_nat_addr = '10.0.10.3'
5953 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5955 cls.ipfix_src_port = 4739
5956 cls.ipfix_domain_id = 1
5958 cls.create_pg_interfaces(range(6))
5959 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5960 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5961 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5963 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5965 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5967 cls.pg0.generate_remote_hosts(2)
5969 for i in cls.ip6_interfaces:
5972 i.configure_ipv6_neighbors()
5974 for i in cls.ip4_interfaces:
5980 cls.pg3.config_ip4()
5981 cls.pg3.resolve_arp()
5982 cls.pg3.config_ip6()
5983 cls.pg3.configure_ipv6_neighbors()
5986 cls.pg5.config_ip6()
5989 super(TestNAT64, cls).tearDownClass()
5992 def test_nat64_inside_interface_handles_neighbor_advertisement(self):
5993 """ NAT64 inside interface handles Neighbor Advertisement """
5995 self.vapi.nat64_add_del_interface(self.pg5.sw_if_index)
5998 ping = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
5999 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
6000 ICMPv6EchoRequest())
6002 self.pg5.add_stream(pkts)
6003 self.pg_enable_capture(self.pg_interfaces)
6006 # Wait for Neighbor Solicitation
6007 capture = self.pg5.get_capture(len(pkts))
6010 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
6011 self.assertTrue(packet.haslayer(ICMPv6ND_NS))
6012 tgt = packet[ICMPv6ND_NS].tgt
6014 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6017 # Send Neighbor Advertisement
6018 p = (Ether(dst=self.pg5.local_mac, src=self.pg5.remote_mac) /
6019 IPv6(src=self.pg5.remote_ip6, dst=self.pg5.local_ip6) /
6020 ICMPv6ND_NA(tgt=tgt) /
6021 ICMPv6NDOptDstLLAddr(lladdr=self.pg5.remote_mac))
6023 self.pg5.add_stream(pkts)
6024 self.pg_enable_capture(self.pg_interfaces)
6027 # Try to send ping again
6029 self.pg5.add_stream(pkts)
6030 self.pg_enable_capture(self.pg_interfaces)
6033 # Wait for ping reply
6034 capture = self.pg5.get_capture(len(pkts))
6037 self.assertEqual(packet[IPv6].src, self.pg5.local_ip6)
6038 self.assertEqual(packet[IPv6].dst, self.pg5.remote_ip6)
6039 self.assertTrue(packet.haslayer(ICMPv6EchoReply))
6041 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6044 def test_pool(self):
6045 """ Add/delete address to NAT64 pool """
6046 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
6048 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
6050 addresses = self.vapi.nat64_pool_addr_dump()
6051 self.assertEqual(len(addresses), 1)
6052 self.assertEqual(addresses[0].address, nat_addr)
6054 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
6056 addresses = self.vapi.nat64_pool_addr_dump()
6057 self.assertEqual(len(addresses), 0)
6059 def test_interface(self):
6060 """ Enable/disable NAT64 feature on the interface """
6061 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6062 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6064 interfaces = self.vapi.nat64_interface_dump()
6065 self.assertEqual(len(interfaces), 2)
6068 for intf in interfaces:
6069 if intf.sw_if_index == self.pg0.sw_if_index:
6070 self.assertEqual(intf.is_inside, 1)
6072 elif intf.sw_if_index == self.pg1.sw_if_index:
6073 self.assertEqual(intf.is_inside, 0)
6075 self.assertTrue(pg0_found)
6076 self.assertTrue(pg1_found)
6078 features = self.vapi.cli("show interface features pg0")
6079 self.assertNotEqual(features.find('nat64-in2out'), -1)
6080 features = self.vapi.cli("show interface features pg1")
6081 self.assertNotEqual(features.find('nat64-out2in'), -1)
6083 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
6084 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
6086 interfaces = self.vapi.nat64_interface_dump()
6087 self.assertEqual(len(interfaces), 0)
6089 def test_static_bib(self):
6090 """ Add/delete static BIB entry """
6091 in_addr = socket.inet_pton(socket.AF_INET6,
6092 '2001:db8:85a3::8a2e:370:7334')
6093 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
6096 proto = IP_PROTOS.tcp
6098 self.vapi.nat64_add_del_static_bib(in_addr,
6103 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
6108 self.assertEqual(bibe.i_addr, in_addr)
6109 self.assertEqual(bibe.o_addr, out_addr)
6110 self.assertEqual(bibe.i_port, in_port)
6111 self.assertEqual(bibe.o_port, out_port)
6112 self.assertEqual(static_bib_num, 1)
6114 self.vapi.nat64_add_del_static_bib(in_addr,
6120 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
6125 self.assertEqual(static_bib_num, 0)
6127 def test_set_timeouts(self):
6128 """ Set NAT64 timeouts """
6129 # verify default values
6130 timeouts = self.vapi.nat_get_timeouts()
6131 self.assertEqual(timeouts.udp, 300)
6132 self.assertEqual(timeouts.icmp, 60)
6133 self.assertEqual(timeouts.tcp_transitory, 240)
6134 self.assertEqual(timeouts.tcp_established, 7440)
6136 # set and verify custom values
6137 self.vapi.nat_set_timeouts(udp=200, icmp=30, tcp_transitory=250,
6138 tcp_established=7450)
6139 timeouts = self.vapi.nat_get_timeouts()
6140 self.assertEqual(timeouts.udp, 200)
6141 self.assertEqual(timeouts.icmp, 30)
6142 self.assertEqual(timeouts.tcp_transitory, 250)
6143 self.assertEqual(timeouts.tcp_established, 7450)
6145 def test_dynamic(self):
6146 """ NAT64 dynamic translation test """
6147 self.tcp_port_in = 6303
6148 self.udp_port_in = 6304
6149 self.icmp_id_in = 6305
6151 ses_num_start = self.nat64_get_ses_num()
6153 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6155 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6156 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6159 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6160 self.pg0.add_stream(pkts)
6161 self.pg_enable_capture(self.pg_interfaces)
6163 capture = self.pg1.get_capture(len(pkts))
6164 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6165 dst_ip=self.pg1.remote_ip4)
6168 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6169 self.pg1.add_stream(pkts)
6170 self.pg_enable_capture(self.pg_interfaces)
6172 capture = self.pg0.get_capture(len(pkts))
6173 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6174 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6177 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6178 self.pg0.add_stream(pkts)
6179 self.pg_enable_capture(self.pg_interfaces)
6181 capture = self.pg1.get_capture(len(pkts))
6182 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6183 dst_ip=self.pg1.remote_ip4)
6186 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6187 self.pg1.add_stream(pkts)
6188 self.pg_enable_capture(self.pg_interfaces)
6190 capture = self.pg0.get_capture(len(pkts))
6191 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6193 ses_num_end = self.nat64_get_ses_num()
6195 self.assertEqual(ses_num_end - ses_num_start, 3)
6197 # tenant with specific VRF
6198 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6199 self.vrf1_nat_addr_n,
6200 vrf_id=self.vrf1_id)
6201 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6203 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
6204 self.pg2.add_stream(pkts)
6205 self.pg_enable_capture(self.pg_interfaces)
6207 capture = self.pg1.get_capture(len(pkts))
6208 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6209 dst_ip=self.pg1.remote_ip4)
6211 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6212 self.pg1.add_stream(pkts)
6213 self.pg_enable_capture(self.pg_interfaces)
6215 capture = self.pg2.get_capture(len(pkts))
6216 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
6218 def test_static(self):
6219 """ NAT64 static translation test """
6220 self.tcp_port_in = 60303
6221 self.udp_port_in = 60304
6222 self.icmp_id_in = 60305
6223 self.tcp_port_out = 60303
6224 self.udp_port_out = 60304
6225 self.icmp_id_out = 60305
6227 ses_num_start = self.nat64_get_ses_num()
6229 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6231 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6232 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6234 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6239 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6244 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
6251 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6252 self.pg0.add_stream(pkts)
6253 self.pg_enable_capture(self.pg_interfaces)
6255 capture = self.pg1.get_capture(len(pkts))
6256 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6257 dst_ip=self.pg1.remote_ip4, same_port=True)
6260 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6261 self.pg1.add_stream(pkts)
6262 self.pg_enable_capture(self.pg_interfaces)
6264 capture = self.pg0.get_capture(len(pkts))
6265 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6266 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
6268 ses_num_end = self.nat64_get_ses_num()
6270 self.assertEqual(ses_num_end - ses_num_start, 3)
6272 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6273 def test_session_timeout(self):
6274 """ NAT64 session timeout """
6275 self.icmp_id_in = 1234
6276 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6278 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6279 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6280 self.vapi.nat_set_timeouts(icmp=5, tcp_transitory=5, tcp_established=5)
6282 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6283 self.pg0.add_stream(pkts)
6284 self.pg_enable_capture(self.pg_interfaces)
6286 capture = self.pg1.get_capture(len(pkts))
6288 ses_num_before_timeout = self.nat64_get_ses_num()
6292 # ICMP and TCP session after timeout
6293 ses_num_after_timeout = self.nat64_get_ses_num()
6294 self.assertEqual(ses_num_before_timeout - ses_num_after_timeout, 2)
6296 def test_icmp_error(self):
6297 """ NAT64 ICMP Error message translation """
6298 self.tcp_port_in = 6303
6299 self.udp_port_in = 6304
6300 self.icmp_id_in = 6305
6302 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6304 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6305 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6307 # send some packets to create sessions
6308 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
6309 self.pg0.add_stream(pkts)
6310 self.pg_enable_capture(self.pg_interfaces)
6312 capture_ip4 = self.pg1.get_capture(len(pkts))
6313 self.verify_capture_out(capture_ip4,
6314 nat_ip=self.nat_addr,
6315 dst_ip=self.pg1.remote_ip4)
6317 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6318 self.pg1.add_stream(pkts)
6319 self.pg_enable_capture(self.pg_interfaces)
6321 capture_ip6 = self.pg0.get_capture(len(pkts))
6322 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
6323 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
6324 self.pg0.remote_ip6)
6327 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6328 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
6329 ICMPv6DestUnreach(code=1) /
6330 packet[IPv6] for packet in capture_ip6]
6331 self.pg0.add_stream(pkts)
6332 self.pg_enable_capture(self.pg_interfaces)
6334 capture = self.pg1.get_capture(len(pkts))
6335 for packet in capture:
6337 self.assertEqual(packet[IP].src, self.nat_addr)
6338 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6339 self.assertEqual(packet[ICMP].type, 3)
6340 self.assertEqual(packet[ICMP].code, 13)
6341 inner = packet[IPerror]
6342 self.assertEqual(inner.src, self.pg1.remote_ip4)
6343 self.assertEqual(inner.dst, self.nat_addr)
6344 self.assert_packet_checksums_valid(packet)
6345 if inner.haslayer(TCPerror):
6346 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
6347 elif inner.haslayer(UDPerror):
6348 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
6350 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
6352 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6356 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6357 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6358 ICMP(type=3, code=13) /
6359 packet[IP] for packet in capture_ip4]
6360 self.pg1.add_stream(pkts)
6361 self.pg_enable_capture(self.pg_interfaces)
6363 capture = self.pg0.get_capture(len(pkts))
6364 for packet in capture:
6366 self.assertEqual(packet[IPv6].src, ip.src)
6367 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6368 icmp = packet[ICMPv6DestUnreach]
6369 self.assertEqual(icmp.code, 1)
6370 inner = icmp[IPerror6]
6371 self.assertEqual(inner.src, self.pg0.remote_ip6)
6372 self.assertEqual(inner.dst, ip.src)
6373 self.assert_icmpv6_checksum_valid(packet)
6374 if inner.haslayer(TCPerror):
6375 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
6376 elif inner.haslayer(UDPerror):
6377 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
6379 self.assertEqual(inner[ICMPv6EchoRequest].id,
6382 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6385 def test_hairpinning(self):
6386 """ NAT64 hairpinning """
6388 client = self.pg0.remote_hosts[0]
6389 server = self.pg0.remote_hosts[1]
6390 server_tcp_in_port = 22
6391 server_tcp_out_port = 4022
6392 server_udp_in_port = 23
6393 server_udp_out_port = 4023
6394 client_tcp_in_port = 1234
6395 client_udp_in_port = 1235
6396 client_tcp_out_port = 0
6397 client_udp_out_port = 0
6398 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6399 nat_addr_ip6 = ip.src
6401 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6403 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6404 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6406 self.vapi.nat64_add_del_static_bib(server.ip6n,
6409 server_tcp_out_port,
6411 self.vapi.nat64_add_del_static_bib(server.ip6n,
6414 server_udp_out_port,
6419 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6420 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6421 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6423 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6424 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6425 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
6427 self.pg0.add_stream(pkts)
6428 self.pg_enable_capture(self.pg_interfaces)
6430 capture = self.pg0.get_capture(len(pkts))
6431 for packet in capture:
6433 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6434 self.assertEqual(packet[IPv6].dst, server.ip6)
6435 self.assert_packet_checksums_valid(packet)
6436 if packet.haslayer(TCP):
6437 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
6438 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
6439 client_tcp_out_port = packet[TCP].sport
6441 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
6442 self.assertEqual(packet[UDP].dport, server_udp_in_port)
6443 client_udp_out_port = packet[UDP].sport
6445 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6450 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6451 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6452 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
6454 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6455 IPv6(src=server.ip6, dst=nat_addr_ip6) /
6456 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
6458 self.pg0.add_stream(pkts)
6459 self.pg_enable_capture(self.pg_interfaces)
6461 capture = self.pg0.get_capture(len(pkts))
6462 for packet in capture:
6464 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6465 self.assertEqual(packet[IPv6].dst, client.ip6)
6466 self.assert_packet_checksums_valid(packet)
6467 if packet.haslayer(TCP):
6468 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
6469 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
6471 self.assertEqual(packet[UDP].sport, server_udp_out_port)
6472 self.assertEqual(packet[UDP].dport, client_udp_in_port)
6474 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6479 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6480 IPv6(src=client.ip6, dst=nat_addr_ip6) /
6481 ICMPv6DestUnreach(code=1) /
6482 packet[IPv6] for packet in capture]
6483 self.pg0.add_stream(pkts)
6484 self.pg_enable_capture(self.pg_interfaces)
6486 capture = self.pg0.get_capture(len(pkts))
6487 for packet in capture:
6489 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
6490 self.assertEqual(packet[IPv6].dst, server.ip6)
6491 icmp = packet[ICMPv6DestUnreach]
6492 self.assertEqual(icmp.code, 1)
6493 inner = icmp[IPerror6]
6494 self.assertEqual(inner.src, server.ip6)
6495 self.assertEqual(inner.dst, nat_addr_ip6)
6496 self.assert_packet_checksums_valid(packet)
6497 if inner.haslayer(TCPerror):
6498 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
6499 self.assertEqual(inner[TCPerror].dport,
6500 client_tcp_out_port)
6502 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
6503 self.assertEqual(inner[UDPerror].dport,
6504 client_udp_out_port)
6506 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6509 def test_prefix(self):
6510 """ NAT64 Network-Specific Prefix """
6512 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6514 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6515 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6516 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
6517 self.vrf1_nat_addr_n,
6518 vrf_id=self.vrf1_id)
6519 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
6522 global_pref64 = "2001:db8::"
6523 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
6524 global_pref64_len = 32
6525 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
6527 prefix = self.vapi.nat64_prefix_dump()
6528 self.assertEqual(len(prefix), 1)
6529 self.assertEqual(prefix[0].prefix, global_pref64_n)
6530 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
6531 self.assertEqual(prefix[0].vrf_id, 0)
6533 # Add tenant specific prefix
6534 vrf1_pref64 = "2001:db8:122:300::"
6535 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
6536 vrf1_pref64_len = 56
6537 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
6539 vrf_id=self.vrf1_id)
6540 prefix = self.vapi.nat64_prefix_dump()
6541 self.assertEqual(len(prefix), 2)
6544 pkts = self.create_stream_in_ip6(self.pg0,
6547 plen=global_pref64_len)
6548 self.pg0.add_stream(pkts)
6549 self.pg_enable_capture(self.pg_interfaces)
6551 capture = self.pg1.get_capture(len(pkts))
6552 self.verify_capture_out(capture, nat_ip=self.nat_addr,
6553 dst_ip=self.pg1.remote_ip4)
6555 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
6556 self.pg1.add_stream(pkts)
6557 self.pg_enable_capture(self.pg_interfaces)
6559 capture = self.pg0.get_capture(len(pkts))
6560 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6563 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
6565 # Tenant specific prefix
6566 pkts = self.create_stream_in_ip6(self.pg2,
6569 plen=vrf1_pref64_len)
6570 self.pg2.add_stream(pkts)
6571 self.pg_enable_capture(self.pg_interfaces)
6573 capture = self.pg1.get_capture(len(pkts))
6574 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
6575 dst_ip=self.pg1.remote_ip4)
6577 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
6578 self.pg1.add_stream(pkts)
6579 self.pg_enable_capture(self.pg_interfaces)
6581 capture = self.pg2.get_capture(len(pkts))
6582 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
6585 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
6587 def test_unknown_proto(self):
6588 """ NAT64 translate packet with unknown protocol """
6590 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6592 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6593 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6594 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6597 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6598 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
6599 TCP(sport=self.tcp_port_in, dport=20))
6600 self.pg0.add_stream(p)
6601 self.pg_enable_capture(self.pg_interfaces)
6603 p = self.pg1.get_capture(1)
6605 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6606 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
6608 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6609 TCP(sport=1234, dport=1234))
6610 self.pg0.add_stream(p)
6611 self.pg_enable_capture(self.pg_interfaces)
6613 p = self.pg1.get_capture(1)
6616 self.assertEqual(packet[IP].src, self.nat_addr)
6617 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
6618 self.assertTrue(packet.haslayer(GRE))
6619 self.assert_packet_checksums_valid(packet)
6621 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6625 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6626 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
6628 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6629 TCP(sport=1234, dport=1234))
6630 self.pg1.add_stream(p)
6631 self.pg_enable_capture(self.pg_interfaces)
6633 p = self.pg0.get_capture(1)
6636 self.assertEqual(packet[IPv6].src, remote_ip6)
6637 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6638 self.assertEqual(packet[IPv6].nh, 47)
6640 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6643 def test_hairpinning_unknown_proto(self):
6644 """ NAT64 translate packet with unknown protocol - hairpinning """
6646 client = self.pg0.remote_hosts[0]
6647 server = self.pg0.remote_hosts[1]
6648 server_tcp_in_port = 22
6649 server_tcp_out_port = 4022
6650 client_tcp_in_port = 1234
6651 client_tcp_out_port = 1235
6652 server_nat_ip = "10.0.0.100"
6653 client_nat_ip = "10.0.0.110"
6654 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
6655 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
6656 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
6657 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
6659 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
6661 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6662 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6664 self.vapi.nat64_add_del_static_bib(server.ip6n,
6667 server_tcp_out_port,
6670 self.vapi.nat64_add_del_static_bib(server.ip6n,
6676 self.vapi.nat64_add_del_static_bib(client.ip6n,
6679 client_tcp_out_port,
6683 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6684 IPv6(src=client.ip6, dst=server_nat_ip6) /
6685 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
6686 self.pg0.add_stream(p)
6687 self.pg_enable_capture(self.pg_interfaces)
6689 p = self.pg0.get_capture(1)
6691 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6692 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
6694 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
6695 TCP(sport=1234, dport=1234))
6696 self.pg0.add_stream(p)
6697 self.pg_enable_capture(self.pg_interfaces)
6699 p = self.pg0.get_capture(1)
6702 self.assertEqual(packet[IPv6].src, client_nat_ip6)
6703 self.assertEqual(packet[IPv6].dst, server.ip6)
6704 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6706 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6710 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6711 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
6713 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
6714 TCP(sport=1234, dport=1234))
6715 self.pg0.add_stream(p)
6716 self.pg_enable_capture(self.pg_interfaces)
6718 p = self.pg0.get_capture(1)
6721 self.assertEqual(packet[IPv6].src, server_nat_ip6)
6722 self.assertEqual(packet[IPv6].dst, client.ip6)
6723 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
6725 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6728 def test_one_armed_nat64(self):
6729 """ One armed NAT64 """
6731 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
6735 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6737 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
6738 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
6741 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6742 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
6743 TCP(sport=12345, dport=80))
6744 self.pg3.add_stream(p)
6745 self.pg_enable_capture(self.pg_interfaces)
6747 capture = self.pg3.get_capture(1)
6752 self.assertEqual(ip.src, self.nat_addr)
6753 self.assertEqual(ip.dst, self.pg3.remote_ip4)
6754 self.assertNotEqual(tcp.sport, 12345)
6755 external_port = tcp.sport
6756 self.assertEqual(tcp.dport, 80)
6757 self.assert_packet_checksums_valid(p)
6759 self.logger.error(ppp("Unexpected or invalid packet:", p))
6763 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
6764 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
6765 TCP(sport=80, dport=external_port))
6766 self.pg3.add_stream(p)
6767 self.pg_enable_capture(self.pg_interfaces)
6769 capture = self.pg3.get_capture(1)
6774 self.assertEqual(ip.src, remote_host_ip6)
6775 self.assertEqual(ip.dst, self.pg3.remote_ip6)
6776 self.assertEqual(tcp.sport, 80)
6777 self.assertEqual(tcp.dport, 12345)
6778 self.assert_packet_checksums_valid(p)
6780 self.logger.error(ppp("Unexpected or invalid packet:", p))
6783 def test_frag_in_order(self):
6784 """ NAT64 translate fragments arriving in order """
6785 self.tcp_port_in = random.randint(1025, 65535)
6787 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6789 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6790 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6792 reass = self.vapi.nat_reass_dump()
6793 reass_n_start = len(reass)
6797 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6798 self.tcp_port_in, 20, data)
6799 self.pg0.add_stream(pkts)
6800 self.pg_enable_capture(self.pg_interfaces)
6802 frags = self.pg1.get_capture(len(pkts))
6803 p = self.reass_frags_and_verify(frags,
6805 self.pg1.remote_ip4)
6806 self.assertEqual(p[TCP].dport, 20)
6807 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6808 self.tcp_port_out = p[TCP].sport
6809 self.assertEqual(data, p[Raw].load)
6812 data = "A" * 4 + "b" * 16 + "C" * 3
6813 pkts = self.create_stream_frag(self.pg1,
6818 self.pg1.add_stream(pkts)
6819 self.pg_enable_capture(self.pg_interfaces)
6821 frags = self.pg0.get_capture(len(pkts))
6822 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6823 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6824 self.assertEqual(p[TCP].sport, 20)
6825 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6826 self.assertEqual(data, p[Raw].load)
6828 reass = self.vapi.nat_reass_dump()
6829 reass_n_end = len(reass)
6831 self.assertEqual(reass_n_end - reass_n_start, 2)
6833 def test_reass_hairpinning(self):
6834 """ NAT64 fragments hairpinning """
6836 server = self.pg0.remote_hosts[1]
6837 server_in_port = random.randint(1025, 65535)
6838 server_out_port = random.randint(1025, 65535)
6839 client_in_port = random.randint(1025, 65535)
6840 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
6841 nat_addr_ip6 = ip.src
6843 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6845 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6846 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6848 # add static BIB entry for server
6849 self.vapi.nat64_add_del_static_bib(server.ip6n,
6855 # send packet from host to server
6856 pkts = self.create_stream_frag_ip6(self.pg0,
6861 self.pg0.add_stream(pkts)
6862 self.pg_enable_capture(self.pg_interfaces)
6864 frags = self.pg0.get_capture(len(pkts))
6865 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
6866 self.assertNotEqual(p[TCP].sport, client_in_port)
6867 self.assertEqual(p[TCP].dport, server_in_port)
6868 self.assertEqual(data, p[Raw].load)
6870 def test_frag_out_of_order(self):
6871 """ NAT64 translate fragments arriving out of order """
6872 self.tcp_port_in = random.randint(1025, 65535)
6874 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6876 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6877 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6881 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6882 self.tcp_port_in, 20, data)
6884 self.pg0.add_stream(pkts)
6885 self.pg_enable_capture(self.pg_interfaces)
6887 frags = self.pg1.get_capture(len(pkts))
6888 p = self.reass_frags_and_verify(frags,
6890 self.pg1.remote_ip4)
6891 self.assertEqual(p[TCP].dport, 20)
6892 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
6893 self.tcp_port_out = p[TCP].sport
6894 self.assertEqual(data, p[Raw].load)
6897 data = "A" * 4 + "B" * 16 + "C" * 3
6898 pkts = self.create_stream_frag(self.pg1,
6904 self.pg1.add_stream(pkts)
6905 self.pg_enable_capture(self.pg_interfaces)
6907 frags = self.pg0.get_capture(len(pkts))
6908 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6909 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6910 self.assertEqual(p[TCP].sport, 20)
6911 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6912 self.assertEqual(data, p[Raw].load)
6914 def test_interface_addr(self):
6915 """ Acquire NAT64 pool addresses from interface """
6916 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
6918 # no address in NAT64 pool
6919 adresses = self.vapi.nat44_address_dump()
6920 self.assertEqual(0, len(adresses))
6922 # configure interface address and check NAT64 address pool
6923 self.pg4.config_ip4()
6924 addresses = self.vapi.nat64_pool_addr_dump()
6925 self.assertEqual(len(addresses), 1)
6926 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
6928 # remove interface address and check NAT64 address pool
6929 self.pg4.unconfig_ip4()
6930 addresses = self.vapi.nat64_pool_addr_dump()
6931 self.assertEqual(0, len(adresses))
6933 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6934 def test_ipfix_max_bibs_sessions(self):
6935 """ IPFIX logging maximum session and BIB entries exceeded """
6938 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6942 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6944 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6945 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6949 for i in range(0, max_bibs):
6950 src = "fd01:aa::%x" % (i)
6951 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6952 IPv6(src=src, dst=remote_host_ip6) /
6953 TCP(sport=12345, dport=80))
6955 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6956 IPv6(src=src, dst=remote_host_ip6) /
6957 TCP(sport=12345, dport=22))
6959 self.pg0.add_stream(pkts)
6960 self.pg_enable_capture(self.pg_interfaces)
6962 self.pg1.get_capture(max_sessions)
6964 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6965 src_address=self.pg3.local_ip4n,
6967 template_interval=10)
6968 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6969 src_port=self.ipfix_src_port)
6971 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6972 IPv6(src=src, dst=remote_host_ip6) /
6973 TCP(sport=12345, dport=25))
6974 self.pg0.add_stream(p)
6975 self.pg_enable_capture(self.pg_interfaces)
6977 self.pg1.assert_nothing_captured()
6979 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6980 capture = self.pg3.get_capture(9)
6981 ipfix = IPFIXDecoder()
6982 # first load template
6984 self.assertTrue(p.haslayer(IPFIX))
6985 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6986 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6987 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6988 self.assertEqual(p[UDP].dport, 4739)
6989 self.assertEqual(p[IPFIX].observationDomainID,
6990 self.ipfix_domain_id)
6991 if p.haslayer(Template):
6992 ipfix.add_template(p.getlayer(Template))
6993 # verify events in data set
6995 if p.haslayer(Data):
6996 data = ipfix.decode_data_set(p.getlayer(Set))
6997 self.verify_ipfix_max_sessions(data, max_sessions)
6999 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
7000 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
7001 TCP(sport=12345, dport=80))
7002 self.pg0.add_stream(p)
7003 self.pg_enable_capture(self.pg_interfaces)
7005 self.pg1.assert_nothing_captured()
7007 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7008 capture = self.pg3.get_capture(1)
7009 # verify events in data set
7011 self.assertTrue(p.haslayer(IPFIX))
7012 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7013 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7014 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7015 self.assertEqual(p[UDP].dport, 4739)
7016 self.assertEqual(p[IPFIX].observationDomainID,
7017 self.ipfix_domain_id)
7018 if p.haslayer(Data):
7019 data = ipfix.decode_data_set(p.getlayer(Set))
7020 self.verify_ipfix_max_bibs(data, max_bibs)
7022 def test_ipfix_max_frags(self):
7023 """ IPFIX logging maximum fragments pending reassembly exceeded """
7024 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
7026 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
7027 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7028 self.vapi.nat_set_reass(max_frag=1, is_ip6=1)
7029 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
7030 src_address=self.pg3.local_ip4n,
7032 template_interval=10)
7033 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
7034 src_port=self.ipfix_src_port)
7037 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
7038 self.tcp_port_in, 20, data)
7040 self.pg0.add_stream(pkts)
7041 self.pg_enable_capture(self.pg_interfaces)
7043 self.pg1.assert_nothing_captured()
7045 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7046 capture = self.pg3.get_capture(9)
7047 ipfix = IPFIXDecoder()
7048 # first load template
7050 self.assertTrue(p.haslayer(IPFIX))
7051 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7052 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7053 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7054 self.assertEqual(p[UDP].dport, 4739)
7055 self.assertEqual(p[IPFIX].observationDomainID,
7056 self.ipfix_domain_id)
7057 if p.haslayer(Template):
7058 ipfix.add_template(p.getlayer(Template))
7059 # verify events in data set
7061 if p.haslayer(Data):
7062 data = ipfix.decode_data_set(p.getlayer(Set))
7063 self.verify_ipfix_max_fragments_ip6(data, 1,
7064 self.pg0.remote_ip6n)
7066 def test_ipfix_bib_ses(self):
7067 """ IPFIX logging NAT64 BIB/session create and delete events """
7068 self.tcp_port_in = random.randint(1025, 65535)
7069 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
7073 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
7075 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
7076 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7077 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
7078 src_address=self.pg3.local_ip4n,
7080 template_interval=10)
7081 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
7082 src_port=self.ipfix_src_port)
7085 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
7086 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
7087 TCP(sport=self.tcp_port_in, dport=25))
7088 self.pg0.add_stream(p)
7089 self.pg_enable_capture(self.pg_interfaces)
7091 p = self.pg1.get_capture(1)
7092 self.tcp_port_out = p[0][TCP].sport
7093 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7094 capture = self.pg3.get_capture(10)
7095 ipfix = IPFIXDecoder()
7096 # first load template
7098 self.assertTrue(p.haslayer(IPFIX))
7099 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7100 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7101 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7102 self.assertEqual(p[UDP].dport, 4739)
7103 self.assertEqual(p[IPFIX].observationDomainID,
7104 self.ipfix_domain_id)
7105 if p.haslayer(Template):
7106 ipfix.add_template(p.getlayer(Template))
7107 # verify events in data set
7109 if p.haslayer(Data):
7110 data = ipfix.decode_data_set(p.getlayer(Set))
7111 if ord(data[0][230]) == 10:
7112 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
7113 elif ord(data[0][230]) == 6:
7114 self.verify_ipfix_nat64_ses(data,
7116 self.pg0.remote_ip6n,
7117 self.pg1.remote_ip4,
7120 self.logger.error(ppp("Unexpected or invalid packet: ", p))
7123 self.pg_enable_capture(self.pg_interfaces)
7124 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
7127 self.vapi.cli("ipfix flush") # FIXME this should be an API call
7128 capture = self.pg3.get_capture(2)
7129 # verify events in data set
7131 self.assertTrue(p.haslayer(IPFIX))
7132 self.assertEqual(p[IP].src, self.pg3.local_ip4)
7133 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
7134 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
7135 self.assertEqual(p[UDP].dport, 4739)
7136 self.assertEqual(p[IPFIX].observationDomainID,
7137 self.ipfix_domain_id)
7138 if p.haslayer(Data):
7139 data = ipfix.decode_data_set(p.getlayer(Set))
7140 if ord(data[0][230]) == 11:
7141 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
7142 elif ord(data[0][230]) == 7:
7143 self.verify_ipfix_nat64_ses(data,
7145 self.pg0.remote_ip6n,
7146 self.pg1.remote_ip4,
7149 self.logger.error(ppp("Unexpected or invalid packet: ", p))
7151 def nat64_get_ses_num(self):
7153 Return number of active NAT64 sessions.
7155 st = self.vapi.nat64_st_dump()
7158 def clear_nat64(self):
7160 Clear NAT64 configuration.
7162 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
7163 domain_id=self.ipfix_domain_id)
7164 self.ipfix_src_port = 4739
7165 self.ipfix_domain_id = 1
7167 self.vapi.nat_set_timeouts()
7169 interfaces = self.vapi.nat64_interface_dump()
7170 for intf in interfaces:
7171 if intf.is_inside > 1:
7172 self.vapi.nat64_add_del_interface(intf.sw_if_index,
7175 self.vapi.nat64_add_del_interface(intf.sw_if_index,
7179 bib = self.vapi.nat64_bib_dump(255)
7182 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
7190 adresses = self.vapi.nat64_pool_addr_dump()
7191 for addr in adresses:
7192 self.vapi.nat64_add_del_pool_addr_range(addr.address,
7197 prefixes = self.vapi.nat64_prefix_dump()
7198 for prefix in prefixes:
7199 self.vapi.nat64_add_del_prefix(prefix.prefix,
7201 vrf_id=prefix.vrf_id,
7205 super(TestNAT64, self).tearDown()
7206 if not self.vpp_dead:
7207 self.logger.info(self.vapi.cli("show nat64 pool"))
7208 self.logger.info(self.vapi.cli("show nat64 interfaces"))
7209 self.logger.info(self.vapi.cli("show nat64 prefix"))
7210 self.logger.info(self.vapi.cli("show nat64 bib all"))
7211 self.logger.info(self.vapi.cli("show nat64 session table all"))
7212 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
7216 class TestDSlite(MethodHolder):
7217 """ DS-Lite Test Cases """
7220 def setUpClass(cls):
7221 super(TestDSlite, cls).setUpClass()
7224 cls.nat_addr = '10.0.0.3'
7225 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
7227 cls.create_pg_interfaces(range(2))
7229 cls.pg0.config_ip4()
7230 cls.pg0.resolve_arp()
7232 cls.pg1.config_ip6()
7233 cls.pg1.generate_remote_hosts(2)
7234 cls.pg1.configure_ipv6_neighbors()
7237 super(TestDSlite, cls).tearDownClass()
7240 def test_dslite(self):
7241 """ Test DS-Lite """
7242 nat_config = self.vapi.nat_show_config()
7243 self.assertEqual(0, nat_config.dslite_ce)
7245 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
7247 aftr_ip4 = '192.0.0.1'
7248 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7249 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7250 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7251 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7254 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7255 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
7256 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7257 UDP(sport=20000, dport=10000))
7258 self.pg1.add_stream(p)
7259 self.pg_enable_capture(self.pg_interfaces)
7261 capture = self.pg0.get_capture(1)
7262 capture = capture[0]
7263 self.assertFalse(capture.haslayer(IPv6))
7264 self.assertEqual(capture[IP].src, self.nat_addr)
7265 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7266 self.assertNotEqual(capture[UDP].sport, 20000)
7267 self.assertEqual(capture[UDP].dport, 10000)
7268 self.assert_packet_checksums_valid(capture)
7269 out_port = capture[UDP].sport
7271 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7272 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7273 UDP(sport=10000, dport=out_port))
7274 self.pg0.add_stream(p)
7275 self.pg_enable_capture(self.pg_interfaces)
7277 capture = self.pg1.get_capture(1)
7278 capture = capture[0]
7279 self.assertEqual(capture[IPv6].src, aftr_ip6)
7280 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7281 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7282 self.assertEqual(capture[IP].dst, '192.168.1.1')
7283 self.assertEqual(capture[UDP].sport, 10000)
7284 self.assertEqual(capture[UDP].dport, 20000)
7285 self.assert_packet_checksums_valid(capture)
7288 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7289 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7290 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7291 TCP(sport=20001, dport=10001))
7292 self.pg1.add_stream(p)
7293 self.pg_enable_capture(self.pg_interfaces)
7295 capture = self.pg0.get_capture(1)
7296 capture = capture[0]
7297 self.assertFalse(capture.haslayer(IPv6))
7298 self.assertEqual(capture[IP].src, self.nat_addr)
7299 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7300 self.assertNotEqual(capture[TCP].sport, 20001)
7301 self.assertEqual(capture[TCP].dport, 10001)
7302 self.assert_packet_checksums_valid(capture)
7303 out_port = capture[TCP].sport
7305 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7306 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7307 TCP(sport=10001, dport=out_port))
7308 self.pg0.add_stream(p)
7309 self.pg_enable_capture(self.pg_interfaces)
7311 capture = self.pg1.get_capture(1)
7312 capture = capture[0]
7313 self.assertEqual(capture[IPv6].src, aftr_ip6)
7314 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7315 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7316 self.assertEqual(capture[IP].dst, '192.168.1.1')
7317 self.assertEqual(capture[TCP].sport, 10001)
7318 self.assertEqual(capture[TCP].dport, 20001)
7319 self.assert_packet_checksums_valid(capture)
7322 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7323 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
7324 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
7325 ICMP(id=4000, type='echo-request'))
7326 self.pg1.add_stream(p)
7327 self.pg_enable_capture(self.pg_interfaces)
7329 capture = self.pg0.get_capture(1)
7330 capture = capture[0]
7331 self.assertFalse(capture.haslayer(IPv6))
7332 self.assertEqual(capture[IP].src, self.nat_addr)
7333 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7334 self.assertNotEqual(capture[ICMP].id, 4000)
7335 self.assert_packet_checksums_valid(capture)
7336 out_id = capture[ICMP].id
7338 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7339 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
7340 ICMP(id=out_id, type='echo-reply'))
7341 self.pg0.add_stream(p)
7342 self.pg_enable_capture(self.pg_interfaces)
7344 capture = self.pg1.get_capture(1)
7345 capture = capture[0]
7346 self.assertEqual(capture[IPv6].src, aftr_ip6)
7347 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7348 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7349 self.assertEqual(capture[IP].dst, '192.168.1.1')
7350 self.assertEqual(capture[ICMP].id, 4000)
7351 self.assert_packet_checksums_valid(capture)
7353 # ping DS-Lite AFTR tunnel endpoint address
7354 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7355 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
7356 ICMPv6EchoRequest())
7357 self.pg1.add_stream(p)
7358 self.pg_enable_capture(self.pg_interfaces)
7360 capture = self.pg1.get_capture(1)
7361 capture = capture[0]
7362 self.assertEqual(capture[IPv6].src, aftr_ip6)
7363 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
7364 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7367 super(TestDSlite, self).tearDown()
7368 if not self.vpp_dead:
7369 self.logger.info(self.vapi.cli("show dslite pool"))
7371 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7372 self.logger.info(self.vapi.cli("show dslite sessions"))
7375 class TestDSliteCE(MethodHolder):
7376 """ DS-Lite CE Test Cases """
7379 def setUpConstants(cls):
7380 super(TestDSliteCE, cls).setUpConstants()
7381 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
7384 def setUpClass(cls):
7385 super(TestDSliteCE, cls).setUpClass()
7388 cls.create_pg_interfaces(range(2))
7390 cls.pg0.config_ip4()
7391 cls.pg0.resolve_arp()
7393 cls.pg1.config_ip6()
7394 cls.pg1.generate_remote_hosts(1)
7395 cls.pg1.configure_ipv6_neighbors()
7398 super(TestDSliteCE, cls).tearDownClass()
7401 def test_dslite_ce(self):
7402 """ Test DS-Lite CE """
7404 nat_config = self.vapi.nat_show_config()
7405 self.assertEqual(1, nat_config.dslite_ce)
7407 b4_ip4 = '192.0.0.2'
7408 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
7409 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
7410 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
7411 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
7413 aftr_ip4 = '192.0.0.1'
7414 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
7415 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
7416 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
7417 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
7419 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
7420 dst_address_length=128,
7421 next_hop_address=self.pg1.remote_ip6n,
7422 next_hop_sw_if_index=self.pg1.sw_if_index,
7426 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7427 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
7428 UDP(sport=10000, dport=20000))
7429 self.pg0.add_stream(p)
7430 self.pg_enable_capture(self.pg_interfaces)
7432 capture = self.pg1.get_capture(1)
7433 capture = capture[0]
7434 self.assertEqual(capture[IPv6].src, b4_ip6)
7435 self.assertEqual(capture[IPv6].dst, aftr_ip6)
7436 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
7437 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
7438 self.assertEqual(capture[UDP].sport, 10000)
7439 self.assertEqual(capture[UDP].dport, 20000)
7440 self.assert_packet_checksums_valid(capture)
7443 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7444 IPv6(dst=b4_ip6, src=aftr_ip6) /
7445 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
7446 UDP(sport=20000, dport=10000))
7447 self.pg1.add_stream(p)
7448 self.pg_enable_capture(self.pg_interfaces)
7450 capture = self.pg0.get_capture(1)
7451 capture = capture[0]
7452 self.assertFalse(capture.haslayer(IPv6))
7453 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
7454 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
7455 self.assertEqual(capture[UDP].sport, 20000)
7456 self.assertEqual(capture[UDP].dport, 10000)
7457 self.assert_packet_checksums_valid(capture)
7459 # ping DS-Lite B4 tunnel endpoint address
7460 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7461 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
7462 ICMPv6EchoRequest())
7463 self.pg1.add_stream(p)
7464 self.pg_enable_capture(self.pg_interfaces)
7466 capture = self.pg1.get_capture(1)
7467 capture = capture[0]
7468 self.assertEqual(capture[IPv6].src, b4_ip6)
7469 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
7470 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
7473 super(TestDSliteCE, self).tearDown()
7474 if not self.vpp_dead:
7476 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
7478 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
7481 class TestNAT66(MethodHolder):
7482 """ NAT66 Test Cases """
7485 def setUpClass(cls):
7486 super(TestNAT66, cls).setUpClass()
7489 cls.nat_addr = 'fd01:ff::2'
7490 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
7492 cls.create_pg_interfaces(range(2))
7493 cls.interfaces = list(cls.pg_interfaces)
7495 for i in cls.interfaces:
7498 i.configure_ipv6_neighbors()
7501 super(TestNAT66, cls).tearDownClass()
7504 def test_static(self):
7505 """ 1:1 NAT66 test """
7506 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7507 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
7508 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7513 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7514 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7517 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7518 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7521 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7522 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7523 ICMPv6EchoRequest())
7525 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7526 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7527 GRE() / IP() / TCP())
7529 self.pg0.add_stream(pkts)
7530 self.pg_enable_capture(self.pg_interfaces)
7532 capture = self.pg1.get_capture(len(pkts))
7533 for packet in capture:
7535 self.assertEqual(packet[IPv6].src, self.nat_addr)
7536 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7537 self.assert_packet_checksums_valid(packet)
7539 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7544 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7545 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7548 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7549 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7552 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7553 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7556 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
7557 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
7558 GRE() / IP() / TCP())
7560 self.pg1.add_stream(pkts)
7561 self.pg_enable_capture(self.pg_interfaces)
7563 capture = self.pg0.get_capture(len(pkts))
7564 for packet in capture:
7566 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
7567 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
7568 self.assert_packet_checksums_valid(packet)
7570 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7573 sm = self.vapi.nat66_static_mapping_dump()
7574 self.assertEqual(len(sm), 1)
7575 self.assertEqual(sm[0].total_pkts, 8)
7577 def test_check_no_translate(self):
7578 """ NAT66 translate only when egress interface is outside interface """
7579 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
7580 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
7581 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
7585 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
7586 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
7588 self.pg0.add_stream([p])
7589 self.pg_enable_capture(self.pg_interfaces)
7591 capture = self.pg1.get_capture(1)
7594 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
7595 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
7597 self.logger.error(ppp("Unexpected or invalid packet:", packet))
7600 def clear_nat66(self):
7602 Clear NAT66 configuration.
7604 interfaces = self.vapi.nat66_interface_dump()
7605 for intf in interfaces:
7606 self.vapi.nat66_add_del_interface(intf.sw_if_index,
7610 static_mappings = self.vapi.nat66_static_mapping_dump()
7611 for sm in static_mappings:
7612 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
7613 sm.external_ip_address,
7618 super(TestNAT66, self).tearDown()
7619 if not self.vpp_dead:
7620 self.logger.info(self.vapi.cli("show nat66 interfaces"))
7621 self.logger.info(self.vapi.cli("show nat66 static mappings"))
7625 if __name__ == '__main__':
7626 unittest.main(testRunner=VppTestRunner)