9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    A copy of the packet is rebuilt from its serialized form with the IP
    checksum field removed, and the checksum found in the rebuilt copy is
    compared with the one carried by the original packet.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed checksum so that it is calculated again when the copy
    # is serialized; without this the copy only echoes the received value
    # and the comparison below could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    A copy of the packet is rebuilt from its serialized form with the TCP
    checksum field removed, and the checksum found in the rebuilt copy is
    compared with the one carried by the original packet.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed checksum so that it is calculated again when the copy
    # is serialized; without this the copy only echoes the received value
    # and the comparison below could never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    A copy of the packet is rebuilt from its serialized form with the UDP
    checksum field removed, and the checksum found in the rebuilt copy is
    compared with the one carried by the original packet.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed checksum so that it is calculated again when the copy
    # is serialized; without this the copy only echoes the received value
    # and the comparison below could never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    Each embedded layer that is present (IPerror, TCPerror, UDPerror,
    ICMPerror) is verified on its own: a copy of the packet is rebuilt
    with that layer's checksum field removed and the checksum found in
    the rebuilt copy is compared with the one in the original packet.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    if pkt.haslayer(IPerror):
        new = pkt.__class__(str(pkt))
        del new['IPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)

    if pkt.haslayer(TCPerror):
        new = pkt.__class__(str(pkt))
        del new['TCPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)

    if pkt.haslayer(UDPerror):
        # An embedded UDP checksum of 0 is not verified.
        if pkt['UDPerror'].chksum != 0:
            new = pkt.__class__(str(pkt))
            del new['UDPerror'].chksum
            new = new.__class__(str(new))
            self.assertEqual(new['UDPerror'].chksum,
                             pkt['UDPerror'].chksum)

    if pkt.haslayer(ICMPerror):
        # Start from a fresh copy like the branches above instead of
        # relying on a copy left behind by an earlier branch, which is
        # unbound when none of those branches has run.
        new = pkt.__class__(str(pkt))
        del new['ICMPerror'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    The ICMP checksum obtained from a rebuilt copy of the packet (with the
    checksum field removed beforehand) must equal the one in the packet.
    When the packet carries an embedded IPerror layer, the embedded
    checksums are checked as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    recomputed = rebuilt.__class__(str(rebuilt))['ICMP'].chksum
    self.assertEqual(recomputed, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    # ICMP error message: verify the checksums of the embedded packet too.
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum of the packet

    Handles the ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply
    layers. For each one present, the checksum found in a rebuilt copy of
    the packet (with the checksum field removed beforehand) must equal the
    one in the packet. For ICMPv6DestUnreach the embedded packet checksums
    are checked as well.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer class, layer name used for indexing, check embedded packet)
    handled = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    copy = pkt.__class__(str(pkt))
    for layer, name, has_embedded in handled:
        if not pkt.haslayer(layer):
            continue
        del copy[name].cksum
        copy = copy.__class__(str(copy))
        self.assertEqual(copy[name].cksum,
                         pkt[name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
137 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param dst_ip: Destination address
144 :param ttl: TTL of generated packets
147 dst_ip = out_if.remote_ip4
151 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153 TCP(sport=self.tcp_port_in, dport=20))
157 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159 UDP(sport=self.udp_port_in, dport=20))
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165 ICMP(id=self.icmp_id_in, type='echo-request'))
170 def compose_ip6(self, ip4, pref, plen):
172 Compose IPv4-embedded IPv6 addresses
174 :param ip4: IPv4 address
175 :param pref: IPv6 prefix
176 :param plen: IPv6 prefix length
177 :returns: IPv4-embedded IPv6 addresses
179 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
195 pref_n[10] = ip4_n[3]
199 pref_n[10] = ip4_n[2]
200 pref_n[11] = ip4_n[3]
203 pref_n[10] = ip4_n[1]
204 pref_n[11] = ip4_n[2]
205 pref_n[12] = ip4_n[3]
207 pref_n[12] = ip4_n[0]
208 pref_n[13] = ip4_n[1]
209 pref_n[14] = ip4_n[2]
210 pref_n[15] = ip4_n[3]
211 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
213 def extract_ip4(self, ip6, plen):
215 Extract IPv4 address embedded in IPv6 addresses
217 :param ip6: IPv6 address
218 :param plen: IPv6 prefix length
219 :returns: extracted IPv4 address
221 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
253 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
255 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
257 Create IPv6 packet stream for inside network
259 :param in_if: Inside interface
260 :param out_if: Outside interface
261 :param hlim: Hop Limit of generated packets
262 :param pref: NAT64 prefix
263 :param plen: NAT64 prefix length
267 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
269 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
272 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274 TCP(sport=self.tcp_port_in, dport=20))
278 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280 UDP(sport=self.udp_port_in, dport=20))
284 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286 ICMPv6EchoRequest(id=self.icmp_id_in))
291 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292 use_inside_ports=False):
294 Create packet stream for outside network
296 :param out_if: Outside interface
297 :param dst_ip: Destination IP address (Default use global NAT address)
298 :param ttl: TTL of generated packets
299 :param use_inside_ports: Use inside NAT ports as destination ports
300 instead of outside ports
303 dst_ip = self.nat_addr
304 if not use_inside_ports:
305 tcp_port = self.tcp_port_out
306 udp_port = self.udp_port_out
307 icmp_id = self.icmp_id_out
309 tcp_port = self.tcp_port_in
310 udp_port = self.udp_port_in
311 icmp_id = self.icmp_id_in
314 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316 TCP(dport=tcp_port, sport=20))
320 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322 UDP(dport=udp_port, sport=20))
326 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328 ICMP(id=icmp_id, type='echo-reply'))
333 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
335 Create packet stream for outside network
337 :param out_if: Outside interface
338 :param dst_ip: Destination IP address (Default use global NAT address)
339 :param hl: HL of generated packets
343 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345 TCP(dport=self.tcp_port_out, sport=20))
349 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351 UDP(dport=self.udp_port_out, sport=20))
355 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357 ICMPv6EchoReply(id=self.icmp_id_out))
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Source port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Source port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param frag_size: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def initiate_tcp_session(self, in_if, out_if):
698 Initiates TCP session
700 :param in_if: Inside interface
701 :param out_if: Outside interface
705 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
706 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
707 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
710 self.pg_enable_capture(self.pg_interfaces)
712 capture = out_if.get_capture(1)
714 self.tcp_port_out = p[TCP].sport
716 # SYN + ACK packet out->in
717 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
718 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
719 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
722 self.pg_enable_capture(self.pg_interfaces)
727 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
728 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
729 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
732 self.pg_enable_capture(self.pg_interfaces)
734 out_if.get_capture(1)
737 self.logger.error("TCP 3 way handshake failed")
740 def verify_ipfix_nat44_ses(self, data):
742 Verify IPFIX NAT44 session create/delete event
744 :param data: Decoded IPFIX data records
746 nat44_ses_create_num = 0
747 nat44_ses_delete_num = 0
748 self.assertEqual(6, len(data))
751 self.assertIn(ord(record[230]), [4, 5])
752 if ord(record[230]) == 4:
753 nat44_ses_create_num += 1
755 nat44_ses_delete_num += 1
757 self.assertEqual(self.pg0.remote_ip4n, record[8])
758 # postNATSourceIPv4Address
759 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
762 self.assertEqual(struct.pack("!I", 0), record[234])
763 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
764 if IP_PROTOS.icmp == ord(record[4]):
765 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
766 self.assertEqual(struct.pack("!H", self.icmp_id_out),
768 elif IP_PROTOS.tcp == ord(record[4]):
769 self.assertEqual(struct.pack("!H", self.tcp_port_in),
771 self.assertEqual(struct.pack("!H", self.tcp_port_out),
773 elif IP_PROTOS.udp == ord(record[4]):
774 self.assertEqual(struct.pack("!H", self.udp_port_in),
776 self.assertEqual(struct.pack("!H", self.udp_port_out),
779 self.fail("Invalid protocol")
780 self.assertEqual(3, nat44_ses_create_num)
781 self.assertEqual(3, nat44_ses_delete_num)
783 def verify_ipfix_addr_exhausted(self, data):
785 Verify IPFIX NAT addresses event
787 :param data: Decoded IPFIX data records
789 self.assertEqual(1, len(data))
792 self.assertEqual(ord(record[230]), 3)
794 self.assertEqual(struct.pack("!I", 0), record[283])
796 def verify_ipfix_max_sessions(self, data, limit):
798 Verify IPFIX maximum session entries exceeded event
800 :param data: Decoded IPFIX data records
801 :param limit: Number of maximum session entries that can be created.
803 self.assertEqual(1, len(data))
806 self.assertEqual(ord(record[230]), 13)
807 # natQuotaExceededEvent
808 self.assertEqual(struct.pack("I", 1), record[466])
810 self.assertEqual(struct.pack("I", limit), record[471])
812 def verify_ipfix_max_bibs(self, data, limit):
814 Verify IPFIX maximum BIB entries exceeded event
816 :param data: Decoded IPFIX data records
817 :param limit: Number of maximum BIB entries that can be created.
819 self.assertEqual(1, len(data))
822 self.assertEqual(ord(record[230]), 13)
823 # natQuotaExceededEvent
824 self.assertEqual(struct.pack("I", 2), record[466])
826 self.assertEqual(struct.pack("I", limit), record[472])
828 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
830 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
832 :param data: Decoded IPFIX data records
833 :param limit: Number of maximum fragments pending reassembly
834 :param src_addr: IPv6 source address
836 self.assertEqual(1, len(data))
839 self.assertEqual(ord(record[230]), 13)
840 # natQuotaExceededEvent
841 self.assertEqual(struct.pack("I", 5), record[466])
842 # maxFragmentsPendingReassembly
843 self.assertEqual(struct.pack("I", limit), record[475])
845 self.assertEqual(src_addr, record[27])
847 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
849 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
851 :param data: Decoded IPFIX data records
852 :param limit: Number of maximum fragments pending reassembly
853 :param src_addr: IPv4 source address
855 self.assertEqual(1, len(data))
858 self.assertEqual(ord(record[230]), 13)
859 # natQuotaExceededEvent
860 self.assertEqual(struct.pack("I", 5), record[466])
861 # maxFragmentsPendingReassembly
862 self.assertEqual(struct.pack("I", limit), record[475])
864 self.assertEqual(src_addr, record[8])
866 def verify_ipfix_bib(self, data, is_create, src_addr):
868 Verify IPFIX NAT64 BIB create and delete events
870 :param data: Decoded IPFIX data records
871 :param is_create: Create event if nonzero value otherwise delete event
872 :param src_addr: IPv6 source address
874 self.assertEqual(1, len(data))
878 self.assertEqual(ord(record[230]), 10)
880 self.assertEqual(ord(record[230]), 11)
882 self.assertEqual(src_addr, record[27])
883 # postNATSourceIPv4Address
884 self.assertEqual(self.nat_addr_n, record[225])
886 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
888 self.assertEqual(struct.pack("!I", 0), record[234])
889 # sourceTransportPort
890 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
891 # postNAPTSourceTransportPort
892 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
894 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
897 Verify IPFIX NAT64 session create and delete events
899 :param data: Decoded IPFIX data records
900 :param is_create: Create event if nonzero value otherwise delete event
901 :param src_addr: IPv6 source address
902 :param dst_addr: IPv4 destination address
903 :param dst_port: destination TCP port
905 self.assertEqual(1, len(data))
909 self.assertEqual(ord(record[230]), 6)
911 self.assertEqual(ord(record[230]), 7)
913 self.assertEqual(src_addr, record[27])
914 # destinationIPv6Address
915 self.assertEqual(socket.inet_pton(socket.AF_INET6,
916 self.compose_ip6(dst_addr,
920 # postNATSourceIPv4Address
921 self.assertEqual(self.nat_addr_n, record[225])
922 # postNATDestinationIPv4Address
923 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
926 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
928 self.assertEqual(struct.pack("!I", 0), record[234])
929 # sourceTransportPort
930 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
931 # postNAPTSourceTransportPort
932 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
933 # destinationTransportPort
934 self.assertEqual(struct.pack("!H", dst_port), record[11])
935 # postNAPTDestinationTransportPort
936 self.assertEqual(struct.pack("!H", dst_port), record[228])
939 class TestNAT44(MethodHolder):
940 """ NAT44 Test Cases """
944 super(TestNAT44, cls).setUpClass()
947 cls.tcp_port_in = 6303
948 cls.tcp_port_out = 6303
949 cls.udp_port_in = 6304
950 cls.udp_port_out = 6304
951 cls.icmp_id_in = 6305
952 cls.icmp_id_out = 6305
953 cls.nat_addr = '10.0.0.3'
954 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
955 cls.ipfix_src_port = 4739
956 cls.ipfix_domain_id = 1
957 cls.tcp_external_port = 80
959 cls.create_pg_interfaces(range(10))
960 cls.interfaces = list(cls.pg_interfaces[0:4])
962 for i in cls.interfaces:
967 cls.pg0.generate_remote_hosts(3)
968 cls.pg0.configure_ipv4_neighbors()
970 cls.pg1.generate_remote_hosts(1)
971 cls.pg1.configure_ipv4_neighbors()
973 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
974 cls.vapi.ip_table_add_del(10, is_add=1)
975 cls.vapi.ip_table_add_del(20, is_add=1)
977 cls.pg4._local_ip4 = "172.16.255.1"
978 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
979 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
980 cls.pg4.set_table_ip4(10)
981 cls.pg5._local_ip4 = "172.17.255.3"
982 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
983 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
984 cls.pg5.set_table_ip4(10)
985 cls.pg6._local_ip4 = "172.16.255.1"
986 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
987 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
988 cls.pg6.set_table_ip4(20)
989 for i in cls.overlapping_interfaces:
997 cls.pg9.generate_remote_hosts(2)
999 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
1000 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
1004 cls.pg9.resolve_arp()
1005 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
1006 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
1007 cls.pg9.resolve_arp()
1010 super(TestNAT44, cls).tearDownClass()
1013 def clear_nat44(self):
1015 Clear NAT44 configuration.
1017 # I found no elegant way to do this
1018 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1019 dst_address_length=32,
1020 next_hop_address=self.pg7.remote_ip4n,
1021 next_hop_sw_if_index=self.pg7.sw_if_index,
1023 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1024 dst_address_length=32,
1025 next_hop_address=self.pg8.remote_ip4n,
1026 next_hop_sw_if_index=self.pg8.sw_if_index,
1029 for intf in [self.pg7, self.pg8]:
1030 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
1032 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
1037 if self.pg7.has_ip4_config:
1038 self.pg7.unconfig_ip4()
1040 self.vapi.nat44_forwarding_enable_disable(0)
1042 interfaces = self.vapi.nat44_interface_addr_dump()
1043 for intf in interfaces:
1044 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
1045 twice_nat=intf.twice_nat,
1048 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1049 domain_id=self.ipfix_domain_id)
1050 self.ipfix_src_port = 4739
1051 self.ipfix_domain_id = 1
1053 interfaces = self.vapi.nat44_interface_dump()
1054 for intf in interfaces:
1055 if intf.is_inside > 1:
1056 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1059 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1063 interfaces = self.vapi.nat44_interface_output_feature_dump()
1064 for intf in interfaces:
1065 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1069 static_mappings = self.vapi.nat44_static_mapping_dump()
1070 for sm in static_mappings:
1071 self.vapi.nat44_add_del_static_mapping(
1072 sm.local_ip_address,
1073 sm.external_ip_address,
1074 local_port=sm.local_port,
1075 external_port=sm.external_port,
1076 addr_only=sm.addr_only,
1078 protocol=sm.protocol,
1079 twice_nat=sm.twice_nat,
1080 self_twice_nat=sm.self_twice_nat,
1081 out2in_only=sm.out2in_only,
1083 external_sw_if_index=sm.external_sw_if_index,
1086 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1087 for lb_sm in lb_static_mappings:
1088 self.vapi.nat44_add_del_lb_static_mapping(
1089 lb_sm.external_addr,
1090 lb_sm.external_port,
1092 vrf_id=lb_sm.vrf_id,
1093 twice_nat=lb_sm.twice_nat,
1094 self_twice_nat=lb_sm.self_twice_nat,
1095 out2in_only=lb_sm.out2in_only,
1101 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1102 for id_m in identity_mappings:
1103 self.vapi.nat44_add_del_identity_mapping(
1104 addr_only=id_m.addr_only,
1107 sw_if_index=id_m.sw_if_index,
1109 protocol=id_m.protocol,
1112 adresses = self.vapi.nat44_address_dump()
1113 for addr in adresses:
1114 self.vapi.nat44_add_del_address_range(addr.ip_address,
1116 twice_nat=addr.twice_nat,
1119 self.vapi.nat_set_reass()
1120 self.vapi.nat_set_reass(is_ip6=1)
1122 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1123 local_port=0, external_port=0, vrf_id=0,
1124 is_add=1, external_sw_if_index=0xFFFFFFFF,
1125 proto=0, twice_nat=0, self_twice_nat=0,
1126 out2in_only=0, tag=""):
1128 Add/delete NAT44 static mapping
1130 :param local_ip: Local IP address
1131 :param external_ip: External IP address
1132 :param local_port: Local port number (Optional)
1133 :param external_port: External port number (Optional)
1134 :param vrf_id: VRF ID (Default 0)
1135 :param is_add: 1 if add, 0 if delete (Default add)
1136 :param external_sw_if_index: External interface instead of IP address
1137 :param proto: IP protocol (Mandatory if port specified)
1138 :param twice_nat: 1 if translate external host address and port
1139 :param self_twice_nat: 1 if translate external host address and port
1140 whenever external host address equals
1141 local address of internal host
1142 :param out2in_only: if 1 rule is matching only out2in direction
1143 :param tag: Opaque string tag
1146 if local_port and external_port:
1148 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1149 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1150 self.vapi.nat44_add_del_static_mapping(
1153 external_sw_if_index,
1165 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1167 Add/delete NAT44 address
1169 :param ip: IP address
1170 :param is_add: 1 if add, 0 if delete (Default add)
1171 :param twice_nat: twice NAT address for external hosts
1173 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1174 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1176 twice_nat=twice_nat)
1178 def test_dynamic(self):
1179 """ NAT44 dynamic translation test """
1181 self.nat44_add_address(self.nat_addr)
1182 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1183 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1187 pkts = self.create_stream_in(self.pg0, self.pg1)
1188 self.pg0.add_stream(pkts)
1189 self.pg_enable_capture(self.pg_interfaces)
1191 capture = self.pg1.get_capture(len(pkts))
1192 self.verify_capture_out(capture)
1195 pkts = self.create_stream_out(self.pg1)
1196 self.pg1.add_stream(pkts)
1197 self.pg_enable_capture(self.pg_interfaces)
1199 capture = self.pg0.get_capture(len(pkts))
1200 self.verify_capture_in(capture, self.pg0)
# Sends the in2out stream from pg0 with ttl=1 and expects one packet per
# sent packet to come back on pg0 itself; the returned packets are checked
# with verify_capture_in_with_icmp_errors. Nothing is read from pg1 here.
1202 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1203 """ NAT44 handling of client packets with TTL=1 """
1205 self.nat44_add_address(self.nat_addr)
1206 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1207 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1210 # Client side - generate traffic
1211 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1212 self.pg0.add_stream(pkts)
1213 self.pg_enable_capture(self.pg_interfaces)
1216 # Client side - verify ICMP type 11 packets
1217 capture = self.pg0.get_capture(len(pkts))
1218 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Two phases: a normal in2out stream first creates the sessions (and is
# verified on pg1), then the out2in stream is sent on pg1 with ttl=1.
# The resulting packets are expected back on pg1 and are verified with
# src_ip=self.pg1.local_ip4.
1220 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1221 """ NAT44 handling of server packets with TTL=1 """
1223 self.nat44_add_address(self.nat_addr)
1224 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1225 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1228 # Client side - create sessions
1229 pkts = self.create_stream_in(self.pg0, self.pg1)
1230 self.pg0.add_stream(pkts)
1231 self.pg_enable_capture(self.pg_interfaces)
1234 # Server side - generate traffic
1235 capture = self.pg1.get_capture(len(pkts))
1236 self.verify_capture_out(capture)
1237 pkts = self.create_stream_out(self.pg1, ttl=1)
1238 self.pg1.add_stream(pkts)
1239 self.pg_enable_capture(self.pg_interfaces)
1242 # Server side - verify ICMP type 11 packets
1243 capture = self.pg1.get_capture(len(pkts))
1244 self.verify_capture_out_with_icmp_errors(capture,
1245 src_ip=self.pg1.local_ip4)
# Client packets are sent from pg0 with ttl=2 and captured on pg1. For each
# captured packet the test itself fabricates an ICMP type 11 message that
# embeds the captured IP layer, addressed from pg1.remote_ip4 to
# self.nat_addr, and injects it on pg1. The errors must then be delivered
# on pg0 and pass verify_capture_in_with_icmp_errors.
1247 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1248 """ NAT44 handling of error responses to client packets with TTL=2 """
1250 self.nat44_add_address(self.nat_addr)
1251 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1252 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1255 # Client side - generate traffic
1256 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1257 self.pg0.add_stream(pkts)
1258 self.pg_enable_capture(self.pg_interfaces)
1261 # Server side - simulate ICMP type 11 response
1262 capture = self.pg1.get_capture(len(pkts))
# one error per captured packet; packet[IP] becomes the embedded payload
1263 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1264 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1265 ICMP(type=11) / packet[IP] for packet in capture]
1266 self.pg1.add_stream(pkts)
1267 self.pg_enable_capture(self.pg_interfaces)
1270 # Client side - verify ICMP type 11 packets
1271 capture = self.pg0.get_capture(len(pkts))
1272 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Mirror image of the in2out TTL=2 test. Sessions are created with a normal
# in2out stream, then the out2in stream is sent on pg1 with ttl=2 and
# captured on pg0. For each captured packet an ICMP type 11 message embedding
# its IP layer is built (from pg0.remote_ip4 to pg1.remote_ip4) and injected
# on pg0; the errors must arrive on pg1 and pass
# verify_capture_out_with_icmp_errors.
1274 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1275 """ NAT44 handling of error responses to server packets with TTL=2 """
1277 self.nat44_add_address(self.nat_addr)
1278 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1279 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1282 # Client side - create sessions
1283 pkts = self.create_stream_in(self.pg0, self.pg1)
1284 self.pg0.add_stream(pkts)
1285 self.pg_enable_capture(self.pg_interfaces)
1288 # Server side - generate traffic
1289 capture = self.pg1.get_capture(len(pkts))
1290 self.verify_capture_out(capture)
1291 pkts = self.create_stream_out(self.pg1, ttl=2)
1292 self.pg1.add_stream(pkts)
1293 self.pg_enable_capture(self.pg_interfaces)
1296 # Client side - simulate ICMP type 11 response
1297 capture = self.pg0.get_capture(len(pkts))
# one error per captured packet; packet[IP] becomes the embedded payload
1298 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1299 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1300 ICMP(type=11) / packet[IP] for packet in capture]
1301 self.pg0.add_stream(pkts)
1302 self.pg_enable_capture(self.pg_interfaces)
1305 # Server side - verify ICMP type 11 packets
1306 capture = self.pg1.get_capture(len(pkts))
1307 self.verify_capture_out_with_icmp_errors(capture)
# Sends an ICMP echo request from pg1.remote_ip4 to pg1.local_ip4 and
# expects exactly one packet back on pg1: source and destination swapped
# and ICMP type 0 (echo reply).
# NOTE(review): the request is built with id=self.icmp_id_out while the
# reply is compared against self.icmp_id_in - presumably both attributes
# hold the same value when this test runs; confirm.
1309 def test_ping_out_interface_from_outside(self):
1310 """ Ping NAT44 out interface from outside network """
1312 self.nat44_add_address(self.nat_addr)
1313 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1314 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# echo request addressed to the router's own address on pg1
1317 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1318 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1319 ICMP(id=self.icmp_id_out, type='echo-request'))
1321 self.pg1.add_stream(pkts)
1322 self.pg_enable_capture(self.pg_interfaces)
1324 capture = self.pg1.get_capture(len(pkts))
1325 self.assertEqual(1, len(capture))
# the reply must come from the pinged address back to the sender
1328 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1329 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1330 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1331 self.assertEqual(packet[ICMP].type, 0) # echo reply
1333 self.logger.error(ppp("Unexpected or invalid packet "
1334 "(outside network):", packet))
# With a static mapping between pg0.remote_ip4 and self.nat_addr in place,
# an echo request sent on pg1 to self.nat_addr must be delivered on pg0, and
# the echo reply sent on pg0 must be delivered on pg1. Each direction
# expects exactly one packet whose IP protocol is ICMP.
1337 def test_ping_internal_host_from_outside(self):
1338 """ Ping internal host from outside network """
1340 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1341 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1342 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# out2in: echo request towards the mapped external address
1346 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1347 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1348 ICMP(id=self.icmp_id_out, type='echo-request'))
1349 self.pg1.add_stream(pkt)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg0.get_capture(1)
1353 self.verify_capture_in(capture, self.pg0, packet_num=1)
1354 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# in2out: echo reply from the internal host back to the outside sender
1357 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1358 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1359 ICMP(id=self.icmp_id_in, type='echo-reply'))
1360 self.pg0.add_stream(pkt)
1361 self.pg_enable_capture(self.pg_interfaces)
1363 capture = self.pg1.get_capture(1)
1364 self.verify_capture_out(capture, same_port=True, packet_num=1)
1365 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Exercises NAT44 with forwarding switched on
# (nat44_forwarding_enable_disable(1)) and a single static mapping between
# pg0.remote_ip4n and self.nat_addr_n (binary address forms).
# Two scenarios are run: traffic that matches the static mapping, and
# traffic from a host without a mapping. For the second scenario
# pg0.remote_hosts[0] is temporarily replaced by remote_hosts[1] and put
# back afterwards. Finally forwarding is switched off again and
# nat44_add_del_static_mapping is called once more for the same address
# pair (presumably to delete the mapping - confirm).
1367 def test_forwarding(self):
1368 """ NAT44 forwarding test """
1370 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1371 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1373 self.vapi.nat44_forwarding_enable_disable(1)
1375 real_ip = self.pg0.remote_ip4n
1376 alias_ip = self.nat_addr_n
1377 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1378 external_ip=alias_ip)
1381 # in2out - static mapping match
1383 pkts = self.create_stream_out(self.pg1)
1384 self.pg1.add_stream(pkts)
1385 self.pg_enable_capture(self.pg_interfaces)
1387 capture = self.pg0.get_capture(len(pkts))
1388 self.verify_capture_in(capture, self.pg0)
1390 pkts = self.create_stream_in(self.pg0, self.pg1)
1391 self.pg0.add_stream(pkts)
1392 self.pg_enable_capture(self.pg_interfaces)
1394 capture = self.pg1.get_capture(len(pkts))
1395 self.verify_capture_out(capture, same_port=True)
1397 # in2out - no static mapping match
# remember the original first host so it can be restored below
1399 host0 = self.pg0.remote_hosts[0]
1400 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1402 pkts = self.create_stream_out(self.pg1,
1403 dst_ip=self.pg0.remote_ip4,
1404 use_inside_ports=True)
1405 self.pg1.add_stream(pkts)
1406 self.pg_enable_capture(self.pg_interfaces)
1408 capture = self.pg0.get_capture(len(pkts))
1409 self.verify_capture_in(capture, self.pg0)
1411 pkts = self.create_stream_in(self.pg0, self.pg1)
1412 self.pg0.add_stream(pkts)
1413 self.pg_enable_capture(self.pg_interfaces)
1415 capture = self.pg1.get_capture(len(pkts))
1416 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# restore the host list modified above
1419 self.pg0.remote_hosts[0] = host0
1422 self.vapi.nat44_forwarding_enable_disable(0)
1423 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1424 external_ip=alias_ip,
# 1:1 static mapping of pg0.remote_ip4 to 10.0.0.10, first exercised from
# the inside. The expected outside ports / ICMP id are overridden on self
# (6303, 6304, 6305) before traffic is generated. The mapping dump must
# contain exactly one entry with an empty tag and protocol, local_port and
# external_port all 0.
1427 def test_static_in(self):
1428 """ 1:1 NAT initialized from inside network """
1430 nat_ip = "10.0.0.10"
1431 self.tcp_port_out = 6303
1432 self.udp_port_out = 6304
1433 self.icmp_id_out = 6305
1435 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1436 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1437 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1439 sm = self.vapi.nat44_static_mapping_dump()
1440 self.assertEqual(len(sm), 1)
# the tag field is compared only up to its first NUL character
1441 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1442 self.assertEqual(sm[0].protocol, 0)
1443 self.assertEqual(sm[0].local_port, 0)
1444 self.assertEqual(sm[0].external_port, 0)
# in2out: pg0 -> pg1, verified against nat_ip
1447 pkts = self.create_stream_in(self.pg0, self.pg1)
1448 self.pg0.add_stream(pkts)
1449 self.pg_enable_capture(self.pg_interfaces)
1451 capture = self.pg1.get_capture(len(pkts))
1452 self.verify_capture_out(capture, nat_ip, True)
# out2in: stream built for nat_ip, injected on pg1, expected on pg0
1455 pkts = self.create_stream_out(self.pg1, nat_ip)
1456 self.pg1.add_stream(pkts)
1457 self.pg_enable_capture(self.pg_interfaces)
1459 capture = self.pg0.get_capture(len(pkts))
1460 self.verify_capture_in(capture, self.pg0)
# 1:1 static mapping of pg0.remote_ip4 to 10.0.0.20, created with a tag and
# first exercised from the outside. The mapping dump must contain exactly
# one entry whose tag (up to the first NUL character) equals the tag that
# was passed in. Traffic is then checked out2in first, in2out second.
1462 def test_static_out(self):
1463 """ 1:1 NAT initialized from outside network """
1465 nat_ip = "10.0.0.20"
1466 self.tcp_port_out = 6303
1467 self.udp_port_out = 6304
1468 self.icmp_id_out = 6305
1471 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1472 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1473 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1475 sm = self.vapi.nat44_static_mapping_dump()
1476 self.assertEqual(len(sm), 1)
1477 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
# out2in: stream built for nat_ip, injected on pg1, expected on pg0
1480 pkts = self.create_stream_out(self.pg1, nat_ip)
1481 self.pg1.add_stream(pkts)
1482 self.pg_enable_capture(self.pg_interfaces)
1484 capture = self.pg0.get_capture(len(pkts))
1485 self.verify_capture_in(capture, self.pg0)
# in2out: pg0 -> pg1, verified against nat_ip
1488 pkts = self.create_stream_in(self.pg0, self.pg1)
1489 self.pg0.add_stream(pkts)
1490 self.pg_enable_capture(self.pg_interfaces)
1492 capture = self.pg1.get_capture(len(pkts))
1493 self.verify_capture_out(capture, nat_ip, True)
# Port-level static mappings on self.nat_addr, first exercised from the
# inside. Three mappings are installed for pg0.remote_ip4, one per protocol
# (TCP, UDP, ICMP), each pairing the corresponding *_in port / id with the
# *_out value set at the top of the test (3606, 3607, 3608).
1495 def test_static_with_port_in(self):
1496 """ 1:1 NAPT initialized from inside network """
1498 self.tcp_port_out = 3606
1499 self.udp_port_out = 3607
1500 self.icmp_id_out = 3608
1502 self.nat44_add_address(self.nat_addr)
1503 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1504 self.tcp_port_in, self.tcp_port_out,
1505 proto=IP_PROTOS.tcp)
1506 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1507 self.udp_port_in, self.udp_port_out,
1508 proto=IP_PROTOS.udp)
1509 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1510 self.icmp_id_in, self.icmp_id_out,
1511 proto=IP_PROTOS.icmp)
1512 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1513 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# in2out: pg0 -> pg1
1517 pkts = self.create_stream_in(self.pg0, self.pg1)
1518 self.pg0.add_stream(pkts)
1519 self.pg_enable_capture(self.pg_interfaces)
1521 capture = self.pg1.get_capture(len(pkts))
1522 self.verify_capture_out(capture)
# out2in: pg1 -> pg0
1525 pkts = self.create_stream_out(self.pg1)
1526 self.pg1.add_stream(pkts)
1527 self.pg_enable_capture(self.pg_interfaces)
1529 capture = self.pg0.get_capture(len(pkts))
1530 self.verify_capture_in(capture, self.pg0)
# Same three per-protocol port mappings as the "initialized from inside"
# variant, but with outside values 30606 / 30607 / 30608 and with the
# out2in direction exercised before the in2out direction.
1532 def test_static_with_port_out(self):
1533 """ 1:1 NAPT initialized from outside network """
1535 self.tcp_port_out = 30606
1536 self.udp_port_out = 30607
1537 self.icmp_id_out = 30608
1539 self.nat44_add_address(self.nat_addr)
1540 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1541 self.tcp_port_in, self.tcp_port_out,
1542 proto=IP_PROTOS.tcp)
1543 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1544 self.udp_port_in, self.udp_port_out,
1545 proto=IP_PROTOS.udp)
1546 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1547 self.icmp_id_in, self.icmp_id_out,
1548 proto=IP_PROTOS.icmp)
1549 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1550 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# out2in: pg1 -> pg0
1554 pkts = self.create_stream_out(self.pg1)
1555 self.pg1.add_stream(pkts)
1556 self.pg_enable_capture(self.pg_interfaces)
1558 capture = self.pg0.get_capture(len(pkts))
1559 self.verify_capture_in(capture, self.pg0)
# in2out: pg0 -> pg1
1562 pkts = self.create_stream_in(self.pg0, self.pg1)
1563 self.pg0.add_stream(pkts)
1564 self.pg_enable_capture(self.pg_interfaces)
1566 capture = self.pg1.get_capture(len(pkts))
1567 self.verify_capture_out(capture)
# TCP port mapping created with out2in_only=1 while forwarding is enabled.
# Six single-packet exchanges are checked, in this order:
#   1. client (pg1) -> service via self.nat_addr:external_port
#   2. ICMP type 11 sent from the service side about packet 1
#   3. service -> client, source rewritten to self.nat_addr:external_port
#   4. ICMP type 11 sent from the client side about packet 3
#   5. client -> service addressed directly to pg0.remote_ip4:local_port
#   6. service -> client on that second flow, source left untranslated
# Every exchange injects one packet and captures exactly one on the
# opposite interface.
1569 def test_static_with_port_out2(self):
1570 """ 1:1 NAPT symmetrical rule """
1575 self.vapi.nat44_forwarding_enable_disable(1)
1576 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1577 local_port, external_port,
1578 proto=IP_PROTOS.tcp, out2in_only=1)
1579 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1580 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1583 # from client to service
1584 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1585 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1586 TCP(sport=12345, dport=external_port))
1587 self.pg1.add_stream(p)
1588 self.pg_enable_capture(self.pg_interfaces)
1590 capture = self.pg0.get_capture(1)
# destination must have been rewritten to the local address and port
1596 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1597 self.assertEqual(tcp.dport, local_port)
1598 self.check_tcp_checksum(p)
1599 self.check_ip_checksum(p)
1601 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 from the service side, embedding the packet captured above
1605 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1606 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1607 ICMP(type=11) / capture[0][IP])
1608 self.pg0.add_stream(p)
1609 self.pg_enable_capture(self.pg_interfaces)
1611 capture = self.pg1.get_capture(1)
# outer source and the embedded destination must both show the external
# address; the embedded TCP destination port must be the external port
1614 self.assertEqual(p[IP].src, self.nat_addr)
1616 self.assertEqual(inner.dst, self.nat_addr)
1617 self.assertEqual(inner[TCPerror].dport, external_port)
1619 self.logger.error(ppp("Unexpected or invalid packet:", p))
1622 # from service back to client
1623 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1624 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1625 TCP(sport=local_port, dport=12345))
1626 self.pg0.add_stream(p)
1627 self.pg_enable_capture(self.pg_interfaces)
1629 capture = self.pg1.get_capture(1)
1634 self.assertEqual(ip.src, self.nat_addr)
1635 self.assertEqual(tcp.sport, external_port)
1636 self.check_tcp_checksum(p)
1637 self.check_ip_checksum(p)
1639 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 from the client side, embedding the packet captured above
1643 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1644 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1645 ICMP(type=11) / capture[0][IP])
1646 self.pg1.add_stream(p)
1647 self.pg_enable_capture(self.pg_interfaces)
1649 capture = self.pg0.get_capture(1)
# outer destination and the embedded source must both show the local
# address; the embedded TCP source port must be the local port
1652 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1654 self.assertEqual(inner.src, self.pg0.remote_ip4)
1655 self.assertEqual(inner[TCPerror].sport, local_port)
1657 self.logger.error(ppp("Unexpected or invalid packet:", p))
1660 # from client to server (no translation)
1661 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1662 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1663 TCP(sport=12346, dport=local_port))
1664 self.pg1.add_stream(p)
1665 self.pg_enable_capture(self.pg_interfaces)
1667 capture = self.pg0.get_capture(1)
1673 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1674 self.assertEqual(tcp.dport, local_port)
1675 self.check_tcp_checksum(p)
1676 self.check_ip_checksum(p)
1678 self.logger.error(ppp("Unexpected or invalid packet:", p))
1681 # from service back to client (no translation)
1682 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1683 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1684 TCP(sport=local_port, dport=12346))
1685 self.pg0.add_stream(p)
1686 self.pg_enable_capture(self.pg_interfaces)
1688 capture = self.pg1.get_capture(1)
# source address and port must be unchanged on this flow
1693 self.assertEqual(ip.src, self.pg0.remote_ip4)
1694 self.assertEqual(tcp.sport, local_port)
1695 self.check_tcp_checksum(p)
1696 self.check_ip_checksum(p)
1698 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Two static mappings are installed: pg4.remote_ip4 -> 10.0.0.30 and
# pg0.remote_ip4 -> 10.0.0.40. The NAT44 feature is enabled on pg3, pg0
# and pg4. Traffic from pg4 towards pg3 must be translated to nat_ip1,
# while traffic from pg0 towards pg3 must produce nothing on pg3.
1701 def test_static_vrf_aware(self):
1702 """ 1:1 NAT VRF awareness """
1704 nat_ip1 = "10.0.0.30"
1705 nat_ip2 = "10.0.0.40"
1706 self.tcp_port_out = 6303
1707 self.udp_port_out = 6304
1708 self.icmp_id_out = 6305
1710 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1712 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1714 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1716 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1717 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1719 # inside interface VRF match NAT44 static mapping VRF
1720 pkts = self.create_stream_in(self.pg4, self.pg3)
1721 self.pg4.add_stream(pkts)
1722 self.pg_enable_capture(self.pg_interfaces)
1724 capture = self.pg3.get_capture(len(pkts))
1725 self.verify_capture_out(capture, nat_ip1, True)
1727 # inside interface VRF don't match NAT44 static mapping VRF (packets
1729 pkts = self.create_stream_in(self.pg0, self.pg3)
1730 self.pg0.add_stream(pkts)
1731 self.pg_enable_capture(self.pg_interfaces)
1733 self.pg3.assert_nothing_captured()
# Starts with dynamic translation through self.nat_addr, then adds a 1:1
# static mapping for the same inside host (pg0.remote_ip4 -> 10.0.0.10).
# Right after the mapping is added the session dump for that host must be
# empty, and new in2out traffic must be translated to the static address.
1735 def test_dynamic_to_static(self):
1736 """ Switch from dynamic translation to 1:1NAT """
1737 nat_ip = "10.0.0.10"
1738 self.tcp_port_out = 6303
1739 self.udp_port_out = 6304
1740 self.icmp_id_out = 6305
1742 self.nat44_add_address(self.nat_addr)
1743 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1744 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# dynamic phase: in2out traffic verified with the default expectations
1748 pkts = self.create_stream_in(self.pg0, self.pg1)
1749 self.pg0.add_stream(pkts)
1750 self.pg_enable_capture(self.pg_interfaces)
1752 capture = self.pg1.get_capture(len(pkts))
1753 self.verify_capture_out(capture)
# static phase: no session may remain for the host once the mapping exists
1756 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1757 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1758 self.assertEqual(len(sessions), 0)
1759 pkts = self.create_stream_in(self.pg0, self.pg1)
1760 self.pg0.add_stream(pkts)
1761 self.pg_enable_capture(self.pg_interfaces)
1763 capture = self.pg1.get_capture(len(pkts))
1764 self.verify_capture_out(capture, nat_ip, True)
# Installs an identity mapping for pg0.remote_ip4n and sends one TCP packet
# from pg1 directly to pg0.remote_ip4. The packet captured on pg0 must keep
# its original source/destination addresses and ports (12345 -> 56789).
1766 def test_identity_nat(self):
1767 """ Identity NAT """
1769 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1770 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1771 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1774 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1775 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1776 TCP(sport=12345, dport=56789))
1777 self.pg1.add_stream(p)
1778 self.pg_enable_capture(self.pg_interfaces)
1780 capture = self.pg0.get_capture(1)
# nothing in the addressing may have been rewritten
1785 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1786 self.assertEqual(ip.src, self.pg1.remote_ip4)
1787 self.assertEqual(tcp.dport, 56789)
1788 self.assertEqual(tcp.sport, 12345)
1789 self.check_tcp_checksum(p)
1790 self.check_ip_checksum(p)
1792 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Load-balanced static mapping: one external address (self.nat_addr) is
# mapped onto two local servers, pg0.remote_hosts[0] and [1]. A client
# packet to the external address must land on one of the two servers with
# the local destination port; the reply from the chosen server must leave
# with source self.nat_addr and the external port.
# NOTE(review): the local name `locals` shadows the Python builtin.
1795 def test_static_lb(self):
1796 """ NAT44 local service load balancing """
1797 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1800 server1 = self.pg0.remote_hosts[0]
1801 server2 = self.pg0.remote_hosts[1]
1803 locals = [{'addr': server1.ip4n,
1806 {'addr': server2.ip4n,
1810 self.nat44_add_address(self.nat_addr)
1811 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1814 local_num=len(locals),
1816 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1817 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1820 # from client to service
1821 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1822 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1823 TCP(sport=12345, dport=external_port))
1824 self.pg1.add_stream(p)
1825 self.pg_enable_capture(self.pg_interfaces)
1827 capture = self.pg0.get_capture(1)
# either back-end is acceptable; which one was chosen is determined here
1833 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1834 if ip.dst == server1.ip4:
1838 self.assertEqual(tcp.dport, local_port)
1839 self.check_tcp_checksum(p)
1840 self.check_ip_checksum(p)
1842 self.logger.error(ppp("Unexpected or invalid packet:", p))
1845 # from service back to client
1846 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1847 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1848 TCP(sport=local_port, dport=12345))
1849 self.pg0.add_stream(p)
1850 self.pg_enable_capture(self.pg_interfaces)
1852 capture = self.pg1.get_capture(1)
1857 self.assertEqual(ip.src, self.nat_addr)
1858 self.assertEqual(tcp.sport, external_port)
1859 self.check_tcp_checksum(p)
1860 self.check_ip_checksum(p)
1862 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Runs only when running_extended_tests() is true. Same load-balanced
# mapping as the single-client test, but one TCP packet is built per
# client address produced by ip4_range(self.pg1.remote_ip4, 10, 50).
# Captured packets are inspected by destination address, and the test
# finally requires that server1 was selected more often than server2
# (server1_n > server2_n).
# NOTE(review): the local name `locals` shadows the Python builtin.
1865 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1866 def test_static_lb_multi_clients(self):
1867 """ NAT44 local service load balancing - multiple clients"""
1869 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1872 server1 = self.pg0.remote_hosts[0]
1873 server2 = self.pg0.remote_hosts[1]
1875 locals = [{'addr': server1.ip4n,
1878 {'addr': server2.ip4n,
1882 self.nat44_add_address(self.nat_addr)
1883 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1886 local_num=len(locals),
1888 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1889 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# one source address per simulated client
1894 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
1896 for client in clients:
1897 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1898 IP(src=client, dst=self.nat_addr) /
1899 TCP(sport=12345, dport=external_port))
1901 self.pg1.add_stream(pkts)
1902 self.pg_enable_capture(self.pg_interfaces)
1904 capture = self.pg0.get_capture(len(pkts))
1906 if p[IP].dst == server1.ip4:
1910 self.assertTrue(server1_n > server2_n)
# Load-balanced static mapping exercised with forwarding enabled; unlike
# test_static_lb, no NAT44 pool address is added here. Four single-packet
# exchanges are checked:
#   1. client -> service via self.nat_addr:external_port, delivered to
#      either server on local_port
#   2. chosen server -> client, source rewritten to
#      self.nat_addr:external_port
#   3. client -> server1 addressed directly (no translation)
#   4. server1 -> client on that flow (no translation)
# NOTE(review): the local name `locals` shadows the Python builtin.
1912 def test_static_lb_2(self):
1913 """ NAT44 local service load balancing (asymmetrical rule) """
1914 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1917 server1 = self.pg0.remote_hosts[0]
1918 server2 = self.pg0.remote_hosts[1]
1920 locals = [{'addr': server1.ip4n,
1923 {'addr': server2.ip4n,
1927 self.vapi.nat44_forwarding_enable_disable(1)
1928 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1932 local_num=len(locals),
1934 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1935 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1938 # from client to service
1939 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1940 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1941 TCP(sport=12345, dport=external_port))
1942 self.pg1.add_stream(p)
1943 self.pg_enable_capture(self.pg_interfaces)
1945 capture = self.pg0.get_capture(1)
# either back-end is acceptable; which one was chosen is determined here
1951 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1952 if ip.dst == server1.ip4:
1956 self.assertEqual(tcp.dport, local_port)
1957 self.check_tcp_checksum(p)
1958 self.check_ip_checksum(p)
1960 self.logger.error(ppp("Unexpected or invalid packet:", p))
1963 # from service back to client
1964 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1965 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1966 TCP(sport=local_port, dport=12345))
1967 self.pg0.add_stream(p)
1968 self.pg_enable_capture(self.pg_interfaces)
1970 capture = self.pg1.get_capture(1)
1975 self.assertEqual(ip.src, self.nat_addr)
1976 self.assertEqual(tcp.sport, external_port)
1977 self.check_tcp_checksum(p)
1978 self.check_ip_checksum(p)
1980 self.logger.error(ppp("Unexpected or invalid packet:", p))
1983 # from client to server (no translation)
1984 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1985 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1986 TCP(sport=12346, dport=local_port))
1987 self.pg1.add_stream(p)
1988 self.pg_enable_capture(self.pg_interfaces)
1990 capture = self.pg0.get_capture(1)
1996 self.assertEqual(ip.dst, server1.ip4)
1997 self.assertEqual(tcp.dport, local_port)
1998 self.check_tcp_checksum(p)
1999 self.check_ip_checksum(p)
2001 self.logger.error(ppp("Unexpected or invalid packet:", p))
2004 # from service back to client (no translation)
2005 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
2006 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
2007 TCP(sport=local_port, dport=12346))
2008 self.pg0.add_stream(p)
2009 self.pg_enable_capture(self.pg_interfaces)
2011 capture = self.pg1.get_capture(1)
# source address and port must be unchanged on this flow
2016 self.assertEqual(ip.src, server1.ip4)
2017 self.assertEqual(tcp.sport, local_port)
2018 self.check_tcp_checksum(p)
2019 self.check_ip_checksum(p)
2021 self.logger.error(ppp("Unexpected or invalid packet:", p))
# The NAT44 feature is enabled on pg0, pg1 and pg3; pg2 is used as an
# interface without the feature. Checked in order:
#   - pg0 -> pg1 and pg0 -> pg2 traffic passes without translation
#   - pg0 <-> pg3 is translated in both directions
#   - pg1 <-> pg3 is translated in both directions
2024 def test_multiple_inside_interfaces(self):
2025 """ NAT44 multiple non-overlapping address space inside interfaces """
2027 self.nat44_add_address(self.nat_addr)
2028 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2029 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2030 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
2033 # between two NAT44 inside interfaces (no translation)
2034 pkts = self.create_stream_in(self.pg0, self.pg1)
2035 self.pg0.add_stream(pkts)
2036 self.pg_enable_capture(self.pg_interfaces)
2038 capture = self.pg1.get_capture(len(pkts))
2039 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
2041 # from NAT44 inside to interface without NAT44 feature (no translation)
2042 pkts = self.create_stream_in(self.pg0, self.pg2)
2043 self.pg0.add_stream(pkts)
2044 self.pg_enable_capture(self.pg_interfaces)
2046 capture = self.pg2.get_capture(len(pkts))
2047 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
2049 # in2out 1st interface
2050 pkts = self.create_stream_in(self.pg0, self.pg3)
2051 self.pg0.add_stream(pkts)
2052 self.pg_enable_capture(self.pg_interfaces)
2054 capture = self.pg3.get_capture(len(pkts))
2055 self.verify_capture_out(capture)
2057 # out2in 1st interface
2058 pkts = self.create_stream_out(self.pg3)
2059 self.pg3.add_stream(pkts)
2060 self.pg_enable_capture(self.pg_interfaces)
2062 capture = self.pg0.get_capture(len(pkts))
2063 self.verify_capture_in(capture, self.pg0)
2065 # in2out 2nd interface
2066 pkts = self.create_stream_in(self.pg1, self.pg3)
2067 self.pg1.add_stream(pkts)
2068 self.pg_enable_capture(self.pg_interfaces)
2070 capture = self.pg3.get_capture(len(pkts))
2071 self.verify_capture_out(capture)
2073 # out2in 2nd interface
2074 pkts = self.create_stream_out(self.pg3)
2075 self.pg3.add_stream(pkts)
2076 self.pg_enable_capture(self.pg_interfaces)
2078 capture = self.pg1.get_capture(len(pkts))
2079 self.verify_capture_in(capture, self.pg1)
# The NAT44 feature is enabled on pg3, pg4, pg5 and pg6, and pg6.remote_ip4
# additionally gets a static mapping to 10.0.0.10. Checked in order:
#   - pg4 -> pg5 traffic passes without translation
#   - a TCP packet from pg4 to the static address is hairpinned to pg6 with
#     source self.nat_addr and a changed source port
#   - pg4 <-> pg3 and pg5 <-> pg3 are translated in both directions
#   - the session dump for pg5's host holds exactly three non-static
#     sessions (TCP, UDP, ICMP in that order) with the expected ports
#   - pg6 <-> pg3 is translated through the static address
#   - user and session dumps are cross-checked for pg4 and pg6 hosts
2081 def test_inside_overlapping_interfaces(self):
2082 """ NAT44 multiple inside interfaces with overlapping address space """
2084 static_nat_ip = "10.0.0.10"
2085 self.nat44_add_address(self.nat_addr)
2086 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
2088 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
2089 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
2090 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
2091 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
2094 # between NAT44 inside interfaces with same VRF (no translation)
2095 pkts = self.create_stream_in(self.pg4, self.pg5)
2096 self.pg4.add_stream(pkts)
2097 self.pg_enable_capture(self.pg_interfaces)
2099 capture = self.pg5.get_capture(len(pkts))
2100 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
2102 # between NAT44 inside interfaces with different VRF (hairpinning)
2103 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2104 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2105 TCP(sport=1234, dport=5678))
2106 self.pg4.add_stream(p)
2107 self.pg_enable_capture(self.pg_interfaces)
2109 capture = self.pg6.get_capture(1)
# source is rewritten to the NAT address with a different port, while the
# destination becomes the mapped host with its port unchanged
2114 self.assertEqual(ip.src, self.nat_addr)
2115 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2116 self.assertNotEqual(tcp.sport, 1234)
2117 self.assertEqual(tcp.dport, 5678)
2119 self.logger.error(ppp("Unexpected or invalid packet:", p))
2122 # in2out 1st interface
2123 pkts = self.create_stream_in(self.pg4, self.pg3)
2124 self.pg4.add_stream(pkts)
2125 self.pg_enable_capture(self.pg_interfaces)
2127 capture = self.pg3.get_capture(len(pkts))
2128 self.verify_capture_out(capture)
2130 # out2in 1st interface
2131 pkts = self.create_stream_out(self.pg3)
2132 self.pg3.add_stream(pkts)
2133 self.pg_enable_capture(self.pg_interfaces)
2135 capture = self.pg4.get_capture(len(pkts))
2136 self.verify_capture_in(capture, self.pg4)
2138 # in2out 2nd interface
2139 pkts = self.create_stream_in(self.pg5, self.pg3)
2140 self.pg5.add_stream(pkts)
2141 self.pg_enable_capture(self.pg_interfaces)
2143 capture = self.pg3.get_capture(len(pkts))
2144 self.verify_capture_out(capture)
2146 # out2in 2nd interface
2147 pkts = self.create_stream_out(self.pg3)
2148 self.pg3.add_stream(pkts)
2149 self.pg_enable_capture(self.pg_interfaces)
2151 capture = self.pg5.get_capture(len(pkts))
2152 self.verify_capture_in(capture, self.pg5)
# session dump for pg5's host: exactly three dynamic sessions, all using
# the single NAT address; index order is asserted as TCP, UDP, ICMP
2155 addresses = self.vapi.nat44_address_dump()
2156 self.assertEqual(len(addresses), 1)
2157 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2158 self.assertEqual(len(sessions), 3)
2159 for session in sessions:
2160 self.assertFalse(session.is_static)
2161 self.assertEqual(session.inside_ip_address[0:4],
2162 self.pg5.remote_ip4n)
2163 self.assertEqual(session.outside_ip_address,
2164 addresses[0].ip_address)
2165 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2166 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2167 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2168 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2169 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2170 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2171 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2172 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2173 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2175 # in2out 3rd interface
2176 pkts = self.create_stream_in(self.pg6, self.pg3)
2177 self.pg6.add_stream(pkts)
2178 self.pg_enable_capture(self.pg_interfaces)
2180 capture = self.pg3.get_capture(len(pkts))
2181 self.verify_capture_out(capture, static_nat_ip, True)
2183 # out2in 3rd interface
2184 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2185 self.pg3.add_stream(pkts)
2186 self.pg_enable_capture(self.pg_interfaces)
2188 capture = self.pg6.get_capture(len(pkts))
2189 self.verify_capture_in(capture, self.pg6)
2191 # general user and session dump verifications
2192 users = self.vapi.nat44_user_dump()
2193 self.assertTrue(len(users) >= 3)
2194 addresses = self.vapi.nat44_address_dump()
2195 self.assertEqual(len(addresses), 1)
2197 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2199 for session in sessions:
2200 self.assertEqual(user.ip_address, session.inside_ip_address)
# chained comparison: total_bytes > total_pkts and total_pkts > 0
2201 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2202 self.assertTrue(session.protocol in
2203 [IP_PROTOS.tcp, IP_PROTOS.udp,
# pg4's host: at least four sessions, all dynamic, all on the NAT address
2207 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2208 self.assertTrue(len(sessions) >= 4)
2209 for session in sessions:
2210 self.assertFalse(session.is_static)
2211 self.assertEqual(session.inside_ip_address[0:4],
2212 self.pg4.remote_ip4n)
2213 self.assertEqual(session.outside_ip_address,
2214 addresses[0].ip_address)
# pg6's host: at least three sessions, all static, on the static address
2217 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2218 self.assertTrue(len(sessions) >= 3)
2219 for session in sessions:
2220 self.assertTrue(session.is_static)
2221 self.assertEqual(session.inside_ip_address[0:4],
2222 self.pg6.remote_ip4n)
# NOTE(review): comparing two map() results for equality only works when
# map() returns lists (Python 2); under Python 3 this would compare two
# distinct iterator objects - confirm the target interpreter.
2223 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2224 map(int, static_nat_ip.split('.')))
2225 self.assertTrue(session.inside_port in
2226 [self.tcp_port_in, self.udp_port_in,
# Hairpinning with a port-level static mapping: host (pg0.remote_hosts[0])
# and server (pg0.remote_hosts[1]) both sit behind pg0. The server's TCP
# port 5678 is mapped to self.nat_addr:8765. The host's packet to the
# external address is sent on pg0 and must come back out of pg0 towards the
# server; the server's reply to self.nat_addr must likewise be delivered on
# pg0 to the host with the original host port restored.
2229 def test_hairpinning(self):
2230 """ NAT44 hairpinning - 1:1 NAPT """
2232 host = self.pg0.remote_hosts[0]
2233 server = self.pg0.remote_hosts[1]
2236 server_in_port = 5678
2237 server_out_port = 8765
2239 self.nat44_add_address(self.nat_addr)
2240 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2241 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2243 # add static mapping for server
2244 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2245 server_in_port, server_out_port,
2246 proto=IP_PROTOS.tcp)
2248 # send packet from host to server
2249 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2250 IP(src=host.ip4, dst=self.nat_addr) /
2251 TCP(sport=host_in_port, dport=server_out_port))
2252 self.pg0.add_stream(p)
2253 self.pg_enable_capture(self.pg_interfaces)
2255 capture = self.pg0.get_capture(1)
# both ends are rewritten: source becomes the NAT address with a new port,
# destination becomes the server's real address and port
2260 self.assertEqual(ip.src, self.nat_addr)
2261 self.assertEqual(ip.dst, server.ip4)
2262 self.assertNotEqual(tcp.sport, host_in_port)
2263 self.assertEqual(tcp.dport, server_in_port)
2264 self.check_tcp_checksum(p)
# remember the port chosen for the host; the reply below is sent to it
2265 host_out_port = tcp.sport
2267 self.logger.error(ppp("Unexpected or invalid packet:", p))
2270 # send reply from server to host
2271 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2272 IP(src=server.ip4, dst=self.nat_addr) /
2273 TCP(sport=server_in_port, dport=host_out_port))
2274 self.pg0.add_stream(p)
2275 self.pg_enable_capture(self.pg_interfaces)
2277 capture = self.pg0.get_capture(1)
# the reply appears to come from the external address/port and is
# addressed to the host's original address and port
2282 self.assertEqual(ip.src, self.nat_addr)
2283 self.assertEqual(ip.dst, host.ip4)
2284 self.assertEqual(tcp.sport, server_out_port)
2285 self.assertEqual(tcp.dport, host_in_port)
2286 self.check_tcp_checksum(p)
2288 self.logger.error(ppp("Unexpected or invalid packet:", p))
2291 def test_hairpinning2(self):
2292 """ NAT44 hairpinning - 1:1 NAT"""
# Exercises hairpinning on pg0 with two servers that each have an
# address-only static mapping. Four exchanges are checked, each with TCP,
# UDP and ICMP: host -> server1, server1 -> host, server2 -> server1 and
# server1 -> server2.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the creation/filling of `pkts` and the branch that handles ICMP
# packets are not visible here - confirm against the full file.
2294 server1_nat_ip = "10.0.0.10"
2295 server2_nat_ip = "10.0.0.11"
2296 host = self.pg0.remote_hosts[0]
2297 server1 = self.pg0.remote_hosts[1]
2298 server2 = self.pg0.remote_hosts[2]
2299 server_tcp_port = 22
2300 server_udp_port = 20
2302 self.nat44_add_address(self.nat_addr)
2303 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2304 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2307 # add static mapping for servers
2308 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2309 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# host to server1: the host addresses server1 by its NAT address
2313 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2314 IP(src=host.ip4, dst=server1_nat_ip) /
2315 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2317 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2318 IP(src=host.ip4, dst=server1_nat_ip) /
2319 UDP(sport=self.udp_port_in, dport=server_udp_port))
2321 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2322 IP(src=host.ip4, dst=server1_nat_ip) /
2323 ICMP(id=self.icmp_id_in, type='echo-request'))
2325 self.pg0.add_stream(pkts)
2326 self.pg_enable_capture(self.pg_interfaces)
2328 capture = self.pg0.get_capture(len(pkts))
# the host has no static mapping, so its source is expected to be rewritten
# to the pool address with a new port / ICMP id; the translated values are
# stored on self for building the replies below
2329 for packet in capture:
2331 self.assertEqual(packet[IP].src, self.nat_addr)
2332 self.assertEqual(packet[IP].dst, server1.ip4)
2333 if packet.haslayer(TCP):
2334 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2335 self.assertEqual(packet[TCP].dport, server_tcp_port)
2336 self.tcp_port_out = packet[TCP].sport
2337 self.check_tcp_checksum(packet)
2338 elif packet.haslayer(UDP):
2339 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2340 self.assertEqual(packet[UDP].dport, server_udp_port)
2341 self.udp_port_out = packet[UDP].sport
2343 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2344 self.icmp_id_out = packet[ICMP].id
2346 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server1 to host: replies go to the pool address and the translated
# ports / ICMP id recorded above
2351 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2352 IP(src=server1.ip4, dst=self.nat_addr) /
2353 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2355 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2356 IP(src=server1.ip4, dst=self.nat_addr) /
2357 UDP(sport=server_udp_port, dport=self.udp_port_out))
2359 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2360 IP(src=server1.ip4, dst=self.nat_addr) /
2361 ICMP(id=self.icmp_id_out, type='echo-reply'))
2363 self.pg0.add_stream(pkts)
2364 self.pg_enable_capture(self.pg_interfaces)
2366 capture = self.pg0.get_capture(len(pkts))
# the host must see server1's NAT address as source and its own original
# ports / ICMP id as destination
2367 for packet in capture:
2369 self.assertEqual(packet[IP].src, server1_nat_ip)
2370 self.assertEqual(packet[IP].dst, host.ip4)
2371 if packet.haslayer(TCP):
2372 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2373 self.assertEqual(packet[TCP].sport, server_tcp_port)
2374 self.check_tcp_checksum(packet)
2375 elif packet.haslayer(UDP):
2376 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2377 self.assertEqual(packet[UDP].sport, server_udp_port)
2379 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2381 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2384 # server2 to server1
2386 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2387 IP(src=server2.ip4, dst=server1_nat_ip) /
2388 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2390 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2391 IP(src=server2.ip4, dst=server1_nat_ip) /
2392 UDP(sport=self.udp_port_in, dport=server_udp_port))
2394 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2395 IP(src=server2.ip4, dst=server1_nat_ip) /
2396 ICMP(id=self.icmp_id_in, type='echo-request'))
2398 self.pg0.add_stream(pkts)
2399 self.pg_enable_capture(self.pg_interfaces)
2401 capture = self.pg0.get_capture(len(pkts))
# server2 has its own address-only static mapping, so the source becomes
# server2_nat_ip and the ports / ICMP id are expected to stay unchanged
2402 for packet in capture:
2404 self.assertEqual(packet[IP].src, server2_nat_ip)
2405 self.assertEqual(packet[IP].dst, server1.ip4)
2406 if packet.haslayer(TCP):
2407 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2408 self.assertEqual(packet[TCP].dport, server_tcp_port)
2409 self.tcp_port_out = packet[TCP].sport
2410 self.check_tcp_checksum(packet)
2411 elif packet.haslayer(UDP):
2412 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2413 self.assertEqual(packet[UDP].dport, server_udp_port)
2414 self.udp_port_out = packet[UDP].sport
2416 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2417 self.icmp_id_out = packet[ICMP].id
2419 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2422 # server1 to server2
2424 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2425 IP(src=server1.ip4, dst=server2_nat_ip) /
2426 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2428 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2429 IP(src=server1.ip4, dst=server2_nat_ip) /
2430 UDP(sport=server_udp_port, dport=self.udp_port_out))
2432 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2433 IP(src=server1.ip4, dst=server2_nat_ip) /
2434 ICMP(id=self.icmp_id_out, type='echo-reply'))
2436 self.pg0.add_stream(pkts)
2437 self.pg_enable_capture(self.pg_interfaces)
2439 capture = self.pg0.get_capture(len(pkts))
# replies reach server2's real address with server1's NAT address as source
2440 for packet in capture:
2442 self.assertEqual(packet[IP].src, server1_nat_ip)
2443 self.assertEqual(packet[IP].dst, server2.ip4)
2444 if packet.haslayer(TCP):
2445 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2446 self.assertEqual(packet[TCP].sport, server_tcp_port)
2447 self.check_tcp_checksum(packet)
2448 elif packet.haslayer(UDP):
2449 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2450 self.assertEqual(packet[UDP].sport, server_udp_port)
2452 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2454 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2457 def test_max_translations_per_user(self):
2458 """ MAX translations per user - recycle the least recently used """
# Opens more TCP sessions from a single inside address than the configured
# per-user limit and checks that the user's session count stays at the
# limit; then adds a static mapping and checks that the resulting static
# session takes the place of one dynamic session.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the creation/filling of `pkts`, the `for user in users` loops and
# the binding of `tcp_port` are not visible here - confirm against the
# full file.
2460 self.nat44_add_address(self.nat_addr)
2461 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2462 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2465 # get maximum number of translations per user
2466 nat44_config = self.vapi.nat_show_config()
2468 # send more than maximum number of translations per user packets
2469 pkts_num = nat44_config.max_translations_per_user + 5
# one packet per distinct source port, so each packet needs its own session
2471 for port in range(0, pkts_num):
2472 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2473 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2474 TCP(sport=1025 + port))
2476 self.pg0.add_stream(pkts)
2477 self.pg_enable_capture(self.pg_interfaces)
2480 # verify number of translated packet
2481 self.pg1.get_capture(pkts_num)
# all packets were forwarded, yet the user must hold exactly the maximum
# number of sessions and no static ones
2483 users = self.vapi.nat44_user_dump()
2485 if user.ip_address == self.pg0.remote_ip4n:
2486 self.assertEqual(user.nsessions,
2487 nat44_config.max_translations_per_user)
2488 self.assertEqual(user.nstaticsessions, 0)
2491 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2493 proto=IP_PROTOS.tcp)
2494 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2495 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2496 TCP(sport=tcp_port))
2497 self.pg0.add_stream(p)
2498 self.pg_enable_capture(self.pg_interfaces)
2500 self.pg1.get_capture(1)
2501 users = self.vapi.nat44_user_dump()
# after traffic hits the static mapping: one static session, and one
# dynamic session fewer than the limit
2503 if user.ip_address == self.pg0.remote_ip4n:
2504 self.assertEqual(user.nsessions,
2505 nat44_config.max_translations_per_user - 1)
2506 self.assertEqual(user.nstaticsessions, 1)
2508 def test_interface_addr(self):
2509 """ Acquire NAT44 addresses from interface """
# Registers pg7 as the source of NAT pool addresses and checks that the
# pool is empty while pg7 has no IPv4 address, holds exactly pg7's local
# address once one is configured, and is empty again after removal.
# NOTE(review): the local name `adresses` is a misspelling of `addresses`;
# harmless, but worth renaming in the full file.
2510 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2512 # no address in NAT pool
2513 adresses = self.vapi.nat44_address_dump()
2514 self.assertEqual(0, len(adresses))
2516 # configure interface address and check NAT address pool
2517 self.pg7.config_ip4()
2518 adresses = self.vapi.nat44_address_dump()
2519 self.assertEqual(1, len(adresses))
# only the first four bytes of the dumped address are compared
2520 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2522 # remove interface address and check NAT address pool
2523 self.pg7.unconfig_ip4()
2524 adresses = self.vapi.nat44_address_dump()
2525 self.assertEqual(0, len(adresses))
2527 def test_interface_addr_static_mapping(self):
2528 """ Static mapping with addresses from interface """
# Creates a static mapping whose external address is taken from pg7 and
# follows the static-mapping dump as pg7's IPv4 address is configured,
# removed and configured again: one entry (bound to pg7's sw_if_index)
# without an address, two entries with one.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the bindings of `tag` and `resolved` and the remaining arguments
# of both nat44_add_static_mapping calls are not visible here - confirm
# against the full file.
2531 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2532 self.nat44_add_static_mapping(
2534 external_sw_if_index=self.pg7.sw_if_index,
2537 # static mappings with external interface
2538 static_mappings = self.vapi.nat44_static_mapping_dump()
2539 self.assertEqual(1, len(static_mappings))
2540 self.assertEqual(self.pg7.sw_if_index,
2541 static_mappings[0].external_sw_if_index)
# compare only the part of the dumped tag that precedes the first NUL byte
2542 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2544 # configure interface address and check static mappings
2545 self.pg7.config_ip4()
2546 static_mappings = self.vapi.nat44_static_mapping_dump()
2547 self.assertEqual(2, len(static_mappings))
# entries with external_sw_if_index 0xFFFFFFFF must carry pg7's local
# address - presumably these are the resolved copies of the mapping
2549 for sm in static_mappings:
2550 if sm.external_sw_if_index == 0xFFFFFFFF:
2551 self.assertEqual(sm.external_ip_address[0:4],
2552 self.pg7.local_ip4n)
2553 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2555 self.assertTrue(resolved)
2557 # remove interface address and check static mappings
2558 self.pg7.unconfig_ip4()
2559 static_mappings = self.vapi.nat44_static_mapping_dump()
2560 self.assertEqual(1, len(static_mappings))
2561 self.assertEqual(self.pg7.sw_if_index,
2562 static_mappings[0].external_sw_if_index)
2563 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2565 # configure interface address again and check static mappings
2566 self.pg7.config_ip4()
2567 static_mappings = self.vapi.nat44_static_mapping_dump()
2568 self.assertEqual(2, len(static_mappings))
2570 for sm in static_mappings:
2571 if sm.external_sw_if_index == 0xFFFFFFFF:
2572 self.assertEqual(sm.external_ip_address[0:4],
2573 self.pg7.local_ip4n)
2574 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2576 self.assertTrue(resolved)
2578 # remove static mapping
2579 self.nat44_add_static_mapping(
2581 external_sw_if_index=self.pg7.sw_if_index,
# after removal the dump must be empty
2584 static_mappings = self.vapi.nat44_static_mapping_dump()
2585 self.assertEqual(0, len(static_mappings))
2587 def test_interface_addr_identity_nat(self):
2588 """ Identity NAT with addresses from interface """
2591 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2592 self.vapi.nat44_add_del_identity_mapping(
2593 sw_if_index=self.pg7.sw_if_index,
2595 protocol=IP_PROTOS.tcp,
2598 # identity mappings with external interface
2599 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2600 self.assertEqual(1, len(identity_mappings))
2601 self.assertEqual(self.pg7.sw_if_index,
2602 identity_mappings[0].sw_if_index)
2604 # configure interface address and check identity mappings
2605 self.pg7.config_ip4()
2606 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2608 self.assertEqual(2, len(identity_mappings))
2609 for sm in identity_mappings:
2610 if sm.sw_if_index == 0xFFFFFFFF:
2611 self.assertEqual(identity_mappings[0].ip_address,
2612 self.pg7.local_ip4n)
2613 self.assertEqual(port, identity_mappings[0].port)
2614 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2616 self.assertTrue(resolved)
2618 # remove interface address and check identity mappings
2619 self.pg7.unconfig_ip4()
2620 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2621 self.assertEqual(1, len(identity_mappings))
2622 self.assertEqual(self.pg7.sw_if_index,
2623 identity_mappings[0].sw_if_index)
2625 def test_ipfix_nat44_sess(self):
2626 """ IPFIX logging NAT44 session created/delted """
# Enables NAT IPFIX logging towards a collector behind pg3 on a
# non-default port, creates sessions with in->out traffic, removes the NAT
# address, flushes the exporter and verifies the exported packets.
# NOTE(review): "delted" in the docstring is a typo for "deleted", and
# `colector_port` for `collector_port`; the literal 30303 in bind_layers
# duplicates `colector_port`.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the `for p in capture` loop headers (2654, 2665) and at least one
# set_ipfix_exporter argument (2637) are not visible - confirm against the
# full file.
2627 self.ipfix_domain_id = 10
2628 self.ipfix_src_port = 20202
2629 colector_port = 30303
# make scapy dissect UDP packets sent to the collector port as IPFIX
2630 bind_layers(UDP, IPFIX, dport=30303)
2631 self.nat44_add_address(self.nat_addr)
2632 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2633 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2635 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2636 src_address=self.pg3.local_ip4n,
2638 template_interval=10,
2639 collector_port=colector_port)
2640 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2641 src_port=self.ipfix_src_port)
2643 pkts = self.create_stream_in(self.pg0, self.pg1)
2644 self.pg0.add_stream(pkts)
2645 self.pg_enable_capture(self.pg_interfaces)
2647 capture = self.pg1.get_capture(len(pkts))
2648 self.verify_capture_out(capture)
# removing the pool address presumably deletes the sessions just created,
# which is what produces the "deleted" events - TODO confirm
2649 self.nat44_add_address(self.nat_addr, is_add=0)
2650 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# exactly nine exported packets are expected on pg3
2651 capture = self.pg3.get_capture(9)
2652 ipfix = IPFIXDecoder()
2653 # first load template
2655 self.assertTrue(p.haslayer(IPFIX))
2656 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2657 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2658 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2659 self.assertEqual(p[UDP].dport, colector_port)
2660 self.assertEqual(p[IPFIX].observationDomainID,
2661 self.ipfix_domain_id)
2662 if p.haslayer(Template):
2663 ipfix.add_template(p.getlayer(Template))
2664 # verify events in data set
2666 if p.haslayer(Data):
2667 data = ipfix.decode_data_set(p.getlayer(Set))
2668 self.verify_ipfix_nat44_ses(data)
2670 def test_ipfix_addr_exhausted(self):
2671 """ IPFIX logging NAT addresses exhausted """
# Enables the NAT feature and IPFIX logging, sends one in->out packet that
# must not be forwarded, and verifies the IPFIX packets exported to pg3.
# No nat44_add_address call appears in the visible lines, so there is
# presumably no pool address to translate to - TODO confirm.
# NOTE(review): self.ipfix_domain_id and self.ipfix_src_port are read here
# but not assigned in this method; verify where they are initialised.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the transport layer of the probe packet (2684) and the
# `for p in capture` loop headers (2693, 2704) are not visible.
2672 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2673 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2675 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2676 src_address=self.pg3.local_ip4n,
2678 template_interval=10)
2679 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2680 src_port=self.ipfix_src_port)
2682 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2683 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2685 self.pg0.add_stream(p)
2686 self.pg_enable_capture(self.pg_interfaces)
# nothing may come out of pg1
2688 capture = self.pg1.get_capture(0)
2689 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2690 capture = self.pg3.get_capture(9)
2691 ipfix = IPFIXDecoder()
2692 # first load template
2694 self.assertTrue(p.haslayer(IPFIX))
2695 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2696 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2697 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# no collector_port is passed to set_ipfix_exporter in the visible
# arguments; the export is expected on port 4739
2698 self.assertEqual(p[UDP].dport, 4739)
2699 self.assertEqual(p[IPFIX].observationDomainID,
2700 self.ipfix_domain_id)
2701 if p.haslayer(Template):
2702 ipfix.add_template(p.getlayer(Template))
2703 # verify events in data set
2705 if p.haslayer(Data):
2706 data = ipfix.decode_data_set(p.getlayer(Set))
2707 self.verify_ipfix_addr_exhausted(data)
2709 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2710 def test_ipfix_max_sessions(self):
2711 """ IPFIX logging maximum session entries exceeded """
# Runs only when extended tests are enabled. Fills the session table with
# one packet per distinct source address, then enables IPFIX logging,
# sends one more packet that must not be forwarded, and verifies the
# IPFIX packets exported to pg3.
# NOTE(review): self.ipfix_domain_id and self.ipfix_src_port are read here
# but not assigned in this method; verify where they are initialised.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the creation/filling of `pkts`, the transport layer of the
# packets (2725, 2741) and the `for p in capture` loop headers are not
# visible - confirm against the full file.
2712 self.nat44_add_address(self.nat_addr)
2713 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2714 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2717 nat44_config = self.vapi.nat_show_config()
# the session limit is taken to be ten times the configured bucket count
2718 max_sessions = 10 * nat44_config.translation_buckets
# source address 10.10.<high byte of i>.<low byte of i>: distinct per packet
2721 for i in range(0, max_sessions):
2722 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2723 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2724 IP(src=src, dst=self.pg1.remote_ip4) /
2727 self.pg0.add_stream(pkts)
2728 self.pg_enable_capture(self.pg_interfaces)
2731 self.pg1.get_capture(max_sessions)
# logging is enabled only after the table has been filled
2732 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2733 src_address=self.pg3.local_ip4n,
2735 template_interval=10)
2736 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2737 src_port=self.ipfix_src_port)
2739 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2740 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2742 self.pg0.add_stream(p)
2743 self.pg_enable_capture(self.pg_interfaces)
# the extra packet must not be forwarded
2745 self.pg1.get_capture(0)
2746 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2747 capture = self.pg3.get_capture(9)
2748 ipfix = IPFIXDecoder()
2749 # first load template
2751 self.assertTrue(p.haslayer(IPFIX))
2752 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2753 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2754 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2755 self.assertEqual(p[UDP].dport, 4739)
2756 self.assertEqual(p[IPFIX].observationDomainID,
2757 self.ipfix_domain_id)
2758 if p.haslayer(Template):
2759 ipfix.add_template(p.getlayer(Template))
2760 # verify events in data set
2762 if p.haslayer(Data):
2763 data = ipfix.decode_data_set(p.getlayer(Set))
2764 self.verify_ipfix_max_sessions(data, max_sessions)
2766 def test_pool_addr_fib(self):
2767 """ NAT44 add pool addresses to FIB """
2768 static_addr = '10.0.0.10'
2769 self.nat44_add_address(self.nat_addr)
2770 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2771 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2773 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2776 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2777 ARP(op=ARP.who_has, pdst=self.nat_addr,
2778 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2779 self.pg1.add_stream(p)
2780 self.pg_enable_capture(self.pg_interfaces)
2782 capture = self.pg1.get_capture(1)
2783 self.assertTrue(capture[0].haslayer(ARP))
2784 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2787 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2788 ARP(op=ARP.who_has, pdst=static_addr,
2789 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2790 self.pg1.add_stream(p)
2791 self.pg_enable_capture(self.pg_interfaces)
2793 capture = self.pg1.get_capture(1)
2794 self.assertTrue(capture[0].haslayer(ARP))
2795 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2797 # send ARP to non-NAT44 interface
2798 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2799 ARP(op=ARP.who_has, pdst=self.nat_addr,
2800 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2801 self.pg2.add_stream(p)
2802 self.pg_enable_capture(self.pg_interfaces)
2804 capture = self.pg1.get_capture(0)
2806 # remove addresses and verify
2807 self.nat44_add_address(self.nat_addr, is_add=0)
2808 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2811 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2812 ARP(op=ARP.who_has, pdst=self.nat_addr,
2813 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2814 self.pg1.add_stream(p)
2815 self.pg_enable_capture(self.pg_interfaces)
2817 capture = self.pg1.get_capture(0)
2819 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2820 ARP(op=ARP.who_has, pdst=static_addr,
2821 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2822 self.pg1.add_stream(p)
2823 self.pg_enable_capture(self.pg_interfaces)
2825 capture = self.pg1.get_capture(0)
2827 def test_vrf_mode(self):
2828 """ NAT44 tenant VRF aware address pool mode """
# Places pg0 and pg1 in two separate IPv4 tables, adds one NAT address per
# table, and checks that traffic from pg0 to pg2 is translated to nat_ip1
# while traffic from pg1 to pg2 is translated to nat_ip2. The interfaces
# are moved back to table 0 and the tables deleted at the end.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the bindings of `vrf_id1` / `vrf_id2` and the remaining arguments
# of the pg2 feature call are not visible - confirm against the full file.
2832 nat_ip1 = "10.0.0.10"
2833 nat_ip2 = "10.0.0.11"
# addresses are removed before the table change and configured again after
2835 self.pg0.unconfig_ip4()
2836 self.pg1.unconfig_ip4()
2837 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2838 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2839 self.pg0.set_table_ip4(vrf_id1)
2840 self.pg1.set_table_ip4(vrf_id2)
2841 self.pg0.config_ip4()
2842 self.pg1.config_ip4()
2844 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2845 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2846 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2847 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2848 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# traffic entering on pg0 must use the address bound to vrf_id1
2852 pkts = self.create_stream_in(self.pg0, self.pg2)
2853 self.pg0.add_stream(pkts)
2854 self.pg_enable_capture(self.pg_interfaces)
2856 capture = self.pg2.get_capture(len(pkts))
2857 self.verify_capture_out(capture, nat_ip1)
# traffic entering on pg1 must use the address bound to vrf_id2
2860 pkts = self.create_stream_in(self.pg1, self.pg2)
2861 self.pg1.add_stream(pkts)
2862 self.pg_enable_capture(self.pg_interfaces)
2864 capture = self.pg2.get_capture(len(pkts))
2865 self.verify_capture_out(capture, nat_ip2)
# cleanup: back to table 0, then delete both tables
2867 self.pg0.unconfig_ip4()
2868 self.pg1.unconfig_ip4()
2869 self.pg0.set_table_ip4(0)
2870 self.pg1.set_table_ip4(0)
2871 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2872 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2874 def test_vrf_feature_independent(self):
2875 """ NAT44 tenant VRF independent address pool mode """
# Adds one NAT address without a VRF and one bound to VRF 99, then checks
# that traffic from both pg0 and pg1 towards pg2 is translated to nat_ip1.
# Neither interface is moved into VRF 99 in this method, so nat_ip2 is
# never expected to be used.
# NOTE(review): the remaining arguments of the pg2 feature call fall in a
# numbering gap and are not visible in this listing.
2877 nat_ip1 = "10.0.0.10"
2878 nat_ip2 = "10.0.0.11"
2880 self.nat44_add_address(nat_ip1)
2881 self.nat44_add_address(nat_ip2, vrf_id=99)
2882 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2883 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2884 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# first flow: pg0 -> pg2
2888 pkts = self.create_stream_in(self.pg0, self.pg2)
2889 self.pg0.add_stream(pkts)
2890 self.pg_enable_capture(self.pg_interfaces)
2892 capture = self.pg2.get_capture(len(pkts))
2893 self.verify_capture_out(capture, nat_ip1)
# second flow: pg1 -> pg2, same expected NAT address
2896 pkts = self.create_stream_in(self.pg1, self.pg2)
2897 self.pg1.add_stream(pkts)
2898 self.pg_enable_capture(self.pg_interfaces)
2900 capture = self.pg2.get_capture(len(pkts))
2901 self.verify_capture_out(capture, nat_ip1)
2903 def test_dynamic_ipless_interfaces(self):
2904 """ NAT44 interfaces without configured IP address """
# Runs dynamic NAT44 between pg7 and pg8. No config_ip4 call appears here;
# instead each remote host is made reachable through a neighbor entry and
# a /32 route. Traffic is then checked in both directions.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the last argument of each ip_neighbor_add_del call and of the pg8
# feature call are not visible - confirm against the full file.
2906 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2907 mactobinary(self.pg7.remote_mac),
2908 self.pg7.remote_ip4n,
2910 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2911 mactobinary(self.pg8.remote_mac),
2912 self.pg8.remote_ip4n,
# host routes (/32) pointing at the remote hosts themselves
2915 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2916 dst_address_length=32,
2917 next_hop_address=self.pg7.remote_ip4n,
2918 next_hop_sw_if_index=self.pg7.sw_if_index)
2919 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2920 dst_address_length=32,
2921 next_hop_address=self.pg8.remote_ip4n,
2922 next_hop_sw_if_index=self.pg8.sw_if_index)
2924 self.nat44_add_address(self.nat_addr)
2925 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2926 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8 first, so that sessions exist for the return traffic
2930 pkts = self.create_stream_in(self.pg7, self.pg8)
2931 self.pg7.add_stream(pkts)
2932 self.pg_enable_capture(self.pg_interfaces)
2934 capture = self.pg8.get_capture(len(pkts))
2935 self.verify_capture_out(capture)
# pg8 -> pg7, addressed to the NAT address
2938 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2939 self.pg8.add_stream(pkts)
2940 self.pg_enable_capture(self.pg_interfaces)
2942 capture = self.pg7.get_capture(len(pkts))
2943 self.verify_capture_in(capture, self.pg7)
2945 def test_static_ipless_interfaces(self):
2946 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Runs address-only static NAT44 between pg7 and pg8. No config_ip4 call
# appears here; each remote host is made reachable through a neighbor
# entry and a /32 route. Because the mapping is static, the pg8 -> pg7
# direction is sent first, without any prior session.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the last argument of each ip_neighbor_add_del call and of the pg8
# feature call are not visible - confirm against the full file.
2948 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2949 mactobinary(self.pg7.remote_mac),
2950 self.pg7.remote_ip4n,
2952 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2953 mactobinary(self.pg8.remote_mac),
2954 self.pg8.remote_ip4n,
# host routes (/32) pointing at the remote hosts themselves
2957 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2958 dst_address_length=32,
2959 next_hop_address=self.pg7.remote_ip4n,
2960 next_hop_sw_if_index=self.pg7.sw_if_index)
2961 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2962 dst_address_length=32,
2963 next_hop_address=self.pg8.remote_ip4n,
2964 next_hop_sw_if_index=self.pg8.sw_if_index)
# map pg7's remote host 1:1 to the NAT address (no ports given)
2966 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2967 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2968 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
2972 pkts = self.create_stream_out(self.pg8)
2973 self.pg8.add_stream(pkts)
2974 self.pg_enable_capture(self.pg_interfaces)
2976 capture = self.pg7.get_capture(len(pkts))
2977 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
2980 pkts = self.create_stream_in(self.pg7, self.pg8)
2981 self.pg7.add_stream(pkts)
2982 self.pg_enable_capture(self.pg_interfaces)
2984 capture = self.pg8.get_capture(len(pkts))
# NOTE(review): the meaning of the third positional argument (True) is
# defined by verify_capture_out, which is outside this view
2985 self.verify_capture_out(capture, self.nat_addr, True)
2987 def test_static_with_port_ipless_interfaces(self):
2988 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Runs static NAT44 with port mappings between pg7 and pg8. No config_ip4
# call appears here; each remote host is made reachable through a neighbor
# entry and a /32 route. One static mapping is added per protocol (TCP,
# UDP, ICMP) and traffic is checked in both directions.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the last argument of each ip_neighbor_add_del call and of the pg8
# feature call are not visible - confirm against the full file.
# fixed outside ports / ICMP id used by the static mappings below
2990 self.tcp_port_out = 30606
2991 self.udp_port_out = 30607
2992 self.icmp_id_out = 30608
2994 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2995 mactobinary(self.pg7.remote_mac),
2996 self.pg7.remote_ip4n,
2998 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2999 mactobinary(self.pg8.remote_mac),
3000 self.pg8.remote_ip4n,
# host routes (/32) pointing at the remote hosts themselves
3003 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
3004 dst_address_length=32,
3005 next_hop_address=self.pg7.remote_ip4n,
3006 next_hop_sw_if_index=self.pg7.sw_if_index)
3007 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
3008 dst_address_length=32,
3009 next_hop_address=self.pg8.remote_ip4n,
3010 next_hop_sw_if_index=self.pg8.sw_if_index)
3012 self.nat44_add_address(self.nat_addr)
# inside port / id -> outside port / id, one mapping per protocol
3013 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
3014 self.tcp_port_in, self.tcp_port_out,
3015 proto=IP_PROTOS.tcp)
3016 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
3017 self.udp_port_in, self.udp_port_out,
3018 proto=IP_PROTOS.udp)
3019 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
3020 self.icmp_id_in, self.icmp_id_out,
3021 proto=IP_PROTOS.icmp)
3022 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
3023 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7
3027 pkts = self.create_stream_out(self.pg8)
3028 self.pg8.add_stream(pkts)
3029 self.pg_enable_capture(self.pg_interfaces)
3031 capture = self.pg7.get_capture(len(pkts))
3032 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8
3035 pkts = self.create_stream_in(self.pg7, self.pg8)
3036 self.pg7.add_stream(pkts)
3037 self.pg_enable_capture(self.pg_interfaces)
3039 capture = self.pg8.get_capture(len(pkts))
3040 self.verify_capture_out(capture)
3042 def test_static_unknown_proto(self):
3043 """ 1:1 NAT translate packet with unknown protocol """
# With an address-only static mapping for pg0's remote host, sends a
# packet whose payload is neither TCP, UDP nor ICMP at the outer IP level
# in each direction and checks that only the outer addresses are rewritten
# and that the captured packet still contains a GRE layer with a valid IP
# checksum.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the layer between the outer and inner IP headers (3053, 3073 -
# presumably GRE, given the haslayer(GRE) assertions) and the binding of
# `packet` are not visible - confirm against the full file.
3044 nat_ip = "10.0.0.10"
3045 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
3046 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3047 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1
3051 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3052 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3054 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3055 TCP(sport=1234, dport=1234))
3056 self.pg0.add_stream(p)
3057 self.pg_enable_capture(self.pg_interfaces)
3059 p = self.pg1.get_capture(1)
# outer source must be the mapped address; destination is untouched
3062 self.assertEqual(packet[IP].src, nat_ip)
3063 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3064 self.assertTrue(packet.haslayer(GRE))
3065 self.check_ip_checksum(packet)
3067 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to the mapped address
3071 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3072 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
3074 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3075 TCP(sport=1234, dport=1234))
3076 self.pg1.add_stream(p)
3077 self.pg_enable_capture(self.pg_interfaces)
3079 p = self.pg0.get_capture(1)
# outer destination must be rewritten back to the host's real address
3082 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3083 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3084 self.assertTrue(packet.haslayer(GRE))
3085 self.check_ip_checksum(packet)
3087 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3090 def test_hairpinning_static_unknown_proto(self):
3091 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server both sit behind pg0 and each has an address-only static
# mapping. A packet carrying a non TCP/UDP/ICMP payload is sent from the
# host to the server's NAT address and back, and both must reappear on pg0
# with source and destination rewritten.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the layer between the outer and inner IP headers (presumably GRE,
# given the haslayer(GRE) assertions) and the binding of `packet` are not
# visible - confirm against the full file.
3093 host = self.pg0.remote_hosts[0]
3094 server = self.pg0.remote_hosts[1]
3096 host_nat_ip = "10.0.0.10"
3097 server_nat_ip = "10.0.0.11"
3099 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
3100 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3101 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3102 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server (via the server's NAT address)
3106 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3107 IP(src=host.ip4, dst=server_nat_ip) /
3109 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3110 TCP(sport=1234, dport=1234))
3111 self.pg0.add_stream(p)
3112 self.pg_enable_capture(self.pg_interfaces)
3114 p = self.pg0.get_capture(1)
# source becomes the host's NAT address, destination the server's real one
3117 self.assertEqual(packet[IP].src, host_nat_ip)
3118 self.assertEqual(packet[IP].dst, server.ip4)
3119 self.assertTrue(packet.haslayer(GRE))
3120 self.check_ip_checksum(packet)
3122 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host (via the host's NAT address)
3126 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3127 IP(src=server.ip4, dst=host_nat_ip) /
3129 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3130 TCP(sport=1234, dport=1234))
3131 self.pg0.add_stream(p)
3132 self.pg_enable_capture(self.pg_interfaces)
3134 p = self.pg0.get_capture(1)
# source becomes the server's NAT address, destination the host's real one
3137 self.assertEqual(packet[IP].src, server_nat_ip)
3138 self.assertEqual(packet[IP].dst, host.ip4)
3139 self.assertTrue(packet.haslayer(GRE))
3140 self.check_ip_checksum(packet)
3142 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3145 def test_unknown_proto(self):
3146 """ NAT44 translate packet with unknown protocol """
# Dynamic NAT44 between pg0 and pg1: after an ordinary TCP packet, a
# packet carrying a non TCP/UDP/ICMP payload is sent in each direction and
# must be forwarded with only its outer addresses rewritten.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the layer between the outer and inner IP headers (presumably GRE,
# given the haslayer(GRE) assertions) and the binding of `packet` are not
# visible - confirm against the full file.
3147 self.nat44_add_address(self.nat_addr)
3148 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3149 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# a regular TCP flow is sent first; presumably it creates the state that
# the unknown-protocol packet relies on - TODO confirm
3153 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3154 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3155 TCP(sport=self.tcp_port_in, dport=20))
3156 self.pg0.add_stream(p)
3157 self.pg_enable_capture(self.pg_interfaces)
3159 p = self.pg1.get_capture(1)
# pg0 -> pg1 with the unknown protocol
3161 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3162 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3164 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3165 TCP(sport=1234, dport=1234))
3166 self.pg0.add_stream(p)
3167 self.pg_enable_capture(self.pg_interfaces)
3169 p = self.pg1.get_capture(1)
# outer source must be the pool address
3172 self.assertEqual(packet[IP].src, self.nat_addr)
3173 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3174 self.assertTrue(packet.haslayer(GRE))
3175 self.check_ip_checksum(packet)
3177 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0, addressed to the pool address
3181 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3182 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3184 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3185 TCP(sport=1234, dport=1234))
3186 self.pg1.add_stream(p)
3187 self.pg_enable_capture(self.pg_interfaces)
3189 p = self.pg0.get_capture(1)
# outer destination must be rewritten back to the inside host
3192 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3193 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3194 self.assertTrue(packet.haslayer(GRE))
3195 self.check_ip_checksum(packet)
3197 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3200 def test_hairpinning_unknown_proto(self):
3201 """ NAT44 translate packet with unknown protocol - hairpinning """
# Host and server both sit behind pg0; only the server has a static
# mapping. After an ordinary TCP packet from the host, a packet carrying a
# non TCP/UDP/ICMP payload is sent host -> server and server -> host, and
# both must reappear on pg0 with source and destination rewritten.
# NOTE(review): gaps in the embedded numbering show that this listing omits
# lines; the binding of `host_in_port`, the layer between the outer and
# inner IP headers (presumably GRE, given the haslayer(GRE) assertions)
# and the binding of `packet` are not visible - confirm against the full
# file.
3202 host = self.pg0.remote_hosts[0]
3203 server = self.pg0.remote_hosts[1]
3206 server_in_port = 5678
3207 server_out_port = 8765
3208 server_nat_ip = "10.0.0.11"
3210 self.nat44_add_address(self.nat_addr)
3211 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3212 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3215 # add static mapping for server
3216 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# a regular TCP flow is sent first; presumably it creates the state that
# the unknown-protocol packet relies on - TODO confirm
3219 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3220 IP(src=host.ip4, dst=server_nat_ip) /
3221 TCP(sport=host_in_port, dport=server_out_port))
3222 self.pg0.add_stream(p)
3223 self.pg_enable_capture(self.pg_interfaces)
3225 capture = self.pg0.get_capture(1)
# host -> server with the unknown protocol
3227 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3228 IP(src=host.ip4, dst=server_nat_ip) /
3230 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3231 TCP(sport=1234, dport=1234))
3232 self.pg0.add_stream(p)
3233 self.pg_enable_capture(self.pg_interfaces)
3235 p = self.pg0.get_capture(1)
# source becomes the pool address, destination the server's real address
3238 self.assertEqual(packet[IP].src, self.nat_addr)
3239 self.assertEqual(packet[IP].dst, server.ip4)
3240 self.assertTrue(packet.haslayer(GRE))
3241 self.check_ip_checksum(packet)
3243 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, addressed to the pool address
3247 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3248 IP(src=server.ip4, dst=self.nat_addr) /
3250 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3251 TCP(sport=1234, dport=1234))
3252 self.pg0.add_stream(p)
3253 self.pg_enable_capture(self.pg_interfaces)
3255 p = self.pg0.get_capture(1)
# source becomes the server's NAT address, destination the host's real one
3258 self.assertEqual(packet[IP].src, server_nat_ip)
3259 self.assertEqual(packet[IP].dst, host.ip4)
3260 self.assertTrue(packet.haslayer(GRE))
3261 self.check_ip_checksum(packet)
3263 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3266 def test_output_feature(self):
3267 """ NAT44 interface output feature (in2out postrouting) """
# The NAT44 output feature is enabled on pg0, pg1 and pg3; pg2 is left
# without it and is used for the no-translation check at the end.
3268 self.nat44_add_address(self.nat_addr)
3269 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3270 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3271 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3: every packet sent must show up on pg3 and pass
# verify_capture_out
3275 pkts = self.create_stream_in(self.pg0, self.pg3)
3276 self.pg0.add_stream(pkts)
3277 self.pg_enable_capture(self.pg_interfaces)
3279 capture = self.pg3.get_capture(len(pkts))
3280 self.verify_capture_out(capture)
# pg3 -> pg0: return traffic must show up on pg0 and pass
# verify_capture_in
3283 pkts = self.create_stream_out(self.pg3)
3284 self.pg3.add_stream(pkts)
3285 self.pg_enable_capture(self.pg_interfaces)
3287 capture = self.pg0.get_capture(len(pkts))
3288 self.verify_capture_in(capture, self.pg0)
3290 # from non-NAT interface to NAT inside interface
3291 pkts = self.create_stream_in(self.pg2, self.pg0)
3292 self.pg2.add_stream(pkts)
3293 self.pg_enable_capture(self.pg_interfaces)
3295 capture = self.pg0.get_capture(len(pkts))
3296 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3298 def test_output_feature_vrf_aware(self):
3299 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Two NAT pool addresses are used, one bound to VRF 10 and one to VRF 20.
# Traffic entering on pg4 is expected to be translated to the VRF 10
# address and traffic entering on pg6 to the VRF 20 address.
3300 nat_ip_vrf10 = "10.0.0.10"
3301 nat_ip_vrf20 = "10.0.0.20"
# two /32 host routes towards pg3's remote address via pg3
# NOTE(review): both calls look identical in the arguments shown here;
# presumably they differ in a table id argument -- confirm.
3303 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3304 dst_address_length=32,
3305 next_hop_address=self.pg3.remote_ip4n,
3306 next_hop_sw_if_index=self.pg3.sw_if_index,
3308 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3309 dst_address_length=32,
3310 next_hop_address=self.pg3.remote_ip4n,
3311 next_hop_sw_if_index=self.pg3.sw_if_index,
3314 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3315 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3316 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3317 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3318 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3, verified against the VRF 10 NAT address
3322 pkts = self.create_stream_in(self.pg4, self.pg3)
3323 self.pg4.add_stream(pkts)
3324 self.pg_enable_capture(self.pg_interfaces)
3326 capture = self.pg3.get_capture(len(pkts))
3327 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> VRF 10 NAT address, expected back on pg4
3330 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3331 self.pg3.add_stream(pkts)
3332 self.pg_enable_capture(self.pg_interfaces)
3334 capture = self.pg4.get_capture(len(pkts))
3335 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, verified against the VRF 20 NAT address
3338 pkts = self.create_stream_in(self.pg6, self.pg3)
3339 self.pg6.add_stream(pkts)
3340 self.pg_enable_capture(self.pg_interfaces)
3342 capture = self.pg3.get_capture(len(pkts))
3343 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> VRF 20 NAT address, expected back on pg6
3346 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3347 self.pg3.add_stream(pkts)
3348 self.pg_enable_capture(self.pg_interfaces)
3350 capture = self.pg6.get_capture(len(pkts))
3351 self.verify_capture_in(capture, self.pg6)
3353 def test_output_feature_hairpinning(self):
3354 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server are both remote hosts on pg0; the server is published
# on nat_addr:server_out_port through a TCP static mapping.  All traffic
# is sent on pg0 and captured on pg0.
3355 host = self.pg0.remote_hosts[0]
3356 server = self.pg0.remote_hosts[1]
3359 server_in_port = 5678
3360 server_out_port = 8765
3362 self.nat44_add_address(self.nat_addr)
3363 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3364 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3367 # add static mapping for server
3368 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3369 server_in_port, server_out_port,
3370 proto=IP_PROTOS.tcp)
3372 # send packet from host to server
3373 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3374 IP(src=host.ip4, dst=self.nat_addr) /
3375 TCP(sport=host_in_port, dport=server_out_port))
3376 self.pg0.add_stream(p)
3377 self.pg_enable_capture(self.pg_interfaces)
3379 capture = self.pg0.get_capture(1)
# expected at the server: source nat_addr with a source port different
# from host_in_port, destination server.ip4:server_in_port
3384 self.assertEqual(ip.src, self.nat_addr)
3385 self.assertEqual(ip.dst, server.ip4)
3386 self.assertNotEqual(tcp.sport, host_in_port)
3387 self.assertEqual(tcp.dport, server_in_port)
3388 self.check_tcp_checksum(p)
# remember the translated source port; the server's reply is sent to it
3389 host_out_port = tcp.sport
3391 self.logger.error(ppp("Unexpected or invalid packet:", p))
3394 # send reply from server to host
3395 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3396 IP(src=server.ip4, dst=self.nat_addr) /
3397 TCP(sport=server_in_port, dport=host_out_port))
3398 self.pg0.add_stream(p)
3399 self.pg_enable_capture(self.pg_interfaces)
3401 capture = self.pg0.get_capture(1)
# expected at the host: source nat_addr:server_out_port, destination
# host.ip4:host_in_port
3406 self.assertEqual(ip.src, self.nat_addr)
3407 self.assertEqual(ip.dst, host.ip4)
3408 self.assertEqual(tcp.sport, server_out_port)
3409 self.assertEqual(tcp.dport, host_in_port)
3410 self.check_tcp_checksum(p)
3412 self.logger.error(ppp("Unexpected or invalid packet:", p))
3415 def test_output_feature_and_service(self):
3416 """ NAT44 interface output feature and services """
# Combines three things on one setup: forwarding enabled, an identity
# mapping for pg1's remote address, and an out2in-only TCP static mapping
# that publishes pg0's remote host on external_addr:external_port.
3417 external_addr = '1.2.3.4'
3421 self.vapi.nat44_forwarding_enable_disable(1)
3422 self.nat44_add_address(self.nat_addr)
3423 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3424 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3425 local_port, external_port,
3426 proto=IP_PROTOS.tcp, out2in_only=1)
# pg0 gets the NAT44 feature twice (second call takes an extra argument);
# pg1 gets the output feature
3427 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3428 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3430 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3433 # from client to service
3434 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3435 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3436 TCP(sport=12345, dport=external_port))
3437 self.pg1.add_stream(p)
3438 self.pg_enable_capture(self.pg_interfaces)
3440 capture = self.pg0.get_capture(1)
# destination must be rewritten to the service's local address and port
3446 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3447 self.assertEqual(tcp.dport, local_port)
3448 self.check_tcp_checksum(p)
3449 self.check_ip_checksum(p)
3451 self.logger.error(ppp("Unexpected or invalid packet:", p))
3454 # from service back to client
3455 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3456 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3457 TCP(sport=local_port, dport=12345))
3458 self.pg0.add_stream(p)
3459 self.pg_enable_capture(self.pg_interfaces)
3461 capture = self.pg1.get_capture(1)
# source must be rewritten back to the external address and port
3466 self.assertEqual(ip.src, external_addr)
3467 self.assertEqual(tcp.sport, external_port)
3468 self.check_tcp_checksum(p)
3469 self.check_ip_checksum(p)
3471 self.logger.error(ppp("Unexpected or invalid packet:", p))
3474 # from local network host to external network
# NOTE(review): the same pg0 -> pg1 stream is created, sent and verified
# twice in a row; confirm whether the repetition is intentional.
3475 pkts = self.create_stream_in(self.pg0, self.pg1)
3476 self.pg0.add_stream(pkts)
3477 self.pg_enable_capture(self.pg_interfaces)
3479 capture = self.pg1.get_capture(len(pkts))
3480 self.verify_capture_out(capture)
3481 pkts = self.create_stream_in(self.pg0, self.pg1)
3482 self.pg0.add_stream(pkts)
3483 self.pg_enable_capture(self.pg_interfaces)
3485 capture = self.pg1.get_capture(len(pkts))
3486 self.verify_capture_out(capture)
3488 # from external network back to local network host
3489 pkts = self.create_stream_out(self.pg1)
3490 self.pg1.add_stream(pkts)
3491 self.pg_enable_capture(self.pg_interfaces)
3493 capture = self.pg0.get_capture(len(pkts))
3494 self.verify_capture_in(capture, self.pg0)
3496 def test_output_feature_and_service2(self):
3497 """ NAT44 interface output feature and service host direct access """
# Only pg1 is configured (output feature) and forwarding is enabled.
# The test runs two round trips between pg0 and pg1: one started from
# pg0 and one started from pg1 towards pg0's real address.
3498 self.vapi.nat44_forwarding_enable_disable(1)
3499 self.nat44_add_address(self.nat_addr)
3500 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3503 # session initiated from service host - translate
3504 pkts = self.create_stream_in(self.pg0, self.pg1)
3505 self.pg0.add_stream(pkts)
3506 self.pg_enable_capture(self.pg_interfaces)
3508 capture = self.pg1.get_capture(len(pkts))
3509 self.verify_capture_out(capture)
3511 pkts = self.create_stream_out(self.pg1)
3512 self.pg1.add_stream(pkts)
3513 self.pg_enable_capture(self.pg_interfaces)
3515 capture = self.pg0.get_capture(len(pkts))
3516 self.verify_capture_in(capture, self.pg0)
# local copies of the current outside ports / ICMP id
# NOTE(review): no later use of these three locals is visible in this
# method -- confirm whether they are still needed.
3518 tcp_port_out = self.tcp_port_out
3519 udp_port_out = self.udp_port_out
3520 icmp_id_out = self.icmp_id_out
3522 # session initiated from remote host - do not translate
# the stream targets pg0's own address and uses the inside ports
3523 pkts = self.create_stream_out(self.pg1,
3524 self.pg0.remote_ip4,
3525 use_inside_ports=True)
3526 self.pg1.add_stream(pkts)
3527 self.pg_enable_capture(self.pg_interfaces)
3529 capture = self.pg0.get_capture(len(pkts))
3530 self.verify_capture_in(capture, self.pg0)
# replies are verified with pg0's own address as the expected source
3532 pkts = self.create_stream_in(self.pg0, self.pg1)
3533 self.pg0.add_stream(pkts)
3534 self.pg_enable_capture(self.pg_interfaces)
3536 capture = self.pg1.get_capture(len(pkts))
3537 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3540 def test_output_feature_and_service3(self):
3541 """ NAT44 interface output feature and DST NAT """
# An out2in-only TCP static mapping publishes pg1's remote host on
# external_addr:external_port.  A client on pg0 connects to the external
# address; only the destination is expected to be rewritten on the way
# out, and only the source on the way back.
3542 external_addr = '1.2.3.4'
3546 self.vapi.nat44_forwarding_enable_disable(1)
3547 self.nat44_add_address(self.nat_addr)
3548 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3549 local_port, external_port,
3550 proto=IP_PROTOS.tcp, out2in_only=1)
3551 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3552 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3554 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0 client -> external address, captured on pg1
3557 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3558 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3559 TCP(sport=12345, dport=external_port))
3560 self.pg0.add_stream(p)
3561 self.pg_enable_capture(self.pg_interfaces)
3563 capture = self.pg1.get_capture(1)
# source address/port unchanged, destination rewritten to the mapped
# local address/port
3568 self.assertEqual(ip.src, self.pg0.remote_ip4)
3569 self.assertEqual(tcp.sport, 12345)
3570 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3571 self.assertEqual(tcp.dport, local_port)
3572 self.check_tcp_checksum(p)
3573 self.check_ip_checksum(p)
3575 self.logger.error(ppp("Unexpected or invalid packet:", p))
# reply from the pg1 host to the pg0 client, captured on pg0
3578 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3579 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3580 TCP(sport=local_port, dport=12345))
3581 self.pg1.add_stream(p)
3582 self.pg_enable_capture(self.pg_interfaces)
3584 capture = self.pg0.get_capture(1)
# source rewritten back to the external address/port, destination
# unchanged
3589 self.assertEqual(ip.src, external_addr)
3590 self.assertEqual(tcp.sport, external_port)
3591 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3592 self.assertEqual(tcp.dport, 12345)
3593 self.check_tcp_checksum(p)
3594 self.check_ip_checksum(p)
3596 self.logger.error(ppp("Unexpected or invalid packet:", p))
3599 def test_one_armed_nat44(self):
3600 """ One armed NAT44 """
# Single-interface setup: both hosts are remote hosts on pg9, and pg9 is
# passed to nat44_interface_add_del_feature twice.  All traffic enters
# and leaves through pg9.
3601 remote_host = self.pg9.remote_hosts[0]
3602 local_host = self.pg9.remote_hosts[1]
3605 self.nat44_add_address(self.nat_addr)
3606 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3607 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local host -> remote host, TCP 12345 -> 80
3611 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3612 IP(src=local_host.ip4, dst=remote_host.ip4) /
3613 TCP(sport=12345, dport=80))
3614 self.pg9.add_stream(p)
3615 self.pg_enable_capture(self.pg_interfaces)
3617 capture = self.pg9.get_capture(1)
# source must become nat_addr with a port other than 12345; destination
# address and port stay as sent
3622 self.assertEqual(ip.src, self.nat_addr)
3623 self.assertEqual(ip.dst, remote_host.ip4)
3624 self.assertNotEqual(tcp.sport, 12345)
# keep the allocated port: the reply below is addressed to it
3625 external_port = tcp.sport
3626 self.assertEqual(tcp.dport, 80)
3627 self.check_tcp_checksum(p)
3628 self.check_ip_checksum(p)
3630 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote host -> nat_addr:external_port
3634 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3635 IP(src=remote_host.ip4, dst=self.nat_addr) /
3636 TCP(sport=80, dport=external_port))
3637 self.pg9.add_stream(p)
3638 self.pg_enable_capture(self.pg_interfaces)
3640 capture = self.pg9.get_capture(1)
# destination must be restored to local_host.ip4:12345
3645 self.assertEqual(ip.src, remote_host.ip4)
3646 self.assertEqual(ip.dst, local_host.ip4)
3647 self.assertEqual(tcp.sport, 80)
3648 self.assertEqual(tcp.dport, 12345)
3649 self.check_tcp_checksum(p)
3650 self.check_ip_checksum(p)
3652 self.logger.error(ppp("Unexpected or invalid packet:", p))
3655 def test_one_armed_nat44_static(self):
3656 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
# Single-interface setup on pg9.  nat_addr is added with twice_nat=1 and
# a TCP static mapping (out2in_only=1) publishes local_host on
# nat_addr:external_port.
3657 remote_host = self.pg9.remote_hosts[0]
3658 local_host = self.pg9.remote_hosts[1]
3663 self.vapi.nat44_forwarding_enable_disable(1)
3664 self.nat44_add_address(self.nat_addr, twice_nat=1)
3665 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3666 local_port, external_port,
3667 proto=IP_PROTOS.tcp, out2in_only=1,
3669 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3670 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3673 # from client to service
3674 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3675 IP(src=remote_host.ip4, dst=self.nat_addr) /
3676 TCP(sport=12345, dport=external_port))
3677 self.pg9.add_stream(p)
3678 self.pg_enable_capture(self.pg_interfaces)
3680 capture = self.pg9.get_capture(1)
# both ends are rewritten: destination becomes local_host.ip4:local_port
# and source becomes nat_addr with a port other than 12345
3686 self.assertEqual(ip.dst, local_host.ip4)
3687 self.assertEqual(ip.src, self.nat_addr)
3688 self.assertEqual(tcp.dport, local_port)
3689 self.assertNotEqual(tcp.sport, 12345)
# translated client port as seen by the service; used as the reply's
# destination port
3690 eh_port_in = tcp.sport
3691 self.check_tcp_checksum(p)
3692 self.check_ip_checksum(p)
3694 self.logger.error(ppp("Unexpected or invalid packet:", p))
3697 # from service back to client
3698 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3699 IP(src=local_host.ip4, dst=self.nat_addr) /
3700 TCP(sport=local_port, dport=eh_port_in))
3701 self.pg9.add_stream(p)
3702 self.pg_enable_capture(self.pg_interfaces)
3704 capture = self.pg9.get_capture(1)
# the client must see nat_addr:external_port as source and its own
# address with the original port 12345 as destination
3709 self.assertEqual(ip.src, self.nat_addr)
3710 self.assertEqual(ip.dst, remote_host.ip4)
3711 self.assertEqual(tcp.sport, external_port)
3712 self.assertEqual(tcp.dport, 12345)
3713 self.check_tcp_checksum(p)
3714 self.check_ip_checksum(p)
3716 self.logger.error(ppp("Unexpected or invalid packet:", p))
3719 def test_del_session(self):
3720 """ Delete NAT44 session """
# Creates sessions by sending a pg0 -> pg1 stream, deletes two of them
# through the API and checks that the user's session count dropped by
# exactly two.
3721 self.nat44_add_address(self.nat_addr)
3722 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3723 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3726 pkts = self.create_stream_in(self.pg0, self.pg1)
3727 self.pg0.add_stream(pkts)
3728 self.pg_enable_capture(self.pg_interfaces)
3730 capture = self.pg1.get_capture(len(pkts))
# session count for pg0's remote host before deleting anything
3732 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3733 nsessions = len(sessions)
# first session is deleted by its inside address/port, the second by its
# outside address/port (that call takes one more argument)
3735 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3736 sessions[0].inside_port,
3737 sessions[0].protocol)
3738 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3739 sessions[1].outside_port,
3740 sessions[1].protocol,
3743 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3744 self.assertEqual(nsessions - len(sessions), 2)
3746 def test_set_get_reass(self):
3747 """ NAT44 set/get virtual fragmentation reassembly """
# Reads the current reassembly configuration, writes back modified values
# (timeout + 5, max_reass and max_frag doubled) and checks that a second
# read returns exactly those values.
3748 reas_cfg1 = self.vapi.nat_get_reass()
3750 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3751 max_reass=reas_cfg1.ip4_max_reass * 2,
3752 max_frag=reas_cfg1.ip4_max_frag * 2)
3754 reas_cfg2 = self.vapi.nat_get_reass()
3756 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3757 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3758 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
# finally switch on drop_frag and check that the flag reads back truthy
3760 self.vapi.nat_set_reass(drop_frag=1)
3761 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3763 def test_frag_in_order(self):
3764 """ NAT44 translate fragments arriving in order """
# Sends a fragmented TCP payload pg0 -> pg1 and back, reassembles the
# captured fragments and compares the payload with what was sent.  The
# number of entries returned by nat_reass_dump is expected to grow by two
# over the whole test.
3765 self.nat44_add_address(self.nat_addr)
3766 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3767 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload made of three distinct runs of letters
3770 data = "A" * 4 + "B" * 16 + "C" * 3
# random inside source port for this run
3771 self.tcp_port_in = random.randint(1025, 65535)
3773 reass = self.vapi.nat_reass_dump()
3774 reass_n_start = len(reass)
# pg0 -> pg1
3777 pkts = self.create_stream_frag(self.pg0,
3778 self.pg1.remote_ip4,
3782 self.pg0.add_stream(pkts)
3783 self.pg_enable_capture(self.pg_interfaces)
3785 frags = self.pg1.get_capture(len(pkts))
3786 p = self.reass_frags_and_verify(frags,
3788 self.pg1.remote_ip4)
# destination port preserved, source port translated, payload intact
3789 self.assertEqual(p[TCP].dport, 20)
3790 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# store the translated port on the instance for the return direction
3791 self.tcp_port_out = p[TCP].sport
3792 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0
3795 pkts = self.create_stream_frag(self.pg1,
3800 self.pg1.add_stream(pkts)
3801 self.pg_enable_capture(self.pg_interfaces)
3803 frags = self.pg0.get_capture(len(pkts))
3804 p = self.reass_frags_and_verify(frags,
3805 self.pg1.remote_ip4,
3806 self.pg0.remote_ip4)
# destination port must be mapped back to the original inside port
3807 self.assertEqual(p[TCP].sport, 20)
3808 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3809 self.assertEqual(data, p[Raw].load)
3811 reass = self.vapi.nat_reass_dump()
3812 reass_n_end = len(reass)
# exactly two more dump entries than at the start
3814 self.assertEqual(reass_n_end - reass_n_start, 2)
3816 def test_reass_hairpinning(self):
3817 """ NAT44 fragments hairpinning """
# Host and server are both remote hosts on pg0.  A fragmented TCP payload
# is sent on pg0, captured again on pg0, reassembled and checked.  Ports
# are chosen at random for each run.
3818 host = self.pg0.remote_hosts[0]
3819 server = self.pg0.remote_hosts[1]
3820 host_in_port = random.randint(1025, 65535)
3822 server_in_port = random.randint(1025, 65535)
3823 server_out_port = random.randint(1025, 65535)
3824 data = "A" * 4 + "B" * 16 + "C" * 3
3826 self.nat44_add_address(self.nat_addr)
3827 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3828 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3830 # add static mapping for server
3831 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3832 server_in_port, server_out_port,
3833 proto=IP_PROTOS.tcp)
3835 # send packet from host to server
3836 pkts = self.create_stream_frag(self.pg0,
3841 self.pg0.add_stream(pkts)
3842 self.pg_enable_capture(self.pg_interfaces)
3844 frags = self.pg0.get_capture(len(pkts))
3845 p = self.reass_frags_and_verify(frags,
# source port translated, destination port mapped to the server's inside
# port, payload intact
3848 self.assertNotEqual(p[TCP].sport, host_in_port)
3849 self.assertEqual(p[TCP].dport, server_in_port)
3850 self.assertEqual(data, p[Raw].load)
3852 def test_frag_out_of_order(self):
3853 """ NAT44 translate fragments arriving out of order """
# Same round trip as the in-order fragment test (pg0 -> pg1 and back,
# reassemble, compare payload and ports).
3854 self.nat44_add_address(self.nat_addr)
3855 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3856 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3859 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of this call is discarded.  test_frag_in_order
# assigns the same expression to self.tcp_port_in, which is what the
# assertions below compare against -- an assignment was probably intended
# here as well; confirm.
3860 random.randint(1025, 65535)
# pg0 -> pg1
3863 pkts = self.create_stream_frag(self.pg0,
3864 self.pg1.remote_ip4,
3869 self.pg0.add_stream(pkts)
3870 self.pg_enable_capture(self.pg_interfaces)
3872 frags = self.pg1.get_capture(len(pkts))
3873 p = self.reass_frags_and_verify(frags,
3875 self.pg1.remote_ip4)
# destination port preserved, source port translated, payload intact
3876 self.assertEqual(p[TCP].dport, 20)
3877 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3878 self.tcp_port_out = p[TCP].sport
3879 self.assertEqual(data, p[Raw].load)
# pg1 -> pg0
3882 pkts = self.create_stream_frag(self.pg1,
3888 self.pg1.add_stream(pkts)
3889 self.pg_enable_capture(self.pg_interfaces)
3891 frags = self.pg0.get_capture(len(pkts))
3892 p = self.reass_frags_and_verify(frags,
3893 self.pg1.remote_ip4,
3894 self.pg0.remote_ip4)
3895 self.assertEqual(p[TCP].sport, 20)
3896 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3897 self.assertEqual(data, p[Raw].load)
3899 def test_port_restricted(self):
3900 """ Port restricted NAT44 (MAP-E CE) """
# Switches the address/port assignment algorithm to map-e via the CLI and
# checks that the source port allocated for a pg0 -> pg1 TCP packet
# carries the configured PSID.
3901 self.nat44_add_address(self.nat_addr)
3902 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3903 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3905 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3906 "psid-offset 6 psid-len 6")
3908 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3909 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3910 TCP(sport=4567, dport=22))
3911 self.pg0.add_stream(p)
3912 self.pg_enable_capture(self.pg_interfaces)
3914 capture = self.pg1.get_capture(1)
3919 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3920 self.assertEqual(ip.src, self.nat_addr)
3921 self.assertEqual(tcp.dport, 22)
3922 self.assertNotEqual(tcp.sport, 4567)
# take six bits of the source port starting at bit 6 and require the
# value 10, i.e. the psid given in the CLI command above
3923 self.assertEqual((tcp.sport >> 6) & 63, 10)
3924 self.check_tcp_checksum(p)
3925 self.check_ip_checksum(p)
3927 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Shared scenario behind the twice-NAT tests that follow.  It configures
# nat_addr plus a dedicated twice-NAT address, publishes one or two servers
# through a (load-balanced) static mapping, sends one TCP packet from a
# client to the service and one reply back, and asserts the addresses and
# ports seen on each side.
# Keyword flags visible in the signature: self_twice_nat, same_pg, lb.
# NOTE(review): a client_id argument is also used by the body and by some
# callers -- confirm its position and default in the full signature.
3930 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3932 twice_nat_addr = '10.0.1.3'
# the two load-balanced backends listen on consecutive ports after port_in
3940 port_in1 = port_in+1
3941 port_in2 = port_in+2
3946 server1 = self.pg0.remote_hosts[0]
3947 server2 = self.pg0.remote_hosts[1]
3959 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
3962 self.nat44_add_address(self.nat_addr)
3963 self.nat44_add_address(twice_nat_addr, twice_nat=1)
# exactly one of twice_nat / self_twice_nat is set on the mapping
3965 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3967 proto=IP_PROTOS.tcp,
3968 twice_nat=int(not self_twice_nat),
3969 self_twice_nat=int(self_twice_nat))
# NOTE(review): this local shadows the builtin locals(); harmless here
# but a different name would be clearer.
3971 locals = [{'addr': server1.ip4n,
3974 {'addr': server2.ip4n,
3977 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3978 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
3982 not self_twice_nat),
3985 local_num=len(locals),
3987 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
3988 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
# client selection: client_id picks one of the pg0 remote hosts,
# otherwise the first remote host on pg1 is used
3995 assert client_id is not None
3997 client = self.pg0.remote_hosts[0]
3998 elif client_id == 2:
3999 client = self.pg0.remote_hosts[1]
4001 client = pg1.remote_hosts[0]
# client -> service (nat_addr:port_out), captured on pg0
4002 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
4003 IP(src=client.ip4, dst=self.nat_addr) /
4004 TCP(sport=eh_port_out, dport=port_out))
4006 self.pg_enable_capture(self.pg_interfaces)
4008 capture = pg0.get_capture(1)
4014 if ip.dst == server1.ip4:
4020 self.assertEqual(ip.dst, server.ip4)
4022 self.assertIn(tcp.dport, [port_in1, port_in2])
4024 self.assertEqual(tcp.dport, port_in)
# two alternative expectations for the source: either rewritten to the
# twice-NAT address with a new port, or left as the client's own
# address and port
4026 self.assertEqual(ip.src, twice_nat_addr)
4027 self.assertNotEqual(tcp.sport, eh_port_out)
4029 self.assertEqual(ip.src, client.ip4)
4030 self.assertEqual(tcp.sport, eh_port_out)
# remember the ports seen by the server so the reply can mirror them
4032 eh_port_in = tcp.sport
4033 saved_port_in = tcp.dport
4034 self.check_tcp_checksum(p)
4035 self.check_ip_checksum(p)
4037 self.logger.error(ppp("Unexpected or invalid packet:", p))
# service -> client, captured on pg1
4040 p = (Ether(src=server.mac, dst=pg0.local_mac) /
4041 IP(src=server.ip4, dst=eh_addr_in) /
4042 TCP(sport=saved_port_in, dport=eh_port_in))
4044 self.pg_enable_capture(self.pg_interfaces)
4046 capture = pg1.get_capture(1)
# the client must see the service's external address/port as source and
# its own original address/port as destination
4051 self.assertEqual(ip.dst, client.ip4)
4052 self.assertEqual(ip.src, self.nat_addr)
4053 self.assertEqual(tcp.dport, eh_port_out)
4054 self.assertEqual(tcp.sport, port_out)
4055 self.check_tcp_checksum(p)
4056 self.check_ip_checksum(p)
4058 self.logger.error(ppp("Unexpected or invalid packet:", p))
4061 def test_twice_nat(self):
# runs the shared twice-NAT scenario with all default arguments
4063 self.twice_nat_common()
4065 def test_self_twice_nat_positive(self):
4066 """ Self Twice NAT44 (positive test) """
# shared scenario with self_twice_nat enabled and same_pg=True
4067 self.twice_nat_common(self_twice_nat=True, same_pg=True)
4069 def test_self_twice_nat_negative(self):
4070 """ Self Twice NAT44 (negative test) """
# shared scenario with self_twice_nat enabled but same_pg left at its
# default (False)
4071 self.twice_nat_common(self_twice_nat=True)
4073 def test_twice_nat_lb(self):
4074 """ Twice NAT44 local service load balancing """
# shared scenario with the load-balanced static mapping (lb=True)
4075 self.twice_nat_common(lb=True)
4077 def test_self_twice_nat_lb_positive(self):
4078 """ Self Twice NAT44 local service load balancing (positive test) """
# shared scenario with lb, self_twice_nat and same_pg all enabled; the
# call takes one further argument after same_pg
4079 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4082 def test_self_twice_nat_lb_negative(self):
4083 """ Self Twice NAT44 local service load balancing (negative test) """
# same leading arguments as the positive variant; the call takes one
# further argument after same_pg
# NOTE(review): presumably that trailing argument is what distinguishes
# the negative case from the positive one -- confirm.
4084 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
4087 def test_twice_nat_interface_addr(self):
4088 """ Acquire twice NAT44 addresses from interface """
# Registers pg7 as a twice-NAT address source, then checks that the NAT
# address pool follows pg7's IPv4 configuration: empty before an address
# is configured, one twice-NAT entry while it is configured, and empty
# again after it is removed.
4089 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
4091 # no address in NAT pool
4092 adresses = self.vapi.nat44_address_dump()
4093 self.assertEqual(0, len(adresses))
4095 # configure interface address and check NAT address pool
4096 self.pg7.config_ip4()
4097 adresses = self.vapi.nat44_address_dump()
4098 self.assertEqual(1, len(adresses))
# only the first four bytes of the dumped address field are compared
# with pg7's local IPv4 address
4099 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
4100 self.assertEqual(adresses[0].twice_nat, 1)
4102 # remove interface address and check NAT address pool
4103 self.pg7.unconfig_ip4()
4104 adresses = self.vapi.nat44_address_dump()
4105 self.assertEqual(0, len(adresses))
4107 def test_ipfix_max_frags(self):
4108 """ IPFIX logging maximum fragments pending reassembly exceeded """
# Sets the reassembly max_frag limit to 0, sends a single fragment and
# expects nothing to be forwarded to pg1.  The IPFIX export captured on
# pg3 is then decoded and checked for the matching event.
4109 self.nat44_add_address(self.nat_addr)
4110 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4111 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4113 self.vapi.nat_set_reass(max_frag=0)
# IPFIX records are exported towards pg3's remote address
4114 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
4115 src_address=self.pg3.local_ip4n,
4117 template_interval=10)
4118 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
4119 src_port=self.ipfix_src_port)
4121 data = "A" * 4 + "B" * 16 + "C" * 3
4122 self.tcp_port_in = random.randint(1025, 65535)
4123 pkts = self.create_stream_frag(self.pg0,
4124 self.pg1.remote_ip4,
# only the last fragment of the stream is actually sent
4128 self.pg0.add_stream(pkts[-1])
4129 self.pg_enable_capture(self.pg_interfaces)
# zero packets are expected on pg1
4131 frags = self.pg1.get_capture(0)
4132 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# nine IPFIX packets are expected on pg3
4133 capture = self.pg3.get_capture(9)
4134 ipfix = IPFIXDecoder()
4135 # first load template
4137 self.assertTrue(p.haslayer(IPFIX))
4138 self.assertEqual(p[IP].src, self.pg3.local_ip4)
4139 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
4140 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
4141 self.assertEqual(p[UDP].dport, 4739)
4142 self.assertEqual(p[IPFIX].observationDomainID,
4143 self.ipfix_domain_id)
4144 if p.haslayer(Template):
4145 ipfix.add_template(p.getlayer(Template))
4146 # verify events in data set
4148 if p.haslayer(Data):
# NOTE(review): this rebinds `data`, which held the payload string
# above, to the decoded IPFIX records.
4149 data = ipfix.decode_data_set(p.getlayer(Set))
4150 self.verify_ipfix_max_fragments_ip4(data, 0,
4151 self.pg0.remote_ip4n)
4153 def test_tcp_session_close_in(self):
4154 """ Close TCP session from inside network """
# Opens a TCP session pg0 -> pg1, plays a close sequence that starts on
# the pg0 side, opens the session again and checks that the user's
# session count is exactly one higher than before the test.
4155 self.nat44_add_address(self.nat_addr)
4156 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4157 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# baseline session count for pg0's remote host
4160 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4161 start_sessnum = len(sessions)
4163 self.initiate_tcp_session(self.pg0, self.pg1)
4165 # close the session from inside
4167 # FIN packet in -> out
4168 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4169 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4170 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4172 self.pg0.add_stream(p)
4173 self.pg_enable_capture(self.pg_interfaces)
4175 self.pg1.get_capture(1)
4179 # ACK packet out -> in
4180 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4181 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4182 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4186 # FIN packet out -> in
4187 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4188 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4189 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# the two out -> in packets are sent as one stream and both are expected
# on pg0
4193 self.pg1.add_stream(pkts)
4194 self.pg_enable_capture(self.pg_interfaces)
4196 self.pg0.get_capture(2)
4198 # ACK packet in -> out
4199 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4200 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4201 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4203 self.pg0.add_stream(p)
4204 self.pg_enable_capture(self.pg_interfaces)
4206 self.pg1.get_capture(1)
# open the session again and compare with the baseline: exactly one
# additional session is expected
4208 self.initiate_tcp_session(self.pg0, self.pg1)
4209 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4211 self.assertEqual(len(sessions) - start_sessnum, 1)
4213 self.logger.error("TCP session termination failed")
4216 def test_tcp_session_close_out(self):
4217 """ Close TCP session from outside network """
# Mirror image of the inside-initiated close test: the close sequence
# starts on the pg1 side.  Afterwards the session is opened again and the
# user's session count must be exactly one higher than the baseline.
4218 self.nat44_add_address(self.nat_addr)
4219 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4220 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# baseline session count for pg0's remote host
4223 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4224 start_sessnum = len(sessions)
4226 self.initiate_tcp_session(self.pg0, self.pg1)
4228 # close the session from outside
4230 # FIN packet out -> in
4231 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4232 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4233 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4235 self.pg1.add_stream(p)
4236 self.pg_enable_capture(self.pg_interfaces)
4238 self.pg0.get_capture(1)
4242 # ACK packet in -> out
4243 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4244 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4245 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the comment below repeats "ACK packet in -> out"; by
# symmetry with test_tcp_session_close_in this second packet is
# presumably the FIN -- confirm against the TCP flags and fix the comment.
4249 # ACK packet in -> out
4250 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4251 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4252 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# the two in -> out packets are sent as one stream and both are expected
# on pg1
4256 self.pg0.add_stream(pkts)
4257 self.pg_enable_capture(self.pg_interfaces)
4259 self.pg1.get_capture(2)
4261 # ACK packet out -> in
4262 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4263 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4264 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4266 self.pg1.add_stream(p)
4267 self.pg_enable_capture(self.pg_interfaces)
4269 self.pg0.get_capture(1)
# open the session again and compare with the baseline: exactly one
# additional session is expected
4271 self.initiate_tcp_session(self.pg0, self.pg1)
4272 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4274 self.assertEqual(len(sessions) - start_sessnum, 1)
4276 self.logger.error("TCP session termination failed")
4279 def test_tcp_session_close_simultaneous(self):
# NOTE(review): the docstring below is identical to the one on
# test_tcp_session_close_in.  This test sends a FIN from each side before
# either side's ACK, so the text should probably describe a simultaneous
# close -- confirm and update.
4280 """ Close TCP session from inside network """
4281 self.nat44_add_address(self.nat_addr)
4282 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4283 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# baseline session count for pg0's remote host
4286 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4287 start_sessnum = len(sessions)
4289 self.initiate_tcp_session(self.pg0, self.pg1)
4291 # close the session from inside
4293 # FIN packet in -> out
4294 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4295 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4296 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4298 self.pg0.add_stream(p)
4299 self.pg_enable_capture(self.pg_interfaces)
4301 self.pg1.get_capture(1)
4303 # FIN packet out -> in
4304 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4305 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4306 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4308 self.pg1.add_stream(p)
4309 self.pg_enable_capture(self.pg_interfaces)
4311 self.pg0.get_capture(1)
4313 # ACK packet in -> out
4314 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4315 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4316 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4318 self.pg0.add_stream(p)
4319 self.pg_enable_capture(self.pg_interfaces)
4321 self.pg1.get_capture(1)
4323 # ACK packet out -> in
4324 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4325 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4326 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4328 self.pg1.add_stream(p)
4329 self.pg_enable_capture(self.pg_interfaces)
4331 self.pg0.get_capture(1)
# open the session again and compare with the baseline: exactly one
# additional session is expected
4333 self.initiate_tcp_session(self.pg0, self.pg1)
4334 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4336 self.assertEqual(len(sessions) - start_sessnum, 1)
4338 self.logger.error("TCP session termination failed")
# Body of TestNAT44.tearDown; the `def` line itself is not part of this
# listing. Runs the base-class teardown first, then (only while VPP is still
# alive) writes the NAT44 state reported by the CLI to the test log.
4342 super(TestNAT44, self).tearDown()
4343 if not self.vpp_dead:
4344 self.logger.info(self.vapi.cli("show nat44 addresses"))
4345 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4346 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4347 self.logger.info(self.vapi.cli("show nat44 interface address"))
4348 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4349 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# Put the address/port assignment algorithm back to "default" so a test that
# changed it does not affect the next one. Presumably also guarded by the
# vpp_dead check above -- indentation is lost in this listing.
4350 self.vapi.cli("nat addr-port-assignment-alg default")
4354 class TestNAT44Out2InDPO(MethodHolder):
4355 """ NAT44 Test Cases using out2in DPO """
# NOTE(review): the embedded line numbers show gaps (decorators, try/except,
# continuation arguments and the statements that start packet generation are
# not visible in this listing); comments describe only what is shown.
4358 def setUpConstants(cls):
# Append a nat { out2in dpo } section to the VPP command line.
4359 super(TestNAT44Out2InDPO, cls).setUpConstants()
4360 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
4363 def setUpClass(cls):
4364 super(TestNAT44Out2InDPO, cls).setUpClass()
# Ports / ICMP ids used by the stream helpers; "in" and "out" values are
# deliberately equal here.
4367 cls.tcp_port_in = 6303
4368 cls.tcp_port_out = 6303
4369 cls.udp_port_in = 6304
4370 cls.udp_port_out = 6304
4371 cls.icmp_id_in = 6305
4372 cls.icmp_id_out = 6305
4373 cls.nat_addr = '10.0.0.3'
4374 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4375 cls.dst_ip4 = '192.168.70.1'
# Two packet-generator interfaces: pg0 is the IPv4 side, pg1 the IPv6 side.
4377 cls.create_pg_interfaces(range(2))
4380 cls.pg0.config_ip4()
4381 cls.pg0.resolve_arp()
4384 cls.pg1.config_ip6()
4385 cls.pg1.resolve_ndp()
# IPv6 default route (all-zero address, prefix length 0) via pg1's peer.
4387 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4388 dst_address_length=0,
4389 next_hop_address=cls.pg1.remote_ip6n,
4390 next_hop_sw_if_index=cls.pg1.sw_if_index)
# Presumably the error-path cleanup of setUpClass (the surrounding
# try/except/raise lines are not shown) -- verify against the full file.
4393 super(TestNAT44Out2InDPO, cls).tearDownClass()
4396 def configure_xlat(self):
# Store the two /96 IPv6 prefixes on the instance and create a MAP domain in
# translation mode from them (the inet_pton arguments and the tail of the
# map_add_domain call are not shown).
4397 self.dst_ip6_pfx = '1:2:3::'
4398 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4400 self.dst_ip6_pfx_len = 96
4401 self.src_ip6_pfx = '4:5:6::'
4402 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4404 self.src_ip6_pfx_len = 96
4405 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4406 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4407 '\x00\x00\x00\x00', 0, is_translation=1,
4410 def test_464xlat_ce(self):
4411 """ Test 464XLAT CE with NAT44 """
4413 self.configure_xlat()
# NAT44 on the IPv4 interface with a single-address pool.
4415 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4416 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# Names follow the out->in direction: out_src_ip6 embeds the IPv4 destination
# in the dst prefix, out_dst_ip6 embeds the NAT address in the src prefix.
4418 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4419 self.dst_ip6_pfx_len)
4420 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4421 self.src_ip6_pfx_len)
# in -> out: IPv4 packets enter on pg0, IPv6 packets are checked on pg1.
4424 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4425 self.pg0.add_stream(pkts)
4426 self.pg_enable_capture(self.pg_interfaces)
4428 capture = self.pg1.get_capture(len(pkts))
4429 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
# out -> in: IPv6 packets enter on pg1, IPv4 packets are checked on pg0.
4432 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4434 self.pg1.add_stream(pkts)
4435 self.pg_enable_capture(self.pg_interfaces)
4437 capture = self.pg0.get_capture(len(pkts))
4438 self.verify_capture_in(capture, self.pg0)
# Undo the NAT44 configuration made at the top of this test.
4440 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4442 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4443 self.nat_addr_n, is_add=0)
4445 def test_464xlat_ce_no_nat(self):
4446 """ Test 464XLAT CE without NAT44 """
4448 self.configure_xlat()
# Without NAT44 the IPv6 address embeds the inside host's own IPv4 address
# and the checker is told to expect unchanged ports (same_port=True).
4450 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4451 self.dst_ip6_pfx_len)
4452 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4453 self.src_ip6_pfx_len)
4455 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4456 self.pg0.add_stream(pkts)
4457 self.pg_enable_capture(self.pg_interfaces)
4459 capture = self.pg1.get_capture(len(pkts))
4460 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4461 nat_ip=out_dst_ip6, same_port=True)
4463 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4464 self.pg1.add_stream(pkts)
4465 self.pg_enable_capture(self.pg_interfaces)
4467 capture = self.pg0.get_capture(len(pkts))
4468 self.verify_capture_in(capture, self.pg0)
4471 class TestDeterministicNAT(MethodHolder):
4472 """ Deterministic NAT Test Cases """
4475 def setUpConstants(cls):
# Append a nat { deterministic } section to the VPP command line; the
# decorator line above this method is not part of the listing.
4476 super(TestDeterministicNAT, cls).setUpConstants()
4477 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4480 def setUpClass(cls):
4481 super(TestDeterministicNAT, cls).setUpClass()
# Fixed inside/external ports and the ICMP id used by the stream helpers.
# The translated values (tcp_port_out, udp_port_out, icmp_external_id) are
# not set in the lines shown here.
4484 cls.tcp_port_in = 6303
4485 cls.tcp_external_port = 6303
4486 cls.udp_port_in = 6304
4487 cls.udp_external_port = 6304
4488 cls.icmp_id_in = 6305
4489 cls.nat_addr = '10.0.0.3'
# Three packet-generator interfaces; the per-interface setup inside the loop
# below is not visible in this listing.
4491 cls.create_pg_interfaces(range(3))
4492 cls.interfaces = list(cls.pg_interfaces)
4494 for i in cls.interfaces:
# Two remote hosts behind pg0, used by the multiple-users test.
4499 cls.pg0.generate_remote_hosts(2)
4500 cls.pg0.configure_ipv4_neighbors()
# Presumably the error-path cleanup of setUpClass (the surrounding
# try/except/raise lines are not shown) -- verify against the full file.
4503 super(TestDeterministicNAT, cls).tearDownClass()
4506 def create_stream_in(self, in_if, out_if, ttl=64):
4508 Create packet stream for inside network
4510 :param in_if: Inside interface
4511 :param out_if: Outside interface
4512 :param ttl: TTL of generated packets
# Builds one TCP, one UDP and one ICMP echo-request packet from the inside
# host towards the outside host. The list handling and the return statement
# are not visible in this listing (callers use the result as a packet list).
4516 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4517 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4518 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
4522 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4523 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4524 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
4528 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4529 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4530 ICMP(id=self.icmp_id_in, type='echo-request'))
4535 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4537 Create packet stream for outside network
4539 :param out_if: Outside interface
4540 :param dst_ip: Destination IP address (Default use global NAT address)
4541 :param ttl: TTL of generated packets
# Falls back to the class-wide NAT address (the condition guarding this
# assignment is not shown; per the docstring it applies when dst_ip is None).
4544 dst_ip = self.nat_addr
# Reply packets (TCP, UDP, ICMP echo-reply) aimed at the translated ports/id.
# tcp_port_out, udp_port_out and icmp_external_id are assigned by
# verify_capture_out(), so that check has to run before this helper.
4547 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4548 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4549 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4553 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4554 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4555 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4559 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4560 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4561 ICMP(id=self.icmp_external_id, type='echo-reply'))
4566 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4568 Verify captured packets on outside network
4570 :param capture: Captured packets
4571 :param nat_ip: Translated IP address (Default use global NAT address)
4572 :param same_port: Sorce port number is not translated (Default False)
4573 :param packet_num: Expected number of packets (Default 3)
# NOTE(review): the docstring documents a `same_port` parameter that this
# signature does not have, and spells "Source" as "Sorce".
# Default for nat_ip (the guarding condition is not shown in this listing).
4576 nat_ip = self.nat_addr
4577 self.assertEqual(packet_num, len(capture))
4578 for packet in capture:
# Every packet must carry the NAT address as source. The translated source
# port / ICMP id is remembered on the instance for later use by
# create_stream_out() and the session checks.
4580 self.assertEqual(packet[IP].src, nat_ip)
4581 if packet.haslayer(TCP):
4582 self.tcp_port_out = packet[TCP].sport
4583 elif packet.haslayer(UDP):
4584 self.udp_port_out = packet[UDP].sport
4586 self.icmp_external_id = packet[ICMP].id
# Presumably emitted from an except handler around the checks above (the
# try/except/raise lines are not shown).
4588 self.logger.error(ppp("Unexpected or invalid packet "
4589 "(outside network):", packet))
4592 def verify_ipfix_max_entries_per_user(self, data):
4594 Verify IPFIX maximum entries per user exceeded event
4596 :param data: Decoded IPFIX data records
# Exactly one record is expected. `record` is bound in a line that is not
# part of this listing (presumably the single element of `data`).
4598 self.assertEqual(1, len(data))
# Keys are IPFIX information element ids (IANA registry): 230 natEvent,
# 466 natQuotaExceededEvent, 473 maxEntriesPerUser, 8 sourceIPv4Address.
# Values are compared as raw byte strings; '\xe8\x03\x00\x00' is 1000 and
# '\x03\x00\x00\x00' is 3 when read as little-endian 32-bit integers.
4601 self.assertEqual(ord(record[230]), 13)
4602 # natQuotaExceededEvent
4603 self.assertEqual('\x03\x00\x00\x00', record[466])
4605 self.assertEqual('\xe8\x03\x00\x00', record[473])
4607 self.assertEqual(self.pg0.remote_ip4n, record[8])
4609 def test_deterministic_mode(self):
4610 """ NAT plugin run deterministic mode """
# Inside network, outside address and one inside host used for the lookup.
# in_plen / out_plen are defined in lines not visible in this listing.
4611 in_addr = '172.16.255.0'
4612 out_addr = '172.17.255.50'
4613 in_addr_t = '172.16.255.20'
4614 in_addr_n = socket.inet_aton(in_addr)
4615 out_addr_n = socket.inet_aton(out_addr)
4616 in_addr_t_n = socket.inet_aton(in_addr_t)
# The plugin must report that it was started in deterministic mode.
4620 nat_config = self.vapi.nat_show_config()
4621 self.assertEqual(1, nat_config.deterministic)
4623 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup of the inside host, then reverse lookup using the upper end
# of the returned port range must give back the same inside host.
4625 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4626 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4627 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4628 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# The dump must contain exactly the mapping that was added. Only the first
# four bytes of the returned address fields are compared.
4630 deterministic_mappings = self.vapi.nat_det_map_dump()
4631 self.assertEqual(len(deterministic_mappings), 1)
4632 dsm = deterministic_mappings[0]
4633 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4634 self.assertEqual(in_plen, dsm.in_plen)
4635 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4636 self.assertEqual(out_plen, dsm.out_plen)
# After clearing the configuration no mapping may remain.
4638 self.clear_nat_det()
4639 deterministic_mappings = self.vapi.nat_det_map_dump()
4640 self.assertEqual(len(deterministic_mappings), 0)
4642 def test_set_timeouts(self):
4643 """ Set deterministic NAT timeouts """
# Read the current timeouts, raise each of the four by 10, read them back
# and check that every value differs from the one read before. Note the
# check is "changed", not "equal to old value + 10".
4644 timeouts_before = self.vapi.nat_det_get_timeouts()
4646 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4647 timeouts_before.tcp_established + 10,
4648 timeouts_before.tcp_transitory + 10,
4649 timeouts_before.icmp + 10)
4651 timeouts_after = self.vapi.nat_det_get_timeouts()
4653 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4654 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4655 self.assertNotEqual(timeouts_before.tcp_established,
4656 timeouts_after.tcp_established)
4657 self.assertNotEqual(timeouts_before.tcp_transitory,
4658 timeouts_after.tcp_transitory)
4660 def test_det_in(self):
4661 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4663 nat_ip = "10.0.0.10"
# Map the inside host to nat_ip (prefix-length arguments are not shown) and
# enable NAT on pg0 and pg1 (the tail of the pg1 call is not shown).
4665 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4667 socket.inet_aton(nat_ip),
4669 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4670 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# in -> out: verify_capture_out() also records the translated ports/id.
4674 pkts = self.create_stream_in(self.pg0, self.pg1)
4675 self.pg0.add_stream(pkts)
4676 self.pg_enable_capture(self.pg_interfaces)
4678 capture = self.pg1.get_capture(len(pkts))
4679 self.verify_capture_out(capture, nat_ip)
# out -> in: replies to the recorded ports must reach the inside host.
4682 pkts = self.create_stream_out(self.pg1, nat_ip)
4683 self.pg1.add_stream(pkts)
4684 self.pg_enable_capture(self.pg_interfaces)
4686 capture = self.pg0.get_capture(len(pkts))
4687 self.verify_capture_in(capture, self.pg0)
# One session per protocol is expected. `s` is bound in lines not visible
# here (presumably one session entry per group of assertions below).
4690 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4691 self.assertEqual(len(sessions), 3)
4695 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4696 self.assertEqual(s.in_port, self.tcp_port_in)
4697 self.assertEqual(s.out_port, self.tcp_port_out)
4698 self.assertEqual(s.ext_port, self.tcp_external_port)
4702 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4703 self.assertEqual(s.in_port, self.udp_port_in)
4704 self.assertEqual(s.out_port, self.udp_port_out)
4705 self.assertEqual(s.ext_port, self.udp_external_port)
# For ICMP the identifier takes the place of the port.
4709 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4710 self.assertEqual(s.in_port, self.icmp_id_in)
4711 self.assertEqual(s.out_port, self.icmp_external_id)
4713 def test_multiple_users(self):
4714 """ Deterministic NAT multiple users """
# Two inside hosts share one NAT address; each must get its own translated
# source port and replies must be delivered to the right host. `port_in`,
# `ip` and `tcp` are bound in lines not visible in this listing.
4716 nat_ip = "10.0.0.10"
4718 external_port = 6303
4720 host0 = self.pg0.remote_hosts[0]
4721 host1 = self.pg0.remote_hosts[1]
4723 self.vapi.nat_det_add_del_map(host0.ip4n,
4725 socket.inet_aton(nat_ip),
4727 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4728 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside; remember the translated source port as port_out0.
4732 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4733 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4734 TCP(sport=port_in, dport=external_port))
4735 self.pg0.add_stream(p)
4736 self.pg_enable_capture(self.pg_interfaces)
4738 capture = self.pg1.get_capture(1)
4743 self.assertEqual(ip.src, nat_ip)
4744 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4745 self.assertEqual(tcp.dport, external_port)
4746 port_out0 = tcp.sport
4748 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside, same inside port; remember port_out1.
4752 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4753 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4754 TCP(sport=port_in, dport=external_port))
4755 self.pg0.add_stream(p)
4756 self.pg_enable_capture(self.pg_interfaces)
4758 capture = self.pg1.get_capture(1)
4763 self.assertEqual(ip.src, nat_ip)
4764 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4765 self.assertEqual(tcp.dport, external_port)
4766 port_out1 = tcp.sport
4768 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One mapping with two sessions (one per host) must exist now.
4771 dms = self.vapi.nat_det_map_dump()
4772 self.assertEqual(1, len(dms))
4773 self.assertEqual(2, dms[0].ses_num)
# outside -> port_out0 must be delivered to host0.
4776 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4777 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4778 TCP(sport=external_port, dport=port_out0))
4779 self.pg1.add_stream(p)
4780 self.pg_enable_capture(self.pg_interfaces)
4782 capture = self.pg0.get_capture(1)
4787 self.assertEqual(ip.src, self.pg1.remote_ip4)
4788 self.assertEqual(ip.dst, host0.ip4)
4789 self.assertEqual(tcp.dport, port_in)
4790 self.assertEqual(tcp.sport, external_port)
4792 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> port_out1 must be delivered to host1.
4796 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4797 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4798 TCP(sport=external_port, dport=port_out1))
4799 self.pg1.add_stream(p)
4800 self.pg_enable_capture(self.pg_interfaces)
4802 capture = self.pg0.get_capture(1)
4807 self.assertEqual(ip.src, self.pg1.remote_ip4)
4808 self.assertEqual(ip.dst, host1.ip4)
4809 self.assertEqual(tcp.dport, port_in)
4810 self.assertEqual(tcp.sport, external_port)
4812 self.logger.error(ppp("Unexpected or invalid packet", p))
# Close one session via the "out" API and the other via the "in" API; the
# session counter of the mapping must drop by one each time. The port
# arguments of both calls are not shown in this listing.
4815 # session close api test
4816 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4818 self.pg1.remote_ip4n,
4820 dms = self.vapi.nat_det_map_dump()
4821 self.assertEqual(dms[0].ses_num, 1)
4823 self.vapi.nat_det_close_session_in(host0.ip4n,
4825 self.pg1.remote_ip4n,
4827 dms = self.vapi.nat_det_map_dump()
4828 self.assertEqual(dms[0].ses_num, 0)
4830 def test_tcp_session_close_detection_in(self):
4831 """ Deterministic NAT TCP session close from inside network """
# Sequence: FIN from inside, ACK + FIN from outside (sent together as one
# stream of two packets), final ACK from inside; afterwards the mapping must
# report zero sessions. TCP flags arguments, the `pkts` list handling and
# the statements that start packet generation are not visible here.
4832 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4834 socket.inet_aton(self.nat_addr),
4836 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4837 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4840 self.initiate_tcp_session(self.pg0, self.pg1)
4842 # close the session from inside
4844 # FIN packet in -> out
4845 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4846 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4847 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4849 self.pg0.add_stream(p)
4850 self.pg_enable_capture(self.pg_interfaces)
4852 self.pg1.get_capture(1)
4856 # ACK packet out -> in
4857 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4858 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4859 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4863 # FIN packet out -> in
4864 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4865 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4866 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4870 self.pg1.add_stream(pkts)
4871 self.pg_enable_capture(self.pg_interfaces)
4873 self.pg0.get_capture(2)
4875 # ACK packet in -> out
4876 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4877 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4878 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4880 self.pg0.add_stream(p)
4881 self.pg_enable_capture(self.pg_interfaces)
4883 self.pg1.get_capture(1)
4885 # Check if deterministic NAT44 closed the session
4886 dms = self.vapi.nat_det_map_dump()
4887 self.assertEqual(0, dms[0].ses_num)
# Presumably logged from an except handler around the steps above (the
# try/except/raise lines are not shown).
4889 self.logger.error("TCP session termination failed")
4892 def test_tcp_session_close_detection_out(self):
4893 """ Deterministic NAT TCP session close from outside network """
# Mirror image of the "close from inside" test: FIN from outside, two
# packets from inside sent together, final ACK from outside; afterwards the
# mapping must report zero sessions. TCP flags arguments, the `pkts` list
# handling and the statements that start packet generation are not visible.
4894 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4896 socket.inet_aton(self.nat_addr),
4898 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4899 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4902 self.initiate_tcp_session(self.pg0, self.pg1)
4904 # close the session from outside
4906 # FIN packet out -> in
4907 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4908 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4909 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4911 self.pg1.add_stream(p)
4912 self.pg_enable_capture(self.pg_interfaces)
4914 self.pg0.get_capture(1)
4918 # ACK packet in -> out
4919 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4920 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4921 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the next comment repeats "ACK"; by symmetry with the
# close-from-inside test this second packet is presumably the FIN -- confirm
# against the (elided) flags argument and fix the comment if so.
4925 # ACK packet in -> out
4926 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4927 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4928 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4932 self.pg0.add_stream(pkts)
4933 self.pg_enable_capture(self.pg_interfaces)
4935 self.pg1.get_capture(2)
4937 # ACK packet out -> in
4938 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4939 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4940 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4942 self.pg1.add_stream(p)
4943 self.pg_enable_capture(self.pg_interfaces)
4945 self.pg0.get_capture(1)
4947 # Check if deterministic NAT44 closed the session
4948 dms = self.vapi.nat_det_map_dump()
4949 self.assertEqual(0, dms[0].ses_num)
# Presumably logged from an except handler around the steps above (the
# try/except/raise lines are not shown).
4951 self.logger.error("TCP session termination failed")
4954 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4955 def test_session_timeout(self):
4956 """ Deterministic NAT session timeouts """
# Only runs as part of the extended test set (see the decorator).
4957 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4959 socket.inet_aton(self.nat_addr),
4961 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4962 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Create sessions, with all four timeouts set to 5.
4965 self.initiate_tcp_session(self.pg0, self.pg1)
4966 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4967 pkts = self.create_stream_in(self.pg0, self.pg1)
4968 self.pg0.add_stream(pkts)
4969 self.pg_enable_capture(self.pg_interfaces)
4971 capture = self.pg1.get_capture(len(pkts))
# All sessions must be gone by now. The wait that lets the timeouts expire
# is presumably in the lines elided just above -- verify against the file.
4974 dms = self.vapi.nat_det_map_dump()
4975 self.assertEqual(0, dms[0].ses_num)
4977 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4978 def test_session_limit_per_user(self):
4979 """ Deterministic NAT maximum sessions per user limit """
# Only runs as part of the extended test set (see the decorator). Loop
# headers, list handling and the statements that start packet generation
# are not visible in this listing.
4980 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4982 socket.inet_aton(self.nat_addr),
4984 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4985 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Export NAT events over IPFIX towards pg2 so the quota event can be checked.
4987 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4988 src_address=self.pg2.local_ip4n,
4990 template_interval=10)
4991 self.vapi.nat_ipfix()
# 1000 UDP flows from the same inside host, one per port in 1025..2024.
4994 for port in range(1025, 2025):
4995 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4996 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4997 UDP(sport=port, dport=port))
5000 self.pg0.add_stream(pkts)
5001 self.pg_enable_capture(self.pg_interfaces)
5003 capture = self.pg1.get_capture(len(pkts))
# One more flow from that host must not be forwarded to pg1.
5005 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5006 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
5007 UDP(sport=3001, dport=3002))
5008 self.pg0.add_stream(p)
5009 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the value assigned to `capture` on the next line is replaced
# by the following visible assignment without being read in between.
5011 capture = self.pg1.assert_nothing_captured()
5013 # verify ICMP error packet
5014 capture = self.pg0.get_capture(1)
5016 self.assertTrue(p.haslayer(ICMP))
# ICMP type 3 / code 1 is "destination unreachable / host unreachable"; the
# embedded original datagram must be the rejected UDP packet (3001 -> 3002).
5018 self.assertEqual(icmp.type, 3)
5019 self.assertEqual(icmp.code, 1)
5020 self.assertTrue(icmp.haslayer(IPerror))
5021 inner_ip = icmp[IPerror]
5022 self.assertEqual(inner_ip[UDPerror].sport, 3001)
5023 self.assertEqual(inner_ip[UDPerror].dport, 3002)
# The session count stays at the 1000 created above.
5025 dms = self.vapi.nat_det_map_dump()
5027 self.assertEqual(1000, dms[0].ses_num)
5029 # verify IPFIX logging
5030 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Two IPFIX packets are expected on pg2. Templates are registered with the
# decoder first, then data sets are decoded and checked (the two loop
# headers over `capture` are not shown).
5032 capture = self.pg2.get_capture(2)
5033 ipfix = IPFIXDecoder()
5034 # first load template
5036 self.assertTrue(p.haslayer(IPFIX))
5037 if p.haslayer(Template):
5038 ipfix.add_template(p.getlayer(Template))
5039 # verify events in data set
5041 if p.haslayer(Data):
5042 data = ipfix.decode_data_set(p.getlayer(Set))
5043 self.verify_ipfix_max_entries_per_user(data)
5045 def clear_nat_det(self):
5047 Clear deterministic NAT configuration.
# Disable NAT IPFIX logging and call the timeout setter without arguments
# (presumably restoring its defaults -- confirm in the vapi wrapper).
5049 self.vapi.nat_ipfix(enable=0)
5050 self.vapi.nat_det_set_timeouts()
# Walk every deterministic mapping; the remaining arguments of the call
# (presumably the delete flag) are not visible in this listing.
5051 deterministic_mappings = self.vapi.nat_det_map_dump()
5052 for dsm in deterministic_mappings:
5053 self.vapi.nat_det_add_del_map(dsm.in_addr,
# Same for every interface that has the NAT44 feature configured.
5059 interfaces = self.vapi.nat44_interface_dump()
5060 for intf in interfaces:
5061 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# Body of TestDeterministicNAT.tearDown; the `def` line and the
# self.logger.info( openers of the last three CLI dumps are not part of this
# listing. While VPP is still alive the deterministic NAT state is dumped
# and the configuration is cleared so the next test starts clean.
5066 super(TestDeterministicNAT, self).tearDown()
5067 if not self.vpp_dead:
5068 self.logger.info(self.vapi.cli("show nat44 interfaces"))
5070 self.vapi.cli("show nat44 deterministic mappings"))
5072 self.vapi.cli("show nat44 deterministic timeouts"))
5074 self.vapi.cli("show nat44 deterministic sessions"))
5075 self.clear_nat_det()
5078 class TestNAT64(MethodHolder):
5079 """ NAT64 Test Cases """
5082 def setUpConstants(cls):
# Append a nat { ... } section to the VPP command line that sets the NAT64
# BIB hash to 128 buckets and the session-table hash to 256 buckets; the
# decorator line above this method is not part of the listing.
5083 super(TestNAT64, cls).setUpConstants()
5084 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
5085 "nat64 st hash buckets 256", "}"])
5088 def setUpClass(cls):
5089 super(TestNAT64, cls).setUpClass()
# Ports / ICMP ids and NAT pool addresses shared by the tests; individual
# tests override some of the port values on the instance.
5092 cls.tcp_port_in = 6303
5093 cls.tcp_port_out = 6303
5094 cls.udp_port_in = 6304
5095 cls.udp_port_out = 6304
5096 cls.icmp_id_in = 6305
5097 cls.icmp_id_out = 6305
5098 cls.nat_addr = '10.0.0.3'
5099 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
# Pool address for the tenant VRF; cls.vrf1_id is assigned in a line that is
# not visible in this listing.
5101 cls.vrf1_nat_addr = '10.0.10.3'
5102 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
5104 cls.ipfix_src_port = 4739
5105 cls.ipfix_domain_id = 1
# Interface roles: pg0 and pg2 are IPv6-side, pg1 is IPv4-side, pg3 gets
# both address families below. The per-interface statements inside the two
# loops are mostly not shown.
5107 cls.create_pg_interfaces(range(5))
5108 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
5109 cls.ip6_interfaces.append(cls.pg_interfaces[2])
5110 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# Create the IPv6 table for the tenant VRF and move pg2 into it.
5112 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
5114 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
5116 cls.pg0.generate_remote_hosts(2)
5118 for i in cls.ip6_interfaces:
5121 i.configure_ipv6_neighbors()
5123 for i in cls.ip4_interfaces:
5129 cls.pg3.config_ip4()
5130 cls.pg3.resolve_arp()
5131 cls.pg3.config_ip6()
5132 cls.pg3.configure_ipv6_neighbors()
# Presumably the error-path cleanup of setUpClass (the surrounding
# try/except/raise lines are not shown) -- verify against the full file.
5135 super(TestNAT64, cls).tearDownClass()
5138 def test_pool(self):
5139 """ Add/delete address to NAT64 pool """
# Add a one-address range (start == end), check that the dump reports
# exactly that address, delete it again and check that the pool is empty.
5140 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
5142 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
5144 addresses = self.vapi.nat64_pool_addr_dump()
5145 self.assertEqual(len(addresses), 1)
5146 self.assertEqual(addresses[0].address, nat_addr)
5148 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
5150 addresses = self.vapi.nat64_pool_addr_dump()
5151 self.assertEqual(len(addresses), 0)
5153 def test_interface(self):
5154 """ Enable/disable NAT64 feature on the interface """
# pg0 becomes an inside interface (default), pg1 an outside one.
5155 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5156 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5158 interfaces = self.vapi.nat64_interface_dump()
5159 self.assertEqual(len(interfaces), 2)
# Both interfaces must be listed with the right direction. The pg0_found /
# pg1_found flags are initialised and set in lines not visible here.
5162 for intf in interfaces:
5163 if intf.sw_if_index == self.pg0.sw_if_index:
5164 self.assertEqual(intf.is_inside, 1)
5166 elif intf.sw_if_index == self.pg1.sw_if_index:
5167 self.assertEqual(intf.is_inside, 0)
5169 self.assertTrue(pg0_found)
5170 self.assertTrue(pg1_found)
# The matching graph nodes must show up in the CLI feature listing.
5172 features = self.vapi.cli("show interface features pg0")
5173 self.assertNotEqual(features.find('nat64-in2out'), -1)
5174 features = self.vapi.cli("show interface features pg1")
5175 self.assertNotEqual(features.find('nat64-out2in'), -1)
# Remove the feature from both interfaces; the dump must then be empty.
5177 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
5178 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
5180 interfaces = self.vapi.nat64_interface_dump()
5181 self.assertEqual(len(interfaces), 0)
5183 def test_static_bib(self):
5184 """ Add/delete static BIB entry """
# in_port / out_port, the remaining arguments of both static-BIB calls and
# the loops that count static entries (static_bib_num, bibe) are in lines
# not visible in this listing.
5185 in_addr = socket.inet_pton(socket.AF_INET6,
5186 '2001:db8:85a3::8a2e:370:7334')
5187 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
5190 proto = IP_PROTOS.tcp
5192 self.vapi.nat64_add_del_static_bib(in_addr,
# After the first call the TCP BIB dump must contain one static entry with
# the addresses and ports used above.
5197 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5202 self.assertEqual(bibe.i_addr, in_addr)
5203 self.assertEqual(bibe.o_addr, out_addr)
5204 self.assertEqual(bibe.i_port, in_port)
5205 self.assertEqual(bibe.o_port, out_port)
5206 self.assertEqual(static_bib_num, 1)
# After the second call (presumably the delete) no static entry may remain.
5208 self.vapi.nat64_add_del_static_bib(in_addr,
5214 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
5219 self.assertEqual(static_bib_num, 0)
5221 def test_set_timeouts(self):
5222 """ Set NAT64 timeouts """
# The first half pins the values expected before anything is changed, so it
# relies on no earlier test leaving modified NAT64 timeouts behind.
5223 # verify default values
5224 timeouts = self.vapi.nat64_get_timeouts()
5225 self.assertEqual(timeouts.udp, 300)
5226 self.assertEqual(timeouts.icmp, 60)
5227 self.assertEqual(timeouts.tcp_trans, 240)
5228 self.assertEqual(timeouts.tcp_est, 7440)
5229 self.assertEqual(timeouts.tcp_incoming_syn, 6)
5231 # set and verify custom values
5232 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
5233 tcp_est=7450, tcp_incoming_syn=10)
5234 timeouts = self.vapi.nat64_get_timeouts()
5235 self.assertEqual(timeouts.udp, 200)
5236 self.assertEqual(timeouts.icmp, 30)
5237 self.assertEqual(timeouts.tcp_trans, 250)
5238 self.assertEqual(timeouts.tcp_est, 7450)
5239 self.assertEqual(timeouts.tcp_incoming_syn, 10)
5241 def test_dynamic(self):
5242 """ NAT64 dynamic translation test """
# Reset the inside ports/id on the instance (other tests change them).
5243 self.tcp_port_in = 6303
5244 self.udp_port_in = 6304
5245 self.icmp_id_in = 6305
5247 ses_num_start = self.nat64_get_ses_num()
# Pool address plus pg0 as inside and pg1 as outside interface (the second
# argument of the pool call is not shown).
5249 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5251 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5252 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# First round, in -> out: IPv6 in on pg0, IPv4 out on pg1.
5255 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5256 self.pg0.add_stream(pkts)
5257 self.pg_enable_capture(self.pg_interfaces)
5259 capture = self.pg1.get_capture(len(pkts))
5260 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5261 dst_ip=self.pg1.remote_ip4)
# First round, out -> in. The expected IPv6 source is the outside host's
# IPv4 address appended to the 64:ff9b:: prefix; scapy's IPv6 layer is used
# only to normalise that textual form.
5264 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5265 self.pg1.add_stream(pkts)
5266 self.pg_enable_capture(self.pg_interfaces)
5268 capture = self.pg0.get_capture(len(pkts))
5269 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5270 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Second round in both directions, this time over the existing sessions.
5273 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5274 self.pg0.add_stream(pkts)
5275 self.pg_enable_capture(self.pg_interfaces)
5277 capture = self.pg1.get_capture(len(pkts))
5278 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5279 dst_ip=self.pg1.remote_ip4)
5282 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5283 self.pg1.add_stream(pkts)
5284 self.pg_enable_capture(self.pg_interfaces)
5286 capture = self.pg0.get_capture(len(pkts))
5287 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Both rounds together must have added exactly three sessions.
5289 ses_num_end = self.nat64_get_ses_num()
5291 self.assertEqual(ses_num_end - ses_num_start, 3)
5293 # tenant with specific VRF
5294 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5295 self.vrf1_nat_addr_n,
5296 vrf_id=self.vrf1_id)
5297 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Traffic entering on pg2 must be translated to the VRF-specific pool
# address and replies must return on pg2.
5299 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5300 self.pg2.add_stream(pkts)
5301 self.pg_enable_capture(self.pg_interfaces)
5303 capture = self.pg1.get_capture(len(pkts))
5304 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5305 dst_ip=self.pg1.remote_ip4)
5307 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5308 self.pg1.add_stream(pkts)
5309 self.pg_enable_capture(self.pg_interfaces)
5311 capture = self.pg2.get_capture(len(pkts))
5312 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5314 def test_static(self):
5315 """ NAT64 static translation test """
# Inside and outside ports/ids are set to the same values; the capture check
# below is called with same_port=True accordingly.
5316 self.tcp_port_in = 60303
5317 self.udp_port_in = 60304
5318 self.icmp_id_in = 60305
5319 self.tcp_port_out = 60303
5320 self.udp_port_out = 60304
5321 self.icmp_id_out = 60305
5323 ses_num_start = self.nat64_get_ses_num()
5325 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5327 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5328 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Three static BIB entries for the inside host; their remaining arguments
# are not visible here (presumably one each for TCP, UDP and ICMP).
5330 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5335 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5340 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
# in -> out
5347 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5348 self.pg0.add_stream(pkts)
5349 self.pg_enable_capture(self.pg_interfaces)
5351 capture = self.pg1.get_capture(len(pkts))
5352 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5353 dst_ip=self.pg1.remote_ip4, same_port=True)
# out -> in; the expected IPv6 source is the outside host's IPv4 address
# appended to the 64:ff9b:: prefix.
5356 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5357 self.pg1.add_stream(pkts)
5358 self.pg_enable_capture(self.pg_interfaces)
5360 capture = self.pg0.get_capture(len(pkts))
5361 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5362 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# The traffic must have created exactly three sessions.
5364 ses_num_end = self.nat64_get_ses_num()
5366 self.assertEqual(ses_num_end - ses_num_start, 3)
5368 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5369 def test_session_timeout(self):
5370 """ NAT64 session timeout """
# Only runs as part of the extended test set (see the decorator).
5371 self.icmp_id_in = 1234
5372 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5374 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5375 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Only the ICMP timeout is shortened (to 5); the other timeouts are not
# passed to the setter.
5376 self.vapi.nat64_set_timeouts(icmp=5)
5378 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5379 self.pg0.add_stream(pkts)
5380 self.pg_enable_capture(self.pg_interfaces)
5382 capture = self.pg1.get_capture(len(pkts))
5384 ses_num_before_timeout = self.nat64_get_ses_num()
# The wait that lets the ICMP session expire is presumably in the lines
# elided here. The check below only requires the count to have changed.
5388 # ICMP session after timeout
5389 ses_num_after_timeout = self.nat64_get_ses_num()
5390 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# Verifies translation of ICMP error messages in both directions:
# ICMPv6 Destination Unreachable (code 1) sent on pg0 must appear on pg1 as
# ICMP type 3 / code 13, and ICMP type 3 / code 13 sent on pg1 must appear on
# pg0 as ICMPv6 Destination Unreachable code 1, with the embedded packet
# addresses and ports rewritten accordingly.
5392 def test_icmp_error(self):
5393 """ NAT64 ICMP Error message translation """
5394 self.tcp_port_in = 6303
5395 self.udp_port_in = 6304
5396 self.icmp_id_in = 6305
5398 ses_num_start = self.nat64_get_ses_num()
5400 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5402 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5403 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5405 # send some packets to create sessions
5406 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5407 self.pg0.add_stream(pkts)
5408 self.pg_enable_capture(self.pg_interfaces)
# capture_ip4 is kept: its packets are later embedded in the IPv4 ICMP errors.
5410 capture_ip4 = self.pg1.get_capture(len(pkts))
5411 self.verify_capture_out(capture_ip4,
5412 nat_ip=self.nat_addr,
5413 dst_ip=self.pg1.remote_ip4)
5415 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5416 self.pg1.add_stream(pkts)
5417 self.pg_enable_capture(self.pg_interfaces)
# capture_ip6 is kept: its packets are later embedded in the ICMPv6 errors.
5419 capture_ip6 = self.pg0.get_capture(len(pkts))
# A scapy IPv6 object is used only to obtain the normalised textual form of
# the '64:ff9b::' + IPv4 address, read back through ip[IPv6].src / ip.src.
5420 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5421 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5422 self.pg0.remote_ip6)
# Inside -> outside: wrap every packet captured on pg0 in an ICMPv6
# Destination Unreachable (code 1) addressed to the translated remote host.
5425 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5426 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5427 ICMPv6DestUnreach(code=1) /
5428 packet[IPv6] for packet in capture_ip6]
5429 self.pg0.add_stream(pkts)
5430 self.pg_enable_capture(self.pg_interfaces)
5432 capture = self.pg1.get_capture(len(pkts))
5433 for packet in capture:
# Outer header must come from the NAT address; the embedded (IPerror) header
# must show the original outside -> NAT address direction.
5435 self.assertEqual(packet[IP].src, self.nat_addr)
5436 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5437 self.assertEqual(packet[ICMP].type, 3)
5438 self.assertEqual(packet[ICMP].code, 13)
5439 inner = packet[IPerror]
5440 self.assertEqual(inner.src, self.pg1.remote_ip4)
5441 self.assertEqual(inner.dst, self.nat_addr)
5442 self.check_icmp_checksum(packet)
5443 if inner.haslayer(TCPerror):
5444 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5445 elif inner.haslayer(UDPerror):
5446 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5448 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5450 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside: wrap every packet captured on pg1 in an ICMP
# type 3 / code 13 error addressed to the NAT address.
5454 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5455 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5456 ICMP(type=3, code=13) /
5457 packet[IP] for packet in capture_ip4]
5458 self.pg1.add_stream(pkts)
5459 self.pg_enable_capture(self.pg_interfaces)
5461 capture = self.pg0.get_capture(len(pkts))
5462 for packet in capture:
5464 self.assertEqual(packet[IPv6].src, ip.src)
5465 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5466 icmp = packet[ICMPv6DestUnreach]
5467 self.assertEqual(icmp.code, 1)
5468 inner = icmp[IPerror6]
5469 self.assertEqual(inner.src, self.pg0.remote_ip6)
5470 self.assertEqual(inner.dst, ip.src)
5471 self.check_icmpv6_checksum(packet)
5472 if inner.haslayer(TCPerror):
5473 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5474 elif inner.haslayer(UDPerror):
5475 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5477 self.assertEqual(inner[ICMPv6EchoRequest].id,
5480 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning: client and server are both remote hosts on pg0. Traffic sent
# to the NAT address (as a 64:ff9b:: IPv6 address) on the server's static
# "out" ports must be delivered back out of pg0 to the server, replies must
# come back to the client, and an ICMPv6 error from the client must reach the
# server with its embedded packet translated.
5483 def test_hairpinning(self):
5484 """ NAT64 hairpinning """
5486 client = self.pg0.remote_hosts[0]
5487 server = self.pg0.remote_hosts[1]
5488 server_tcp_in_port = 22
5489 server_tcp_out_port = 4022
5490 server_udp_in_port = 23
5491 server_udp_out_port = 4023
5492 client_tcp_in_port = 1234
5493 client_udp_in_port = 1235
# Placeholders; the real values are learned from the first capture below.
5494 client_tcp_out_port = 0
5495 client_udp_out_port = 0
5496 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5497 nat_addr_ip6 = ip.src
5499 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5501 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5502 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries for the server (one call per protocol; the remaining
# arguments of each call are not visible in this view).
5504 self.vapi.nat64_add_del_static_bib(server.ip6n,
5507 server_tcp_out_port,
5509 self.vapi.nat64_add_del_static_bib(server.ip6n,
5512 server_udp_out_port,
# Client -> server through the NAT address.
5517 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5518 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5519 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5521 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5522 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5523 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5525 self.pg0.add_stream(pkts)
5526 self.pg_enable_capture(self.pg_interfaces)
# Hairpinned traffic is expected back on the same interface (pg0).
5528 capture = self.pg0.get_capture(len(pkts))
5529 for packet in capture:
5531 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5532 self.assertEqual(packet[IPv6].dst, server.ip6)
5533 if packet.haslayer(TCP):
5534 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5535 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5536 self.check_tcp_checksum(packet)
# Remember the translated client port for the reply direction.
5537 client_tcp_out_port = packet[TCP].sport
5539 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5540 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5541 self.check_udp_checksum(packet)
5542 client_udp_out_port = packet[UDP].sport
5544 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> client, addressed to the translated client ports learned above.
5549 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5550 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5551 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5553 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5554 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5555 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5557 self.pg0.add_stream(pkts)
5558 self.pg_enable_capture(self.pg_interfaces)
5560 capture = self.pg0.get_capture(len(pkts))
5561 for packet in capture:
5563 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5564 self.assertEqual(packet[IPv6].dst, client.ip6)
5565 if packet.haslayer(TCP):
5566 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5567 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5568 self.check_tcp_checksum(packet)
5570 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5571 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5572 self.check_udp_checksum(packet)
5574 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMP error: the client answers each packet it just received with an ICMPv6
# Destination Unreachable (code 1) carrying that packet.
5579 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5580 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5581 ICMPv6DestUnreach(code=1) /
5582 packet[IPv6] for packet in capture]
5583 self.pg0.add_stream(pkts)
5584 self.pg_enable_capture(self.pg_interfaces)
5586 capture = self.pg0.get_capture(len(pkts))
5587 for packet in capture:
5589 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5590 self.assertEqual(packet[IPv6].dst, server.ip6)
5591 icmp = packet[ICMPv6DestUnreach]
5592 self.assertEqual(icmp.code, 1)
# The embedded packet must look like what the server originally sent.
5593 inner = icmp[IPerror6]
5594 self.assertEqual(inner.src, server.ip6)
5595 self.assertEqual(inner.dst, nat_addr_ip6)
5596 self.check_icmpv6_checksum(packet)
5597 if inner.haslayer(TCPerror):
5598 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5599 self.assertEqual(inner[TCPerror].dport,
5600 client_tcp_out_port)
5602 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5603 self.assertEqual(inner[UDPerror].dport,
5604 client_udp_out_port)
5606 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Configures a global NAT64 prefix (2001:db8::/32) and a second prefix
# (2001:db8:122:300::/56) bound to vrf1, checks both through
# nat64_prefix_dump(), then runs traffic in both directions for pg0 and for
# pg2 using the respective prefix and NAT pool address.
5609 def test_prefix(self):
5610 """ NAT64 Network-Specific Prefix """
5612 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5614 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5615 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Separate single-address pool for vrf1, plus pg2 as a further interface.
5616 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5617 self.vrf1_nat_addr_n,
5618 vrf_id=self.vrf1_id)
5619 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5622 global_pref64 = "2001:db8::"
5623 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5624 global_pref64_len = 32
5625 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
# Exactly one prefix is expected so far, reported with vrf_id 0.
5627 prefix = self.vapi.nat64_prefix_dump()
5628 self.assertEqual(len(prefix), 1)
5629 self.assertEqual(prefix[0].prefix, global_pref64_n)
5630 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5631 self.assertEqual(prefix[0].vrf_id, 0)
5633 # Add tenant specific prefix
5634 vrf1_pref64 = "2001:db8:122:300::"
5635 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5636 vrf1_pref64_len = 56
5637 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5639 vrf_id=self.vrf1_id)
5640 prefix = self.vapi.nat64_prefix_dump()
5641 self.assertEqual(len(prefix), 2)
# Traffic using the global prefix: pg0 -> pg1 and back.
5644 pkts = self.create_stream_in_ip6(self.pg0,
5647 plen=global_pref64_len)
5648 self.pg0.add_stream(pkts)
5649 self.pg_enable_capture(self.pg_interfaces)
5651 capture = self.pg1.get_capture(len(pkts))
5652 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5653 dst_ip=self.pg1.remote_ip4)
5655 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5656 self.pg1.add_stream(pkts)
5657 self.pg_enable_capture(self.pg_interfaces)
5659 capture = self.pg0.get_capture(len(pkts))
# NOTE(review): despite its name, dst_ip is passed to verify_capture_in_ip6
# in the same position where other tests pass the expected source address.
5660 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5663 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5665 # Tenant specific prefix
5666 pkts = self.create_stream_in_ip6(self.pg2,
5669 plen=vrf1_pref64_len)
5670 self.pg2.add_stream(pkts)
5671 self.pg_enable_capture(self.pg_interfaces)
5673 capture = self.pg1.get_capture(len(pkts))
5674 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5675 dst_ip=self.pg1.remote_ip4)
5677 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5678 self.pg1.add_stream(pkts)
5679 self.pg_enable_capture(self.pg_interfaces)
5681 capture = self.pg2.get_capture(len(pkts))
5682 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5685 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Sends one TCP packet from pg0, then a packet with IPv6 next-header 47
# from the same source, and checks that the latter leaves pg1 as IPv4 from the
# NAT address with a GRE layer; the reverse direction must arrive on pg0 as
# IPv6 with next-header 47.
5687 def test_unknown_proto(self):
5688 """ NAT64 translate packet with unknown protocol """
5690 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5692 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5693 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5694 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# Plain TCP packet first; only its arrival on pg1 is checked.
5697 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5698 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5699 TCP(sport=self.tcp_port_in, dport=20))
5700 self.pg0.add_stream(p)
5701 self.pg_enable_capture(self.pg_interfaces)
5703 p = self.pg1.get_capture(1)
# Inside -> outside with next-header 47 and an inner IPv4/TCP payload
# (the layer between IPv6 and IP is not visible in this view).
5705 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5706 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5708 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5709 TCP(sport=1234, dport=1234))
5710 self.pg0.add_stream(p)
5711 self.pg_enable_capture(self.pg_interfaces)
5713 p = self.pg1.get_capture(1)
5716 self.assertEqual(packet[IP].src, self.nat_addr)
5717 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5718 self.assertTrue(packet.haslayer(GRE))
5719 self.check_ip_checksum(packet)
5721 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside, addressed to the NAT address.
5725 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5726 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5728 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5729 TCP(sport=1234, dport=1234))
5730 self.pg1.add_stream(p)
5731 self.pg_enable_capture(self.pg_interfaces)
5733 p = self.pg0.get_capture(1)
5736 self.assertEqual(packet[IPv6].src, remote_ip6)
5737 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5738 self.assertEqual(packet[IPv6].nh, 47)
5740 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the unknown-protocol test: client and server are both
# remote hosts on pg0, each reachable through its own NAT IPv4 address
# (10.0.0.100 for the server, 10.0.0.110 for the client) expressed inside
# 64:ff9b::/96. GRE-numbered packets exchanged between them must come back on
# pg0 with the peer's NAT-derived IPv6 address as source.
5743 def test_hairpinning_unknown_proto(self):
5744 """ NAT64 translate packet with unknown protocol - hairpinning """
5746 client = self.pg0.remote_hosts[0]
5747 server = self.pg0.remote_hosts[1]
5748 server_tcp_in_port = 22
5749 server_tcp_out_port = 4022
5750 client_tcp_in_port = 1234
5751 client_tcp_out_port = 1235
5752 server_nat_ip = "10.0.0.100"
5753 client_nat_ip = "10.0.0.110"
# Packed (network-order) and 64:ff9b::/96-embedded forms of both addresses.
5754 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5755 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5756 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5757 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5759 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5761 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5762 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Three static BIB entries: two for the server, one for the client (most of
# their arguments are not visible in this view).
5764 self.vapi.nat64_add_del_static_bib(server.ip6n,
5767 server_tcp_out_port,
5770 self.vapi.nat64_add_del_static_bib(server.ip6n,
5776 self.vapi.nat64_add_del_static_bib(client.ip6n,
5779 client_tcp_out_port,
# Plain TCP packet client -> server first; only its arrival is checked.
5783 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5784 IPv6(src=client.ip6, dst=server_nat_ip6) /
5785 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5786 self.pg0.add_stream(p)
5787 self.pg_enable_capture(self.pg_interfaces)
5789 p = self.pg0.get_capture(1)
# Client -> server with next-header GRE.
5791 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5792 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5794 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5795 TCP(sport=1234, dport=1234))
5796 self.pg0.add_stream(p)
5797 self.pg_enable_capture(self.pg_interfaces)
5799 p = self.pg0.get_capture(1)
5802 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5803 self.assertEqual(packet[IPv6].dst, server.ip6)
5804 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5806 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> client with next-header GRE.
5810 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5811 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5813 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5814 TCP(sport=1234, dport=1234))
5815 self.pg0.add_stream(p)
5816 self.pg_enable_capture(self.pg_interfaces)
5818 p = self.pg0.get_capture(1)
5821 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5822 self.assertEqual(packet[IPv6].dst, client.ip6)
5823 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5825 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# One-armed setup: pg3 is registered with NAT64 twice, once with the default
# arguments and once with is_inside=0, so translated traffic enters and leaves
# through the same interface. A TCP packet is translated IPv6 -> IPv4 and the
# reply IPv4 -> IPv6, both captured on pg3.
5828 def test_one_armed_nat64(self):
5829 """ One armed NAT64 """
5831 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5835 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5837 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5838 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 -> IPv4 direction.
5841 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5842 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5843 TCP(sport=12345, dport=80))
5844 self.pg3.add_stream(p)
5845 self.pg_enable_capture(self.pg_interfaces)
5847 capture = self.pg3.get_capture(1)
5852 self.assertEqual(ip.src, self.nat_addr)
5853 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5854 self.assertNotEqual(tcp.sport, 12345)
# The translated source port becomes the destination port of the reply.
5855 external_port = tcp.sport
5856 self.assertEqual(tcp.dport, 80)
5857 self.check_tcp_checksum(p)
5858 self.check_ip_checksum(p)
5860 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 -> IPv6 direction.
5864 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5865 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5866 TCP(sport=80, dport=external_port))
5867 self.pg3.add_stream(p)
5868 self.pg_enable_capture(self.pg_interfaces)
5870 capture = self.pg3.get_capture(1)
5875 self.assertEqual(ip.src, remote_host_ip6)
5876 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5877 self.assertEqual(tcp.sport, 80)
5878 self.assertEqual(tcp.dport, 12345)
5879 self.check_tcp_checksum(p)
5881 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Sends a fragmented TCP packet in each direction, reassembles the captured
# fragments and checks ports and payload; finally expects the number of
# entries returned by nat_reass_dump() to have grown by two.
5884 def test_frag_in_order(self):
5885 """ NAT64 translate fragments arriving in order """
5886 self.tcp_port_in = random.randint(1025, 65535)
5888 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5890 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5891 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Baseline for the reassembly-count check at the end of the test.
5893 reass = self.vapi.nat_reass_dump()
5894 reass_n_start = len(reass)
# Inside -> outside fragments (the payload assignment is not visible here).
5898 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5899 self.tcp_port_in, 20, data)
5900 self.pg0.add_stream(pkts)
5901 self.pg_enable_capture(self.pg_interfaces)
5903 frags = self.pg1.get_capture(len(pkts))
5904 p = self.reass_frags_and_verify(frags,
5906 self.pg1.remote_ip4)
5907 self.assertEqual(p[TCP].dport, 20)
5908 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5909 self.tcp_port_out = p[TCP].sport
5910 self.assertEqual(data, p[Raw].load)
# Outside -> inside fragments with a short 23-byte payload.
5913 data = "A" * 4 + "b" * 16 + "C" * 3
5914 pkts = self.create_stream_frag(self.pg1,
5919 self.pg1.add_stream(pkts)
5920 self.pg_enable_capture(self.pg_interfaces)
5922 frags = self.pg0.get_capture(len(pkts))
5923 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5924 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5925 self.assertEqual(p[TCP].sport, 20)
5926 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5927 self.assertEqual(data, p[Raw].load)
5929 reass = self.vapi.nat_reass_dump()
5930 reass_n_end = len(reass)
# One new reassembly entry is expected per direction exercised above.
5932 self.assertEqual(reass_n_end - reass_n_start, 2)
# Hairpinning with fragments: a fragmented TCP packet sent on pg0 towards the
# NAT address must come back on pg0, reassemble to a packet from the NAT
# address to the server's IPv6 address, and carry the original payload.
5934 def test_reass_hairpinning(self):
5935 """ NAT64 fragments hairpinning """
5937 client = self.pg0.remote_hosts[0]
5938 server = self.pg0.remote_hosts[1]
# Ports are randomised per run.
5939 server_in_port = random.randint(1025, 65535)
5940 server_out_port = random.randint(1025, 65535)
5941 client_in_port = random.randint(1025, 65535)
5942 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5943 nat_addr_ip6 = ip.src
5945 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5947 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5948 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5950 # add static BIB entry for server
5951 self.vapi.nat64_add_del_static_bib(server.ip6n,
5957 # send packet from host to server
5958 pkts = self.create_stream_frag_ip6(self.pg0,
5963 self.pg0.add_stream(pkts)
5964 self.pg_enable_capture(self.pg_interfaces)
5966 frags = self.pg0.get_capture(len(pkts))
5967 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
# Source port must have been translated; destination is the server's in-port.
5968 self.assertNotEqual(p[TCP].sport, client_in_port)
5969 self.assertEqual(p[TCP].dport, server_in_port)
5970 self.assertEqual(data, p[Raw].load)
# Same flow as the in-order fragment test (fragmented TCP in both directions,
# reassembled and checked for ports and payload), for fragments that are
# delivered out of order.
# NOTE(review): the statements that reorder the fragment lists are not visible
# in this view -- confirm against the complete method.
5972 def test_frag_out_of_order(self):
5973 """ NAT64 translate fragments arriving out of order """
5974 self.tcp_port_in = random.randint(1025, 65535)
5976 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5978 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5979 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Inside -> outside fragments.
5983 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5984 self.tcp_port_in, 20, data)
5986 self.pg0.add_stream(pkts)
5987 self.pg_enable_capture(self.pg_interfaces)
5989 frags = self.pg1.get_capture(len(pkts))
5990 p = self.reass_frags_and_verify(frags,
5992 self.pg1.remote_ip4)
5993 self.assertEqual(p[TCP].dport, 20)
5994 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Translated port is stored on the test case instance.
5995 self.tcp_port_out = p[TCP].sport
5996 self.assertEqual(data, p[Raw].load)
# Outside -> inside fragments with a short 23-byte payload.
5999 data = "A" * 4 + "B" * 16 + "C" * 3
6000 pkts = self.create_stream_frag(self.pg1,
6006 self.pg1.add_stream(pkts)
6007 self.pg_enable_capture(self.pg_interfaces)
6009 frags = self.pg0.get_capture(len(pkts))
6010 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
6011 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
6012 self.assertEqual(p[TCP].sport, 20)
6013 self.assertEqual(p[TCP].dport, self.tcp_port_in)
6014 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # Ask NAT64 to take its pool address from whatever IPv4 address is
    # configured on pg4.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (query the NAT64 pool itself; the NAT44 address dump says nothing
    # about the NAT64 pool this test is exercising)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump; checking the result of the very first
    # dump again would pass even if the address were never withdrawn)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
# Extended-suite test: fills NAT64 with traffic from max_bibs distinct source
# addresses, enables IPFIX export towards pg3, then sends further packets that
# must not be forwarded (get_capture(0) on pg1) and checks that the exported
# IPFIX records report the session limit and then the BIB limit.
6035 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
6036 def test_ipfix_max_bibs_sessions(self):
6037 """ IPFIX logging maximum session and BIB entries exceeded """
6040 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6044 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6046 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6047 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Two TCP packets (dport 80 and 22) are built per generated source address
# fd01:aa::<i>.
6051 for i in range(0, max_bibs):
6052 src = "fd01:aa::%x" % (i)
6053 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6054 IPv6(src=src, dst=remote_host_ip6) /
6055 TCP(sport=12345, dport=80))
6057 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6058 IPv6(src=src, dst=remote_host_ip6) /
6059 TCP(sport=12345, dport=22))
6061 self.pg0.add_stream(pkts)
6062 self.pg_enable_capture(self.pg_interfaces)
6064 self.pg1.get_capture(max_sessions)
# IPFIX exporter: collector is pg3's remote address, source is pg3's local one.
6066 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6067 src_address=self.pg3.local_ip4n,
6069 template_interval=10)
6070 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6071 src_port=self.ipfix_src_port)
# One more packet from the last generated source (src keeps its value from
# the loop above) to a new destination port; nothing may reach pg1.
6073 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6074 IPv6(src=src, dst=remote_host_ip6) /
6075 TCP(sport=12345, dport=25))
6076 self.pg0.add_stream(p)
6077 self.pg_enable_capture(self.pg_interfaces)
6079 self.pg1.get_capture(0)
6080 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6081 capture = self.pg3.get_capture(9)
6082 ipfix = IPFIXDecoder()
6083 # first load template
6085 self.assertTrue(p.haslayer(IPFIX))
6086 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6087 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6088 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6089 self.assertEqual(p[UDP].dport, 4739)
6090 self.assertEqual(p[IPFIX].observationDomainID,
6091 self.ipfix_domain_id)
6092 if p.haslayer(Template):
6093 ipfix.add_template(p.getlayer(Template))
6094 # verify events in data set
6096 if p.haslayer(Data):
6097 data = ipfix.decode_data_set(p.getlayer(Set))
6098 self.verify_ipfix_max_sessions(data, max_sessions)
# A packet from a source address not used so far (pg0.remote_ip6); it must
# not be forwarded either, and a single IPFIX packet is expected on pg3.
6100 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6101 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6102 TCP(sport=12345, dport=80))
6103 self.pg0.add_stream(p)
6104 self.pg_enable_capture(self.pg_interfaces)
6106 self.pg1.get_capture(0)
6107 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6108 capture = self.pg3.get_capture(1)
6109 # verify events in data set
6111 self.assertTrue(p.haslayer(IPFIX))
6112 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6113 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6114 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6115 self.assertEqual(p[UDP].dport, 4739)
6116 self.assertEqual(p[IPFIX].observationDomainID,
6117 self.ipfix_domain_id)
# The template loaded in the first pass is reused by the same decoder.
6118 if p.haslayer(Data):
6119 data = ipfix.decode_data_set(p.getlayer(Set))
6120 self.verify_ipfix_max_bibs(data, max_bibs)
# Sets the IPv6 reassembly limit to max_frag=0, sends only the last fragment
# of a fragmented TCP packet, expects nothing on pg1 and checks that the IPFIX
# export on pg3 reports the fragment limit for pg0's remote IPv6 address.
6122 def test_ipfix_max_frags(self):
6123 """ IPFIX logging maximum fragments pending reassembly exceeded """
6124 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6126 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6127 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6128 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6129 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6130 src_address=self.pg3.local_ip4n,
6132 template_interval=10)
6133 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6134 src_port=self.ipfix_src_port)
6137 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6138 self.tcp_port_in, 20, data)
# Only the final fragment is injected.
6139 self.pg0.add_stream(pkts[-1])
6140 self.pg_enable_capture(self.pg_interfaces)
6142 self.pg1.get_capture(0)
6143 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6144 capture = self.pg3.get_capture(9)
6145 ipfix = IPFIXDecoder()
6146 # first load template
6148 self.assertTrue(p.haslayer(IPFIX))
6149 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6150 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6151 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6152 self.assertEqual(p[UDP].dport, 4739)
6153 self.assertEqual(p[IPFIX].observationDomainID,
6154 self.ipfix_domain_id)
6155 if p.haslayer(Template):
6156 ipfix.add_template(p.getlayer(Template))
6157 # verify events in data set
6159 if p.haslayer(Data):
6160 data = ipfix.decode_data_set(p.getlayer(Set))
# Second argument is the configured limit (0, matching nat_set_reass above).
6161 self.verify_ipfix_max_fragments_ip6(data, 0,
6162 self.pg0.remote_ip6n)
# Checks the IPFIX records exported when a NAT64 BIB entry and session are
# created (one TCP packet from pg0) and again after the pool address range is
# changed through nat64_add_del_pool_addr_range. Records are told apart by
# the byte stored under key 230 of the first decoded data record:
# 10 / 11 are handed to verify_ipfix_bib, 6 / 7 to verify_ipfix_nat64_ses.
6164 def test_ipfix_bib_ses(self):
6165 """ IPFIX logging NAT64 BIB/session create and delete events """
6166 self.tcp_port_in = random.randint(1025, 65535)
6167 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6171 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6173 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6174 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6175 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6176 src_address=self.pg3.local_ip4n,
6178 template_interval=10)
6179 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6180 src_port=self.ipfix_src_port)
# Create phase: a single TCP packet from the inside host.
6183 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6184 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6185 TCP(sport=self.tcp_port_in, dport=25))
6186 self.pg0.add_stream(p)
6187 self.pg_enable_capture(self.pg_interfaces)
6189 p = self.pg1.get_capture(1)
# Translated source port of the forwarded packet, stored on the instance.
6190 self.tcp_port_out = p[0][TCP].sport
6191 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6192 capture = self.pg3.get_capture(10)
6193 ipfix = IPFIXDecoder()
6194 # first load template
6196 self.assertTrue(p.haslayer(IPFIX))
6197 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6198 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6199 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6200 self.assertEqual(p[UDP].dport, 4739)
6201 self.assertEqual(p[IPFIX].observationDomainID,
6202 self.ipfix_domain_id)
6203 if p.haslayer(Template):
6204 ipfix.add_template(p.getlayer(Template))
6205 # verify events in data set
6207 if p.haslayer(Data):
6208 data = ipfix.decode_data_set(p.getlayer(Set))
# ord() implies the decoded value is a one-character (byte) string.
6209 if ord(data[0][230]) == 10:
6210 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6211 elif ord(data[0][230]) == 6:
6212 self.verify_ipfix_nat64_ses(data,
6214 self.pg0.remote_ip6n,
6215 self.pg1.remote_ip4,
6218 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Second phase: the pool address range is changed (the arguments that select
# add vs. delete are not visible in this view) and two IPFIX packets are
# expected as a result.
6221 self.pg_enable_capture(self.pg_interfaces)
6222 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6225 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6226 capture = self.pg3.get_capture(2)
6227 # verify events in data set
6229 self.assertTrue(p.haslayer(IPFIX))
6230 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6231 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6232 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6233 self.assertEqual(p[UDP].dport, 4739)
6234 self.assertEqual(p[IPFIX].observationDomainID,
6235 self.ipfix_domain_id)
6236 if p.haslayer(Data):
6237 data = ipfix.decode_data_set(p.getlayer(Set))
6238 if ord(data[0][230]) == 11:
6239 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6240 elif ord(data[0][230]) == 7:
6241 self.verify_ipfix_nat64_ses(data,
6243 self.pg0.remote_ip6n,
6244 self.pg1.remote_ip4,
6247 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Helper used by the tests above to sample the NAT64 session count; it reads
# the session table through the nat64_st_dump API call.
# NOTE(review): the return statement is not visible in this view -- presumably
# the number of dumped entries is returned; confirm against the full method.
6249 def nat64_get_ses_num(self):
6251 Return number of active NAT64 sessions.
6253 st = self.vapi.nat64_st_dump()
# Resets NAT64-related state between tests: disables NAT IPFIX logging,
# restores the default IPFIX source port / domain id on the test instance,
# resets the NAT64 timeouts, then walks the interface, BIB, pool-address and
# prefix dumps and issues an add_del call for each entry found.
6256 def clear_nat64(self):
6258 Clear NAT64 configuration.
# IPFIX is disabled using the values currently stored on the instance, and
# only afterwards are those values reset to 4739 / 1.
6260 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6261 domain_id=self.ipfix_domain_id)
6262 self.ipfix_src_port = 4739
6263 self.ipfix_domain_id = 1
# Called without arguments, i.e. with whatever defaults the API wrapper has.
6265 self.vapi.nat64_set_timeouts()
6267 interfaces = self.vapi.nat64_interface_dump()
6268 for intf in interfaces:
# NOTE(review): is_inside > 1 gets an additional add_del call; presumably this
# value marks an interface registered as both inside and outside -- confirm.
6269 if intf.is_inside > 1:
6270 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6273 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# NOTE(review): 255 is passed as the BIB dump selector; presumably "all
# protocols" -- confirm against the API definition.
6277 bib = self.vapi.nat64_bib_dump(255)
6280 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6288 adresses = self.vapi.nat64_pool_addr_dump()
6289 for addr in adresses:
6290 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6295 prefixes = self.vapi.nat64_prefix_dump()
6296 for prefix in prefixes:
6297 self.vapi.nat64_add_del_prefix(prefix.prefix,
6299 vrf_id=prefix.vrf_id,
# Per-test teardown of TestNAT64 (the method header is not visible in this
# view): chains to the parent tearDown and, while VPP is still alive, logs the
# output of the NAT64 "show" CLI commands for post-mortem inspection.
6303 super(TestNAT64, self).tearDown()
# Skip the CLI calls when the VPP process has died.
6304 if not self.vpp_dead:
6305 self.logger.info(self.vapi.cli("show nat64 pool"))
6306 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6307 self.logger.info(self.vapi.cli("show nat64 prefix"))
6308 self.logger.info(self.vapi.cli("show nat64 bib all"))
6309 self.logger.info(self.vapi.cli("show nat64 session table all"))
6310 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
6314 class TestDSlite(MethodHolder):
6315 """ DS-Lite Test Cases """
6318 def setUpClass(cls):
6319 super(TestDSlite, cls).setUpClass()
6322 cls.nat_addr = '10.0.0.3'
6323 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6325 cls.create_pg_interfaces(range(2))
6327 cls.pg0.config_ip4()
6328 cls.pg0.resolve_arp()
6330 cls.pg1.config_ip6()
6331 cls.pg1.generate_remote_hosts(2)
6332 cls.pg1.configure_ipv6_neighbors()
6335 super(TestDSlite, cls).tearDownClass()
6338 def test_dslite(self):
6339 """ Test DS-Lite """
6340 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6342 aftr_ip4 = '192.0.0.1'
6343 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6344 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6345 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6346 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6349 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6350 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6351 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6352 UDP(sport=20000, dport=10000))
6353 self.pg1.add_stream(p)
6354 self.pg_enable_capture(self.pg_interfaces)
6356 capture = self.pg0.get_capture(1)
6357 capture = capture[0]
6358 self.assertFalse(capture.haslayer(IPv6))
6359 self.assertEqual(capture[IP].src, self.nat_addr)
6360 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6361 self.assertNotEqual(capture[UDP].sport, 20000)
6362 self.assertEqual(capture[UDP].dport, 10000)
6363 self.check_ip_checksum(capture)
6364 out_port = capture[UDP].sport
6366 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6367 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6368 UDP(sport=10000, dport=out_port))
6369 self.pg0.add_stream(p)
6370 self.pg_enable_capture(self.pg_interfaces)
6372 capture = self.pg1.get_capture(1)
6373 capture = capture[0]
6374 self.assertEqual(capture[IPv6].src, aftr_ip6)
6375 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6376 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6377 self.assertEqual(capture[IP].dst, '192.168.1.1')
6378 self.assertEqual(capture[UDP].sport, 10000)
6379 self.assertEqual(capture[UDP].dport, 20000)
6380 self.check_ip_checksum(capture)
6383 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6384 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6385 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6386 TCP(sport=20001, dport=10001))
6387 self.pg1.add_stream(p)
6388 self.pg_enable_capture(self.pg_interfaces)
6390 capture = self.pg0.get_capture(1)
6391 capture = capture[0]
6392 self.assertFalse(capture.haslayer(IPv6))
6393 self.assertEqual(capture[IP].src, self.nat_addr)
6394 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6395 self.assertNotEqual(capture[TCP].sport, 20001)
6396 self.assertEqual(capture[TCP].dport, 10001)
6397 self.check_ip_checksum(capture)
6398 self.check_tcp_checksum(capture)
6399 out_port = capture[TCP].sport
6401 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6402 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6403 TCP(sport=10001, dport=out_port))
6404 self.pg0.add_stream(p)
6405 self.pg_enable_capture(self.pg_interfaces)
6407 capture = self.pg1.get_capture(1)
6408 capture = capture[0]
6409 self.assertEqual(capture[IPv6].src, aftr_ip6)
6410 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6411 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6412 self.assertEqual(capture[IP].dst, '192.168.1.1')
6413 self.assertEqual(capture[TCP].sport, 10001)
6414 self.assertEqual(capture[TCP].dport, 20001)
6415 self.check_ip_checksum(capture)
6416 self.check_tcp_checksum(capture)
6419 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6420 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6421 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6422 ICMP(id=4000, type='echo-request'))
6423 self.pg1.add_stream(p)
6424 self.pg_enable_capture(self.pg_interfaces)
6426 capture = self.pg0.get_capture(1)
6427 capture = capture[0]
6428 self.assertFalse(capture.haslayer(IPv6))
6429 self.assertEqual(capture[IP].src, self.nat_addr)
6430 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6431 self.assertNotEqual(capture[ICMP].id, 4000)
6432 self.check_ip_checksum(capture)
6433 self.check_icmp_checksum(capture)
6434 out_id = capture[ICMP].id
6436 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6437 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6438 ICMP(id=out_id, type='echo-reply'))
6439 self.pg0.add_stream(p)
6440 self.pg_enable_capture(self.pg_interfaces)
6442 capture = self.pg1.get_capture(1)
6443 capture = capture[0]
6444 self.assertEqual(capture[IPv6].src, aftr_ip6)
6445 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6446 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6447 self.assertEqual(capture[IP].dst, '192.168.1.1')
6448 self.assertEqual(capture[ICMP].id, 4000)
6449 self.check_ip_checksum(capture)
6450 self.check_icmp_checksum(capture)
6452 # ping DS-Lite AFTR tunnel endpoint address
6453 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6454 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6455 ICMPv6EchoRequest())
6456 self.pg1.add_stream(p)
6457 self.pg_enable_capture(self.pg_interfaces)
6459 capture = self.pg1.get_capture(1)
6460 self.assertEqual(1, len(capture))
6461 capture = capture[0]
6462 self.assertEqual(capture[IPv6].src, aftr_ip6)
6463 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6464 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Body of a TestDSlite method that first delegates to the parent tearDown().
# NOTE(review): presumably this is TestDSlite.tearDown itself -- its `def`
# line is not visible in this chunk, so confirm against the full file.
6467 super(TestDSlite, self).tearDown()
# Diagnostic "show dslite ..." CLI output is only collected when
# self.vpp_dead is false.
6468 if not self.vpp_dead:
6469 self.logger.info(self.vapi.cli("show dslite pool"))
# NOTE(review): the embedded numbering skips 6470, so the opening of the call
# that the next line closes is not visible here.
6471 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6472 self.logger.info(self.vapi.cli("show dslite sessions"))
# Test class for VPP's DS-Lite CE mode, built on the MethodHolder helpers.
# NOTE(review): the original-file numbers embedded at the start of each line
# skip values in several places (e.g. 6496-6497, 6520-6522, 6569-6570), so
# some lines of this class -- apparently including a few `def` headers and
# the tail of at least one call -- are not visible in this chunk.
6475 class TestDSliteCE(MethodHolder):
6476 """ DS-Lite CE Test Cases """
# Extend the VPP command line prepared by the parent class with a
# `nat { dslite ce }` section.
6479 def setUpConstants(cls):
6480 super(TestDSliteCE, cls).setUpConstants()
6481 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
# Class-level fixture: run the parent setup, then create two pg interfaces.
6484 def setUpClass(cls):
6485 super(TestDSliteCE, cls).setUpClass()
6488 cls.create_pg_interfaces(range(2))
# pg0 is the IPv4 side: configure its address and resolve ARP.
6490 cls.pg0.config_ip4()
6491 cls.pg0.resolve_arp()
# pg1 is the IPv6 side: configure its address, generate one remote host and
# configure the IPv6 neighbors.
6493 cls.pg1.config_ip6()
6494 cls.pg1.generate_remote_hosts(1)
6495 cls.pg1.configure_ipv6_neighbors()
# Delegates to the parent tearDownClass().
# NOTE(review): the enclosing `def` line is not visible here (presumably
# TestDSliteCE.tearDownClass) -- confirm against the full file.
6498 super(TestDSliteCE, cls).tearDownClass()
# Exercises three flows: IPv4 UDP entering on pg0 and captured on pg1, an
# IPv4-in-IPv6 UDP packet entering on pg1 and captured on pg0, and an ICMPv6
# echo request sent to the B4 IPv6 address.
6501 def test_dslite_ce(self):
6502 """ Test DS-Lite CE """
# B4 addresses (IPv4 and IPv6), packed to binary with inet_pton and handed
# to VPP via dslite_set_b4_addr.
6504 b4_ip4 = '192.0.0.2'
6505 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6506 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6507 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6508 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
# AFTR addresses (IPv4 and IPv6), packed the same way and handed to VPP via
# dslite_set_aftr_addr.
6510 aftr_ip4 = '192.0.0.1'
6511 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6512 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6513 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6514 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# Add a /128 route for the AFTR IPv6 address with pg1's remote IPv6 address
# as next hop.
# NOTE(review): the remaining arguments and the closing parenthesis of this
# call are not visible (embedded numbering skips 6520-6522).
6516 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6517 dst_address_length=128,
6518 next_hop_address=self.pg1.remote_ip6n,
6519 next_hop_sw_if_index=self.pg1.sw_if_index,
# Flow 1: plain IPv4 UDP packet queued on pg0, addressed to pg1's remote
# IPv4 address.
6523 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6524 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6525 UDP(sport=10000, dport=20000))
6526 self.pg0.add_stream(p)
6527 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): no call that actually starts the packet generator is visible
# between enabling capture and reading it (embedded number 6528 is missing).
# The packet captured on pg1 is expected to carry an IPv6 header from the B4
# address to the AFTR address, with the IPv4 addresses and UDP ports of the
# packet that was sent left unchanged.
6529 capture = self.pg1.get_capture(1)
6530 capture = capture[0]
6531 self.assertEqual(capture[IPv6].src, b4_ip6)
6532 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6533 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6534 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6535 self.assertEqual(capture[UDP].sport, 10000)
6536 self.assertEqual(capture[UDP].dport, 20000)
6537 self.check_ip_checksum(capture)
# Flow 2: reverse direction -- an IPv4 UDP packet wrapped in IPv6 (AFTR
# address -> B4 address) queued on pg1.
6540 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6541 IPv6(dst=b4_ip6, src=aftr_ip6) /
6542 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6543 UDP(sport=20000, dport=10000))
6544 self.pg1.add_stream(p)
6545 self.pg_enable_capture(self.pg_interfaces)
# The packet captured on pg0 must have no IPv6 layer and must keep the inner
# IPv4 addresses and UDP ports.
6547 capture = self.pg0.get_capture(1)
6548 capture = capture[0]
6549 self.assertFalse(capture.haslayer(IPv6))
6550 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6551 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6552 self.assertEqual(capture[UDP].sport, 20000)
6553 self.assertEqual(capture[UDP].dport, 10000)
6554 self.check_ip_checksum(capture)
6556 # ping DS-Lite B4 tunnel endpoint address
6557 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6558 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6559 ICMPv6EchoRequest())
6560 self.pg1.add_stream(p)
6561 self.pg_enable_capture(self.pg_interfaces)
# Exactly one packet is expected back on pg1: an ICMPv6 echo reply sourced
# from the B4 address and addressed to the remote host that sent the request.
6563 capture = self.pg1.get_capture(1)
6564 self.assertEqual(1, len(capture))
6565 capture = capture[0]
6566 self.assertEqual(capture[IPv6].src, b4_ip6)
6567 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6568 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Delegates to the parent tearDown(), then issues "show dslite ..." CLI
# commands when self.vpp_dead is false.
# NOTE(review): the enclosing `def` line and the openings of the two calls
# closed below are not visible (embedded numbers 6570, 6573 and 6575 are
# missing).
6571 super(TestDSliteCE, self).tearDown()
6572 if not self.vpp_dead:
6574 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6576 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# Test class for VPP's NAT66 feature, built on the MethodHolder helpers.
# NOTE(review): the original-file numbers embedded at the start of each line
# skip values throughout this class (e.g. 6594-6595, 6607-6610, 6613-6614,
# 6632, 6641, 6700-6701), so several lines -- apparently including some `def`
# headers, packet payloads, the construction of `pkts` and the tails of
# multi-line calls -- are not visible in this chunk.
6579 class TestNAT66(MethodHolder):
6580 """ NAT66 Test Cases """
# Class-level fixture: run the parent setup, define the NAT66 address and
# create two pg interfaces.
6583 def setUpClass(cls):
6584 super(TestNAT66, cls).setUpClass()
# NAT address used as the translated source in test_static, kept both as
# text and in packed binary form (inet_pton).
6587 cls.nat_addr = 'fd01:ff::2'
6588 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6590 cls.create_pg_interfaces(range(2))
6591 cls.interfaces = list(cls.pg_interfaces)
# Per-interface setup; only the neighbor configuration step is visible
# (embedded numbers 6594-6595 are missing).
6593 for i in cls.interfaces:
6596 i.configure_ipv6_neighbors()
# Delegates to the parent tearDownClass().
# NOTE(review): the enclosing `def` line is not visible here (presumably
# TestNAT66.tearDownClass) -- confirm against the full file.
6599 super(TestNAT66, cls).tearDownClass()
# Registers pg0 and pg1 with NAT66, adds a static mapping for pg0's remote
# address, then checks packets in both directions and the mapping counter.
6602 def test_static(self):
6603 """ 1:1 NAT66 test """
# pg1 is registered with is_inside=0; pg0 relies on the default value of
# is_inside (presumably "inside" -- confirm against nat66_add_del_interface).
6604 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6605 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# NOTE(review): only the first argument of the static-mapping call is
# visible; the rest of the call is missing from this chunk.
6606 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# Packets from pg0's remote host to pg1's remote host. The upper-layer
# payloads of the first two packets are not visible; the third is an ICMPv6
# echo request and the fourth is GRE / IP / TCP.
6611 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6612 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6615 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6616 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6619 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6620 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6621 ICMPv6EchoRequest())
6623 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6624 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6625 GRE() / IP() / TCP())
# NOTE(review): `pkts` is used below but the lines that create and fill it
# are not visible in this chunk.
6627 self.pg0.add_stream(pkts)
6628 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg1 must have its IPv6 source rewritten to
# nat_addr and its destination left as pg1's remote address; the checksum
# helper matching the packet's upper layer is then applied.
6630 capture = self.pg1.get_capture(len(pkts))
6631 for packet in capture:
6633 self.assertEqual(packet[IPv6].src, self.nat_addr)
6634 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6635 if packet.haslayer(TCP):
6636 self.check_tcp_checksum(packet)
6637 elif packet.haslayer(UDP):
6638 self.check_udp_checksum(packet)
6639 elif packet.haslayer(ICMPv6EchoRequest):
6640 self.check_icmpv6_checksum(packet)
# NOTE(review): the statement that guards this error log is not visible
# (embedded numbers 6632 and 6641 are missing; presumably a try/except
# around the checks above -- confirm).
6642 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: packets from pg1's remote host addressed to nat_addr.
# Only the payload of the fourth packet (GRE / IP / TCP) is visible.
6647 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6648 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6651 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6652 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6655 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6656 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6659 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6660 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6661 GRE() / IP() / TCP())
6663 self.pg1.add_stream(pkts)
6664 self.pg_enable_capture(self.pg_interfaces)
# Every packet captured on pg0 must keep pg1's remote address as source and
# have its destination rewritten from nat_addr to pg0's remote address.
6666 capture = self.pg0.get_capture(len(pkts))
6667 for packet in capture:
6669 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6670 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6671 if packet.haslayer(TCP):
6672 self.check_tcp_checksum(packet)
6673 elif packet.haslayer(UDP):
6674 self.check_udp_checksum(packet)
6675 elif packet.haslayer(ICMPv6EchoReply):
6676 self.check_icmpv6_checksum(packet)
6678 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exactly one static mapping is expected, with a total_pkts counter of 8.
6681 sm = self.vapi.nat66_static_mapping_dump()
6682 self.assertEqual(len(sm), 1)
6683 self.assertEqual(sm[0].total_pkts, 8)
# Both interfaces are registered without is_inside=0 here, and the packet
# captured on pg1 is expected to keep its original source and destination.
6685 def test_check_no_translate(self):
6686 """ NAT66 translate only when egress interface is outside interface """
6687 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6688 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
# NOTE(review): the remaining arguments of the static-mapping call and the
# payload of the packet built below are not visible in this chunk.
6689 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
6693 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6694 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6696 self.pg0.add_stream([p])
6697 self.pg_enable_capture(self.pg_interfaces)
6699 capture = self.pg1.get_capture(1)
# NOTE(review): `packet` is read below but its assignment is not visible
# (embedded numbers 6700-6701 are missing).
6702 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
6703 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6705 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Walks the NAT66 interface dump and the static-mapping dump, calling the
# corresponding add/del API for each entry.
# NOTE(review): the trailing arguments of both calls are not visible, so the
# flag that turns them into deletions cannot be confirmed from this chunk.
6708 def clear_nat66(self):
6710 Clear NAT66 configuration.
6712 interfaces = self.vapi.nat66_interface_dump()
6713 for intf in interfaces:
6714 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6718 static_mappings = self.vapi.nat66_static_mapping_dump()
6719 for sm in static_mappings:
6720 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6721 sm.external_ip_address,
# Delegates to the parent tearDown(), then logs the NAT66 interface and
# static-mapping CLI output when self.vpp_dead is false.
# NOTE(review): the enclosing `def` line is not visible here (presumably
# TestNAT66.tearDown) -- confirm against the full file.
6726 super(TestNAT66, self).tearDown()
6727 if not self.vpp_dead:
6728 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6729 self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: when the module is executed directly, run its tests
# through unittest.main with VppTestRunner as the test runner.
6732 if __name__ == '__main__':
6733 unittest.main(testRunner=VppTestRunner)