9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its serialised form, the IPv4 header
    checksum field is cleared and the packet is serialised once more so
    that the checksum is filled in again on build.  The recomputed value
    must equal the one carried by ``pkt``.

    :param pkt: Packet to check IP checksum
    """
    # Work on a re-parsed copy so the captured packet keeps its fields.
    rebuilt = pkt.__class__(str(pkt))
    # Without clearing the field the rebuild only echoes the captured
    # value back and the assertion below could never fail.
    del rebuilt['IP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its serialised form, the TCP checksum
    field is cleared and the packet is serialised once more so that the
    checksum is filled in again on build.  The recomputed value must equal
    the one carried by ``pkt``.

    :param pkt: Packet to check TCP checksum
    """
    # Work on a re-parsed copy so the captured packet keeps its fields.
    rebuilt = pkt.__class__(str(pkt))
    # Without clearing the field the rebuild only echoes the captured
    # value back and the assertion below could never fail.
    del rebuilt['TCP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its serialised form, the UDP checksum
    field is cleared and the packet is serialised once more so that the
    checksum is filled in again on build.  The recomputed value must equal
    the one carried by ``pkt``.

    :param pkt: Packet to check UDP checksum
    """
    # Work on a re-parsed copy so the captured packet keeps its fields.
    rebuilt = pkt.__class__(str(pkt))
    # Without clearing the field the rebuild only echoes the captured
    # value back and the assertion below could never fail.
    del rebuilt['UDP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded (``*error``) layer present in ``pkt`` the packet is
    re-parsed, that layer's checksum is cleared and the packet is rebuilt;
    the rebuilt checksum must match the captured one.  An embedded UDP
    header whose checksum is 0 is not checked.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = (
        (IPerror, 'IPerror'),
        (TCPerror, 'TCPerror'),
        (UDPerror, 'UDPerror'),
        (ICMPerror, 'ICMPerror'),
    )
    for layer, name in embedded_layers:
        if not pkt.haslayer(layer):
            continue
        if layer is UDPerror and pkt[name].chksum == 0:
            continue
        # Every layer starts from its own fresh copy.  The ICMPerror case
        # previously reused a copy made by an earlier branch, which is
        # unbound when none of those branches ran.
        rebuilt = pkt.__class__(str(pkt))
        del rebuilt[name].chksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[name].chksum, pkt[name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    Rebuilds a copy of the packet with the ICMP checksum cleared and
    compares the rebuilt checksum with the captured one.  When the packet
    carries an embedded IP header (``IPerror``) the embedded checksums are
    verified as well.

    :param pkt: Packet to check ICMP checksum
    """
    recomputed = pkt.__class__(str(pkt))
    del recomputed['ICMP'].chksum
    recomputed = recomputed.__class__(str(recomputed))
    self.assertEqual(recomputed['ICMP'].chksum, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination-unreachable, echo-request and echo-reply layers.
    For each one present, the checksum is cleared on a working copy, the
    copy is rebuilt and the result compared with the captured value.  For
    destination-unreachable the embedded packet checksums are checked too.

    :param pkt: Packet to check ICMPv6 checksum
    """
    candidate = pkt.__class__(str(pkt))
    for layer in (ICMPv6DestUnreach, ICMPv6EchoRequest, ICMPv6EchoReply):
        if not pkt.haslayer(layer):
            continue
        # Layers are looked up by class name, as in the rest of this file.
        name = layer.__name__
        del candidate[name].cksum
        candidate = candidate.__class__(str(candidate))
        self.assertEqual(candidate[name].cksum, pkt[name].cksum)
        if layer is ICMPv6DestUnreach:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    Produces three packets sent from the inside host: TCP and UDP (both to
    destination port 20) and an ICMP echo-request, in that order.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default use remote address of
                   the outside interface)
    :param ttl: TTL of generated packets
    :returns: List of packets [TCP, UDP, ICMP]
    """
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    l4_layers = (
        TCP(sport=self.tcp_port_in, dport=20),
        UDP(sport=self.udp_port_in, dport=20),
        ICMP(id=self.icmp_id_in, type='echo-request'),
    )
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at positions that
    depend on the prefix length; octet 8 of the IPv6 address is never
    written.  For a prefix length other than 32, 40, 48, 56, 64 or 96 the
    prefix is returned without an embedded address.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # bytearray gives mutable integer octets on both Python 2 and 3,
    # unlike list(str) + ''.join which only works for Python 2 strings.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))

    # prefix length -> IPv6 octet index receiving IPv4 octets 0..3
    slots_by_plen = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    for slot, octet in zip(slots_by_plen.get(plen, ()), ip4_n):
        pref_n[slot] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    Reads the four IPv4 octets from the positions used by
    ``compose_ip6`` for the given prefix length (octet 8 of the IPv6
    address is never read).

    NOTE(review): the per-length branches of the previous body were not
    visible when this was rewritten; the table below is the inverse of
    ``compose_ip6`` - confirm against the original.

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length
    :returns: extracted IPv4 address
    :raises ValueError: if plen is not 32, 40, 48, 56, 64 or 96
    """
    # bytearray yields integer octets on both Python 2 and 3.
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))

    # prefix length -> IPv6 octet index holding IPv4 octets 0..3
    slots_by_plen = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in slots_by_plen:
        raise ValueError("unsupported IPv6 prefix length: %r" % (plen,))
    ip4_n = bytearray(ip6_n[slot] for slot in slots_by_plen[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    The destination is the outside host's IPv4 address embedded in the
    NAT64 prefix; without an explicit prefix the text '64:ff9b::' is
    prepended to the IPv4 address.  Produces TCP, UDP (destination port
    20) and ICMPv6 echo-request packets, in that order.

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix
    :param plen: NAT64 prefix length
    :returns: List of packets [TCP, UDP, ICMPv6]
    """
    if pref is not None:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
    else:
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])

    l4_layers = (
        TCP(sport=self.tcp_port_in, dport=20),
        UDP(sport=self.udp_port_in, dport=20),
        ICMPv6EchoRequest(id=self.icmp_id_in),
    )
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
            l4
            for l4 in l4_layers]
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    Produces three packets sent from the outside host with source port 20:
    TCP, UDP and an ICMP echo-reply, in that order.

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: List of packets [TCP, UDP, ICMP]
    """
    if dst_ip is None:
        dst_ip = self.nat_addr

    if use_inside_ports:
        tcp_port, udp_port, icmp_id = (self.tcp_port_in,
                                       self.udp_port_in,
                                       self.icmp_id_in)
    else:
        tcp_port, udp_port, icmp_id = (self.tcp_port_out,
                                       self.udp_port_out,
                                       self.icmp_id_out)

    l4_layers = (
        TCP(dport=tcp_port, sport=20),
        UDP(dport=udp_port, sport=20),
        ICMP(id=icmp_id, type='echo-reply'),
    )
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create IPv6 packet stream for outside network

    Produces TCP and UDP packets (source port 20, destination the current
    outside ports) and an ICMPv6 echo-reply, in that order.

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: List of packets [TCP, UDP, ICMPv6]
    """
    l4_layers = (
        TCP(dport=self.tcp_port_out, sport=20),
        UDP(dport=self.udp_port_out, sport=20),
        ICMPv6EchoReply(id=self.icmp_id_out),
    )
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
            l4
            for l4 in l4_layers]
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None, is_ip6=False):
    """
    Verify captured packets on outside network

    Besides checking addresses and checksums, records the observed source
    port / ICMP identifier of each packet in ``self.tcp_port_out``,
    ``self.udp_port_out`` and ``self.icmp_id_out``.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    :param is_ip6: If L3 protocol is IPv6 (Default False)
    """
    l3_cls, icmp_cls = (IPv6, ICMPv6EchoRequest) if is_ip6 else (IP, ICMP)
    if nat_ip is None:
        nat_ip = self.nat_addr
    # With same_port the inside port/id must survive translation,
    # otherwise it must have been rewritten.
    compare = self.assertEqual if same_port else self.assertNotEqual

    def check_one(packet):
        if not is_ip6:
            self.check_ip_checksum(packet)
        self.assertEqual(packet[l3_cls].src, nat_ip)
        if dst_ip is not None:
            self.assertEqual(packet[l3_cls].dst, dst_ip)
        if packet.haslayer(TCP):
            compare(packet[TCP].sport, self.tcp_port_in)
            self.tcp_port_out = packet[TCP].sport
            self.check_tcp_checksum(packet)
        elif packet.haslayer(UDP):
            compare(packet[UDP].sport, self.udp_port_in)
            self.udp_port_out = packet[UDP].sport
        else:
            compare(packet[icmp_cls].id, self.icmp_id_in)
            self.icmp_id_out = packet[icmp_cls].id
            if is_ip6:
                self.check_icmpv6_checksum(packet)
            else:
                self.check_icmp_checksum(packet)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            check_one(packet)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(
                ppp("Unexpected or invalid packet (outside network):",
                    packet))
            raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           packet_num=3, dst_ip=None):
    """
    Verify captured IPv6 packets on outside network

    Thin wrapper around ``verify_capture_out`` with ``is_ip6`` set.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    return self.verify_capture_out(capture,
                                   nat_ip=nat_ip,
                                   same_port=same_port,
                                   packet_num=packet_num,
                                   dst_ip=dst_ip,
                                   is_ip6=True)
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    Every packet must be addressed to the inside host and carry the
    inside TCP/UDP destination port or ICMP identifier.

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    def check_one(packet):
        self.check_ip_checksum(packet)
        self.assertEqual(packet[IP].dst, in_if.remote_ip4)
        if packet.haslayer(TCP):
            self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            self.check_tcp_checksum(packet)
        elif packet.haslayer(UDP):
            self.assertEqual(packet[UDP].dport, self.udp_port_in)
        else:
            self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            self.check_icmp_checksum(packet)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            check_one(packet)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(
                ppp("Unexpected or invalid packet (inside network):",
                    packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    Every packet must carry the given addresses and the inside TCP/UDP
    destination port or ICMPv6 echo-reply identifier.

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    def check_one(packet):
        self.assertEqual(packet[IPv6].src, src_ip)
        self.assertEqual(packet[IPv6].dst, dst_ip)
        if packet.haslayer(TCP):
            self.assertEqual(packet[TCP].dport, self.tcp_port_in)
            self.check_tcp_checksum(packet)
        elif packet.haslayer(UDP):
            self.assertEqual(packet[UDP].dport, self.udp_port_in)
            self.check_udp_checksum(packet)
        else:
            self.assertEqual(packet[ICMPv6EchoReply].id,
                             self.icmp_id_in)
            self.check_icmpv6_checksum(packet)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            check_one(packet)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(
                ppp("Unexpected or invalid packet (inside network):",
                    packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packets that must not have been translated

    Addresses must still be the remote addresses of the two interfaces
    and the source port / ICMP identifier must be the inside one.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    def check_one(packet):
        self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
        self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
        if packet.haslayer(TCP):
            self.assertEqual(packet[TCP].sport, self.tcp_port_in)
        elif packet.haslayer(UDP):
            self.assertEqual(packet[UDP].sport, self.udp_port_in)
        else:
            self.assertEqual(packet[ICMP].id, self.icmp_id_in)

    for packet in capture:
        try:
            check_one(packet)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(
                ppp("Unexpected or invalid packet (inside network):",
                    packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    Each packet must be an ICMP error of the expected type whose embedded
    packet carries the outside destination port / ICMP identifier.

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr

    def check_one(packet):
        self.assertEqual(packet[IP].src, src_ip)
        self.assertTrue(packet.haslayer(ICMP))
        icmp = packet[ICMP]
        self.assertEqual(icmp.type, icmp_type)
        self.assertTrue(icmp.haslayer(IPerror))
        embedded = icmp[IPerror]
        if embedded.haslayer(TCPerror):
            seen, wanted = embedded[TCPerror].dport, self.tcp_port_out
        elif embedded.haslayer(UDPerror):
            seen, wanted = embedded[UDPerror].dport, self.udp_port_out
        else:
            seen, wanted = embedded[ICMPerror].id, self.icmp_id_out
        self.assertEqual(seen, wanted)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            check_one(packet)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(
                ppp("Unexpected or invalid packet (outside network):",
                    packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    Each packet must be an ICMP error of the expected type addressed to
    the inside host, whose embedded packet carries the inside source
    port / ICMP identifier.

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    def check_one(packet):
        self.assertEqual(packet[IP].dst, in_if.remote_ip4)
        self.assertTrue(packet.haslayer(ICMP))
        icmp = packet[ICMP]
        self.assertEqual(icmp.type, icmp_type)
        self.assertTrue(icmp.haslayer(IPerror))
        embedded = icmp[IPerror]
        if embedded.haslayer(TCPerror):
            seen, wanted = embedded[TCPerror].sport, self.tcp_port_in
        elif embedded.haslayer(UDPerror):
            seen, wanted = embedded[UDPerror].sport, self.udp_port_in
        else:
            seen, wanted = embedded[ICMPerror].id, self.icmp_id_in
        self.assertEqual(seen, wanted)

    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            check_one(packet)
        except:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(
                ppp("Unexpected or invalid packet (inside network):",
                    packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data):
    """
    Create fragmented packet stream

    The TCP checksum is taken from an unfragmented build of the segment
    and placed in the first fragment.  Three fragments are produced with
    fragment offsets 0, 3 and 5 (8-byte units) sharing one random IP id.

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :returns: Fragments
    """
    ip_id = random.randint(0, 65535)

    # Build the whole segment once only to learn its TCP checksum.
    whole = (IP(src=src_if.remote_ip4, dst=dst) /
             TCP(sport=sport, dport=dport) /
             Raw(data))
    whole = whole.__class__(str(whole))
    chksum = whole['TCP'].chksum

    first = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
             IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0,
                id=ip_id) /
             TCP(sport=sport, dport=dport, chksum=chksum) /
             Raw(data[0:4]))
    middle = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3,
                 id=ip_id, proto=IP_PROTOS.tcp) /
              Raw(data[4:20]))
    last = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
            IP(src=src_if.remote_ip4, dst=dst, frag=5,
               proto=IP_PROTOS.tcp, id=ip_id) /
            Raw(data[20:]))
    return [first, middle, last]
def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
                           pref=None, plen=0, frag_size=128):
    """
    Create fragmented IPv6 packet stream

    The IPv4 destination is embedded in the NAT64 prefix (or appended to
    the text '64:ff9b::' when no prefix is given), one TCP packet with a
    fragment extension header is built and split with ``fragment6``.

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :param pref: NAT64 prefix
    :param plen: NAT64 prefix length
    :param frag_size: size of fragments
    :returns: Fragments
    """
    if pref is not None:
        dst_ip6 = self.compose_ip6(dst, pref, plen)
    else:
        dst_ip6 = ''.join(['64:ff9b::', dst])

    whole = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
             IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
             IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
             TCP(sport=sport, dport=dport) /
             Raw(data))

    return fragment6(whole, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    Every fragment must carry the given addresses and a valid IP header
    checksum.  Fragment payloads are written at ``frag * 8`` into one
    buffer, which is then parsed as TCP or UDP according to the protocol
    of the first fragment.  The TCP checksum of the result is verified.
    For any other protocol the last fragment is returned as is.

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    # Named so that it does not shadow the Python 2 builtin ``buffer``.
    payload = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.check_ip_checksum(p)
        payload.seek(p[IP].frag * 8)
        payload.write(p[IP].payload)
    # Fresh header built from the first fragment; the former
    # ``frags[0].getlayer(IP)`` result was overwritten unused.
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
            proto=frags[0][IP].proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(payload.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(payload.getvalue()))
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented IPv6 packet

    Every fragment must carry the given addresses.  Fragment payloads are
    written at ``offset * 8`` into one buffer, which is then parsed as TCP
    or UDP according to the next-header value of the first fragment.  The
    TCP checksum of the result is verified.  For any other protocol the
    last fragment is returned as is.

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet
    """
    payload = StringIO.StringIO()
    for frag in frags:
        self.assertEqual(frag[IPv6].src, src)
        self.assertEqual(frag[IPv6].dst, dst)
        frag_hdr = frag[IPv6ExtHdrFragment]
        payload.seek(frag_hdr.offset * 8)
        payload.write(frag_hdr.payload)

    first = frags[0]
    header = IPv6(src=first[IPv6].src, dst=first[IPv6].dst,
                  nh=first[IPv6ExtHdrFragment].nh)
    # Falls back to the last fragment when the protocol is neither
    # TCP nor UDP.
    result = frag
    if header.nh == IP_PROTOS.tcp:
        result = (header / TCP(payload.getvalue()))
        self.check_tcp_checksum(result)
    elif header.nh == IP_PROTOS.udp:
        result = (header / UDP(payload.getvalue()))
    return result
def initiate_tcp_session(self, in_if, out_if):
    """
    Initiates TCP session

    Sends three packets through the NAT (in->out, out->in, in->out) and
    stores the translated source port of the first one in
    ``self.tcp_port_out``.

    NOTE(review): the TCP flag arguments ("S", "SA", "A") were not visible
    when this was rewritten and are inferred from the step comments -
    confirm against the original.

    :param in_if: Inside interface
    :param out_if: Outside interface
    """
    def exchange(tx_if, rx_if, pkt):
        # Send one packet and wait for exactly one packet on the far side.
        tx_if.add_stream(pkt)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(1)

    try:
        # SYN packet in->out
        syn = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
               IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
               TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                   flags="S"))
        translated = exchange(in_if, out_if, syn)[0]
        self.tcp_port_out = translated[TCP].sport

        # SYN + ACK packet out->in
        syn_ack = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
                   IP(src=out_if.remote_ip4, dst=self.nat_addr) /
                   TCP(sport=self.tcp_external_port,
                       dport=self.tcp_port_out,
                       flags="SA"))
        exchange(out_if, in_if, syn_ack)

        # ACK packet in->out
        ack = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
               IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
               TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                   flags="A"))
        exchange(in_if, out_if, ack)

    except:
        self.logger.error("TCP 3 way handshake failed")
        raise
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Expects six records: three with natEvent 4 and three with natEvent 5,
    each carrying the inside/outside address and port (or ICMP id) of the
    matching protocol.

    :param data: Decoded IPFIX data records
    """
    created = 0
    deleted = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        nat_event = ord(record[230])
        self.assertIn(nat_event, [4, 5])
        if nat_event == 4:
            created += 1
        else:
            deleted += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        proto = ord(record[4])
        if proto == IP_PROTOS.icmp:
            inside, outside = self.icmp_id_in, self.icmp_id_out
        elif proto == IP_PROTOS.tcp:
            inside, outside = self.tcp_port_in, self.tcp_port_out
        elif proto == IP_PROTOS.udp:
            inside, outside = self.udp_port_in, self.udp_port_out
        else:
            self.fail("Invalid protocol")
        self.assertEqual(struct.pack("!H", inside), record[7])
        self.assertEqual(struct.pack("!H", outside), record[227])
    self.assertEqual(3, created)
    self.assertEqual(3, deleted)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Expects exactly one record with natEvent 3 and natPoolID 0.

    :param data: Decoded IPFIX data records
    """
    check = self.assertEqual
    check(1, len(data))
    event = data[0]
    # natEvent
    check(ord(event[230]), 3)
    # natPoolID
    check(struct.pack("!I", 0), event[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    Expects exactly one record with natEvent 13, natQuotaExceededEvent 1
    and the configured limit.  The last two fields are compared in native
    byte order ("I"), unlike the "!"-prefixed fields elsewhere in this
    file.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    check = self.assertEqual
    check(1, len(data))
    event = data[0]
    # natEvent
    check(ord(event[230]), 13)
    # natQuotaExceededEvent
    check(struct.pack("I", 1), event[466])
    # maxSessionEntries
    check(struct.pack("I", limit), event[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    Expects exactly one record with natEvent 13, natQuotaExceededEvent 2
    and the configured limit.  The last two fields are compared in native
    byte order ("I").

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    check = self.assertEqual
    check(1, len(data))
    event = data[0]
    # natEvent
    check(ord(event[230]), 13)
    # natQuotaExceededEvent
    check(struct.pack("I", 2), event[466])
    # maxBIBEntries
    check(struct.pack("I", limit), event[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    Expects exactly one record with natEvent 13, natQuotaExceededEvent 5,
    the configured limit and the given source address.  The quota fields
    are compared in native byte order ("I").

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    check = self.assertEqual
    check(1, len(data))
    event = data[0]
    # natEvent
    check(ord(event[230]), 13)
    # natQuotaExceededEvent
    check(struct.pack("I", 5), event[466])
    # maxFragmentsPendingReassembly
    check(struct.pack("I", limit), event[475])
    # sourceIPv6Address
    check(src_addr, event[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    Expects exactly one record with natEvent 13, natQuotaExceededEvent 5,
    the configured limit and the given source address.  The quota fields
    are compared in native byte order ("I").

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    check = self.assertEqual
    check(1, len(data))
    event = data[0]
    # natEvent
    check(ord(event[230]), 13)
    # natQuotaExceededEvent
    check(struct.pack("I", 5), event[466])
    # maxFragmentsPendingReassembly
    check(struct.pack("I", limit), event[475])
    # sourceIPv4Address
    check(src_addr, event[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    Expects exactly one TCP record: natEvent 10 for create or 11 for
    delete, with the given source address, the NAT address and the
    inside/outside TCP ports.

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    expected_event = 10 if is_create else 11
    self.assertEqual(ord(record[230]), expected_event)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
                           dst_port):
    """
    Verify IPFIX NAT64 session create and delete events

    Expects exactly one TCP record: natEvent 6 for create or 7 for
    delete, with source/destination addresses and ports before and after
    translation.

    NOTE(review): the prefix arguments passed to ``compose_ip6``
    ('64:ff9b::', 96) were not visible when this was rewritten - confirm
    against the original.

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    :param dst_addr: IPv4 destination address
    :param dst_port: destination TCP port
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    expected_event = 6 if is_create else 7
    self.assertEqual(ord(record[230]), expected_event)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # destinationIPv6Address
    dst_ip6 = self.compose_ip6(dst_addr, '64:ff9b::', 96)
    self.assertEqual(socket.inet_pton(socket.AF_INET6, dst_ip6),
                     record[28])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # postNATDestinationIPv4Address
    self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
                     record[226])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
    # destinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[11])
    # postNAPTDestinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[228])
939 class TestNAT44(MethodHolder):
940 """ NAT44 Test Cases """
944 super(TestNAT44, cls).setUpClass()
947 cls.tcp_port_in = 6303
948 cls.tcp_port_out = 6303
949 cls.udp_port_in = 6304
950 cls.udp_port_out = 6304
951 cls.icmp_id_in = 6305
952 cls.icmp_id_out = 6305
953 cls.nat_addr = '10.0.0.3'
954 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
955 cls.ipfix_src_port = 4739
956 cls.ipfix_domain_id = 1
957 cls.tcp_external_port = 80
959 cls.create_pg_interfaces(range(10))
960 cls.interfaces = list(cls.pg_interfaces[0:4])
962 for i in cls.interfaces:
967 cls.pg0.generate_remote_hosts(3)
968 cls.pg0.configure_ipv4_neighbors()
970 cls.pg1.generate_remote_hosts(1)
971 cls.pg1.configure_ipv4_neighbors()
973 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
974 cls.vapi.ip_table_add_del(10, is_add=1)
975 cls.vapi.ip_table_add_del(20, is_add=1)
977 cls.pg4._local_ip4 = "172.16.255.1"
978 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
979 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
980 cls.pg4.set_table_ip4(10)
981 cls.pg5._local_ip4 = "172.17.255.3"
982 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
983 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
984 cls.pg5.set_table_ip4(10)
985 cls.pg6._local_ip4 = "172.16.255.1"
986 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
987 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
988 cls.pg6.set_table_ip4(20)
989 for i in cls.overlapping_interfaces:
997 cls.pg9.generate_remote_hosts(2)
999 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1000 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1004 cls.pg9.resolve_arp()
1005 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1006 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1007 cls.pg9.resolve_arp()
1010 super(TestNAT44, cls).tearDownClass()
1013 def clear_nat44(self):
1015 Clear NAT44 configuration.
1017 # I found no elegant way to do this
1018 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1019 dst_address_length=32,
1020 next_hop_address=self.pg7.remote_ip4n,
1021 next_hop_sw_if_index=self.pg7.sw_if_index,
1023 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1024 dst_address_length=32,
1025 next_hop_address=self.pg8.remote_ip4n,
1026 next_hop_sw_if_index=self.pg8.sw_if_index,
1029 for intf in [self.pg7, self.pg8]:
1030 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
1032 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
1037 if self.pg7.has_ip4_config:
1038 self.pg7.unconfig_ip4()
1040 self.vapi.nat44_forwarding_enable_disable(0)
1042 interfaces = self.vapi.nat44_interface_addr_dump()
1043 for intf in interfaces:
1044 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
1045 twice_nat=intf.twice_nat,
1048 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1049 domain_id=self.ipfix_domain_id)
1050 self.ipfix_src_port = 4739
1051 self.ipfix_domain_id = 1
1053 interfaces = self.vapi.nat44_interface_dump()
1054 for intf in interfaces:
1055 if intf.is_inside > 1:
1056 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1059 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1063 interfaces = self.vapi.nat44_interface_output_feature_dump()
1064 for intf in interfaces:
1065 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1069 static_mappings = self.vapi.nat44_static_mapping_dump()
1070 for sm in static_mappings:
1071 self.vapi.nat44_add_del_static_mapping(
1072 sm.local_ip_address,
1073 sm.external_ip_address,
1074 local_port=sm.local_port,
1075 external_port=sm.external_port,
1076 addr_only=sm.addr_only,
1078 protocol=sm.protocol,
1079 twice_nat=sm.twice_nat,
1080 self_twice_nat=sm.self_twice_nat,
1081 out2in_only=sm.out2in_only,
1083 external_sw_if_index=sm.external_sw_if_index,
1086 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1087 for lb_sm in lb_static_mappings:
1088 self.vapi.nat44_add_del_lb_static_mapping(
1089 lb_sm.external_addr,
1090 lb_sm.external_port,
1092 vrf_id=lb_sm.vrf_id,
1093 twice_nat=lb_sm.twice_nat,
1094 self_twice_nat=lb_sm.self_twice_nat,
1095 out2in_only=lb_sm.out2in_only,
1101 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1102 for id_m in identity_mappings:
1103 self.vapi.nat44_add_del_identity_mapping(
1104 addr_only=id_m.addr_only,
1107 sw_if_index=id_m.sw_if_index,
1109 protocol=id_m.protocol,
1112 adresses = self.vapi.nat44_address_dump()
1113 for addr in adresses:
1114 self.vapi.nat44_add_del_address_range(addr.ip_address,
1116 twice_nat=addr.twice_nat,
1119 self.vapi.nat_set_reass()
1120 self.vapi.nat_set_reass(is_ip6=1)
1122 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1123 local_port=0, external_port=0, vrf_id=0,
1124 is_add=1, external_sw_if_index=0xFFFFFFFF,
1125 proto=0, twice_nat=0, self_twice_nat=0,
1126 out2in_only=0, tag=""):
1128 Add/delete NAT44 static mapping
1130 :param local_ip: Local IP address
1131 :param external_ip: External IP address
1132 :param local_port: Local port number (Optional)
1133 :param external_port: External port number (Optional)
1134 :param vrf_id: VRF ID (Default 0)
1135 :param is_add: 1 if add, 0 if delete (Default add)
1136 :param external_sw_if_index: External interface instead of IP address
1137 :param proto: IP protocol (Mandatory if port specified)
1138 :param twice_nat: 1 if translate external host address and port
1139 :param self_twice_nat: 1 if translate external host address and port
1140 whenever external host address equals
1141 local address of internal host
1142 :param out2in_only: if 1 rule is matching only out2in direction
1143 :param tag: Opaque string tag
1146 if local_port and external_port:
1148 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1149 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1150 self.vapi.nat44_add_del_static_mapping(
1153 external_sw_if_index,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    Configures a single-address range (first == last) through the API.

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed through to the API (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    first = last = socket.inet_pton(socket.AF_INET, ip)
    self.vapi.nat44_add_del_address_range(first, last, is_add,
                                          vrf_id=vrf_id,
                                          twice_nat=twice_nat)
def test_dynamic(self):
    """ NAT44 dynamic translation test """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    def forward(tx_if, rx_if, pkts):
        # Send pkts on tx_if and collect the same number on rx_if.
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(pkts))

    # in2out
    inside_pkts = self.create_stream_in(self.pg0, self.pg1)
    self.verify_capture_out(forward(self.pg0, self.pg1, inside_pkts))

    # out2in
    outside_pkts = self.create_stream_out(self.pg1)
    self.verify_capture_in(forward(self.pg1, self.pg0, outside_pkts),
                           self.pg0)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """ NAT44 handling of client packets with TTL=1 """

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    client_pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
    self.pg0.add_stream(client_pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    errors = self.pg0.get_capture(len(client_pkts))
    self.verify_capture_in_with_icmp_errors(errors, self.pg0)
1220 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1221 """ NAT44 handling of server packets with TTL=1 """
# First sends a normal pg0 -> pg1 stream (the in-file comment calls this
# "create sessions"), then sends the outside stream with ttl=1 from pg1 and
# expects one packet per sent packet back on pg1.
# NOTE(review): the embedded numbering skips values (1226, 1232, 1240, ...);
# the remaining arguments of the pg1 feature call are not visible here.
1223 self.nat44_add_address(self.nat_addr)
1224 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1225 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1228 # Client side - create sessions
1229 pkts = self.create_stream_in(self.pg0, self.pg1)
1230 self.pg0.add_stream(pkts)
1231 self.pg_enable_capture(self.pg_interfaces)
1234 # Server side - generate traffic
1235 capture = self.pg1.get_capture(len(pkts))
1236 self.verify_capture_out(capture)
1237 pkts = self.create_stream_out(self.pg1, ttl=1)
1238 self.pg1.add_stream(pkts)
1239 self.pg_enable_capture(self.pg_interfaces)
1242 # Server side - verify ICMP type 11 packets
# The verifier is told to expect pg1's local address as the source.
1243 capture = self.pg1.get_capture(len(pkts))
1244 self.verify_capture_out_with_icmp_errors(capture,
1245 src_ip=self.pg1.local_ip4)
1247 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1248 """ NAT44 handling of error responses to client packets with TTL=2 """
# Sends the inside stream with ttl=2 from pg0, captures it on pg1, then
# answers every captured packet with an ICMP type 11 message addressed to
# nat_addr and checks what is delivered back on pg0.
# NOTE(review): the embedded numbering skips values (1253, 1259, 1268, ...);
# the remaining arguments of the pg1 feature call are not visible here.
1250 self.nat44_add_address(self.nat_addr)
1251 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1252 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1255 # Client side - generate traffic
1256 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1257 self.pg0.add_stream(pkts)
1258 self.pg_enable_capture(self.pg_interfaces)
1261 # Server side - simulate ICMP type 11 response
# Each error packet carries the IP layer of the captured packet as payload.
1262 capture = self.pg1.get_capture(len(pkts))
1263 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1264 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1265 ICMP(type=11) / packet[IP] for packet in capture]
1266 self.pg1.add_stream(pkts)
1267 self.pg_enable_capture(self.pg_interfaces)
1270 # Client side - verify ICMP type 11 packets
1271 capture = self.pg0.get_capture(len(pkts))
1272 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1274 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1275 """ NAT44 handling of error responses to server packets with TTL=2 """
# Mirror image of the in2out TTL=2 case: after a normal pg0 -> pg1 stream,
# the outside stream is sent with ttl=2 from pg1, captured on pg0, answered
# from pg0 with ICMP type 11 messages, and the result is checked on pg1.
# NOTE(review): the embedded numbering skips values (1280, 1286, 1294, 1303,
# ...); the remaining arguments of the pg1 feature call are not visible here.
1277 self.nat44_add_address(self.nat_addr)
1278 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1279 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1282 # Client side - create sessions
1283 pkts = self.create_stream_in(self.pg0, self.pg1)
1284 self.pg0.add_stream(pkts)
1285 self.pg_enable_capture(self.pg_interfaces)
1288 # Server side - generate traffic
1289 capture = self.pg1.get_capture(len(pkts))
1290 self.verify_capture_out(capture)
1291 pkts = self.create_stream_out(self.pg1, ttl=2)
1292 self.pg1.add_stream(pkts)
1293 self.pg_enable_capture(self.pg_interfaces)
1296 # Client side - simulate ICMP type 11 response
# Each error packet carries the IP layer of the captured packet as payload
# and is addressed from the pg0 host to the pg1 host.
1297 capture = self.pg0.get_capture(len(pkts))
1298 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1299 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1300 ICMP(type=11) / packet[IP] for packet in capture]
1301 self.pg0.add_stream(pkts)
1302 self.pg_enable_capture(self.pg_interfaces)
1305 # Server side - verify ICMP type 11 packets
1306 capture = self.pg1.get_capture(len(pkts))
1307 self.verify_capture_out_with_icmp_errors(capture)
1309 def test_ping_out_interface_from_outside(self):
1310 """ Ping NAT44 out interface from outside network """
# Sends an ICMP echo request from the pg1 host to pg1's own local address
# and expects exactly one echo reply (type 0) back on pg1, with source and
# destination swapped and the ICMP id equal to self.icmp_id_in.
# NOTE(review): the embedded numbering skips values (1315, 1320, 1323,
# 1326-1327, 1332, 1335); the assignments of `pkts` and `packet` and any
# exception-handling keywords around the assertions are not visible here.
1312 self.nat44_add_address(self.nat_addr)
1313 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1314 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1317 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1318 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1319 ICMP(id=self.icmp_id_out, type='echo-request'))
1321 self.pg1.add_stream(pkts)
1322 self.pg_enable_capture(self.pg_interfaces)
1324 capture = self.pg1.get_capture(len(pkts))
1325 self.assertEqual(1, len(capture))
1328 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1329 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1330 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1331 self.assertEqual(packet[ICMP].type, 0) # echo reply
# The offending packet is logged to help diagnose a failure.
1333 self.logger.error(ppp("Unexpected or invalid packet "
1334 "(outside network):", packet))
1337 def test_ping_internal_host_from_outside(self):
1338 """ Ping internal host from outside network """
# With a static mapping between pg0's host address and nat_addr, sends an
# echo request from the pg1 host to nat_addr, checks the single packet
# delivered on pg0, then sends the echo reply from the pg0 host and checks
# the single packet delivered on pg1.
# NOTE(review): the embedded numbering skips values (1343-1345, 1351,
# 1355-1356, 1362); the remaining arguments of the pg1 feature call are not
# visible here.
1340 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1341 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1342 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Echo request: pg1 host -> nat_addr, expected on pg0.
1346 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1347 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1348 ICMP(id=self.icmp_id_out, type='echo-request'))
1349 self.pg1.add_stream(pkt)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg0.get_capture(1)
1353 self.verify_capture_in(capture, self.pg0, packet_num=1)
1354 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Echo reply: pg0 host -> pg1 host, expected on pg1.
1357 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1358 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1359 ICMP(id=self.icmp_id_in, type='echo-reply'))
1360 self.pg0.add_stream(pkt)
1361 self.pg_enable_capture(self.pg_interfaces)
1363 capture = self.pg1.get_capture(1)
1364 self.verify_capture_out(capture, same_port=True, packet_num=1)
1365 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1367 def test_forwarding(self):
1368 """ NAT44 forwarding test """
# Enables NAT44 forwarding together with one static mapping
# (pg0 host <-> nat_addr) and sends traffic in both directions twice:
# once for the mapped host and once after temporarily replacing
# pg0.remote_hosts[0] with remote_hosts[1] (the in-file comments call this
# "no static mapping match").
# NOTE(review): the embedded numbering skips values (1372, 1374, 1379-1380,
# 1382, 1386, 1417-1418, 1420-1421, 1425, ...); the remaining arguments of
# several calls and any try/finally structure around the traffic are not
# visible here.
1370 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1371 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1373 self.vapi.nat44_forwarding_enable_disable(1)
# The mapping is created through the raw API, using attributes ending in
# "n" (presumably packed binary addresses -- confirm) rather than strings.
1375 real_ip = self.pg0.remote_ip4n
1376 alias_ip = self.nat_addr_n
1377 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1378 external_ip=alias_ip)
1381 # in2out - static mapping match
1383 pkts = self.create_stream_out(self.pg1)
1384 self.pg1.add_stream(pkts)
1385 self.pg_enable_capture(self.pg_interfaces)
1387 capture = self.pg0.get_capture(len(pkts))
1388 self.verify_capture_in(capture, self.pg0)
1390 pkts = self.create_stream_in(self.pg0, self.pg1)
1391 self.pg0.add_stream(pkts)
1392 self.pg_enable_capture(self.pg_interfaces)
1394 capture = self.pg1.get_capture(len(pkts))
1395 self.verify_capture_out(capture, same_port=True)
1397 # in2out - no static mapping match
# Save the first remote host so it can be restored further down.
1399 host0 = self.pg0.remote_hosts[0]
1400 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
# The outside stream now targets the host address directly and reuses the
# inside port numbers.
1402 pkts = self.create_stream_out(self.pg1,
1403 dst_ip=self.pg0.remote_ip4,
1404 use_inside_ports=True)
1405 self.pg1.add_stream(pkts)
1406 self.pg_enable_capture(self.pg_interfaces)
1408 capture = self.pg0.get_capture(len(pkts))
1409 self.verify_capture_in(capture, self.pg0)
1411 pkts = self.create_stream_in(self.pg0, self.pg1)
1412 self.pg0.add_stream(pkts)
1413 self.pg_enable_capture(self.pg_interfaces)
1415 capture = self.pg1.get_capture(len(pkts))
1416 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Undo the host swap.
1419 self.pg0.remote_hosts[0] = host0
# Cleanup: forwarding is switched off again; the second call presumably
# deletes the static mapping (its last argument is not shown -- confirm).
1422 self.vapi.nat44_forwarding_enable_disable(0)
1423 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1424 external_ip=alias_ip,
1427 def test_static_in(self):
1428 """ 1:1 NAT initialized from inside network """
# Adds an address-only static mapping (pg0 host <-> 10.0.0.10), checks how
# it is reported by the static-mapping dump, then sends traffic
# pg0 -> pg1 first and pg1 -> pg0 second.
# NOTE(review): the embedded numbering skips values (1438, 1445-1446, 1450,
# 1453-1454, 1458); the remaining arguments of the pg1 feature call are not
# visible here.
1430 nat_ip = "10.0.0.10"
# The expected outside ports/id are overridden on the test instance.
1431 self.tcp_port_out = 6303
1432 self.udp_port_out = 6304
1433 self.icmp_id_out = 6305
1435 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1436 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1437 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exactly one mapping is expected, with an empty tag (text before the first
# NUL byte) and zero protocol and ports.
1439 sm = self.vapi.nat44_static_mapping_dump()
1440 self.assertEqual(len(sm), 1)
1441 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1442 self.assertEqual(sm[0].protocol, 0)
1443 self.assertEqual(sm[0].local_port, 0)
1444 self.assertEqual(sm[0].external_port, 0)
# pg0 -> pg1; nat_ip and True are passed positionally to the verifier.
1447 pkts = self.create_stream_in(self.pg0, self.pg1)
1448 self.pg0.add_stream(pkts)
1449 self.pg_enable_capture(self.pg_interfaces)
1451 capture = self.pg1.get_capture(len(pkts))
1452 self.verify_capture_out(capture, nat_ip, True)
# pg1 -> pg0, addressed to nat_ip.
1455 pkts = self.create_stream_out(self.pg1, nat_ip)
1456 self.pg1.add_stream(pkts)
1457 self.pg_enable_capture(self.pg_interfaces)
1459 capture = self.pg0.get_capture(len(pkts))
1460 self.verify_capture_in(capture, self.pg0)
1462 def test_static_out(self):
1463 """ 1:1 NAT initialized from outside network """
# Same address-only static mapping scenario as the inside-initiated case,
# but with a tag on the mapping and with the pg1 -> pg0 traffic sent first.
# NOTE(review): the embedded numbering skips values (1469-1470, 1474,
# 1478-1479, 1483, 1486-1487, 1491); the assignment of `tag` and the
# remaining arguments of the pg1 feature call are not visible here.
1465 nat_ip = "10.0.0.20"
1466 self.tcp_port_out = 6303
1467 self.udp_port_out = 6304
1468 self.icmp_id_out = 6305
1471 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1472 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1473 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# The dump must contain one mapping whose tag (text before the first NUL
# byte) equals the tag that was configured.
1475 sm = self.vapi.nat44_static_mapping_dump()
1476 self.assertEqual(len(sm), 1)
1477 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
# pg1 -> pg0, addressed to nat_ip.
1480 pkts = self.create_stream_out(self.pg1, nat_ip)
1481 self.pg1.add_stream(pkts)
1482 self.pg_enable_capture(self.pg_interfaces)
1484 capture = self.pg0.get_capture(len(pkts))
1485 self.verify_capture_in(capture, self.pg0)
# pg0 -> pg1.
1488 pkts = self.create_stream_in(self.pg0, self.pg1)
1489 self.pg0.add_stream(pkts)
1490 self.pg_enable_capture(self.pg_interfaces)
1492 capture = self.pg1.get_capture(len(pkts))
1493 self.verify_capture_out(capture, nat_ip, True)
1495 def test_static_with_port_in(self):
1496 """ 1:1 NAPT initialized from inside network """
# Adds three port-specific static mappings (TCP, UDP and ICMP) between the
# pg0 host and nat_addr, then sends traffic pg0 -> pg1 first and
# pg1 -> pg0 second.
# NOTE(review): the embedded numbering skips values (1514-1516, 1520,
# 1523-1524, 1528); the remaining arguments of the pg1 feature call are not
# visible here.
1498 self.tcp_port_out = 3606
1499 self.udp_port_out = 3607
1500 self.icmp_id_out = 3608
1502 self.nat44_add_address(self.nat_addr)
# One mapping per protocol; for ICMP the identifier takes the place of the
# port pair.
1503 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1504 self.tcp_port_in, self.tcp_port_out,
1505 proto=IP_PROTOS.tcp)
1506 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1507 self.udp_port_in, self.udp_port_out,
1508 proto=IP_PROTOS.udp)
1509 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1510 self.icmp_id_in, self.icmp_id_out,
1511 proto=IP_PROTOS.icmp)
1512 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1513 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1.
1517 pkts = self.create_stream_in(self.pg0, self.pg1)
1518 self.pg0.add_stream(pkts)
1519 self.pg_enable_capture(self.pg_interfaces)
1521 capture = self.pg1.get_capture(len(pkts))
1522 self.verify_capture_out(capture)
# pg1 -> pg0.
1525 pkts = self.create_stream_out(self.pg1)
1526 self.pg1.add_stream(pkts)
1527 self.pg_enable_capture(self.pg_interfaces)
1529 capture = self.pg0.get_capture(len(pkts))
1530 self.verify_capture_in(capture, self.pg0)
1532 def test_static_with_port_out(self):
1533 """ 1:1 NAPT initialized from outside network """
# Same three port-specific static mappings as the inside-initiated NAPT
# case (with different outside port numbers), but the pg1 -> pg0 traffic is
# sent first.
# NOTE(review): the embedded numbering skips values (1551-1553, 1557,
# 1560-1561, 1565); the remaining arguments of the pg1 feature call are not
# visible here.
1535 self.tcp_port_out = 30606
1536 self.udp_port_out = 30607
1537 self.icmp_id_out = 30608
1539 self.nat44_add_address(self.nat_addr)
1540 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1541 self.tcp_port_in, self.tcp_port_out,
1542 proto=IP_PROTOS.tcp)
1543 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1544 self.udp_port_in, self.udp_port_out,
1545 proto=IP_PROTOS.udp)
1546 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1547 self.icmp_id_in, self.icmp_id_out,
1548 proto=IP_PROTOS.icmp)
1549 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1550 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg1 -> pg0.
1554 pkts = self.create_stream_out(self.pg1)
1555 self.pg1.add_stream(pkts)
1556 self.pg_enable_capture(self.pg_interfaces)
1558 capture = self.pg0.get_capture(len(pkts))
1559 self.verify_capture_in(capture, self.pg0)
# pg0 -> pg1.
1562 pkts = self.create_stream_in(self.pg0, self.pg1)
1563 self.pg0.add_stream(pkts)
1564 self.pg_enable_capture(self.pg_interfaces)
1566 capture = self.pg1.get_capture(len(pkts))
1567 self.verify_capture_out(capture)
1569 def test_static_with_port_out2(self):
1570 """ 1:1 NAPT symmetrical rule """
# With forwarding enabled and a TCP static mapping created with
# out2in_only=1, this test walks through six exchanges: a request to
# nat_addr:external_port, an ICMP type 11 error built from the delivered
# request, the service's answer, an ICMP type 11 error built from the
# delivered answer, and finally a request/answer pair addressed straight to
# the pg0 host (the in-file comments call this "no translation").
# NOTE(review): the embedded numbering skips many values (1571-1574, 1581,
# 1589, 1591-1595, 1600, 1602-1604, ...); the assignments of `local_port`,
# `external_port`, `ip`, `tcp` and `inner`, the re-binding of `p` to the
# captured packet, and any try/except keywords are not visible here.
1575 self.vapi.nat44_forwarding_enable_disable(1)
1576 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1577 local_port, external_port,
1578 proto=IP_PROTOS.tcp, out2in_only=1)
1579 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1580 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1583 # from client to service
1584 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1585 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1586 TCP(sport=12345, dport=external_port))
1587 self.pg1.add_stream(p)
1588 self.pg_enable_capture(self.pg_interfaces)
1590 capture = self.pg0.get_capture(1)
# Delivered packet must target the pg0 host on the local port.
1596 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1597 self.assertEqual(tcp.dport, local_port)
1598 self.check_tcp_checksum(p)
1599 self.check_ip_checksum(p)
1601 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 from the pg0 host, embedding the request it just received.
1605 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1606 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1607 ICMP(type=11) / capture[0][IP])
1608 self.pg0.add_stream(p)
1609 self.pg_enable_capture(self.pg_interfaces)
1611 capture = self.pg1.get_capture(1)
# Both the outer source and the embedded destination must show nat_addr,
# and the embedded TCP destination port must be the external port.
1614 self.assertEqual(p[IP].src, self.nat_addr)
1616 self.assertEqual(inner.dst, self.nat_addr)
1617 self.assertEqual(inner[TCPerror].dport, external_port)
1619 self.logger.error(ppp("Unexpected or invalid packet:", p))
1622 # from service back to client
1623 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1624 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1625 TCP(sport=local_port, dport=12345))
1626 self.pg0.add_stream(p)
1627 self.pg_enable_capture(self.pg_interfaces)
1629 capture = self.pg1.get_capture(1)
1634 self.assertEqual(ip.src, self.nat_addr)
1635 self.assertEqual(tcp.sport, external_port)
1636 self.check_tcp_checksum(p)
1637 self.check_ip_checksum(p)
1639 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 from the pg1 host to nat_addr, embedding the answer it just
# received.
1643 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1644 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1645 ICMP(type=11) / capture[0][IP])
1646 self.pg1.add_stream(p)
1647 self.pg_enable_capture(self.pg_interfaces)
1649 capture = self.pg0.get_capture(1)
# Outer destination and embedded source must show the pg0 host address,
# and the embedded TCP source port must be the local port.
1652 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1654 self.assertEqual(inner.src, self.pg0.remote_ip4)
1655 self.assertEqual(inner[TCPerror].sport, local_port)
1657 self.logger.error(ppp("Unexpected or invalid packet:", p))
1660 # from client to server (no translation)
1661 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1662 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1663 TCP(sport=12346, dport=local_port))
1664 self.pg1.add_stream(p)
1665 self.pg_enable_capture(self.pg_interfaces)
1667 capture = self.pg0.get_capture(1)
1673 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1674 self.assertEqual(tcp.dport, local_port)
1675 self.check_tcp_checksum(p)
1676 self.check_ip_checksum(p)
1678 self.logger.error(ppp("Unexpected or invalid packet:", p))
1681 # from service back to client (no translation)
1682 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1683 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1684 TCP(sport=local_port, dport=12346))
1685 self.pg0.add_stream(p)
1686 self.pg_enable_capture(self.pg_interfaces)
1688 capture = self.pg1.get_capture(1)
# Source address and port are expected to be the host's own values here.
1693 self.assertEqual(ip.src, self.pg0.remote_ip4)
1694 self.assertEqual(tcp.sport, local_port)
1695 self.check_tcp_checksum(p)
1696 self.check_ip_checksum(p)
1698 self.logger.error(ppp("Unexpected or invalid packet:", p))
1701 def test_static_vrf_aware(self):
1702 """ 1:1 NAT VRF awareness """
# Creates two static mappings (pg4 host -> 10.0.0.30, pg0 host -> 10.0.0.40)
# and checks that traffic pg4 -> pg3 is delivered while traffic pg0 -> pg3
# produces nothing on pg3.
# NOTE(review): the embedded numbering skips values (1711, 1713, 1715,
# 1723, 1728, 1732, ...); the trailing arguments of both static-mapping
# calls (presumably the VRF ids -- confirm) and of the pg3 feature call are
# not visible here.
1704 nat_ip1 = "10.0.0.30"
1705 nat_ip2 = "10.0.0.40"
1706 self.tcp_port_out = 6303
1707 self.udp_port_out = 6304
1708 self.icmp_id_out = 6305
1710 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1712 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1714 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1716 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1717 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1719 # inside interface VRF match NAT44 static mapping VRF
1720 pkts = self.create_stream_in(self.pg4, self.pg3)
1721 self.pg4.add_stream(pkts)
1722 self.pg_enable_capture(self.pg_interfaces)
1724 capture = self.pg3.get_capture(len(pkts))
1725 self.verify_capture_out(capture, nat_ip1, True)
1727 # inside interface VRF don't match NAT44 static mapping VRF (packets
1729 pkts = self.create_stream_in(self.pg0, self.pg3)
1730 self.pg0.add_stream(pkts)
1731 self.pg_enable_capture(self.pg_interfaces)
# Nothing may reach pg3 in this case.
1733 self.pg3.assert_nothing_captured()
1735 def test_dynamic_to_static(self):
1736 """ Switch from dynamic translation to 1:1NAT """
# Sends pg0 -> pg1 traffic with only the NAT address configured, then adds
# a static mapping for the same host, checks that the session dump for that
# host is empty, and sends the same traffic again expecting nat_ip.
# NOTE(review): the embedded numbering skips values (1741, 1745-1747, 1751,
# 1754-1755, 1762); the remaining arguments of the pg1 feature call are not
# visible here.
1737 nat_ip = "10.0.0.10"
1738 self.tcp_port_out = 6303
1739 self.udp_port_out = 6304
1740 self.icmp_id_out = 6305
1742 self.nat44_add_address(self.nat_addr)
1743 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1744 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# First pass: no static mapping exists yet.
1748 pkts = self.create_stream_in(self.pg0, self.pg1)
1749 self.pg0.add_stream(pkts)
1750 self.pg_enable_capture(self.pg_interfaces)
1752 capture = self.pg1.get_capture(len(pkts))
1753 self.verify_capture_out(capture)
# Second pass: after the mapping is added the host must have no sessions
# left, and the capture is verified against nat_ip.
1756 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1757 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1758 self.assertEqual(len(sessions), 0)
1759 pkts = self.create_stream_in(self.pg0, self.pg1)
1760 self.pg0.add_stream(pkts)
1761 self.pg_enable_capture(self.pg_interfaces)
1763 capture = self.pg1.get_capture(len(pkts))
1764 self.verify_capture_out(capture, nat_ip, True)
1766 def test_identity_nat(self):
1767 """ Identity NAT """
# Adds an identity mapping for the pg0 host address, sends one TCP packet
# from the pg1 host to that address and expects it on pg0 with addresses
# and ports unchanged.
# NOTE(review): the embedded numbering skips values (1772-1773, 1779,
# 1781-1784, 1791, 1793-1794); the assignments of `ip` and `tcp`, the
# re-binding of `p`, and any try/except keywords are not visible here.
1769 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1770 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1771 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1774 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1775 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1776 TCP(sport=12345, dport=56789))
1777 self.pg1.add_stream(p)
1778 self.pg_enable_capture(self.pg_interfaces)
1780 capture = self.pg0.get_capture(1)
# All four address/port fields must equal what was sent.
1785 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1786 self.assertEqual(ip.src, self.pg1.remote_ip4)
1787 self.assertEqual(tcp.dport, 56789)
1788 self.assertEqual(tcp.sport, 12345)
1789 self.check_tcp_checksum(p)
1790 self.check_ip_checksum(p)
1792 self.logger.error(ppp("Unexpected or invalid packet:", p))
1795 def test_static_lb(self):
1796 """ NAT44 local service load balancing """
# Registers a load-balancing static mapping on nat_addr with two local
# servers (the first two pg0 remote hosts), sends one TCP request from the
# pg1 host, accepts delivery to either server, then sends the answer from
# the chosen server and checks its source as seen on pg1.
# NOTE(review): the embedded numbering skips values (1798-1799, 1802,
# 1804-1805, 1807-1809, 1812-1813, 1815, 1818, 1828-1832, 1835-1837, ...);
# the assignments of `external_port`, `local_port`, `ip`, `tcp` and
# `server`, the remaining keys of each `locals` entry, and several call
# arguments are not visible here.
1797 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1800 server1 = self.pg0.remote_hosts[0]
1801 server2 = self.pg0.remote_hosts[1]
# One dict per backend server; only the 'addr' key is visible here.
1803 locals = [{'addr': server1.ip4n,
1806 {'addr': server2.ip4n,
1810 self.nat44_add_address(self.nat_addr)
1811 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1814 local_num=len(locals),
1816 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1817 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1820 # from client to service
1821 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1822 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1823 TCP(sport=12345, dport=external_port))
1824 self.pg1.add_stream(p)
1825 self.pg_enable_capture(self.pg_interfaces)
1827 capture = self.pg0.get_capture(1)
# Either backend is acceptable; the branch below presumably records which
# one was chosen in `server` (branch bodies not shown -- confirm).
1833 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1834 if ip.dst == server1.ip4:
1838 self.assertEqual(tcp.dport, local_port)
1839 self.check_tcp_checksum(p)
1840 self.check_ip_checksum(p)
1842 self.logger.error(ppp("Unexpected or invalid packet:", p))
1845 # from service back to client
1846 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1847 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1848 TCP(sport=local_port, dport=12345))
1849 self.pg0.add_stream(p)
1850 self.pg_enable_capture(self.pg_interfaces)
1852 capture = self.pg1.get_capture(1)
# The answer must appear to come from nat_addr:external_port.
1857 self.assertEqual(ip.src, self.nat_addr)
1858 self.assertEqual(tcp.sport, external_port)
1859 self.check_tcp_checksum(p)
1860 self.check_ip_checksum(p)
1862 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Only runs when running_extended_tests() is true; otherwise it is skipped
# with the reason "part of extended tests".
1865 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1866 def test_static_lb_multi_clients(self):
1867 """ NAT44 local service load balancing - multiple clients"""
# Sends one TCP request per client address produced by
# ip4_range(self.pg1.remote_ip4, 10, 50) to the load-balanced nat_addr and
# asserts that server1 ends up with a strictly larger count than server2.
# NOTE(review): the embedded numbering skips values (1868, 1870-1871, 1874,
# 1876-1877, 1879-1881, 1884-1885, 1887, 1890-1893, 1895, 1900, 1903, 1905,
# 1907-1909); the remaining keys of each `locals` entry, the definitions of
# `external_port`, `pkts`, `server1_n` and `server2_n`, and the loop over
# the capture are not visible here.
1869 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1872 server1 = self.pg0.remote_hosts[0]
1873 server2 = self.pg0.remote_hosts[1]
1875 locals = [{'addr': server1.ip4n,
1878 {'addr': server2.ip4n,
1882 self.nat44_add_address(self.nat_addr)
1883 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1886 local_num=len(locals),
1888 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1889 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Every client uses the same MAC and source port; only the source IP varies.
1894 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
1896 for client in clients:
1897 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1898 IP(src=client, dst=self.nat_addr) /
1899 TCP(sport=12345, dport=external_port))
1901 self.pg1.add_stream(pkts)
1902 self.pg_enable_capture(self.pg_interfaces)
1904 capture = self.pg0.get_capture(len(pkts))
# Captured packets are classified by destination address.
1906 if p[IP].dst == server1.ip4:
1910 self.assertTrue(server1_n > server2_n)
1912 def test_static_lb_2(self):
1913 """ NAT44 local service load balancing (asymmetrical rule) """
# Load-balancing static mapping with forwarding enabled and no
# nat44_add_address call. Checks a request/answer pair through nat_addr and
# then a request/answer pair addressed straight to server1 (the in-file
# comments call the latter "no translation").
# NOTE(review): the embedded numbering skips many values (1915-1916, 1919,
# 1921-1922, 1924-1926, 1929-1931, 1933, 1936, 1946-1950, 1953-1955, ...);
# the assignments of `external_port`, `local_port`, `ip`, `tcp` and
# `server`, the remaining keys of each `locals` entry, and several call
# arguments are not visible here.
1914 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1917 server1 = self.pg0.remote_hosts[0]
1918 server2 = self.pg0.remote_hosts[1]
1920 locals = [{'addr': server1.ip4n,
1923 {'addr': server2.ip4n,
1927 self.vapi.nat44_forwarding_enable_disable(1)
1928 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1932 local_num=len(locals),
1934 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1935 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1938 # from client to service
1939 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1940 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1941 TCP(sport=12345, dport=external_port))
1942 self.pg1.add_stream(p)
1943 self.pg_enable_capture(self.pg_interfaces)
1945 capture = self.pg0.get_capture(1)
# Either backend is acceptable as the destination.
1951 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1952 if ip.dst == server1.ip4:
1956 self.assertEqual(tcp.dport, local_port)
1957 self.check_tcp_checksum(p)
1958 self.check_ip_checksum(p)
1960 self.logger.error(ppp("Unexpected or invalid packet:", p))
1963 # from service back to client
1964 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1965 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1966 TCP(sport=local_port, dport=12345))
1967 self.pg0.add_stream(p)
1968 self.pg_enable_capture(self.pg_interfaces)
1970 capture = self.pg1.get_capture(1)
# The answer must appear to come from nat_addr:external_port.
1975 self.assertEqual(ip.src, self.nat_addr)
1976 self.assertEqual(tcp.sport, external_port)
1977 self.check_tcp_checksum(p)
1978 self.check_ip_checksum(p)
1980 self.logger.error(ppp("Unexpected or invalid packet:", p))
1983 # from client to server (no translation)
1984 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1985 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1986 TCP(sport=12346, dport=local_port))
1987 self.pg1.add_stream(p)
1988 self.pg_enable_capture(self.pg_interfaces)
1990 capture = self.pg0.get_capture(1)
1996 self.assertEqual(ip.dst, server1.ip4)
1997 self.assertEqual(tcp.dport, local_port)
1998 self.check_tcp_checksum(p)
1999 self.check_ip_checksum(p)
2001 self.logger.error(ppp("Unexpected or invalid packet:", p))
2004 # from service back to client (no translation)
2005 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
2006 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
2007 TCP(sport=local_port, dport=12346))
2008 self.pg0.add_stream(p)
2009 self.pg_enable_capture(self.pg_interfaces)
2011 capture = self.pg1.get_capture(1)
# Source address and port are expected to be server1's own values here.
2016 self.assertEqual(ip.src, server1.ip4)
2017 self.assertEqual(tcp.sport, local_port)
2018 self.check_tcp_checksum(p)
2019 self.check_ip_checksum(p)
2021 self.logger.error(ppp("Unexpected or invalid packet:", p))
2024 def test_multiple_inside_interfaces(self):
2025 """ NAT44 multiple non-overlapping address space inside interfaces """
# pg0 and pg1 both get the NAT44 feature with default arguments (the
# in-file comment below calls them inside interfaces); pg3 gets it with an
# extra argument that is not shown; pg2 gets no NAT44 feature. Traffic is
# then sent inside<->inside, inside->pg2, and in both directions between
# each inside interface and pg3.
# NOTE(review): the embedded numbering skips values (2031-2032, 2037, 2045,
# 2053, ...); the remaining arguments of the pg3 feature call are not
# visible here.
2027 self.nat44_add_address(self.nat_addr)
2028 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2029 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2030 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
2033 # between two NAT44 inside interfaces (no translation)
2034 pkts = self.create_stream_in(self.pg0, self.pg1)
2035 self.pg0.add_stream(pkts)
2036 self.pg_enable_capture(self.pg_interfaces)
2038 capture = self.pg1.get_capture(len(pkts))
2039 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
2041 # from NAT44 inside to interface without NAT44 feature (no translation)
2042 pkts = self.create_stream_in(self.pg0, self.pg2)
2043 self.pg0.add_stream(pkts)
2044 self.pg_enable_capture(self.pg_interfaces)
2046 capture = self.pg2.get_capture(len(pkts))
2047 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
2049 # in2out 1st interface
2050 pkts = self.create_stream_in(self.pg0, self.pg3)
2051 self.pg0.add_stream(pkts)
2052 self.pg_enable_capture(self.pg_interfaces)
2054 capture = self.pg3.get_capture(len(pkts))
2055 self.verify_capture_out(capture)
2057 # out2in 1st interface
2058 pkts = self.create_stream_out(self.pg3)
2059 self.pg3.add_stream(pkts)
2060 self.pg_enable_capture(self.pg_interfaces)
2062 capture = self.pg0.get_capture(len(pkts))
2063 self.verify_capture_in(capture, self.pg0)
2065 # in2out 2nd interface
2066 pkts = self.create_stream_in(self.pg1, self.pg3)
2067 self.pg1.add_stream(pkts)
2068 self.pg_enable_capture(self.pg_interfaces)
2070 capture = self.pg3.get_capture(len(pkts))
2071 self.verify_capture_out(capture)
2073 # out2in 2nd interface
2074 pkts = self.create_stream_out(self.pg3)
2075 self.pg3.add_stream(pkts)
2076 self.pg_enable_capture(self.pg_interfaces)
2078 capture = self.pg1.get_capture(len(pkts))
2079 self.verify_capture_in(capture, self.pg1)
2081 def test_inside_overlapping_interfaces(self):
2082 """ NAT44 multiple inside interfaces with overlapping address space """
# pg4, pg5 and pg6 get the NAT44 feature with default arguments and pg3
# with an extra argument that is not shown; the pg6 host additionally has a
# static mapping to 10.0.0.10. The test sends traffic between the inside
# interfaces, between each of them and pg3, and inspects the address, user
# and session dumps along the way.
# NOTE(review): the embedded numbering skips many values (2083, 2087,
# 2092-2093, 2098, 2108, 2110-2113, 2118, 2196, 2198, 2204-2206, 2227, ...);
# the trailing arguments of several calls, the assignments of `ip` and
# `tcp`, the loop header that binds `user`, and the tails of two membership
# lists are not visible here.
2084 static_nat_ip = "10.0.0.10"
2085 self.nat44_add_address(self.nat_addr)
2086 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
2088 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
2089 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
2090 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
2091 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
2094 # between NAT44 inside interfaces with same VRF (no translation)
2095 pkts = self.create_stream_in(self.pg4, self.pg5)
2096 self.pg4.add_stream(pkts)
2097 self.pg_enable_capture(self.pg_interfaces)
2099 capture = self.pg5.get_capture(len(pkts))
2100 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
2102 # between NAT44 inside interfaces with different VRF (hairpinning)
2103 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2104 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2105 TCP(sport=1234, dport=5678))
2106 self.pg4.add_stream(p)
2107 self.pg_enable_capture(self.pg_interfaces)
2109 capture = self.pg6.get_capture(1)
# The packet delivered on pg6 must come from nat_addr with a source port
# different from the one sent, and keep the destination port.
2114 self.assertEqual(ip.src, self.nat_addr)
2115 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2116 self.assertNotEqual(tcp.sport, 1234)
2117 self.assertEqual(tcp.dport, 5678)
2119 self.logger.error(ppp("Unexpected or invalid packet:", p))
2122 # in2out 1st interface
2123 pkts = self.create_stream_in(self.pg4, self.pg3)
2124 self.pg4.add_stream(pkts)
2125 self.pg_enable_capture(self.pg_interfaces)
2127 capture = self.pg3.get_capture(len(pkts))
2128 self.verify_capture_out(capture)
2130 # out2in 1st interface
2131 pkts = self.create_stream_out(self.pg3)
2132 self.pg3.add_stream(pkts)
2133 self.pg_enable_capture(self.pg_interfaces)
2135 capture = self.pg4.get_capture(len(pkts))
2136 self.verify_capture_in(capture, self.pg4)
2138 # in2out 2nd interface
2139 pkts = self.create_stream_in(self.pg5, self.pg3)
2140 self.pg5.add_stream(pkts)
2141 self.pg_enable_capture(self.pg_interfaces)
2143 capture = self.pg3.get_capture(len(pkts))
2144 self.verify_capture_out(capture)
2146 # out2in 2nd interface
2147 pkts = self.create_stream_out(self.pg3)
2148 self.pg3.add_stream(pkts)
2149 self.pg_enable_capture(self.pg_interfaces)
2151 capture = self.pg5.get_capture(len(pkts))
2152 self.verify_capture_in(capture, self.pg5)
# Session dump for the pg5 host (second argument 10, presumably the VRF id
# -- confirm): exactly three non-static sessions are expected, in the order
# TCP, UDP, ICMP, each using the single NAT address.
2155 addresses = self.vapi.nat44_address_dump()
2156 self.assertEqual(len(addresses), 1)
2157 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2158 self.assertEqual(len(sessions), 3)
2159 for session in sessions:
2160 self.assertFalse(session.is_static)
2161 self.assertEqual(session.inside_ip_address[0:4],
2162 self.pg5.remote_ip4n)
2163 self.assertEqual(session.outside_ip_address,
2164 addresses[0].ip_address)
2165 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2166 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2167 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2168 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2169 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2170 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2171 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2172 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2173 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2175 # in2out 3rd interface
2176 pkts = self.create_stream_in(self.pg6, self.pg3)
2177 self.pg6.add_stream(pkts)
2178 self.pg_enable_capture(self.pg_interfaces)
2180 capture = self.pg3.get_capture(len(pkts))
2181 self.verify_capture_out(capture, static_nat_ip, True)
2183 # out2in 3rd interface
2184 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2185 self.pg3.add_stream(pkts)
2186 self.pg_enable_capture(self.pg_interfaces)
2188 capture = self.pg6.get_capture(len(pkts))
2189 self.verify_capture_in(capture, self.pg6)
2191 # general user and session dump verifications
2192 users = self.vapi.nat44_user_dump()
2193 self.assertTrue(len(users) >= 3)
2194 addresses = self.vapi.nat44_address_dump()
2195 self.assertEqual(len(addresses), 1)
# Per-user checks; `user` is bound on a line this listing omits.
2197 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2199 for session in sessions:
2200 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: byte count > packet count > 0.
2201 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2202 self.assertTrue(session.protocol in
2203 [IP_PROTOS.tcp, IP_PROTOS.udp,
# pg4 host: at least four sessions, none static, all on the NAT address.
2207 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2208 self.assertTrue(len(sessions) >= 4)
2209 for session in sessions:
2210 self.assertFalse(session.is_static)
2211 self.assertEqual(session.inside_ip_address[0:4],
2212 self.pg4.remote_ip4n)
2213 self.assertEqual(session.outside_ip_address,
2214 addresses[0].ip_address)
# pg6 host (second argument 20): at least three sessions, all static, with
# the outside address equal to static_nat_ip.
2217 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2218 self.assertTrue(len(sessions) >= 3)
2219 for session in sessions:
2220 self.assertTrue(session.is_static)
2221 self.assertEqual(session.inside_ip_address[0:4],
2222 self.pg6.remote_ip4n)
# Compares the first four bytes of the dumped address with the dotted-quad
# string octet by octet. NOTE(review): this relies on map() returning lists
# (Python 2); under Python 3 two map objects would never compare equal.
2223 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2224 map(int, static_nat_ip.split('.')))
2225 self.assertTrue(session.inside_port in
2226 [self.tcp_port_in, self.udp_port_in,
2229 def test_hairpinning(self):
2230 """ NAT44 hairpinning - 1:1 NAPT """
# Host and server are both remote hosts on pg0. The host reaches the server
# through nat_addr:server_out_port (a TCP static mapping to the server's
# port 5678); both the request and the server's reply are sent on pg0 and
# captured on pg0.
# NOTE(review): the embedded numbering skips values (2231, 2234-2235, 2238,
# 2242, 2247, 2254, 2256-2259, 2266, 2268-2269, ...); the assignments of
# `host_in_port`, `ip` and `tcp`, the re-binding of `p`, and any try/except
# keywords are not visible here.
2232 host = self.pg0.remote_hosts[0]
2233 server = self.pg0.remote_hosts[1]
2236 server_in_port = 5678
2237 server_out_port = 8765
2239 self.nat44_add_address(self.nat_addr)
2240 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2241 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2243 # add static mapping for server
2244 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2245 server_in_port, server_out_port,
2246 proto=IP_PROTOS.tcp)
2248 # send packet from host to server
2249 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2250 IP(src=host.ip4, dst=self.nat_addr) /
2251 TCP(sport=host_in_port, dport=server_out_port))
2252 self.pg0.add_stream(p)
2253 self.pg_enable_capture(self.pg_interfaces)
2255 capture = self.pg0.get_capture(1)
# Expected rewrite: source becomes nat_addr with a different source port,
# destination becomes the server's address and inside port.
2260 self.assertEqual(ip.src, self.nat_addr)
2261 self.assertEqual(ip.dst, server.ip4)
2262 self.assertNotEqual(tcp.sport, host_in_port)
2263 self.assertEqual(tcp.dport, server_in_port)
2264 self.check_tcp_checksum(p)
# Remember the source port seen by the server; the reply is sent to it.
2265 host_out_port = tcp.sport
2267 self.logger.error(ppp("Unexpected or invalid packet:", p))
2270 # send reply from server to host
2271 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2272 IP(src=server.ip4, dst=self.nat_addr) /
2273 TCP(sport=server_in_port, dport=host_out_port))
2274 self.pg0.add_stream(p)
2275 self.pg_enable_capture(self.pg_interfaces)
2277 capture = self.pg0.get_capture(1)
# Expected rewrite on the way back: source nat_addr:server_out_port,
# destination the host's address and original port.
2282 self.assertEqual(ip.src, self.nat_addr)
2283 self.assertEqual(ip.dst, host.ip4)
2284 self.assertEqual(tcp.sport, server_out_port)
2285 self.assertEqual(tcp.dport, host_in_port)
2286 self.check_tcp_checksum(p)
2288 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Hairpinning test with address-only static mappings: one host and two servers
# on pg0; covers host <-> server1 and server2 <-> server1 for TCP, UDP and ICMP.
2291 def test_hairpinning2(self):
2292 """ NAT44 hairpinning - 1:1 NAT"""
# external (NAT) addresses statically mapped to server1 and server2 below
2294 server1_nat_ip = "10.0.0.10"
2295 server2_nat_ip = "10.0.0.11"
2296 host = self.pg0.remote_hosts[0]
2297 server1 = self.pg0.remote_hosts[1]
2298 server2 = self.pg0.remote_hosts[2]
2299 server_tcp_port = 22
2300 server_udp_port = 20
2302 self.nat44_add_address(self.nat_addr)
2303 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2304 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2307 # add static mapping for servers
2308 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2309 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# host to server1: TCP, UDP and ICMP echo-request sent to server1's NAT address
2313 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2314 IP(src=host.ip4, dst=server1_nat_ip) /
2315 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2317 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2318 IP(src=host.ip4, dst=server1_nat_ip) /
2319 UDP(sport=self.udp_port_in, dport=server_udp_port))
2321 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2322 IP(src=host.ip4, dst=server1_nat_ip) /
2323 ICMP(id=self.icmp_id_in, type='echo-request'))
2325 self.pg0.add_stream(pkts)
2326 self.pg_enable_capture(self.pg_interfaces)
2328 capture = self.pg0.get_capture(len(pkts))
# the host has no static mapping: its source becomes the NAT pool address and
# its source port / ICMP id must change; the new values are recorded on self
2329 for packet in capture:
2331 self.assertEqual(packet[IP].src, self.nat_addr)
2332 self.assertEqual(packet[IP].dst, server1.ip4)
2333 if packet.haslayer(TCP):
2334 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2335 self.assertEqual(packet[TCP].dport, server_tcp_port)
2336 self.tcp_port_out = packet[TCP].sport
2337 self.check_tcp_checksum(packet)
2338 elif packet.haslayer(UDP):
2339 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2340 self.assertEqual(packet[UDP].dport, server_udp_port)
2341 self.udp_port_out = packet[UDP].sport
2343 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2344 self.icmp_id_out = packet[ICMP].id
2346 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server1 to host: replies addressed to the NAT address and the recorded
# translated ports / ICMP id
2351 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2352 IP(src=server1.ip4, dst=self.nat_addr) /
2353 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2355 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2356 IP(src=server1.ip4, dst=self.nat_addr) /
2357 UDP(sport=server_udp_port, dport=self.udp_port_out))
2359 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2360 IP(src=server1.ip4, dst=self.nat_addr) /
2361 ICMP(id=self.icmp_id_out, type='echo-reply'))
2363 self.pg0.add_stream(pkts)
2364 self.pg_enable_capture(self.pg_interfaces)
2366 capture = self.pg0.get_capture(len(pkts))
# replies must appear to come from server1's NAT address and carry the host's
# original address, ports and ICMP id
2367 for packet in capture:
2369 self.assertEqual(packet[IP].src, server1_nat_ip)
2370 self.assertEqual(packet[IP].dst, host.ip4)
2371 if packet.haslayer(TCP):
2372 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2373 self.assertEqual(packet[TCP].sport, server_tcp_port)
2374 self.check_tcp_checksum(packet)
2375 elif packet.haslayer(UDP):
2376 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2377 self.assertEqual(packet[UDP].sport, server_udp_port)
2379 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2381 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2384 # server2 to server1
2386 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2387 IP(src=server2.ip4, dst=server1_nat_ip) /
2388 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2390 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2391 IP(src=server2.ip4, dst=server1_nat_ip) /
2392 UDP(sport=self.udp_port_in, dport=server_udp_port))
2394 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2395 IP(src=server2.ip4, dst=server1_nat_ip) /
2396 ICMP(id=self.icmp_id_in, type='echo-request'))
2398 self.pg0.add_stream(pkts)
2399 self.pg_enable_capture(self.pg_interfaces)
2401 capture = self.pg0.get_capture(len(pkts))
# server2 has its own static mapping: the source becomes server2_nat_ip and
# the source ports / ICMP id are expected to stay unchanged
2402 for packet in capture:
2404 self.assertEqual(packet[IP].src, server2_nat_ip)
2405 self.assertEqual(packet[IP].dst, server1.ip4)
2406 if packet.haslayer(TCP):
2407 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2408 self.assertEqual(packet[TCP].dport, server_tcp_port)
2409 self.tcp_port_out = packet[TCP].sport
2410 self.check_tcp_checksum(packet)
2411 elif packet.haslayer(UDP):
2412 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2413 self.assertEqual(packet[UDP].dport, server_udp_port)
2414 self.udp_port_out = packet[UDP].sport
2416 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2417 self.icmp_id_out = packet[ICMP].id
2419 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2422 # server1 to server2
2424 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2425 IP(src=server1.ip4, dst=server2_nat_ip) /
2426 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2428 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2429 IP(src=server1.ip4, dst=server2_nat_ip) /
2430 UDP(sport=server_udp_port, dport=self.udp_port_out))
2432 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2433 IP(src=server1.ip4, dst=server2_nat_ip) /
2434 ICMP(id=self.icmp_id_out, type='echo-reply'))
2436 self.pg0.add_stream(pkts)
2437 self.pg_enable_capture(self.pg_interfaces)
2439 capture = self.pg0.get_capture(len(pkts))
# replies reach server2's own address with server1's NAT address as source
2440 for packet in capture:
2442 self.assertEqual(packet[IP].src, server1_nat_ip)
2443 self.assertEqual(packet[IP].dst, server2.ip4)
2444 if packet.haslayer(TCP):
2445 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2446 self.assertEqual(packet[TCP].sport, server_tcp_port)
2447 self.check_tcp_checksum(packet)
2448 elif packet.haslayer(UDP):
2449 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2450 self.assertEqual(packet[UDP].sport, server_udp_port)
2452 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2454 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Sends more TCP flows from a single inside host than the configured
# per-user translation limit and expects every packet to be forwarded.
2457 def test_max_translations_per_user(self):
2458 """ MAX translations per user - recycle the least recently used """
2460 self.nat44_add_address(self.nat_addr)
2461 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2462 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2465 # get maximum number of translations per user
2466 nat44_config = self.vapi.nat_show_config()
2468 # send more than maximum number of translations per user packets
2469 pkts_num = nat44_config.max_translations_per_user + 5
# one TCP packet per source port, starting at port 1025
2471 for port in range(0, pkts_num):
2472 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2473 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2474 TCP(sport=1025 + port))
2476 self.pg0.add_stream(pkts)
2477 self.pg_enable_capture(self.pg_interfaces)
2480 # verify number of translated packet
2481 self.pg1.get_capture(pkts_num)
# Checks that the NAT44 address pool follows pg7's interface address: empty
# while pg7 has no IPv4 address, one entry once it is configured, empty again
# after it is removed.
2483 def test_interface_addr(self):
2484 """ Acquire NAT44 addresses from interface """
# register pg7 as the interface NAT44 takes its address from
2485 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2487 # no address in NAT pool
2488 adresses = self.vapi.nat44_address_dump()
2489 self.assertEqual(0, len(adresses))
2491 # configure interface address and check NAT address pool
2492 self.pg7.config_ip4()
2493 adresses = self.vapi.nat44_address_dump()
2494 self.assertEqual(1, len(adresses))
# only the first 4 bytes of the dumped address are compared with pg7's address
2495 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2497 # remove interface address and check NAT address pool
2498 self.pg7.unconfig_ip4()
2499 adresses = self.vapi.nat44_address_dump()
2500 self.assertEqual(0, len(adresses))
# Checks a static mapping whose external address is taken from pg7: the dump
# is inspected before pg7 has an address, after it gets one, after the address
# is removed, after it is configured again, and after the mapping is removed.
2502 def test_interface_addr_static_mapping(self):
2503 """ Static mapping with addresses from interface """
2506 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2507 self.nat44_add_static_mapping(
2509 external_sw_if_index=self.pg7.sw_if_index,
2512 # static mappings with external interface
2513 static_mappings = self.vapi.nat44_static_mapping_dump()
2514 self.assertEqual(1, len(static_mappings))
2515 self.assertEqual(self.pg7.sw_if_index,
2516 static_mappings[0].external_sw_if_index)
# the dumped tag is compared only up to its first NUL byte
2517 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2519 # configure interface address and check static mappings
2520 self.pg7.config_ip4()
2521 static_mappings = self.vapi.nat44_static_mapping_dump()
2522 self.assertEqual(2, len(static_mappings))
# the entry with external_sw_if_index 0xFFFFFFFF must carry pg7's address
# NOTE(review): presumably this is the entry resolved from the interface
# address - confirm against the NAT plugin API
2524 for sm in static_mappings:
2525 if sm.external_sw_if_index == 0xFFFFFFFF:
2526 self.assertEqual(sm.external_ip_address[0:4],
2527 self.pg7.local_ip4n)
2528 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2530 self.assertTrue(resolved)
2532 # remove interface address and check static mappings
2533 self.pg7.unconfig_ip4()
2534 static_mappings = self.vapi.nat44_static_mapping_dump()
2535 self.assertEqual(1, len(static_mappings))
2536 self.assertEqual(self.pg7.sw_if_index,
2537 static_mappings[0].external_sw_if_index)
2538 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2540 # configure interface address again and check static mappings
2541 self.pg7.config_ip4()
2542 static_mappings = self.vapi.nat44_static_mapping_dump()
2543 self.assertEqual(2, len(static_mappings))
2545 for sm in static_mappings:
2546 if sm.external_sw_if_index == 0xFFFFFFFF:
2547 self.assertEqual(sm.external_ip_address[0:4],
2548 self.pg7.local_ip4n)
2549 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2551 self.assertTrue(resolved)
2553 # remove static mapping
2554 self.nat44_add_static_mapping(
2556 external_sw_if_index=self.pg7.sw_if_index,
# no static mapping may be left in the dump
2559 static_mappings = self.vapi.nat44_static_mapping_dump()
2560 self.assertEqual(0, len(static_mappings))
2562 def test_interface_addr_identity_nat(self):
2563 """ Identity NAT with addresses from interface """
2566 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2567 self.vapi.nat44_add_del_identity_mapping(
2568 sw_if_index=self.pg7.sw_if_index,
2570 protocol=IP_PROTOS.tcp,
2573 # identity mappings with external interface
2574 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2575 self.assertEqual(1, len(identity_mappings))
2576 self.assertEqual(self.pg7.sw_if_index,
2577 identity_mappings[0].sw_if_index)
2579 # configure interface address and check identity mappings
2580 self.pg7.config_ip4()
2581 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2583 self.assertEqual(2, len(identity_mappings))
2584 for sm in identity_mappings:
2585 if sm.sw_if_index == 0xFFFFFFFF:
2586 self.assertEqual(identity_mappings[0].ip_address,
2587 self.pg7.local_ip4n)
2588 self.assertEqual(port, identity_mappings[0].port)
2589 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2591 self.assertTrue(resolved)
2593 # remove interface address and check identity mappings
2594 self.pg7.unconfig_ip4()
2595 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2596 self.assertEqual(1, len(identity_mappings))
2597 self.assertEqual(self.pg7.sw_if_index,
2598 identity_mappings[0].sw_if_index)
# Creates NAT44 sessions, removes the NAT address, flushes IPFIX and checks
# the records exported towards the collector on pg3 (custom collector port).
2600 def test_ipfix_nat44_sess(self):
2601 """ IPFIX logging NAT44 session created/delted """
2602 self.ipfix_domain_id = 10
2603 self.ipfix_src_port = 20202
2604 colector_port = 30303
# make scapy dissect UDP packets with destination port 30303 as IPFIX
2605 bind_layers(UDP, IPFIX, dport=30303)
2606 self.nat44_add_address(self.nat_addr)
2607 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2608 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# exporter: collector is pg3's remote address, source is pg3's local address
2610 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2611 src_address=self.pg3.local_ip4n,
2613 template_interval=10,
2614 collector_port=colector_port)
2615 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2616 src_port=self.ipfix_src_port)
# inside-to-outside traffic pg0 -> pg1
2618 pkts = self.create_stream_in(self.pg0, self.pg1)
2619 self.pg0.add_stream(pkts)
2620 self.pg_enable_capture(self.pg_interfaces)
2622 capture = self.pg1.get_capture(len(pkts))
2623 self.verify_capture_out(capture)
# remove the NAT address again before flushing the IPFIX export
2624 self.nat44_add_address(self.nat_addr, is_add=0)
2625 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# 9 packets are expected on the collector interface
2626 capture = self.pg3.get_capture(9)
2627 ipfix = IPFIXDecoder()
2628 # first load template
2630 self.assertTrue(p.haslayer(IPFIX))
2631 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2632 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2633 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2634 self.assertEqual(p[UDP].dport, colector_port)
2635 self.assertEqual(p[IPFIX].observationDomainID,
2636 self.ipfix_domain_id)
2637 if p.haslayer(Template):
2638 ipfix.add_template(p.getlayer(Template))
2639 # verify events in data set
2641 if p.haslayer(Data):
2642 data = ipfix.decode_data_set(p.getlayer(Set))
2643 self.verify_ipfix_nat44_ses(data)
# Sends inside traffic while no NAT address has been added in this test,
# expects nothing on pg1, and checks the IPFIX records exported to pg3.
2645 def test_ipfix_addr_exhausted(self):
2646 """ IPFIX logging NAT addresses exhausted """
2647 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2648 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# no collector_port is passed here; port 4739 is asserted on the export below
2650 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2651 src_address=self.pg3.local_ip4n,
2653 template_interval=10)
2654 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2655 src_port=self.ipfix_src_port)
2657 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2658 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2660 self.pg0.add_stream(p)
2661 self.pg_enable_capture(self.pg_interfaces)
# the packet must not be forwarded to pg1
2663 capture = self.pg1.get_capture(0)
2664 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2665 capture = self.pg3.get_capture(9)
2666 ipfix = IPFIXDecoder()
2667 # first load template
2669 self.assertTrue(p.haslayer(IPFIX))
2670 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2671 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2672 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2673 self.assertEqual(p[UDP].dport, 4739)
2674 self.assertEqual(p[IPFIX].observationDomainID,
2675 self.ipfix_domain_id)
2676 if p.haslayer(Template):
2677 ipfix.add_template(p.getlayer(Template))
2678 # verify events in data set
2680 if p.haslayer(Data):
2681 data = ipfix.decode_data_set(p.getlayer(Set))
2682 self.verify_ipfix_addr_exhausted(data)
# Extended-only test: creates max_sessions sessions from distinct source
# addresses, then sends one more packet that must not be forwarded and checks
# the IPFIX records exported to pg3.
2684 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2685 def test_ipfix_max_sessions(self):
2686 """ IPFIX logging maximum session entries exceeded """
2687 self.nat44_add_address(self.nat_addr)
2688 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2689 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# the session count used by this test is derived from the reported config
2692 nat44_config = self.vapi.nat_show_config()
2693 max_sessions = 10 * nat44_config.translation_buckets
# one packet per source address 10.10.<high byte of i>.<low byte of i>
2696 for i in range(0, max_sessions):
2697 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2698 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2699 IP(src=src, dst=self.pg1.remote_ip4) /
2702 self.pg0.add_stream(pkts)
2703 self.pg_enable_capture(self.pg_interfaces)
# all of these packets are expected on pg1
2706 self.pg1.get_capture(max_sessions)
# IPFIX export is only configured after the sessions above were created
2707 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2708 src_address=self.pg3.local_ip4n,
2710 template_interval=10)
2711 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2712 src_port=self.ipfix_src_port)
2714 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2715 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2717 self.pg0.add_stream(p)
2718 self.pg_enable_capture(self.pg_interfaces)
# the extra packet must not be forwarded
2720 self.pg1.get_capture(0)
2721 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2722 capture = self.pg3.get_capture(9)
2723 ipfix = IPFIXDecoder()
2724 # first load template
2726 self.assertTrue(p.haslayer(IPFIX))
2727 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2728 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2729 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2730 self.assertEqual(p[UDP].dport, 4739)
2731 self.assertEqual(p[IPFIX].observationDomainID,
2732 self.ipfix_domain_id)
2733 if p.haslayer(Template):
2734 ipfix.add_template(p.getlayer(Template))
2735 # verify events in data set
2737 if p.haslayer(Data):
2738 data = ipfix.decode_data_set(p.getlayer(Set))
2739 self.verify_ipfix_max_sessions(data, max_sessions)
2741 def test_pool_addr_fib(self):
2742 """ NAT44 add pool addresses to FIB """
2743 static_addr = '10.0.0.10'
2744 self.nat44_add_address(self.nat_addr)
2745 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2746 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2748 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2751 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2752 ARP(op=ARP.who_has, pdst=self.nat_addr,
2753 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2754 self.pg1.add_stream(p)
2755 self.pg_enable_capture(self.pg_interfaces)
2757 capture = self.pg1.get_capture(1)
2758 self.assertTrue(capture[0].haslayer(ARP))
2759 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2762 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2763 ARP(op=ARP.who_has, pdst=static_addr,
2764 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2765 self.pg1.add_stream(p)
2766 self.pg_enable_capture(self.pg_interfaces)
2768 capture = self.pg1.get_capture(1)
2769 self.assertTrue(capture[0].haslayer(ARP))
2770 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2772 # send ARP to non-NAT44 interface
2773 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2774 ARP(op=ARP.who_has, pdst=self.nat_addr,
2775 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2776 self.pg2.add_stream(p)
2777 self.pg_enable_capture(self.pg_interfaces)
2779 capture = self.pg1.get_capture(0)
2781 # remove addresses and verify
2782 self.nat44_add_address(self.nat_addr, is_add=0)
2783 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2786 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2787 ARP(op=ARP.who_has, pdst=self.nat_addr,
2788 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2789 self.pg1.add_stream(p)
2790 self.pg_enable_capture(self.pg_interfaces)
2792 capture = self.pg1.get_capture(0)
2794 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2795 ARP(op=ARP.who_has, pdst=static_addr,
2796 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2797 self.pg1.add_stream(p)
2798 self.pg_enable_capture(self.pg_interfaces)
2800 capture = self.pg1.get_capture(0)
# Puts pg0 and pg1 into separate IP tables, adds one NAT address per table and
# checks that traffic from each interface is translated to its own address.
2802 def test_vrf_mode(self):
2803 """ NAT44 tenant VRF aware address pool mode """
2807 nat_ip1 = "10.0.0.10"
2808 nat_ip2 = "10.0.0.11"
# addresses are removed before the interfaces are moved to the new tables
# and configured again afterwards
2810 self.pg0.unconfig_ip4()
2811 self.pg1.unconfig_ip4()
2812 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2813 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2814 self.pg0.set_table_ip4(vrf_id1)
2815 self.pg1.set_table_ip4(vrf_id2)
2816 self.pg0.config_ip4()
2817 self.pg1.config_ip4()
# one NAT address per VRF
2819 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2820 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2821 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2822 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2823 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# traffic entering on pg0 must leave pg2 translated to nat_ip1
2827 pkts = self.create_stream_in(self.pg0, self.pg2)
2828 self.pg0.add_stream(pkts)
2829 self.pg_enable_capture(self.pg_interfaces)
2831 capture = self.pg2.get_capture(len(pkts))
2832 self.verify_capture_out(capture, nat_ip1)
# traffic entering on pg1 must leave pg2 translated to nat_ip2
2835 pkts = self.create_stream_in(self.pg1, self.pg2)
2836 self.pg1.add_stream(pkts)
2837 self.pg_enable_capture(self.pg_interfaces)
2839 capture = self.pg2.get_capture(len(pkts))
2840 self.verify_capture_out(capture, nat_ip2)
# cleanup: move pg0/pg1 back to table 0 and delete the two tables
2842 self.pg0.unconfig_ip4()
2843 self.pg1.unconfig_ip4()
2844 self.pg0.set_table_ip4(0)
2845 self.pg1.set_table_ip4(0)
2846 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2847 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
# Adds one NAT address without a VRF and one with vrf_id=99, then checks that
# traffic from both pg0 and pg1 is translated to the first address.
2849 def test_vrf_feature_independent(self):
2850 """ NAT44 tenant VRF independent address pool mode """
2852 nat_ip1 = "10.0.0.10"
2853 nat_ip2 = "10.0.0.11"
# nat_ip2 is bound to VRF 99; neither flow below is expected to use it
2855 self.nat44_add_address(nat_ip1)
2856 self.nat44_add_address(nat_ip2, vrf_id=99)
2857 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2858 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2859 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2 is expected to be translated to nat_ip1
2863 pkts = self.create_stream_in(self.pg0, self.pg2)
2864 self.pg0.add_stream(pkts)
2865 self.pg_enable_capture(self.pg_interfaces)
2867 capture = self.pg2.get_capture(len(pkts))
2868 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2 is expected to be translated to nat_ip1 as well
2871 pkts = self.create_stream_in(self.pg1, self.pg2)
2872 self.pg1.add_stream(pkts)
2873 self.pg_enable_capture(self.pg_interfaces)
2875 capture = self.pg2.get_capture(len(pkts))
2876 self.verify_capture_out(capture, nat_ip1)
# Dynamic NAT44 between pg7 and pg8: sets up neighbor entries and /32 routes
# for both remote addresses, then checks translation in both directions.
2878 def test_dynamic_ipless_interfaces(self):
2879 """ NAT44 interfaces without configured IP address """
# neighbor entries (MAC + IPv4) for the remote ends of pg7 and pg8
2881 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2882 mactobinary(self.pg7.remote_mac),
2883 self.pg7.remote_ip4n,
2885 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2886 mactobinary(self.pg8.remote_mac),
2887 self.pg8.remote_ip4n,
# /32 host routes towards each remote address via its own interface
2890 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2891 dst_address_length=32,
2892 next_hop_address=self.pg7.remote_ip4n,
2893 next_hop_sw_if_index=self.pg7.sw_if_index)
2894 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2895 dst_address_length=32,
2896 next_hop_address=self.pg8.remote_ip4n,
2897 next_hop_sw_if_index=self.pg8.sw_if_index)
2899 self.nat44_add_address(self.nat_addr)
2900 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2901 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8 direction, verified as outside-facing capture
2905 pkts = self.create_stream_in(self.pg7, self.pg8)
2906 self.pg7.add_stream(pkts)
2907 self.pg_enable_capture(self.pg_interfaces)
2909 capture = self.pg8.get_capture(len(pkts))
2910 self.verify_capture_out(capture)
# pg8 -> pg7 direction, addressed to the NAT address
2913 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2914 self.pg8.add_stream(pkts)
2915 self.pg_enable_capture(self.pg_interfaces)
2917 capture = self.pg7.get_capture(len(pkts))
2918 self.verify_capture_in(capture, self.pg7)
# Same pg7/pg8 neighbor and route setup as the dynamic variant, but with an
# address-only static mapping for pg7's remote address.
2920 def test_static_ipless_interfaces(self):
2921 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# neighbor entries (MAC + IPv4) for the remote ends of pg7 and pg8
2923 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2924 mactobinary(self.pg7.remote_mac),
2925 self.pg7.remote_ip4n,
2927 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2928 mactobinary(self.pg8.remote_mac),
2929 self.pg8.remote_ip4n,
# /32 host routes towards each remote address via its own interface
2932 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2933 dst_address_length=32,
2934 next_hop_address=self.pg7.remote_ip4n,
2935 next_hop_sw_if_index=self.pg7.sw_if_index)
2936 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2937 dst_address_length=32,
2938 next_hop_address=self.pg8.remote_ip4n,
2939 next_hop_sw_if_index=self.pg8.sw_if_index)
# static mapping: pg7's remote address <-> NAT address
2941 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2942 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2943 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction is exercised first
2947 pkts = self.create_stream_out(self.pg8)
2948 self.pg8.add_stream(pkts)
2949 self.pg_enable_capture(self.pg_interfaces)
2951 capture = self.pg7.get_capture(len(pkts))
2952 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction
2955 pkts = self.create_stream_in(self.pg7, self.pg8)
2956 self.pg7.add_stream(pkts)
2957 self.pg_enable_capture(self.pg_interfaces)
2959 capture = self.pg8.get_capture(len(pkts))
2960 self.verify_capture_out(capture, self.nat_addr, True)
# Same pg7/pg8 neighbor and route setup, with per-protocol static mappings
# (TCP port, UDP port, ICMP id) for pg7's remote address.
2962 def test_static_with_port_ipless_interfaces(self):
2963 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# fixed external ports / ICMP id used by the static mappings below
2965 self.tcp_port_out = 30606
2966 self.udp_port_out = 30607
2967 self.icmp_id_out = 30608
# neighbor entries (MAC + IPv4) for the remote ends of pg7 and pg8
2969 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2970 mactobinary(self.pg7.remote_mac),
2971 self.pg7.remote_ip4n,
2973 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2974 mactobinary(self.pg8.remote_mac),
2975 self.pg8.remote_ip4n,
# /32 host routes towards each remote address via its own interface
2978 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2979 dst_address_length=32,
2980 next_hop_address=self.pg7.remote_ip4n,
2981 next_hop_sw_if_index=self.pg7.sw_if_index)
2982 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2983 dst_address_length=32,
2984 next_hop_address=self.pg8.remote_ip4n,
2985 next_hop_sw_if_index=self.pg8.sw_if_index)
# one static mapping per protocol: inside port/id -> outside port/id
2987 self.nat44_add_address(self.nat_addr)
2988 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2989 self.tcp_port_in, self.tcp_port_out,
2990 proto=IP_PROTOS.tcp)
2991 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2992 self.udp_port_in, self.udp_port_out,
2993 proto=IP_PROTOS.udp)
2994 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2995 self.icmp_id_in, self.icmp_id_out,
2996 proto=IP_PROTOS.icmp)
2997 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2998 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction is exercised first
3002 pkts = self.create_stream_out(self.pg8)
3003 self.pg8.add_stream(pkts)
3004 self.pg_enable_capture(self.pg_interfaces)
3006 capture = self.pg7.get_capture(len(pkts))
3007 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction
3010 pkts = self.create_stream_in(self.pg7, self.pg8)
3011 self.pg7.add_stream(pkts)
3012 self.pg_enable_capture(self.pg_interfaces)
3014 capture = self.pg8.get_capture(len(pkts))
3015 self.verify_capture_out(capture)
# Address-only static mapping for pg0's remote address; checks that a packet
# carrying a GRE layer is address-translated in both directions.
3017 def test_static_unknown_proto(self):
3018 """ 1:1 NAT translate packet with unknown protocol """
3019 nat_ip = "10.0.0.10"
3020 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
3021 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3022 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: outer IP from pg0's remote address, inner IP/TCP as payload
3026 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3027 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3029 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3030 TCP(sport=1234, dport=1234))
3031 self.pg0.add_stream(p)
3032 self.pg_enable_capture(self.pg_interfaces)
3034 p = self.pg1.get_capture(1)
# outer source must be the static NAT address; the GRE layer must survive
3037 self.assertEqual(packet[IP].src, nat_ip)
3038 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3039 self.assertTrue(packet.haslayer(GRE))
3040 self.check_ip_checksum(packet)
3042 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: addressed to the static NAT address
3046 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3047 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3049 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3050 TCP(sport=1234, dport=1234))
3051 self.pg1.add_stream(p)
3052 self.pg_enable_capture(self.pg_interfaces)
3054 p = self.pg0.get_capture(1)
# outer destination must be rewritten back to pg0's remote address
3057 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3058 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3059 self.assertTrue(packet.haslayer(GRE))
3060 self.check_ip_checksum(packet)
3062 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant: host and server are both on pg0, each with its own
# address-only static mapping; packets carrying a GRE layer are exchanged via
# the peers' NAT addresses.
3065 def test_hairpinning_static_unknown_proto(self):
3066 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
3068 host = self.pg0.remote_hosts[0]
3069 server = self.pg0.remote_hosts[1]
3071 host_nat_ip = "10.0.0.10"
3072 server_nat_ip = "10.0.0.11"
3074 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
3075 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3076 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3077 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server, addressed to the server's NAT address
3081 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3082 IP(src=host.ip4, dst=server_nat_ip) /
3084 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3085 TCP(sport=1234, dport=1234))
3086 self.pg0.add_stream(p)
3087 self.pg_enable_capture(self.pg_interfaces)
3089 p = self.pg0.get_capture(1)
# both addresses are translated: source to the host's NAT address,
# destination to the server's own address
3092 self.assertEqual(packet[IP].src, host_nat_ip)
3093 self.assertEqual(packet[IP].dst, server.ip4)
3094 self.assertTrue(packet.haslayer(GRE))
3095 self.check_ip_checksum(packet)
3097 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, addressed to the host's NAT address
3101 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3102 IP(src=server.ip4, dst=host_nat_ip) /
3104 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3105 TCP(sport=1234, dport=1234))
3106 self.pg0.add_stream(p)
3107 self.pg_enable_capture(self.pg_interfaces)
3109 p = self.pg0.get_capture(1)
3112 self.assertEqual(packet[IP].src, server_nat_ip)
3113 self.assertEqual(packet[IP].dst, host.ip4)
3114 self.assertTrue(packet.haslayer(GRE))
3115 self.check_ip_checksum(packet)
3117 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Dynamic NAT44: a TCP packet is sent first, then a packet carrying a GRE
# layer is checked for address translation in both directions.
3120 def test_unknown_proto(self):
3121 """ NAT44 translate packet with unknown protocol """
3122 self.nat44_add_address(self.nat_addr)
3123 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3124 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# a plain TCP packet pg0 -> pg1 goes first
# NOTE(review): presumably this creates the session the GRE packet relies on -
# confirm against the NAT plugin behaviour
3128 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3129 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3130 TCP(sport=self.tcp_port_in, dport=20))
3131 self.pg0.add_stream(p)
3132 self.pg_enable_capture(self.pg_interfaces)
3134 p = self.pg1.get_capture(1)
# pg0 -> pg1 with inner IP/TCP as payload
3136 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3137 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3139 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3140 TCP(sport=1234, dport=1234))
3141 self.pg0.add_stream(p)
3142 self.pg_enable_capture(self.pg_interfaces)
3144 p = self.pg1.get_capture(1)
# outer source must be the NAT pool address; the GRE layer must survive
3147 self.assertEqual(packet[IP].src, self.nat_addr)
3148 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3149 self.assertTrue(packet.haslayer(GRE))
3150 self.check_ip_checksum(packet)
3152 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to the NAT pool address
3156 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3157 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3159 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3160 TCP(sport=1234, dport=1234))
3161 self.pg1.add_stream(p)
3162 self.pg_enable_capture(self.pg_interfaces)
3164 p = self.pg0.get_capture(1)
# outer destination must be rewritten back to pg0's remote address
3167 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3168 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3169 self.assertTrue(packet.haslayer(GRE))
3170 self.check_ip_checksum(packet)
3172 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant with a dynamically translated host and a statically
# mapped server, both on pg0; a TCP packet is sent first, then packets
# carrying a GRE layer are checked in both directions.
3175 def test_hairpinning_unknown_proto(self):
3176 """ NAT44 translate packet with unknown protocol - hairpinning """
3177 host = self.pg0.remote_hosts[0]
3178 server = self.pg0.remote_hosts[1]
3181 server_in_port = 5678
3182 server_out_port = 8765
3183 server_nat_ip = "10.0.0.11"
3185 self.nat44_add_address(self.nat_addr)
3186 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3187 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3190 # add static mapping for server
3191 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# a plain TCP packet host -> server's NAT address goes first
3194 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3195 IP(src=host.ip4, dst=server_nat_ip) /
3196 TCP(sport=host_in_port, dport=server_out_port))
3197 self.pg0.add_stream(p)
3198 self.pg_enable_capture(self.pg_interfaces)
3200 capture = self.pg0.get_capture(1)
# host -> server with inner IP/TCP as payload
3202 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3203 IP(src=host.ip4, dst=server_nat_ip) /
3205 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3206 TCP(sport=1234, dport=1234))
3207 self.pg0.add_stream(p)
3208 self.pg_enable_capture(self.pg_interfaces)
3210 p = self.pg0.get_capture(1)
# source becomes the NAT pool address, destination the server's own address
3213 self.assertEqual(packet[IP].src, self.nat_addr)
3214 self.assertEqual(packet[IP].dst, server.ip4)
3215 self.assertTrue(packet.haslayer(GRE))
3216 self.check_ip_checksum(packet)
3218 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, addressed to the NAT pool address
3222 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3223 IP(src=server.ip4, dst=self.nat_addr) /
3225 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3226 TCP(sport=1234, dport=1234))
3227 self.pg0.add_stream(p)
3228 self.pg_enable_capture(self.pg_interfaces)
3230 p = self.pg0.get_capture(1)
# source becomes the server's NAT address, destination the host's own address
3233 self.assertEqual(packet[IP].src, server_nat_ip)
3234 self.assertEqual(packet[IP].dst, host.ip4)
3235 self.assertTrue(packet.haslayer(GRE))
3236 self.check_ip_checksum(packet)
3238 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3241 def test_output_feature(self):
3242 """ NAT44 interface output feature (in2out postrouting) """
# one pool address; the output feature is enabled on pg0, pg1 and pg3
# (the pg3 call takes an extra argument)
3243 self.nat44_add_address(self.nat_addr)
3244 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3245 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3246 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3: all packets are expected on pg3 and checked with
# verify_capture_out
3250 pkts = self.create_stream_in(self.pg0, self.pg3)
3251 self.pg0.add_stream(pkts)
3252 self.pg_enable_capture(self.pg_interfaces)
3254 capture = self.pg3.get_capture(len(pkts))
3255 self.verify_capture_out(capture)
# pg3 -> pg0: return traffic is expected on pg0 and checked with
# verify_capture_in
3258 pkts = self.create_stream_out(self.pg3)
3259 self.pg3.add_stream(pkts)
3260 self.pg_enable_capture(self.pg_interfaces)
3262 capture = self.pg0.get_capture(len(pkts))
3263 self.verify_capture_in(capture, self.pg0)
3265 # from non-NAT interface to NAT inside interface
3266 pkts = self.create_stream_in(self.pg2, self.pg0)
3267 self.pg2.add_stream(pkts)
3268 self.pg_enable_capture(self.pg_interfaces)
3270 capture = self.pg0.get_capture(len(pkts))
# these packets must arrive on pg0 without translation
3271 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3273 def test_output_feature_vrf_aware(self):
3274 """ NAT44 interface output feature VRF aware (in2out postrouting) """
3275 nat_ip_vrf10 = "10.0.0.10"
3276 nat_ip_vrf20 = "10.0.0.20"
# a /32 route to pg3's remote address via pg3 is added by two calls
# NOTE(review): presumably one call per VRF table (10 and 20) - confirm
# against the trailing argument of each call
3278 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3279 dst_address_length=32,
3280 next_hop_address=self.pg3.remote_ip4n,
3281 next_hop_sw_if_index=self.pg3.sw_if_index,
3283 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3284 dst_address_length=32,
3285 next_hop_address=self.pg3.remote_ip4n,
3286 next_hop_sw_if_index=self.pg3.sw_if_index,
# one NAT pool address per VRF; output feature on pg4, pg6 and pg3
3289 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3290 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3291 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3292 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3293 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3: packets must leave with the VRF 10 pool address
3297 pkts = self.create_stream_in(self.pg4, self.pg3)
3298 self.pg4.add_stream(pkts)
3299 self.pg_enable_capture(self.pg_interfaces)
3301 capture = self.pg3.get_capture(len(pkts))
3302 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> VRF 10 pool address: replies must be delivered on pg4
3305 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3306 self.pg3.add_stream(pkts)
3307 self.pg_enable_capture(self.pg_interfaces)
3309 capture = self.pg4.get_capture(len(pkts))
3310 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3: packets must leave with the VRF 20 pool address
3313 pkts = self.create_stream_in(self.pg6, self.pg3)
3314 self.pg6.add_stream(pkts)
3315 self.pg_enable_capture(self.pg_interfaces)
3317 capture = self.pg3.get_capture(len(pkts))
3318 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> VRF 20 pool address: replies must be delivered on pg6
3321 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3322 self.pg3.add_stream(pkts)
3323 self.pg_enable_capture(self.pg_interfaces)
3325 capture = self.pg6.get_capture(len(pkts))
3326 self.verify_capture_in(capture, self.pg6)
3328 def test_output_feature_hairpinning(self):
3329 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# host and server are both remote hosts on pg0; the server is published as
# nat_addr:server_out_port through a static TCP mapping
3330 host = self.pg0.remote_hosts[0]
3331 server = self.pg0.remote_hosts[1]
3334 server_in_port = 5678
3335 server_out_port = 8765
3337 self.nat44_add_address(self.nat_addr)
3338 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3339 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3342 # add static mapping for server
3343 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3344 server_in_port, server_out_port,
3345 proto=IP_PROTOS.tcp)
3347 # send packet from host to server
3348 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3349 IP(src=host.ip4, dst=self.nat_addr) /
3350 TCP(sport=host_in_port, dport=server_out_port))
3351 self.pg0.add_stream(p)
3352 self.pg_enable_capture(self.pg_interfaces)
3354 capture = self.pg0.get_capture(1)
# expected: source becomes nat_addr with a new source port, destination
# becomes the server's real address and port
3359 self.assertEqual(ip.src, self.nat_addr)
3360 self.assertEqual(ip.dst, server.ip4)
3361 self.assertNotEqual(tcp.sport, host_in_port)
3362 self.assertEqual(tcp.dport, server_in_port)
3363 self.check_tcp_checksum(p)
# remember the translated source port; the server's reply targets it
3364 host_out_port = tcp.sport
3366 self.logger.error(ppp("Unexpected or invalid packet:", p))
3369 # send reply from server to host
3370 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3371 IP(src=server.ip4, dst=self.nat_addr) /
3372 TCP(sport=server_in_port, dport=host_out_port))
3373 self.pg0.add_stream(p)
3374 self.pg_enable_capture(self.pg_interfaces)
3376 capture = self.pg0.get_capture(1)
# expected: reply appears to come from nat_addr:server_out_port and is
# delivered to the host's original address and port
3381 self.assertEqual(ip.src, self.nat_addr)
3382 self.assertEqual(ip.dst, host.ip4)
3383 self.assertEqual(tcp.sport, server_out_port)
3384 self.assertEqual(tcp.dport, host_in_port)
3385 self.check_tcp_checksum(p)
3387 self.logger.error(ppp("Unexpected or invalid packet:", p))
3390 def test_output_feature_and_service(self):
3391 """ NAT44 interface output feature and services """
3392 external_addr = '1.2.3.4'
# configuration: forwarding enabled, one pool address, an identity mapping
# for pg1's remote address and an out2in-only static TCP mapping
# external_addr:external_port -> pg0 remote address:local_port
3396 self.vapi.nat44_forwarding_enable_disable(1)
3397 self.nat44_add_address(self.nat_addr)
3398 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3399 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3400 local_port, external_port,
3401 proto=IP_PROTOS.tcp, out2in_only=1)
# pg0 is passed to nat44_interface_add_del_feature twice (the second call
# takes an extra argument); pg1 gets the output feature
3402 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3403 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3405 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3408 # from client to service
3409 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3410 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3411 TCP(sport=12345, dport=external_port))
3412 self.pg1.add_stream(p)
3413 self.pg_enable_capture(self.pg_interfaces)
3415 capture = self.pg0.get_capture(1)
# expected: destination rewritten to the service's local address and port
3421 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3422 self.assertEqual(tcp.dport, local_port)
3423 self.check_tcp_checksum(p)
3424 self.check_ip_checksum(p)
3426 self.logger.error(ppp("Unexpected or invalid packet:", p))
3429 # from service back to client
3430 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3431 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3432 TCP(sport=local_port, dport=12345))
3433 self.pg0.add_stream(p)
3434 self.pg_enable_capture(self.pg_interfaces)
3436 capture = self.pg1.get_capture(1)
# expected: source rewritten back to the external address and port
3441 self.assertEqual(ip.src, external_addr)
3442 self.assertEqual(tcp.sport, external_port)
3443 self.check_tcp_checksum(p)
3444 self.check_ip_checksum(p)
3446 self.logger.error(ppp("Unexpected or invalid packet:", p))
3449 # from local network host to external network
3450 pkts = self.create_stream_in(self.pg0, self.pg1)
3451 self.pg0.add_stream(pkts)
3452 self.pg_enable_capture(self.pg_interfaces)
3454 capture = self.pg1.get_capture(len(pkts))
3455 self.verify_capture_out(capture)
# the same in2out stream is sent and verified a second time
3456 pkts = self.create_stream_in(self.pg0, self.pg1)
3457 self.pg0.add_stream(pkts)
3458 self.pg_enable_capture(self.pg_interfaces)
3460 capture = self.pg1.get_capture(len(pkts))
3461 self.verify_capture_out(capture)
3463 # from external network back to local network host
3464 pkts = self.create_stream_out(self.pg1)
3465 self.pg1.add_stream(pkts)
3466 self.pg_enable_capture(self.pg_interfaces)
3468 capture = self.pg0.get_capture(len(pkts))
3469 self.verify_capture_in(capture, self.pg0)
3471 def test_output_feature_and_service2(self):
3472 """ NAT44 interface output feature and service host direct access """
# forwarding enabled, one pool address, output feature on pg1 only
3473 self.vapi.nat44_forwarding_enable_disable(1)
3474 self.nat44_add_address(self.nat_addr)
3475 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3478 # session initiated from service host - translate
3479 pkts = self.create_stream_in(self.pg0, self.pg1)
3480 self.pg0.add_stream(pkts)
3481 self.pg_enable_capture(self.pg_interfaces)
3483 capture = self.pg1.get_capture(len(pkts))
3484 self.verify_capture_out(capture)
# replies for the translated sessions must come back on pg0
3486 pkts = self.create_stream_out(self.pg1)
3487 self.pg1.add_stream(pkts)
3488 self.pg_enable_capture(self.pg_interfaces)
3490 capture = self.pg0.get_capture(len(pkts))
3491 self.verify_capture_in(capture, self.pg0)
# snapshot of the outside port/id values currently stored on the test case
3493 tcp_port_out = self.tcp_port_out
3494 udp_port_out = self.udp_port_out
3495 icmp_id_out = self.icmp_id_out
3497 # session initiated from remote host - do not translate
# the outside stream is addressed directly to the host's own address and
# built with use_inside_ports=True
3498 pkts = self.create_stream_out(self.pg1,
3499 self.pg0.remote_ip4,
3500 use_inside_ports=True)
3501 self.pg1.add_stream(pkts)
3502 self.pg_enable_capture(self.pg_interfaces)
3504 capture = self.pg0.get_capture(len(pkts))
3505 self.verify_capture_in(capture, self.pg0)
# replies from the host are verified against the host's own address as
# the expected source (nat_ip=self.pg0.remote_ip4)
3507 pkts = self.create_stream_in(self.pg0, self.pg1)
3508 self.pg0.add_stream(pkts)
3509 self.pg_enable_capture(self.pg_interfaces)
3511 capture = self.pg1.get_capture(len(pkts))
3512 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3515 def test_output_feature_and_service3(self):
3516 """ NAT44 interface output feature and DST NAT """
3517 external_addr = '1.2.3.4'
# configuration: forwarding enabled, one pool address and an out2in-only
# static TCP mapping external_addr:external_port -> pg1 remote:local_port
3521 self.vapi.nat44_forwarding_enable_disable(1)
3522 self.nat44_add_address(self.nat_addr)
3523 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3524 local_port, external_port,
3525 proto=IP_PROTOS.tcp, out2in_only=1)
3526 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3527 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3529 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0 host -> external_addr:external_port, expected on pg1
3532 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3533 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3534 TCP(sport=12345, dport=external_port))
3535 self.pg0.add_stream(p)
3536 self.pg_enable_capture(self.pg_interfaces)
3538 capture = self.pg1.get_capture(1)
# expected: source address and port unchanged, destination rewritten to
# pg1's remote address and local_port
3543 self.assertEqual(ip.src, self.pg0.remote_ip4)
3544 self.assertEqual(tcp.sport, 12345)
3545 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3546 self.assertEqual(tcp.dport, local_port)
3547 self.check_tcp_checksum(p)
3548 self.check_ip_checksum(p)
3550 self.logger.error(ppp("Unexpected or invalid packet:", p))
# reply from pg1's remote host to the pg0 host, expected on pg0
3553 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3554 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3555 TCP(sport=local_port, dport=12345))
3556 self.pg1.add_stream(p)
3557 self.pg_enable_capture(self.pg_interfaces)
3559 capture = self.pg0.get_capture(1)
# expected: source rewritten back to external_addr:external_port,
# destination address and port unchanged
3564 self.assertEqual(ip.src, external_addr)
3565 self.assertEqual(tcp.sport, external_port)
3566 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3567 self.assertEqual(tcp.dport, 12345)
3568 self.check_tcp_checksum(p)
3569 self.check_ip_checksum(p)
3571 self.logger.error(ppp("Unexpected or invalid packet:", p))
3574 def test_one_armed_nat44(self):
3575 """ One armed NAT44 """
# both endpoints are remote hosts on pg9; all traffic enters and leaves
# on that single interface
3576 remote_host = self.pg9.remote_hosts[0]
3577 local_host = self.pg9.remote_hosts[1]
# pg9 is passed to nat44_interface_add_del_feature twice; the second call
# takes an extra argument
3580 self.nat44_add_address(self.nat_addr)
3581 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3582 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host
3586 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3587 IP(src=local_host.ip4, dst=remote_host.ip4) /
3588 TCP(sport=12345, dport=80))
3589 self.pg9.add_stream(p)
3590 self.pg_enable_capture(self.pg_interfaces)
3592 capture = self.pg9.get_capture(1)
# expected: source rewritten to nat_addr with a new source port,
# destination address and port unchanged
3597 self.assertEqual(ip.src, self.nat_addr)
3598 self.assertEqual(ip.dst, remote_host.ip4)
3599 self.assertNotEqual(tcp.sport, 12345)
# keep the translated source port; the reply below is addressed to it
3600 external_port = tcp.sport
3601 self.assertEqual(tcp.dport, 80)
3602 self.check_tcp_checksum(p)
3603 self.check_ip_checksum(p)
3605 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> nat_addr:external_port
3609 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3610 IP(src=remote_host.ip4, dst=self.nat_addr) /
3611 TCP(sport=80, dport=external_port))
3612 self.pg9.add_stream(p)
3613 self.pg_enable_capture(self.pg_interfaces)
3615 capture = self.pg9.get_capture(1)
# expected: destination restored to local_host and its original port
3620 self.assertEqual(ip.src, remote_host.ip4)
3621 self.assertEqual(ip.dst, local_host.ip4)
3622 self.assertEqual(tcp.sport, 80)
3623 self.assertEqual(tcp.dport, 12345)
3624 self.check_tcp_checksum(p)
3625 self.check_ip_checksum(p)
3627 self.logger.error(ppp("Unexpected or invalid packet:", p))
3630 def test_one_armed_nat44_static(self):
3631 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
# client (remote_host) and service (local_host) are both on pg9
3632 remote_host = self.pg9.remote_hosts[0]
3633 local_host = self.pg9.remote_hosts[1]
# configuration: forwarding enabled, nat_addr added with twice_nat=1 and an
# out2in-only static TCP mapping nat_addr:external_port ->
# local_host:local_port (the mapping call takes one more argument)
3638 self.vapi.nat44_forwarding_enable_disable(1)
3639 self.nat44_add_address(self.nat_addr, twice_nat=1)
3640 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3641 local_port, external_port,
3642 proto=IP_PROTOS.tcp, out2in_only=1,
3644 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3645 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3648 # from client to service
3649 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3650 IP(src=remote_host.ip4, dst=self.nat_addr) /
3651 TCP(sport=12345, dport=external_port))
3652 self.pg9.add_stream(p)
3653 self.pg_enable_capture(self.pg_interfaces)
3655 capture = self.pg9.get_capture(1)
# expected: destination rewritten to the service, and the source also
# rewritten to nat_addr with a new source port
3661 self.assertEqual(ip.dst, local_host.ip4)
3662 self.assertEqual(ip.src, self.nat_addr)
3663 self.assertEqual(tcp.dport, local_port)
3664 self.assertNotEqual(tcp.sport, 12345)
# translated client port as seen by the service; used as the reply's dport
3665 eh_port_in = tcp.sport
3666 self.check_tcp_checksum(p)
3667 self.check_ip_checksum(p)
3669 self.logger.error(ppp("Unexpected or invalid packet:", p))
3672 # from service back to client
3673 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3674 IP(src=local_host.ip4, dst=self.nat_addr) /
3675 TCP(sport=local_port, dport=eh_port_in))
3676 self.pg9.add_stream(p)
3677 self.pg_enable_capture(self.pg_interfaces)
3679 capture = self.pg9.get_capture(1)
# expected: reply appears to come from nat_addr:external_port and reaches
# the client's original address and port
3684 self.assertEqual(ip.src, self.nat_addr)
3685 self.assertEqual(ip.dst, remote_host.ip4)
3686 self.assertEqual(tcp.sport, external_port)
3687 self.assertEqual(tcp.dport, 12345)
3688 self.check_tcp_checksum(p)
3689 self.check_ip_checksum(p)
3691 self.logger.error(ppp("Unexpected or invalid packet:", p))
3694 def test_del_session(self):
3695 """ Delete NAT44 session """
3696 self.nat44_add_address(self.nat_addr)
3697 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3698 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# send in2out traffic so that sessions exist for pg0's remote host
3701 pkts = self.create_stream_in(self.pg0, self.pg1)
3702 self.pg0.add_stream(pkts)
3703 self.pg_enable_capture(self.pg_interfaces)
3705 capture = self.pg1.get_capture(len(pkts))
# number of sessions of that host before any deletion
3707 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3708 nsessions = len(sessions)
# delete the first session by its inside address/port/protocol ...
3710 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3711 sessions[0].inside_port,
3712 sessions[0].protocol)
# ... and the second one by its outside address/port/protocol
3713 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3714 sessions[1].outside_port,
3715 sessions[1].protocol,
# exactly two sessions must have disappeared
3718 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3719 self.assertEqual(nsessions - len(sessions), 2)
3721 def test_set_get_reass(self):
3722 """ NAT44 set/get virtual fragmentation reassembly """
# read the current configuration as the baseline
3723 reas_cfg1 = self.vapi.nat_get_reass()
# write back a longer timeout (+5) and doubled max_reass / max_frag
3725 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3726 max_reass=reas_cfg1.ip4_max_reass * 2,
3727 max_frag=reas_cfg1.ip4_max_frag * 2)
3729 reas_cfg2 = self.vapi.nat_get_reass()
# the values read back must equal the values that were written
3731 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3732 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3733 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
# setting drop_frag must be reflected by the getter as well
3735 self.vapi.nat_set_reass(drop_frag=1)
3736 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3738 def test_frag_in_order(self):
3739 """ NAT44 translate fragments arriving in order """
3740 self.nat44_add_address(self.nat_addr)
3741 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3742 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload; a random inside TCP port is chosen for this test
3745 data = "A" * 4 + "B" * 16 + "C" * 3
3746 self.tcp_port_in = random.randint(1025, 65535)
# number of reassembly entries before any fragment is sent
3748 reass = self.vapi.nat_reass_dump()
3749 reass_n_start = len(reass)
# in2out: fragments are sent on pg0, captured on pg1 and reassembled
# before the checks
3752 pkts = self.create_stream_frag(self.pg0,
3753 self.pg1.remote_ip4,
3757 self.pg0.add_stream(pkts)
3758 self.pg_enable_capture(self.pg_interfaces)
3760 frags = self.pg1.get_capture(len(pkts))
3761 p = self.reass_frags_and_verify(frags,
3763 self.pg1.remote_ip4)
# expected: destination port 20, a translated source port and an intact
# payload; the translated port is stored for the reply direction
3764 self.assertEqual(p[TCP].dport, 20)
3765 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3766 self.tcp_port_out = p[TCP].sport
3767 self.assertEqual(data, p[Raw].load)
# out2in: fragments are sent on pg1 and captured on pg0
3770 pkts = self.create_stream_frag(self.pg1,
3775 self.pg1.add_stream(pkts)
3776 self.pg_enable_capture(self.pg_interfaces)
3778 frags = self.pg0.get_capture(len(pkts))
3779 p = self.reass_frags_and_verify(frags,
3780 self.pg1.remote_ip4,
3781 self.pg0.remote_ip4)
# expected: source port 20, destination port restored to the inside port
3782 self.assertEqual(p[TCP].sport, 20)
3783 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3784 self.assertEqual(data, p[Raw].load)
# the two directions must have added exactly two reassembly entries
3786 reass = self.vapi.nat_reass_dump()
3787 reass_n_end = len(reass)
3789 self.assertEqual(reass_n_end - reass_n_start, 2)
3791 def test_reass_hairpinning(self):
3792 """ NAT44 fragments hairpinning """
# host and server are both remote hosts on pg0; all ports are random
3793 host = self.pg0.remote_hosts[0]
3794 server = self.pg0.remote_hosts[1]
3795 host_in_port = random.randint(1025, 65535)
3797 server_in_port = random.randint(1025, 65535)
3798 server_out_port = random.randint(1025, 65535)
3799 data = "A" * 4 + "B" * 16 + "C" * 3
3801 self.nat44_add_address(self.nat_addr)
3802 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3803 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3805 # add static mapping for server
3806 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3807 server_in_port, server_out_port,
3808 proto=IP_PROTOS.tcp)
3810 # send packet from host to server
3811 pkts = self.create_stream_frag(self.pg0,
3816 self.pg0.add_stream(pkts)
3817 self.pg_enable_capture(self.pg_interfaces)
# the fragments are expected back on pg0 and are reassembled before checking
3819 frags = self.pg0.get_capture(len(pkts))
3820 p = self.reass_frags_and_verify(frags,
# expected: a translated source port, destination port rewritten to the
# server's inside port, payload intact
3823 self.assertNotEqual(p[TCP].sport, host_in_port)
3824 self.assertEqual(p[TCP].dport, server_in_port)
3825 self.assertEqual(data, p[Raw].load)
3827 def test_frag_out_of_order(self):
3828 """ NAT44 translate fragments arriving out of order """
3829 self.nat44_add_address(self.nat_addr)
3830 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3831 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3834 data = "A" * 4 + "B" * 16 + "C" * 3
3835 random.randint(1025, 65535)
3838 pkts = self.create_stream_frag(self.pg0,
3839 self.pg1.remote_ip4,
3844 self.pg0.add_stream(pkts)
3845 self.pg_enable_capture(self.pg_interfaces)
3847 frags = self.pg1.get_capture(len(pkts))
3848 p = self.reass_frags_and_verify(frags,
3850 self.pg1.remote_ip4)
3851 self.assertEqual(p[TCP].dport, 20)
3852 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3853 self.tcp_port_out = p[TCP].sport
3854 self.assertEqual(data, p[Raw].load)
3857 pkts = self.create_stream_frag(self.pg1,
3863 self.pg1.add_stream(pkts)
3864 self.pg_enable_capture(self.pg_interfaces)
3866 frags = self.pg0.get_capture(len(pkts))
3867 p = self.reass_frags_and_verify(frags,
3868 self.pg1.remote_ip4,
3869 self.pg0.remote_ip4)
3870 self.assertEqual(p[TCP].sport, 20)
3871 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3872 self.assertEqual(data, p[Raw].load)
3874 def test_port_restricted(self):
3875 """ Port restricted NAT44 (MAP-E CE) """
3876 self.nat44_add_address(self.nat_addr)
3877 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3878 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# switch port assignment to the map-e algorithm with psid 10,
# psid-offset 6 and psid-len 6
3880 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3881 "psid-offset 6 psid-len 6")
# in2out TCP packet, expected on pg1
3883 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3884 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3885 TCP(sport=4567, dport=22))
3886 self.pg0.add_stream(p)
3887 self.pg_enable_capture(self.pg_interfaces)
3889 capture = self.pg1.get_capture(1)
3894 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3895 self.assertEqual(ip.src, self.nat_addr)
3896 self.assertEqual(tcp.dport, 22)
3897 self.assertNotEqual(tcp.sport, 4567)
# the 6-bit field starting at bit 6 of the translated source port must be
# 10, matching the psid / psid-offset / psid-len values configured above
3898 self.assertEqual((tcp.sport >> 6) & 63, 10)
3899 self.check_tcp_checksum(p)
3900 self.check_ip_checksum(p)
3902 self.logger.error(ppp("Unexpected or invalid packet:", p))
3905 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
# Shared scenario for the twice-NAT tests; the keyword flags select the
# variant (self twice-NAT, client on the same interface, load balancing).
3907 twice_nat_addr = '10.0.1.3'
3915 port_in1 = port_in+1
3916 port_in2 = port_in+2
3921 server1 = self.pg0.remote_hosts[0]
3922 server2 = self.pg0.remote_hosts[1]
3934 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
# one regular pool address plus one address added with twice_nat=1
3937 self.nat44_add_address(self.nat_addr)
3938 self.nat44_add_address(twice_nat_addr, twice_nat=1)
# plain static TCP mapping: exactly one of twice_nat / self_twice_nat is 1
3940 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3942 proto=IP_PROTOS.tcp,
3943 twice_nat=int(not self_twice_nat),
3944 self_twice_nat=int(self_twice_nat))
# load-balanced static mapping with server1 and server2 as local back-ends
# NOTE(review): presumably used instead of the plain mapping when lb is
# set - confirm against the branching around these calls
3946 locals = [{'addr': server1.ip4n,
3949 {'addr': server2.ip4n,
3952 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3953 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
3957 not self_twice_nat),
3960 local_num=len(locals),
3962 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
3963 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
# pick the client: one of pg0's remote hosts (by client_id) or pg1's first
# remote host
3970 assert client_id is not None
3972 client = self.pg0.remote_hosts[0]
3973 elif client_id == 2:
3974 client = self.pg0.remote_hosts[1]
3976 client = pg1.remote_hosts[0]
# client -> nat_addr:port_out; the translated packet is expected on pg0
3977 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
3978 IP(src=client.ip4, dst=self.nat_addr) /
3979 TCP(sport=eh_port_out, dport=port_out))
3981 self.pg_enable_capture(self.pg_interfaces)
3983 capture = pg0.get_capture(1)
3989 if ip.dst == server1.ip4:
3995 self.assertEqual(ip.dst, server.ip4)
# destination port: one of the two back-end ports, or port_in
3997 self.assertIn(tcp.dport, [port_in1, port_in2])
3999 self.assertEqual(tcp.dport, port_in)
# source: either rewritten to the twice-NAT address with a new port, or
# left as the client's address and port
# NOTE(review): presumably selected by eh_translate - confirm
4001 self.assertEqual(ip.src, twice_nat_addr)
4002 self.assertNotEqual(tcp.sport, eh_port_out)
4004 self.assertEqual(ip.src, client.ip4)
4005 self.assertEqual(tcp.sport, eh_port_out)
# remember the ports seen by the server; the reply below reuses them
4007 eh_port_in = tcp.sport
4008 saved_port_in = tcp.dport
4009 self.check_tcp_checksum(p)
4010 self.check_ip_checksum(p)
4012 self.logger.error(ppp("Unexpected or invalid packet:", p))
# server reply towards eh_addr_in; the translated packet is expected on pg1
4015 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4016 IP(src=server.ip4, dst=eh_addr_in) /
4017 TCP(sport=saved_port_in, dport=eh_port_in))
4019 self.pg_enable_capture(self.pg_interfaces)
4021 capture = pg1.get_capture(1)
# expected: reply appears to come from nat_addr:port_out and targets the
# client's original address and port
4026 self.assertEqual(ip.dst, client.ip4)
4027 self.assertEqual(ip.src, self.nat_addr)
4028 self.assertEqual(tcp.dport, eh_port_out)
4029 self.assertEqual(tcp.sport, port_out)
4030 self.check_tcp_checksum(p)
4031 self.check_ip_checksum(p)
4033 self.logger.error(ppp("Unexpected or invalid packet:", p))
4036 def test_twice_nat(self):
# run the shared scenario with every flag at its default value
4038 self.twice_nat_common()
4040 def test_self_twice_nat_positive(self):
4041 """ Self Twice NAT44 (positive test) """
# shared scenario with self twice-NAT enabled and same_pg=True
4042 self.twice_nat_common(self_twice_nat=True, same_pg=True)
4044 def test_self_twice_nat_negative(self):
4045 """ Self Twice NAT44 (negative test) """
# shared scenario with self twice-NAT enabled but same_pg left at False
4046 self.twice_nat_common(self_twice_nat=True)
4048 def test_twice_nat_lb(self):
4049 """ Twice NAT44 local service load balancing """
# shared scenario with the load-balanced static mapping
4050 self.twice_nat_common(lb=True)
4052 def test_self_twice_nat_lb_positive(self):
4053 """ Self Twice NAT44 local service load balancing (positive test) """
# load balancing + self twice-NAT + same_pg; the call takes one more
# argument after same_pg
4054 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4057 def test_self_twice_nat_lb_negative(self):
4058 """ Self Twice NAT44 local service load balancing (negative test) """
# same leading arguments as the positive LB test; the call takes one more
# argument after same_pg
4059 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4062 def test_twice_nat_interface_addr(self):
4063 """ Acquire twice NAT44 addresses from interface """
# register pg7 as the source of a twice-NAT pool address
4064 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
4066 # no address in NAT pool
4067 adresses = self.vapi.nat44_address_dump()
4068 self.assertEqual(0, len(adresses))
4070 # configure interface address and check NAT address pool
4071 self.pg7.config_ip4()
4072 adresses = self.vapi.nat44_address_dump()
4073 self.assertEqual(1, len(adresses))
# only the first four bytes of the dumped address are compared with pg7's
# IPv4 address; the entry must also be flagged as twice-NAT
4074 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
4075 self.assertEqual(adresses[0].twice_nat, 1)
4077 # remove interface address and check NAT address pool
4078 self.pg7.unconfig_ip4()
4079 adresses = self.vapi.nat44_address_dump()
4080 self.assertEqual(0, len(adresses))
4082 def test_ipfix_max_frags(self):
4083 """ IPFIX logging maximum fragments pending reassembly exceeded """
4084 self.nat44_add_address(self.nat_addr)
4085 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4086 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# set the fragment limit to 0 and point the IPFIX exporter at pg3
4088 self.vapi.nat_set_reass(max_frag=0)
4089 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
4090 src_address=self.pg3.local_ip4n,
4092 template_interval=10)
4093 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
4094 src_port=self.ipfix_src_port)
4096 data = "A" * 4 + "B" * 16 + "C" * 3
4097 self.tcp_port_in = random.randint(1025, 65535)
4098 pkts = self.create_stream_frag(self.pg0,
4099 self.pg1.remote_ip4,
# only the last fragment of the stream is injected
4103 self.pg0.add_stream(pkts[-1])
4104 self.pg_enable_capture(self.pg_interfaces)
# nothing may be forwarded to pg1; 9 packets are expected on the
# collector interface pg3 after the flush
4106 frags = self.pg1.get_capture(0)
4107 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4108 capture = self.pg3.get_capture(9)
4109 ipfix = IPFIXDecoder()
4110 # first load template
4112 self.assertTrue(p.haslayer(IPFIX))
4113 self.assertEqual(p[IP].src, self.pg3.local_ip4)
4114 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
4115 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
4116 self.assertEqual(p[UDP].dport, 4739)
4117 self.assertEqual(p[IPFIX].observationDomainID,
4118 self.ipfix_domain_id)
4119 if p.haslayer(Template):
4120 ipfix.add_template(p.getlayer(Template))
4121 # verify events in data set
4123 if p.haslayer(Data):
# note: from here on `data` holds the decoded data set, not the payload
# string assigned above
4124 data = ipfix.decode_data_set(p.getlayer(Set))
4125 self.verify_ipfix_max_fragments_ip4(data, 0,
4126 self.pg0.remote_ip4n)
4128 def test_tcp_session_close_in(self):
4129 """ Close TCP session from inside network """
4130 self.nat44_add_address(self.nat_addr)
4131 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4132 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# session count of pg0's remote host before the test, used as the baseline
4135 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4136 start_sessnum = len(sessions)
4138 self.initiate_tcp_session(self.pg0, self.pg1)
4140 # close the session from inside
4142 # FIN packet in -> out
4143 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4144 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4145 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4147 self.pg0.add_stream(p)
4148 self.pg_enable_capture(self.pg_interfaces)
4150 self.pg1.get_capture(1)
4154 # ACK packet out -> in
4155 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4156 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4157 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4161 # FIN packet out -> in
4162 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4163 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4164 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# both out -> in packets are sent as one stream; two packets are expected
# on pg0
4168 self.pg1.add_stream(pkts)
4169 self.pg_enable_capture(self.pg_interfaces)
4171 self.pg0.get_capture(2)
4173 # ACK packet in -> out
4174 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4175 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4176 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4178 self.pg0.add_stream(p)
4179 self.pg_enable_capture(self.pg_interfaces)
4181 self.pg1.get_capture(1)
# open a session again and compare the session count with the baseline
4183 self.initiate_tcp_session(self.pg0, self.pg1)
4184 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4186 self.assertEqual(len(sessions) - start_sessnum, 2)
4188 self.logger.error("TCP session termination failed")
4191 def test_tcp_session_close_out(self):
4192 """ Close TCP session from outside network """
4193 self.nat44_add_address(self.nat_addr)
4194 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4195 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# session count of pg0's remote host before the test, used as the baseline
4198 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4199 start_sessnum = len(sessions)
4201 self.initiate_tcp_session(self.pg0, self.pg1)
4203 # close the session from outside
4205 # FIN packet out -> in
4206 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4207 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4208 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4210 self.pg1.add_stream(p)
4211 self.pg_enable_capture(self.pg_interfaces)
4213 self.pg0.get_capture(1)
4217 # ACK packet in -> out
4218 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4219 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4220 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the comment below repeats the previous one; by symmetry
# with test_tcp_session_close_in this is presumably the FIN packet
# in -> out - confirm against the TCP flags of this packet
4224 # ACK packet in -> out
4225 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4226 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4227 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# both in -> out packets are sent as one stream; two packets are expected
# on pg1
4231 self.pg0.add_stream(pkts)
4232 self.pg_enable_capture(self.pg_interfaces)
4234 self.pg1.get_capture(2)
4236 # ACK packet out -> in
4237 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4238 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4239 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4241 self.pg1.add_stream(p)
4242 self.pg_enable_capture(self.pg_interfaces)
4244 self.pg0.get_capture(1)
# open a session again and compare the session count with the baseline
4246 self.initiate_tcp_session(self.pg0, self.pg1)
4247 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4249 self.assertEqual(len(sessions) - start_sessnum, 2)
4251 self.logger.error("TCP session termination failed")
4254 def test_tcp_session_close_simultaneous(self):
# NOTE(review): the docstring below is identical to the one of
# test_tcp_session_close_in; here FIN packets are sent from both sides
# before either side's ACK, so it presumably should describe a
# simultaneous close - confirm and update the description
4255 """ Close TCP session from inside network """
4256 self.nat44_add_address(self.nat_addr)
4257 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4258 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# session count of pg0's remote host before the test, used as the baseline
4261 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4262 start_sessnum = len(sessions)
4264 self.initiate_tcp_session(self.pg0, self.pg1)
4266 # close the session from inside
4268 # FIN packet in -> out
4269 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4270 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4271 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4273 self.pg0.add_stream(p)
4274 self.pg_enable_capture(self.pg_interfaces)
4276 self.pg1.get_capture(1)
4278 # FIN packet out -> in
4279 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4280 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4281 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4283 self.pg1.add_stream(p)
4284 self.pg_enable_capture(self.pg_interfaces)
4286 self.pg0.get_capture(1)
4288 # ACK packet in -> out
4289 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4290 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4291 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4293 self.pg0.add_stream(p)
4294 self.pg_enable_capture(self.pg_interfaces)
4296 self.pg1.get_capture(1)
4298 # ACK packet out -> in
4299 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4300 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4301 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4303 self.pg1.add_stream(p)
4304 self.pg_enable_capture(self.pg_interfaces)
4306 self.pg0.get_capture(1)
# open a session again and compare the session count with the baseline
4308 self.initiate_tcp_session(self.pg0, self.pg1)
4309 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4311 self.assertEqual(len(sessions) - start_sessnum, 2)
4313 self.logger.error("TCP session termination failed")
4317 super(TestNAT44, self).tearDown()
4318 if not self.vpp_dead:
4319 self.logger.info(self.vapi.cli("show nat44 addresses"))
4320 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4321 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4322 self.logger.info(self.vapi.cli("show nat44 interface address"))
4323 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4324 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4325 self.vapi.cli("nat addr-port-assignment-alg default")
4329 class TestNAT44Out2InDPO(MethodHolder):
4330 """ NAT44 Test Cases using out2in DPO """
def setUpConstants(cls):
    """ Append the NAT "out2in dpo" section to the VPP command line """
    super(TestNAT44Out2InDPO, cls).setUpConstants()
    # startup-config tokens forming: nat { out2in dpo }
    nat_section = ["nat", "{", "out2in dpo", "}"]
    cls.vpp_cmdline.extend(nat_section)
4338 def setUpClass(cls):
4339 super(TestNAT44Out2InDPO, cls).setUpClass()
4342 cls.tcp_port_in = 6303
4343 cls.tcp_port_out = 6303
4344 cls.udp_port_in = 6304
4345 cls.udp_port_out = 6304
4346 cls.icmp_id_in = 6305
4347 cls.icmp_id_out = 6305
4348 cls.nat_addr = '10.0.0.3'
4349 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4350 cls.dst_ip4 = '192.168.70.1'
4352 cls.create_pg_interfaces(range(2))
4355 cls.pg0.config_ip4()
4356 cls.pg0.resolve_arp()
4359 cls.pg1.config_ip6()
4360 cls.pg1.resolve_ndp()
4362 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4363 dst_address_length=0,
4364 next_hop_address=cls.pg1.remote_ip6n,
4365 next_hop_sw_if_index=cls.pg1.sw_if_index)
4368 super(TestNAT44Out2InDPO, cls).tearDownClass()
4371 def configure_xlat(self):
4372 self.dst_ip6_pfx = '1:2:3::'
4373 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4375 self.dst_ip6_pfx_len = 96
4376 self.src_ip6_pfx = '4:5:6::'
4377 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4379 self.src_ip6_pfx_len = 96
4380 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4381 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4382 '\x00\x00\x00\x00', 0, is_translation=1,
4385 def test_464xlat_ce(self):
4386 """ Test 464XLAT CE with NAT44 """
4388 self.configure_xlat()
4390 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4391 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4393 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4394 self.dst_ip6_pfx_len)
4395 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4396 self.src_ip6_pfx_len)
4399 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4400 self.pg0.add_stream(pkts)
4401 self.pg_enable_capture(self.pg_interfaces)
4403 capture = self.pg1.get_capture(len(pkts))
4404 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4407 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4409 self.pg1.add_stream(pkts)
4410 self.pg_enable_capture(self.pg_interfaces)
4412 capture = self.pg0.get_capture(len(pkts))
4413 self.verify_capture_in(capture, self.pg0)
4415 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4417 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4418 self.nat_addr_n, is_add=0)
4420 def test_464xlat_ce_no_nat(self):
4421 """ Test 464XLAT CE without NAT44 """
4423 self.configure_xlat()
4425 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4426 self.dst_ip6_pfx_len)
4427 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4428 self.src_ip6_pfx_len)
4430 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4431 self.pg0.add_stream(pkts)
4432 self.pg_enable_capture(self.pg_interfaces)
4434 capture = self.pg1.get_capture(len(pkts))
4435 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4436 nat_ip=out_dst_ip6, same_port=True)
4438 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4439 self.pg1.add_stream(pkts)
4440 self.pg_enable_capture(self.pg_interfaces)
4442 capture = self.pg0.get_capture(len(pkts))
4443 self.verify_capture_in(capture, self.pg0)
4446 class TestDeterministicNAT(MethodHolder):
4447 """ Deterministic NAT Test Cases """
def setUpConstants(cls):
    """ Append the NAT "deterministic" section to the VPP command line """
    super(TestDeterministicNAT, cls).setUpConstants()
    # startup-config tokens forming: nat { deterministic }
    nat_section = ["nat", "{", "deterministic", "}"]
    cls.vpp_cmdline.extend(nat_section)
4455 def setUpClass(cls):
4456 super(TestDeterministicNAT, cls).setUpClass()
4459 cls.tcp_port_in = 6303
4460 cls.tcp_external_port = 6303
4461 cls.udp_port_in = 6304
4462 cls.udp_external_port = 6304
4463 cls.icmp_id_in = 6305
4464 cls.nat_addr = '10.0.0.3'
4466 cls.create_pg_interfaces(range(3))
4467 cls.interfaces = list(cls.pg_interfaces)
4469 for i in cls.interfaces:
4474 cls.pg0.generate_remote_hosts(2)
4475 cls.pg0.configure_ipv4_neighbors()
4478 super(TestDeterministicNAT, cls).tearDownClass()
4481 def create_stream_in(self, in_if, out_if, ttl=64):
4483 Create packet stream for inside network
4485 :param in_if: Inside interface
4486 :param out_if: Outside interface
4487 :param ttl: TTL of generated packets
4491 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4492 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4493 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
4497 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4498 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4499 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
4503 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4504 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4505 ICMP(id=self.icmp_id_in, type='echo-request'))
4510 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4512 Create packet stream for outside network
4514 :param out_if: Outside interface
4515 :param dst_ip: Destination IP address (Default use global NAT address)
4516 :param ttl: TTL of generated packets
4519 dst_ip = self.nat_addr
4522 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4523 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4524 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4528 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4529 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4530 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4534 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4535 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4536 ICMP(id=self.icmp_external_id, type='echo-reply'))
4541 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4543 Verify captured packets on outside network
4545 :param capture: Captured packets
4546 :param nat_ip: Translated IP address (Default use global NAT address)
4547 :param same_port: Sorce port number is not translated (Default False)
4548 :param packet_num: Expected number of packets (Default 3)
4551 nat_ip = self.nat_addr
4552 self.assertEqual(packet_num, len(capture))
4553 for packet in capture:
4555 self.assertEqual(packet[IP].src, nat_ip)
4556 if packet.haslayer(TCP):
4557 self.tcp_port_out = packet[TCP].sport
4558 elif packet.haslayer(UDP):
4559 self.udp_port_out = packet[UDP].sport
4561 self.icmp_external_id = packet[ICMP].id
4563 self.logger.error(ppp("Unexpected or invalid packet "
4564 "(outside network):", packet))
4567 def verify_ipfix_max_entries_per_user(self, data):
4569 Verify IPFIX maximum entries per user exceeded event
4571 :param data: Decoded IPFIX data records
4573 self.assertEqual(1, len(data))
4576 self.assertEqual(ord(record[230]), 13)
4577 # natQuotaExceededEvent
4578 self.assertEqual('\x03\x00\x00\x00', record[466])
4580 self.assertEqual('\xe8\x03\x00\x00', record[473])
4582 self.assertEqual(self.pg0.remote_ip4n, record[8])
4584 def test_deterministic_mode(self):
4585 """ NAT plugin run deterministic mode """
4586 in_addr = '172.16.255.0'
4587 out_addr = '172.17.255.50'
4588 in_addr_t = '172.16.255.20'
4589 in_addr_n = socket.inet_aton(in_addr)
4590 out_addr_n = socket.inet_aton(out_addr)
4591 in_addr_t_n = socket.inet_aton(in_addr_t)
4595 nat_config = self.vapi.nat_show_config()
4596 self.assertEqual(1, nat_config.deterministic)
4598 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4600 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4601 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4602 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4603 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4605 deterministic_mappings = self.vapi.nat_det_map_dump()
4606 self.assertEqual(len(deterministic_mappings), 1)
4607 dsm = deterministic_mappings[0]
4608 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4609 self.assertEqual(in_plen, dsm.in_plen)
4610 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4611 self.assertEqual(out_plen, dsm.out_plen)
4613 self.clear_nat_det()
4614 deterministic_mappings = self.vapi.nat_det_map_dump()
4615 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts

    Read the current timeouts, set each of them to its current value
    plus 10 and check that every value read back afterwards differs
    from the one read before the change.
    """
    step = 10
    old = self.vapi.nat_det_get_timeouts()

    self.vapi.nat_det_set_timeouts(old.udp + step,
                                   old.tcp_established + step,
                                   old.tcp_transitory + step,
                                   old.icmp + step)

    new = self.vapi.nat_det_get_timeouts()

    # the first field that did not change fails the test
    for field in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertNotEqual(getattr(old, field), getattr(new, field))
4635 def test_det_in(self):
4636 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4638 nat_ip = "10.0.0.10"
4640 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4642 socket.inet_aton(nat_ip),
4644 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4645 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4649 pkts = self.create_stream_in(self.pg0, self.pg1)
4650 self.pg0.add_stream(pkts)
4651 self.pg_enable_capture(self.pg_interfaces)
4653 capture = self.pg1.get_capture(len(pkts))
4654 self.verify_capture_out(capture, nat_ip)
4657 pkts = self.create_stream_out(self.pg1, nat_ip)
4658 self.pg1.add_stream(pkts)
4659 self.pg_enable_capture(self.pg_interfaces)
4661 capture = self.pg0.get_capture(len(pkts))
4662 self.verify_capture_in(capture, self.pg0)
4665 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4666 self.assertEqual(len(sessions), 3)
4670 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4671 self.assertEqual(s.in_port, self.tcp_port_in)
4672 self.assertEqual(s.out_port, self.tcp_port_out)
4673 self.assertEqual(s.ext_port, self.tcp_external_port)
4677 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4678 self.assertEqual(s.in_port, self.udp_port_in)
4679 self.assertEqual(s.out_port, self.udp_port_out)
4680 self.assertEqual(s.ext_port, self.udp_external_port)
4684 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4685 self.assertEqual(s.in_port, self.icmp_id_in)
4686 self.assertEqual(s.out_port, self.icmp_external_id)
4688 def test_multiple_users(self):
4689 """ Deterministic NAT multiple users """
4691 nat_ip = "10.0.0.10"
4693 external_port = 6303
4695 host0 = self.pg0.remote_hosts[0]
4696 host1 = self.pg0.remote_hosts[1]
4698 self.vapi.nat_det_add_del_map(host0.ip4n,
4700 socket.inet_aton(nat_ip),
4702 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4703 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4707 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4708 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4709 TCP(sport=port_in, dport=external_port))
4710 self.pg0.add_stream(p)
4711 self.pg_enable_capture(self.pg_interfaces)
4713 capture = self.pg1.get_capture(1)
4718 self.assertEqual(ip.src, nat_ip)
4719 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4720 self.assertEqual(tcp.dport, external_port)
4721 port_out0 = tcp.sport
4723 self.logger.error(ppp("Unexpected or invalid packet:", p))
4727 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4728 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4729 TCP(sport=port_in, dport=external_port))
4730 self.pg0.add_stream(p)
4731 self.pg_enable_capture(self.pg_interfaces)
4733 capture = self.pg1.get_capture(1)
4738 self.assertEqual(ip.src, nat_ip)
4739 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4740 self.assertEqual(tcp.dport, external_port)
4741 port_out1 = tcp.sport
4743 self.logger.error(ppp("Unexpected or invalid packet:", p))
4746 dms = self.vapi.nat_det_map_dump()
4747 self.assertEqual(1, len(dms))
4748 self.assertEqual(2, dms[0].ses_num)
4751 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4752 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4753 TCP(sport=external_port, dport=port_out0))
4754 self.pg1.add_stream(p)
4755 self.pg_enable_capture(self.pg_interfaces)
4757 capture = self.pg0.get_capture(1)
4762 self.assertEqual(ip.src, self.pg1.remote_ip4)
4763 self.assertEqual(ip.dst, host0.ip4)
4764 self.assertEqual(tcp.dport, port_in)
4765 self.assertEqual(tcp.sport, external_port)
4767 self.logger.error(ppp("Unexpected or invalid packet:", p))
4771 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4772 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4773 TCP(sport=external_port, dport=port_out1))
4774 self.pg1.add_stream(p)
4775 self.pg_enable_capture(self.pg_interfaces)
4777 capture = self.pg0.get_capture(1)
4782 self.assertEqual(ip.src, self.pg1.remote_ip4)
4783 self.assertEqual(ip.dst, host1.ip4)
4784 self.assertEqual(tcp.dport, port_in)
4785 self.assertEqual(tcp.sport, external_port)
4787 self.logger.error(ppp("Unexpected or invalid packet", p))
4790 # session close api test
4791 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4793 self.pg1.remote_ip4n,
4795 dms = self.vapi.nat_det_map_dump()
4796 self.assertEqual(dms[0].ses_num, 1)
4798 self.vapi.nat_det_close_session_in(host0.ip4n,
4800 self.pg1.remote_ip4n,
4802 dms = self.vapi.nat_det_map_dump()
4803 self.assertEqual(dms[0].ses_num, 0)
4805 def test_tcp_session_close_detection_in(self):
4806 """ Deterministic NAT TCP session close from inside network """
4807 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4809 socket.inet_aton(self.nat_addr),
4811 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4812 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4815 self.initiate_tcp_session(self.pg0, self.pg1)
4817 # close the session from inside
4819 # FIN packet in -> out
4820 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4821 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4822 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4824 self.pg0.add_stream(p)
4825 self.pg_enable_capture(self.pg_interfaces)
4827 self.pg1.get_capture(1)
4831 # ACK packet out -> in
4832 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4833 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4834 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4838 # FIN packet out -> in
4839 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4840 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4841 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4845 self.pg1.add_stream(pkts)
4846 self.pg_enable_capture(self.pg_interfaces)
4848 self.pg0.get_capture(2)
4850 # ACK packet in -> out
4851 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4852 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4853 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4855 self.pg0.add_stream(p)
4856 self.pg_enable_capture(self.pg_interfaces)
4858 self.pg1.get_capture(1)
4860 # Check if deterministic NAT44 closed the session
4861 dms = self.vapi.nat_det_map_dump()
4862 self.assertEqual(0, dms[0].ses_num)
4864 self.logger.error("TCP session termination failed")
4867 def test_tcp_session_close_detection_out(self):
4868 """ Deterministic NAT TCP session close from outside network """
4869 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4871 socket.inet_aton(self.nat_addr),
4873 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4874 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4877 self.initiate_tcp_session(self.pg0, self.pg1)
4879 # close the session from outside
4881 # FIN packet out -> in
4882 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4883 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4884 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4886 self.pg1.add_stream(p)
4887 self.pg_enable_capture(self.pg_interfaces)
4889 self.pg0.get_capture(1)
4893 # ACK packet in -> out
4894 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4895 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4896 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4900 # ACK packet in -> out
4901 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4902 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4903 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4907 self.pg0.add_stream(pkts)
4908 self.pg_enable_capture(self.pg_interfaces)
4910 self.pg1.get_capture(2)
4912 # ACK packet out -> in
4913 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4914 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4915 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4917 self.pg1.add_stream(p)
4918 self.pg_enable_capture(self.pg_interfaces)
4920 self.pg0.get_capture(1)
4922 # Check if deterministic NAT44 closed the session
4923 dms = self.vapi.nat_det_map_dump()
4924 self.assertEqual(0, dms[0].ses_num)
4926 self.logger.error("TCP session termination failed")
4929 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4930 def test_session_timeout(self):
4931 """ Deterministic NAT session timeouts """
4932 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4934 socket.inet_aton(self.nat_addr),
4936 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4937 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4940 self.initiate_tcp_session(self.pg0, self.pg1)
4941 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4942 pkts = self.create_stream_in(self.pg0, self.pg1)
4943 self.pg0.add_stream(pkts)
4944 self.pg_enable_capture(self.pg_interfaces)
4946 capture = self.pg1.get_capture(len(pkts))
4949 dms = self.vapi.nat_det_map_dump()
4950 self.assertEqual(0, dms[0].ses_num)
4952 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4953 def test_session_limit_per_user(self):
4954 """ Deterministic NAT maximum sessions per user limit """
4955 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4957 socket.inet_aton(self.nat_addr),
4959 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4960 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4962 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4963 src_address=self.pg2.local_ip4n,
4965 template_interval=10)
4966 self.vapi.nat_ipfix()
4969 for port in range(1025, 2025):
4970 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4971 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4972 UDP(sport=port, dport=port))
4975 self.pg0.add_stream(pkts)
4976 self.pg_enable_capture(self.pg_interfaces)
4978 capture = self.pg1.get_capture(len(pkts))
4980 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4981 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4982 UDP(sport=3001, dport=3002))
4983 self.pg0.add_stream(p)
4984 self.pg_enable_capture(self.pg_interfaces)
4986 capture = self.pg1.assert_nothing_captured()
4988 # verify ICMP error packet
4989 capture = self.pg0.get_capture(1)
4991 self.assertTrue(p.haslayer(ICMP))
4993 self.assertEqual(icmp.type, 3)
4994 self.assertEqual(icmp.code, 1)
4995 self.assertTrue(icmp.haslayer(IPerror))
4996 inner_ip = icmp[IPerror]
4997 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4998 self.assertEqual(inner_ip[UDPerror].dport, 3002)
5000 dms = self.vapi.nat_det_map_dump()
5002 self.assertEqual(1000, dms[0].ses_num)
5004 # verify IPFIX logging
5005 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5007 capture = self.pg2.get_capture(2)
5008 ipfix = IPFIXDecoder()
5009 # first load template
5011 self.assertTrue(p.haslayer(IPFIX))
5012 if p.haslayer(Template):
5013 ipfix.add_template(p.getlayer(Template))
5014 # verify events in data set
5016 if p.haslayer(Data):
5017 data = ipfix.decode_data_set(p.getlayer(Set))
5018 self.verify_ipfix_max_entries_per_user(data)
5020 def clear_nat_det(self):
5022 Clear deterministic NAT configuration.
5024 self.vapi.nat_ipfix(enable=0)
5025 self.vapi.nat_det_set_timeouts()
5026 deterministic_mappings = self.vapi.nat_det_map_dump()
5027 for dsm in deterministic_mappings:
5028 self.vapi.nat_det_add_del_map(dsm.in_addr,
5034 interfaces = self.vapi.nat44_interface_dump()
5035 for intf in interfaces:
5036 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
5041 super(TestDeterministicNAT, self).tearDown()
5042 if not self.vpp_dead:
5043 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5045 self.vapi.cli("show nat44 deterministic mappings"))
5047 self.vapi.cli("show nat44 deterministic timeouts"))
5049 self.vapi.cli("show nat44 deterministic sessions"))
5050 self.clear_nat_det()
5053 class TestNAT64(MethodHolder):
5054 """ NAT64 Test Cases """
def setUpConstants(cls):
    """ Append the NAT64 hash bucket settings to the VPP command line """
    super(TestNAT64, cls).setUpConstants()
    # startup-config tokens forming a nat { ... } section with the
    # NAT64 BIB and session table hash bucket counts
    nat_section = ["nat", "{",
                   "nat64 bib hash buckets 128",
                   "nat64 st hash buckets 256",
                   "}"]
    cls.vpp_cmdline.extend(nat_section)
5063 def setUpClass(cls):
5064 super(TestNAT64, cls).setUpClass()
5067 cls.tcp_port_in = 6303
5068 cls.tcp_port_out = 6303
5069 cls.udp_port_in = 6304
5070 cls.udp_port_out = 6304
5071 cls.icmp_id_in = 6305
5072 cls.icmp_id_out = 6305
5073 cls.nat_addr = '10.0.0.3'
5074 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5076 cls.vrf1_nat_addr = '10.0.10.3'
5077 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5079 cls.ipfix_src_port = 4739
5080 cls.ipfix_domain_id = 1
5082 cls.create_pg_interfaces(range(5))
5083 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5084 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5085 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
5087 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5089 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5091 cls.pg0.generate_remote_hosts(2)
5093 for i in cls.ip6_interfaces:
5096 i.configure_ipv6_neighbors()
5098 for i in cls.ip4_interfaces:
5104 cls.pg3.config_ip4()
5105 cls.pg3.resolve_arp()
5106 cls.pg3.config_ip6()
5107 cls.pg3.configure_ipv6_neighbors()
5110 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool

    Add a one-address range to the NAT64 pool and check that the pool
    dump holds exactly that address, then delete the range and check
    that the dump is empty.
    """
    addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # range start and end are the same address
    self.vapi.nat64_add_del_pool_addr_range(addr, addr)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, addr)

    self.vapi.nat64_add_del_pool_addr_range(addr, addr, is_add=0)

    dumped = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
5128 def test_interface(self):
5129 """ Enable/disable NAT64 feature on the interface """
5130 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5131 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5133 interfaces = self.vapi.nat64_interface_dump()
5134 self.assertEqual(len(interfaces), 2)
5137 for intf in interfaces:
5138 if intf.sw_if_index == self.pg0.sw_if_index:
5139 self.assertEqual(intf.is_inside, 1)
5141 elif intf.sw_if_index == self.pg1.sw_if_index:
5142 self.assertEqual(intf.is_inside, 0)
5144 self.assertTrue(pg0_found)
5145 self.assertTrue(pg1_found)
5147 features = self.vapi.cli("show interface features pg0")
5148 self.assertNotEqual(features.find('nat64-in2out'), -1)
5149 features = self.vapi.cli("show interface features pg1")
5150 self.assertNotEqual(features.find('nat64-out2in'), -1)
5152 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5153 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5155 interfaces = self.vapi.nat64_interface_dump()
5156 self.assertEqual(len(interfaces), 0)
5158 def test_static_bib(self):
5159 """ Add/delete static BIB entry """
5160 in_addr = socket.inet_pton(socket.AF_INET6,
5161 '2001:db8:85a3::8a2e:370:7334')
5162 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5165 proto = IP_PROTOS.tcp
5167 self.vapi.nat64_add_del_static_bib(in_addr,
5172 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5177 self.assertEqual(bibe.i_addr, in_addr)
5178 self.assertEqual(bibe.o_addr, out_addr)
5179 self.assertEqual(bibe.i_port, in_port)
5180 self.assertEqual(bibe.o_port, out_port)
5181 self.assertEqual(static_bib_num, 1)
5183 self.vapi.nat64_add_del_static_bib(in_addr,
5189 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5194 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts

    Check the timeout values reported before any change, then set
    custom values and check that those are reported back.
    """
    # (field, value) pairs; tuples keep the order of the checks fixed
    defaults = (('udp', 300), ('icmp', 60), ('tcp_trans', 240),
                ('tcp_est', 7440), ('tcp_incoming_syn', 6))
    custom = (('udp', 200), ('icmp', 30), ('tcp_trans', 250),
              ('tcp_est', 7450), ('tcp_incoming_syn', 10))

    # verify default values
    reported = self.vapi.nat64_get_timeouts()
    for name, value in defaults:
        self.assertEqual(getattr(reported, name), value)

    # set and verify custom values
    self.vapi.nat64_set_timeouts(**dict(custom))
    reported = self.vapi.nat64_get_timeouts()
    for name, value in custom:
        self.assertEqual(getattr(reported, name), value)
5216 def test_dynamic(self):
5217 """ NAT64 dynamic translation test """
5218 self.tcp_port_in = 6303
5219 self.udp_port_in = 6304
5220 self.icmp_id_in = 6305
5222 ses_num_start = self.nat64_get_ses_num()
5224 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5226 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5227 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5230 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5231 self.pg0.add_stream(pkts)
5232 self.pg_enable_capture(self.pg_interfaces)
5234 capture = self.pg1.get_capture(len(pkts))
5235 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5236 dst_ip=self.pg1.remote_ip4)
5239 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5240 self.pg1.add_stream(pkts)
5241 self.pg_enable_capture(self.pg_interfaces)
5243 capture = self.pg0.get_capture(len(pkts))
5244 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5245 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5248 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5249 self.pg0.add_stream(pkts)
5250 self.pg_enable_capture(self.pg_interfaces)
5252 capture = self.pg1.get_capture(len(pkts))
5253 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5254 dst_ip=self.pg1.remote_ip4)
5257 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5258 self.pg1.add_stream(pkts)
5259 self.pg_enable_capture(self.pg_interfaces)
5261 capture = self.pg0.get_capture(len(pkts))
5262 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5264 ses_num_end = self.nat64_get_ses_num()
5266 self.assertEqual(ses_num_end - ses_num_start, 3)
5268 # tenant with specific VRF
5269 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5270 self.vrf1_nat_addr_n,
5271 vrf_id=self.vrf1_id)
5272 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5274 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5275 self.pg2.add_stream(pkts)
5276 self.pg_enable_capture(self.pg_interfaces)
5278 capture = self.pg1.get_capture(len(pkts))
5279 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5280 dst_ip=self.pg1.remote_ip4)
5282 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5283 self.pg1.add_stream(pkts)
5284 self.pg_enable_capture(self.pg_interfaces)
5286 capture = self.pg2.get_capture(len(pkts))
5287 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5289 def test_static(self):
5290 """ NAT64 static translation test """
5291 self.tcp_port_in = 60303
5292 self.udp_port_in = 60304
5293 self.icmp_id_in = 60305
5294 self.tcp_port_out = 60303
5295 self.udp_port_out = 60304
5296 self.icmp_id_out = 60305
5298 ses_num_start = self.nat64_get_ses_num()
5300 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5302 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5303 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5305 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5310 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5315 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5322 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5323 self.pg0.add_stream(pkts)
5324 self.pg_enable_capture(self.pg_interfaces)
5326 capture = self.pg1.get_capture(len(pkts))
5327 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5328 dst_ip=self.pg1.remote_ip4, same_port=True)
5331 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5332 self.pg1.add_stream(pkts)
5333 self.pg_enable_capture(self.pg_interfaces)
5335 capture = self.pg0.get_capture(len(pkts))
5336 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5337 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5339 ses_num_end = self.nat64_get_ses_num()
5341 self.assertEqual(ses_num_end - ses_num_start, 3)
5343 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5344 def test_session_timeout(self):
5345 """ NAT64 session timeout """
5346 self.icmp_id_in = 1234
5347 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5349 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5350 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5351 self.vapi.nat64_set_timeouts(icmp=5)
5353 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5354 self.pg0.add_stream(pkts)
5355 self.pg_enable_capture(self.pg_interfaces)
5357 capture = self.pg1.get_capture(len(pkts))
5359 ses_num_before_timeout = self.nat64_get_ses_num()
5363 # ICMP session after timeout
5364 ses_num_after_timeout = self.nat64_get_ses_num()
5365 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# Exercises ICMP error translation through NAT64 in both directions:
# an ICMPv6 "destination unreachable" (code 1) sent from the IPv6 side and
# an ICMP type 3 / code 13 sent from the IPv4 side.
# NOTE(review): this listing skips some original lines (see the gaps in the
# embedded numbering, e.g. the unterminated call on 5375) -- confirm the
# omitted statements against the complete file.
5367 def test_icmp_error(self):
5368 """ NAT64 ICMP Error message translation """
# Inside ports / ICMP id; the final checks compare the embedded packets
# against these values.
5369 self.tcp_port_in = 6303
5370 self.udp_port_in = 6304
5371 self.icmp_id_in = 6305
# Not referenced again in the lines visible here.
5373 ses_num_start = self.nat64_get_ses_num()
# NAT64 pool plus interface roles: pg0 with default arguments, pg1 with
# is_inside=0.
5375 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5377 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5378 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5380 # send some packets to create sessions
5381 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5382 self.pg0.add_stream(pkts)
5383 self.pg_enable_capture(self.pg_interfaces)
5385 capture_ip4 = self.pg1.get_capture(len(pkts))
5386 self.verify_capture_out(capture_ip4,
5387 nat_ip=self.nat_addr,
5388 dst_ip=self.pg1.remote_ip4)
# Return traffic from pg1 towards the NAT address, captured on pg0.
5390 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5391 self.pg1.add_stream(pkts)
5392 self.pg_enable_capture(self.pg_interfaces)
5394 capture_ip6 = self.pg0.get_capture(len(pkts))
# pg1's IPv4 address appended to the 64:ff9b:: prefix, read back through
# scapy's IPv6 src field.
5395 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5396 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5397 self.pg0.remote_ip6)
# IPv6 side: wrap every packet captured on pg0 in an ICMPv6 destination
# unreachable (code 1) and send it from pg0.
5400 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5401 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5402 ICMPv6DestUnreach(code=1) /
5403 packet[IPv6] for packet in capture_ip6]
5404 self.pg0.add_stream(pkts)
5405 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg1: ICMP type 3 / code 13 from the NAT address, with the
# embedded IPv4 header and outside ports/id.
5407 capture = self.pg1.get_capture(len(pkts))
5408 for packet in capture:
5410 self.assertEqual(packet[IP].src, self.nat_addr)
5411 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5412 self.assertEqual(packet[ICMP].type, 3)
5413 self.assertEqual(packet[ICMP].code, 13)
5414 inner = packet[IPerror]
5415 self.assertEqual(inner.src, self.pg1.remote_ip4)
5416 self.assertEqual(inner.dst, self.nat_addr)
5417 self.check_icmp_checksum(packet)
5418 if inner.haslayer(TCPerror):
5419 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5420 elif inner.haslayer(UDPerror):
5421 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5423 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5425 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# IPv4 side: wrap every packet of the first IPv4 capture in ICMP type 3 /
# code 13 and send it from pg1 to the NAT address.
5429 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5430 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5431 ICMP(type=3, code=13) /
5432 packet[IP] for packet in capture_ip4]
5433 self.pg1.add_stream(pkts)
5434 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: ICMPv6 destination unreachable (code 1) carrying the
# original inside addresses and ports.
5436 capture = self.pg0.get_capture(len(pkts))
5437 for packet in capture:
5439 self.assertEqual(packet[IPv6].src, ip.src)
5440 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5441 icmp = packet[ICMPv6DestUnreach]
5442 self.assertEqual(icmp.code, 1)
5443 inner = icmp[IPerror6]
5444 self.assertEqual(inner.src, self.pg0.remote_ip6)
5445 self.assertEqual(inner.dst, ip.src)
5446 self.check_icmpv6_checksum(packet)
5447 if inner.haslayer(TCPerror):
5448 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5449 elif inner.haslayer(UDPerror):
5450 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5452 self.assertEqual(inner[ICMPv6EchoRequest].id,
5455 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning test: client and server are both remote hosts on pg0, and all
# traffic is sent on pg0 and captured back on pg0.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. the remaining arguments of the static BIB calls) --
# confirm the omitted statements against the complete file.
5458 def test_hairpinning(self):
5459 """ NAT64 hairpinning """
5461 client = self.pg0.remote_hosts[0]
5462 server = self.pg0.remote_hosts[1]
5463 server_tcp_in_port = 22
5464 server_tcp_out_port = 4022
5465 server_udp_in_port = 23
5466 server_udp_out_port = 4023
5467 client_tcp_in_port = 1234
5468 client_udp_in_port = 1235
# Placeholders; overwritten with the translated source ports observed in
# the first capture.
5469 client_tcp_out_port = 0
5470 client_udp_out_port = 0
# NAT address appended to the 64:ff9b:: prefix, read back through scapy's
# IPv6 src field.
5471 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5472 nat_addr_ip6 = ip.src
5474 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5476 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5477 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Two static BIB calls for the server, one naming the TCP outside port and
# one the UDP outside port.
5479 self.vapi.nat64_add_del_static_bib(server.ip6n,
5482 server_tcp_out_port,
5484 self.vapi.nat64_add_del_static_bib(server.ip6n,
5487 server_udp_out_port,
# client -> server, addressed to the NAT address and the server's outside
# ports (TCP, then UDP).
5492 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5493 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5494 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5496 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5497 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5498 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5500 self.pg0.add_stream(pkts)
5501 self.pg_enable_capture(self.pg_interfaces)
# Expected: source rewritten to the NAT address with a new source port,
# destination rewritten to the server's inside address and port.
5503 capture = self.pg0.get_capture(len(pkts))
5504 for packet in capture:
5506 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5507 self.assertEqual(packet[IPv6].dst, server.ip6)
5508 if packet.haslayer(TCP):
5509 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5510 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5511 self.check_tcp_checksum(packet)
5512 client_tcp_out_port = packet[TCP].sport
5514 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5515 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5516 self.check_udp_checksum(packet)
5517 client_udp_out_port = packet[UDP].sport
5519 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client replies, addressed to the translated client ports
# recorded above.
5524 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5525 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5526 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5528 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5529 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5530 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5532 self.pg0.add_stream(pkts)
5533 self.pg_enable_capture(self.pg_interfaces)
5535 capture = self.pg0.get_capture(len(pkts))
5536 for packet in capture:
5538 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5539 self.assertEqual(packet[IPv6].dst, client.ip6)
5540 if packet.haslayer(TCP):
5541 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5542 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5543 self.check_tcp_checksum(packet)
5545 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5546 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5547 self.check_udp_checksum(packet)
5549 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMPv6 destination unreachable (code 1) from the client, embedding each
# packet of the previous capture.
5554 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5555 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5556 ICMPv6DestUnreach(code=1) /
5557 packet[IPv6] for packet in capture]
5558 self.pg0.add_stream(pkts)
5559 self.pg_enable_capture(self.pg_interfaces)
# Expected: the error reaches the server and the embedded packet shows the
# server's inside address/port and the client's translated port.
5561 capture = self.pg0.get_capture(len(pkts))
5562 for packet in capture:
5564 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5565 self.assertEqual(packet[IPv6].dst, server.ip6)
5566 icmp = packet[ICMPv6DestUnreach]
5567 self.assertEqual(icmp.code, 1)
5568 inner = icmp[IPerror6]
5569 self.assertEqual(inner.src, server.ip6)
5570 self.assertEqual(inner.dst, nat_addr_ip6)
5571 self.check_icmpv6_checksum(packet)
5572 if inner.haslayer(TCPerror):
5573 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5574 self.assertEqual(inner[TCPerror].dport,
5575 client_tcp_out_port)
5577 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5578 self.assertEqual(inner[UDPerror].dport,
5579 client_udp_out_port)
5581 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Checks NAT64 prefixes: one prefix added without a vrf_id argument and one
# added with vrf_id=self.vrf1_id, then traffic through pg0 (first prefix)
# and pg2 (second prefix).
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. arguments of create_stream_in_ip6 / compose_ip6) --
# confirm the omitted statements against the complete file.
5584 def test_prefix(self):
5585 """ NAT64 Network-Specific Prefix """
5587 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5589 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5590 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Second pool address bound to vrf1, with pg2 as a further NAT64 interface.
5591 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5592 self.vrf1_nat_addr_n,
5593 vrf_id=self.vrf1_id)
5594 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Prefix added without a vrf_id; the dump is expected to report vrf_id 0.
5597 global_pref64 = "2001:db8::"
5598 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5599 global_pref64_len = 32
5600 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5602 prefix = self.vapi.nat64_prefix_dump()
5603 self.assertEqual(len(prefix), 1)
5604 self.assertEqual(prefix[0].prefix, global_pref64_n)
5605 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5606 self.assertEqual(prefix[0].vrf_id, 0)
5608 # Add tenant specific prefix
5609 vrf1_pref64 = "2001:db8:122:300::"
5610 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5611 vrf1_pref64_len = 56
5612 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5614 vrf_id=self.vrf1_id)
5615 prefix = self.vapi.nat64_prefix_dump()
5616 self.assertEqual(len(prefix), 2)
# Traffic through pg0 using the first prefix length; out on pg1, back on pg0.
5619 pkts = self.create_stream_in_ip6(self.pg0,
5622 plen=global_pref64_len)
5623 self.pg0.add_stream(pkts)
5624 self.pg_enable_capture(self.pg_interfaces)
5626 capture = self.pg1.get_capture(len(pkts))
5627 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5628 dst_ip=self.pg1.remote_ip4)
5630 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5631 self.pg1.add_stream(pkts)
5632 self.pg_enable_capture(self.pg_interfaces)
5634 capture = self.pg0.get_capture(len(pkts))
5635 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5638 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5640 # Tenant specific prefix
5641 pkts = self.create_stream_in_ip6(self.pg2,
5644 plen=vrf1_pref64_len)
5645 self.pg2.add_stream(pkts)
5646 self.pg_enable_capture(self.pg_interfaces)
5648 capture = self.pg1.get_capture(len(pkts))
5649 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5650 dst_ip=self.pg1.remote_ip4)
5652 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5653 self.pg1.add_stream(pkts)
5654 self.pg_enable_capture(self.pg_interfaces)
5656 capture = self.pg2.get_capture(len(pkts))
5657 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5660 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Sends a packet whose IPv6 next header is 47 (the GRE protocol number)
# through NAT64 in both directions and checks the translated IP headers.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. the layer between the outer and inner IP headers and the
# statements that bind `packet`) -- confirm against the complete file.
5662 def test_unknown_proto(self):
5663 """ NAT64 translate packet with unknown protocol """
5665 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5667 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5668 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 representation of pg1's IPv4 address under 64:ff9b::/96.
5669 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# A TCP packet is sent first; only its arrival on pg1 is checked.
5672 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5673 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5674 TCP(sport=self.tcp_port_in, dport=20))
5675 self.pg0.add_stream(p)
5676 self.pg_enable_capture(self.pg_interfaces)
5678 p = self.pg1.get_capture(1)
# IPv6 -> IPv4 with next header 47.
5680 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5681 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5683 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5684 TCP(sport=1234, dport=1234))
5685 self.pg0.add_stream(p)
5686 self.pg_enable_capture(self.pg_interfaces)
5688 p = self.pg1.get_capture(1)
5691 self.assertEqual(packet[IP].src, self.nat_addr)
5692 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5693 self.assertTrue(packet.haslayer(GRE))
5694 self.check_ip_checksum(packet)
5696 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# IPv4 -> IPv6 direction, sent from pg1 to the NAT address.
5700 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5701 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5703 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5704 TCP(sport=1234, dport=1234))
5705 self.pg1.add_stream(p)
5706 self.pg_enable_capture(self.pg_interfaces)
5708 p = self.pg0.get_capture(1)
5711 self.assertEqual(packet[IPv6].src, remote_ip6)
5712 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5713 self.assertEqual(packet[IPv6].nh, 47)
5715 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the unknown-protocol test: client and server are
# both remote hosts on pg0 and exchange packets with next header
# IP_PROTOS.gre via their NAT addresses.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. most arguments of the static BIB calls and the statements
# that bind `packet`) -- confirm against the complete file.
5718 def test_hairpinning_unknown_proto(self):
5719 """ NAT64 translate packet with unknown protocol - hairpinning """
5721 client = self.pg0.remote_hosts[0]
5722 server = self.pg0.remote_hosts[1]
5723 server_tcp_in_port = 22
5724 server_tcp_out_port = 4022
5725 client_tcp_in_port = 1234
5726 client_tcp_out_port = 1235
# Separate NAT addresses for server and client, in packed IPv4 form and as
# IPv6 addresses under 64:ff9b::/96.
5727 server_nat_ip = "10.0.0.100"
5728 client_nat_ip = "10.0.0.110"
5729 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5730 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5731 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5732 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5734 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5736 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5737 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB calls: two for the server, one for the client.
5739 self.vapi.nat64_add_del_static_bib(server.ip6n,
5742 server_tcp_out_port,
5745 self.vapi.nat64_add_del_static_bib(server.ip6n,
5751 self.vapi.nat64_add_del_static_bib(client.ip6n,
5754 client_tcp_out_port,
# A TCP packet client -> server is sent first; only its arrival is checked.
5758 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5759 IPv6(src=client.ip6, dst=server_nat_ip6) /
5760 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5761 self.pg0.add_stream(p)
5762 self.pg_enable_capture(self.pg_interfaces)
5764 p = self.pg0.get_capture(1)
# client -> server with next header IP_PROTOS.gre.
5766 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5767 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5769 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5770 TCP(sport=1234, dport=1234))
5771 self.pg0.add_stream(p)
5772 self.pg_enable_capture(self.pg_interfaces)
5774 p = self.pg0.get_capture(1)
5777 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5778 self.assertEqual(packet[IPv6].dst, server.ip6)
5779 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5781 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client with next header IP_PROTOS.gre.
5785 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5786 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5788 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5789 TCP(sport=1234, dport=1234))
5790 self.pg0.add_stream(p)
5791 self.pg_enable_capture(self.pg_interfaces)
5793 p = self.pg0.get_capture(1)
5796 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5797 self.assertEqual(packet[IPv6].dst, client.ip6)
5798 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5800 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# One-armed setup: pg3 is registered twice as a NAT64 interface (once with
# default arguments, once with is_inside=0) and all packets enter and leave
# through pg3.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. the statements that bind `ip` and `tcp` from the capture)
# -- confirm against the complete file.
5803 def test_one_armed_nat64(self):
5804 """ One armed NAT64 """
5806 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5810 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5812 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5813 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 TCP packet towards the IPv6 form of pg3's IPv4 remote address.
5816 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5817 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5818 TCP(sport=12345, dport=80))
5819 self.pg3.add_stream(p)
5820 self.pg_enable_capture(self.pg_interfaces)
# Expected: IPv4 packet from the NAT address with a rewritten source port,
# which is remembered for the reply.
5822 capture = self.pg3.get_capture(1)
5827 self.assertEqual(ip.src, self.nat_addr)
5828 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5829 self.assertNotEqual(tcp.sport, 12345)
5830 external_port = tcp.sport
5831 self.assertEqual(tcp.dport, 80)
5832 self.check_tcp_checksum(p)
5833 self.check_ip_checksum(p)
5835 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 reply to the NAT address and the recorded external port.
5839 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5840 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5841 TCP(sport=80, dport=external_port))
5842 self.pg3.add_stream(p)
5843 self.pg_enable_capture(self.pg_interfaces)
# Expected: IPv6 packet back to the original host and port 12345.
5845 capture = self.pg3.get_capture(1)
5850 self.assertEqual(ip.src, remote_host_ip6)
5851 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5852 self.assertEqual(tcp.sport, 80)
5853 self.assertEqual(tcp.dport, 12345)
5854 self.check_tcp_checksum(p)
5856 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Sends a fragmented TCP stream through NAT64 in each direction, reassembles
# the captured fragments and compares ports and payload.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. the first assignment of `data` and the arguments of
# create_stream_frag) -- confirm against the complete file.
5859 def test_frag_in_order(self):
5860 """ NAT64 translate fragments arriving in order """
5861 self.tcp_port_in = random.randint(1025, 65535)
5863 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5865 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5866 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Number of reassembly entries before the test; compared with the count at
# the end.
5868 reass = self.vapi.nat_reass_dump()
5869 reass_n_start = len(reass)
# IPv6 fragments from pg0 towards pg1's IPv4 address, destination port 20.
5873 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5874 self.tcp_port_in, 20, data)
5875 self.pg0.add_stream(pkts)
5876 self.pg_enable_capture(self.pg_interfaces)
5878 frags = self.pg1.get_capture(len(pkts))
5879 p = self.reass_frags_and_verify(frags,
5881 self.pg1.remote_ip4)
5882 self.assertEqual(p[TCP].dport, 20)
5883 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port.
5884 self.tcp_port_out = p[TCP].sport
5885 self.assertEqual(data, p[Raw].load)
# Reverse direction: fragments from pg1, captured on pg0 as IPv6.
5888 data = "A" * 4 + "b" * 16 + "C" * 3
5889 pkts = self.create_stream_frag(self.pg1,
5894 self.pg1.add_stream(pkts)
5895 self.pg_enable_capture(self.pg_interfaces)
5897 frags = self.pg0.get_capture(len(pkts))
5898 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5899 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5900 self.assertEqual(p[TCP].sport, 20)
5901 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5902 self.assertEqual(data, p[Raw].load)
# Exactly two more reassembly entries are expected than at the start.
5904 reass = self.vapi.nat_reass_dump()
5905 reass_n_end = len(reass)
5907 self.assertEqual(reass_n_end - reass_n_start, 2)
# Fragmented hairpinning: client and server are both remote hosts on pg0;
# fragments are sent on pg0 and the translated fragments are captured on pg0.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. the assignment of `data` and the arguments of the static
# BIB and create_stream_frag_ip6 calls) -- confirm against the complete file.
5909 def test_reass_hairpinning(self):
5910 """ NAT64 fragments hairpinning """
5912 client = self.pg0.remote_hosts[0]
5913 server = self.pg0.remote_hosts[1]
# Ports are drawn at random from 1025..65535 on every run.
5914 server_in_port = random.randint(1025, 65535)
5915 server_out_port = random.randint(1025, 65535)
5916 client_in_port = random.randint(1025, 65535)
# NAT address appended to the 64:ff9b:: prefix, read back through scapy's
# IPv6 src field.
5917 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5918 nat_addr_ip6 = ip.src
5920 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5922 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5923 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5925 # add static BIB entry for server
5926 self.vapi.nat64_add_del_static_bib(server.ip6n,
5932 # send packet from host to server
5933 pkts = self.create_stream_frag_ip6(self.pg0,
5938 self.pg0.add_stream(pkts)
5939 self.pg_enable_capture(self.pg_interfaces)
# Expected after reassembly: source is the NAT address with a new port,
# destination is the server's inside address and port, payload unchanged.
5941 frags = self.pg0.get_capture(len(pkts))
5942 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5943 self.assertNotEqual(p[TCP].sport, client_in_port)
5944 self.assertEqual(p[TCP].dport, server_in_port)
5945 self.assertEqual(data, p[Raw].load)
# Same flow as the in-order fragment test, per the docstring with fragments
# arriving out of order.
# NOTE(review): the statements that reorder the fragments are not visible in
# this listing (gaps in the embedded numbering around 5960 and 5976-5980),
# nor is the first assignment of `data` -- confirm against the complete file.
5947 def test_frag_out_of_order(self):
5948 """ NAT64 translate fragments arriving out of order """
5949 self.tcp_port_in = random.randint(1025, 65535)
5951 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5953 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5954 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 fragments from pg0 towards pg1's IPv4 address, destination port 20.
5958 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5959 self.tcp_port_in, 20, data)
5961 self.pg0.add_stream(pkts)
5962 self.pg_enable_capture(self.pg_interfaces)
5964 frags = self.pg1.get_capture(len(pkts))
5965 p = self.reass_frags_and_verify(frags,
5967 self.pg1.remote_ip4)
5968 self.assertEqual(p[TCP].dport, 20)
5969 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port.
5970 self.tcp_port_out = p[TCP].sport
5971 self.assertEqual(data, p[Raw].load)
# Reverse direction: fragments from pg1, captured on pg0 as IPv6.
5974 data = "A" * 4 + "B" * 16 + "C" * 3
5975 pkts = self.create_stream_frag(self.pg1,
5981 self.pg1.add_stream(pkts)
5982 self.pg_enable_capture(self.pg_interfaces)
5984 frags = self.pg0.get_capture(len(pkts))
5985 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5986 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5987 self.assertEqual(p[TCP].sport, 20)
5988 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5989 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # Ask NAT64 to take its pool address from pg4's IPv4 address.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (query the NAT64 pool itself; the NAT44 address dump says nothing
    # about the NAT64 pool)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump, not on the one taken before the address
    # was configured, so the removal is actually verified)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
# Extended-only test (skipped unless running_extended_tests() is true).
# Creates sessions from many IPv6 sources, sends further packets that are
# expected to produce no output on pg1, and checks the IPFIX records
# exported towards pg3.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering); `max_bibs`, `max_sessions`, `pkts` and the loops over
# `capture` are set up on lines not shown here -- confirm against the
# complete file.
6010 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6011 def test_ipfix_max_bibs_sessions(self):
6012 """ IPFIX logging maximum session and BIB entries exceeded """
6015 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6019 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6021 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6022 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# One distinct IPv6 source per iteration, each building packets to ports 80
# and 22.
6026 for i in range(0, max_bibs):
6027 src = "fd01:aa::%x" % (i)
6028 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6029 IPv6(src=src, dst=remote_host_ip6) /
6030 TCP(sport=12345, dport=80))
6032 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6033 IPv6(src=src, dst=remote_host_ip6) /
6034 TCP(sport=12345, dport=22))
6036 self.pg0.add_stream(pkts)
6037 self.pg_enable_capture(self.pg_interfaces)
6039 self.pg1.get_capture(max_sessions)
# Enable IPFIX export towards pg3 only after the sessions exist.
6041 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6042 src_address=self.pg3.local_ip4n,
6044 template_interval=10)
6045 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6046 src_port=self.ipfix_src_port)
# One more packet from the last source to a new port; nothing is expected
# on pg1.
6048 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6049 IPv6(src=src, dst=remote_host_ip6) /
6050 TCP(sport=12345, dport=25))
6051 self.pg0.add_stream(p)
6052 self.pg_enable_capture(self.pg_interfaces)
6054 self.pg1.get_capture(0)
6055 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6056 capture = self.pg3.get_capture(9)
6057 ipfix = IPFIXDecoder()
6058 # first load template
6060 self.assertTrue(p.haslayer(IPFIX))
6061 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6062 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6063 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6064 self.assertEqual(p[UDP].dport, 4739)
6065 self.assertEqual(p[IPFIX].observationDomainID,
6066 self.ipfix_domain_id)
6067 if p.haslayer(Template):
6068 ipfix.add_template(p.getlayer(Template))
6069 # verify events in data set
6071 if p.haslayer(Data):
6072 data = ipfix.decode_data_set(p.getlayer(Set))
6073 self.verify_ipfix_max_sessions(data, max_sessions)
# A packet from a source not used in the loop above; again nothing is
# expected on pg1 and one IPFIX packet is expected on pg3.
6075 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6076 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6077 TCP(sport=12345, dport=80))
6078 self.pg0.add_stream(p)
6079 self.pg_enable_capture(self.pg_interfaces)
6081 self.pg1.get_capture(0)
6082 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6083 capture = self.pg3.get_capture(1)
6084 # verify events in data set
6086 self.assertTrue(p.haslayer(IPFIX))
6087 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6088 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6089 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6090 self.assertEqual(p[UDP].dport, 4739)
6091 self.assertEqual(p[IPFIX].observationDomainID,
6092 self.ipfix_domain_id)
6093 if p.haslayer(Data):
6094 data = ipfix.decode_data_set(p.getlayer(Set))
6095 self.verify_ipfix_max_bibs(data, max_bibs)
# Sets the IPv6 reassembly limit to max_frag=0, sends only the last fragment
# of a fragmented stream and checks the IPFIX records exported towards pg3.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. the assignment of `data` and the loops over `capture`) --
# confirm against the complete file.
6097 def test_ipfix_max_frags(self):
6098 """ IPFIX logging maximum fragments pending reassembly exceeded """
6099 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6101 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6102 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6103 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6104 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6105 src_address=self.pg3.local_ip4n,
6107 template_interval=10)
6108 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6109 src_port=self.ipfix_src_port)
6112 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6113 self.tcp_port_in, 20, data)
# Only the last fragment is injected; nothing is expected on pg1.
6114 self.pg0.add_stream(pkts[-1])
6115 self.pg_enable_capture(self.pg_interfaces)
6117 self.pg1.get_capture(0)
6118 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6119 capture = self.pg3.get_capture(9)
6120 ipfix = IPFIXDecoder()
6121 # first load template
6123 self.assertTrue(p.haslayer(IPFIX))
6124 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6125 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6126 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6127 self.assertEqual(p[UDP].dport, 4739)
6128 self.assertEqual(p[IPFIX].observationDomainID,
6129 self.ipfix_domain_id)
6130 if p.haslayer(Template):
6131 ipfix.add_template(p.getlayer(Template))
6132 # verify events in data set
6134 if p.haslayer(Data):
6135 data = ipfix.decode_data_set(p.getlayer(Set))
6136 self.verify_ipfix_max_fragments_ip6(data, 0,
6137 self.pg0.remote_ip6n)
# Creates one NAT64 session, checks the exported IPFIX records, then issues
# a second pool-address call and checks the records exported after it.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. the loops over `capture` and the remaining arguments of
# the second nat64_add_del_pool_addr_range call, presumably a delete) --
# confirm against the complete file.
6139 def test_ipfix_bib_ses(self):
6140 """ IPFIX logging NAT64 BIB/session create and delete events """
6141 self.tcp_port_in = random.randint(1025, 65535)
6142 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6146 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6148 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6149 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6150 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6151 src_address=self.pg3.local_ip4n,
6153 template_interval=10)
6154 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6155 src_port=self.ipfix_src_port)
# One TCP packet from pg0; the translated source port is read from the
# packet captured on pg1.
6158 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6159 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6160 TCP(sport=self.tcp_port_in, dport=25))
6161 self.pg0.add_stream(p)
6162 self.pg_enable_capture(self.pg_interfaces)
6164 p = self.pg1.get_capture(1)
6165 self.tcp_port_out = p[0][TCP].sport
6166 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6167 capture = self.pg3.get_capture(10)
6168 ipfix = IPFIXDecoder()
6169 # first load template
6171 self.assertTrue(p.haslayer(IPFIX))
6172 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6173 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6174 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6175 self.assertEqual(p[UDP].dport, 4739)
6176 self.assertEqual(p[IPFIX].observationDomainID,
6177 self.ipfix_domain_id)
6178 if p.haslayer(Template):
6179 ipfix.add_template(p.getlayer(Template))
6180 # verify events in data set
6182 if p.haslayer(Data):
6183 data = ipfix.decode_data_set(p.getlayer(Set))
# Field 230 selects the verifier: 10 -> BIB check, 6 -> session check.
# NOTE(review): presumably IPFIX element 230 is natEvent (RFC 8158), with
# 10 / 6 meaning NAT64 BIB / session create -- confirm.
6184 if ord(data[0][230]) == 10:
6185 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6186 elif ord(data[0][230]) == 6:
6187 self.verify_ipfix_nat64_ses(data,
6189 self.pg0.remote_ip6n,
6190 self.pg1.remote_ip4,
6193 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6196 self.pg_enable_capture(self.pg_interfaces)
6197 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6200 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6201 capture = self.pg3.get_capture(2)
6202 # verify events in data set
6204 self.assertTrue(p.haslayer(IPFIX))
6205 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6206 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6207 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6208 self.assertEqual(p[UDP].dport, 4739)
6209 self.assertEqual(p[IPFIX].observationDomainID,
6210 self.ipfix_domain_id)
6211 if p.haslayer(Data):
6212 data = ipfix.decode_data_set(p.getlayer(Set))
# Here 11 -> BIB check and 7 -> session check.
# NOTE(review): presumably the matching natEvent "delete" values -- confirm.
6213 if ord(data[0][230]) == 11:
6214 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6215 elif ord(data[0][230]) == 7:
6216 self.verify_ipfix_nat64_ses(data,
6218 self.pg0.remote_ip6n,
6219 self.pg1.remote_ip4,
6222 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Helper that dumps the NAT64 session table via the API.
# NOTE(review): the docstring delimiters and the statement that turns `st`
# into the returned count are not shown in this listing (gap after 6228) --
# confirm against the complete file.
6224 def nat64_get_ses_num(self):
6226 Return number of active NAT64 sessions.
6228 st = self.vapi.nat64_st_dump()
# Resets NAT64 state: disables NAT IPFIX logging, restores the IPFIX
# port/domain attributes and timeouts, then walks the interface, BIB, pool
# and prefix dumps issuing add/del calls for each entry.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. the docstring delimiters, the loop over `bib` and the
# remaining arguments of the add/del calls, presumably is_add=0) -- confirm
# against the complete file.
6231 def clear_nat64(self):
6233 Clear NAT64 configuration.
6235 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6236 domain_id=self.ipfix_domain_id)
# Restore the attribute values used for IPFIX source port and domain id.
6237 self.ipfix_src_port = 4739
6238 self.ipfix_domain_id = 1
# Called without arguments, i.e. with whatever defaults the API wrapper
# defines.
6240 self.vapi.nat64_set_timeouts()
6242 interfaces = self.vapi.nat64_interface_dump()
6243 for intf in interfaces:
# NOTE(review): is_inside > 1 presumably marks an interface registered as
# both inside and outside, which gets an extra call -- confirm.
6244 if intf.is_inside > 1:
6245 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6248 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6252 bib = self.vapi.nat64_bib_dump(255)
6255 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6263 adresses = self.vapi.nat64_pool_addr_dump()
6264 for addr in adresses:
6265 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6270 prefixes = self.vapi.nat64_prefix_dump()
6271 for prefix in prefixes:
6272 self.vapi.nat64_add_del_prefix(prefix.prefix,
6274 vrf_id=prefix.vrf_id,
# Body of the TestNAT64 tearDown hook: chains to the parent tearDown and,
# while VPP is still alive, logs the output of several NAT64 "show" CLI
# commands.
# NOTE(review): the `def` line and any statement after the last log call
# are not shown in this listing -- confirm against the complete file.
6278 super(TestNAT64, self).tearDown()
6279 if not self.vpp_dead:
6280 self.logger.info(self.vapi.cli("show nat64 pool"))
6281 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6282 self.logger.info(self.vapi.cli("show nat64 prefix"))
6283 self.logger.info(self.vapi.cli("show nat64 bib all"))
6284 self.logger.info(self.vapi.cli("show nat64 session table all"))
6285 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite test class: pg0 is configured with IPv4, pg1 with IPv6 and two
# remote hosts. The single test sends IPv4-in-IPv6 packets from pg1 and
# plain IPv4 replies from pg0.
# NOTE(review): this listing skips some original lines (gaps in the embedded
# numbering, e.g. decorators, try/except lines, the tearDown `def` line and
# the packet-generator start calls) -- confirm against the complete file.
6289 class TestDSlite(MethodHolder):
6290 """ DS-Lite Test Cases """
6293 def setUpClass(cls):
6294 super(TestDSlite, cls).setUpClass()
# Pool address in text form and in packed binary form.
6297 cls.nat_addr = '10.0.0.3'
6298 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6300 cls.create_pg_interfaces(range(2))
6302 cls.pg0.config_ip4()
6303 cls.pg0.resolve_arp()
6305 cls.pg1.config_ip6()
6306 cls.pg1.generate_remote_hosts(2)
6307 cls.pg1.configure_ipv6_neighbors()
6310 super(TestDSlite, cls).tearDownClass()
6313 def test_dslite(self):
6314 """ Test DS-Lite """
6315 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# AFTR tunnel endpoint addresses (IPv4 and IPv6), handed to VPP in packed
# form.
6317 aftr_ip4 = '192.0.0.1'
6318 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6319 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6320 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6321 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, host 0: IPv4-in-IPv6 from pg1 to the AFTR address.
6324 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6325 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6326 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6327 UDP(sport=20000, dport=10000))
6328 self.pg1.add_stream(p)
6329 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg0: plain IPv4 from the pool address with a new source port.
6331 capture = self.pg0.get_capture(1)
6332 capture = capture[0]
6333 self.assertFalse(capture.haslayer(IPv6))
6334 self.assertEqual(capture[IP].src, self.nat_addr)
6335 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6336 self.assertNotEqual(capture[UDP].sport, 20000)
6337 self.assertEqual(capture[UDP].dport, 10000)
6338 self.check_ip_checksum(capture)
6339 out_port = capture[UDP].sport
# UDP reply from pg0 to the pool address and the translated port.
6341 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6342 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6343 UDP(sport=10000, dport=out_port))
6344 self.pg0.add_stream(p)
6345 self.pg_enable_capture(self.pg_interfaces)
# Expected on pg1: IPv4-in-IPv6 from the AFTR address to host 0 with the
# original inner address and port restored.
6347 capture = self.pg1.get_capture(1)
6348 capture = capture[0]
6349 self.assertEqual(capture[IPv6].src, aftr_ip6)
6350 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6351 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6352 self.assertEqual(capture[IP].dst, '192.168.1.1')
6353 self.assertEqual(capture[UDP].sport, 10000)
6354 self.assertEqual(capture[UDP].dport, 20000)
6355 self.check_ip_checksum(capture)
# TCP, host 1: same inner source address 192.168.1.1 as host 0 used.
6358 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6359 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6360 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6361 TCP(sport=20001, dport=10001))
6362 self.pg1.add_stream(p)
6363 self.pg_enable_capture(self.pg_interfaces)
6365 capture = self.pg0.get_capture(1)
6366 capture = capture[0]
6367 self.assertFalse(capture.haslayer(IPv6))
6368 self.assertEqual(capture[IP].src, self.nat_addr)
6369 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6370 self.assertNotEqual(capture[TCP].sport, 20001)
6371 self.assertEqual(capture[TCP].dport, 10001)
6372 self.check_ip_checksum(capture)
6373 self.check_tcp_checksum(capture)
6374 out_port = capture[TCP].sport
# TCP reply from pg0.
6376 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6377 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6378 TCP(sport=10001, dport=out_port))
6379 self.pg0.add_stream(p)
6380 self.pg_enable_capture(self.pg_interfaces)
6382 capture = self.pg1.get_capture(1)
6383 capture = capture[0]
6384 self.assertEqual(capture[IPv6].src, aftr_ip6)
6385 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6386 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6387 self.assertEqual(capture[IP].dst, '192.168.1.1')
6388 self.assertEqual(capture[TCP].sport, 10001)
6389 self.assertEqual(capture[TCP].dport, 20001)
6390 self.check_ip_checksum(capture)
6391 self.check_tcp_checksum(capture)
# ICMP echo request, host 1; the ICMP id is expected to be rewritten.
6394 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6395 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6396 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6397 ICMP(id=4000, type='echo-request'))
6398 self.pg1.add_stream(p)
6399 self.pg_enable_capture(self.pg_interfaces)
6401 capture = self.pg0.get_capture(1)
6402 capture = capture[0]
6403 self.assertFalse(capture.haslayer(IPv6))
6404 self.assertEqual(capture[IP].src, self.nat_addr)
6405 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6406 self.assertNotEqual(capture[ICMP].id, 4000)
6407 self.check_ip_checksum(capture)
6408 self.check_icmp_checksum(capture)
6409 out_id = capture[ICMP].id
# ICMP echo reply from pg0 using the translated id; the original id 4000 is
# expected back on pg1.
6411 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6412 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6413 ICMP(id=out_id, type='echo-reply'))
6414 self.pg0.add_stream(p)
6415 self.pg_enable_capture(self.pg_interfaces)
6417 capture = self.pg1.get_capture(1)
6418 capture = capture[0]
6419 self.assertEqual(capture[IPv6].src, aftr_ip6)
6420 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6421 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6422 self.assertEqual(capture[IP].dst, '192.168.1.1')
6423 self.assertEqual(capture[ICMP].id, 4000)
6424 self.check_ip_checksum(capture)
6425 self.check_icmp_checksum(capture)
6427 # ping DS-Lite AFTR tunnel endpoint address
6428 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6429 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6430 ICMPv6EchoRequest())
6431 self.pg1.add_stream(p)
6432 self.pg_enable_capture(self.pg_interfaces)
6434 capture = self.pg1.get_capture(1)
6435 self.assertEqual(1, len(capture))
6436 capture = capture[0]
6437 self.assertEqual(capture[IPv6].src, aftr_ip6)
6438 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6439 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# tearDown body: chain to the parent, then log DS-Lite state while VPP is
# alive.
6442 super(TestDSlite, self).tearDown()
6443 if not self.vpp_dead:
6444 self.logger.info(self.vapi.cli("show dslite pool"))
6446 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6447 self.logger.info(self.vapi.cli("show dslite sessions"))
# DS-Lite CE test class: extends the VPP command line with a
# "nat { dslite ce }" section and verifies IPv4-in-IPv6 encapsulation on the
# pg0 -> pg1 path and decapsulation on the pg1 -> pg0 path.
# NOTE(review): the embedded listing numbers skip lines throughout this class
# (e.g. 6453, 6458, 6462, 6464, 6467, 6471-6472, 6495-6497, 6503, 6545), so
# decorators, some def lines and some statements are not visible here -
# confirm every gap against the complete file before editing the code.
6450 class TestDSliteCE(MethodHolder):
6451 """ DS-Lite CE Test Cases """
# Appends the "nat { dslite ce }" tokens to the VPP command line after the
# parent class has built its own constants.
6454 def setUpConstants(cls):
6455 super(TestDSliteCE, cls).setUpConstants()
6456 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
# Class-level fixture: two packet-generator interfaces. pg0 gets IPv4
# configuration and ARP resolution; pg1 gets IPv6 configuration, one generated
# remote host and IPv6 neighbor entries.
6459 def setUpClass(cls):
6460 super(TestDSliteCE, cls).setUpClass()
6463 cls.create_pg_interfaces(range(2))
6465 cls.pg0.config_ip4()
6466 cls.pg0.resolve_arp()
6468 cls.pg1.config_ip6()
6469 cls.pg1.generate_remote_hosts(1)
6470 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): listing lines 6471-6472 are not visible; the tearDownClass
# call below presumably belongs to an error-handling path of setUpClass -
# confirm.
6473 super(TestDSliteCE, cls).tearDownClass()
# End-to-end CE scenario: configure the B4 and AFTR tunnel endpoints, then
# check encapsulation, decapsulation and ICMPv6 echo handling.
6476 def test_dslite_ce(self):
6477 """ Test DS-Lite CE """
# B4 endpoint: textual addresses plus their packed binary forms
# (socket.inet_pton), which are what the API call receives.
6479 b4_ip4 = '192.0.0.2'
6480 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6481 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6482 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6483 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
# AFTR endpoint, prepared the same way as the B4 endpoint above.
6485 aftr_ip4 = '192.0.0.1'
6486 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6487 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6488 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6489 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# /128 route for the AFTR IPv6 address with pg1's remote host as next hop.
# NOTE(review): the call is not closed in the visible lines (listing jumps
# from 6494 to 6498); the remaining argument(s) and ")" are not visible.
6491 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6492 dst_address_length=128,
6493 next_hop_address=self.pg1.remote_ip6n,
6494 next_hop_sw_if_index=self.pg1.sw_if_index,
# Plain IPv4/UDP packet injected on pg0.
6498 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6499 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6500 UDP(sport=10000, dport=20000))
6501 self.pg0.add_stream(p)
6502 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): listing line 6503 is not visible between enabling capture and
# reading it; presumably the statement that starts packet generation - confirm.
# Expect one packet on pg1: outer IPv6 from the B4 address to the AFTR
# address, inner IPv4 addresses and UDP ports unchanged, valid IPv4 checksum.
6504 capture = self.pg1.get_capture(1)
6505 capture = capture[0]
6506 self.assertEqual(capture[IPv6].src, b4_ip6)
6507 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6508 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6509 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6510 self.assertEqual(capture[UDP].sport, 10000)
6511 self.assertEqual(capture[UDP].dport, 20000)
6512 self.check_ip_checksum(capture)
# Reverse direction: IPv4/UDP carried inside IPv6 (AFTR -> B4) injected on pg1.
6515 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6516 IPv6(dst=b4_ip6, src=aftr_ip6) /
6517 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6518 UDP(sport=20000, dport=10000))
6519 self.pg1.add_stream(p)
6520 self.pg_enable_capture(self.pg_interfaces)
# Expect one packet on pg0 with no IPv6 layer and the inner IPv4/UDP header
# fields intact.
6522 capture = self.pg0.get_capture(1)
6523 capture = capture[0]
6524 self.assertFalse(capture.haslayer(IPv6))
6525 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6526 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6527 self.assertEqual(capture[UDP].sport, 20000)
6528 self.assertEqual(capture[UDP].dport, 10000)
6529 self.check_ip_checksum(capture)
6531 # ping DS-Lite B4 tunnel endpoint address
6532 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6533 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6534 ICMPv6EchoRequest())
6535 self.pg1.add_stream(p)
6536 self.pg_enable_capture(self.pg_interfaces)
# The echo reply must come back on pg1, sourced from the B4 address and
# addressed to the host that sent the request.
6538 capture = self.pg1.get_capture(1)
6539 self.assertEqual(1, len(capture))
6540 capture = capture[0]
6541 self.assertEqual(capture[IPv6].src, b4_ip6)
6542 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6543 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test teardown: after the parent teardown, query the DS-Lite tunnel
# endpoint addresses through the CLI while VPP is still alive.
# NOTE(review): listing lines 6545, 6548 and 6550 are not visible (presumably
# the def line and the openings of the two calls closed by "))" below).
6546 super(TestDSliteCE, self).tearDown()
6547 if not self.vpp_dead:
6549 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6551 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# NAT66 test class: checks 1:1 IPv6-to-IPv6 static translation between pg0 and
# pg1, and that no translation happens when pg1 is not registered with
# is_inside=0.
# NOTE(review): the embedded listing numbers skip lines throughout this class
# (e.g. 6557, 6569-6570, 6573, 6582-6585, 6588-6589, 6601, 6607, 6616, 6700),
# so decorators, some def lines, call arguments and packet-list handling are
# not visible here - confirm every gap against the complete file.
6554 class TestNAT66(MethodHolder):
6555 """ NAT66 Test Cases """
# Class-level fixture: stores the NAT address in textual and packed
# (socket.inet_pton) form and creates two packet-generator interfaces.
6558 def setUpClass(cls):
6559 super(TestNAT66, cls).setUpClass()
6562 cls.nat_addr = 'fd01:ff::2'
6563 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6565 cls.create_pg_interfaces(range(2))
6566 cls.interfaces = list(cls.pg_interfaces)
# NOTE(review): listing lines 6569-6570 inside this loop are not visible; only
# the IPv6 neighbor configuration step is shown.
6568 for i in cls.interfaces:
6571 i.configure_ipv6_neighbors()
# NOTE(review): listing line 6573 is not visible; the tearDownClass call below
# presumably belongs to an error-handling path of setUpClass - confirm.
6574 super(TestNAT66, cls).tearDownClass()
# Static 1:1 mapping scenario, exercised in both directions.
6577 def test_static(self):
6578 """ 1:1 NAT66 test """
# pg0 is registered with the default arguments, pg1 with is_inside=0.
6579 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6580 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# NOTE(review): only the first argument of the static-mapping call is visible
# (listing lines 6582-6585 are missing).
6581 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# pg0 -> pg1 packets. The first two expressions end in "/" with their final
# layer not visible; the capture loop below checks TCP, UDP and ICMPv6 echo
# request packets. The last packet is GRE carrying IP/TCP.
# NOTE(review): the statements that create and fill "pkts" are not visible.
6586 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6587 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6590 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6591 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6594 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6595 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6596 ICMPv6EchoRequest())
6598 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6599 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6600 GRE() / IP() / TCP())
6602 self.pg0.add_stream(pkts)
6603 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg1 must carry the NAT address as IPv6 source and
# an unchanged destination; the matching L4 checksum helper is then applied.
# NOTE(review): listing lines 6607 and 6616 are not visible; the logger.error
# call presumably sits in an exception handler around these assertions.
6605 capture = self.pg1.get_capture(len(pkts))
6606 for packet in capture:
6608 self.assertEqual(packet[IPv6].src, self.nat_addr)
6609 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6610 if packet.haslayer(TCP):
6611 self.check_tcp_checksum(packet)
6612 elif packet.haslayer(UDP):
6613 self.check_udp_checksum(packet)
6614 elif packet.haslayer(ICMPv6EchoRequest):
6615 self.check_icmpv6_checksum(packet)
6617 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0 packets addressed to the NAT address. The first three expressions
# end in "/" with their final layer not visible; the capture loop below checks
# TCP, UDP and ICMPv6 echo reply packets.
6622 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6623 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6626 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6627 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6630 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6631 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6634 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6635 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6636 GRE() / IP() / TCP())
6638 self.pg1.add_stream(pkts)
6639 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg0 must keep pg1's remote address as source and
# have its destination rewritten from the NAT address to pg0's remote address.
6641 capture = self.pg0.get_capture(len(pkts))
6642 for packet in capture:
6644 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6645 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6646 if packet.haslayer(TCP):
6647 self.check_tcp_checksum(packet)
6648 elif packet.haslayer(UDP):
6649 self.check_udp_checksum(packet)
6650 elif packet.haslayer(ICMPv6EchoReply):
6651 self.check_icmpv6_checksum(packet)
6653 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exactly one static mapping must exist and its total_pkts counter must be 8.
6656 sm = self.vapi.nat66_static_mapping_dump()
6657 self.assertEqual(len(sm), 1)
6658 self.assertEqual(sm[0].total_pkts, 8)
# Negative scenario: pg1 is registered without is_inside=0 (unlike
# test_static), so the packet is expected to leave pg1 with its original
# source address.
6660 def test_check_no_translate(self):
6661 """ NAT66 translate only when egress interface is outside interface """
6662 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6663 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
# NOTE(review): only the first argument of the static-mapping call is visible.
6664 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# NOTE(review): the final layer of this packet (listing line 6670) is not
# visible.
6668 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6669 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6671 self.pg0.add_stream([p])
6672 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the statement binding "packet" from the capture (listing lines
# 6675-6676) is not visible.
6674 capture = self.pg1.get_capture(1)
6677 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
6678 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6680 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Walks the interface dump and the static-mapping dump and calls the matching
# add/del API for each entry.
# NOTE(review): the trailing arguments of both calls are not visible;
# presumably they select deletion, per the docstring text - confirm.
6683 def clear_nat66(self):
6685 Clear NAT66 configuration.
6687 interfaces = self.vapi.nat66_interface_dump()
6688 for intf in interfaces:
6689 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6693 static_mappings = self.vapi.nat66_static_mapping_dump()
6694 for sm in static_mappings:
6695 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6696 sm.external_ip_address,
# Per-test teardown: after the parent teardown, log the NAT66 interface and
# static-mapping CLI output while VPP is still alive.
# NOTE(review): listing line 6700 (presumably the def line) is not visible.
6701 super(TestNAT66, self).tearDown()
6702 if not self.vpp_dead:
6703 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6704 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: when the module is executed directly, hand control to
# unittest using the VPP-specific test runner imported from framework.
6707 if __name__ == '__main__':
6708 unittest.main(testRunner=VppTestRunner)