9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its serialised form, the stored IPv4
    header checksum is deleted and the packet is serialised and parsed
    once more; the checksum obtained that way must equal the one carried
    by ``pkt``.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Same pattern as check_icmp_checksum: the stored value has to be
    # removed first, otherwise the second round-trip just carries the
    # captured checksum over and the assertion below can never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is rebuilt with the stored TCP checksum removed and the
    resulting checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Remove the stored value (as check_icmp_checksum does) so that the
    # rebuild below produces a fresh checksum to compare against.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is rebuilt with the stored UDP checksum removed and the
    resulting checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Remove the stored value (as check_icmp_checksum does) so that the
    # rebuild below produces a fresh checksum to compare against.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded ("...error") layer present in ``pkt`` the packet is
    rebuilt from ``pkt`` with that layer's checksum removed, and the
    rebuilt checksum is compared with the one carried by ``pkt``.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = ((IPerror, 'IPerror'),
                       (TCPerror, 'TCPerror'),
                       (UDPerror, 'UDPerror'),
                       (ICMPerror, 'ICMPerror'))
    for layer_cls, layer_name in embedded_layers:
        if not pkt.haslayer(layer_cls):
            continue
        if layer_cls is UDPerror and pkt[layer_name].chksum == 0:
            # An embedded UDP header with a zero checksum is not verified.
            continue
        # Always start from a fresh copy of pkt.  Previously the ICMPerror
        # check reused whatever `new` an earlier branch had left behind
        # and raised NameError when no earlier branch had run.
        new = pkt.__class__(str(pkt))
        del new[layer_name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer_name].chksum, pkt[layer_name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    Rebuilds the packet with the stored ICMP checksum removed and compares
    the rebuilt checksum with the one carried by ``pkt``.  When the packet
    embeds an offending IP header, the embedded checksums are checked too.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    recomputed = rebuilt['ICMP'].chksum
    carried = pkt['ICMP'].chksum
    self.assertEqual(recomputed, carried)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination-unreachable, echo-request and echo-reply messages.
    For destination-unreachable the embedded packet checksums are checked
    as well.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer class, layer name, verify embedded packet afterwards)
    icmpv6_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    new = pkt.__class__(str(pkt))
    for layer_cls, layer_name, has_embedded in icmpv6_layers:
        if not pkt.haslayer(layer_cls):
            continue
        del new[layer_name].cksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer_name].cksum, pkt[layer_name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default use the remote address of
                   the outside interface)
    :param ttl: TTL of generated packets
    :returns: List of three packets: TCP, UDP and ICMP echo request
    """
    # Only fall back to the outside host when the caller gave no target.
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    def l2_l3():
        # fresh Ethernet/IP headers for every packet
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl))

    return [
        # TCP
        l2_l3() / TCP(sport=self.tcp_port_in, dport=20),
        # UDP
        l2_l3() / UDP(sport=self.udp_port_in, dport=20),
        # ICMP
        l2_l3() / ICMP(id=self.icmp_id_in, type='echo-request'),
    ]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the octet
    positions RFC 6052 assigns to the given prefix length; octet 8 is
    never used.  For any other prefix length the prefix is returned
    unchanged.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # prefix length -> positions of the four IPv4 octets inside the
    # 16-octet IPv6 address
    positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray items are integers on Python 2 and 3 alike, unlike
    # list(<packed address>) joined with ''.join(), which only works on
    # Python 2.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for pos, octet in zip(positions.get(plen, ()), ip4_n):
        pref_n[pos] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    Reads the four IPv4 octets from the octet positions RFC 6052 assigns
    to the given prefix length (the inverse of compose_ip6).

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length
    :returns: extracted IPv4 address
    :raises ValueError: if ``plen`` is not one of 32, 40, 48, 56, 64, 96
    """
    # prefix length -> positions of the four IPv4 octets inside the
    # 16-octet IPv6 address (octet 8 is never used)
    positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in positions:
        raise ValueError("unsupported IPv6 prefix length: %r" % (plen,))
    # bytearray items are integers on Python 2 and 3 alike
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[pos] for pos in positions[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :returns: List of three packets: TCP, UDP and ICMPv6 echo request
    """
    if pref is None:
        # no prefix given: append the dotted IPv4 address to 64:ff9b::
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    def l2_l3():
        # fresh Ethernet/IPv6 headers for every packet
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim))

    return [
        # TCP
        l2_l3() / TCP(sport=self.tcp_port_in, dport=20),
        # UDP
        l2_l3() / UDP(sport=self.udp_port_in, dport=20),
        # ICMP
        l2_l3() / ICMPv6EchoRequest(id=self.icmp_id_in),
    ]
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: List of three packets: TCP, UDP and ICMP echo reply
    """
    if dst_ip is None:
        dst_ip = self.nat_addr
    if not use_inside_ports:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out
    else:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in

    def l2_l3():
        # fresh Ethernet/IP headers for every packet
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl))

    return [
        # TCP
        l2_l3() / TCP(dport=tcp_port, sport=20),
        # UDP
        l2_l3() / UDP(dport=udp_port, sport=20),
        # ICMP
        l2_l3() / ICMP(id=icmp_id, type='echo-reply'),
    ]
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: List of three packets: TCP, UDP and ICMPv6 echo reply
    """
    def l2_l3():
        # fresh Ethernet/IPv6 headers for every packet
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IPv6(src=src_ip, dst=dst_ip, hlim=hl))

    return [
        # TCP
        l2_l3() / TCP(dport=self.tcp_port_out, sport=20),
        # UDP
        l2_l3() / UDP(dport=self.udp_port_out, sport=20),
        # ICMP
        l2_l3() / ICMPv6EchoReply(id=self.icmp_id_out),
    ]
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None, is_ip6=False):
    """
    Verify captured packets on outside network

    Besides verifying, this records the observed source ports / ICMP id
    in ``self.tcp_port_out``, ``self.udp_port_out`` and
    ``self.icmp_id_out``.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    :param is_ip6: If L3 protocol is IPv6 (Default False)
    """
    if is_ip6:
        IP46, ICMP46 = IPv6, ICMPv6EchoRequest
    else:
        IP46, ICMP46 = IP, ICMP
    if nat_ip is None:
        nat_ip = self.nat_addr
    # The outside port/id must equal the inside one only when the caller
    # expects no port translation; otherwise it must differ.
    check_port = self.assertEqual if same_port else self.assertNotEqual
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            if not is_ip6:
                # check_ip_checksum looks at the IPv4 ('IP') layer
                self.check_ip_checksum(packet)
            self.assertEqual(packet[IP46].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP46].dst, dst_ip)
            if packet.haslayer(TCP):
                check_port(packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                check_port(packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                check_port(packet[ICMP46].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP46].id
                if is_ip6:
                    self.check_icmpv6_checksum(packet)
                else:
                    self.check_icmp_checksum(packet)
        except Exception:
            # log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    Thin wrapper around verify_capture_out with IPv6 selected.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
                                   dst_ip, is_ip6=True)
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.check_icmp_checksum(packet)
        except Exception:
            # log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.check_udp_checksum(packet)
            else:
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
                self.check_icmpv6_checksum(packet)
        except Exception:
            # log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    Addresses and source ports / ICMP id must still be the inside ones.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            # the embedded packet must carry the outside port / id
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            # log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            # the embedded packet must carry the inside port / id
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            # log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data):
    """
    Create fragmented packet stream

    Three IPv4 fragments are produced with fragment offsets 0, 3 and 5
    (units of 8 octets).

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :returns: Fragments
    """
    # one IP identification shared by all fragments
    # (named ip_id so the builtin id() is not shadowed)
    ip_id = random.randint(0, 65535)

    # Build the unfragmented packet once, only to learn the TCP checksum
    # over the whole payload; the first fragment has to carry it.
    whole = (IP(src=src_if.remote_ip4, dst=dst) /
             TCP(sport=sport, dport=dport) /
             Raw(data))
    whole = whole.__class__(str(whole))
    chksum = whole['TCP'].chksum

    def l2():
        return Ether(src=src_if.remote_mac, dst=src_if.local_mac)

    # Payload split implied by the fragment offsets, assuming a 20-octet
    # TCP header without options:
    #   offset 3 * 8 = 24 -> first fragment = TCP header + data[0:4]
    #   offset 5 * 8 = 40 -> second fragment = data[4:20]
    return [
        l2() /
        IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=ip_id) /
        TCP(sport=sport, dport=dport, chksum=chksum) /
        Raw(data[0:4]),
        l2() /
        IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=ip_id,
           proto=IP_PROTOS.tcp) /
        Raw(data[4:20]),
        l2() /
        IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
           id=ip_id) /
        Raw(data[20:]),
    ]
def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
                           pref=None, plen=0, frag_size=128):
    """
    Create fragmented packet stream

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length
    :param frag_size: size of fragments
    :returns: Fragments
    """
    if pref is None:
        # no prefix given: append the dotted IPv4 address to 64:ff9b::
        dst_ip6 = ''.join(['64:ff9b::', dst])
    else:
        dst_ip6 = self.compose_ip6(dst, pref, plen)

    p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
         IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
         IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
         TCP(sport=sport, dport=dport) /
         Raw(data))

    return fragment6(p, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    # named `stream` so the Python 2 builtin buffer() is not shadowed
    stream = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.check_ip_checksum(p)
        # fragment offset is expressed in units of 8 octets
        stream.seek(p[IP].frag * 8)
        stream.write(p[IP].payload)
    first = frags[0][IP]
    ip = IP(src=first.src, dst=first.dst, proto=first.proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(stream.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(stream.getvalue()))
    # NOTE(review): for any other protocol `p` is still the last fragment.
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet
    """
    # named `stream` so the Python 2 builtin buffer() is not shadowed
    stream = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IPv6].src, src)
        self.assertEqual(p[IPv6].dst, dst)
        # fragment offset is expressed in units of 8 octets
        stream.seek(p[IPv6ExtHdrFragment].offset * 8)
        stream.write(p[IPv6ExtHdrFragment].payload)
    ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
              nh=frags[0][IPv6ExtHdrFragment].nh)
    if ip.nh == IP_PROTOS.tcp:
        p = (ip / TCP(stream.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.nh == IP_PROTOS.udp:
        p = (ip / UDP(stream.getvalue()))
    # NOTE(review): for any other protocol `p` is still the last fragment.
    return p
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Expects six records, three with natEvent 4 and three with natEvent 5.
    Every record must describe a session from pg0's remote address
    translated to the NAT address, for ICMP, TCP or UDP.

    :param data: Decoded IPFIX data records
    """
    # protocol -> (inside port / ICMP id, translated port / ICMP id)
    ports = {
        IP_PROTOS.icmp: (self.icmp_id_in, self.icmp_id_out),
        IP_PROTOS.tcp: (self.tcp_port_in, self.tcp_port_out),
        IP_PROTOS.udp: (self.udp_port_in, self.udp_port_out),
    }
    nat44_ses_create_num = 0
    nat44_ses_delete_num = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        nat_event = ord(record[230])
        self.assertIn(nat_event, [4, 5])
        if nat_event == 4:
            nat44_ses_create_num += 1
        else:
            nat44_ses_delete_num += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        proto = ord(record[4])
        if proto not in ports:
            self.fail("Invalid protocol")
        port_in, port_out = ports[proto]
        self.assertEqual(struct.pack("!H", port_in), record[7])
        self.assertEqual(struct.pack("!H", port_out), record[227])
    self.assertEqual(3, nat44_ses_create_num)
    self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Expects exactly one record with natEvent 3 and pool id 0.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    Expects exactly one record with natEvent 13 and quota event 1.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): "I" packs in native byte order, unlike the "!I" used
    # for other fields in this file - kept as is, confirm it is intended.
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    Expects exactly one record with natEvent 13 and quota event 2.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): "I" packs in native byte order, unlike the "!I" used
    # for other fields in this file - kept as is, confirm it is intended.
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    Expects exactly one record with natEvent 13 and quota event 5.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): "I" packs in native byte order, unlike the "!I" used
    # for other fields in this file - kept as is, confirm it is intended.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    Expects exactly one record with natEvent 13 and quota event 5.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): "I" packs in native byte order, unlike the "!I" used
    # for other fields in this file - kept as is, confirm it is intended.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    Expects exactly one record: natEvent 10 for a create event, 11 for a
    delete event, describing a TCP binding.

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    if is_create:
        self.assertEqual(ord(record[230]), 10)
    else:
        self.assertEqual(ord(record[230]), 11)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
# NOTE(review): this listing is incomplete for this method - the end of the
# signature, the binding of `record`, the branch on `is_create` and the tails
# of two assertEqual calls are not visible.  Code lines are left untouched.
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
# exactly one record is expected
861 self.assertEqual(1, len(data))
# natEvent (field 230): 6 and 7 are the two accepted values; presumably
# 6 = create and 7 = delete, selected by is_create - TODO confirm
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
# sourceIPv6Address (field 27)
869 self.assertEqual(src_addr, record[27])
# destinationIPv6Address
# expected value is dst_addr embedded into an IPv6 prefix via compose_ip6;
# the prefix arguments and the record field are not visible here
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
# postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
# postNATDestinationIPv4Address
# (record field compared against is not visible here)
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
# protocolIdentifier (field 4) must be TCP
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
# field 234 must be a zero 32-bit value (named ingressVRFID in the sibling
# verify_ipfix_nat44_ses/verify_ipfix_bib helpers - TODO confirm)
884 self.assertEqual(struct.pack("!I", 0), record[234])
# sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
# postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
# destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
# postNAPTDestinationTransportPort
# the destination port is expected to be unchanged by the translation
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
# NOTE(review): many trailing argument lines of the calls below are not
# visible in this listing (presumably the "delete" flags and a few fields) -
# confirm against the full file before changing anything here.
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
# /32 host routes towards the pg7 and pg8 peers
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
# neighbour entries dumped for pg7 and pg8
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
# drop pg7's IPv4 configuration if it has one
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
# forwarding switched off (argument 0)
992 self.vapi.nat44_forwarding_enable_disable(0)
# walk the entries returned by nat44_interface_addr_dump()
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
# IPFIX logging disabled with the current settings, then the cached source
# port and domain id are reset to 4739 and 1
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
# NAT44 interface features; entries with is_inside > 1 get an additional
# call (presumably they carry both roles - TODO confirm)
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# NAT44 output-feature interfaces
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
# static mappings, each passed back with the attributes it was dumped with
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
# load-balancing static mappings
1036 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037 for lb_sm in lb_static_mappings:
1038 self.vapi.nat44_add_del_lb_static_mapping(
1039 lb_sm.external_addr,
1040 lb_sm.external_port,
1042 vrf_id=lb_sm.vrf_id,
1043 twice_nat=lb_sm.twice_nat,
1044 out2in_only=lb_sm.out2in_only,
# identity mappings
1050 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051 for id_m in identity_mappings:
1052 self.vapi.nat44_add_del_identity_mapping(
1053 addr_only=id_m.addr_only,
1056 sw_if_index=id_m.sw_if_index,
1058 protocol=id_m.protocol,
# NAT pool addresses ("adresses" is the spelling used by the code)
1061 adresses = self.vapi.nat44_address_dump()
1062 for addr in adresses:
1063 self.vapi.nat44_add_del_address_range(addr.ip_address,
1065 twice_nat=addr.twice_nat,
# reassembly settings: called once without arguments and once with is_ip6=1
# (presumably restoring defaults - TODO confirm)
1068 self.vapi.nat_set_reass()
1069 self.vapi.nat_set_reass(is_ip6=1)
# NOTE(review): the statements controlled by the `if` below and most of the
# argument list of the vapi call are not visible in this listing; code lines
# are left untouched.
1071 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072 local_port=0, external_port=0, vrf_id=0,
1073 is_add=1, external_sw_if_index=0xFFFFFFFF,
1074 proto=0, twice_nat=0, out2in_only=0, tag=""):
1076 Add/delete NAT44 static mapping
1078 :param local_ip: Local IP address
1079 :param external_ip: External IP address
1080 :param local_port: Local port number (Optional)
1081 :param external_port: External port number (Optional)
1082 :param vrf_id: VRF ID (Default 0)
1083 :param is_add: 1 if add, 0 if delete (Default add)
1084 :param external_sw_if_index: External interface instead of IP address
1085 :param proto: IP protocol (Mandatory if port specified)
1086 :param twice_nat: 1 if translate external host address and port
1087 :param out2in_only: if 1 rule is matching only out2in direction
1088 :param tag: Opaque string tag
# distinguishes mappings with both ports given from the rest (presumably
# selecting address-only vs address-and-port mode - TODO confirm)
1091 if local_port and external_port:
# addresses are handed to the API in packed binary form
1093 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095 self.vapi.nat44_add_del_static_mapping(
1098 external_sw_if_index,
1109 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1111 Add/delete NAT44 address
1113 :param ip: IP address
1114 :param is_add: 1 if add, 0 if delete (Default add)
1115 :param twice_nat: twice NAT address for extenal hosts
# the address is handed to the API in packed binary form
1117 nat_addr = socket.inet_pton(socket.AF_INET, ip)
# a one-address range: the same value is passed as both range arguments
# NOTE(review): one argument line of this call is not visible in this
# listing (presumably the one passing vrf_id, which is documented nowhere
# above) - TODO confirm
1118 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1120 twice_nat=twice_nat)
# NOTE(review): the last argument of the pg1 feature call and the steps
# between enabling capture and reading it are not visible in this listing.
1122 def test_dynamic(self):
1123 """ NAT44 dynamic translation test """
# NAT pool address plus the NAT44 feature on pg0 and pg1
1125 self.nat44_add_address(self.nat_addr)
1126 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# inside -> outside: stream sent on pg0, capture on pg1 checked by
# verify_capture_out
1131 pkts = self.create_stream_in(self.pg0, self.pg1)
1132 self.pg0.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1135 capture = self.pg1.get_capture(len(pkts))
1136 self.verify_capture_out(capture)
# outside -> inside: stream sent on pg1, capture on pg0 checked by
# verify_capture_in
1139 pkts = self.create_stream_out(self.pg1)
1140 self.pg1.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
1143 capture = self.pg0.get_capture(len(pkts))
1144 self.verify_capture_in(capture, self.pg0)
# NOTE(review): the last argument of the pg1 feature call and the steps
# between enabling capture and reading it are not visible in this listing.
1146 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147 """ NAT44 handling of client packets with TTL=1 """
# NAT pool address plus the NAT44 feature on pg0 and pg1
1149 self.nat44_add_address(self.nat_addr)
1150 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1154 # Client side - generate traffic
1155 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156 self.pg0.add_stream(pkts)
1157 self.pg_enable_capture(self.pg_interfaces)
1160 # Client side - verify ICMP type 11 packets
# the errors are expected back on pg0, one per packet sent
1161 capture = self.pg0.get_capture(len(pkts))
1162 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# NOTE(review): the last argument of the pg1 feature call and the steps
# between enabling capture and reading it are not visible in this listing.
1164 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165 """ NAT44 handling of server packets with TTL=1 """
# NAT pool address plus the NAT44 feature on pg0 and pg1
1167 self.nat44_add_address(self.nat_addr)
1168 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1172 # Client side - create sessions
1173 pkts = self.create_stream_in(self.pg0, self.pg1)
1174 self.pg0.add_stream(pkts)
1175 self.pg_enable_capture(self.pg_interfaces)
1178 # Server side - generate traffic
# verify_capture_out also records the translated ports that
# create_stream_out uses as destination ports
1179 capture = self.pg1.get_capture(len(pkts))
1180 self.verify_capture_out(capture)
1181 pkts = self.create_stream_out(self.pg1, ttl=1)
1182 self.pg1.add_stream(pkts)
1183 self.pg_enable_capture(self.pg_interfaces)
1186 # Server side - verify ICMP type 11 packets
# the errors are expected to come from pg1's own address, not the NAT address
1187 capture = self.pg1.get_capture(len(pkts))
1188 self.verify_capture_out_with_icmp_errors(capture,
1189 src_ip=self.pg1.local_ip4)
# NOTE(review): the last argument of the pg1 feature call and the steps
# between enabling capture and reading it are not visible in this listing.
1191 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1192 """ NAT44 handling of error responses to client packets with TTL=2 """
# NAT pool address plus the NAT44 feature on pg0 and pg1
1194 self.nat44_add_address(self.nat_addr)
1195 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1196 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1199 # Client side - generate traffic
1200 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1201 self.pg0.add_stream(pkts)
1202 self.pg_enable_capture(self.pg_interfaces)
1205 # Server side - simulate ICMP type 11 response
# each captured (translated) packet is wrapped, from its IP layer on, into
# an ICMP type 11 message sent from the pg1 peer to the NAT address
1206 capture = self.pg1.get_capture(len(pkts))
1207 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1208 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1209 ICMP(type=11) / packet[IP] for packet in capture]
1210 self.pg1.add_stream(pkts)
1211 self.pg_enable_capture(self.pg_interfaces)
1214 # Client side - verify ICMP type 11 packets
1215 capture = self.pg0.get_capture(len(pkts))
1216 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1218 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1219 """ NAT44 handling of error responses to server packets with TTL=2 """
# Flow: send a normal stream pg0 -> pg1, then inject a TTL=2 stream on
# pg1, capture it on pg0, answer every captured packet with an ICMP
# type 11 message sent on pg0, and verify the resulting packets on pg1.
# NOTE(review): this listing omits some lines of the method (for example
# the tail of the pg1 feature call below) - confirm against the full file.
1221 self.nat44_add_address(self.nat_addr)
1222 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1223 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1226 # Client side - create sessions
1227 pkts = self.create_stream_in(self.pg0, self.pg1)
1228 self.pg0.add_stream(pkts)
1229 self.pg_enable_capture(self.pg_interfaces)
1232 # Server side - generate traffic
1233 capture = self.pg1.get_capture(len(pkts))
1234 self.verify_capture_out(capture)
1235 pkts = self.create_stream_out(self.pg1, ttl=2)
1236 self.pg1.add_stream(pkts)
1237 self.pg_enable_capture(self.pg_interfaces)
1240 # Client side - simulate ICMP type 11 response
1241 capture = self.pg0.get_capture(len(pkts))
# Each reply goes from pg0.remote_ip4 to pg1.remote_ip4 and carries the IP
# layer of the corresponding captured packet as its payload.
1242 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1244 ICMP(type=11) / packet[IP] for packet in capture]
1245 self.pg0.add_stream(pkts)
1246 self.pg_enable_capture(self.pg_interfaces)
1249 # Server side - verify ICMP type 11 packets
1250 capture = self.pg1.get_capture(len(pkts))
1251 self.verify_capture_out_with_icmp_errors(capture)
1253 def test_ping_out_interface_from_outside(self):
1254 """ Ping NAT44 out interface from outside network """
# Flow: send an ICMP echo-request from pg1.remote_ip4 to pg1.local_ip4 on
# pg1 and expect exactly one packet back on pg1 with source/destination
# swapped and ICMP type 0.
# NOTE(review): the assignments of `pkts` and `packet`, and the statement
# that guards the error log at the end, are not visible in this listing -
# confirm against the full file.
1256 self.nat44_add_address(self.nat_addr)
1257 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1261 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1263 ICMP(id=self.icmp_id_out, type='echo-request'))
1265 self.pg1.add_stream(pkts)
1266 self.pg_enable_capture(self.pg_interfaces)
1268 capture = self.pg1.get_capture(len(pkts))
1269 self.assertEqual(1, len(capture))
# The request is built with id=self.icmp_id_out while the reply is compared
# against self.icmp_id_in.
# NOTE(review): presumably both attributes hold the same value at this
# point - verify against the class setup.
1272 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1273 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1274 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1275 self.assertEqual(packet[ICMP].type, 0) # echo reply
1277 self.logger.error(ppp("Unexpected or invalid packet "
1278 "(outside network):", packet))
1281 def test_ping_internal_host_from_outside(self):
1282 """ Ping internal host from outside network """
# Flow: add a static mapping between pg0.remote_ip4 and self.nat_addr,
# send an echo-request to self.nat_addr on pg1 and expect it on pg0, then
# send an echo-reply on pg0 and expect it on pg1.
# NOTE(review): this listing omits some lines of the method (for example
# the tail of the pg1 feature call below) - confirm against the full file.
1284 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1285 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside: echo-request addressed to the mapped address.
1290 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1291 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1292 ICMP(id=self.icmp_id_out, type='echo-request'))
1293 self.pg1.add_stream(pkt)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 capture = self.pg0.get_capture(1)
1297 self.verify_capture_in(capture, self.pg0, packet_num=1)
1298 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Inside -> outside: echo-reply from the internal host.
1301 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1302 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1303 ICMP(id=self.icmp_id_in, type='echo-reply'))
1304 self.pg0.add_stream(pkt)
1305 self.pg_enable_capture(self.pg_interfaces)
1307 capture = self.pg1.get_capture(1)
1308 self.verify_capture_out(capture, same_port=True, packet_num=1)
1309 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1311 def test_forwarding(self):
1312 """ NAT44 forwarding test """
# Flow: enable NAT44 forwarding, add a static mapping from
# pg0.remote_ip4n to self.nat_addr_n, and exchange traffic in both
# directions twice: once for the mapped host and once after temporarily
# replacing pg0.remote_hosts[0] with pg0.remote_hosts[1].
# NOTE(review): this listing omits some lines of the method (call tails
# and any try/finally structure) - confirm against the full file.
1314 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1315 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1317 self.vapi.nat44_forwarding_enable_disable(1)
1319 real_ip = self.pg0.remote_ip4n
1320 alias_ip = self.nat_addr_n
1321 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322 external_ip=alias_ip)
1325 # in2out - static mapping match
# NOTE(review): the first stream sent under this heading is built with
# create_stream_out and injected on pg1; the pg0 -> pg1 stream follows.
1327 pkts = self.create_stream_out(self.pg1)
1328 self.pg1.add_stream(pkts)
1329 self.pg_enable_capture(self.pg_interfaces)
1331 capture = self.pg0.get_capture(len(pkts))
1332 self.verify_capture_in(capture, self.pg0)
1334 pkts = self.create_stream_in(self.pg0, self.pg1)
1335 self.pg0.add_stream(pkts)
1336 self.pg_enable_capture(self.pg_interfaces)
1338 capture = self.pg1.get_capture(len(pkts))
1339 self.verify_capture_out(capture, same_port=True)
1341 # in2out - no static mapping match
# Remember the original first host so it can be restored further down.
1343 host0 = self.pg0.remote_hosts[0]
1344 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1346 pkts = self.create_stream_out(self.pg1,
1347 dst_ip=self.pg0.remote_ip4,
1348 use_inside_ports=True)
1349 self.pg1.add_stream(pkts)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg0.get_capture(len(pkts))
1353 self.verify_capture_in(capture, self.pg0)
1355 pkts = self.create_stream_in(self.pg0, self.pg1)
1356 self.pg0.add_stream(pkts)
1357 self.pg_enable_capture(self.pg_interfaces)
1359 capture = self.pg1.get_capture(len(pkts))
1360 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1363 self.pg0.remote_hosts[0] = host0
# Cleanup: forwarding is switched off again.
# NOTE(review): the call below is cut off; presumably it removes the
# static mapping added above - confirm against the full file.
1366 self.vapi.nat44_forwarding_enable_disable(0)
1367 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1368 external_ip=alias_ip,
1371 def test_static_in(self):
1372 """ 1:1 NAT initialized from inside network """
# Flow: map pg0.remote_ip4 to nat_ip with a static mapping, check the
# mapping dump, then send traffic pg0 -> pg1 first and pg1 -> pg0 second.
# NOTE(review): this listing omits some lines of the method (for example
# the tail of the pg1 feature call below) - confirm against the full file.
1374 nat_ip = "10.0.0.10"
1375 self.tcp_port_out = 6303
1376 self.udp_port_out = 6304
1377 self.icmp_id_out = 6305
1379 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1381 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exactly one mapping is expected; its tag, cut at the first NUL byte,
# must be the empty string.
1383 sm = self.vapi.nat44_static_mapping_dump()
1384 self.assertEqual(len(sm), 1)
1385 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1388 pkts = self.create_stream_in(self.pg0, self.pg1)
1389 self.pg0.add_stream(pkts)
1390 self.pg_enable_capture(self.pg_interfaces)
1392 capture = self.pg1.get_capture(len(pkts))
1393 self.verify_capture_out(capture, nat_ip, True)
1396 pkts = self.create_stream_out(self.pg1, nat_ip)
1397 self.pg1.add_stream(pkts)
1398 self.pg_enable_capture(self.pg_interfaces)
1400 capture = self.pg0.get_capture(len(pkts))
1401 self.verify_capture_in(capture, self.pg0)
1403 def test_static_out(self):
1404 """ 1:1 NAT initialized from outside network """
# Flow: map pg0.remote_ip4 to nat_ip with a tagged static mapping, check
# the mapping dump, then send traffic pg1 -> pg0 first and pg0 -> pg1
# second.
# NOTE(review): the assignment of `tag` and the tail of the pg1 feature
# call are not visible in this listing - confirm against the full file.
1406 nat_ip = "10.0.0.20"
1407 self.tcp_port_out = 6303
1408 self.udp_port_out = 6304
1409 self.icmp_id_out = 6305
1412 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1413 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1414 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Exactly one mapping is expected; its tag, cut at the first NUL byte,
# must equal the tag passed when the mapping was added.
1416 sm = self.vapi.nat44_static_mapping_dump()
1417 self.assertEqual(len(sm), 1)
1418 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1421 pkts = self.create_stream_out(self.pg1, nat_ip)
1422 self.pg1.add_stream(pkts)
1423 self.pg_enable_capture(self.pg_interfaces)
1425 capture = self.pg0.get_capture(len(pkts))
1426 self.verify_capture_in(capture, self.pg0)
1429 pkts = self.create_stream_in(self.pg0, self.pg1)
1430 self.pg0.add_stream(pkts)
1431 self.pg_enable_capture(self.pg_interfaces)
1433 capture = self.pg1.get_capture(len(pkts))
1434 self.verify_capture_out(capture, nat_ip, True)
1436 def test_static_with_port_in(self):
1437 """ 1:1 NAPT initialized from inside network """
# Flow: add one static mapping per protocol (TCP, UDP, ICMP) between
# pg0.remote_ip4 and self.nat_addr using the *_in / *_out port attributes,
# then send traffic pg0 -> pg1 first and pg1 -> pg0 second.
# NOTE(review): this listing omits some lines of the method (for example
# the tail of the pg1 feature call below) - confirm against the full file.
1439 self.tcp_port_out = 3606
1440 self.udp_port_out = 3607
1441 self.icmp_id_out = 3608
1443 self.nat44_add_address(self.nat_addr)
1444 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1445 self.tcp_port_in, self.tcp_port_out,
1446 proto=IP_PROTOS.tcp)
1447 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448 self.udp_port_in, self.udp_port_out,
1449 proto=IP_PROTOS.udp)
1450 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1451 self.icmp_id_in, self.icmp_id_out,
1452 proto=IP_PROTOS.icmp)
1453 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1454 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1458 pkts = self.create_stream_in(self.pg0, self.pg1)
1459 self.pg0.add_stream(pkts)
1460 self.pg_enable_capture(self.pg_interfaces)
1462 capture = self.pg1.get_capture(len(pkts))
1463 self.verify_capture_out(capture)
1466 pkts = self.create_stream_out(self.pg1)
1467 self.pg1.add_stream(pkts)
1468 self.pg_enable_capture(self.pg_interfaces)
1470 capture = self.pg0.get_capture(len(pkts))
1471 self.verify_capture_in(capture, self.pg0)
1473 def test_static_with_port_out(self):
1474 """ 1:1 NAPT initialized from outside network """
# Flow: add one static mapping per protocol (TCP, UDP, ICMP) between
# pg0.remote_ip4 and self.nat_addr using the *_in / *_out port attributes,
# then send traffic pg1 -> pg0 first and pg0 -> pg1 second.
# NOTE(review): this listing omits some lines of the method (for example
# the tail of the pg1 feature call below) - confirm against the full file.
1476 self.tcp_port_out = 30606
1477 self.udp_port_out = 30607
1478 self.icmp_id_out = 30608
1480 self.nat44_add_address(self.nat_addr)
1481 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1482 self.tcp_port_in, self.tcp_port_out,
1483 proto=IP_PROTOS.tcp)
1484 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1485 self.udp_port_in, self.udp_port_out,
1486 proto=IP_PROTOS.udp)
1487 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1488 self.icmp_id_in, self.icmp_id_out,
1489 proto=IP_PROTOS.icmp)
1490 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1491 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1495 pkts = self.create_stream_out(self.pg1)
1496 self.pg1.add_stream(pkts)
1497 self.pg_enable_capture(self.pg_interfaces)
1499 capture = self.pg0.get_capture(len(pkts))
1500 self.verify_capture_in(capture, self.pg0)
1503 pkts = self.create_stream_in(self.pg0, self.pg1)
1504 self.pg0.add_stream(pkts)
1505 self.pg_enable_capture(self.pg_interfaces)
1507 capture = self.pg1.get_capture(len(pkts))
1508 self.verify_capture_out(capture)
1510 def test_static_with_port_out2(self):
1511 """ 1:1 NAPT symmetrical rule """
# Flow: with NAT44 forwarding enabled, add a TCP static mapping flagged
# out2in_only=1 and exchange four single packets: client -> service via
# self.nat_addr, the service reply, then a client -> server packet sent
# straight to pg0.remote_ip4 and its reply.
# NOTE(review): the assignments of `local_port`, `external_port`, `ip` and
# `tcp`, the re-binding of `p` to a captured packet, and the try/except
# structure around the assertions are not visible in this listing -
# confirm against the full file.
1516 self.vapi.nat44_forwarding_enable_disable(1)
1517 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1518 local_port, external_port,
1519 proto=IP_PROTOS.tcp, out2in_only=1)
1520 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1521 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1524 # from client to service
1525 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1526 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1527 TCP(sport=12345, dport=external_port))
1528 self.pg1.add_stream(p)
1529 self.pg_enable_capture(self.pg_interfaces)
1531 capture = self.pg0.get_capture(1)
# Destination must be rewritten to the internal host and local port.
1537 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1538 self.assertEqual(tcp.dport, local_port)
1539 self.check_tcp_checksum(p)
1540 self.check_ip_checksum(p)
1542 self.logger.error(ppp("Unexpected or invalid packet:", p))
1545 # from service back to client
1546 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1547 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1548 TCP(sport=local_port, dport=12345))
1549 self.pg0.add_stream(p)
1550 self.pg_enable_capture(self.pg_interfaces)
1552 capture = self.pg1.get_capture(1)
# Source must be rewritten to the external address and port.
1557 self.assertEqual(ip.src, self.nat_addr)
1558 self.assertEqual(tcp.sport, external_port)
1559 self.check_tcp_checksum(p)
1560 self.check_ip_checksum(p)
1562 self.logger.error(ppp("Unexpected or invalid packet:", p))
1565 # from client to server (no translation)
1566 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1567 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1568 TCP(sport=12346, dport=local_port))
1569 self.pg1.add_stream(p)
1570 self.pg_enable_capture(self.pg_interfaces)
1572 capture = self.pg0.get_capture(1)
1578 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1579 self.assertEqual(tcp.dport, local_port)
1580 self.check_tcp_checksum(p)
1581 self.check_ip_checksum(p)
1583 self.logger.error(ppp("Unexpected or invalid packet:", p))
1586 # from service back to client (no translation)
1587 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1588 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1589 TCP(sport=local_port, dport=12346))
1590 self.pg0.add_stream(p)
1591 self.pg_enable_capture(self.pg_interfaces)
1593 capture = self.pg1.get_capture(1)
# Source address and port must be left untouched for this flow.
1598 self.assertEqual(ip.src, self.pg0.remote_ip4)
1599 self.assertEqual(tcp.sport, local_port)
1600 self.check_tcp_checksum(p)
1601 self.check_ip_checksum(p)
1603 self.logger.error(ppp("Unexpected or invalid packet:", p))
1606 def test_static_vrf_aware(self):
1607 """ 1:1 NAT VRF awareness """
# Flow: add static mappings for pg4.remote_ip4 -> nat_ip1 and
# pg0.remote_ip4 -> nat_ip2, then send traffic towards pg3 from pg4
# (expected on pg3, translated to nat_ip1) and from pg0 (expected to
# produce nothing on pg3).
# NOTE(review): the tails of both static-mapping calls and of the pg3
# feature call are not visible in this listing, so the VRF arguments
# cannot be confirmed from here - check the full file.
1609 nat_ip1 = "10.0.0.30"
1610 nat_ip2 = "10.0.0.40"
1611 self.tcp_port_out = 6303
1612 self.udp_port_out = 6304
1613 self.icmp_id_out = 6305
1615 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1617 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1619 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1621 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1622 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1624 # inside interface VRF match NAT44 static mapping VRF
1625 pkts = self.create_stream_in(self.pg4, self.pg3)
1626 self.pg4.add_stream(pkts)
1627 self.pg_enable_capture(self.pg_interfaces)
1629 capture = self.pg3.get_capture(len(pkts))
1630 self.verify_capture_out(capture, nat_ip1, True)
1632 # inside interface VRF don't match NAT44 static mapping VRF (packets
1634 pkts = self.create_stream_in(self.pg0, self.pg3)
1635 self.pg0.add_stream(pkts)
1636 self.pg_enable_capture(self.pg_interfaces)
1638 self.pg3.assert_nothing_captured()
1640 def test_identity_nat(self):
1641 """ Identity NAT """
# Flow: add an identity mapping for pg0.remote_ip4n, send one TCP packet
# from pg1.remote_ip4 to pg0.remote_ip4 on pg1 and expect it on pg0 with
# addresses and ports unchanged.
# NOTE(review): the assignments of `ip` and `tcp`, the re-binding of `p`
# to the captured packet, and the try/except structure are not visible in
# this listing - confirm against the full file.
1643 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1644 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1645 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1648 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1649 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1650 TCP(sport=12345, dport=56789))
1651 self.pg1.add_stream(p)
1652 self.pg_enable_capture(self.pg_interfaces)
1654 capture = self.pg0.get_capture(1)
# All four address/port fields must match the values that were sent.
1659 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1660 self.assertEqual(ip.src, self.pg1.remote_ip4)
1661 self.assertEqual(tcp.dport, 56789)
1662 self.assertEqual(tcp.sport, 12345)
1663 self.check_tcp_checksum(p)
1664 self.check_ip_checksum(p)
1666 self.logger.error(ppp("Unexpected or invalid packet:", p))
1669 def test_static_lb(self):
1670 """ NAT44 local service load balancing """
# Flow: register a load-balanced static mapping for self.nat_addr with two
# local servers (pg0.remote_hosts[0] and [1]), check one request/reply
# pair, then send one request per client address and compare how many
# reached each server.
# NOTE(review): the port assignments, the remaining keys of the `locals`
# entries, the choice of `server`, the `pkts` list handling, the counters
# server1_n / server2_n and the try/except structure are not visible in
# this listing - confirm against the full file.
1671 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1674 server1 = self.pg0.remote_hosts[0]
1675 server2 = self.pg0.remote_hosts[1]
# NOTE(review): the name `locals` shadows the Python builtin locals()
# inside this method.
1677 locals = [{'addr': server1.ip4n,
1680 {'addr': server2.ip4n,
1684 self.nat44_add_address(self.nat_addr)
1685 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1688 local_num=len(locals),
1690 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1691 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1694 # from client to service
1695 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1696 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1697 TCP(sport=12345, dport=external_port))
1698 self.pg1.add_stream(p)
1699 self.pg_enable_capture(self.pg_interfaces)
1701 capture = self.pg0.get_capture(1)
# The request may be delivered to either of the two servers.
1707 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1708 if ip.dst == server1.ip4:
1712 self.assertEqual(tcp.dport, local_port)
1713 self.check_tcp_checksum(p)
1714 self.check_ip_checksum(p)
1716 self.logger.error(ppp("Unexpected or invalid packet:", p))
1719 # from service back to client
1720 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1721 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1722 TCP(sport=local_port, dport=12345))
1723 self.pg0.add_stream(p)
1724 self.pg_enable_capture(self.pg_interfaces)
1726 capture = self.pg1.get_capture(1)
1731 self.assertEqual(ip.src, self.nat_addr)
1732 self.assertEqual(tcp.sport, external_port)
1733 self.check_tcp_checksum(p)
1734 self.check_ip_checksum(p)
1736 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Distribution check: one request per address produced by ip4_range.
1742 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1744 for client in clients:
1745 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1746 IP(src=client, dst=self.nat_addr) /
1747 TCP(sport=12345, dport=external_port))
1749 self.pg1.add_stream(pkts)
1750 self.pg_enable_capture(self.pg_interfaces)
1752 capture = self.pg0.get_capture(len(pkts))
1754 if p[IP].dst == server1.ip4:
# server1 is expected to receive strictly more requests than server2.
1758 self.assertTrue(server1_n > server2_n)
1760 def test_static_lb_2(self):
1761 """ NAT44 local service load balancing (asymmetrical rule) """
# Flow: with NAT44 forwarding enabled, register a load-balanced static
# mapping for self.nat_addr with two local servers, check one
# request/reply pair through the mapping, then a request sent straight to
# server1.ip4 and its reply.
# NOTE(review): the port assignments, the remaining keys of the `locals`
# entries, the remaining mapping arguments, the choice of `server` and the
# try/except structure are not visible in this listing - confirm against
# the full file.
1762 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1765 server1 = self.pg0.remote_hosts[0]
1766 server2 = self.pg0.remote_hosts[1]
# NOTE(review): the name `locals` shadows the Python builtin locals()
# inside this method.
1768 locals = [{'addr': server1.ip4n,
1771 {'addr': server2.ip4n,
1775 self.vapi.nat44_forwarding_enable_disable(1)
1776 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1780 local_num=len(locals),
1782 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1783 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1786 # from client to service
1787 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1788 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1789 TCP(sport=12345, dport=external_port))
1790 self.pg1.add_stream(p)
1791 self.pg_enable_capture(self.pg_interfaces)
1793 capture = self.pg0.get_capture(1)
# The request may be delivered to either of the two servers.
1799 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1800 if ip.dst == server1.ip4:
1804 self.assertEqual(tcp.dport, local_port)
1805 self.check_tcp_checksum(p)
1806 self.check_ip_checksum(p)
1808 self.logger.error(ppp("Unexpected or invalid packet:", p))
1811 # from service back to client
1812 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1813 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1814 TCP(sport=local_port, dport=12345))
1815 self.pg0.add_stream(p)
1816 self.pg_enable_capture(self.pg_interfaces)
1818 capture = self.pg1.get_capture(1)
1823 self.assertEqual(ip.src, self.nat_addr)
1824 self.assertEqual(tcp.sport, external_port)
1825 self.check_tcp_checksum(p)
1826 self.check_ip_checksum(p)
1828 self.logger.error(ppp("Unexpected or invalid packet:", p))
1831 # from client to server (no translation)
1832 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1833 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1834 TCP(sport=12346, dport=local_port))
1835 self.pg1.add_stream(p)
1836 self.pg_enable_capture(self.pg_interfaces)
1838 capture = self.pg0.get_capture(1)
1844 self.assertEqual(ip.dst, server1.ip4)
1845 self.assertEqual(tcp.dport, local_port)
1846 self.check_tcp_checksum(p)
1847 self.check_ip_checksum(p)
1849 self.logger.error(ppp("Unexpected or invalid packet:", p))
1852 # from service back to client (no translation)
1853 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1854 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1855 TCP(sport=local_port, dport=12346))
1856 self.pg0.add_stream(p)
1857 self.pg_enable_capture(self.pg_interfaces)
1859 capture = self.pg1.get_capture(1)
# Source address and port must be left untouched for this flow.
1864 self.assertEqual(ip.src, server1.ip4)
1865 self.assertEqual(tcp.sport, local_port)
1866 self.check_tcp_checksum(p)
1867 self.check_ip_checksum(p)
1869 self.logger.error(ppp("Unexpected or invalid packet:", p))
1872 def test_multiple_inside_interfaces(self):
1873 """ NAT44 multiple non-overlapping address space inside interfaces """
# Flow: enable the NAT44 feature on pg0, pg1 and pg3 (pg2 gets no feature
# call here), then check six paths: pg0 -> pg1 and pg0 -> pg2 without
# translation, and in2out / out2in via pg3 for pg0 and for pg1.
# NOTE(review): the tail of the pg3 feature call is not visible in this
# listing - confirm against the full file.
1875 self.nat44_add_address(self.nat_addr)
1876 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1877 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1878 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1881 # between two NAT44 inside interfaces (no translation)
1882 pkts = self.create_stream_in(self.pg0, self.pg1)
1883 self.pg0.add_stream(pkts)
1884 self.pg_enable_capture(self.pg_interfaces)
1886 capture = self.pg1.get_capture(len(pkts))
1887 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1889 # from NAT44 inside to interface without NAT44 feature (no translation)
1890 pkts = self.create_stream_in(self.pg0, self.pg2)
1891 self.pg0.add_stream(pkts)
1892 self.pg_enable_capture(self.pg_interfaces)
1894 capture = self.pg2.get_capture(len(pkts))
1895 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1897 # in2out 1st interface
1898 pkts = self.create_stream_in(self.pg0, self.pg3)
1899 self.pg0.add_stream(pkts)
1900 self.pg_enable_capture(self.pg_interfaces)
1902 capture = self.pg3.get_capture(len(pkts))
1903 self.verify_capture_out(capture)
1905 # out2in 1st interface
1906 pkts = self.create_stream_out(self.pg3)
1907 self.pg3.add_stream(pkts)
1908 self.pg_enable_capture(self.pg_interfaces)
1910 capture = self.pg0.get_capture(len(pkts))
1911 self.verify_capture_in(capture, self.pg0)
1913 # in2out 2nd interface
1914 pkts = self.create_stream_in(self.pg1, self.pg3)
1915 self.pg1.add_stream(pkts)
1916 self.pg_enable_capture(self.pg_interfaces)
1918 capture = self.pg3.get_capture(len(pkts))
1919 self.verify_capture_out(capture)
1921 # out2in 2nd interface
1922 pkts = self.create_stream_out(self.pg3)
1923 self.pg3.add_stream(pkts)
1924 self.pg_enable_capture(self.pg_interfaces)
1926 capture = self.pg1.get_capture(len(pkts))
1927 self.verify_capture_in(capture, self.pg1)
1929 def test_inside_overlapping_interfaces(self):
1930 """ NAT44 multiple inside interfaces with overlapping address space """
# Flow: enable the NAT44 feature on pg3, pg4, pg5 and pg6, add a static
# mapping for pg6.remote_ip4 -> static_nat_ip, then check pg4 -> pg5
# without translation, a pg4 -> pg6 packet sent to static_nat_ip,
# in2out / out2in via pg3 for pg4, pg5 and pg6, and finally the user and
# session dumps.
# NOTE(review): several lines are not visible in this listing (call tails,
# the assignments of `ip` and `tcp`, the loop header that binds `user`,
# and the try/except structure) - confirm against the full file.
1932 static_nat_ip = "10.0.0.10"
1933 self.nat44_add_address(self.nat_addr)
1934 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1936 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1937 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1938 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1939 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1942 # between NAT44 inside interfaces with same VRF (no translation)
1943 pkts = self.create_stream_in(self.pg4, self.pg5)
1944 self.pg4.add_stream(pkts)
1945 self.pg_enable_capture(self.pg_interfaces)
1947 capture = self.pg5.get_capture(len(pkts))
1948 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1950 # between NAT44 inside interfaces with different VRF (hairpinning)
1951 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1952 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1953 TCP(sport=1234, dport=5678))
1954 self.pg4.add_stream(p)
1955 self.pg_enable_capture(self.pg_interfaces)
1957 capture = self.pg6.get_capture(1)
# Source must become self.nat_addr with a source port other than 1234;
# the destination must become the mapped host and keep port 5678.
1962 self.assertEqual(ip.src, self.nat_addr)
1963 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1964 self.assertNotEqual(tcp.sport, 1234)
1965 self.assertEqual(tcp.dport, 5678)
1967 self.logger.error(ppp("Unexpected or invalid packet:", p))
1970 # in2out 1st interface
1971 pkts = self.create_stream_in(self.pg4, self.pg3)
1972 self.pg4.add_stream(pkts)
1973 self.pg_enable_capture(self.pg_interfaces)
1975 capture = self.pg3.get_capture(len(pkts))
1976 self.verify_capture_out(capture)
1978 # out2in 1st interface
1979 pkts = self.create_stream_out(self.pg3)
1980 self.pg3.add_stream(pkts)
1981 self.pg_enable_capture(self.pg_interfaces)
1983 capture = self.pg4.get_capture(len(pkts))
1984 self.verify_capture_in(capture, self.pg4)
1986 # in2out 2nd interface
1987 pkts = self.create_stream_in(self.pg5, self.pg3)
1988 self.pg5.add_stream(pkts)
1989 self.pg_enable_capture(self.pg_interfaces)
1991 capture = self.pg3.get_capture(len(pkts))
1992 self.verify_capture_out(capture)
1994 # out2in 2nd interface
1995 pkts = self.create_stream_out(self.pg3)
1996 self.pg3.add_stream(pkts)
1997 self.pg_enable_capture(self.pg_interfaces)
1999 capture = self.pg5.get_capture(len(pkts))
2000 self.verify_capture_in(capture, self.pg5)
# Session checks for the pg5 host: exactly three non-static sessions,
# asserted in the fixed order TCP, UDP, ICMP.
# NOTE(review): the meaning of the second argument (10) passed to
# nat44_user_session_dump is not visible from here - confirm.
2003 addresses = self.vapi.nat44_address_dump()
2004 self.assertEqual(len(addresses), 1)
2005 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2006 self.assertEqual(len(sessions), 3)
2007 for session in sessions:
2008 self.assertFalse(session.is_static)
2009 self.assertEqual(session.inside_ip_address[0:4],
2010 self.pg5.remote_ip4n)
2011 self.assertEqual(session.outside_ip_address,
2012 addresses[0].ip_address)
2013 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2014 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2015 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2016 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2017 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2018 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2019 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2020 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2021 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2023 # in2out 3rd interface
2024 pkts = self.create_stream_in(self.pg6, self.pg3)
2025 self.pg6.add_stream(pkts)
2026 self.pg_enable_capture(self.pg_interfaces)
2028 capture = self.pg3.get_capture(len(pkts))
2029 self.verify_capture_out(capture, static_nat_ip, True)
2031 # out2in 3rd interface
2032 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2033 self.pg3.add_stream(pkts)
2034 self.pg_enable_capture(self.pg_interfaces)
2036 capture = self.pg6.get_capture(len(pkts))
2037 self.verify_capture_in(capture, self.pg6)
2039 # general user and session dump verifications
2040 users = self.vapi.nat44_user_dump()
2041 self.assertTrue(len(users) >= 3)
2042 addresses = self.vapi.nat44_address_dump()
2043 self.assertEqual(len(addresses), 1)
2045 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2047 for session in sessions:
2048 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_bytes > total_pkts and total_pkts > 0.
2049 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2050 self.assertTrue(session.protocol in
2051 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Sessions of the pg4 host: at least four, none static.
2055 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2056 self.assertTrue(len(sessions) >= 4)
2057 for session in sessions:
2058 self.assertFalse(session.is_static)
2059 self.assertEqual(session.inside_ip_address[0:4],
2060 self.pg4.remote_ip4n)
2061 self.assertEqual(session.outside_ip_address,
2062 addresses[0].ip_address)
# Sessions of the pg6 host: at least three, all static.
2065 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2066 self.assertTrue(len(sessions) >= 3)
2067 for session in sessions:
2068 self.assertTrue(session.is_static)
2069 self.assertEqual(session.inside_ip_address[0:4],
2070 self.pg6.remote_ip4n)
# NOTE(review): this comparison relies on map() returning lists, which is
# Python 2 behaviour; under Python 3 two map objects never compare equal
# by content.
2071 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2072 map(int, static_nat_ip.split('.')))
2073 self.assertTrue(session.inside_port in
2074 [self.tcp_port_in, self.udp_port_in,
2077 def test_hairpinning(self):
2078 """ NAT44 hairpinning - 1:1 NAPT """
# Flow: host and server are both remote hosts on pg0. A TCP static mapping
# exposes the server as self.nat_addr:server_out_port. The host sends to
# that address on pg0 and the packet is expected back on pg0 addressed to
# the server; the server's reply takes the reverse path.
# NOTE(review): the assignments of `host_in_port`, `ip` and `tcp`, the
# re-binding of `p` to the captured packet, and the try/except structure
# are not visible in this listing - confirm against the full file.
2080 host = self.pg0.remote_hosts[0]
2081 server = self.pg0.remote_hosts[1]
2084 server_in_port = 5678
2085 server_out_port = 8765
2087 self.nat44_add_address(self.nat_addr)
2088 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2089 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2091 # add static mapping for server
2092 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2093 server_in_port, server_out_port,
2094 proto=IP_PROTOS.tcp)
2096 # send packet from host to server
2097 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2098 IP(src=host.ip4, dst=self.nat_addr) /
2099 TCP(sport=host_in_port, dport=server_out_port))
2100 self.pg0.add_stream(p)
2101 self.pg_enable_capture(self.pg_interfaces)
2103 capture = self.pg0.get_capture(1)
2108 self.assertEqual(ip.src, self.nat_addr)
2109 self.assertEqual(ip.dst, server.ip4)
2110 self.assertNotEqual(tcp.sport, host_in_port)
2111 self.assertEqual(tcp.dport, server_in_port)
2112 self.check_tcp_checksum(p)
# Keep the translated source port; the server's reply is addressed to it.
2113 host_out_port = tcp.sport
2115 self.logger.error(ppp("Unexpected or invalid packet:", p))
2118 # send reply from server to host
2119 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2120 IP(src=server.ip4, dst=self.nat_addr) /
2121 TCP(sport=server_in_port, dport=host_out_port))
2122 self.pg0.add_stream(p)
2123 self.pg_enable_capture(self.pg_interfaces)
2125 capture = self.pg0.get_capture(1)
# The reply must appear to come from self.nat_addr:server_out_port and be
# delivered to the host's original address and port.
2130 self.assertEqual(ip.src, self.nat_addr)
2131 self.assertEqual(ip.dst, host.ip4)
2132 self.assertEqual(tcp.sport, server_out_port)
2133 self.assertEqual(tcp.dport, host_in_port)
2134 self.check_tcp_checksum(p)
2136 self.logger.error(ppp("Unexpected or invalid packet:", p))
2139 def test_hairpinning2(self):
2140 """ NAT44 hairpinning - 1:1 NAT"""
2142 server1_nat_ip = "10.0.0.10"
2143 server2_nat_ip = "10.0.0.11"
2144 host = self.pg0.remote_hosts[0]
2145 server1 = self.pg0.remote_hosts[1]
2146 server2 = self.pg0.remote_hosts[2]
2147 server_tcp_port = 22
2148 server_udp_port = 20
2150 self.nat44_add_address(self.nat_addr)
2151 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2152 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2155 # add static mapping for servers
2156 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2157 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2161 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2162 IP(src=host.ip4, dst=server1_nat_ip) /
2163 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2165 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2166 IP(src=host.ip4, dst=server1_nat_ip) /
2167 UDP(sport=self.udp_port_in, dport=server_udp_port))
2169 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2170 IP(src=host.ip4, dst=server1_nat_ip) /
2171 ICMP(id=self.icmp_id_in, type='echo-request'))
2173 self.pg0.add_stream(pkts)
2174 self.pg_enable_capture(self.pg_interfaces)
2176 capture = self.pg0.get_capture(len(pkts))
2177 for packet in capture:
2179 self.assertEqual(packet[IP].src, self.nat_addr)
2180 self.assertEqual(packet[IP].dst, server1.ip4)
2181 if packet.haslayer(TCP):
2182 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2183 self.assertEqual(packet[TCP].dport, server_tcp_port)
2184 self.tcp_port_out = packet[TCP].sport
2185 self.check_tcp_checksum(packet)
2186 elif packet.haslayer(UDP):
2187 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2188 self.assertEqual(packet[UDP].dport, server_udp_port)
2189 self.udp_port_out = packet[UDP].sport
2191 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2192 self.icmp_id_out = packet[ICMP].id
2194 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2199 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2200 IP(src=server1.ip4, dst=self.nat_addr) /
2201 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2203 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2204 IP(src=server1.ip4, dst=self.nat_addr) /
2205 UDP(sport=server_udp_port, dport=self.udp_port_out))
2207 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2208 IP(src=server1.ip4, dst=self.nat_addr) /
2209 ICMP(id=self.icmp_id_out, type='echo-reply'))
2211 self.pg0.add_stream(pkts)
2212 self.pg_enable_capture(self.pg_interfaces)
2214 capture = self.pg0.get_capture(len(pkts))
2215 for packet in capture:
2217 self.assertEqual(packet[IP].src, server1_nat_ip)
2218 self.assertEqual(packet[IP].dst, host.ip4)
2219 if packet.haslayer(TCP):
2220 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2221 self.assertEqual(packet[TCP].sport, server_tcp_port)
2222 self.check_tcp_checksum(packet)
2223 elif packet.haslayer(UDP):
2224 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2225 self.assertEqual(packet[UDP].sport, server_udp_port)
2227 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2229 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2232 # server2 to server1
2234 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2235 IP(src=server2.ip4, dst=server1_nat_ip) /
2236 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2238 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2239 IP(src=server2.ip4, dst=server1_nat_ip) /
2240 UDP(sport=self.udp_port_in, dport=server_udp_port))
2242 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2243 IP(src=server2.ip4, dst=server1_nat_ip) /
2244 ICMP(id=self.icmp_id_in, type='echo-request'))
2246 self.pg0.add_stream(pkts)
2247 self.pg_enable_capture(self.pg_interfaces)
2249 capture = self.pg0.get_capture(len(pkts))
2250 for packet in capture:
2252 self.assertEqual(packet[IP].src, server2_nat_ip)
2253 self.assertEqual(packet[IP].dst, server1.ip4)
2254 if packet.haslayer(TCP):
2255 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2256 self.assertEqual(packet[TCP].dport, server_tcp_port)
2257 self.tcp_port_out = packet[TCP].sport
2258 self.check_tcp_checksum(packet)
2259 elif packet.haslayer(UDP):
2260 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2261 self.assertEqual(packet[UDP].dport, server_udp_port)
2262 self.udp_port_out = packet[UDP].sport
2264 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2265 self.icmp_id_out = packet[ICMP].id
2267 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2270 # server1 to server2
2272 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2273 IP(src=server1.ip4, dst=server2_nat_ip) /
2274 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2276 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2277 IP(src=server1.ip4, dst=server2_nat_ip) /
2278 UDP(sport=server_udp_port, dport=self.udp_port_out))
2280 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2281 IP(src=server1.ip4, dst=server2_nat_ip) /
2282 ICMP(id=self.icmp_id_out, type='echo-reply'))
2284 self.pg0.add_stream(pkts)
2285 self.pg_enable_capture(self.pg_interfaces)
2287 capture = self.pg0.get_capture(len(pkts))
2288 for packet in capture:
2290 self.assertEqual(packet[IP].src, server1_nat_ip)
2291 self.assertEqual(packet[IP].dst, server2.ip4)
2292 if packet.haslayer(TCP):
2293 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2294 self.assertEqual(packet[TCP].sport, server_tcp_port)
2295 self.check_tcp_checksum(packet)
2296 elif packet.haslayer(UDP):
2297 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2298 self.assertEqual(packet[UDP].sport, server_udp_port)
2300 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2302 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2305 def test_max_translations_per_user(self):
2306 """ MAX translations per user - recycle the least recently used """
2308 self.nat44_add_address(self.nat_addr)
2309 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2310 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2313 # get maximum number of translations per user
2314 nat44_config = self.vapi.nat_show_config()
2316 # send more than maximum number of translations per user packets
2317 pkts_num = nat44_config.max_translations_per_user + 5
2319 for port in range(0, pkts_num):
2320 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2321 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2322 TCP(sport=1025 + port))
2324 self.pg0.add_stream(pkts)
2325 self.pg_enable_capture(self.pg_interfaces)
2328 # verify number of translated packet
2329 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface """
    nat_if = self.pg7
    self.vapi.nat44_add_interface_addr(nat_if.sw_if_index)

    # the interface has no IPv4 address configured yet, so the NAT
    # address pool is expected to be empty
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # configuring the interface address should add exactly that address
    # to the pool (only the first four bytes of ip_address are compared)
    nat_if.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], nat_if.local_ip4n)

    # removing the interface address should empty the pool again
    nat_if.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
2350 def test_interface_addr_static_mapping(self):
2351 """ Static mapping with addresses from interface """
2354 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2355 self.nat44_add_static_mapping(
2357 external_sw_if_index=self.pg7.sw_if_index,
2360 # static mappings with external interface
2361 static_mappings = self.vapi.nat44_static_mapping_dump()
2362 self.assertEqual(1, len(static_mappings))
2363 self.assertEqual(self.pg7.sw_if_index,
2364 static_mappings[0].external_sw_if_index)
2365 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2367 # configure interface address and check static mappings
2368 self.pg7.config_ip4()
2369 static_mappings = self.vapi.nat44_static_mapping_dump()
2370 self.assertEqual(1, len(static_mappings))
2371 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2372 self.pg7.local_ip4n)
2373 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2374 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2376 # remove interface address and check static mappings
2377 self.pg7.unconfig_ip4()
2378 static_mappings = self.vapi.nat44_static_mapping_dump()
2379 self.assertEqual(0, len(static_mappings))
2381 def test_interface_addr_identity_nat(self):
2382 """ Identity NAT with addresses from interface """
2385 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2386 self.vapi.nat44_add_del_identity_mapping(
2387 sw_if_index=self.pg7.sw_if_index,
2389 protocol=IP_PROTOS.tcp,
2392 # identity mappings with external interface
2393 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2394 self.assertEqual(1, len(identity_mappings))
2395 self.assertEqual(self.pg7.sw_if_index,
2396 identity_mappings[0].sw_if_index)
2398 # configure interface address and check identity mappings
2399 self.pg7.config_ip4()
2400 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2401 self.assertEqual(1, len(identity_mappings))
2402 self.assertEqual(identity_mappings[0].ip_address,
2403 self.pg7.local_ip4n)
2404 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2405 self.assertEqual(port, identity_mappings[0].port)
2406 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2408 # remove interface address and check identity mappings
2409 self.pg7.unconfig_ip4()
2410 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2411 self.assertEqual(0, len(identity_mappings))
2413 def test_ipfix_nat44_sess(self):
2414 """ IPFIX logging NAT44 session created/delted """
2415 self.ipfix_domain_id = 10
2416 self.ipfix_src_port = 20202
2417 colector_port = 30303
2418 bind_layers(UDP, IPFIX, dport=30303)
2419 self.nat44_add_address(self.nat_addr)
2420 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2421 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2423 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2424 src_address=self.pg3.local_ip4n,
2426 template_interval=10,
2427 collector_port=colector_port)
2428 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2429 src_port=self.ipfix_src_port)
2431 pkts = self.create_stream_in(self.pg0, self.pg1)
2432 self.pg0.add_stream(pkts)
2433 self.pg_enable_capture(self.pg_interfaces)
2435 capture = self.pg1.get_capture(len(pkts))
2436 self.verify_capture_out(capture)
2437 self.nat44_add_address(self.nat_addr, is_add=0)
2438 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2439 capture = self.pg3.get_capture(9)
2440 ipfix = IPFIXDecoder()
2441 # first load template
2443 self.assertTrue(p.haslayer(IPFIX))
2444 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2445 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2446 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2447 self.assertEqual(p[UDP].dport, colector_port)
2448 self.assertEqual(p[IPFIX].observationDomainID,
2449 self.ipfix_domain_id)
2450 if p.haslayer(Template):
2451 ipfix.add_template(p.getlayer(Template))
2452 # verify events in data set
2454 if p.haslayer(Data):
2455 data = ipfix.decode_data_set(p.getlayer(Set))
2456 self.verify_ipfix_nat44_ses(data)
2458 def test_ipfix_addr_exhausted(self):
2459 """ IPFIX logging NAT addresses exhausted """
2460 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2461 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2463 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2464 src_address=self.pg3.local_ip4n,
2466 template_interval=10)
2467 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2468 src_port=self.ipfix_src_port)
2470 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2471 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2473 self.pg0.add_stream(p)
2474 self.pg_enable_capture(self.pg_interfaces)
2476 capture = self.pg1.get_capture(0)
2477 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2478 capture = self.pg3.get_capture(9)
2479 ipfix = IPFIXDecoder()
2480 # first load template
2482 self.assertTrue(p.haslayer(IPFIX))
2483 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2484 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2485 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2486 self.assertEqual(p[UDP].dport, 4739)
2487 self.assertEqual(p[IPFIX].observationDomainID,
2488 self.ipfix_domain_id)
2489 if p.haslayer(Template):
2490 ipfix.add_template(p.getlayer(Template))
2491 # verify events in data set
2493 if p.haslayer(Data):
2494 data = ipfix.decode_data_set(p.getlayer(Set))
2495 self.verify_ipfix_addr_exhausted(data)
2497 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2498 def test_ipfix_max_sessions(self):
2499 """ IPFIX logging maximum session entries exceeded """
2500 self.nat44_add_address(self.nat_addr)
2501 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2502 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2505 nat44_config = self.vapi.nat_show_config()
2506 max_sessions = 10 * nat44_config.translation_buckets
2509 for i in range(0, max_sessions):
2510 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2511 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2512 IP(src=src, dst=self.pg1.remote_ip4) /
2515 self.pg0.add_stream(pkts)
2516 self.pg_enable_capture(self.pg_interfaces)
2519 self.pg1.get_capture(max_sessions)
2520 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2521 src_address=self.pg3.local_ip4n,
2523 template_interval=10)
2524 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2525 src_port=self.ipfix_src_port)
2527 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2528 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2530 self.pg0.add_stream(p)
2531 self.pg_enable_capture(self.pg_interfaces)
2533 self.pg1.get_capture(0)
2534 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2535 capture = self.pg3.get_capture(9)
2536 ipfix = IPFIXDecoder()
2537 # first load template
2539 self.assertTrue(p.haslayer(IPFIX))
2540 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2541 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2542 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2543 self.assertEqual(p[UDP].dport, 4739)
2544 self.assertEqual(p[IPFIX].observationDomainID,
2545 self.ipfix_domain_id)
2546 if p.haslayer(Template):
2547 ipfix.add_template(p.getlayer(Template))
2548 # verify events in data set
2550 if p.haslayer(Data):
2551 data = ipfix.decode_data_set(p.getlayer(Set))
2552 self.verify_ipfix_max_sessions(data, max_sessions)
def test_pool_addr_fib(self):
    """ NAT44 add pool addresses to FIB """
    static_addr = '10.0.0.10'

    def send_arp_request(src_if, target_ip):
        # Broadcast an ARP who-has for target_ip, sourced from the remote
        # host of src_if, and run the packet generator.
        request = (Ether(src=src_if.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
                   ARP(op=ARP.who_has, pdst=target_ip,
                       psrc=src_if.remote_ip4, hwsrc=src_if.remote_mac))
        src_if.add_stream(request)
        self.pg_enable_capture(self.pg_interfaces)
        # NOTE(review): the start call is not visible in the reviewed
        # excerpt; self.pg_start() is assumed here - confirm against the
        # file.
        self.pg_start()

    def verify_arp_reply(capture):
        # The single captured packet must be an ARP reply.
        self.assertTrue(capture[0].haslayer(ARP))
        # Previously assertTrue(op, ARP.is_at): the second argument of
        # assertTrue is the failure message, so any non-zero opcode
        # passed. Compare the opcode explicitly.
        self.assertEqual(capture[0][ARP].op, ARP.is_at)

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    # NOTE(review): the continuation argument of this call is not visible
    # in the reviewed excerpt; is_inside=0 is assumed - confirm.
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # ARP request for the NAT44 pool address on pg1 must be answered
    send_arp_request(self.pg1, self.nat_addr)
    verify_arp_reply(self.pg1.get_capture(1))

    # ARP request for the static mapping address on pg1 must be answered
    send_arp_request(self.pg1, static_addr)
    verify_arp_reply(self.pg1.get_capture(1))

    # send ARP to non-NAT44 interface
    send_arp_request(self.pg2, self.nat_addr)
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.nat44_add_address(self.nat_addr, is_add=0)
    # NOTE(review): the continuation argument of this call is not visible
    # in the reviewed excerpt; is_add=0 is assumed - confirm.
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                  is_add=0)

    # neither address may be answered any more
    send_arp_request(self.pg1, self.nat_addr)
    self.pg1.get_capture(0)

    send_arp_request(self.pg1, static_addr)
    self.pg1.get_capture(0)
2615 def test_vrf_mode(self):
2616 """ NAT44 tenant VRF aware address pool mode """
2620 nat_ip1 = "10.0.0.10"
2621 nat_ip2 = "10.0.0.11"
2623 self.pg0.unconfig_ip4()
2624 self.pg1.unconfig_ip4()
2625 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2626 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2627 self.pg0.set_table_ip4(vrf_id1)
2628 self.pg1.set_table_ip4(vrf_id2)
2629 self.pg0.config_ip4()
2630 self.pg1.config_ip4()
2632 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2633 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2634 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2635 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2636 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2640 pkts = self.create_stream_in(self.pg0, self.pg2)
2641 self.pg0.add_stream(pkts)
2642 self.pg_enable_capture(self.pg_interfaces)
2644 capture = self.pg2.get_capture(len(pkts))
2645 self.verify_capture_out(capture, nat_ip1)
2648 pkts = self.create_stream_in(self.pg1, self.pg2)
2649 self.pg1.add_stream(pkts)
2650 self.pg_enable_capture(self.pg_interfaces)
2652 capture = self.pg2.get_capture(len(pkts))
2653 self.verify_capture_out(capture, nat_ip2)
2655 self.pg0.unconfig_ip4()
2656 self.pg1.unconfig_ip4()
2657 self.pg0.set_table_ip4(0)
2658 self.pg1.set_table_ip4(0)
2659 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2660 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2662 def test_vrf_feature_independent(self):
2663 """ NAT44 tenant VRF independent address pool mode """
2665 nat_ip1 = "10.0.0.10"
2666 nat_ip2 = "10.0.0.11"
2668 self.nat44_add_address(nat_ip1)
2669 self.nat44_add_address(nat_ip2, vrf_id=99)
2670 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2671 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2672 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2676 pkts = self.create_stream_in(self.pg0, self.pg2)
2677 self.pg0.add_stream(pkts)
2678 self.pg_enable_capture(self.pg_interfaces)
2680 capture = self.pg2.get_capture(len(pkts))
2681 self.verify_capture_out(capture, nat_ip1)
2684 pkts = self.create_stream_in(self.pg1, self.pg2)
2685 self.pg1.add_stream(pkts)
2686 self.pg_enable_capture(self.pg_interfaces)
2688 capture = self.pg2.get_capture(len(pkts))
2689 self.verify_capture_out(capture, nat_ip1)
2691 def test_dynamic_ipless_interfaces(self):
2692 """ NAT44 interfaces without configured IP address """
2694 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2695 mactobinary(self.pg7.remote_mac),
2696 self.pg7.remote_ip4n,
2698 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2699 mactobinary(self.pg8.remote_mac),
2700 self.pg8.remote_ip4n,
2703 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2704 dst_address_length=32,
2705 next_hop_address=self.pg7.remote_ip4n,
2706 next_hop_sw_if_index=self.pg7.sw_if_index)
2707 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2708 dst_address_length=32,
2709 next_hop_address=self.pg8.remote_ip4n,
2710 next_hop_sw_if_index=self.pg8.sw_if_index)
2712 self.nat44_add_address(self.nat_addr)
2713 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2714 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2718 pkts = self.create_stream_in(self.pg7, self.pg8)
2719 self.pg7.add_stream(pkts)
2720 self.pg_enable_capture(self.pg_interfaces)
2722 capture = self.pg8.get_capture(len(pkts))
2723 self.verify_capture_out(capture)
2726 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2727 self.pg8.add_stream(pkts)
2728 self.pg_enable_capture(self.pg_interfaces)
2730 capture = self.pg7.get_capture(len(pkts))
2731 self.verify_capture_in(capture, self.pg7)
2733 def test_static_ipless_interfaces(self):
2734 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2736 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2737 mactobinary(self.pg7.remote_mac),
2738 self.pg7.remote_ip4n,
2740 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2741 mactobinary(self.pg8.remote_mac),
2742 self.pg8.remote_ip4n,
2745 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2746 dst_address_length=32,
2747 next_hop_address=self.pg7.remote_ip4n,
2748 next_hop_sw_if_index=self.pg7.sw_if_index)
2749 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2750 dst_address_length=32,
2751 next_hop_address=self.pg8.remote_ip4n,
2752 next_hop_sw_if_index=self.pg8.sw_if_index)
2754 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2755 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2756 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2760 pkts = self.create_stream_out(self.pg8)
2761 self.pg8.add_stream(pkts)
2762 self.pg_enable_capture(self.pg_interfaces)
2764 capture = self.pg7.get_capture(len(pkts))
2765 self.verify_capture_in(capture, self.pg7)
2768 pkts = self.create_stream_in(self.pg7, self.pg8)
2769 self.pg7.add_stream(pkts)
2770 self.pg_enable_capture(self.pg_interfaces)
2772 capture = self.pg8.get_capture(len(pkts))
2773 self.verify_capture_out(capture, self.nat_addr, True)
2775 def test_static_with_port_ipless_interfaces(self):
2776 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2778 self.tcp_port_out = 30606
2779 self.udp_port_out = 30607
2780 self.icmp_id_out = 30608
2782 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2783 mactobinary(self.pg7.remote_mac),
2784 self.pg7.remote_ip4n,
2786 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2787 mactobinary(self.pg8.remote_mac),
2788 self.pg8.remote_ip4n,
2791 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2792 dst_address_length=32,
2793 next_hop_address=self.pg7.remote_ip4n,
2794 next_hop_sw_if_index=self.pg7.sw_if_index)
2795 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2796 dst_address_length=32,
2797 next_hop_address=self.pg8.remote_ip4n,
2798 next_hop_sw_if_index=self.pg8.sw_if_index)
2800 self.nat44_add_address(self.nat_addr)
2801 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2802 self.tcp_port_in, self.tcp_port_out,
2803 proto=IP_PROTOS.tcp)
2804 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2805 self.udp_port_in, self.udp_port_out,
2806 proto=IP_PROTOS.udp)
2807 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2808 self.icmp_id_in, self.icmp_id_out,
2809 proto=IP_PROTOS.icmp)
2810 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2811 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2815 pkts = self.create_stream_out(self.pg8)
2816 self.pg8.add_stream(pkts)
2817 self.pg_enable_capture(self.pg_interfaces)
2819 capture = self.pg7.get_capture(len(pkts))
2820 self.verify_capture_in(capture, self.pg7)
2823 pkts = self.create_stream_in(self.pg7, self.pg8)
2824 self.pg7.add_stream(pkts)
2825 self.pg_enable_capture(self.pg_interfaces)
2827 capture = self.pg8.get_capture(len(pkts))
2828 self.verify_capture_out(capture)
2830 def test_static_unknown_proto(self):
2831 """ 1:1 NAT translate packet with unknown protocol """
2832 nat_ip = "10.0.0.10"
2833 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2834 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2835 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2839 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2840 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2842 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2843 TCP(sport=1234, dport=1234))
2844 self.pg0.add_stream(p)
2845 self.pg_enable_capture(self.pg_interfaces)
2847 p = self.pg1.get_capture(1)
2850 self.assertEqual(packet[IP].src, nat_ip)
2851 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2852 self.assertTrue(packet.haslayer(GRE))
2853 self.check_ip_checksum(packet)
2855 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2859 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2860 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2862 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2863 TCP(sport=1234, dport=1234))
2864 self.pg1.add_stream(p)
2865 self.pg_enable_capture(self.pg_interfaces)
2867 p = self.pg0.get_capture(1)
2870 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2871 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2872 self.assertTrue(packet.haslayer(GRE))
2873 self.check_ip_checksum(packet)
2875 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2878 def test_hairpinning_static_unknown_proto(self):
2879 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2881 host = self.pg0.remote_hosts[0]
2882 server = self.pg0.remote_hosts[1]
2884 host_nat_ip = "10.0.0.10"
2885 server_nat_ip = "10.0.0.11"
2887 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2888 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2889 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2890 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2894 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2895 IP(src=host.ip4, dst=server_nat_ip) /
2897 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2898 TCP(sport=1234, dport=1234))
2899 self.pg0.add_stream(p)
2900 self.pg_enable_capture(self.pg_interfaces)
2902 p = self.pg0.get_capture(1)
2905 self.assertEqual(packet[IP].src, host_nat_ip)
2906 self.assertEqual(packet[IP].dst, server.ip4)
2907 self.assertTrue(packet.haslayer(GRE))
2908 self.check_ip_checksum(packet)
2910 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2914 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2915 IP(src=server.ip4, dst=host_nat_ip) /
2917 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2918 TCP(sport=1234, dport=1234))
2919 self.pg0.add_stream(p)
2920 self.pg_enable_capture(self.pg_interfaces)
2922 p = self.pg0.get_capture(1)
2925 self.assertEqual(packet[IP].src, server_nat_ip)
2926 self.assertEqual(packet[IP].dst, host.ip4)
2927 self.assertTrue(packet.haslayer(GRE))
2928 self.check_ip_checksum(packet)
2930 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2933 def test_unknown_proto(self):
2934 """ NAT44 translate packet with unknown protocol """
2935 self.nat44_add_address(self.nat_addr)
2936 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2937 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2941 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2942 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2943 TCP(sport=self.tcp_port_in, dport=20))
2944 self.pg0.add_stream(p)
2945 self.pg_enable_capture(self.pg_interfaces)
2947 p = self.pg1.get_capture(1)
2949 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2950 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2952 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2953 TCP(sport=1234, dport=1234))
2954 self.pg0.add_stream(p)
2955 self.pg_enable_capture(self.pg_interfaces)
2957 p = self.pg1.get_capture(1)
2960 self.assertEqual(packet[IP].src, self.nat_addr)
2961 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2962 self.assertTrue(packet.haslayer(GRE))
2963 self.check_ip_checksum(packet)
2965 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2969 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2970 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2972 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2973 TCP(sport=1234, dport=1234))
2974 self.pg1.add_stream(p)
2975 self.pg_enable_capture(self.pg_interfaces)
2977 p = self.pg0.get_capture(1)
2980 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2981 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2982 self.assertTrue(packet.haslayer(GRE))
2983 self.check_ip_checksum(packet)
2985 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2988 def test_hairpinning_unknown_proto(self):
2989 """ NAT44 translate packet with unknown protocol - hairpinning """
2990 host = self.pg0.remote_hosts[0]
2991 server = self.pg0.remote_hosts[1]
2994 server_in_port = 5678
2995 server_out_port = 8765
2996 server_nat_ip = "10.0.0.11"
2998 self.nat44_add_address(self.nat_addr)
2999 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3000 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3003 # add static mapping for server
3004 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3007 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3008 IP(src=host.ip4, dst=server_nat_ip) /
3009 TCP(sport=host_in_port, dport=server_out_port))
3010 self.pg0.add_stream(p)
3011 self.pg_enable_capture(self.pg_interfaces)
3013 capture = self.pg0.get_capture(1)
3015 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3016 IP(src=host.ip4, dst=server_nat_ip) /
3018 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3019 TCP(sport=1234, dport=1234))
3020 self.pg0.add_stream(p)
3021 self.pg_enable_capture(self.pg_interfaces)
3023 p = self.pg0.get_capture(1)
3026 self.assertEqual(packet[IP].src, self.nat_addr)
3027 self.assertEqual(packet[IP].dst, server.ip4)
3028 self.assertTrue(packet.haslayer(GRE))
3029 self.check_ip_checksum(packet)
3031 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3035 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3036 IP(src=server.ip4, dst=self.nat_addr) /
3038 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3039 TCP(sport=1234, dport=1234))
3040 self.pg0.add_stream(p)
3041 self.pg_enable_capture(self.pg_interfaces)
3043 p = self.pg0.get_capture(1)
3046 self.assertEqual(packet[IP].src, server_nat_ip)
3047 self.assertEqual(packet[IP].dst, host.ip4)
3048 self.assertTrue(packet.haslayer(GRE))
3049 self.check_ip_checksum(packet)
3051 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3054 def test_output_feature(self):
3055 """ NAT44 interface output feature (in2out postrouting) """
3056 self.nat44_add_address(self.nat_addr)
3057 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3058 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3059 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3063 pkts = self.create_stream_in(self.pg0, self.pg3)
3064 self.pg0.add_stream(pkts)
3065 self.pg_enable_capture(self.pg_interfaces)
3067 capture = self.pg3.get_capture(len(pkts))
3068 self.verify_capture_out(capture)
3071 pkts = self.create_stream_out(self.pg3)
3072 self.pg3.add_stream(pkts)
3073 self.pg_enable_capture(self.pg_interfaces)
3075 capture = self.pg0.get_capture(len(pkts))
3076 self.verify_capture_in(capture, self.pg0)
3078 # from non-NAT interface to NAT inside interface
3079 pkts = self.create_stream_in(self.pg2, self.pg0)
3080 self.pg2.add_stream(pkts)
3081 self.pg_enable_capture(self.pg_interfaces)
3083 capture = self.pg0.get_capture(len(pkts))
3084 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3086 def test_output_feature_vrf_aware(self):
3087 """ NAT44 interface output feature VRF aware (in2out postrouting) """
3088 nat_ip_vrf10 = "10.0.0.10"
3089 nat_ip_vrf20 = "10.0.0.20"
3091 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3092 dst_address_length=32,
3093 next_hop_address=self.pg3.remote_ip4n,
3094 next_hop_sw_if_index=self.pg3.sw_if_index,
3096 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3097 dst_address_length=32,
3098 next_hop_address=self.pg3.remote_ip4n,
3099 next_hop_sw_if_index=self.pg3.sw_if_index,
3102 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3103 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3104 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3105 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3106 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3110 pkts = self.create_stream_in(self.pg4, self.pg3)
3111 self.pg4.add_stream(pkts)
3112 self.pg_enable_capture(self.pg_interfaces)
3114 capture = self.pg3.get_capture(len(pkts))
3115 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3118 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3119 self.pg3.add_stream(pkts)
3120 self.pg_enable_capture(self.pg_interfaces)
3122 capture = self.pg4.get_capture(len(pkts))
3123 self.verify_capture_in(capture, self.pg4)
3126 pkts = self.create_stream_in(self.pg6, self.pg3)
3127 self.pg6.add_stream(pkts)
3128 self.pg_enable_capture(self.pg_interfaces)
3130 capture = self.pg3.get_capture(len(pkts))
3131 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3134 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3135 self.pg3.add_stream(pkts)
3136 self.pg_enable_capture(self.pg_interfaces)
3138 capture = self.pg6.get_capture(len(pkts))
3139 self.verify_capture_in(capture, self.pg6)
3141 def test_output_feature_hairpinning(self):
3142 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server are both remote hosts on pg0. The host reaches the server
# through the server's external address (self.nat_addr), so the packet is
# sent into pg0 and captured again on pg0.
3143 host = self.pg0.remote_hosts[0]
3144 server = self.pg0.remote_hosts[1]
3147 server_in_port = 5678
3148 server_out_port = 8765
# NAT44 is attached as an output feature to both pg0 and pg1.
3150 self.nat44_add_address(self.nat_addr)
3151 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3152 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3155 # add static mapping for server
3156 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3157 server_in_port, server_out_port,
3158 proto=IP_PROTOS.tcp)
3160 # send packet from host to server
3161 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3162 IP(src=host.ip4, dst=self.nat_addr) /
3163 TCP(sport=host_in_port, dport=server_out_port))
3164 self.pg0.add_stream(p)
3165 self.pg_enable_capture(self.pg_interfaces)
3167 capture = self.pg0.get_capture(1)
# Expected on the wire: source rewritten to the NAT address with a new source
# port, destination rewritten to the server's real address and inside port.
# NOTE(review): assumes `ip`/`tcp` are the IP and TCP layers of the captured
# packet and that `p` refers to that packet here — confirm.
3172 self.assertEqual(ip.src, self.nat_addr)
3173 self.assertEqual(ip.dst, server.ip4)
3174 self.assertNotEqual(tcp.sport, host_in_port)
3175 self.assertEqual(tcp.dport, server_in_port)
3176 self.check_tcp_checksum(p)
# Keep the translated source port; the server's reply is addressed to it.
3177 host_out_port = tcp.sport
3179 self.logger.error(ppp("Unexpected or invalid packet:", p))
3182 # send reply from server to host
3183 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3184 IP(src=server.ip4, dst=self.nat_addr) /
3185 TCP(sport=server_in_port, dport=host_out_port))
3186 self.pg0.add_stream(p)
3187 self.pg_enable_capture(self.pg_interfaces)
3189 capture = self.pg0.get_capture(1)
# The reply must look as if it came from the server's external address/port
# and must be delivered to the host's original address and port.
3194 self.assertEqual(ip.src, self.nat_addr)
3195 self.assertEqual(ip.dst, host.ip4)
3196 self.assertEqual(tcp.sport, server_out_port)
3197 self.assertEqual(tcp.dport, host_in_port)
3198 self.check_tcp_checksum(p)
3200 self.logger.error(ppp("Unexpected or invalid packet:", p))
3203 def test_one_armed_nat44(self):
3204 """ One armed NAT44 """
# Both the local and the remote host are reached through the single interface
# pg9; the NAT44 feature is applied to pg9 twice.
# NOTE(review): the second feature call presumably registers pg9 as the
# outside interface as well — confirm against its remaining arguments.
3205 remote_host = self.pg9.remote_hosts[0]
3206 local_host = self.pg9.remote_hosts[1]
3209 self.nat44_add_address(self.nat_addr)
3210 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3211 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3215 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3216 IP(src=local_host.ip4, dst=remote_host.ip4) /
3217 TCP(sport=12345, dport=80))
3218 self.pg9.add_stream(p)
3219 self.pg_enable_capture(self.pg_interfaces)
3221 capture = self.pg9.get_capture(1)
# Local -> remote: the source must become the NAT address with a new source
# port; destination address and port stay unchanged.
# NOTE(review): assumes `ip`/`tcp` are the layers of the captured packet —
# confirm.
3226 self.assertEqual(ip.src, self.nat_addr)
3227 self.assertEqual(ip.dst, remote_host.ip4)
3228 self.assertNotEqual(tcp.sport, 12345)
# Translated port, reused below as the destination port of the return packet.
3229 external_port = tcp.sport
3230 self.assertEqual(tcp.dport, 80)
3231 self.check_tcp_checksum(p)
3232 self.check_ip_checksum(p)
3234 self.logger.error(ppp("Unexpected or invalid packet:", p))
3238 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3239 IP(src=remote_host.ip4, dst=self.nat_addr) /
3240 TCP(sport=80, dport=external_port))
3241 self.pg9.add_stream(p)
3242 self.pg_enable_capture(self.pg_interfaces)
3244 capture = self.pg9.get_capture(1)
# Remote -> local: the destination must be translated back to the local
# host's address and its original port 12345.
3249 self.assertEqual(ip.src, remote_host.ip4)
3250 self.assertEqual(ip.dst, local_host.ip4)
3251 self.assertEqual(tcp.sport, 80)
3252 self.assertEqual(tcp.dport, 12345)
3253 self.check_tcp_checksum(p)
3254 self.check_ip_checksum(p)
3256 self.logger.error(ppp("Unexpected or invalid packet:", p))
3259 def test_one_armed_nat44_static(self):
3260 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
# Client (remote_host) and service (local_host) both sit behind pg9. The
# service is published on self.nat_addr through a static mapping, and the
# same address is also added to the pool with twice_nat=1.
# NOTE(review): `local_port` and `external_port` are expected to be defined
# before the static mapping is added — confirm.
3261 remote_host = self.pg9.remote_hosts[0]
3262 local_host = self.pg9.remote_hosts[1]
3267 self.vapi.nat44_forwarding_enable_disable(1)
3268 self.nat44_add_address(self.nat_addr, twice_nat=1)
3269 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3270 local_port, external_port,
3271 proto=IP_PROTOS.tcp, out2in_only=1,
3273 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3274 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3277 # from client to service
3278 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3279 IP(src=remote_host.ip4, dst=self.nat_addr) /
3280 TCP(sport=12345, dport=external_port))
3281 self.pg9.add_stream(p)
3282 self.pg_enable_capture(self.pg_interfaces)
3284 capture = self.pg9.get_capture(1)
# Both ends are rewritten: destination to the service's real address/port,
# source to the NAT address with a newly chosen source port.
# NOTE(review): assumes `ip`/`tcp` are the layers of the captured packet —
# confirm.
3290 self.assertEqual(ip.dst, local_host.ip4)
3291 self.assertEqual(ip.src, self.nat_addr)
3292 self.assertEqual(tcp.dport, local_port)
3293 self.assertNotEqual(tcp.sport, 12345)
# Port under which the client appears to the service; the service replies to it.
3294 eh_port_in = tcp.sport
3295 self.check_tcp_checksum(p)
3296 self.check_ip_checksum(p)
3298 self.logger.error(ppp("Unexpected or invalid packet:", p))
3301 # from service back to client
3302 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3303 IP(src=local_host.ip4, dst=self.nat_addr) /
3304 TCP(sport=local_port, dport=eh_port_in))
3305 self.pg9.add_stream(p)
3306 self.pg_enable_capture(self.pg_interfaces)
3308 capture = self.pg9.get_capture(1)
# The reply must reach the client from the external address/port and be
# addressed to the client's original address and port.
3313 self.assertEqual(ip.src, self.nat_addr)
3314 self.assertEqual(ip.dst, remote_host.ip4)
3315 self.assertEqual(tcp.sport, external_port)
3316 self.assertEqual(tcp.dport, 12345)
3317 self.check_tcp_checksum(p)
3318 self.check_ip_checksum(p)
3320 self.logger.error(ppp("Unexpected or invalid packet:", p))
3323 def test_del_session(self):
3324 """ Delete NAT44 session """
# Creates sessions by sending a stream from pg0 towards pg1, deletes two of
# them through the API and checks that the session count of the pg0 host
# dropped by exactly two.
3325 self.nat44_add_address(self.nat_addr)
3326 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3327 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3330 pkts = self.create_stream_in(self.pg0, self.pg1)
3331 self.pg0.add_stream(pkts)
3332 self.pg_enable_capture(self.pg_interfaces)
3334 capture = self.pg1.get_capture(len(pkts))
# Baseline: sessions currently held for the pg0 remote host.
3336 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3337 nsessions = len(sessions)
# The first deletion is keyed by a session's inside address/port, the second
# by another session's outside address/port.
3339 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3340 sessions[0].inside_port,
3341 sessions[0].protocol)
3342 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3343 sessions[1].outside_port,
3344 sessions[1].protocol,
3347 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3348 self.assertEqual(nsessions - len(sessions), 2)
def test_set_get_reass(self):
    """ NAT44 set/get virtual fragmentation reassembly """
    # Snapshot the current settings and derive the values to be written.
    before = self.vapi.nat_get_reass()
    new_timeout = before.ip4_timeout + 5
    new_max_reass = before.ip4_max_reass * 2
    new_max_frag = before.ip4_max_frag * 2

    self.vapi.nat_set_reass(timeout=new_timeout,
                            max_reass=new_max_reass,
                            max_frag=new_max_frag)

    # Whatever was written must be read back unchanged.
    after = self.vapi.nat_get_reass()
    self.assertEqual(new_timeout, after.ip4_timeout)
    self.assertEqual(new_max_reass, after.ip4_max_reass)
    self.assertEqual(new_max_frag, after.ip4_max_frag)

    # Turning fragment dropping on must show up in the next read as well.
    self.vapi.nat_set_reass(drop_frag=1)
    self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3367 def test_frag_in_order(self):
3368 """ NAT44 translate fragments arriving in order """
# Sends a fragmented TCP payload from pg0 to pg1 and a fragmented reply back,
# reassembles what was captured and checks ports and payload in both
# directions.
3369 self.nat44_add_address(self.nat_addr)
3370 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3371 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3374 data = "A" * 4 + "B" * 16 + "C" * 3
3375 self.tcp_port_in = random.randint(1025, 65535)
# Number of reassembly entries before the test traffic; compared at the end.
3377 reass = self.vapi.nat_reass_dump()
3378 reass_n_start = len(reass)
3381 pkts = self.create_stream_frag(self.pg0,
3382 self.pg1.remote_ip4,
3386 self.pg0.add_stream(pkts)
3387 self.pg_enable_capture(self.pg_interfaces)
3389 frags = self.pg1.get_capture(len(pkts))
3390 p = self.reass_frags_and_verify(frags,
3392 self.pg1.remote_ip4)
# in2out: destination port 20 is kept, the source port must be translated.
3393 self.assertEqual(p[TCP].dport, 20)
3394 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3395 self.tcp_port_out = p[TCP].sport
3396 self.assertEqual(data, p[Raw].load)
3399 pkts = self.create_stream_frag(self.pg1,
3404 self.pg1.add_stream(pkts)
3405 self.pg_enable_capture(self.pg_interfaces)
3407 frags = self.pg0.get_capture(len(pkts))
3408 p = self.reass_frags_and_verify(frags,
3409 self.pg1.remote_ip4,
3410 self.pg0.remote_ip4)
# out2in: the destination port must be mapped back to the inside port.
3411 self.assertEqual(p[TCP].sport, 20)
3412 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3413 self.assertEqual(data, p[Raw].load)
# Exactly two reassembly entries must have been added by this test
# (presumably one per direction — confirm).
3415 reass = self.vapi.nat_reass_dump()
3416 reass_n_end = len(reass)
3418 self.assertEqual(reass_n_end - reass_n_start, 2)
3420 def test_reass_hairpinning(self):
3421 """ NAT44 fragments hairpinning """
# Fragmented variant of hairpinning: host and server are both remote hosts on
# pg0, the fragments are sent into pg0 and captured on pg0 again.
# All three ports are picked at random from 1025..65535.
3422 host = self.pg0.remote_hosts[0]
3423 server = self.pg0.remote_hosts[1]
3424 host_in_port = random.randint(1025, 65535)
3426 server_in_port = random.randint(1025, 65535)
3427 server_out_port = random.randint(1025, 65535)
3428 data = "A" * 4 + "B" * 16 + "C" * 3
3430 self.nat44_add_address(self.nat_addr)
3431 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3432 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3434 # add static mapping for server
3435 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3436 server_in_port, server_out_port,
3437 proto=IP_PROTOS.tcp)
3439 # send packet from host to server
3440 pkts = self.create_stream_frag(self.pg0,
3445 self.pg0.add_stream(pkts)
3446 self.pg_enable_capture(self.pg_interfaces)
3448 frags = self.pg0.get_capture(len(pkts))
3449 p = self.reass_frags_and_verify(frags,
3452 self.assertNotEqual(p[TCP].sport, host_in_port)
3453 self.assertEqual(p[TCP].dport, server_in_port)
3454 self.assertEqual(data, p[Raw].load)
3456 def test_frag_out_of_order(self):
3457 """ NAT44 translate fragments arriving out of order """
# Same flow as the in-order fragment test (fragmented payload pg0 -> pg1 and
# a fragmented reply back), checking ports and payload after reassembly.
3458 self.nat44_add_address(self.nat_addr)
3459 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3460 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3463 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the value returned by random.randint() is discarded here.
# test_frag_in_order assigns the same expression to self.tcp_port_in, which
# is what the assertions below compare against — this looks like a dropped
# `self.tcp_port_in =`; confirm and fix.
3464 random.randint(1025, 65535)
3467 pkts = self.create_stream_frag(self.pg0,
3468 self.pg1.remote_ip4,
3473 self.pg0.add_stream(pkts)
3474 self.pg_enable_capture(self.pg_interfaces)
3476 frags = self.pg1.get_capture(len(pkts))
3477 p = self.reass_frags_and_verify(frags,
3479 self.pg1.remote_ip4)
# in2out: destination port 20 is kept, the source port must be translated.
3480 self.assertEqual(p[TCP].dport, 20)
3481 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3482 self.tcp_port_out = p[TCP].sport
3483 self.assertEqual(data, p[Raw].load)
3486 pkts = self.create_stream_frag(self.pg1,
3492 self.pg1.add_stream(pkts)
3493 self.pg_enable_capture(self.pg_interfaces)
3495 frags = self.pg0.get_capture(len(pkts))
3496 p = self.reass_frags_and_verify(frags,
3497 self.pg1.remote_ip4,
3498 self.pg0.remote_ip4)
# out2in: the destination port must be mapped back to the inside port.
3499 self.assertEqual(p[TCP].sport, 20)
3500 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3501 self.assertEqual(data, p[Raw].load)
3503 def test_port_restricted(self):
3504 """ Port restricted NAT44 (MAP-E CE) """
# Switches the address/port assignment algorithm to map-e (psid 10,
# psid-offset 6, psid-len 6) through the CLI and checks that the translated
# source port of a pg0 -> pg1 TCP packet carries that psid.
3505 self.nat44_add_address(self.nat_addr)
3506 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3507 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3509 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3510 "psid-offset 6 psid-len 6")
3512 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3513 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3514 TCP(sport=4567, dport=22))
3515 self.pg0.add_stream(p)
3516 self.pg_enable_capture(self.pg_interfaces)
3518 capture = self.pg1.get_capture(1)
# NOTE(review): assumes `ip`/`tcp` are the layers of the captured packet —
# confirm.
3523 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3524 self.assertEqual(ip.src, self.nat_addr)
3525 self.assertEqual(tcp.dport, 22)
3526 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the translated source port (6 bits after shifting right by 6)
# must equal the configured psid value 10.
3527 self.assertEqual((tcp.sport >> 6) & 63, 10)
3528 self.check_tcp_checksum(p)
3529 self.check_ip_checksum(p)
3531 self.logger.error(ppp("Unexpected or invalid packet:", p))
3534 def test_twice_nat(self):
# An external host on pg1 talks to a service on pg0 that is published on
# self.nat_addr. On the way in both ends are rewritten: the destination to
# the pg0 host, the source to `twice_nat_addr`.
# NOTE(review): `port_in`, `port_out` and `eh_port_out` are expected to be
# defined before use — confirm.
3536 twice_nat_addr = '10.0.1.3'
# Two pool addresses: the regular NAT address and one flagged twice_nat=1.
3541 self.nat44_add_address(self.nat_addr)
3542 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3543 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3544 port_in, port_out, proto=IP_PROTOS.tcp,
3546 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3547 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3550 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3551 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3552 TCP(sport=eh_port_out, dport=port_out))
3553 self.pg1.add_stream(p)
3554 self.pg_enable_capture(self.pg_interfaces)
3556 capture = self.pg0.get_capture(1)
# Outside -> inside: destination becomes the pg0 host and its inside port,
# source becomes the twice-NAT address with a new source port.
# NOTE(review): assumes `ip`/`tcp` are the layers of the captured packet —
# confirm.
3561 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3562 self.assertEqual(ip.src, twice_nat_addr)
3563 self.assertEqual(tcp.dport, port_in)
3564 self.assertNotEqual(tcp.sport, eh_port_out)
# Port under which the external host appears to the service.
3565 eh_port_in = tcp.sport
3566 self.check_tcp_checksum(p)
3567 self.check_ip_checksum(p)
3569 self.logger.error(ppp("Unexpected or invalid packet:", p))
3572 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3573 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3574 TCP(sport=port_in, dport=eh_port_in))
3575 self.pg0.add_stream(p)
3576 self.pg_enable_capture(self.pg_interfaces)
3578 capture = self.pg1.get_capture(1)
# Inside -> outside: both translations are undone, so the external host sees
# the reply coming from the NAT address and the external port.
3583 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3584 self.assertEqual(ip.src, self.nat_addr)
3585 self.assertEqual(tcp.dport, eh_port_out)
3586 self.assertEqual(tcp.sport, port_out)
3587 self.check_tcp_checksum(p)
3588 self.check_ip_checksum(p)
3590 self.logger.error(ppp("Unexpected or invalid packet:", p))
3593 def test_twice_nat_lb(self):
3594 """ Twice NAT44 local service load balancing """
# Twice-NAT combined with a load-balanced static mapping: the external
# address maps to two servers on pg0, and the packet may be delivered to
# either of them.
# NOTE(review): `external_port`, `eh_port_out`, `local_port` and `server` are
# expected to be defined before use (the latter two presumably depend on
# which server was selected) — confirm.
3595 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3596 twice_nat_addr = '10.0.1.3'
3601 server1 = self.pg0.remote_hosts[0]
3602 server2 = self.pg0.remote_hosts[1]
3604 locals = [{'addr': server1.ip4n,
3607 {'addr': server2.ip4n,
3611 self.nat44_add_address(self.nat_addr)
3612 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3614 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3618 local_num=len(locals),
3620 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3621 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3624 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3625 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3626 TCP(sport=eh_port_out, dport=external_port))
3627 self.pg1.add_stream(p)
3628 self.pg_enable_capture(self.pg_interfaces)
3630 capture = self.pg0.get_capture(1)
# Outside -> inside: source becomes the twice-NAT address, destination is
# one of the two servers.
# NOTE(review): assumes `ip`/`tcp` are the layers of the captured packet —
# confirm.
3636 self.assertEqual(ip.src, twice_nat_addr)
3637 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3638 if ip.dst == server1.ip4:
3642 self.assertNotEqual(tcp.sport, eh_port_out)
# Port under which the external host appears to the selected server.
3643 eh_port_in = tcp.sport
3644 self.assertEqual(tcp.dport, local_port)
3645 self.check_tcp_checksum(p)
3646 self.check_ip_checksum(p)
3648 self.logger.error(ppp("Unexpected or invalid packet:", p))
3651 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3652 IP(src=server.ip4, dst=twice_nat_addr) /
3653 TCP(sport=local_port, dport=eh_port_in))
3654 self.pg0.add_stream(p)
3655 self.pg_enable_capture(self.pg_interfaces)
3657 capture = self.pg1.get_capture(1)
# Inside -> outside: the reply must carry the external address/port as source
# and the external host's original address/port as destination.
3662 self.assertEqual(ip.src, self.nat_addr)
3663 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3664 self.assertEqual(tcp.sport, external_port)
3665 self.assertEqual(tcp.dport, eh_port_out)
3666 self.check_tcp_checksum(p)
3667 self.check_ip_checksum(p)
3669 self.logger.error(ppp("Unexpected or invalid packet:", p))
def test_twice_nat_interface_addr(self):
    """ Acquire twice NAT44 addresses from interface """
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)

    # The pool starts out empty.
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # Configuring pg7's IPv4 address must add exactly one pool entry that
    # matches the interface address and carries the twice-NAT flag.
    self.pg7.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    entry = pool[0]
    self.assertEqual(entry.ip_address[0:4], self.pg7.local_ip4n)
    self.assertEqual(entry.twice_nat, 1)

    # Removing the interface address must empty the pool again.
    self.pg7.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
3692 def test_ipfix_max_frags(self):
3693 """ IPFIX logging maximum fragments pending reassembly exceeded """
# Sets the maximum number of fragments to 0, sends a single fragment and
# expects nothing to be forwarded to pg1 while IPFIX packets are exported
# towards the collector on pg3.
3694 self.nat44_add_address(self.nat_addr)
3695 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3696 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3698 self.vapi.nat_set_reass(max_frag=0)
3699 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3700 src_address=self.pg3.local_ip4n,
3702 template_interval=10)
3703 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3704 src_port=self.ipfix_src_port)
3706 data = "A" * 4 + "B" * 16 + "C" * 3
3707 self.tcp_port_in = random.randint(1025, 65535)
3708 pkts = self.create_stream_frag(self.pg0,
3709 self.pg1.remote_ip4,
# Only the last fragment of the stream is injected.
3713 self.pg0.add_stream(pkts[-1])
3714 self.pg_enable_capture(self.pg_interfaces)
# No packet may reach pg1; nine packets are expected on pg3 after the flush.
3716 frags = self.pg1.get_capture(0)
3717 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3718 capture = self.pg3.get_capture(9)
3719 ipfix = IPFIXDecoder()
3720 # first load template
# NOTE(review): `p` below is presumably each packet of `capture` in turn —
# confirm.
3722 self.assertTrue(p.haslayer(IPFIX))
3723 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3724 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3725 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3726 self.assertEqual(p[UDP].dport, 4739)
3727 self.assertEqual(p[IPFIX].observationDomainID,
3728 self.ipfix_domain_id)
3729 if p.haslayer(Template):
3730 ipfix.add_template(p.getlayer(Template))
3731 # verify events in data set
# From here on `data` holds the decoded IPFIX records, no longer the payload
# string built above.
3733 if p.haslayer(Data):
3734 data = ipfix.decode_data_set(p.getlayer(Set))
3735 self.verify_ipfix_max_fragments_ip4(data, 0,
3736 self.pg0.remote_ip4n)
# Per-test cleanup of TestNAT44: after the parent tearDown, and only while
# VPP is still alive, log the NAT44 state and switch the address/port
# assignment algorithm back to the default.
3739 super(TestNAT44, self).tearDown()
3740 if not self.vpp_dead:
3741 self.logger.info(self.vapi.cli("show nat44 addresses"))
3742 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3743 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3744 self.logger.info(self.vapi.cli("show nat44 interface address"))
3745 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3746 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# Undoes the map-e setting that test_port_restricted applies through the CLI.
3747 self.vapi.cli("nat addr-port-assignment-alg default")
# Test cases that start VPP with `nat { out2in dpo }` on its command line and
# exercise NAT44 together with a MAP translation domain (464XLAT CE).
# pg0 is the IPv4 side, pg1 the IPv6 side.
3751 class TestNAT44Out2InDPO(MethodHolder):
3752 """ NAT44 Test Cases using out2in DPO """
# Appends the `nat { out2in dpo }` section to the inherited VPP command line.
3755 def setUpConstants(cls):
3756 super(TestNAT44Out2InDPO, cls).setUpConstants()
3757 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# One-time fixture: fixed ports/ids, the NAT address in text and packed form,
# two pg interfaces (pg0 with IPv4, pg1 with IPv6) and an IPv6 default route
# via pg1's remote address.
3760 def setUpClass(cls):
3761 super(TestNAT44Out2InDPO, cls).setUpClass()
3764 cls.tcp_port_in = 6303
3765 cls.tcp_port_out = 6303
3766 cls.udp_port_in = 6304
3767 cls.udp_port_out = 6304
3768 cls.icmp_id_in = 6305
3769 cls.icmp_id_out = 6305
3770 cls.nat_addr = '10.0.0.3'
3771 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3772 cls.dst_ip4 = '192.168.70.1'
3774 cls.create_pg_interfaces(range(2))
3777 cls.pg0.config_ip4()
3778 cls.pg0.resolve_arp()
3781 cls.pg1.config_ip6()
3782 cls.pg1.resolve_ndp()
# All-zero destination with prefix length 0, i.e. the IPv6 default route.
3784 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3785 dst_address_length=0,
3786 next_hop_address=cls.pg1.remote_ip6n,
3787 next_hop_sw_if_index=cls.pg1.sw_if_index)
3790 super(TestNAT44Out2InDPO, cls).tearDownClass()
# Stores the two /96 IPv6 prefixes used by the tests on the instance and
# creates the MAP domain in translation mode.
3793 def configure_xlat(self):
3794 self.dst_ip6_pfx = '1:2:3::'
3795 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3797 self.dst_ip6_pfx_len = 96
3798 self.src_ip6_pfx = '4:5:6::'
3799 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3801 self.src_ip6_pfx_len = 96
3802 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3803 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3804 '\x00\x00\x00\x00', 0, is_translation=1,
# IPv4 traffic from pg0 is NATed to self.nat_addr and then translated to
# IPv6 towards pg1; the reverse stream must arrive on pg0 as IPv4 again.
# The NAT feature and address range are removed at the end of the test.
3807 def test_464xlat_ce(self):
3808 """ Test 464XLAT CE with NAT44 """
3810 self.configure_xlat()
3812 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3813 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# Expected IPv6 addresses on pg1: the IPv4 destination embedded in the
# destination prefix and the NAT address embedded in the source prefix.
3815 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3816 self.dst_ip6_pfx_len)
3817 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3818 self.src_ip6_pfx_len)
3821 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3822 self.pg0.add_stream(pkts)
3823 self.pg_enable_capture(self.pg_interfaces)
3825 capture = self.pg1.get_capture(len(pkts))
3826 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3829 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3831 self.pg1.add_stream(pkts)
3832 self.pg_enable_capture(self.pg_interfaces)
3834 capture = self.pg0.get_capture(len(pkts))
3835 self.verify_capture_in(capture, self.pg0)
3837 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3839 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3840 self.nat_addr_n, is_add=0)
# Same flow without NAT44 configured: the IPv6 source is derived from pg0's
# own remote IPv4 address and the ports are expected to stay the same.
3842 def test_464xlat_ce_no_nat(self):
3843 """ Test 464XLAT CE without NAT44 """
3845 self.configure_xlat()
3847 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3848 self.dst_ip6_pfx_len)
3849 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3850 self.src_ip6_pfx_len)
3852 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3853 self.pg0.add_stream(pkts)
3854 self.pg_enable_capture(self.pg_interfaces)
3856 capture = self.pg1.get_capture(len(pkts))
3857 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3858 nat_ip=out_dst_ip6, same_port=True)
3860 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3861 self.pg1.add_stream(pkts)
3862 self.pg_enable_capture(self.pg_interfaces)
3864 capture = self.pg0.get_capture(len(pkts))
3865 self.verify_capture_in(capture, self.pg0)
3868 class TestDeterministicNAT(MethodHolder):
3869 """ Deterministic NAT Test Cases """
def setUpConstants(cls):
    """ Append the ``nat { deterministic }`` section to the VPP command line """
    super(TestDeterministicNAT, cls).setUpConstants()
    nat_section = ["nat", "{", "deterministic", "}"]
    cls.vpp_cmdline.extend(nat_section)
# One-time fixture of the deterministic NAT tests: fixed inside/external
# ports and ICMP id, the NAT address, three pg interfaces and two remote
# hosts with IPv4 neighbors on pg0.
3877 def setUpClass(cls):
3878 super(TestDeterministicNAT, cls).setUpClass()
3881 cls.tcp_port_in = 6303
3882 cls.tcp_external_port = 6303
3883 cls.udp_port_in = 6304
3884 cls.udp_external_port = 6304
3885 cls.icmp_id_in = 6305
3886 cls.nat_addr = '10.0.0.3'
3888 cls.create_pg_interfaces(range(3))
3889 cls.interfaces = list(cls.pg_interfaces)
# NOTE(review): the per-interface setup done inside this loop is presumably
# the usual bring-up/addressing — confirm.
3891 for i in cls.interfaces:
3896 cls.pg0.generate_remote_hosts(2)
3897 cls.pg0.configure_ipv4_neighbors()
# Class-level teardown of TestDeterministicNAT delegates to the parent class.
3900 super(TestDeterministicNAT, cls).tearDownClass()
# Builds TCP, UDP and ICMP echo-request packets from in_if's remote host to
# out_if's remote host, using the class's inside ports / ICMP id and the
# external ports as destination.
# NOTE(review): presumably the three packets are collected into a list that
# is returned — confirm.
3903 def create_stream_in(self, in_if, out_if, ttl=64):
3905 Create packet stream for inside network
3907 :param in_if: Inside interface
3908 :param out_if: Outside interface
3909 :param ttl: TTL of generated packets
3913 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3914 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3915 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3919 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3920 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3921 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3925 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3926 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3927 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds the TCP, UDP and ICMP echo-reply packets of the return direction,
# sent from out_if's remote host to dst_ip. The destination ports / ICMP id
# are the translated values stored on the instance by verify_capture_out
# (tcp_port_out, udp_port_out, icmp_external_id).
# NOTE(review): dst_ip presumably falls back to self.nat_addr only when it is
# not given — confirm the guarding condition.
3932 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3934 Create packet stream for outside network
3936 :param out_if: Outside interface
3937 :param dst_ip: Destination IP address (Default use global NAT address)
3938 :param ttl: TTL of generated packets
3941 dst_ip = self.nat_addr
3944 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3945 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3946 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3950 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3951 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3952 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3956 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3957 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3958 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks the number of captured packets and that each one carries nat_ip as
# its source address, and records the translated TCP/UDP source port or ICMP
# id on the instance for later use by create_stream_out.
# NOTE(review): the parameter list below mentions `same_port`, which this
# method's signature does not take — stale documentation, to be removed.
3963 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3965 Verify captured packets on outside network
3967 :param capture: Captured packets
3968 :param nat_ip: Translated IP address (Default use global NAT address)
3969 :param same_port: Sorce port number is not translated (Default False)
3970 :param packet_num: Expected number of packets (Default 3)
3973 nat_ip = self.nat_addr
3974 self.assertEqual(packet_num, len(capture))
3975 for packet in capture:
3977 self.assertEqual(packet[IP].src, nat_ip)
3978 if packet.haslayer(TCP):
3979 self.tcp_port_out = packet[TCP].sport
3980 elif packet.haslayer(UDP):
3981 self.udp_port_out = packet[UDP].sport
3983 self.icmp_external_id = packet[ICMP].id
3985 self.logger.error(ppp("Unexpected or invalid packet "
3986 "(outside network):", packet))
# Drives a TCP three-way handshake through the NAT: SYN from the inside
# host, SYN+ACK from the outside host to the NAT address, ACK from the
# inside host. The translated source port of the SYN is stored in
# self.tcp_port_out so later packets from outside can target the session.
3989 def initiate_tcp_session(self, in_if, out_if):
3991 Initiates TCP session
3993 :param in_if: Inside interface
3994 :param out_if: Outside interface
3997 # SYN packet in->out
3998 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3999 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4000 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4003 self.pg_enable_capture(self.pg_interfaces)
4005 capture = out_if.get_capture(1)
# NOTE(review): assumes `p` refers to the captured (translated) SYN at this
# point rather than the packet built above — confirm.
4007 self.tcp_port_out = p[TCP].sport
4009 # SYN + ACK packet out->in
4010 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4011 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4012 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4014 out_if.add_stream(p)
4015 self.pg_enable_capture(self.pg_interfaces)
4017 in_if.get_capture(1)
4019 # ACK packet in->out
4020 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4021 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4022 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4025 self.pg_enable_capture(self.pg_interfaces)
4027 out_if.get_capture(1)
4030 self.logger.error("TCP 3 way handshake failed")
# Expects exactly one decoded record and compares selected fields (keyed by
# numeric element id) against fixed byte strings and pg0's remote address.
# NOTE(review): `record` is presumably the single entry of `data` — confirm.
4033 def verify_ipfix_max_entries_per_user(self, data):
4035 Verify IPFIX maximum entries per user exceeded event
4037 :param data: Decoded IPFIX data records
4039 self.assertEqual(1, len(data))
4042 self.assertEqual(ord(record[230]), 13)
4043 # natQuotaExceededEvent
4044 self.assertEqual('\x03\x00\x00\x00', record[466])
4046 self.assertEqual('\xe8\x03\x00\x00', record[473])
4048 self.assertEqual(self.pg0.remote_ip4n, record[8])
4050 def test_deterministic_mode(self):
4051 """ NAT plugin run deterministic mode """
# Adds one deterministic mapping, checks the forward and reverse lookups and
# the mapping dump, then clears the configuration.
# NOTE(review): `in_plen` and `out_plen` are expected to be defined before
# the mapping is added — confirm.
4052 in_addr = '172.16.255.0'
4053 out_addr = '172.17.255.50'
4054 in_addr_t = '172.16.255.20'
4055 in_addr_n = socket.inet_aton(in_addr)
4056 out_addr_n = socket.inet_aton(out_addr)
4057 in_addr_t_n = socket.inet_aton(in_addr_t)
# The plugin must report that it runs in deterministic mode.
4061 nat_config = self.vapi.nat_show_config()
4062 self.assertEqual(1, nat_config.deterministic)
4064 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Forward lookup of an inside address must give the outside address; the
# reverse lookup with the upper port returned by the forward lookup must give
# the inside address back.
4066 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4067 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4068 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4069 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# The dump must contain exactly the mapping that was added.
4071 deterministic_mappings = self.vapi.nat_det_map_dump()
4072 self.assertEqual(len(deterministic_mappings), 1)
4073 dsm = deterministic_mappings[0]
4074 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4075 self.assertEqual(in_plen, dsm.in_plen)
4076 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4077 self.assertEqual(out_plen, dsm.out_plen)
# After clearing, no mapping may be left.
4079 self.clear_nat_det()
4080 deterministic_mappings = self.vapi.nat_det_map_dump()
4081 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    # Every timeout is raised by the same delta and then read back.
    delta = 10
    timeouts_before = self.vapi.nat_det_get_timeouts()

    self.vapi.nat_det_set_timeouts(timeouts_before.udp + delta,
                                   timeouts_before.tcp_established + delta,
                                   timeouts_before.tcp_transitory + delta,
                                   timeouts_before.icmp + delta)

    timeouts_after = self.vapi.nat_det_get_timeouts()

    # Assert the exact value that was written. A plain "differs from before"
    # check would also pass if a value was stored in the wrong field or
    # changed to something else entirely.
    for name in ('udp', 'icmp', 'tcp_established', 'tcp_transitory'):
        self.assertEqual(getattr(timeouts_before, name) + delta,
                         getattr(timeouts_after, name))
4101 def test_det_in(self):
4102 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
# Maps pg0's remote host to nat_ip, sends TCP/UDP/ICMP in both directions
# and then checks the three sessions that were created.
4104 nat_ip = "10.0.0.10"
4106 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4108 socket.inet_aton(nat_ip),
4110 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4111 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4115 pkts = self.create_stream_in(self.pg0, self.pg1)
4116 self.pg0.add_stream(pkts)
4117 self.pg_enable_capture(self.pg_interfaces)
4119 capture = self.pg1.get_capture(len(pkts))
# Also records the translated ports / ICMP id used by create_stream_out.
4120 self.verify_capture_out(capture, nat_ip)
4123 pkts = self.create_stream_out(self.pg1, nat_ip)
4124 self.pg1.add_stream(pkts)
4125 self.pg_enable_capture(self.pg_interfaces)
4127 capture = self.pg0.get_capture(len(pkts))
4128 self.verify_capture_in(capture, self.pg0)
# One session per protocol is expected for the inside host.
4131 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4132 self.assertEqual(len(sessions), 3)
# NOTE(review): `s` is presumably rebound to one entry of `sessions` before
# each group of checks (TCP, UDP, ICMP) — confirm.
4136 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4137 self.assertEqual(s.in_port, self.tcp_port_in)
4138 self.assertEqual(s.out_port, self.tcp_port_out)
4139 self.assertEqual(s.ext_port, self.tcp_external_port)
4143 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4144 self.assertEqual(s.in_port, self.udp_port_in)
4145 self.assertEqual(s.out_port, self.udp_port_out)
4146 self.assertEqual(s.ext_port, self.udp_external_port)
4150 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4151 self.assertEqual(s.in_port, self.icmp_id_in)
4152 self.assertEqual(s.out_port, self.icmp_external_id)
4154 def test_multiple_users(self):
4155 """ Deterministic NAT multiple users """
# Two inside hosts on pg0 share one outside address (nat_ip). Each opens a
# TCP flow from the same inside port; return traffic addressed to each
# translated port must reach the right host. Finally both sessions are
# closed through the API.
# NOTE(review): `port_in` is expected to be defined before use — confirm.
4157 nat_ip = "10.0.0.10"
4159 external_port = 6303
4161 host0 = self.pg0.remote_hosts[0]
4162 host1 = self.pg0.remote_hosts[1]
4164 self.vapi.nat_det_add_del_map(host0.ip4n,
4166 socket.inet_aton(nat_ip),
4168 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4169 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4173 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4174 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4175 TCP(sport=port_in, dport=external_port))
4176 self.pg0.add_stream(p)
4177 self.pg_enable_capture(self.pg_interfaces)
4179 capture = self.pg1.get_capture(1)
# host0 -> outside: source becomes nat_ip; remember host0's translated port.
# NOTE(review): assumes `ip`/`tcp` are the layers of the captured packet —
# confirm.
4184 self.assertEqual(ip.src, nat_ip)
4185 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4186 self.assertEqual(tcp.dport, external_port)
4187 port_out0 = tcp.sport
4189 self.logger.error(ppp("Unexpected or invalid packet:", p))
4193 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4194 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4195 TCP(sport=port_in, dport=external_port))
4196 self.pg0.add_stream(p)
4197 self.pg_enable_capture(self.pg_interfaces)
4199 capture = self.pg1.get_capture(1)
# host1 -> outside: same outside address; remember host1's translated port.
4204 self.assertEqual(ip.src, nat_ip)
4205 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4206 self.assertEqual(tcp.dport, external_port)
4207 port_out1 = tcp.sport
4209 self.logger.error(ppp("Unexpected or invalid packet:", p))
# A single mapping must now hold two sessions.
4212 dms = self.vapi.nat_det_map_dump()
4213 self.assertEqual(1, len(dms))
4214 self.assertEqual(2, dms[0].ses_num)
4217 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4218 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4219 TCP(sport=external_port, dport=port_out0))
4220 self.pg1.add_stream(p)
4221 self.pg_enable_capture(self.pg_interfaces)
4223 capture = self.pg0.get_capture(1)
# Return traffic to host0's translated port must be delivered to host0.
4228 self.assertEqual(ip.src, self.pg1.remote_ip4)
4229 self.assertEqual(ip.dst, host0.ip4)
4230 self.assertEqual(tcp.dport, port_in)
4231 self.assertEqual(tcp.sport, external_port)
4233 self.logger.error(ppp("Unexpected or invalid packet:", p))
4237 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4238 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4239 TCP(sport=external_port, dport=port_out1))
4240 self.pg1.add_stream(p)
4241 self.pg_enable_capture(self.pg_interfaces)
4243 capture = self.pg0.get_capture(1)
# Return traffic to host1's translated port must be delivered to host1.
4248 self.assertEqual(ip.src, self.pg1.remote_ip4)
4249 self.assertEqual(ip.dst, host1.ip4)
4250 self.assertEqual(tcp.dport, port_in)
4251 self.assertEqual(tcp.sport, external_port)
4253 self.logger.error(ppp("Unexpected or invalid packet", p))
4256 # session close api test
4257 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4259 self.pg1.remote_ip4n,
4261 dms = self.vapi.nat_det_map_dump()
# Closing by the outside tuple must remove one session ...
4262 self.assertEqual(dms[0].ses_num, 1)
4264 self.vapi.nat_det_close_session_in(host0.ip4n,
4266 self.pg1.remote_ip4n,
4268 dms = self.vapi.nat_det_map_dump()
# ... and closing by the inside tuple must remove the other one.
4269 self.assertEqual(dms[0].ses_num, 0)
4271 def test_tcp_session_close_detection_in(self):
4272 """ Deterministic NAT TCP session close from inside network """
# Opens a TCP session with initiate_tcp_session, then closes it with a
# FIN / ACK / FIN / ACK exchange started by the inside host, and checks that
# the mapping reports no remaining session.
4273 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4275 socket.inet_aton(self.nat_addr),
4277 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4278 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4281 self.initiate_tcp_session(self.pg0, self.pg1)
4283 # close the session from inside
4285 # FIN packet in -> out
4286 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4287 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4288 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4290 self.pg0.add_stream(p)
4291 self.pg_enable_capture(self.pg_interfaces)
4293 self.pg1.get_capture(1)
4297 # ACK packet out -> in
4298 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4299 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4300 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4304 # FIN packet out -> in
4305 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4306 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4307 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# NOTE(review): `pkts` presumably holds the two out -> in packets built above
# (ACK and FIN), which matches the capture of 2 packets on pg0 — confirm.
4311 self.pg1.add_stream(pkts)
4312 self.pg_enable_capture(self.pg_interfaces)
4314 self.pg0.get_capture(2)
4316 # ACK packet in -> out
4317 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4318 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4319 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4321 self.pg0.add_stream(p)
4322 self.pg_enable_capture(self.pg_interfaces)
4324 self.pg1.get_capture(1)
4326 # Check if deterministic NAT44 closed the session
4327 dms = self.vapi.nat_det_map_dump()
4328 self.assertEqual(0, dms[0].ses_num)
4330 self.logger.error("TCP session termination failed")
4333 def test_tcp_session_close_detection_out(self):
4334 """ Deterministic NAT TCP session close from outside network """
# Flow: add a deterministic NAT mapping for pg0's remote address onto
# self.nat_addr, enable the NAT44 feature on pg0 and pg1, open a TCP
# session through initiate_tcp_session(), then close it starting from the
# outside host:
#   1. a packet out -> in (labelled FIN) is sent on pg1 and one packet is
#      expected on pg0;
#   2. two packets in -> out are sent on pg0 and two packets are expected
#      on pg1;
#   3. a final packet out -> in (labelled ACK) is sent on pg1 and one
#      packet is expected on pg0.
# Afterwards nat_det_map_dump() must report ses_num == 0 for the first
# mapping.
# NOTE(review): both in -> out packets of step 2 are labelled "ACK" in the
# inline comments; by symmetry with test_tcp_session_close_detection_in
# the second one is presumably the FIN -- confirm against its flags
# argument, which is not visible in this listing.
# NOTE(review): trailing call arguments, the lines that build `pkts`, and
# whatever encloses the logger.error call are likewise not visible here.
4335 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4337 socket.inet_aton(self.nat_addr),
4339 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4340 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4343 self.initiate_tcp_session(self.pg0, self.pg1)
4345 # close the session from outside
4347 # FIN packet out -> in
4348 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4349 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4350 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4352 self.pg1.add_stream(p)
4353 self.pg_enable_capture(self.pg_interfaces)
4355 self.pg0.get_capture(1)
4359 # ACK packet in -> out
4360 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4361 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4362 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4366 # ACK packet in -> out
4367 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4368 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4369 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4373 self.pg0.add_stream(pkts)
4374 self.pg_enable_capture(self.pg_interfaces)
4376 self.pg1.get_capture(2)
4378 # ACK packet out -> in
4379 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4380 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4381 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4383 self.pg1.add_stream(p)
4384 self.pg_enable_capture(self.pg_interfaces)
4386 self.pg0.get_capture(1)
4388 # Check if deterministic NAT44 closed the session
4389 dms = self.vapi.nat_det_map_dump()
4390 self.assertEqual(0, dms[0].ses_num)
4392 self.logger.error("TCP session termination failed")
4395 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4396 def test_session_timeout(self):
4397 """ Deterministic NAT session timeouts """
# Skipped unless running_extended_tests() is true.
# Flow: add the deterministic mapping for pg0's remote address, enable the
# NAT44 feature on pg0 and pg1, open a TCP session, pass 5 for each of the
# four nat_det_set_timeouts() arguments, send the create_stream_in()
# packets from pg0 and capture the same number on pg1, and finally expect
# the first mapping to report ses_num == 0.
# NOTE(review): the wait that lets the sessions expire is not visible in
# this listing; presumably it sits between the capture and the dump.
4398 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4400 socket.inet_aton(self.nat_addr),
4402 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4403 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4406 self.initiate_tcp_session(self.pg0, self.pg1)
4407 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4408 pkts = self.create_stream_in(self.pg0, self.pg1)
4409 self.pg0.add_stream(pkts)
4410 self.pg_enable_capture(self.pg_interfaces)
4412 capture = self.pg1.get_capture(len(pkts))
4415 dms = self.vapi.nat_det_map_dump()
4416 self.assertEqual(0, dms[0].ses_num)
4418 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4419 def test_session_limit_per_user(self):
4420 """ Deterministic NAT maximum sessions per user limit """
# Skipped unless running_extended_tests() is true.
# Flow:
#   * add the deterministic mapping, enable the NAT44 feature on pg0 and
#     pg1, point the IPFIX exporter at pg2's remote address and call
#     nat_ipfix();
#   * build one UDP packet per port in range(1025, 2025) from pg0's remote
#     host (presumably collected in `pkts`) and expect len(pkts) packets
#     on pg1;
#   * send one more UDP packet (3001 -> 3002): nothing may be captured on
#     pg1, and pg0 must receive an ICMP type 3 / code 1 message whose
#     embedded UDP header carries ports 3001 / 3002;
#   * the first mapping must report ses_num == 1000;
#   * after the "ipfix flush" CLI command two packets are expected on pg2;
#     templates are added to the IPFIXDecoder and data sets are decoded and
#     passed to verify_ipfix_max_entries_per_user().
# NOTE(review): the loops over the captures, the lines that fill `pkts`
# and several call arguments are not visible in this listing.
4421 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4423 socket.inet_aton(self.nat_addr),
4425 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4426 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4428 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4429 src_address=self.pg2.local_ip4n,
4431 template_interval=10)
4432 self.vapi.nat_ipfix()
4435 for port in range(1025, 2025):
4436 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4437 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4438 UDP(sport=port, dport=port))
4441 self.pg0.add_stream(pkts)
4442 self.pg_enable_capture(self.pg_interfaces)
4444 capture = self.pg1.get_capture(len(pkts))
4446 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4447 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4448 UDP(sport=3001, dport=3002))
4449 self.pg0.add_stream(p)
4450 self.pg_enable_capture(self.pg_interfaces)
4452 capture = self.pg1.assert_nothing_captured()
4454 # verify ICMP error packet
4455 capture = self.pg0.get_capture(1)
4457 self.assertTrue(p.haslayer(ICMP))
4459 self.assertEqual(icmp.type, 3)
4460 self.assertEqual(icmp.code, 1)
4461 self.assertTrue(icmp.haslayer(IPerror))
4462 inner_ip = icmp[IPerror]
4463 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4464 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4466 dms = self.vapi.nat_det_map_dump()
4468 self.assertEqual(1000, dms[0].ses_num)
4470 # verify IPFIX logging
4471 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4473 capture = self.pg2.get_capture(2)
4474 ipfix = IPFIXDecoder()
4475 # first load template
4477 self.assertTrue(p.haslayer(IPFIX))
4478 if p.haslayer(Template):
4479 ipfix.add_template(p.getlayer(Template))
4480 # verify events in data set
4482 if p.haslayer(Data):
4483 data = ipfix.decode_data_set(p.getlayer(Set))
4484 self.verify_ipfix_max_entries_per_user(data)
4486 def clear_nat_det(self):
# Steps: call nat_ipfix(enable=0), call nat_det_set_timeouts() with no
# arguments (presumably restoring its defaults), call
# nat_det_add_del_map() for every entry returned by nat_det_map_dump(),
# and call nat44_interface_add_del_feature() for every entry returned by
# nat44_interface_dump().
# NOTE(review): the trailing arguments of both loop calls (including the
# flag that would make them deletions) and the docstring delimiters are
# not visible in this listing.
4488 Clear deterministic NAT configuration.
4490 self.vapi.nat_ipfix(enable=0)
4491 self.vapi.nat_det_set_timeouts()
4492 deterministic_mappings = self.vapi.nat_det_map_dump()
4493 for dsm in deterministic_mappings:
4494 self.vapi.nat_det_add_del_map(dsm.in_addr,
4500 interfaces = self.vapi.nat44_interface_dump()
4501 for intf in interfaces:
4502 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# NOTE(review): the `def` line of this method is not visible in this
# listing; judging by the super() call these lines are the body of
# TestDeterministicNAT.tearDown().
# The parent tearDown runs first.  When self.vpp_dead is false, the output
# of "show nat44 interfaces" is logged and the deterministic mappings,
# timeouts and sessions are queried via CLI (presumably logged as well --
# the enclosing calls are not visible).  clear_nat_det() then removes the
# deterministic NAT configuration; indentation is lost here, so whether
# that call is inside the vpp_dead guard cannot be told from this listing.
4507 super(TestDeterministicNAT, self).tearDown()
4508 if not self.vpp_dead:
4509 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4511 self.vapi.cli("show nat44 deterministic mappings"))
4513 self.vapi.cli("show nat44 deterministic timeouts"))
4515 self.vapi.cli("show nat44 deterministic sessions"))
4516 self.clear_nat_det()
4519 class TestNAT64(MethodHolder):
4520 """ NAT64 Test Cases """
4523 def setUpConstants(cls):
# Run the parent setUpConstants(), then append a `nat { ... }` section to
# cls.vpp_cmdline containing "nat64 bib hash buckets 128" and
# "nat64 st hash buckets 256".
# NOTE(review): takes `cls`, so presumably decorated as a classmethod on a
# line that is not visible in this listing.
4524 super(TestNAT64, cls).setUpConstants()
4525 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4526 "nat64 st hash buckets 256", "}"])
4529 def setUpClass(cls):
# One-time fixture for the NAT64 tests:
#   * default inside/outside ports and ICMP ids: TCP 6303, UDP 6304,
#     ICMP 6305;
#   * NAT pool address 10.0.0.3 (also stored packed via inet_pton) and the
#     vrf1 pool address 10.0.10.3;
#   * IPFIX source port 4739 and domain id 1;
#   * five pg interfaces are created; pg0 and pg2 form cls.ip6_interfaces,
#     pg1 forms cls.ip4_interfaces, and pg3 gets both IPv4 and IPv6
#     configuration;
#   * an IPv6 table is added for cls.vrf1_id and pg2 is bound to it;
#   * two remote hosts are generated on pg0.
# NOTE(review): the assignment of cls.vrf1_id, most of the two interface
# loop bodies and the statement enclosing the tearDownClass() call are not
# visible in this listing; tearDownClass() is presumably error cleanup.
4530 super(TestNAT64, cls).setUpClass()
4533 cls.tcp_port_in = 6303
4534 cls.tcp_port_out = 6303
4535 cls.udp_port_in = 6304
4536 cls.udp_port_out = 6304
4537 cls.icmp_id_in = 6305
4538 cls.icmp_id_out = 6305
4539 cls.nat_addr = '10.0.0.3'
4540 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4542 cls.vrf1_nat_addr = '10.0.10.3'
4543 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4545 cls.ipfix_src_port = 4739
4546 cls.ipfix_domain_id = 1
4548 cls.create_pg_interfaces(range(5))
4549 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4550 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4551 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4553 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4555 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4557 cls.pg0.generate_remote_hosts(2)
4559 for i in cls.ip6_interfaces:
4562 i.configure_ipv6_neighbors()
4564 for i in cls.ip4_interfaces:
4570 cls.pg3.config_ip4()
4571 cls.pg3.resolve_arp()
4572 cls.pg3.config_ip6()
4573 cls.pg3.configure_ipv6_neighbors()
4576 super(TestNAT64, cls).tearDownClass()
4579 def test_pool(self):
4580 """ Add/delete address to NAT64 pool """
# Add the single-address range 1.2.3.4 - 1.2.3.4 to the NAT64 pool, check
# that nat64_pool_addr_dump() returns exactly that one address, delete the
# range again (is_add=0) and check that the dump is empty.
4581 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4583 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4585 addresses = self.vapi.nat64_pool_addr_dump()
4586 self.assertEqual(len(addresses), 1)
4587 self.assertEqual(addresses[0].address, nat_addr)
4589 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4591 addresses = self.vapi.nat64_pool_addr_dump()
4592 self.assertEqual(len(addresses), 0)
4594 def test_interface(self):
4595 """ Enable/disable NAT64 feature on the interface """
# Enable NAT64 on pg0 (default direction) and on pg1 (is_inside=0), then
# verify that:
#   * nat64_interface_dump() has two entries, with is_inside == 1 for pg0
#     and is_inside == 0 for pg1, and both interfaces were seen;
#   * "show interface features" mentions nat64-in2out for pg0 and
#     nat64-out2in for pg1.
# Finally remove NAT64 from both interfaces (is_add=0) and expect an empty
# dump.
# NOTE(review): the lines that initialise and set pg0_found / pg1_found
# are not visible in this listing.
4596 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4597 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4599 interfaces = self.vapi.nat64_interface_dump()
4600 self.assertEqual(len(interfaces), 2)
4603 for intf in interfaces:
4604 if intf.sw_if_index == self.pg0.sw_if_index:
4605 self.assertEqual(intf.is_inside, 1)
4607 elif intf.sw_if_index == self.pg1.sw_if_index:
4608 self.assertEqual(intf.is_inside, 0)
4610 self.assertTrue(pg0_found)
4611 self.assertTrue(pg1_found)
4613 features = self.vapi.cli("show interface features pg0")
4614 self.assertNotEqual(features.find('nat64-in2out'), -1)
4615 features = self.vapi.cli("show interface features pg1")
4616 self.assertNotEqual(features.find('nat64-out2in'), -1)
4618 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4619 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4621 interfaces = self.vapi.nat64_interface_dump()
4622 self.assertEqual(len(interfaces), 0)
4624 def test_static_bib(self):
4625 """ Add/delete static BIB entry """
# Call nat64_add_del_static_bib() for inside address
# 2001:db8:85a3::8a2e:370:7334 (outside address 10.1.1.3, protocol TCP),
# then check through nat64_bib_dump(IP_PROTOS.tcp) that the entry's
# addresses and ports match and that exactly one static entry is counted.
# A second nat64_add_del_static_bib() call follows, after which the
# static entry count must be 0.
# NOTE(review): in_port / out_port, the trailing call arguments, and the
# loops that count static entries are not visible in this listing; the
# second call presumably passes is_add=0.
4626 in_addr = socket.inet_pton(socket.AF_INET6,
4627 '2001:db8:85a3::8a2e:370:7334')
4628 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4631 proto = IP_PROTOS.tcp
4633 self.vapi.nat64_add_del_static_bib(in_addr,
4638 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4643 self.assertEqual(bibe.i_addr, in_addr)
4644 self.assertEqual(bibe.o_addr, out_addr)
4645 self.assertEqual(bibe.i_port, in_port)
4646 self.assertEqual(bibe.o_port, out_port)
4647 self.assertEqual(static_bib_num, 1)
4649 self.vapi.nat64_add_del_static_bib(in_addr,
4655 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4660 self.assertEqual(static_bib_num, 0)
4662 def test_set_timeouts(self):
4663 """ Set NAT64 timeouts """
# First read the timeouts and expect udp=300, icmp=60, tcp_trans=240,
# tcp_est=7440 and tcp_incoming_syn=6; then write a different value for
# each field with nat64_set_timeouts() and confirm that
# nat64_get_timeouts() returns exactly what was written.
4664 # verify default values
4665 timeouts = self.vapi.nat64_get_timeouts()
4666 self.assertEqual(timeouts.udp, 300)
4667 self.assertEqual(timeouts.icmp, 60)
4668 self.assertEqual(timeouts.tcp_trans, 240)
4669 self.assertEqual(timeouts.tcp_est, 7440)
4670 self.assertEqual(timeouts.tcp_incoming_syn, 6)
4672 # set and verify custom values
4673 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4674 tcp_est=7450, tcp_incoming_syn=10)
4675 timeouts = self.vapi.nat64_get_timeouts()
4676 self.assertEqual(timeouts.udp, 200)
4677 self.assertEqual(timeouts.icmp, 30)
4678 self.assertEqual(timeouts.tcp_trans, 250)
4679 self.assertEqual(timeouts.tcp_est, 7450)
4680 self.assertEqual(timeouts.tcp_incoming_syn, 10)
4682 def test_dynamic(self):
4683 """ NAT64 dynamic translation test """
# Flow:
#   * reset the inside ports / ICMP id, record the session count, add
#     self.nat_addr to the pool and enable NAT64 on pg0 (default
#     direction) and pg1 (is_inside=0);
#   * in2out: the IPv6 stream pg0 -> pg1 must leave pg1 with
#     nat_ip=self.nat_addr and dst pg1.remote_ip4;
#   * out2in: the IPv4 stream from pg1 to self.nat_addr must reach pg0
#     with source 64:ff9b::<pg1.remote_ip4> and dst pg0.remote_ip6;
#   * both directions are then sent and verified a second time;
#   * the session count must have grown by exactly 3;
#   * the vrf1 pool address is added with vrf_id=self.vrf1_id, NAT64 is
#     enabled on pg2, and the same in2out / out2in exchange is verified
#     between pg2 and pg1 using self.vrf1_nat_addr.
# NOTE(review): the second argument of the first pool call is not visible
# in this listing.
4684 self.tcp_port_in = 6303
4685 self.udp_port_in = 6304
4686 self.icmp_id_in = 6305
4688 ses_num_start = self.nat64_get_ses_num()
4690 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4692 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4693 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4696 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4697 self.pg0.add_stream(pkts)
4698 self.pg_enable_capture(self.pg_interfaces)
4700 capture = self.pg1.get_capture(len(pkts))
4701 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4702 dst_ip=self.pg1.remote_ip4)
4705 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4706 self.pg1.add_stream(pkts)
4707 self.pg_enable_capture(self.pg_interfaces)
4709 capture = self.pg0.get_capture(len(pkts))
4710 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4711 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4714 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4715 self.pg0.add_stream(pkts)
4716 self.pg_enable_capture(self.pg_interfaces)
4718 capture = self.pg1.get_capture(len(pkts))
4719 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4720 dst_ip=self.pg1.remote_ip4)
4723 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4724 self.pg1.add_stream(pkts)
4725 self.pg_enable_capture(self.pg_interfaces)
4727 capture = self.pg0.get_capture(len(pkts))
4728 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4730 ses_num_end = self.nat64_get_ses_num()
4732 self.assertEqual(ses_num_end - ses_num_start, 3)
4734 # tenant with specific VRF
4735 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4736 self.vrf1_nat_addr_n,
4737 vrf_id=self.vrf1_id)
4738 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4740 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4741 self.pg2.add_stream(pkts)
4742 self.pg_enable_capture(self.pg_interfaces)
4744 capture = self.pg1.get_capture(len(pkts))
4745 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4746 dst_ip=self.pg1.remote_ip4)
4748 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4749 self.pg1.add_stream(pkts)
4750 self.pg_enable_capture(self.pg_interfaces)
4752 capture = self.pg2.get_capture(len(pkts))
4753 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4755 def test_static(self):
4756 """ NAT64 static translation test """
# Flow:
#   * set inside and outside ports / ICMP ids to the same values
#     (60303 / 60304 / 60305) and record the session count;
#   * add self.nat_addr to the pool, enable NAT64 on pg0 (default
#     direction) and pg1 (is_inside=0), and make three
#     nat64_add_del_static_bib() calls for pg0's remote IPv6 address;
#   * in2out: the IPv6 stream pg0 -> pg1 is verified with same_port=True;
#   * out2in: the IPv4 stream from pg1 to self.nat_addr must reach pg0 with
#     source 64:ff9b::<pg1.remote_ip4>;
#   * the session count must have grown by exactly 3.
# NOTE(review): the remaining arguments of the static BIB calls are not
# visible in this listing -- presumably one entry per protocol.
4757 self.tcp_port_in = 60303
4758 self.udp_port_in = 60304
4759 self.icmp_id_in = 60305
4760 self.tcp_port_out = 60303
4761 self.udp_port_out = 60304
4762 self.icmp_id_out = 60305
4764 ses_num_start = self.nat64_get_ses_num()
4766 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4768 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4769 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4771 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4776 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4781 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4788 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4789 self.pg0.add_stream(pkts)
4790 self.pg_enable_capture(self.pg_interfaces)
4792 capture = self.pg1.get_capture(len(pkts))
4793 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4794 dst_ip=self.pg1.remote_ip4, same_port=True)
4797 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4798 self.pg1.add_stream(pkts)
4799 self.pg_enable_capture(self.pg_interfaces)
4801 capture = self.pg0.get_capture(len(pkts))
4802 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4803 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4805 ses_num_end = self.nat64_get_ses_num()
4807 self.assertEqual(ses_num_end - ses_num_start, 3)
4809 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4810 def test_session_timeout(self):
4811 """ NAT64 session timeout """
# Skipped unless running_extended_tests() is true.
# Flow: use ICMP id 1234, add self.nat_addr to the pool, enable NAT64 on
# pg0 (default direction) and pg1 (is_inside=0), set the NAT64 ICMP
# timeout to 5, send the in2out IPv6 stream and capture it on pg1, record
# the session count, and later require that the session count has changed.
# NOTE(review): the wait between the two nat64_get_ses_num() calls is not
# visible in this listing.
4812 self.icmp_id_in = 1234
4813 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4815 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4816 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4817 self.vapi.nat64_set_timeouts(icmp=5)
4819 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4820 self.pg0.add_stream(pkts)
4821 self.pg_enable_capture(self.pg_interfaces)
4823 capture = self.pg1.get_capture(len(pkts))
4825 ses_num_before_timeout = self.nat64_get_ses_num()
4829 # ICMP session after timeout
4830 ses_num_after_timeout = self.nat64_get_ses_num()
4831 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4833 def test_icmp_error(self):
4834 """ NAT64 ICMP Error message translation """
# Flow:
#   * create sessions with an in2out IPv6 stream (captured on pg1 as
#     capture_ip4) and an out2in IPv4 stream (captured on pg0 as
#     capture_ip6), verifying both translations;
#   * in2out error: every packet of capture_ip6 is embedded in an
#     ICMPv6DestUnreach(code=1) sent from pg0; each result on pg1 must be
#     an ICMP type 3 / code 13 message from self.nat_addr to
#     pg1.remote_ip4 whose embedded packet has src pg1.remote_ip4,
#     dst self.nat_addr and the outside TCP/UDP port or ICMP id;
#   * out2in error: every packet of capture_ip4 is embedded in an ICMP
#     type 3 / code 13 message sent from pg1; each result on pg0 must be an
#     ICMPv6DestUnreach with code 1 whose embedded packet has
#     src pg0.remote_ip6, dst 64:ff9b::<pg1.remote_ip4> and the inside
#     TCP/UDP port or ICMP id.
# NOTE(review): the try/except scaffolding around the per-packet checks
# and some call arguments are not visible in this listing.
4835 self.tcp_port_in = 6303
4836 self.udp_port_in = 6304
4837 self.icmp_id_in = 6305
4839 ses_num_start = self.nat64_get_ses_num()
4841 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4843 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4844 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4846 # send some packets to create sessions
4847 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4848 self.pg0.add_stream(pkts)
4849 self.pg_enable_capture(self.pg_interfaces)
4851 capture_ip4 = self.pg1.get_capture(len(pkts))
4852 self.verify_capture_out(capture_ip4,
4853 nat_ip=self.nat_addr,
4854 dst_ip=self.pg1.remote_ip4)
4856 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4857 self.pg1.add_stream(pkts)
4858 self.pg_enable_capture(self.pg_interfaces)
4860 capture_ip6 = self.pg0.get_capture(len(pkts))
4861 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4862 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4863 self.pg0.remote_ip6)
4866 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4867 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4868 ICMPv6DestUnreach(code=1) /
4869 packet[IPv6] for packet in capture_ip6]
4870 self.pg0.add_stream(pkts)
4871 self.pg_enable_capture(self.pg_interfaces)
4873 capture = self.pg1.get_capture(len(pkts))
4874 for packet in capture:
4876 self.assertEqual(packet[IP].src, self.nat_addr)
4877 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4878 self.assertEqual(packet[ICMP].type, 3)
4879 self.assertEqual(packet[ICMP].code, 13)
4880 inner = packet[IPerror]
4881 self.assertEqual(inner.src, self.pg1.remote_ip4)
4882 self.assertEqual(inner.dst, self.nat_addr)
4883 self.check_icmp_checksum(packet)
4884 if inner.haslayer(TCPerror):
4885 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4886 elif inner.haslayer(UDPerror):
4887 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4889 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4891 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4895 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4896 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4897 ICMP(type=3, code=13) /
4898 packet[IP] for packet in capture_ip4]
4899 self.pg1.add_stream(pkts)
4900 self.pg_enable_capture(self.pg_interfaces)
4902 capture = self.pg0.get_capture(len(pkts))
4903 for packet in capture:
4905 self.assertEqual(packet[IPv6].src, ip.src)
4906 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4907 icmp = packet[ICMPv6DestUnreach]
4908 self.assertEqual(icmp.code, 1)
4909 inner = icmp[IPerror6]
4910 self.assertEqual(inner.src, self.pg0.remote_ip6)
4911 self.assertEqual(inner.dst, ip.src)
4912 self.check_icmpv6_checksum(packet)
4913 if inner.haslayer(TCPerror):
4914 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4915 elif inner.haslayer(UDPerror):
4916 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4918 self.assertEqual(inner[ICMPv6EchoRequest].id,
4921 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4924 def test_hairpinning(self):
4925 """ NAT64 hairpinning """
# Client and server are the two remote hosts generated on pg0, so all
# traffic enters and leaves through pg0.  nat_addr_ip6 is
# 64:ff9b::<self.nat_addr>.
# Flow:
#   * add self.nat_addr to the pool, enable NAT64 on pg0 (default
#     direction) and pg1 (is_inside=0), and add two static BIB entries for
#     the server (their visible arguments are the server's IPv6 address
#     and the TCP / UDP outside ports 4022 / 4023);
#   * client -> server: TCP and UDP packets addressed to nat_addr_ip6 at
#     the server's outside ports must come back on pg0 with
#     src nat_addr_ip6, dst server.ip6, the server's inside port (22 / 23)
#     as dport and a source port different from the client's; the
#     translated source ports are remembered;
#   * server -> client: replies addressed to the remembered ports must
#     arrive with src nat_addr_ip6, dst client.ip6, the server's outside
#     port as sport and the client's original port as dport;
#   * ICMP error: every captured reply is embedded in an
#     ICMPv6DestUnreach(code=1) sent by the client; each result must be
#     addressed from nat_addr_ip6 to server.ip6 and embed src server.ip6,
#     dst nat_addr_ip6, the server's inside port and the client's
#     translated port.
# NOTE(review): the lines that build `pkts`, the try/except/else
# scaffolding and several call arguments are not visible in this listing.
4927 client = self.pg0.remote_hosts[0]
4928 server = self.pg0.remote_hosts[1]
4929 server_tcp_in_port = 22
4930 server_tcp_out_port = 4022
4931 server_udp_in_port = 23
4932 server_udp_out_port = 4023
4933 client_tcp_in_port = 1234
4934 client_udp_in_port = 1235
4935 client_tcp_out_port = 0
4936 client_udp_out_port = 0
4937 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4938 nat_addr_ip6 = ip.src
4940 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4942 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4943 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4945 self.vapi.nat64_add_del_static_bib(server.ip6n,
4948 server_tcp_out_port,
4950 self.vapi.nat64_add_del_static_bib(server.ip6n,
4953 server_udp_out_port,
4958 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4959 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4960 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4962 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4963 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4964 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4966 self.pg0.add_stream(pkts)
4967 self.pg_enable_capture(self.pg_interfaces)
4969 capture = self.pg0.get_capture(len(pkts))
4970 for packet in capture:
4972 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4973 self.assertEqual(packet[IPv6].dst, server.ip6)
4974 if packet.haslayer(TCP):
4975 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4976 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4977 self.check_tcp_checksum(packet)
4978 client_tcp_out_port = packet[TCP].sport
4980 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4981 self.assertEqual(packet[UDP].dport, server_udp_in_port)
4982 self.check_udp_checksum(packet)
4983 client_udp_out_port = packet[UDP].sport
4985 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4990 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4991 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4992 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4994 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4995 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4996 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4998 self.pg0.add_stream(pkts)
4999 self.pg_enable_capture(self.pg_interfaces)
5001 capture = self.pg0.get_capture(len(pkts))
5002 for packet in capture:
5004 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5005 self.assertEqual(packet[IPv6].dst, client.ip6)
5006 if packet.haslayer(TCP):
5007 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5008 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5009 self.check_tcp_checksum(packet)
5011 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5012 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5013 self.check_udp_checksum(packet)
5015 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5020 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5021 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5022 ICMPv6DestUnreach(code=1) /
5023 packet[IPv6] for packet in capture]
5024 self.pg0.add_stream(pkts)
5025 self.pg_enable_capture(self.pg_interfaces)
5027 capture = self.pg0.get_capture(len(pkts))
5028 for packet in capture:
5030 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5031 self.assertEqual(packet[IPv6].dst, server.ip6)
5032 icmp = packet[ICMPv6DestUnreach]
5033 self.assertEqual(icmp.code, 1)
5034 inner = icmp[IPerror6]
5035 self.assertEqual(inner.src, server.ip6)
5036 self.assertEqual(inner.dst, nat_addr_ip6)
5037 self.check_icmpv6_checksum(packet)
5038 if inner.haslayer(TCPerror):
5039 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5040 self.assertEqual(inner[TCPerror].dport,
5041 client_tcp_out_port)
5043 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5044 self.assertEqual(inner[UDPerror].dport,
5045 client_udp_out_port)
5047 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5050 def test_prefix(self):
5051 """ NAT64 Network-Specific Prefix """
# Flow:
#   * add self.nat_addr to the pool, enable NAT64 on pg0 (default
#     direction) and pg1 (is_inside=0), add the vrf1 pool address with
#     vrf_id=self.vrf1_id and enable NAT64 on pg2;
#   * add the prefix 2001:db8::/32 and check that nat64_prefix_dump()
#     returns exactly that prefix with vrf_id 0;
#   * add the prefix 2001:db8:122:300::/56 with vrf_id=self.vrf1_id and
#     check that the dump now has two entries;
#   * run the in2out / out2in exchange between pg0 and pg1 using the first
#     prefix length, comparing the IPv6 source seen on pg0 with the result
#     of compose_ip6();
#   * repeat the exchange between pg2 and pg1 using the tenant prefix
#     length and self.vrf1_nat_addr.
# NOTE(review): the middle arguments of the create_stream_in_ip6() and
# compose_ip6() calls are not visible in this listing.
5053 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5055 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5056 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5057 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5058 self.vrf1_nat_addr_n,
5059 vrf_id=self.vrf1_id)
5060 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5063 global_pref64 = "2001:db8::"
5064 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5065 global_pref64_len = 32
5066 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5068 prefix = self.vapi.nat64_prefix_dump()
5069 self.assertEqual(len(prefix), 1)
5070 self.assertEqual(prefix[0].prefix, global_pref64_n)
5071 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5072 self.assertEqual(prefix[0].vrf_id, 0)
5074 # Add tenant specific prefix
5075 vrf1_pref64 = "2001:db8:122:300::"
5076 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5077 vrf1_pref64_len = 56
5078 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5080 vrf_id=self.vrf1_id)
5081 prefix = self.vapi.nat64_prefix_dump()
5082 self.assertEqual(len(prefix), 2)
5085 pkts = self.create_stream_in_ip6(self.pg0,
5088 plen=global_pref64_len)
5089 self.pg0.add_stream(pkts)
5090 self.pg_enable_capture(self.pg_interfaces)
5092 capture = self.pg1.get_capture(len(pkts))
5093 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5094 dst_ip=self.pg1.remote_ip4)
5096 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5097 self.pg1.add_stream(pkts)
5098 self.pg_enable_capture(self.pg_interfaces)
5100 capture = self.pg0.get_capture(len(pkts))
5101 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5104 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5106 # Tenant specific prefix
5107 pkts = self.create_stream_in_ip6(self.pg2,
5110 plen=vrf1_pref64_len)
5111 self.pg2.add_stream(pkts)
5112 self.pg_enable_capture(self.pg_interfaces)
5114 capture = self.pg1.get_capture(len(pkts))
5115 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5116 dst_ip=self.pg1.remote_ip4)
5118 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5119 self.pg1.add_stream(pkts)
5120 self.pg_enable_capture(self.pg_interfaces)
5122 capture = self.pg2.get_capture(len(pkts))
5123 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5126 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5128 def test_unknown_proto(self):
5129 """ NAT64 translate packet with unknown protocol """
# Flow:
#   * add self.nat_addr to the pool and enable NAT64 on pg0 (default
#     direction) and pg1 (is_inside=0); remote_ip6 is pg1's remote IPv4
#     address embedded in 64:ff9b::/96 via compose_ip6();
#   * a TCP packet pg0 -> remote_ip6 is sent first and one packet is
#     expected on pg1;
#   * in2out: an IPv6 packet with nh=47 carrying an inner IP/TCP packet is
#     sent from pg0; the packet captured on pg1 must have
#     src self.nat_addr, dst pg1.remote_ip4, a GRE layer and a valid IP
#     checksum;
#   * out2in: an IPv4 packet from pg1 to self.nat_addr carrying an inner
#     IP/TCP packet is sent; the packet captured on pg0 must have
#     src remote_ip6, dst pg0.remote_ip6 and nh == 47.
# NOTE(review): the encapsulation layer lines between the outer and inner
# IP headers (presumably GRE()), the `packet = p[0]` style assignments and
# the try/except scaffolding are not visible in this listing.
5131 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5133 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5134 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5135 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5138 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5139 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5140 TCP(sport=self.tcp_port_in, dport=20))
5141 self.pg0.add_stream(p)
5142 self.pg_enable_capture(self.pg_interfaces)
5144 p = self.pg1.get_capture(1)
5146 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5147 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5149 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5150 TCP(sport=1234, dport=1234))
5151 self.pg0.add_stream(p)
5152 self.pg_enable_capture(self.pg_interfaces)
5154 p = self.pg1.get_capture(1)
5157 self.assertEqual(packet[IP].src, self.nat_addr)
5158 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5159 self.assertTrue(packet.haslayer(GRE))
5160 self.check_ip_checksum(packet)
5162 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5166 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5167 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5169 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5170 TCP(sport=1234, dport=1234))
5171 self.pg1.add_stream(p)
5172 self.pg_enable_capture(self.pg_interfaces)
5174 p = self.pg0.get_capture(1)
5177 self.assertEqual(packet[IPv6].src, remote_ip6)
5178 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5179 self.assertEqual(packet[IPv6].nh, 47)
5181 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5184 def test_hairpinning_unknown_proto(self):
5185 """ NAT64 translate packet with unknown protocol - hairpinning """
# Client and server are the two remote hosts on pg0; the server is given
# NAT address 10.0.0.100 and the client 10.0.0.110, each also expressed as
# an IPv6 address inside 64:ff9b::/96 via compose_ip6().
# Flow:
#   * add a pool range starting at the server NAT address, enable NAT64 on
#     pg0 (default direction) and pg1 (is_inside=0), and make three
#     nat64_add_del_static_bib() calls (two for the server, one for the
#     client);
#   * a TCP packet client -> server_nat_ip6 is sent first and one packet
#     is expected back on pg0;
#   * client -> server: an IPv6 packet with nh=IP_PROTOS.gre must come
#     back on pg0 with src client_nat_ip6, dst server.ip6 and the same
#     next-header value;
#   * server -> client: an IPv6 packet with nh=IP_PROTOS.gre addressed to
#     client_nat_ip6 must come back with src server_nat_ip6,
#     dst client.ip6 and the same next-header value.
# NOTE(review): most static BIB arguments, the encapsulation layer lines
# (presumably GRE()), the `packet = p[0]` style assignments and the
# try/except scaffolding are not visible in this listing.
5187 client = self.pg0.remote_hosts[0]
5188 server = self.pg0.remote_hosts[1]
5189 server_tcp_in_port = 22
5190 server_tcp_out_port = 4022
5191 client_tcp_in_port = 1234
5192 client_tcp_out_port = 1235
5193 server_nat_ip = "10.0.0.100"
5194 client_nat_ip = "10.0.0.110"
5195 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5196 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5197 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5198 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5200 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5202 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5203 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5205 self.vapi.nat64_add_del_static_bib(server.ip6n,
5208 server_tcp_out_port,
5211 self.vapi.nat64_add_del_static_bib(server.ip6n,
5217 self.vapi.nat64_add_del_static_bib(client.ip6n,
5220 client_tcp_out_port,
5224 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5225 IPv6(src=client.ip6, dst=server_nat_ip6) /
5226 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5227 self.pg0.add_stream(p)
5228 self.pg_enable_capture(self.pg_interfaces)
5230 p = self.pg0.get_capture(1)
5232 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5233 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5235 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5236 TCP(sport=1234, dport=1234))
5237 self.pg0.add_stream(p)
5238 self.pg_enable_capture(self.pg_interfaces)
5240 p = self.pg0.get_capture(1)
5243 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5244 self.assertEqual(packet[IPv6].dst, server.ip6)
5245 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5247 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5251 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5252 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5254 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5255 TCP(sport=1234, dport=1234))
5256 self.pg0.add_stream(p)
5257 self.pg_enable_capture(self.pg_interfaces)
5259 p = self.pg0.get_capture(1)
5262 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5263 self.assertEqual(packet[IPv6].dst, client.ip6)
5264 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5266 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5269 def test_one_armed_nat64(self):
5270 """ One armed NAT64 """
# pg3 is used as the only interface: NAT64 is enabled on it twice, once
# with the default direction and once with is_inside=0.
# Flow:
#   * in2out: a TCP packet (12345 -> 80) from pg3's remote IPv6 address to
#     remote_host_ip6 is sent on pg3; the packet captured on pg3 must have
#     src self.nat_addr, dst pg3.remote_ip4, dport 80, a source port
#     different from 12345 (remembered as external_port) and valid TCP and
#     IP checksums;
#   * out2in: a TCP packet (80 -> external_port) from pg3's remote IPv4
#     address to self.nat_addr is sent on pg3; the packet captured on pg3
#     must have src remote_host_ip6, dst pg3.remote_ip6, sport 80,
#     dport 12345 and a valid TCP checksum.
# NOTE(review): the remaining compose_ip6() arguments, the lines that
# extract `p`, `ip` and `tcp` from the captures and the try/except
# scaffolding are not visible in this listing.
5272 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5276 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5278 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5279 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5282 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5283 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5284 TCP(sport=12345, dport=80))
5285 self.pg3.add_stream(p)
5286 self.pg_enable_capture(self.pg_interfaces)
5288 capture = self.pg3.get_capture(1)
5293 self.assertEqual(ip.src, self.nat_addr)
5294 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5295 self.assertNotEqual(tcp.sport, 12345)
5296 external_port = tcp.sport
5297 self.assertEqual(tcp.dport, 80)
5298 self.check_tcp_checksum(p)
5299 self.check_ip_checksum(p)
5301 self.logger.error(ppp("Unexpected or invalid packet:", p))
5305 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5306 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5307 TCP(sport=80, dport=external_port))
5308 self.pg3.add_stream(p)
5309 self.pg_enable_capture(self.pg_interfaces)
5311 capture = self.pg3.get_capture(1)
5316 self.assertEqual(ip.src, remote_host_ip6)
5317 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5318 self.assertEqual(tcp.sport, 80)
5319 self.assertEqual(tcp.dport, 12345)
5320 self.check_tcp_checksum(p)
5322 self.logger.error(ppp("Unexpected or invalid packet:", p))
5325 def test_frag_in_order(self):
5326 """ NAT64 translate fragments arriving in order """
# Flow:
#   * pick a random inside TCP port in [1025, 65535], add self.nat_addr to
#     the pool, enable NAT64 on pg0 (default direction) and pg1
#     (is_inside=0), and record the number of nat_reass_dump() entries;
#   * in2out: the fragments from create_stream_frag_ip6() are sent on pg0;
#     the fragments captured on pg1 are reassembled and the result must
#     have dport 20, a source port different from the inside port
#     (remembered as self.tcp_port_out) and the original payload;
#   * out2in: the fragments from create_stream_frag() are sent on pg1; the
#     fragments captured on pg0 are reassembled with source
#     64:ff9b::<pg1.remote_ip4> and the result must have sport 20, dport
#     equal to the inside port and the original payload;
#   * the number of nat_reass_dump() entries must have grown by exactly 2.
# NOTE(review): the first payload assignment and several call arguments
# are not visible in this listing.
5327 self.tcp_port_in = random.randint(1025, 65535)
5329 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5331 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5332 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5334 reass = self.vapi.nat_reass_dump()
5335 reass_n_start = len(reass)
5339 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5340 self.tcp_port_in, 20, data)
5341 self.pg0.add_stream(pkts)
5342 self.pg_enable_capture(self.pg_interfaces)
5344 frags = self.pg1.get_capture(len(pkts))
5345 p = self.reass_frags_and_verify(frags,
5347 self.pg1.remote_ip4)
5348 self.assertEqual(p[TCP].dport, 20)
5349 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5350 self.tcp_port_out = p[TCP].sport
5351 self.assertEqual(data, p[Raw].load)
5354 data = "A" * 4 + "b" * 16 + "C" * 3
5355 pkts = self.create_stream_frag(self.pg1,
5360 self.pg1.add_stream(pkts)
5361 self.pg_enable_capture(self.pg_interfaces)
5363 frags = self.pg0.get_capture(len(pkts))
5364 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5365 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5366 self.assertEqual(p[TCP].sport, 20)
5367 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5368 self.assertEqual(data, p[Raw].load)
5370 reass = self.vapi.nat_reass_dump()
5371 reass_n_end = len(reass)
5373 self.assertEqual(reass_n_end - reass_n_start, 2)
5375 def test_reass_hairpinning(self):
5376 """ NAT64 fragments hairpinning """
5378 client = self.pg0.remote_hosts[0]
5379 server = self.pg0.remote_hosts[1]
5380 server_in_port = random.randint(1025, 65535)
5381 server_out_port = random.randint(1025, 65535)
5382 client_in_port = random.randint(1025, 65535)
5383 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5384 nat_addr_ip6 = ip.src
5386 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5388 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5389 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5391 # add static BIB entry for server
5392 self.vapi.nat64_add_del_static_bib(server.ip6n,
5398 # send packet from host to server
5399 pkts = self.create_stream_frag_ip6(self.pg0,
5404 self.pg0.add_stream(pkts)
5405 self.pg_enable_capture(self.pg_interfaces)
5407 frags = self.pg0.get_capture(len(pkts))
5408 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5409 self.assertNotEqual(p[TCP].sport, client_in_port)
5410 self.assertEqual(p[TCP].dport, server_in_port)
5411 self.assertEqual(data, p[Raw].load)
5413 def test_frag_out_of_order(self):
5414 """ NAT64 translate fragments arriving out of order """
5415 self.tcp_port_in = random.randint(1025, 65535)
5417 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5419 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5420 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5424 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5425 self.tcp_port_in, 20, data)
5427 self.pg0.add_stream(pkts)
5428 self.pg_enable_capture(self.pg_interfaces)
5430 frags = self.pg1.get_capture(len(pkts))
5431 p = self.reass_frags_and_verify(frags,
5433 self.pg1.remote_ip4)
5434 self.assertEqual(p[TCP].dport, 20)
5435 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5436 self.tcp_port_out = p[TCP].sport
5437 self.assertEqual(data, p[Raw].load)
5440 data = "A" * 4 + "B" * 16 + "C" * 3
5441 pkts = self.create_stream_frag(self.pg1,
5447 self.pg1.add_stream(pkts)
5448 self.pg_enable_capture(self.pg_interfaces)
5450 frags = self.pg0.get_capture(len(pkts))
5451 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5452 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5453 self.assertEqual(p[TCP].sport, 20)
5454 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5455 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface

    Calls nat64_add_interface_addr for pg4 and then checks the NAT64
    pool dump at three points: before config_ip4(), after config_ip4()
    and after unconfig_ip4().
    """
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (inspect the NAT64 pool itself; a NAT44 address dump does not
    # describe the pool this test is about)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the dump taken after unconfig_ip4(), not on a result
    # captured before the address was ever configured)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
5476 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5477 def test_ipfix_max_bibs_sessions(self):
5478 """ IPFIX logging maximum session and BIB entries exceeded """
5481 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5485 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5487 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5488 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5492 for i in range(0, max_bibs):
5493 src = "fd01:aa::%x" % (i)
5494 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5495 IPv6(src=src, dst=remote_host_ip6) /
5496 TCP(sport=12345, dport=80))
5498 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5499 IPv6(src=src, dst=remote_host_ip6) /
5500 TCP(sport=12345, dport=22))
5502 self.pg0.add_stream(pkts)
5503 self.pg_enable_capture(self.pg_interfaces)
5505 self.pg1.get_capture(max_sessions)
5507 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5508 src_address=self.pg3.local_ip4n,
5510 template_interval=10)
5511 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5512 src_port=self.ipfix_src_port)
5514 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5515 IPv6(src=src, dst=remote_host_ip6) /
5516 TCP(sport=12345, dport=25))
5517 self.pg0.add_stream(p)
5518 self.pg_enable_capture(self.pg_interfaces)
5520 self.pg1.get_capture(0)
5521 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5522 capture = self.pg3.get_capture(9)
5523 ipfix = IPFIXDecoder()
5524 # first load template
5526 self.assertTrue(p.haslayer(IPFIX))
5527 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5528 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5529 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5530 self.assertEqual(p[UDP].dport, 4739)
5531 self.assertEqual(p[IPFIX].observationDomainID,
5532 self.ipfix_domain_id)
5533 if p.haslayer(Template):
5534 ipfix.add_template(p.getlayer(Template))
5535 # verify events in data set
5537 if p.haslayer(Data):
5538 data = ipfix.decode_data_set(p.getlayer(Set))
5539 self.verify_ipfix_max_sessions(data, max_sessions)
5541 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5542 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5543 TCP(sport=12345, dport=80))
5544 self.pg0.add_stream(p)
5545 self.pg_enable_capture(self.pg_interfaces)
5547 self.pg1.get_capture(0)
5548 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5549 capture = self.pg3.get_capture(1)
5550 # verify events in data set
5552 self.assertTrue(p.haslayer(IPFIX))
5553 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5554 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5555 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5556 self.assertEqual(p[UDP].dport, 4739)
5557 self.assertEqual(p[IPFIX].observationDomainID,
5558 self.ipfix_domain_id)
5559 if p.haslayer(Data):
5560 data = ipfix.decode_data_set(p.getlayer(Set))
5561 self.verify_ipfix_max_bibs(data, max_bibs)
5563 def test_ipfix_max_frags(self):
5564 """ IPFIX logging maximum fragments pending reassembly exceeded """
5565 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5567 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5568 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5569 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5570 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5571 src_address=self.pg3.local_ip4n,
5573 template_interval=10)
5574 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5575 src_port=self.ipfix_src_port)
5578 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5579 self.tcp_port_in, 20, data)
5580 self.pg0.add_stream(pkts[-1])
5581 self.pg_enable_capture(self.pg_interfaces)
5583 self.pg1.get_capture(0)
5584 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5585 capture = self.pg3.get_capture(9)
5586 ipfix = IPFIXDecoder()
5587 # first load template
5589 self.assertTrue(p.haslayer(IPFIX))
5590 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5591 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5592 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5593 self.assertEqual(p[UDP].dport, 4739)
5594 self.assertEqual(p[IPFIX].observationDomainID,
5595 self.ipfix_domain_id)
5596 if p.haslayer(Template):
5597 ipfix.add_template(p.getlayer(Template))
5598 # verify events in data set
5600 if p.haslayer(Data):
5601 data = ipfix.decode_data_set(p.getlayer(Set))
5602 self.verify_ipfix_max_fragments_ip6(data, 0,
5603 self.pg0.remote_ip6n)
5605 def test_ipfix_bib_ses(self):
5606 """ IPFIX logging NAT64 BIB/session create and delete events """
5607 self.tcp_port_in = random.randint(1025, 65535)
5608 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5612 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5614 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5615 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5616 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5617 src_address=self.pg3.local_ip4n,
5619 template_interval=10)
5620 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5621 src_port=self.ipfix_src_port)
5624 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5625 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5626 TCP(sport=self.tcp_port_in, dport=25))
5627 self.pg0.add_stream(p)
5628 self.pg_enable_capture(self.pg_interfaces)
5630 p = self.pg1.get_capture(1)
5631 self.tcp_port_out = p[0][TCP].sport
5632 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5633 capture = self.pg3.get_capture(10)
5634 ipfix = IPFIXDecoder()
5635 # first load template
5637 self.assertTrue(p.haslayer(IPFIX))
5638 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5639 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5640 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5641 self.assertEqual(p[UDP].dport, 4739)
5642 self.assertEqual(p[IPFIX].observationDomainID,
5643 self.ipfix_domain_id)
5644 if p.haslayer(Template):
5645 ipfix.add_template(p.getlayer(Template))
5646 # verify events in data set
5648 if p.haslayer(Data):
5649 data = ipfix.decode_data_set(p.getlayer(Set))
5650 if ord(data[0][230]) == 10:
5651 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5652 elif ord(data[0][230]) == 6:
5653 self.verify_ipfix_nat64_ses(data,
5655 self.pg0.remote_ip6n,
5656 self.pg1.remote_ip4,
5659 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5662 self.pg_enable_capture(self.pg_interfaces)
5663 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5666 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5667 capture = self.pg3.get_capture(2)
5668 # verify events in data set
5670 self.assertTrue(p.haslayer(IPFIX))
5671 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5672 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5673 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5674 self.assertEqual(p[UDP].dport, 4739)
5675 self.assertEqual(p[IPFIX].observationDomainID,
5676 self.ipfix_domain_id)
5677 if p.haslayer(Data):
5678 data = ipfix.decode_data_set(p.getlayer(Set))
5679 if ord(data[0][230]) == 11:
5680 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5681 elif ord(data[0][230]) == 7:
5682 self.verify_ipfix_nat64_ses(data,
5684 self.pg0.remote_ip6n,
5685 self.pg1.remote_ip4,
5688 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5690 def nat64_get_ses_num(self):
5692 Return number of active NAT64 sessions.
5694 st = self.vapi.nat64_st_dump()
5697 def clear_nat64(self):
5699 Clear NAT64 configuration.
5701 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5702 domain_id=self.ipfix_domain_id)
5703 self.ipfix_src_port = 4739
5704 self.ipfix_domain_id = 1
5706 self.vapi.nat64_set_timeouts()
5708 interfaces = self.vapi.nat64_interface_dump()
5709 for intf in interfaces:
5710 if intf.is_inside > 1:
5711 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5714 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5718 bib = self.vapi.nat64_bib_dump(255)
5721 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5729 adresses = self.vapi.nat64_pool_addr_dump()
5730 for addr in adresses:
5731 self.vapi.nat64_add_del_pool_addr_range(addr.address,
5736 prefixes = self.vapi.nat64_prefix_dump()
5737 for prefix in prefixes:
5738 self.vapi.nat64_add_del_prefix(prefix.prefix,
5740 vrf_id=prefix.vrf_id,
5744 super(TestNAT64, self).tearDown()
5745 if not self.vpp_dead:
5746 self.logger.info(self.vapi.cli("show nat64 pool"))
5747 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5748 self.logger.info(self.vapi.cli("show nat64 prefix"))
5749 self.logger.info(self.vapi.cli("show nat64 bib all"))
5750 self.logger.info(self.vapi.cli("show nat64 session table all"))
5751 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5755 class TestDSlite(MethodHolder):
5756 """ DS-Lite Test Cases """
5759 def setUpClass(cls):
5760 super(TestDSlite, cls).setUpClass()
5763 cls.nat_addr = '10.0.0.3'
5764 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5766 cls.create_pg_interfaces(range(2))
5768 cls.pg0.config_ip4()
5769 cls.pg0.resolve_arp()
5771 cls.pg1.config_ip6()
5772 cls.pg1.generate_remote_hosts(2)
5773 cls.pg1.configure_ipv6_neighbors()
5776 super(TestDSlite, cls).tearDownClass()
5779 def test_dslite(self):
5780 """ Test DS-Lite """
5781 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5783 aftr_ip4 = '192.0.0.1'
5784 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5785 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5786 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5787 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5790 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5791 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5792 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5793 UDP(sport=20000, dport=10000))
5794 self.pg1.add_stream(p)
5795 self.pg_enable_capture(self.pg_interfaces)
5797 capture = self.pg0.get_capture(1)
5798 capture = capture[0]
5799 self.assertFalse(capture.haslayer(IPv6))
5800 self.assertEqual(capture[IP].src, self.nat_addr)
5801 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5802 self.assertNotEqual(capture[UDP].sport, 20000)
5803 self.assertEqual(capture[UDP].dport, 10000)
5804 self.check_ip_checksum(capture)
5805 out_port = capture[UDP].sport
5807 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5808 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5809 UDP(sport=10000, dport=out_port))
5810 self.pg0.add_stream(p)
5811 self.pg_enable_capture(self.pg_interfaces)
5813 capture = self.pg1.get_capture(1)
5814 capture = capture[0]
5815 self.assertEqual(capture[IPv6].src, aftr_ip6)
5816 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5817 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5818 self.assertEqual(capture[IP].dst, '192.168.1.1')
5819 self.assertEqual(capture[UDP].sport, 10000)
5820 self.assertEqual(capture[UDP].dport, 20000)
5821 self.check_ip_checksum(capture)
5824 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5825 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5826 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5827 TCP(sport=20001, dport=10001))
5828 self.pg1.add_stream(p)
5829 self.pg_enable_capture(self.pg_interfaces)
5831 capture = self.pg0.get_capture(1)
5832 capture = capture[0]
5833 self.assertFalse(capture.haslayer(IPv6))
5834 self.assertEqual(capture[IP].src, self.nat_addr)
5835 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5836 self.assertNotEqual(capture[TCP].sport, 20001)
5837 self.assertEqual(capture[TCP].dport, 10001)
5838 self.check_ip_checksum(capture)
5839 self.check_tcp_checksum(capture)
5840 out_port = capture[TCP].sport
5842 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5843 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5844 TCP(sport=10001, dport=out_port))
5845 self.pg0.add_stream(p)
5846 self.pg_enable_capture(self.pg_interfaces)
5848 capture = self.pg1.get_capture(1)
5849 capture = capture[0]
5850 self.assertEqual(capture[IPv6].src, aftr_ip6)
5851 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5852 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5853 self.assertEqual(capture[IP].dst, '192.168.1.1')
5854 self.assertEqual(capture[TCP].sport, 10001)
5855 self.assertEqual(capture[TCP].dport, 20001)
5856 self.check_ip_checksum(capture)
5857 self.check_tcp_checksum(capture)
5860 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5861 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5862 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5863 ICMP(id=4000, type='echo-request'))
5864 self.pg1.add_stream(p)
5865 self.pg_enable_capture(self.pg_interfaces)
5867 capture = self.pg0.get_capture(1)
5868 capture = capture[0]
5869 self.assertFalse(capture.haslayer(IPv6))
5870 self.assertEqual(capture[IP].src, self.nat_addr)
5871 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5872 self.assertNotEqual(capture[ICMP].id, 4000)
5873 self.check_ip_checksum(capture)
5874 self.check_icmp_checksum(capture)
5875 out_id = capture[ICMP].id
5877 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5878 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5879 ICMP(id=out_id, type='echo-reply'))
5880 self.pg0.add_stream(p)
5881 self.pg_enable_capture(self.pg_interfaces)
5883 capture = self.pg1.get_capture(1)
5884 capture = capture[0]
5885 self.assertEqual(capture[IPv6].src, aftr_ip6)
5886 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5887 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5888 self.assertEqual(capture[IP].dst, '192.168.1.1')
5889 self.assertEqual(capture[ICMP].id, 4000)
5890 self.check_ip_checksum(capture)
5891 self.check_icmp_checksum(capture)
5893 # ping DS-Lite AFTR tunnel endpoint address
5894 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5895 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5896 ICMPv6EchoRequest())
5897 self.pg1.add_stream(p)
5898 self.pg_enable_capture(self.pg_interfaces)
5900 capture = self.pg1.get_capture(1)
5901 self.assertEqual(1, len(capture))
5902 capture = capture[0]
5903 self.assertEqual(capture[IPv6].src, aftr_ip6)
5904 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5905 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5908 super(TestDSlite, self).tearDown()
5909 if not self.vpp_dead:
5910 self.logger.info(self.vapi.cli("show dslite pool"))
5912 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5913 self.logger.info(self.vapi.cli("show dslite sessions"))
5916 class TestDSliteCE(MethodHolder):
5917 """ DS-Lite CE Test Cases """
def setUpConstants(cls):
    """ Append the NAT "dslite ce" stanza to the VPP command line """
    # Let the parent class prepare its constants first, then add the
    # extra startup tokens on top of them.
    super(TestDSliteCE, cls).setUpConstants()
    dslite_ce_stanza = ["nat", "{", "dslite ce", "}"]
    cls.vpp_cmdline.extend(dslite_ce_stanza)
5925 def setUpClass(cls):
5926 super(TestDSliteCE, cls).setUpClass()
5929 cls.create_pg_interfaces(range(2))
5931 cls.pg0.config_ip4()
5932 cls.pg0.resolve_arp()
5934 cls.pg1.config_ip6()
5935 cls.pg1.generate_remote_hosts(1)
5936 cls.pg1.configure_ipv6_neighbors()
5939 super(TestDSliteCE, cls).tearDownClass()
5942 def test_dslite_ce(self):
5943 """ Test DS-Lite CE """
5945 b4_ip4 = '192.0.0.2'
5946 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
5947 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
5948 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
5949 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
5951 aftr_ip4 = '192.0.0.1'
5952 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5953 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5954 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5955 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5957 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
5958 dst_address_length=128,
5959 next_hop_address=self.pg1.remote_ip6n,
5960 next_hop_sw_if_index=self.pg1.sw_if_index,
5964 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5965 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
5966 UDP(sport=10000, dport=20000))
5967 self.pg0.add_stream(p)
5968 self.pg_enable_capture(self.pg_interfaces)
5970 capture = self.pg1.get_capture(1)
5971 capture = capture[0]
5972 self.assertEqual(capture[IPv6].src, b4_ip6)
5973 self.assertEqual(capture[IPv6].dst, aftr_ip6)
5974 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5975 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
5976 self.assertEqual(capture[UDP].sport, 10000)
5977 self.assertEqual(capture[UDP].dport, 20000)
5978 self.check_ip_checksum(capture)
5981 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5982 IPv6(dst=b4_ip6, src=aftr_ip6) /
5983 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
5984 UDP(sport=20000, dport=10000))
5985 self.pg1.add_stream(p)
5986 self.pg_enable_capture(self.pg_interfaces)
5988 capture = self.pg0.get_capture(1)
5989 capture = capture[0]
5990 self.assertFalse(capture.haslayer(IPv6))
5991 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
5992 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5993 self.assertEqual(capture[UDP].sport, 20000)
5994 self.assertEqual(capture[UDP].dport, 10000)
5995 self.check_ip_checksum(capture)
5997 # ping DS-Lite B4 tunnel endpoint address
5998 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5999 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6000 ICMPv6EchoRequest())
6001 self.pg1.add_stream(p)
6002 self.pg_enable_capture(self.pg_interfaces)
6004 capture = self.pg1.get_capture(1)
6005 self.assertEqual(1, len(capture))
6006 capture = capture[0]
6007 self.assertEqual(capture[IPv6].src, b4_ip6)
6008 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6009 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6012 super(TestDSliteCE, self).tearDown()
6013 if not self.vpp_dead:
6015 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6017 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# Entry point when the file is executed directly: hand the test cases
# defined in this module to unittest, using VppTestRunner as the runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)