9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
class MethodHolder(VppTestCase):
    """ NAT create capture and verify method holder """

    @classmethod
    def setUpClass(cls):
        """ Run the class-level setup of the parent test case """
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        """ Run the per-test cleanup of the parent test case """
        super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its serialised form, its IPv4 checksum
    field is cleared and the packet is serialised again; the checksum of
    the rebuilt packet is then compared with the one the packet carries.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the stored value first (same idiom as check_icmp_checksum);
    # otherwise the rebuild just echoes it and the assert cannot fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its serialised form, its TCP checksum
    field is cleared and the packet is serialised again; the checksum of
    the rebuilt packet is then compared with the one the packet carries.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the stored value first (same idiom as check_icmp_checksum);
    # otherwise the rebuild just echoes it and the assert cannot fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its serialised form, its UDP checksum
    field is cleared and the packet is serialised again; the checksum of
    the rebuilt packet is then compared with the one the packet carries.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clear the stored value first (same idiom as check_icmp_checksum);
    # otherwise the rebuild just echoes it and the assert cannot fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    Each quoted layer found in the packet (IPerror, TCPerror, UDPerror,
    ICMPerror) is verified on its own: the packet is re-parsed, that
    layer's checksum is cleared, the packet is rebuilt and the result is
    compared with the checksum carried by the packet.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    for layer in (IPerror, TCPerror, UDPerror, ICMPerror):
        if not pkt.haslayer(layer):
            continue
        # A quoted UDP checksum of 0 is skipped rather than verified.
        if layer is UDPerror and pkt[layer].chksum == 0:
            continue
        # Always start from a fresh copy of the packet. The ICMPerror check
        # used to reuse whatever copy an earlier branch left behind, and
        # had no copy at all when no earlier branch had run.
        new = pkt.__class__(str(pkt))
        del new[layer].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer].chksum, pkt[layer].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    When the packet also quotes an offending datagram (IPerror layer), the
    checksums of the quoted layers are verified as well.

    :param pkt: Packet to check ICMP checksum
    """
    new = pkt.__class__(str(pkt))
    # Clearing the field makes the rebuild below produce a fresh checksum.
    del new['ICMP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
    if pkt.haslayer(IPerror):
        self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination unreachable, echo request and echo reply messages.
    For destination unreachable the quoted packet's checksums are verified
    too.

    :param pkt: Packet to check ICMPv6 checksum
    """
    for layer in (ICMPv6DestUnreach, ICMPv6EchoRequest, ICMPv6EchoReply):
        if not pkt.haslayer(layer):
            continue
        new = pkt.__class__(str(pkt))
        # ICMPv6 layers name the field "cksum", not "chksum".
        del new[layer].cksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer].cksum, pkt[layer].cksum)
        if layer is ICMPv6DestUnreach:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default use the remote address of
                   the outside interface)
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo request packet
    """
    # Only fall back to the outside host when the caller gave no address.
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    l4_layers = [
        TCP(sport=self.tcp_port_in, dport=20),
        UDP(sport=self.udp_port_in, dport=20),
        ICMP(id=self.icmp_id_in, type='echo-request'),
    ]
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the positions
    RFC 6052 assigns to the given prefix length. Byte 8 of the address is
    reserved and is never written. For a prefix length other than
    32, 40, 48, 56, 64 or 96 the prefix is returned unchanged.

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses
    """
    # Target byte index for IPv4 octets 0..3, keyed by prefix length.
    embed_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray gives integer items on both Python 2 and Python 3.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for pos, octet in zip(embed_positions.get(plen, ()), ip4_n):
        pref_n[pos] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    Reads the four IPv4 octets from the positions RFC 6052 assigns to the
    given prefix length (the inverse of compose_ip6).

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length
    :returns: extracted IPv4 address
    :raises ValueError: if plen is not one of 32, 40, 48, 56, 64, 96
    """
    # Source byte index for IPv4 octets 0..3, keyed by prefix length;
    # byte 8 is reserved and never carries an IPv4 octet.
    embed_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in embed_positions:
        raise ValueError("unsupported IPv6 prefix length: %r" % (plen,))
    # bytearray gives integer items on both Python 2 and Python 3.
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[pos] for pos in embed_positions[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length (used only when pref is given)
    :returns: List with one TCP, one UDP and one ICMPv6 echo request packet
    """
    if pref is None:
        # No prefix given: append the IPv4 address to 64:ff9b::.
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    l4_layers = [
        TCP(sport=self.tcp_port_in, dport=20),
        UDP(sport=self.udp_port_in, dport=20),
        ICMPv6EchoRequest(id=self.icmp_id_in),
    ]
    return [Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
            IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
            l4
            for l4 in l4_layers]
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: List with one TCP, one UDP and one ICMP echo reply packet
    """
    if dst_ip is None:
        dst_ip = self.nat_addr
    if use_inside_ports:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in
    else:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out

    l4_layers = [
        TCP(dport=tcp_port, sport=20),
        UDP(dport=udp_port, sport=20),
        ICMP(id=icmp_id, type='echo-reply'),
    ]
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
            l4
            for l4 in l4_layers]
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create IPv6 packet stream for outside network

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: List with one TCP, one UDP and one ICMPv6 echo reply packet
    """
    l4_layers = [
        TCP(dport=self.tcp_port_out, sport=20),
        UDP(dport=self.udp_port_out, sport=20),
        ICMPv6EchoReply(id=self.icmp_id_out),
    ]
    return [Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
            IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
            l4
            for l4 in l4_layers]
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None, is_ip6=False):
    """
    Verify captured packets on outside network

    As a side effect the translated TCP/UDP source ports and ICMP id seen
    in the capture are stored in tcp_port_out, udp_port_out and
    icmp_id_out.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    :param is_ip6: If L3 protocol is IPv6 (Default False)
    """
    if is_ip6:
        IP46 = IPv6
        ICMP46 = ICMPv6EchoRequest
    else:
        IP46 = IP
        ICMP46 = ICMP
    if nat_ip is None:
        nat_ip = self.nat_addr
    # An untranslated port must equal the inside one, a translated port
    # must differ from it; pick the matching assertion once.
    if same_port:
        check_port = self.assertEqual
    else:
        check_port = self.assertNotEqual
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            if not is_ip6:
                self.check_ip_checksum(packet)
            self.assertEqual(packet[IP46].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP46].dst, dst_ip)
            if packet.haslayer(TCP):
                check_port(packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                check_port(packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                check_port(packet[ICMP46].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP46].id
                if is_ip6:
                    self.check_icmpv6_checksum(packet)
                else:
                    self.check_icmp_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           packet_num=3, dst_ip=None):
    """
    Verify captured IPv6 packets on outside network

    Thin wrapper around verify_capture_out with is_ip6 enabled.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
                                   dst_ip, True)
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.check_icmp_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.check_udp_checksum(packet)
            else:
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
                self.check_icmpv6_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packets that don't have to be translated

    Addresses must be the remote addresses of the two interfaces, and the
    source ports / ICMP id must still be the inside ones.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            # The quoted packet was sent towards the NAT, so its
            # destination port / id is expected to be the outside one.
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            inner_ip = icmp[IPerror]
            # The quoted packet originated from the inside host, so its
            # source port / id is expected to be the inside one.
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate.
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data):
    """
    Create fragmented packet stream

    The TCP segment is split into three IPv4 fragments at offsets 0, 3 and
    5 (in 8-byte units): header plus 4 payload bytes, the next 16 payload
    bytes, and the remainder.

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :returns: Fragments
    """
    frag_id = random.randint(0, 65535)
    # Build the whole segment once to obtain the TCP checksum over the full
    # payload; the first fragment has to carry that value, not a checksum
    # of its own 4 payload bytes.
    p = (IP(src=src_if.remote_ip4, dst=dst) /
         TCP(sport=sport, dport=dport) /
         Raw(data))
    p = p.__class__(str(p))
    chksum = p['TCP'].chksum

    first = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
             IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0,
                id=frag_id) /
             TCP(sport=sport, dport=dport, chksum=chksum) /
             Raw(data[0:4]))
    # 20-byte TCP header + 4 payload bytes = 24 bytes -> offset 3.
    middle = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3,
                 id=frag_id, proto=IP_PROTOS.tcp) /
              Raw(data[4:20]))
    # 16 more payload bytes -> offset 5; no MF flag on the last fragment.
    last = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
            IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
               id=frag_id) /
            Raw(data[20:]))
    return [first, middle, last]
def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
                           pref=None, plen=0, frag_size=128):
    """
    Create fragmented IPv6 packet stream

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length (used only when pref is given)
    :param frag_size: size of fragments
    :returns: Fragments
    """
    if pref is None:
        # No prefix given: append the IPv4 address to 64:ff9b::.
        dst_ip6 = ''.join(['64:ff9b::', dst])
    else:
        dst_ip6 = self.compose_ip6(dst, pref, plen)

    p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
         IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
         IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
         TCP(sport=sport, dport=dport) /
         Raw(data))

    return fragment6(p, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    payload = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.check_ip_checksum(p)
        # Fragment offsets are expressed in 8-byte units.
        payload.seek(p[IP].frag * 8)
        payload.write(p[IP].payload)
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
            proto=frags[0][IP].proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(payload.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(payload.getvalue()))
    # For any other protocol `p` is still the last captured fragment.
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet
    """
    payload = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IPv6].src, src)
        self.assertEqual(p[IPv6].dst, dst)
        # Fragment offsets are expressed in 8-byte units.
        payload.seek(p[IPv6ExtHdrFragment].offset * 8)
        payload.write(p[IPv6ExtHdrFragment].payload)
    ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
              nh=frags[0][IPv6ExtHdrFragment].nh)
    if ip.nh == IP_PROTOS.tcp:
        p = (ip / TCP(payload.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.nh == IP_PROTOS.udp:
        p = (ip / UDP(payload.getvalue()))
    # For any other next header `p` is still the last captured fragment.
    return p
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Expects six records: one create (natEvent 4) and one delete
    (natEvent 5) for each of ICMP, TCP and UDP.

    :param data: Decoded IPFIX data records
    """
    nat44_ses_create_num = 0
    nat44_ses_delete_num = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        self.assertIn(ord(record[230]), [4, 5])
        if ord(record[230]) == 4:
            nat44_ses_create_num += 1
        else:
            nat44_ses_delete_num += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        proto = ord(record[4])
        if proto == IP_PROTOS.icmp:
            port_in, port_out = self.icmp_id_in, self.icmp_id_out
        elif proto == IP_PROTOS.tcp:
            port_in, port_out = self.tcp_port_in, self.tcp_port_out
        elif proto == IP_PROTOS.udp:
            port_in, port_out = self.udp_port_in, self.udp_port_out
        else:
            self.fail("Invalid protocol")
        self.assertEqual(struct.pack("!H", port_in), record[7])
        self.assertEqual(struct.pack("!H", port_out), record[227])
    self.assertEqual(3, nat44_ses_create_num)
    self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): these two fields are compared in native byte order
    # ("I"), unlike the "!"-prefixed fields elsewhere - confirm exporter.
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): these two fields are compared in native byte order
    # ("I"), unlike the "!"-prefixed fields elsewhere - confirm exporter.
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): these two fields are compared in native byte order
    # ("I"), unlike the "!"-prefixed fields elsewhere - confirm exporter.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): these two fields are compared in native byte order
    # ("I"), unlike the "!"-prefixed fields elsewhere - confirm exporter.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent: 10 for BIB create, 11 for BIB delete
    if is_create:
        self.assertEqual(ord(record[230]), 10)
    else:
        self.assertEqual(ord(record[230]), 11)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
                           dst_port):
    """
    Verify IPFIX NAT64 session create and delete events

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    :param dst_addr: IPv4 destination address
    :param dst_port: destination TCP port
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent: 6 for session create, 7 for session delete
    if is_create:
        self.assertEqual(ord(record[230]), 6)
    else:
        self.assertEqual(ord(record[230]), 7)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # destinationIPv6Address
    # NOTE(review): assumes the session used the 64:ff9b::/96 prefix.
    self.assertEqual(socket.inet_pton(socket.AF_INET6,
                                      self.compose_ip6(dst_addr,
                                                       '64:ff9b::',
                                                       96)),
                     record[28])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # postNATDestinationIPv4Address
    self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
                     record[226])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
    # destinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[11])
    # postNAPTDestinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[228])
class TestNAT44(MethodHolder):
    """ NAT44 Test Cases """

    @classmethod
    def setUpClass(cls):
        """
        Create and configure the packet-generator interfaces used by the
        NAT44 tests and set the default ports, ICMP ids, NAT address and
        IPFIX parameters.

        pg0-pg3 are plain interfaces, pg4-pg6 carry overlapping addresses
        in IP tables 10 and 20, and pg9 gets an extra 10.0.0.1/24 address.
        If anything fails the class is torn down and the error re-raised.
        """
        super(TestNAT44, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_port_out = 6303
            cls.udp_port_in = 6304
            cls.udp_port_out = 6304
            cls.icmp_id_in = 6305
            cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
            cls.ipfix_src_port = 4739
            cls.ipfix_domain_id = 1

            cls.create_pg_interfaces(range(10))
            cls.interfaces = list(cls.pg_interfaces[0:4])

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            cls.pg0.generate_remote_hosts(3)
            cls.pg0.configure_ipv4_neighbors()

            cls.overlapping_interfaces = list(cls.pg_interfaces[4:7])
            cls.vapi.ip_table_add_del(10, is_add=1)
            cls.vapi.ip_table_add_del(20, is_add=1)

            # Derive each binary address from the interface's own textual
            # address. This used to read `i.local_ip4`, i.e. the address of
            # the last interface of the loop above.
            cls.pg4._local_ip4 = "172.16.255.1"
            cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET,
                                                   cls.pg4.local_ip4)
            cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
            cls.pg4.set_table_ip4(10)
            cls.pg5._local_ip4 = "172.17.255.3"
            cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
                                                   cls.pg5.local_ip4)
            cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
            cls.pg5.set_table_ip4(10)
            cls.pg6._local_ip4 = "172.16.255.1"
            cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
                                                   cls.pg6.local_ip4)
            cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
            cls.pg6.set_table_ip4(20)
            for i in cls.overlapping_interfaces:
                i.config_ip4()
                i.admin_up()
                i.resolve_arp()

            cls.pg7.admin_up()
            cls.pg8.admin_up()

            cls.pg9.generate_remote_hosts(2)
            cls.pg9.config_ip4()
            ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
            cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
                                                  ip_addr_n,
                                                  24)
            cls.pg9.admin_up()
            cls.pg9.resolve_arp()
            cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
            cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
            cls.pg9.resolve_arp()

        except Exception:
            super(TestNAT44, cls).tearDownClass()
            raise
# Tear down every piece of NAT44 state a test may have left behind:
# host routes and neighbours on pg7/pg8, forwarding, interface addresses,
# IPFIX logging, interface features, static / load-balancing / identity
# mappings, the address pool and the reassembly settings.
# NOTE(review): several call continuation lines (and some loop headers)
# are not present in this view; comments below describe visible code only.
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
# /32 routes towards the pg7 and pg8 remote hosts; the trailing argument
# of both calls is elided here - presumably the delete flag, TODO confirm.
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
# Dump the neighbours known on pg7/pg8 and feed them to
# ip_neighbor_add_del (remaining arguments elided in this view).
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
# pg7 only carries an IPv4 address in some tests; remove it if present.
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
# Interfaces whose address was registered with NAT44.
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
# Disable IPFIX logging using the values the test configured, then put
# the attributes back to the defaults 4739 / 1.
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
# NAT44 interface features; an entry with is_inside > 1 gets one extra
# call before the common one (arguments elided - presumably the entry is
# both inside and outside, TODO confirm).
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# Output-feature variant of the interface feature.
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
# Static mappings: each dumped entry is replayed field by field.
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
# Load-balancing static mappings.
1035 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1036 for lb_sm in lb_static_mappings:
1037 self.vapi.nat44_add_del_lb_static_mapping(
1038 lb_sm.external_addr,
1039 lb_sm.external_port,
1041 vrf_id=lb_sm.vrf_id,
1042 twice_nat=lb_sm.twice_nat,
1043 out2in_only=lb_sm.out2in_only,
# Identity mappings.
1048 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1049 for id_m in identity_mappings:
1050 self.vapi.nat44_add_del_identity_mapping(
1051 addr_only=id_m.addr_only,
1054 sw_if_index=id_m.sw_if_index,
1056 protocol=id_m.protocol,
# NAT pool addresses.
1059 adresses = self.vapi.nat44_address_dump()
1060 for addr in adresses:
1061 self.vapi.nat44_add_del_address_range(addr.ip_address,
1063 twice_nat=addr.twice_nat,
# Called without explicit settings, once for IPv4 and once with is_ip6=1;
# presumably restores the default reassembly parameters - TODO confirm.
1066 self.vapi.nat_set_reass()
1067 self.vapi.nat_set_reass(is_ip6=1)
# Convenience wrapper around vapi.nat44_add_del_static_mapping that takes
# textual IPv4 addresses and converts them to packed form before the call.
# NOTE(review): the docstring delimiters, the statements around the
# `if local_port and external_port` test and most of the API call's
# argument list are not present in this view.
1069 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1070 local_port=0, external_port=0, vrf_id=0,
1071 is_add=1, external_sw_if_index=0xFFFFFFFF,
1072 proto=0, twice_nat=0, out2in_only=0):
1074 Add/delete NAT44 static mapping
1076 :param local_ip: Local IP address
1077 :param external_ip: External IP address
1078 :param local_port: Local port number (Optional)
1079 :param external_port: External port number (Optional)
1080 :param vrf_id: VRF ID (Default 0)
1081 :param is_add: 1 if add, 0 if delete (Default add)
1082 :param external_sw_if_index: External interface instead of IP address
1083 :param proto: IP protocol (Mandatory if port specified)
1084 :param twice_nat: 1 if translate external host address and port
1085 :param out2in_only: if 1 rule is matching only out2in direction
# Both ports non-zero; presumably this selects a port mapping over an
# address-only mapping (branch body elided) - TODO confirm.
1088 if local_port and external_port:
# Textual addresses -> 4-byte packed form.
1090 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1091 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1092 self.vapi.nat44_add_del_static_mapping(
1095 external_sw_if_index,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    The address is submitted as a one-element range (first == last).

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed through to the API (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    nat_addr = socket.inet_pton(socket.AF_INET, ip)
    self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
                                          vrf_id=vrf_id,
                                          twice_nat=twice_nat)
def test_dynamic(self):
    """ NAT44 dynamic translation test """

    # pg0 is the inside interface, pg1 the outside one.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_dynamic_icmp_errors_in2out_ttl_1(self):
    """ NAT44 handling of client packets with TTL=1 """

    # pg0 is the inside interface, pg1 the outside one.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_1(self):
    """ NAT44 handling of server packets with TTL=1 """

    # pg0 is the inside interface, pg1 the outside one.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    # verify_capture_out also records the translated ports / id that
    # create_stream_out uses for the return traffic.
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture,
                                             src_ip=self.pg1.local_ip4)
def test_dynamic_icmp_errors_in2out_ttl_2(self):
    """ NAT44 handling of error responses to client packets with TTL=2 """

    # pg0 is the inside interface, pg1 the outside one.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - generate traffic
    pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - simulate ICMP type 11 response
    capture = self.pg1.get_capture(len(pkts))
    # Each error quotes the translated packet it answers.
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - verify ICMP type 11 packets
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1214 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1215 """ NAT44 handling of error responses to server packets with TTL=2 """
# Three phases: (1) pg0 -> pg1 stream sent and verified, (2) pg1 sends a
# ttl=2 stream that is captured on pg0, (3) pg0 answers each captured
# packet with an ICMP type 11 message and the result is verified on pg1.
# NOTE(review): embedded line numbers skip (1220-1221, 1226-1227,
# 1234-1235, 1243-1244); those statements are absent from this listing.
1217 self.nat44_add_address(self.nat_addr)
1218 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
1219 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1222 # Client side - create sessions
1223 pkts = self.create_stream_in(self.pg0, self.pg1)
1224 self.pg0.add_stream(pkts)
1225 self.pg_enable_capture(self.pg_interfaces)
1228 # Server side - generate traffic
1229 capture = self.pg1.get_capture(len(pkts))
1230 self.verify_capture_out(capture)
1231 pkts = self.create_stream_out(self.pg1, ttl=2)
1232 self.pg1.add_stream(pkts)
1233 self.pg_enable_capture(self.pg_interfaces)
1236 # Client side - simulate ICMP type 11 response
1237 capture = self.pg0.get_capture(len(pkts))
# One error per captured packet: outer header pg0.remote_ip4 ->
# pg1.remote_ip4, payload is the captured packet from its IP layer onwards.
1238 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1239 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1240 ICMP(type=11) / packet[IP] for packet in capture]
1241 self.pg0.add_stream(pkts)
1242 self.pg_enable_capture(self.pg_interfaces)
1245 # Server side - verify ICMP type 11 packets
1246 capture = self.pg1.get_capture(len(pkts))
1247 self.verify_capture_out_with_icmp_errors(capture)
1249 def test_ping_out_interface_from_outside(self):
1250 """ Ping NAT44 out interface from outside network """
# Builds one ICMP echo-request addressed to pg1's own local IPv4 address,
# sends it in on pg1 and expects exactly one echo reply back on pg1.
# NOTE(review): embedded line numbers skip (1255-1256, 1260, 1263,
# 1266-1267, 1272, 1275-1276). The assignments of `pkts` and `packet` and
# the block structure around the assertions are on those absent lines.
1252 self.nat44_add_address(self.nat_addr)
1253 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
1254 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1257 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1258 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1259 ICMP(id=self.icmp_id_out, type='echo-request'))
1261 self.pg1.add_stream(pkts)
1262 self.pg_enable_capture(self.pg_interfaces)
1264 capture = self.pg1.get_capture(len(pkts))
1265 self.assertEqual(1, len(capture))
# The reply must have source and destination swapped relative to the request.
1268 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1269 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
# NOTE(review): the request above carries id=self.icmp_id_out while the
# reply is compared with self.icmp_id_in - confirm this is intended.
1270 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1271 self.assertEqual(packet[ICMP].type, 0) # echo reply
# Logs the offending packet (pretty-printed by ppp) for diagnosis.
1273 self.logger.error(ppp("Unexpected or invalid packet "
1274 "(outside network):", packet))
1277 def test_ping_internal_host_from_outside(self):
1278 """ Ping internal host from outside network """
# Maps pg0.remote_ip4 to nat_addr with a static mapping, sends an echo
# request to nat_addr from pg1 and checks it on pg0, then sends the echo
# reply from pg0 and checks it on pg1.
# NOTE(review): embedded line numbers skip (1283-1285, 1291, 1295-1296,
# 1302); those statements/comments are absent from this listing.
1280 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1281 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
1282 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Echo request entering on pg1, addressed to the mapped address.
1286 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1287 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1288 ICMP(id=self.icmp_id_out, type='echo-request'))
1289 self.pg1.add_stream(pkt)
1290 self.pg_enable_capture(self.pg_interfaces)
1292 capture = self.pg0.get_capture(1)
1293 self.verify_capture_in(capture, self.pg0, packet_num=1)
1294 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Echo reply entering on pg0, addressed to the pg1 remote host.
1297 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1298 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1299 ICMP(id=self.icmp_id_in, type='echo-reply'))
1300 self.pg0.add_stream(pkt)
1301 self.pg_enable_capture(self.pg_interfaces)
1303 capture = self.pg1.get_capture(1)
1304 self.verify_capture_out(capture, same_port=True, packet_num=1)
1305 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1307 def test_forwarding(self):
1308 """ NAT44 forwarding test """
# Enables NAT44 forwarding, installs a static mapping between
# pg0.remote_ip4n and nat_addr_n, and exchanges traffic twice: once as is,
# once after swapping pg0.remote_hosts[0] with remote_hosts[1]. The last
# visible statements disable forwarding and call the static-mapping API
# again.
# NOTE(review): embedded line numbers skip repeatedly (e.g. 1312,
# 1319-1320, 1326, 1357-1358, 1360-1361, 1365); block structure and some
# call arguments are on those absent lines.
1310 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
1311 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1313 self.vapi.nat44_forwarding_enable_disable(1)
# Keep both addresses in locals: the same pair is passed again to
# nat44_add_del_static_mapping at the end of the test.
1315 real_ip = self.pg0.remote_ip4n
1316 alias_ip = self.nat_addr_n
1317 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1318 external_ip=alias_ip)
1321 # in2out - static mapping match
1323 pkts = self.create_stream_out(self.pg1)
1324 self.pg1.add_stream(pkts)
1325 self.pg_enable_capture(self.pg_interfaces)
1327 capture = self.pg0.get_capture(len(pkts))
1328 self.verify_capture_in(capture, self.pg0)
1330 pkts = self.create_stream_in(self.pg0, self.pg1)
1331 self.pg0.add_stream(pkts)
1332 self.pg_enable_capture(self.pg_interfaces)
1334 capture = self.pg1.get_capture(len(pkts))
1335 self.verify_capture_out(capture, same_port=True)
1337 # in2out - no static mapping match
# Temporarily replace remote_hosts[0] with remote_hosts[1]; the original
# entry is saved in host0 and put back further down.
# presumably this changes the host the pg0 helpers use - confirm.
1339 host0 = self.pg0.remote_hosts[0]
1340 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1342 pkts = self.create_stream_out(self.pg1,
1343 dst_ip=self.pg0.remote_ip4,
1344 use_inside_ports=True)
1345 self.pg1.add_stream(pkts)
1346 self.pg_enable_capture(self.pg_interfaces)
1348 capture = self.pg0.get_capture(len(pkts))
1349 self.verify_capture_in(capture, self.pg0)
1351 pkts = self.create_stream_in(self.pg0, self.pg1)
1352 self.pg0.add_stream(pkts)
1353 self.pg_enable_capture(self.pg_interfaces)
1355 capture = self.pg1.get_capture(len(pkts))
# Expects the untranslated pg0 address on the way out; the call's
# remaining argument(s) are on a line absent from this listing.
1356 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1359 self.pg0.remote_hosts[0] = host0
# Cleanup: forwarding off, then the static-mapping API is called with the
# same address pair (its final argument line is absent from this listing).
1362 self.vapi.nat44_forwarding_enable_disable(0)
1363 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1364 external_ip=alias_ip,
1367 def test_static_in(self):
1368 """ 1:1 NAT initialized from inside network """
# Installs an address-only static mapping pg0.remote_ip4 <-> 10.0.0.10,
# sends the pg0 -> pg1 stream first, then the reverse stream towards
# nat_ip.
# NOTE(review): embedded line numbers skip (1378-1380, 1384, 1387-1388,
# 1392); those statements/comments are absent from this listing.
1370 nat_ip = "10.0.0.10"
# Fixed outside ports/id stored on self; presumably read by the
# stream/verify helpers - confirm.
1371 self.tcp_port_out = 6303
1372 self.udp_port_out = 6304
1373 self.icmp_id_out = 6305
1375 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1376 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
1377 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1381 pkts = self.create_stream_in(self.pg0, self.pg1)
1382 self.pg0.add_stream(pkts)
1383 self.pg_enable_capture(self.pg_interfaces)
1385 capture = self.pg1.get_capture(len(pkts))
# Positional arguments: expected NAT address, then True.
1386 self.verify_capture_out(capture, nat_ip, True)
1389 pkts = self.create_stream_out(self.pg1, nat_ip)
1390 self.pg1.add_stream(pkts)
1391 self.pg_enable_capture(self.pg_interfaces)
1393 capture = self.pg0.get_capture(len(pkts))
1394 self.verify_capture_in(capture, self.pg0)
1396 def test_static_out(self):
1397 """ 1:1 NAT initialized from outside network """
# Same setup as an address-only static mapping test, but with
# nat_ip 10.0.0.20 and the directions reversed: the pg1 -> nat_ip stream
# is sent first, then the pg0 -> pg1 stream.
# NOTE(review): embedded line numbers skip (1407-1409, 1413, 1416-1417,
# 1421); those statements/comments are absent from this listing.
1399 nat_ip = "10.0.0.20"
1400 self.tcp_port_out = 6303
1401 self.udp_port_out = 6304
1402 self.icmp_id_out = 6305
1404 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1405 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
1406 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1410 pkts = self.create_stream_out(self.pg1, nat_ip)
1411 self.pg1.add_stream(pkts)
1412 self.pg_enable_capture(self.pg_interfaces)
1414 capture = self.pg0.get_capture(len(pkts))
1415 self.verify_capture_in(capture, self.pg0)
1418 pkts = self.create_stream_in(self.pg0, self.pg1)
1419 self.pg0.add_stream(pkts)
1420 self.pg_enable_capture(self.pg_interfaces)
1422 capture = self.pg1.get_capture(len(pkts))
# Positional arguments: expected NAT address, then True.
1423 self.verify_capture_out(capture, nat_ip, True)
1425 def test_static_with_port_in(self):
1426 """ 1:1 NAPT initialized from inside network """
# Adds nat_addr plus three per-protocol static mappings (TCP, UDP, ICMP)
# from pg0.remote_ip4 to nat_addr, each pairing the *_in port/id with the
# *_out value set below; then runs the pg0 -> pg1 stream followed by the
# pg1 stream in the other direction.
# NOTE(review): embedded line numbers skip (1444-1446, 1450, 1453-1454,
# 1458); those statements/comments are absent from this listing.
1428 self.tcp_port_out = 3606
1429 self.udp_port_out = 3607
1430 self.icmp_id_out = 3608
1432 self.nat44_add_address(self.nat_addr)
1433 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434 self.tcp_port_in, self.tcp_port_out,
1435 proto=IP_PROTOS.tcp)
1436 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1437 self.udp_port_in, self.udp_port_out,
1438 proto=IP_PROTOS.udp)
# For ICMP the identifier takes the place of the port pair.
1439 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1440 self.icmp_id_in, self.icmp_id_out,
1441 proto=IP_PROTOS.icmp)
1442 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
1443 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1447 pkts = self.create_stream_in(self.pg0, self.pg1)
1448 self.pg0.add_stream(pkts)
1449 self.pg_enable_capture(self.pg_interfaces)
1451 capture = self.pg1.get_capture(len(pkts))
1452 self.verify_capture_out(capture)
1455 pkts = self.create_stream_out(self.pg1)
1456 self.pg1.add_stream(pkts)
1457 self.pg_enable_capture(self.pg_interfaces)
1459 capture = self.pg0.get_capture(len(pkts))
1460 self.verify_capture_in(capture, self.pg0)
1462 def test_static_with_port_out(self):
1463 """ 1:1 NAPT initialized from outside network """
# Adds nat_addr plus three per-protocol static mappings (TCP, UDP, ICMP)
# with outside values 30606-30608; the pg1 stream is sent first, then the
# pg0 -> pg1 stream.
# NOTE(review): embedded line numbers skip (1481-1483, 1487, 1490-1491,
# 1495); those statements/comments are absent from this listing.
1465 self.tcp_port_out = 30606
1466 self.udp_port_out = 30607
1467 self.icmp_id_out = 30608
1469 self.nat44_add_address(self.nat_addr)
1470 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1471 self.tcp_port_in, self.tcp_port_out,
1472 proto=IP_PROTOS.tcp)
1473 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1474 self.udp_port_in, self.udp_port_out,
1475 proto=IP_PROTOS.udp)
# For ICMP the identifier takes the place of the port pair.
1476 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1477 self.icmp_id_in, self.icmp_id_out,
1478 proto=IP_PROTOS.icmp)
1479 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
1480 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1484 pkts = self.create_stream_out(self.pg1)
1485 self.pg1.add_stream(pkts)
1486 self.pg_enable_capture(self.pg_interfaces)
1488 capture = self.pg0.get_capture(len(pkts))
1489 self.verify_capture_in(capture, self.pg0)
1492 pkts = self.create_stream_in(self.pg0, self.pg1)
1493 self.pg0.add_stream(pkts)
1494 self.pg_enable_capture(self.pg_interfaces)
1496 capture = self.pg1.get_capture(len(pkts))
1497 self.verify_capture_out(capture)
1499 def test_static_with_port_out2(self):
1500 """ 1:1 NAPT symmetrical rule """
# With forwarding enabled and a TCP static mapping created with
# out2in_only=1, four single-packet exchanges are checked:
#   1. pg1 -> nat_addr:external_port, expected on pg0 at local_port
#   2. pg0 reply, expected on pg1 from nat_addr:external_port
#   3. pg1 -> pg0.remote_ip4:local_port directly (no translation)
#   4. pg0 reply to (3), expected on pg1 with the pg0 address/port
# NOTE(review): `local_port`, `external_port`, `ip` and `tcp` are used but
# not assigned anywhere in this listing; the embedded line numbers skip
# (1501-1504, 1511-1512, 1519, 1521-1525, 1530, 1532-1533, ...), so their
# assignments and the block structure around the assertions are on those
# absent lines.
1505 self.vapi.nat44_forwarding_enable_disable(1)
1506 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1507 local_port, external_port,
1508 proto=IP_PROTOS.tcp, out2in_only=1)
1509 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
1510 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1513 # from client to service
1514 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1515 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1516 TCP(sport=12345, dport=external_port))
1517 self.pg1.add_stream(p)
1518 self.pg_enable_capture(self.pg_interfaces)
1520 capture = self.pg0.get_capture(1)
1526 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1527 self.assertEqual(tcp.dport, local_port)
1528 self.check_tcp_checksum(p)
1529 self.check_ip_checksum(p)
1531 self.logger.error(ppp("Unexpected or invalid packet:", p))
1534 # from service back to client
1535 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1536 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1537 TCP(sport=local_port, dport=12345))
1538 self.pg0.add_stream(p)
1539 self.pg_enable_capture(self.pg_interfaces)
1541 capture = self.pg1.get_capture(1)
1546 self.assertEqual(ip.src, self.nat_addr)
1547 self.assertEqual(tcp.sport, external_port)
1548 self.check_tcp_checksum(p)
1549 self.check_ip_checksum(p)
1551 self.logger.error(ppp("Unexpected or invalid packet:", p))
1554 # from client to server (no translation)
# Addressed straight to the pg0 host; a different client port (12346)
# keeps this flow distinct from the translated one above.
1555 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1556 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1557 TCP(sport=12346, dport=local_port))
1558 self.pg1.add_stream(p)
1559 self.pg_enable_capture(self.pg_interfaces)
1561 capture = self.pg0.get_capture(1)
1567 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1568 self.assertEqual(tcp.dport, local_port)
1569 self.check_tcp_checksum(p)
1570 self.check_ip_checksum(p)
1572 self.logger.error(ppp("Unexpected or invalid packet:", p))
1575 # from service back to client (no translation)
1576 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1577 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1578 TCP(sport=local_port, dport=12346))
1579 self.pg0.add_stream(p)
1580 self.pg_enable_capture(self.pg_interfaces)
1582 capture = self.pg1.get_capture(1)
# Source address and port must be left as the pg0 host sent them.
1587 self.assertEqual(ip.src, self.pg0.remote_ip4)
1588 self.assertEqual(tcp.sport, local_port)
1589 self.check_tcp_checksum(p)
1590 self.check_ip_checksum(p)
1592 self.logger.error(ppp("Unexpected or invalid packet:", p))
1595 def test_static_vrf_aware(self):
1596 """ 1:1 NAT VRF awareness """
# Two static mappings are created (pg4.remote_ip4 -> 10.0.0.30 and
# pg0.remote_ip4 -> 10.0.0.40). Traffic from pg4 must appear on pg3
# translated to nat_ip1, while traffic from pg0 must produce no capture
# on pg3.
# NOTE(review): embedded line numbers skip (1605, 1607, 1609, 1612, 1617,
# 1620, 1622, 1626); the trailing arguments of both mapping calls and of
# the pg3 feature call are on lines absent from this listing.
1598 nat_ip1 = "10.0.0.30"
1599 nat_ip2 = "10.0.0.40"
1600 self.tcp_port_out = 6303
1601 self.udp_port_out = 6304
1602 self.icmp_id_out = 6305
1604 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1606 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1608 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1610 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1611 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1613 # inside interface VRF matches NAT44 static mapping VRF
1614 pkts = self.create_stream_in(self.pg4, self.pg3)
1615 self.pg4.add_stream(pkts)
1616 self.pg_enable_capture(self.pg_interfaces)
1618 capture = self.pg3.get_capture(len(pkts))
1619 self.verify_capture_out(capture, nat_ip1, True)
1621 # inside interface VRF doesn't match NAT44 static mapping VRF (packets
1623 pkts = self.create_stream_in(self.pg0, self.pg3)
1624 self.pg0.add_stream(pkts)
1625 self.pg_enable_capture(self.pg_interfaces)
1627 self.pg3.assert_nothing_captured()
1629 def test_identity_nat(self):
1630 """ Identity NAT """
# Adds an identity mapping for pg0.remote_ip4n, sends one TCP packet from
# pg1 straight to pg0.remote_ip4 and asserts that addresses and ports of
# the packet seen on pg0 are unchanged.
# NOTE(review): `ip` and `tcp` are not assigned in this listing; embedded
# line numbers skip (1635-1636, 1642, 1644-1647, 1654, 1656-1657), so
# those assignments and the surrounding block structure are absent.
1632 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1633 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
1634 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1637 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1638 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1639 TCP(sport=12345, dport=56789))
1640 self.pg1.add_stream(p)
1641 self.pg_enable_capture(self.pg_interfaces)
1643 capture = self.pg0.get_capture(1)
# All four header fields must equal the values that were sent.
1648 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1649 self.assertEqual(ip.src, self.pg1.remote_ip4)
1650 self.assertEqual(tcp.dport, 56789)
1651 self.assertEqual(tcp.sport, 12345)
1652 self.check_tcp_checksum(p)
1653 self.check_ip_checksum(p)
1655 self.logger.error(ppp("Unexpected or invalid packet:", p))
1658 def test_static_lb(self):
1659 """ NAT44 local service load balancing """
# Configures a load-balanced static mapping on nat_addr whose local
# back-ends are pg0.remote_hosts[0] and [1], checks one request/reply
# exchange, then sends one request from each address in `clients` and
# asserts that server1 received more of them than server2.
# NOTE(review): `external_port`, `local_port`, `server`, `ip`, `tcp`,
# `pkts`, `server1_n` and `server2_n` are used but never assigned in this
# listing; embedded line numbers skip throughout (1661-1662, 1667-1668,
# 1670-1672, 1675-1676, 1678, ...), so parts of the `locals` literal, of
# the mapping call and of the control flow are on absent lines.
# nat_addr packed to its binary form for the API call.
1660 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1663 server1 = self.pg0.remote_hosts[0]
1664 server2 = self.pg0.remote_hosts[1]
# NOTE(review): the name `locals` shadows the Python builtin.
1666 locals = [{'addr': server1.ip4n,
1669 {'addr': server2.ip4n,
1673 self.nat44_add_address(self.nat_addr)
1674 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1677 local_num=len(locals),
1679 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1680 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1683 # from client to service
1684 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1685 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1686 TCP(sport=12345, dport=external_port))
1687 self.pg1.add_stream(p)
1688 self.pg_enable_capture(self.pg_interfaces)
1690 capture = self.pg0.get_capture(1)
# Either back-end is an acceptable destination for the request.
1696 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1697 if ip.dst == server1.ip4:
1701 self.assertEqual(tcp.dport, local_port)
1702 self.check_tcp_checksum(p)
1703 self.check_ip_checksum(p)
1705 self.logger.error(ppp("Unexpected or invalid packet:", p))
1708 # from service back to client
# The reply is sent from `server`; presumably the back-end chosen for the
# request above - confirm against the full file.
1709 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1710 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1711 TCP(sport=local_port, dport=12345))
1712 self.pg0.add_stream(p)
1713 self.pg_enable_capture(self.pg_interfaces)
1715 capture = self.pg1.get_capture(1)
1720 self.assertEqual(ip.src, self.nat_addr)
1721 self.assertEqual(tcp.sport, external_port)
1722 self.check_tcp_checksum(p)
1723 self.check_ip_checksum(p)
1725 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Client source addresses come from the ip4_range helper applied to
# pg1.remote_ip4.
1731 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1733 for client in clients:
1734 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1735 IP(src=client, dst=self.nat_addr) /
1736 TCP(sport=12345, dport=external_port))
1738 self.pg1.add_stream(pkts)
1739 self.pg_enable_capture(self.pg_interfaces)
1741 capture = self.pg0.get_capture(len(pkts))
1743 if p[IP].dst == server1.ip4:
# server1 is expected to have been chosen more often than server2.
1747 self.assertTrue(server1_n > server2_n)
1749 def test_static_lb_2(self):
1750 """ NAT44 local service load balancing (asymmetrical rule) """
# Like the load-balancing test, but with forwarding enabled and without
# adding nat_addr to the pool. Four single-packet exchanges are checked:
# request to nat_addr, reply from the chosen server, a request sent
# directly to server1 (no translation) and server1's direct reply.
# NOTE(review): `external_port`, `local_port`, `server`, `ip` and `tcp`
# are used but never assigned in this listing; embedded line numbers skip
# throughout (1752-1753, 1758-1759, 1761-1763, 1766-1768, 1770, ...), so
# parts of the `locals` literal, of the mapping call and of the control
# flow are on absent lines.
1751 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1754 server1 = self.pg0.remote_hosts[0]
1755 server2 = self.pg0.remote_hosts[1]
# NOTE(review): the name `locals` shadows the Python builtin.
1757 locals = [{'addr': server1.ip4n,
1760 {'addr': server2.ip4n,
1764 self.vapi.nat44_forwarding_enable_disable(1)
1765 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1769 local_num=len(locals),
1771 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1772 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1775 # from client to service
1776 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1777 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1778 TCP(sport=12345, dport=external_port))
1779 self.pg1.add_stream(p)
1780 self.pg_enable_capture(self.pg_interfaces)
1782 capture = self.pg0.get_capture(1)
# Either back-end is an acceptable destination for the request.
1788 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1789 if ip.dst == server1.ip4:
1793 self.assertEqual(tcp.dport, local_port)
1794 self.check_tcp_checksum(p)
1795 self.check_ip_checksum(p)
1797 self.logger.error(ppp("Unexpected or invalid packet:", p))
1800 # from service back to client
1801 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1802 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1803 TCP(sport=local_port, dport=12345))
1804 self.pg0.add_stream(p)
1805 self.pg_enable_capture(self.pg_interfaces)
1807 capture = self.pg1.get_capture(1)
1812 self.assertEqual(ip.src, self.nat_addr)
1813 self.assertEqual(tcp.sport, external_port)
1814 self.check_tcp_checksum(p)
1815 self.check_ip_checksum(p)
1817 self.logger.error(ppp("Unexpected or invalid packet:", p))
1820 # from client to server (no translation)
# Addressed straight to server1; client port 12346 keeps this flow
# distinct from the load-balanced one above.
1821 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1822 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1823 TCP(sport=12346, dport=local_port))
1824 self.pg1.add_stream(p)
1825 self.pg_enable_capture(self.pg_interfaces)
1827 capture = self.pg0.get_capture(1)
1833 self.assertEqual(ip.dst, server1.ip4)
1834 self.assertEqual(tcp.dport, local_port)
1835 self.check_tcp_checksum(p)
1836 self.check_ip_checksum(p)
1838 self.logger.error(ppp("Unexpected or invalid packet:", p))
1841 # from service back to client (no translation)
1842 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1843 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1844 TCP(sport=local_port, dport=12346))
1845 self.pg0.add_stream(p)
1846 self.pg_enable_capture(self.pg_interfaces)
1848 capture = self.pg1.get_capture(1)
# Source address and port must be left as server1 sent them.
1853 self.assertEqual(ip.src, server1.ip4)
1854 self.assertEqual(tcp.sport, local_port)
1855 self.check_tcp_checksum(p)
1856 self.check_ip_checksum(p)
1858 self.logger.error(ppp("Unexpected or invalid packet:", p))
1861 def test_multiple_inside_interfaces(self):
1862 """ NAT44 multiple non-overlapping address space inside interfaces """
# pg0 and pg1 get the NAT44 feature with default arguments, pg3 with an
# extra argument that this listing does not show, pg2 gets no feature
# call. The test then checks: pg0 -> pg1 and pg0 -> pg2 untranslated, and
# translated round trips pg0 <-> pg3 and pg1 <-> pg3.
# NOTE(review): embedded line numbers skip (1868-1869, 1874, 1877, 1882,
# 1885, ...); the statements on those lines are absent from this listing.
1864 self.nat44_add_address(self.nat_addr)
1865 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1866 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1867 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1870 # between two NAT44 inside interfaces (no translation)
1871 pkts = self.create_stream_in(self.pg0, self.pg1)
1872 self.pg0.add_stream(pkts)
1873 self.pg_enable_capture(self.pg_interfaces)
1875 capture = self.pg1.get_capture(len(pkts))
1876 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1878 # from NAT44 inside to interface without NAT44 feature (no translation)
1879 pkts = self.create_stream_in(self.pg0, self.pg2)
1880 self.pg0.add_stream(pkts)
1881 self.pg_enable_capture(self.pg_interfaces)
1883 capture = self.pg2.get_capture(len(pkts))
1884 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1886 # in2out 1st interface
1887 pkts = self.create_stream_in(self.pg0, self.pg3)
1888 self.pg0.add_stream(pkts)
1889 self.pg_enable_capture(self.pg_interfaces)
1891 capture = self.pg3.get_capture(len(pkts))
1892 self.verify_capture_out(capture)
1894 # out2in 1st interface
1895 pkts = self.create_stream_out(self.pg3)
1896 self.pg3.add_stream(pkts)
1897 self.pg_enable_capture(self.pg_interfaces)
1899 capture = self.pg0.get_capture(len(pkts))
1900 self.verify_capture_in(capture, self.pg0)
1902 # in2out 2nd interface
1903 pkts = self.create_stream_in(self.pg1, self.pg3)
1904 self.pg1.add_stream(pkts)
1905 self.pg_enable_capture(self.pg_interfaces)
1907 capture = self.pg3.get_capture(len(pkts))
1908 self.verify_capture_out(capture)
1910 # out2in 2nd interface
1911 pkts = self.create_stream_out(self.pg3)
1912 self.pg3.add_stream(pkts)
1913 self.pg_enable_capture(self.pg_interfaces)
1915 capture = self.pg1.get_capture(len(pkts))
1916 self.verify_capture_in(capture, self.pg1)
1918 def test_inside_overlapping_interfaces(self):
1919 """ NAT44 multiple inside interfaces with overlapping address space """
# Uses pg3 (feature call with an extra, not visible, argument) and pg4,
# pg5, pg6 (feature with default arguments), plus a static mapping from
# pg6.remote_ip4 to 10.0.0.10. Traffic checks: pg4 -> pg5 untranslated, a
# pg4 -> static_nat_ip packet arriving on pg6, and translated round trips
# pg4/pg5/pg6 <-> pg3. User/session dumps are validated along the way.
# NOTE(review): `ip`, `tcp` and `user` are used but never assigned in this
# listing; embedded line numbers skip throughout (1924, 1929-1930, 1935,
# 1945, 1947-1950, 2033, 2035, 2041-2043, 2064, ...), so trailing call
# arguments, loop headers and block structure are on absent lines.
1921 static_nat_ip = "10.0.0.10"
1922 self.nat44_add_address(self.nat_addr)
1923 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1925 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1926 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1927 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1928 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1931 # between NAT44 inside interfaces with same VRF (no translation)
1932 pkts = self.create_stream_in(self.pg4, self.pg5)
1933 self.pg4.add_stream(pkts)
1934 self.pg_enable_capture(self.pg_interfaces)
1936 capture = self.pg5.get_capture(len(pkts))
1937 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1939 # between NAT44 inside interfaces with different VRF (hairpinning)
1940 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1941 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1942 TCP(sport=1234, dport=5678))
1943 self.pg4.add_stream(p)
1944 self.pg_enable_capture(self.pg_interfaces)
1946 capture = self.pg6.get_capture(1)
# Source must be rewritten to nat_addr with a new source port, while the
# destination becomes the pg6 host and keeps its port.
1951 self.assertEqual(ip.src, self.nat_addr)
1952 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1953 self.assertNotEqual(tcp.sport, 1234)
1954 self.assertEqual(tcp.dport, 5678)
1956 self.logger.error(ppp("Unexpected or invalid packet:", p))
1959 # in2out 1st interface
1960 pkts = self.create_stream_in(self.pg4, self.pg3)
1961 self.pg4.add_stream(pkts)
1962 self.pg_enable_capture(self.pg_interfaces)
1964 capture = self.pg3.get_capture(len(pkts))
1965 self.verify_capture_out(capture)
1967 # out2in 1st interface
1968 pkts = self.create_stream_out(self.pg3)
1969 self.pg3.add_stream(pkts)
1970 self.pg_enable_capture(self.pg_interfaces)
1972 capture = self.pg4.get_capture(len(pkts))
1973 self.verify_capture_in(capture, self.pg4)
1975 # in2out 2nd interface
1976 pkts = self.create_stream_in(self.pg5, self.pg3)
1977 self.pg5.add_stream(pkts)
1978 self.pg_enable_capture(self.pg_interfaces)
1980 capture = self.pg3.get_capture(len(pkts))
1981 self.verify_capture_out(capture)
1983 # out2in 2nd interface
1984 pkts = self.create_stream_out(self.pg3)
1985 self.pg3.add_stream(pkts)
1986 self.pg_enable_capture(self.pg_interfaces)
1988 capture = self.pg5.get_capture(len(pkts))
1989 self.verify_capture_in(capture, self.pg5)
# Session dump for the pg5 host: exactly one pool address and three
# non-static sessions, asserted in the order TCP, UDP, ICMP.
1992 addresses = self.vapi.nat44_address_dump()
1993 self.assertEqual(len(addresses), 1)
1994 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1995 self.assertEqual(len(sessions), 3)
1996 for session in sessions:
1997 self.assertFalse(session.is_static)
# Only the first four bytes of inside_ip_address are compared.
1998 self.assertEqual(session.inside_ip_address[0:4],
1999 self.pg5.remote_ip4n)
2000 self.assertEqual(session.outside_ip_address,
2001 addresses[0].ip_address)
2002 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2003 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2004 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2005 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2006 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2007 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2008 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2009 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2010 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2012 # in2out 3rd interface
2013 pkts = self.create_stream_in(self.pg6, self.pg3)
2014 self.pg6.add_stream(pkts)
2015 self.pg_enable_capture(self.pg_interfaces)
2017 capture = self.pg3.get_capture(len(pkts))
2018 self.verify_capture_out(capture, static_nat_ip, True)
2020 # out2in 3rd interface
2021 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2022 self.pg3.add_stream(pkts)
2023 self.pg_enable_capture(self.pg_interfaces)
2025 capture = self.pg6.get_capture(len(pkts))
2026 self.verify_capture_in(capture, self.pg6)
2028 # general user and session dump verifications
2029 users = self.vapi.nat44_user_dump()
2030 self.assertTrue(len(users) >= 3)
2031 addresses = self.vapi.nat44_address_dump()
2032 self.assertEqual(len(addresses), 1)
2034 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2036 for session in sessions:
2037 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: total_bytes > total_pkts and total_pkts > 0.
2038 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2039 self.assertTrue(session.protocol in
2040 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Session dump for the pg4 host: at least four non-static sessions, all
# using the single pool address.
2044 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2045 self.assertTrue(len(sessions) >= 4)
2046 for session in sessions:
2047 self.assertFalse(session.is_static)
2048 self.assertEqual(session.inside_ip_address[0:4],
2049 self.pg4.remote_ip4n)
2050 self.assertEqual(session.outside_ip_address,
2051 addresses[0].ip_address)
# Session dump for the pg6 host: at least three static sessions whose
# outside address equals static_nat_ip.
2054 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2055 self.assertTrue(len(sessions) >= 3)
2056 for session in sessions:
2057 self.assertTrue(session.is_static)
2058 self.assertEqual(session.inside_ip_address[0:4],
2059 self.pg6.remote_ip4n)
# Compares the first four address bytes with the dotted-quad octets.
# NOTE(review): relies on map() returning lists (Python 2); under Python 3
# two map objects never compare equal.
2060 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2061 map(int, static_nat_ip.split('.')))
2062 self.assertTrue(session.inside_port in
2063 [self.tcp_port_in, self.udp_port_in,
2066 def test_hairpinning(self):
2067 """ NAT44 hairpinning - 1:1 NAPT """
# Host and server are both remote hosts on pg0. The server is published
# on nat_addr:server_out_port via a TCP static mapping. The host sends to
# that address and the packet is expected back out of pg0 towards the
# server; the server's reply is sent to nat_addr and is expected back out
# of pg0 towards the host.
# NOTE(review): `host_in_port`, `ip` and `tcp` are used but not assigned
# in this listing; embedded line numbers skip (2071-2072, 2075, 2079,
# 2084, 2091, 2093-2096, 2103, 2105-2106, ...), so those assignments and
# the block structure around the assertions are on absent lines.
2069 host = self.pg0.remote_hosts[0]
2070 server = self.pg0.remote_hosts[1]
2073 server_in_port = 5678
2074 server_out_port = 8765
2076 self.nat44_add_address(self.nat_addr)
2077 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
# The pg1 call continues on a line that this listing omits.
2078 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2080 # add static mapping for server
2081 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2082 server_in_port, server_out_port,
2083 proto=IP_PROTOS.tcp)
2085 # send packet from host to server
2086 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2087 IP(src=host.ip4, dst=self.nat_addr) /
2088 TCP(sport=host_in_port, dport=server_out_port))
2089 self.pg0.add_stream(p)
2090 self.pg_enable_capture(self.pg_interfaces)
# Capture on pg0 as well: the packet is expected to leave through the
# interface it arrived on.
2092 capture = self.pg0.get_capture(1)
2097 self.assertEqual(ip.src, self.nat_addr)
2098 self.assertEqual(ip.dst, server.ip4)
2099 self.assertNotEqual(tcp.sport, host_in_port)
2100 self.assertEqual(tcp.dport, server_in_port)
2101 self.check_tcp_checksum(p)
# Remember the translated source port; the server's reply targets it.
2102 host_out_port = tcp.sport
2104 self.logger.error(ppp("Unexpected or invalid packet:", p))
2107 # send reply from server to host
2108 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2109 IP(src=server.ip4, dst=self.nat_addr) /
2110 TCP(sport=server_in_port, dport=host_out_port))
2111 self.pg0.add_stream(p)
2112 self.pg_enable_capture(self.pg_interfaces)
2114 capture = self.pg0.get_capture(1)
# The reply must appear to come from nat_addr:server_out_port and be
# addressed to the host's original address and port.
2119 self.assertEqual(ip.src, self.nat_addr)
2120 self.assertEqual(ip.dst, host.ip4)
2121 self.assertEqual(tcp.sport, server_out_port)
2122 self.assertEqual(tcp.dport, host_in_port)
2123 self.check_tcp_checksum(p)
2125 self.logger.error(ppp("Unexpected or invalid packet:", p))
2128 def test_hairpinning2(self):
2129 """ NAT44 hairpinning - 1:1 NAT"""
2131 server1_nat_ip = "10.0.0.10"
2132 server2_nat_ip = "10.0.0.11"
2133 host = self.pg0.remote_hosts[0]
2134 server1 = self.pg0.remote_hosts[1]
2135 server2 = self.pg0.remote_hosts[2]
2136 server_tcp_port = 22
2137 server_udp_port = 20
2139 self.nat44_add_address(self.nat_addr)
2140 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2141 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2144 # add static mapping for servers
2145 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2146 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2150 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2151 IP(src=host.ip4, dst=server1_nat_ip) /
2152 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2154 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2155 IP(src=host.ip4, dst=server1_nat_ip) /
2156 UDP(sport=self.udp_port_in, dport=server_udp_port))
2158 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2159 IP(src=host.ip4, dst=server1_nat_ip) /
2160 ICMP(id=self.icmp_id_in, type='echo-request'))
2162 self.pg0.add_stream(pkts)
2163 self.pg_enable_capture(self.pg_interfaces)
2165 capture = self.pg0.get_capture(len(pkts))
2166 for packet in capture:
2168 self.assertEqual(packet[IP].src, self.nat_addr)
2169 self.assertEqual(packet[IP].dst, server1.ip4)
2170 if packet.haslayer(TCP):
2171 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2172 self.assertEqual(packet[TCP].dport, server_tcp_port)
2173 self.tcp_port_out = packet[TCP].sport
2174 self.check_tcp_checksum(packet)
2175 elif packet.haslayer(UDP):
2176 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2177 self.assertEqual(packet[UDP].dport, server_udp_port)
2178 self.udp_port_out = packet[UDP].sport
2180 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2181 self.icmp_id_out = packet[ICMP].id
2183 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2188 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2189 IP(src=server1.ip4, dst=self.nat_addr) /
2190 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2192 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2193 IP(src=server1.ip4, dst=self.nat_addr) /
2194 UDP(sport=server_udp_port, dport=self.udp_port_out))
2196 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2197 IP(src=server1.ip4, dst=self.nat_addr) /
2198 ICMP(id=self.icmp_id_out, type='echo-reply'))
2200 self.pg0.add_stream(pkts)
2201 self.pg_enable_capture(self.pg_interfaces)
2203 capture = self.pg0.get_capture(len(pkts))
2204 for packet in capture:
2206 self.assertEqual(packet[IP].src, server1_nat_ip)
2207 self.assertEqual(packet[IP].dst, host.ip4)
2208 if packet.haslayer(TCP):
2209 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2210 self.assertEqual(packet[TCP].sport, server_tcp_port)
2211 self.check_tcp_checksum(packet)
2212 elif packet.haslayer(UDP):
2213 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2214 self.assertEqual(packet[UDP].sport, server_udp_port)
2216 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2218 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2221 # server2 to server1
2223 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2224 IP(src=server2.ip4, dst=server1_nat_ip) /
2225 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2227 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2228 IP(src=server2.ip4, dst=server1_nat_ip) /
2229 UDP(sport=self.udp_port_in, dport=server_udp_port))
2231 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2232 IP(src=server2.ip4, dst=server1_nat_ip) /
2233 ICMP(id=self.icmp_id_in, type='echo-request'))
2235 self.pg0.add_stream(pkts)
2236 self.pg_enable_capture(self.pg_interfaces)
2238 capture = self.pg0.get_capture(len(pkts))
2239 for packet in capture:
2241 self.assertEqual(packet[IP].src, server2_nat_ip)
2242 self.assertEqual(packet[IP].dst, server1.ip4)
2243 if packet.haslayer(TCP):
2244 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2245 self.assertEqual(packet[TCP].dport, server_tcp_port)
2246 self.tcp_port_out = packet[TCP].sport
2247 self.check_tcp_checksum(packet)
2248 elif packet.haslayer(UDP):
2249 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2250 self.assertEqual(packet[UDP].dport, server_udp_port)
2251 self.udp_port_out = packet[UDP].sport
2253 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2254 self.icmp_id_out = packet[ICMP].id
2256 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2259 # server1 to server2
2261 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262 IP(src=server1.ip4, dst=server2_nat_ip) /
2263 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266 IP(src=server1.ip4, dst=server2_nat_ip) /
2267 UDP(sport=server_udp_port, dport=self.udp_port_out))
2269 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270 IP(src=server1.ip4, dst=server2_nat_ip) /
2271 ICMP(id=self.icmp_id_out, type='echo-reply'))
2273 self.pg0.add_stream(pkts)
2274 self.pg_enable_capture(self.pg_interfaces)
2276 capture = self.pg0.get_capture(len(pkts))
2277 for packet in capture:
2279 self.assertEqual(packet[IP].src, server1_nat_ip)
2280 self.assertEqual(packet[IP].dst, server2.ip4)
2281 if packet.haslayer(TCP):
2282 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2283 self.assertEqual(packet[TCP].sport, server_tcp_port)
2284 self.check_tcp_checksum(packet)
2285 elif packet.haslayer(UDP):
2286 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2287 self.assertEqual(packet[UDP].sport, server_udp_port)
2289 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2291 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2294 def test_max_translations_per_user(self):
2295 """ MAX translations per user - recycle the least recently used """
# Sends more TCP flows from pg0.remote_ip4 than the configured per-user
# translation limit and expects every packet to show up on pg1.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2299 -> 2302, 2306 -> 2308, 2311 -> 2313), so the tail of the second
# nat44_interface_add_del_feature() call and the statements that build
# `pkts` are not visible here; confirm against the complete file.
2297 self.nat44_add_address(self.nat_addr)
2298 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2299 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2302 # get maximum number of translations per user
2303 nat44_config = self.vapi.nat_show_config()
2305 # send more than maximum number of translations per user packets
2306 pkts_num = nat44_config.max_translations_per_user + 5
# One TCP packet per iteration; the source port (1025 + port) is the only
# field that changes, so each packet is a distinct flow from the same host.
2308 for port in range(0, pkts_num):
2309 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2310 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2311 TCP(sport=1025 + port))
2313 self.pg0.add_stream(pkts)
2314 self.pg_enable_capture(self.pg_interfaces)
2317 # verify number of translated packet
# All pkts_num packets (limit + 5) are expected on pg1, none dropped.
2318 self.pg1.get_capture(pkts_num)
2320 def test_interface_addr(self):
2321 """ Acquire NAT44 addresses from interface """
# Registers pg7 as the source of NAT44 pool addresses and checks that the
# pool dump follows pg7's IPv4 configuration: empty -> one entry -> empty.
2322 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2324 # no address in NAT pool
2325 adresses = self.vapi.nat44_address_dump()
2326 self.assertEqual(0, len(adresses))
2328 # configure interface address and check NAT address pool
2329 self.pg7.config_ip4()
2330 adresses = self.vapi.nat44_address_dump()
2331 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped ip_address field are compared with
# pg7's packed IPv4 address (local_ip4n).
2332 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2334 # remove interface address and check NAT address pool
2335 self.pg7.unconfig_ip4()
2336 adresses = self.vapi.nat44_address_dump()
2337 self.assertEqual(0, len(adresses))
2339 def test_interface_addr_static_mapping(self):
2340 """ Static mapping with addresses from interface """
# Creates a static mapping whose external side is the pg7 interface rather
# than a fixed address, then checks how the mapping dump changes as pg7's
# IPv4 address is configured and removed.
# NOTE(review): the embedded numbering jumps 2342 -> 2344, so the first
# argument line of nat44_add_static_mapping() is not visible in this
# listing; confirm against the complete file.
2341 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2342 self.nat44_add_static_mapping(
2344 external_sw_if_index=self.pg7.sw_if_index)
2346 # static mappings with external interface
# Before pg7 has an address the mapping is reported with pg7's sw_if_index.
2347 static_mappings = self.vapi.nat44_static_mapping_dump()
2348 self.assertEqual(1, len(static_mappings))
2349 self.assertEqual(self.pg7.sw_if_index,
2350 static_mappings[0].external_sw_if_index)
2352 # configure interface address and check static mappings
2353 self.pg7.config_ip4()
2354 static_mappings = self.vapi.nat44_static_mapping_dump()
2355 self.assertEqual(1, len(static_mappings))
2356 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2357 self.pg7.local_ip4n)
# Once resolved to pg7's address, external_sw_if_index is expected to be
# 0xFFFFFFFF instead of pg7's index.
2358 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2360 # remove interface address and check static mappings
2361 self.pg7.unconfig_ip4()
2362 static_mappings = self.vapi.nat44_static_mapping_dump()
2363 self.assertEqual(0, len(static_mappings))
2365 def test_interface_addr_identity_nat(self):
2366 """ Identity NAT with addresses from interface """
# Adds a TCP identity mapping bound to the pg7 interface and checks how the
# identity-mapping dump changes as pg7's IPv4 address is configured and
# removed.
# NOTE(review): the embedded numbering skips 2367-2368, 2372 and 2374-2375,
# so the assignment of `port` (used in the assertion below) and the remaining
# arguments of nat44_add_del_identity_mapping() are not visible in this
# listing; confirm against the complete file.
2369 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2370 self.vapi.nat44_add_del_identity_mapping(
2371 sw_if_index=self.pg7.sw_if_index,
2373 protocol=IP_PROTOS.tcp,
2376 # identity mappings with external interface
# Before pg7 has an address the mapping is reported with pg7's sw_if_index.
2377 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2378 self.assertEqual(1, len(identity_mappings))
2379 self.assertEqual(self.pg7.sw_if_index,
2380 identity_mappings[0].sw_if_index)
2382 # configure interface address and check identity mappings
2383 self.pg7.config_ip4()
2384 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2385 self.assertEqual(1, len(identity_mappings))
2386 self.assertEqual(identity_mappings[0].ip_address,
2387 self.pg7.local_ip4n)
# After resolution the entry carries pg7's address, sw_if_index 0xFFFFFFFF,
# and the same port/protocol that were requested.
2388 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2389 self.assertEqual(port, identity_mappings[0].port)
2390 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2392 # remove interface address and check identity mappings
2393 self.pg7.unconfig_ip4()
2394 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2395 self.assertEqual(0, len(identity_mappings))
2397 def test_ipfix_nat44_sess(self):
2398 """ IPFIX logging NAT44 session created/delted """
# Configures an IPFIX exporter towards pg3 on a non-default collector port,
# creates NAT44 sessions with a pg0 -> pg1 stream, removes the NAT address,
# flushes IPFIX and then validates the 9 packets captured on pg3.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2405 -> 2407, 2408 -> 2410, 2425 -> 2427, 2436 -> 2438), so the tail
# of some calls and the loop headers that bind `p` over `capture` are not
# visible here; confirm against the complete file.
2399 self.ipfix_domain_id = 10
2400 self.ipfix_src_port = 20202
2401 colector_port = 30303
# Teach scapy to dissect UDP payloads with destination port 30303 as IPFIX;
# the literal duplicates colector_port above and must stay in sync with it.
2402 bind_layers(UDP, IPFIX, dport=30303)
2403 self.nat44_add_address(self.nat_addr)
2404 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2405 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2407 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2408 src_address=self.pg3.local_ip4n,
2410 template_interval=10,
2411 collector_port=colector_port)
2412 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2413 src_port=self.ipfix_src_port)
2415 pkts = self.create_stream_in(self.pg0, self.pg1)
2416 self.pg0.add_stream(pkts)
2417 self.pg_enable_capture(self.pg_interfaces)
2419 capture = self.pg1.get_capture(len(pkts))
2420 self.verify_capture_out(capture)
# Remove the NAT address again before flushing the exporter (the docstring
# names both session-created and session-deleted events).
2421 self.nat44_add_address(self.nat_addr, is_add=0)
2422 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2423 capture = self.pg3.get_capture(9)
2424 ipfix = IPFIXDecoder()
2425 # first load template
# Every exported packet must be IPFIX from pg3.local_ip4 to pg3.remote_ip4
# with the configured source port, collector port and observation domain.
2427 self.assertTrue(p.haslayer(IPFIX))
2428 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2429 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2430 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2431 self.assertEqual(p[UDP].dport, colector_port)
2432 self.assertEqual(p[IPFIX].observationDomainID,
2433 self.ipfix_domain_id)
2434 if p.haslayer(Template):
2435 ipfix.add_template(p.getlayer(Template))
2436 # verify events in data set
# Data sets are decoded with the templates registered above and handed to
# verify_ipfix_nat44_ses() for checking.
2438 if p.haslayer(Data):
2439 data = ipfix.decode_data_set(p.getlayer(Set))
2440 self.verify_ipfix_nat44_ses(data)
2442 def test_ipfix_addr_exhausted(self):
2443 """ IPFIX logging NAT addresses exhausted """
# Enables NAT44 on pg0/pg1 without adding any NAT address in this method,
# sends one packet from pg0 that must not reach pg1, then validates the
# IPFIX packets exported to pg3.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2445 -> 2447, 2455 -> 2457, 2464 -> 2466, 2475 -> 2477), so the tail
# of some calls, the transport layer of the test packet and the loop headers
# that bind `p` over `capture` are not visible here; confirm against the
# complete file.
# NOTE(review): self.ipfix_domain_id / self.ipfix_src_port are read but not
# assigned in this method - presumably set elsewhere in the class; verify.
2444 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2445 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2447 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2448 src_address=self.pg3.local_ip4n,
2450 template_interval=10)
2451 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2452 src_port=self.ipfix_src_port)
2454 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2455 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2457 self.pg0.add_stream(p)
2458 self.pg_enable_capture(self.pg_interfaces)
# No packet is expected on pg1.
2460 capture = self.pg1.get_capture(0)
2461 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2462 capture = self.pg3.get_capture(9)
2463 ipfix = IPFIXDecoder()
2464 # first load template
# No collector_port was passed to set_ipfix_exporter() above, and the
# exported packets are expected on UDP destination port 4739.
2466 self.assertTrue(p.haslayer(IPFIX))
2467 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2468 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2469 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2470 self.assertEqual(p[UDP].dport, 4739)
2471 self.assertEqual(p[IPFIX].observationDomainID,
2472 self.ipfix_domain_id)
2473 if p.haslayer(Template):
2474 ipfix.add_template(p.getlayer(Template))
2475 # verify events in data set
2477 if p.haslayer(Data):
2478 data = ipfix.decode_data_set(p.getlayer(Set))
2479 self.verify_ipfix_addr_exhausted(data)
2481 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2482 def test_ipfix_max_sessions(self):
2483 """ IPFIX logging maximum session entries exceeded """
# Only runs when running_extended_tests() is true. Fills the session table
# with max_sessions flows from distinct source addresses, then sends one more
# packet that must not be forwarded and validates the IPFIX export on pg3.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2486 -> 2489, 2490 -> 2493, 2496 -> 2499, 2512 -> 2514,
# 2521 -> 2523), so the tail of some calls, the transport layers of the test
# packets, the construction of `pkts` and the loop headers that bind `p`
# over `capture` are not visible here; confirm against the complete file.
2484 self.nat44_add_address(self.nat_addr)
2485 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2486 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2489 nat44_config = self.vapi.nat_show_config()
# The session limit is taken to be 10 sessions per translation bucket.
2490 max_sessions = 10 * nat44_config.translation_buckets
2493 for i in range(0, max_sessions):
# Spread i over the third and fourth octet (10.10.<i high byte>.<i low byte>)
# so each packet comes from a different source address.
2494 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2495 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2496 IP(src=src, dst=self.pg1.remote_ip4) /
2499 self.pg0.add_stream(pkts)
2500 self.pg_enable_capture(self.pg_interfaces)
# All max_sessions packets are expected to be forwarded to pg1.
2503 self.pg1.get_capture(max_sessions)
# IPFIX is configured only after the table has been filled.
2504 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2505 src_address=self.pg3.local_ip4n,
2507 template_interval=10)
2508 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2509 src_port=self.ipfix_src_port)
2511 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2512 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2514 self.pg0.add_stream(p)
2515 self.pg_enable_capture(self.pg_interfaces)
# The extra packet must not appear on pg1.
2517 self.pg1.get_capture(0)
2518 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2519 capture = self.pg3.get_capture(9)
2520 ipfix = IPFIXDecoder()
2521 # first load template
2523 self.assertTrue(p.haslayer(IPFIX))
2524 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2525 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2526 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2527 self.assertEqual(p[UDP].dport, 4739)
2528 self.assertEqual(p[IPFIX].observationDomainID,
2529 self.ipfix_domain_id)
2530 if p.haslayer(Template):
2531 ipfix.add_template(p.getlayer(Template))
2532 # verify events in data set
2534 if p.haslayer(Data):
2535 data = ipfix.decode_data_set(p.getlayer(Set))
2536 self.verify_ipfix_max_sessions(data, max_sessions)
2538 def test_pool_addr_fib(self):
2539 """ NAT44 add pool addresses to FIB """
2540 static_addr = '10.0.0.10'
2541 self.nat44_add_address(self.nat_addr)
2542 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2543 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2545 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2548 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2549 ARP(op=ARP.who_has, pdst=self.nat_addr,
2550 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2551 self.pg1.add_stream(p)
2552 self.pg_enable_capture(self.pg_interfaces)
2554 capture = self.pg1.get_capture(1)
2555 self.assertTrue(capture[0].haslayer(ARP))
2556 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2559 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2560 ARP(op=ARP.who_has, pdst=static_addr,
2561 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2562 self.pg1.add_stream(p)
2563 self.pg_enable_capture(self.pg_interfaces)
2565 capture = self.pg1.get_capture(1)
2566 self.assertTrue(capture[0].haslayer(ARP))
2567 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2569 # send ARP to non-NAT44 interface
2570 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2571 ARP(op=ARP.who_has, pdst=self.nat_addr,
2572 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2573 self.pg2.add_stream(p)
2574 self.pg_enable_capture(self.pg_interfaces)
2576 capture = self.pg1.get_capture(0)
2578 # remove addresses and verify
2579 self.nat44_add_address(self.nat_addr, is_add=0)
2580 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2583 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2584 ARP(op=ARP.who_has, pdst=self.nat_addr,
2585 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2586 self.pg1.add_stream(p)
2587 self.pg_enable_capture(self.pg_interfaces)
2589 capture = self.pg1.get_capture(0)
2591 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2592 ARP(op=ARP.who_has, pdst=static_addr,
2593 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2594 self.pg1.add_stream(p)
2595 self.pg_enable_capture(self.pg_interfaces)
2597 capture = self.pg1.get_capture(0)
2599 def test_vrf_mode(self):
2600 """ NAT44 tenant VRF aware address pool mode """
# Puts pg0 and pg1 into two separate IP tables, adds one NAT address per
# table and checks that traffic from each interface towards pg2 is
# translated to the address that was added for its own table.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2600 -> 2604, 2620 -> 2624, 2626 -> 2628), so the assignments of
# `vrf_id1` / `vrf_id2` and the tail of the pg2 feature call are not visible
# here; confirm against the complete file.
2604 nat_ip1 = "10.0.0.10"
2605 nat_ip2 = "10.0.0.11"
# Addresses are removed before the table change and configured again after it.
2607 self.pg0.unconfig_ip4()
2608 self.pg1.unconfig_ip4()
2609 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2610 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2611 self.pg0.set_table_ip4(vrf_id1)
2612 self.pg1.set_table_ip4(vrf_id2)
2613 self.pg0.config_ip4()
2614 self.pg1.config_ip4()
2616 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2617 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2618 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2619 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2620 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 (table vrf_id1) -> pg2 must be translated to nat_ip1.
2624 pkts = self.create_stream_in(self.pg0, self.pg2)
2625 self.pg0.add_stream(pkts)
2626 self.pg_enable_capture(self.pg_interfaces)
2628 capture = self.pg2.get_capture(len(pkts))
2629 self.verify_capture_out(capture, nat_ip1)
# pg1 (table vrf_id2) -> pg2 must be translated to nat_ip2.
2632 pkts = self.create_stream_in(self.pg1, self.pg2)
2633 self.pg1.add_stream(pkts)
2634 self.pg_enable_capture(self.pg_interfaces)
2636 capture = self.pg2.get_capture(len(pkts))
2637 self.verify_capture_out(capture, nat_ip2)
# Cleanup: move both interfaces back to table 0 and delete the two tables.
2639 self.pg0.unconfig_ip4()
2640 self.pg1.unconfig_ip4()
2641 self.pg0.set_table_ip4(0)
2642 self.pg1.set_table_ip4(0)
2643 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2644 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2646 def test_vrf_feature_independent(self):
2647 """ NAT44 tenant VRF independent address pool mode """
# Adds one NAT address without a VRF and one bound to vrf_id=99, then checks
# that traffic from both pg0 and pg1 towards pg2 is translated to the first
# address (nat_ip1); nat_ip2 is never expected in the captures.
# NOTE(review): the embedded numbering jumps 2656 -> 2660, so the tail of the
# pg2 feature call is not visible in this listing; confirm against the
# complete file.
2649 nat_ip1 = "10.0.0.10"
2650 nat_ip2 = "10.0.0.11"
2652 self.nat44_add_address(nat_ip1)
2653 self.nat44_add_address(nat_ip2, vrf_id=99)
2654 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2655 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2656 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2660 pkts = self.create_stream_in(self.pg0, self.pg2)
2661 self.pg0.add_stream(pkts)
2662 self.pg_enable_capture(self.pg_interfaces)
2664 capture = self.pg2.get_capture(len(pkts))
2665 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2, also expected to use nat_ip1
2668 pkts = self.create_stream_in(self.pg1, self.pg2)
2669 self.pg1.add_stream(pkts)
2670 self.pg_enable_capture(self.pg_interfaces)
2672 capture = self.pg2.get_capture(len(pkts))
2673 self.verify_capture_out(capture, nat_ip1)
2675 def test_dynamic_ipless_interfaces(self):
2676 """ NAT44 interfaces without configured IP address """
# pg7 and pg8 get no config_ip4() call here; reachability of their remote
# hosts is set up by hand with neighbor entries and /32 routes, then
# translation is checked in both directions (pg7 -> pg8 and back).
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2680 -> 2682, 2684 -> 2687, 2698 -> 2702), so the tail of the
# ip_neighbor_add_del() calls and of the pg8 feature call is not visible
# here; confirm against the complete file.
2678 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2679 mactobinary(self.pg7.remote_mac),
2680 self.pg7.remote_ip4n,
2682 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2683 mactobinary(self.pg8.remote_mac),
2684 self.pg8.remote_ip4n,
# Host routes (prefix length 32) to each remote host via its own interface.
2687 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2688 dst_address_length=32,
2689 next_hop_address=self.pg7.remote_ip4n,
2690 next_hop_sw_if_index=self.pg7.sw_if_index)
2691 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2692 dst_address_length=32,
2693 next_hop_address=self.pg8.remote_ip4n,
2694 next_hop_sw_if_index=self.pg8.sw_if_index)
2696 self.nat44_add_address(self.nat_addr)
2697 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2698 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8 direction
2702 pkts = self.create_stream_in(self.pg7, self.pg8)
2703 self.pg7.add_stream(pkts)
2704 self.pg_enable_capture(self.pg_interfaces)
2706 capture = self.pg8.get_capture(len(pkts))
2707 self.verify_capture_out(capture)
# pg8 -> pg7 direction, addressed to the NAT address
2710 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2711 self.pg8.add_stream(pkts)
2712 self.pg_enable_capture(self.pg_interfaces)
2714 capture = self.pg7.get_capture(len(pkts))
2715 self.verify_capture_in(capture, self.pg7)
2717 def test_static_ipless_interfaces(self):
2718 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same hand-made neighbor/route setup for pg7 and pg8 as the dynamic variant
# of this test, but with a 1:1 static mapping pg7.remote_ip4 <-> nat_addr
# instead of a pool address. The pg8 -> pg7 direction is exercised first.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2722 -> 2724, 2726 -> 2729, 2740 -> 2744), so the tail of the
# ip_neighbor_add_del() calls and of the pg8 feature call is not visible
# here; confirm against the complete file.
2720 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2721 mactobinary(self.pg7.remote_mac),
2722 self.pg7.remote_ip4n,
2724 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2725 mactobinary(self.pg8.remote_mac),
2726 self.pg8.remote_ip4n,
# Host routes (prefix length 32) to each remote host via its own interface.
2729 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2730 dst_address_length=32,
2731 next_hop_address=self.pg7.remote_ip4n,
2732 next_hop_sw_if_index=self.pg7.sw_if_index)
2733 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2734 dst_address_length=32,
2735 next_hop_address=self.pg8.remote_ip4n,
2736 next_hop_sw_if_index=self.pg8.sw_if_index)
2738 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2739 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2740 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction
2744 pkts = self.create_stream_out(self.pg8)
2745 self.pg8.add_stream(pkts)
2746 self.pg_enable_capture(self.pg_interfaces)
2748 capture = self.pg7.get_capture(len(pkts))
2749 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction; verify_capture_out() gets nat_addr plus a True flag.
# NOTE(review): presumably that flag selects "same port" checking for 1:1
# NAT - verify against verify_capture_out()'s signature.
2752 pkts = self.create_stream_in(self.pg7, self.pg8)
2753 self.pg7.add_stream(pkts)
2754 self.pg_enable_capture(self.pg_interfaces)
2756 capture = self.pg8.get_capture(len(pkts))
2757 self.verify_capture_out(capture, self.nat_addr, True)
2759 def test_static_with_port_ipless_interfaces(self):
2760 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Same hand-made neighbor/route setup for pg7 and pg8 as the other "ipless"
# tests, combined with per-protocol static port mappings (TCP, UDP, ICMP)
# from pg7.remote_ip4 to nat_addr.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2768 -> 2770, 2772 -> 2775, 2795 -> 2799), so the tail of the
# ip_neighbor_add_del() calls and of the pg8 feature call is not visible
# here; confirm against the complete file.
# Fixed external ports / ICMP id used by the static mappings below.
2762 self.tcp_port_out = 30606
2763 self.udp_port_out = 30607
2764 self.icmp_id_out = 30608
2766 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2767 mactobinary(self.pg7.remote_mac),
2768 self.pg7.remote_ip4n,
2770 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2771 mactobinary(self.pg8.remote_mac),
2772 self.pg8.remote_ip4n,
# Host routes (prefix length 32) to each remote host via its own interface.
2775 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2776 dst_address_length=32,
2777 next_hop_address=self.pg7.remote_ip4n,
2778 next_hop_sw_if_index=self.pg7.sw_if_index)
2779 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2780 dst_address_length=32,
2781 next_hop_address=self.pg8.remote_ip4n,
2782 next_hop_sw_if_index=self.pg8.sw_if_index)
2784 self.nat44_add_address(self.nat_addr)
# One mapping per protocol: (local ip, local port/id) <-> (nat_addr, out port/id).
2785 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2786 self.tcp_port_in, self.tcp_port_out,
2787 proto=IP_PROTOS.tcp)
2788 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2789 self.udp_port_in, self.udp_port_out,
2790 proto=IP_PROTOS.udp)
2791 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2792 self.icmp_id_in, self.icmp_id_out,
2793 proto=IP_PROTOS.icmp)
2794 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2795 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction
2799 pkts = self.create_stream_out(self.pg8)
2800 self.pg8.add_stream(pkts)
2801 self.pg_enable_capture(self.pg_interfaces)
2803 capture = self.pg7.get_capture(len(pkts))
2804 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction
2807 pkts = self.create_stream_in(self.pg7, self.pg8)
2808 self.pg7.add_stream(pkts)
2809 self.pg_enable_capture(self.pg_interfaces)
2811 capture = self.pg8.get_capture(len(pkts))
2812 self.verify_capture_out(capture)
2814 def test_static_unknown_proto(self):
2815 """ 1:1 NAT translate packet with unknown protocol """
# With a 1:1 static mapping pg0.remote_ip4 <-> nat_ip, sends an encapsulated
# packet in each direction and checks that only the outer IP addresses are
# rewritten, that the GRE layer is still present and that the IP checksum
# is valid.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2819 -> 2823, 2824 -> 2826, 2831 -> 2834, 2837 -> 2839), so the
# layer between the two IP headers (the assertions expect GRE), the binding
# of `packet` from the capture and the error-handling scaffolding around the
# assertions are not visible here; confirm against the complete file.
2816 nat_ip = "10.0.0.10"
2817 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2818 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2819 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: source must become nat_ip.
2823 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2824 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2826 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2827 TCP(sport=1234, dport=1234))
2828 self.pg0.add_stream(p)
2829 self.pg_enable_capture(self.pg_interfaces)
2831 p = self.pg1.get_capture(1)
2834 self.assertEqual(packet[IP].src, nat_ip)
2835 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2836 self.assertTrue(packet.haslayer(GRE))
2837 self.check_ip_checksum(packet)
2839 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0: destination nat_ip must be mapped back to pg0.remote_ip4.
2843 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2844 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2846 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2847 TCP(sport=1234, dport=1234))
2848 self.pg1.add_stream(p)
2849 self.pg_enable_capture(self.pg_interfaces)
2851 p = self.pg0.get_capture(1)
2854 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2855 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2856 self.assertTrue(packet.haslayer(GRE))
2857 self.check_ip_checksum(packet)
2859 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2862 def test_hairpinning_static_unknown_proto(self):
2863 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0, each with its own 1:1 static
# mapping. Encapsulated packets sent to the peer's NAT address must come
# back out of pg0 with the sender's NAT address as source and the peer's
# real address as destination.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2874 -> 2878, 2879 -> 2881, 2886 -> 2889, 2892 -> 2894), so the
# layer between the two IP headers (the assertions expect GRE), the binding
# of `packet` from the capture and the error-handling scaffolding around the
# assertions are not visible here; confirm against the complete file.
2865 host = self.pg0.remote_hosts[0]
2866 server = self.pg0.remote_hosts[1]
2868 host_nat_ip = "10.0.0.10"
2869 server_nat_ip = "10.0.0.11"
2871 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2872 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2873 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2874 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server (addressed to server_nat_ip), sent and captured on pg0
2878 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2879 IP(src=host.ip4, dst=server_nat_ip) /
2881 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2882 TCP(sport=1234, dport=1234))
2883 self.pg0.add_stream(p)
2884 self.pg_enable_capture(self.pg_interfaces)
2886 p = self.pg0.get_capture(1)
2889 self.assertEqual(packet[IP].src, host_nat_ip)
2890 self.assertEqual(packet[IP].dst, server.ip4)
2891 self.assertTrue(packet.haslayer(GRE))
2892 self.check_ip_checksum(packet)
2894 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host (addressed to host_nat_ip), sent and captured on pg0
2898 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2899 IP(src=server.ip4, dst=host_nat_ip) /
2901 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2902 TCP(sport=1234, dport=1234))
2903 self.pg0.add_stream(p)
2904 self.pg_enable_capture(self.pg_interfaces)
2906 p = self.pg0.get_capture(1)
2909 self.assertEqual(packet[IP].src, server_nat_ip)
2910 self.assertEqual(packet[IP].dst, host.ip4)
2911 self.assertTrue(packet.haslayer(GRE))
2912 self.check_ip_checksum(packet)
2914 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2917 def test_unknown_proto(self):
2918 """ NAT44 translate packet with unknown protocol """
# With a dynamic NAT pool address, first sends a plain TCP packet
# pg0 -> pg1, then an encapsulated packet in each direction, and checks that
# the outer IP addresses are rewritten to/from nat_addr, that the GRE layer
# is still present and that the IP checksum is valid.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2921 -> 2925, 2934 -> 2936, 2941 -> 2944, 2947 -> 2949), so the
# layer between the two IP headers (the assertions expect GRE), the binding
# of `packet` from the capture and the error-handling scaffolding around the
# assertions are not visible here; confirm against the complete file.
2919 self.nat44_add_address(self.nat_addr)
2920 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2921 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Plain TCP packet first; only its arrival on pg1 is checked.
# NOTE(review): presumably this creates the session that the unknown-protocol
# packets below rely on - confirm.
2925 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2926 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2927 TCP(sport=self.tcp_port_in, dport=20))
2928 self.pg0.add_stream(p)
2929 self.pg_enable_capture(self.pg_interfaces)
2931 p = self.pg1.get_capture(1)
# pg0 -> pg1 encapsulated packet: source must become nat_addr.
2933 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2934 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2936 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2937 TCP(sport=1234, dport=1234))
2938 self.pg0.add_stream(p)
2939 self.pg_enable_capture(self.pg_interfaces)
2941 p = self.pg1.get_capture(1)
2944 self.assertEqual(packet[IP].src, self.nat_addr)
2945 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2946 self.assertTrue(packet.haslayer(GRE))
2947 self.check_ip_checksum(packet)
2949 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0 encapsulated packet: destination nat_addr must be mapped back to
# pg0.remote_ip4.
2953 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2954 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2956 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2957 TCP(sport=1234, dport=1234))
2958 self.pg1.add_stream(p)
2959 self.pg_enable_capture(self.pg_interfaces)
2961 p = self.pg0.get_capture(1)
2964 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2965 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2966 self.assertTrue(packet.haslayer(GRE))
2967 self.check_ip_checksum(packet)
2969 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2972 def test_hairpinning_unknown_proto(self):
2973 """ NAT44 translate packet with unknown protocol - hairpinning """
# Host and server are both remote hosts on pg0. The server has a 1:1 static
# mapping to server_nat_ip; the host is translated via the pool address
# nat_addr. A TCP packet is sent first, then encapsulated packets in both
# directions are checked for correct outer addresses, GRE presence and IP
# checksum.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 2975 -> 2978, 2984 -> 2987, 3000 -> 3002, 3007 -> 3010), so the
# assignment of `host_in_port`, the layer between the two IP headers (the
# assertions expect GRE), the binding of `packet` from the capture and the
# error-handling scaffolding are not visible here; confirm against the
# complete file.
2974 host = self.pg0.remote_hosts[0]
2975 server = self.pg0.remote_hosts[1]
2978 server_in_port = 5678
2979 server_out_port = 8765
2980 server_nat_ip = "10.0.0.11"
2982 self.nat44_add_address(self.nat_addr)
2983 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2984 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2987 # add static mapping for server
2988 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# Plain TCP packet host -> server_nat_ip; only its arrival on pg0 is checked.
2991 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2992 IP(src=host.ip4, dst=server_nat_ip) /
2993 TCP(sport=host_in_port, dport=server_out_port))
2994 self.pg0.add_stream(p)
2995 self.pg_enable_capture(self.pg_interfaces)
2997 capture = self.pg0.get_capture(1)
# host -> server encapsulated packet: source becomes nat_addr, destination
# becomes the server's real address.
2999 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3000 IP(src=host.ip4, dst=server_nat_ip) /
3002 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3003 TCP(sport=1234, dport=1234))
3004 self.pg0.add_stream(p)
3005 self.pg_enable_capture(self.pg_interfaces)
3007 p = self.pg0.get_capture(1)
3010 self.assertEqual(packet[IP].src, self.nat_addr)
3011 self.assertEqual(packet[IP].dst, server.ip4)
3012 self.assertTrue(packet.haslayer(GRE))
3013 self.check_ip_checksum(packet)
3015 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host encapsulated packet (addressed to nat_addr): source becomes
# server_nat_ip, destination becomes the host's real address.
3019 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3020 IP(src=server.ip4, dst=self.nat_addr) /
3022 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3023 TCP(sport=1234, dport=1234))
3024 self.pg0.add_stream(p)
3025 self.pg_enable_capture(self.pg_interfaces)
3027 p = self.pg0.get_capture(1)
3030 self.assertEqual(packet[IP].src, server_nat_ip)
3031 self.assertEqual(packet[IP].dst, host.ip4)
3032 self.assertTrue(packet.haslayer(GRE))
3033 self.check_ip_checksum(packet)
3035 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3038 def test_output_feature(self):
3039 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3 and checks three
# paths: pg0 -> pg3 (translated), pg3 -> pg0 (translated back) and
# pg2 -> pg0, where pg2 has no NAT feature and no translation is expected.
# NOTE(review): the embedded numbering jumps 3043 -> 3047, so the tail of the
# pg3 output-feature call is not visible in this listing; confirm against
# the complete file.
3040 self.nat44_add_address(self.nat_addr)
3041 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3042 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3043 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3
3047 pkts = self.create_stream_in(self.pg0, self.pg3)
3048 self.pg0.add_stream(pkts)
3049 self.pg_enable_capture(self.pg_interfaces)
3051 capture = self.pg3.get_capture(len(pkts))
3052 self.verify_capture_out(capture)
# pg3 -> pg0
3055 pkts = self.create_stream_out(self.pg3)
3056 self.pg3.add_stream(pkts)
3057 self.pg_enable_capture(self.pg_interfaces)
3059 capture = self.pg0.get_capture(len(pkts))
3060 self.verify_capture_in(capture, self.pg0)
3062 # from non-NAT interface to NAT inside interface
3063 pkts = self.create_stream_in(self.pg2, self.pg0)
3064 self.pg2.add_stream(pkts)
3065 self.pg_enable_capture(self.pg_interfaces)
3067 capture = self.pg0.get_capture(len(pkts))
3068 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3070 def test_output_feature_vrf_aware(self):
3071 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Adds one NAT address for VRF 10 and one for VRF 20, enables the output
# feature on pg4, pg6 and pg3, and checks that pg4 <-> pg3 traffic uses
# nat_ip_vrf10 while pg6 <-> pg3 traffic uses nat_ip_vrf20.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 3078 -> 3080, 3083 -> 3086, 3090 -> 3094), so the last argument of
# each ip_add_del_route() call and the tail of the pg3 output-feature call
# are not visible here; confirm against the complete file.
# NOTE(review): which VRF pg4 and pg6 belong to is not set in this method -
# presumably configured elsewhere in the class; verify.
3072 nat_ip_vrf10 = "10.0.0.10"
3073 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3's remote host; their visible arguments are identical.
3075 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3076 dst_address_length=32,
3077 next_hop_address=self.pg3.remote_ip4n,
3078 next_hop_sw_if_index=self.pg3.sw_if_index,
3080 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3081 dst_address_length=32,
3082 next_hop_address=self.pg3.remote_ip4n,
3083 next_hop_sw_if_index=self.pg3.sw_if_index,
3086 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3087 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3088 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3089 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3090 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3, expected source nat_ip_vrf10
3094 pkts = self.create_stream_in(self.pg4, self.pg3)
3095 self.pg4.add_stream(pkts)
3096 self.pg_enable_capture(self.pg_interfaces)
3098 capture = self.pg3.get_capture(len(pkts))
3099 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> pg4 via nat_ip_vrf10
3102 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3103 self.pg3.add_stream(pkts)
3104 self.pg_enable_capture(self.pg_interfaces)
3106 capture = self.pg4.get_capture(len(pkts))
3107 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3, expected source nat_ip_vrf20
3110 pkts = self.create_stream_in(self.pg6, self.pg3)
3111 self.pg6.add_stream(pkts)
3112 self.pg_enable_capture(self.pg_interfaces)
3114 capture = self.pg3.get_capture(len(pkts))
3115 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> pg6 via nat_ip_vrf20
3118 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3119 self.pg3.add_stream(pkts)
3120 self.pg_enable_capture(self.pg_interfaces)
3122 capture = self.pg6.get_capture(len(pkts))
3123 self.verify_capture_in(capture, self.pg6)
3125 def test_output_feature_hairpinning(self):
3126 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server are both remote hosts on pg0. The server is reachable at
# nat_addr:server_out_port through a static TCP mapping. A request from the
# host and the reply from the server are each sent into pg0 and must come
# back out of pg0 with translated addresses and ports.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 3128 -> 3131, 3136 -> 3139, 3151 -> 3156, 3161 -> 3163), so the
# assignment of `host_in_port`, the binding of `p` / `ip` / `tcp` from the
# capture and the error-handling scaffolding around the assertions are not
# visible here; confirm against the complete file.
3127 host = self.pg0.remote_hosts[0]
3128 server = self.pg0.remote_hosts[1]
3131 server_in_port = 5678
3132 server_out_port = 8765
3134 self.nat44_add_address(self.nat_addr)
3135 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3136 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3139 # add static mapping for server
3140 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3141 server_in_port, server_out_port,
3142 proto=IP_PROTOS.tcp)
3144 # send packet from host to server
3145 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3146 IP(src=host.ip4, dst=self.nat_addr) /
3147 TCP(sport=host_in_port, dport=server_out_port))
3148 self.pg0.add_stream(p)
3149 self.pg_enable_capture(self.pg_interfaces)
3151 capture = self.pg0.get_capture(1)
# Expected: source rewritten to nat_addr with a new source port, destination
# rewritten to the server's real address and port.
3156 self.assertEqual(ip.src, self.nat_addr)
3157 self.assertEqual(ip.dst, server.ip4)
3158 self.assertNotEqual(tcp.sport, host_in_port)
3159 self.assertEqual(tcp.dport, server_in_port)
3160 self.check_tcp_checksum(p)
# Remember the translated source port; the server's reply is sent to it.
3161 host_out_port = tcp.sport
3163 self.logger.error(ppp("Unexpected or invalid packet:", p))
3166 # send reply from server to host
3167 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3168 IP(src=server.ip4, dst=self.nat_addr) /
3169 TCP(sport=server_in_port, dport=host_out_port))
3170 self.pg0.add_stream(p)
3171 self.pg_enable_capture(self.pg_interfaces)
3173 capture = self.pg0.get_capture(1)
# Expected: the reply appears to come from nat_addr:server_out_port and is
# delivered to the host's original address and port.
3178 self.assertEqual(ip.src, self.nat_addr)
3179 self.assertEqual(ip.dst, host.ip4)
3180 self.assertEqual(tcp.sport, server_out_port)
3181 self.assertEqual(tcp.dport, host_in_port)
3182 self.check_tcp_checksum(p)
3184 self.logger.error(ppp("Unexpected or invalid packet:", p))
3187 def test_one_armed_nat44(self):
3188 """ One armed NAT44 """
# Both NAT44 feature calls are applied to the same interface (pg9). A TCP
# packet from local_host to remote_host is sent into pg9 and must come back
# out of pg9 translated to nat_addr; the reply to the translated port must
# come back out of pg9 addressed to local_host:12345.
# NOTE(review): the embedded numbering skips lines in this listing
# (e.g. 3190 -> 3193, 3195 -> 3199, 3205 -> 3210, 3216 -> 3218), so the tail
# of the second feature call, the binding of `p` / `ip` / `tcp` from the
# capture and the error-handling scaffolding around the assertions are not
# visible here; confirm against the complete file.
3189 remote_host = self.pg9.remote_hosts[0]
3190 local_host = self.pg9.remote_hosts[1]
3193 self.nat44_add_address(self.nat_addr)
3194 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3195 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host
3199 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3200 IP(src=local_host.ip4, dst=remote_host.ip4) /
3201 TCP(sport=12345, dport=80))
3202 self.pg9.add_stream(p)
3203 self.pg_enable_capture(self.pg_interfaces)
3205 capture = self.pg9.get_capture(1)
3210 self.assertEqual(ip.src, self.nat_addr)
3211 self.assertEqual(ip.dst, remote_host.ip4)
3212 self.assertNotEqual(tcp.sport, 12345)
# Remember the translated source port; the reply below is sent to it.
3213 external_port = tcp.sport
3214 self.assertEqual(tcp.dport, 80)
3215 self.check_tcp_checksum(p)
3216 self.check_ip_checksum(p)
3218 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> nat_addr:external_port (reply direction)
3222 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3223 IP(src=remote_host.ip4, dst=self.nat_addr) /
3224 TCP(sport=80, dport=external_port))
3225 self.pg9.add_stream(p)
3226 self.pg_enable_capture(self.pg_interfaces)
3228 capture = self.pg9.get_capture(1)
3233 self.assertEqual(ip.src, remote_host.ip4)
3234 self.assertEqual(ip.dst, local_host.ip4)
3235 self.assertEqual(tcp.sport, 80)
3236 self.assertEqual(tcp.dport, 12345)
3237 self.check_tcp_checksum(p)
3238 self.check_ip_checksum(p)
3240 self.logger.error(ppp("Unexpected or invalid packet:", p))
# One-armed NAT44 on pg9 combined with a static TCP mapping (out2in_only=1)
# and a NAT address added with twice_nat=1; forwarding is enabled first.
3243 def test_one_armed_nat44_static(self):
3244 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3245 remote_host = self.pg9.remote_hosts[0]
3246 local_host = self.pg9.remote_hosts[1]
3251 self.vapi.nat44_forwarding_enable_disable(1)
3252 self.nat44_add_address(self.nat_addr, twice_nat=1)
3253 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3254 local_port, external_port,
3255 proto=IP_PROTOS.tcp, out2in_only=1,
3257 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3258 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3261 # from client to service
3262 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3263 IP(src=remote_host.ip4, dst=self.nat_addr) /
3264 TCP(sport=12345, dport=external_port))
3265 self.pg9.add_stream(p)
3266 self.pg_enable_capture(self.pg_interfaces)
3268 capture = self.pg9.get_capture(1)
# destination becomes the local host/port; the source is also translated
# (NAT address, new source port)
3274 self.assertEqual(ip.dst, local_host.ip4)
3275 self.assertEqual(ip.src, self.nat_addr)
3276 self.assertEqual(tcp.dport, local_port)
3277 self.assertNotEqual(tcp.sport, 12345)
# translated source port of the client; the service replies to it
3278 eh_port_in = tcp.sport
3279 self.check_tcp_checksum(p)
3280 self.check_ip_checksum(p)
3282 self.logger.error(ppp("Unexpected or invalid packet:", p))
3285 # from service back to client
3286 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3287 IP(src=local_host.ip4, dst=self.nat_addr) /
3288 TCP(sport=local_port, dport=eh_port_in))
3289 self.pg9.add_stream(p)
3290 self.pg_enable_capture(self.pg_interfaces)
3292 capture = self.pg9.get_capture(1)
# the reply must leave as NAT address:external_port towards the client's
# original address and port
3297 self.assertEqual(ip.src, self.nat_addr)
3298 self.assertEqual(ip.dst, remote_host.ip4)
3299 self.assertEqual(tcp.sport, external_port)
3300 self.assertEqual(tcp.dport, 12345)
3301 self.check_tcp_checksum(p)
3302 self.check_ip_checksum(p)
3304 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Creates NAT44 sessions with an inside stream (pg0 -> pg1), deletes two of
# them through the API and checks that the user's session count dropped by 2.
3307 def test_del_session(self):
3308 """ Delete NAT44 session """
3309 self.nat44_add_address(self.nat_addr)
3310 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3311 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3314 pkts = self.create_stream_in(self.pg0, self.pg1)
3315 self.pg0.add_stream(pkts)
3316 self.pg_enable_capture(self.pg_interfaces)
3318 capture = self.pg1.get_capture(len(pkts))
# session count for the pg0 user before any deletion
3320 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3321 nsessions = len(sessions)
# first deletion identifies the session by its inside address/port
3323 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3324 sessions[0].inside_port,
3325 sessions[0].protocol)
# second deletion identifies the session by its outside address/port
3326 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3327 sessions[1].outside_port,
3328 sessions[1].protocol,
3331 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3332 self.assertEqual(nsessions - len(sessions), 2)
# Changes the virtual fragmentation reassembly settings through the API and
# reads them back.
3334 def test_set_get_reass(self):
3335 """ NAT44 set/get virtual fragmentation reassembly """
3336 reas_cfg1 = self.vapi.nat_get_reass()
# new values are derived from the current ones so the change is observable
3338 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3339 max_reass=reas_cfg1.ip4_max_reass * 2,
3340 max_frag=reas_cfg1.ip4_max_frag * 2)
3342 reas_cfg2 = self.vapi.nat_get_reass()
3344 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3345 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3346 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
# drop_frag is set in a separate call and only checked for truthiness
3348 self.vapi.nat_set_reass(drop_frag=1)
3349 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
# Sends a fragmented TCP payload in both directions, verifies the reassembled
# packets and checks how many reassembly entries were added.
3351 def test_frag_in_order(self):
3352 """ NAT44 translate fragments arriving in order """
3353 self.nat44_add_address(self.nat_addr)
3354 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3355 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3358 data = "A" * 4 + "B" * 16 + "C" * 3
3359 self.tcp_port_in = random.randint(1025, 65535)
# size of the reassembly dump before any fragment is sent
3361 reass = self.vapi.nat_reass_dump()
3362 reass_n_start = len(reass)
3365 pkts = self.create_stream_frag(self.pg0,
3366 self.pg1.remote_ip4,
3370 self.pg0.add_stream(pkts)
3371 self.pg_enable_capture(self.pg_interfaces)
# fragments sent on pg0 are captured on pg1
3373 frags = self.pg1.get_capture(len(pkts))
3374 p = self.reass_frags_and_verify(frags,
3376 self.pg1.remote_ip4)
3377 self.assertEqual(p[TCP].dport, 20)
3378 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# translated source port learned from the reassembled packet
3379 self.tcp_port_out = p[TCP].sport
3380 self.assertEqual(data, p[Raw].load)
3383 pkts = self.create_stream_frag(self.pg1,
3388 self.pg1.add_stream(pkts)
3389 self.pg_enable_capture(self.pg_interfaces)
# fragments sent on pg1 are captured on pg0
3391 frags = self.pg0.get_capture(len(pkts))
3392 p = self.reass_frags_and_verify(frags,
3393 self.pg1.remote_ip4,
3394 self.pg0.remote_ip4)
3395 self.assertEqual(p[TCP].sport, 20)
3396 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3397 self.assertEqual(data, p[Raw].load)
3399 reass = self.vapi.nat_reass_dump()
3400 reass_n_end = len(reass)
# the two fragmented transfers must have added exactly two dump entries
3402 self.assertEqual(reass_n_end - reass_n_start, 2)
# Hairpinning with fragmented traffic: fragments are sent on pg0 and the
# translated fragments are captured on pg0 again; the server is reachable
# through a static TCP mapping on self.nat_addr. All ports are random.
3404 def test_reass_hairpinning(self):
3405 """ NAT44 fragments hairpinning """
3406 host = self.pg0.remote_hosts[0]
3407 server = self.pg0.remote_hosts[1]
3408 host_in_port = random.randint(1025, 65535)
3410 server_in_port = random.randint(1025, 65535)
3411 server_out_port = random.randint(1025, 65535)
3412 data = "A" * 4 + "B" * 16 + "C" * 3
3414 self.nat44_add_address(self.nat_addr)
3415 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3416 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3418 # add static mapping for server
3419 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3420 server_in_port, server_out_port,
3421 proto=IP_PROTOS.tcp)
3423 # send packet from host to server
3424 pkts = self.create_stream_frag(self.pg0,
3429 self.pg0.add_stream(pkts)
3430 self.pg_enable_capture(self.pg_interfaces)
3432 frags = self.pg0.get_capture(len(pkts))
3433 p = self.reass_frags_and_verify(frags,
# the reassembled packet must carry a translated source port, the server's
# inside port as destination and the unmodified payload
3436 self.assertNotEqual(p[TCP].sport, host_in_port)
3437 self.assertEqual(p[TCP].dport, server_in_port)
3438 self.assertEqual(data, p[Raw].load)
# Sends a fragmented TCP payload in both directions and verifies the
# reassembled packets (ports and payload).
3440 def test_frag_out_of_order(self):
3441 """ NAT44 translate fragments arriving out of order """
3442 self.nat44_add_address(self.nat_addr)
3443 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3444 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3447 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the value returned by random.randint() is discarded here,
# whereas test_frag_in_order assigns it to self.tcp_port_in - confirm whether
# the assignment was intended.
3448 random.randint(1025, 65535)
3451 pkts = self.create_stream_frag(self.pg0,
3452 self.pg1.remote_ip4,
3457 self.pg0.add_stream(pkts)
3458 self.pg_enable_capture(self.pg_interfaces)
# fragments sent on pg0 are captured on pg1
3460 frags = self.pg1.get_capture(len(pkts))
3461 p = self.reass_frags_and_verify(frags,
3463 self.pg1.remote_ip4)
3464 self.assertEqual(p[TCP].dport, 20)
3465 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# translated source port learned from the reassembled packet
3466 self.tcp_port_out = p[TCP].sport
3467 self.assertEqual(data, p[Raw].load)
3470 pkts = self.create_stream_frag(self.pg1,
3476 self.pg1.add_stream(pkts)
3477 self.pg_enable_capture(self.pg_interfaces)
# fragments sent on pg1 are captured on pg0
3479 frags = self.pg0.get_capture(len(pkts))
3480 p = self.reass_frags_and_verify(frags,
3481 self.pg1.remote_ip4,
3482 self.pg0.remote_ip4)
3483 self.assertEqual(p[TCP].sport, 20)
3484 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3485 self.assertEqual(data, p[Raw].load)
# Port-restricted NAT44: switches the address/port assignment algorithm to
# map-e (psid 10, psid-offset 6, psid-len 6) through the CLI and checks the
# translated source port of one TCP packet.
3487 def test_port_restricted(self):
3488 """ Port restricted NAT44 (MAP-E CE) """
3489 self.nat44_add_address(self.nat_addr)
3490 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3491 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3493 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3494 "psid-offset 6 psid-len 6")
3496 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3497 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3498 TCP(sport=4567, dport=22))
3499 self.pg0.add_stream(p)
3500 self.pg_enable_capture(self.pg_interfaces)
3502 capture = self.pg1.get_capture(1)
3507 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3508 self.assertEqual(ip.src, self.nat_addr)
3509 self.assertEqual(tcp.dport, 22)
3510 self.assertNotEqual(tcp.sport, 4567)
# bits 6..11 of the translated source port must equal the configured psid (10)
3511 self.assertEqual((tcp.sport >> 6) & 63, 10)
3512 self.check_tcp_checksum(p)
3513 self.check_ip_checksum(p)
3515 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Twice NAT44: a packet from the outside (pg1) to the static mapping is
# delivered to pg0 with its source rewritten to twice_nat_addr as well, and
# the reply is translated back in both fields.
3518 def test_twice_nat(self):
3520 twice_nat_addr = '10.0.1.3'
3525 self.nat44_add_address(self.nat_addr)
3526 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3527 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3528 port_in, port_out, proto=IP_PROTOS.tcp,
3530 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3531 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# outside -> inside, addressed to the mapping's external address/port
3534 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3535 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3536 TCP(sport=eh_port_out, dport=port_out))
3537 self.pg1.add_stream(p)
3538 self.pg_enable_capture(self.pg_interfaces)
3540 capture = self.pg0.get_capture(1)
3545 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3546 self.assertEqual(ip.src, twice_nat_addr)
3547 self.assertEqual(tcp.dport, port_in)
3548 self.assertNotEqual(tcp.sport, eh_port_out)
# translated source port of the external host; the reply is sent to it
3549 eh_port_in = tcp.sport
3550 self.check_tcp_checksum(p)
3551 self.check_ip_checksum(p)
3553 self.logger.error(ppp("Unexpected or invalid packet:", p))
# inside -> outside reply, addressed to the twice-NAT address and port
3556 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3557 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3558 TCP(sport=port_in, dport=eh_port_in))
3559 self.pg0.add_stream(p)
3560 self.pg_enable_capture(self.pg_interfaces)
3562 capture = self.pg1.get_capture(1)
3567 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3568 self.assertEqual(ip.src, self.nat_addr)
3569 self.assertEqual(tcp.dport, eh_port_out)
3570 self.assertEqual(tcp.sport, port_out)
3571 self.check_tcp_checksum(p)
3572 self.check_ip_checksum(p)
3574 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Twice NAT44 with a load-balanced static mapping: two pg0 hosts act as
# backends; the request may reach either one and the reply from the chosen
# backend is translated back towards pg1.
# NOTE(review): the local name 'locals' shadows the builtin of the same name.
3577 def test_twice_nat_lb(self):
3578 """ Twice NAT44 local service load balancing """
3579 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3580 twice_nat_addr = '10.0.1.3'
3585 server1 = self.pg0.remote_hosts[0]
3586 server2 = self.pg0.remote_hosts[1]
3588 locals = [{'addr': server1.ip4n,
3591 {'addr': server2.ip4n,
3595 self.nat44_add_address(self.nat_addr)
3596 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3598 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3602 local_num=len(locals),
3604 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3605 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# outside -> inside, addressed to the mapping's external address/port
3608 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3609 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3610 TCP(sport=eh_port_out, dport=external_port))
3611 self.pg1.add_stream(p)
3612 self.pg_enable_capture(self.pg_interfaces)
3614 capture = self.pg0.get_capture(1)
# source is rewritten to the twice-NAT address; either backend is acceptable
# as the destination
3620 self.assertEqual(ip.src, twice_nat_addr)
3621 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3622 if ip.dst == server1.ip4:
3626 self.assertNotEqual(tcp.sport, eh_port_out)
# translated source port of the external host; the reply is sent to it
3627 eh_port_in = tcp.sport
3628 self.assertEqual(tcp.dport, local_port)
3629 self.check_tcp_checksum(p)
3630 self.check_ip_checksum(p)
3632 self.logger.error(ppp("Unexpected or invalid packet:", p))
# reply from the selected backend to the twice-NAT address and port
3635 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3636 IP(src=server.ip4, dst=twice_nat_addr) /
3637 TCP(sport=local_port, dport=eh_port_in))
3638 self.pg0.add_stream(p)
3639 self.pg_enable_capture(self.pg_interfaces)
3641 capture = self.pg1.get_capture(1)
3646 self.assertEqual(ip.src, self.nat_addr)
3647 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3648 self.assertEqual(tcp.sport, external_port)
3649 self.assertEqual(tcp.dport, eh_port_out)
3650 self.check_tcp_checksum(p)
3651 self.check_ip_checksum(p)
3653 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Registers pg7 as a twice-NAT address source and checks that the NAT address
# dump follows the IPv4 address being configured on / removed from pg7.
3656 def test_twice_nat_interface_addr(self):
3657 """ Acquire twice NAT44 addresses from interface """
3658 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3660 # no address in NAT pool
3661 adresses = self.vapi.nat44_address_dump()
3662 self.assertEqual(0, len(adresses))
3664 # configure interface address and check NAT address pool
3665 self.pg7.config_ip4()
3666 adresses = self.vapi.nat44_address_dump()
3667 self.assertEqual(1, len(adresses))
# only the first four bytes of ip_address are compared with the packed
# local IPv4 address of pg7
3668 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3669 self.assertEqual(adresses[0].twice_nat, 1)
3671 # remove interface address and check NAT address pool
3672 self.pg7.unconfig_ip4()
3673 adresses = self.vapi.nat44_address_dump()
3674 self.assertEqual(0, len(adresses))
# Sets max_frag to 0, injects a single fragment and verifies the IPFIX
# packets exported towards the collector behind pg3.
3676 def test_ipfix_max_frags(self):
3677 """ IPFIX logging maximum fragments pending reassembly exceeded """
3678 self.nat44_add_address(self.nat_addr)
3679 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3680 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# presumably a limit of 0 makes any pending fragment exceed it - verify
3682 self.vapi.nat_set_reass(max_frag=0)
3683 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3684 src_address=self.pg3.local_ip4n,
3686 template_interval=10)
3687 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3688 src_port=self.ipfix_src_port)
3690 data = "A" * 4 + "B" * 16 + "C" * 3
3691 self.tcp_port_in = random.randint(1025, 65535)
3692 pkts = self.create_stream_frag(self.pg0,
3693 self.pg1.remote_ip4,
# only the last fragment of the generated stream is injected
3697 self.pg0.add_stream(pkts[-1])
3698 self.pg_enable_capture(self.pg_interfaces)
# no packet is expected on pg1
3700 frags = self.pg1.get_capture(0)
3701 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# nine packets are expected on the collector interface
3702 capture = self.pg3.get_capture(9)
3703 ipfix = IPFIXDecoder()
3704 # first load template
3706 self.assertTrue(p.haslayer(IPFIX))
3707 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3708 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3709 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3710 self.assertEqual(p[UDP].dport, 4739)
3711 self.assertEqual(p[IPFIX].observationDomainID,
3712 self.ipfix_domain_id)
3713 if p.haslayer(Template):
3714 ipfix.add_template(p.getlayer(Template))
3715 # verify events in data set
3717 if p.haslayer(Data):
# note: 'data' is rebound here from the payload string to the decoded records
3718 data = ipfix.decode_data_set(p.getlayer(Set))
3719 self.verify_ipfix_max_fragments_ip4(data, 0,
3720 self.pg0.remote_ip4n)
# run the parent class tearDown first
3723 super(TestNAT44, self).tearDown()
# NAT state is logged via CLI show commands when VPP is not marked dead
3724 if not self.vpp_dead:
3725 self.logger.info(self.vapi.cli("show nat44 addresses"))
3726 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3727 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3728 self.logger.info(self.vapi.cli("show nat44 interface address"))
3729 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3730 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# restore the default port assignment algorithm (test_port_restricted
# switches it to map-e)
3731 self.vapi.cli("nat addr-port-assignment-alg default")
# Test class that adds the "nat { out2in dpo }" section to the VPP command
# line and exercises NAT44 together with a MAP translation domain: pg0 is the
# IPv4 side, pg1 the IPv6 side.
3735 class TestNAT44Out2InDPO(MethodHolder):
3736 """ NAT44 Test Cases using out2in DPO """
# extend the VPP command line with the NAT "out2in dpo" option
3739 def setUpConstants(cls):
3740 super(TestNAT44Out2InDPO, cls).setUpConstants()
3741 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# class-wide fixtures: fixed ports/ids/addresses and two pg interfaces
3744 def setUpClass(cls):
3745 super(TestNAT44Out2InDPO, cls).setUpClass()
3748 cls.tcp_port_in = 6303
3749 cls.tcp_port_out = 6303
3750 cls.udp_port_in = 6304
3751 cls.udp_port_out = 6304
3752 cls.icmp_id_in = 6305
3753 cls.icmp_id_out = 6305
3754 cls.nat_addr = '10.0.0.3'
3755 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3756 cls.dst_ip4 = '192.168.70.1'
3758 cls.create_pg_interfaces(range(2))
# pg0 is configured for IPv4, pg1 for IPv6
3761 cls.pg0.config_ip4()
3762 cls.pg0.resolve_arp()
3765 cls.pg1.config_ip6()
3766 cls.pg1.resolve_ndp()
# IPv6 route with an all-zero destination and prefix length 0 (default
# route) via pg1's remote address
3768 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3769 dst_address_length=0,
3770 next_hop_address=cls.pg1.remote_ip6n,
3771 next_hop_sw_if_index=cls.pg1.sw_if_index)
3774 super(TestNAT44Out2InDPO, cls).tearDownClass()
# store the two /96 IPv6 prefixes on self and create a MAP domain from them
# with is_translation=1
3777 def configure_xlat(self):
3778 self.dst_ip6_pfx = '1:2:3::'
3779 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3781 self.dst_ip6_pfx_len = 96
3782 self.src_ip6_pfx = '4:5:6::'
3783 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3785 self.src_ip6_pfx_len = 96
3786 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3787 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3788 '\x00\x00\x00\x00', 0, is_translation=1,
# 464XLAT with NAT44 enabled on pg0: IPv4 sent on pg0 is expected as IPv6 on
# pg1 and vice versa
3791 def test_464xlat_ce(self):
3792 """ Test 464XLAT CE with NAT44 """
3794 self.configure_xlat()
3796 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3797 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# expected IPv6 addresses; presumably compose_ip6 embeds the IPv4 address
# into the given prefix - verify against compose_ip6
3799 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3800 self.dst_ip6_pfx_len)
3801 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3802 self.src_ip6_pfx_len)
# IPv4 stream sent on pg0, IPv6 packets verified on pg1
3805 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3806 self.pg0.add_stream(pkts)
3807 self.pg_enable_capture(self.pg_interfaces)
3809 capture = self.pg1.get_capture(len(pkts))
3810 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
# IPv6 stream sent on pg1, IPv4 packets verified on pg0
3813 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3815 self.pg1.add_stream(pkts)
3816 self.pg_enable_capture(self.pg_interfaces)
3818 capture = self.pg0.get_capture(len(pkts))
3819 self.verify_capture_in(capture, self.pg0)
# cleanup: the NAT address range added above is removed again (is_add=0)
3821 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3823 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3824 self.nat_addr_n, is_add=0)
# same exchange without any NAT44 configuration: the expected IPv6 address
# is built from pg0's own IPv4 address and ports are expected unchanged
# (same_port=True)
3826 def test_464xlat_ce_no_nat(self):
3827 """ Test 464XLAT CE without NAT44 """
3829 self.configure_xlat()
3831 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3832 self.dst_ip6_pfx_len)
3833 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3834 self.src_ip6_pfx_len)
3836 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3837 self.pg0.add_stream(pkts)
3838 self.pg_enable_capture(self.pg_interfaces)
3840 capture = self.pg1.get_capture(len(pkts))
3841 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3842 nat_ip=out_dst_ip6, same_port=True)
3844 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3845 self.pg1.add_stream(pkts)
3846 self.pg_enable_capture(self.pg_interfaces)
3848 capture = self.pg0.get_capture(len(pkts))
3849 self.verify_capture_in(capture, self.pg0)
3852 class TestDeterministicNAT(MethodHolder):
3853 """ Deterministic NAT Test Cases """
# extend the VPP command line with the NAT "deterministic" option
3856 def setUpConstants(cls):
3857 super(TestDeterministicNAT, cls).setUpConstants()
3858 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
# class-wide fixtures: fixed ports/ids, the NAT address, three pg interfaces
# and two remote hosts on pg0
3861 def setUpClass(cls):
3862 super(TestDeterministicNAT, cls).setUpClass()
3865 cls.tcp_port_in = 6303
3866 cls.tcp_external_port = 6303
3867 cls.udp_port_in = 6304
3868 cls.udp_external_port = 6304
3869 cls.icmp_id_in = 6305
3870 cls.nat_addr = '10.0.0.3'
3872 cls.create_pg_interfaces(range(3))
# keep a separate list copy of the created interfaces
3873 cls.interfaces = list(cls.pg_interfaces)
3875 for i in cls.interfaces:
# two remote hosts on pg0 are used by test_multiple_users
3880 cls.pg0.generate_remote_hosts(2)
3881 cls.pg0.configure_ipv4_neighbors()
# delegate class-level cleanup to the parent class
3884 super(TestDeterministicNAT, cls).tearDownClass()
# Builds the inside -> outside test packets: one TCP, one UDP and one ICMP
# echo request, each from in_if's remote host to out_if's remote host.
3887 def create_stream_in(self, in_if, out_if, ttl=64):
3889 Create packet stream for inside network
3891 :param in_if: Inside interface
3892 :param out_if: Outside interface
3893 :param ttl: TTL of generated packets
3897 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3898 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3899 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3903 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3904 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3905 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3909 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3910 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3911 ICMP(id=self.icmp_id_in, type='echo-request'))
# Builds the outside -> inside test packets (TCP, UDP, ICMP echo reply)
# addressed to dst_ip. Destination ports and the ICMP id come from
# self.tcp_port_out, self.udp_port_out and self.icmp_external_id, which
# verify_capture_out stores on self.
3916 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3918 Create packet stream for outside network
3920 :param out_if: Outside interface
3921 :param dst_ip: Destination IP address (Default use global NAT address)
3922 :param ttl: TTL of generated packets
3925 dst_ip = self.nat_addr
3928 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3929 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3930 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3934 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3935 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3936 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3940 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3941 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3942 ICMP(id=self.icmp_external_id, type='echo-reply'))
# Checks the number of captured packets and each packet's source address,
# and records the translated TCP/UDP source ports and ICMP id on self.
# NOTE(review): the docstring documents a same_port parameter that the
# signature does not have.
3947 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3949 Verify captured packets on outside network
3951 :param capture: Captured packets
3952 :param nat_ip: Translated IP address (Default use global NAT address)
3953 :param same_port: Sorce port number is not translated (Default False)
3954 :param packet_num: Expected number of packets (Default 3)
3957 nat_ip = self.nat_addr
3958 self.assertEqual(packet_num, len(capture))
3959 for packet in capture:
3961 self.assertEqual(packet[IP].src, nat_ip)
# remember the translated values for building the return stream
3962 if packet.haslayer(TCP):
3963 self.tcp_port_out = packet[TCP].sport
3964 elif packet.haslayer(UDP):
3965 self.udp_port_out = packet[UDP].sport
3967 self.icmp_external_id = packet[ICMP].id
3969 self.logger.error(ppp("Unexpected or invalid packet "
3970 "(outside network):", packet))
# Performs a TCP three-way handshake through the NAT (SYN in->out, SYN+ACK
# out->in, ACK in->out) and stores the translated source port of the SYN in
# self.tcp_port_out.
3973 def initiate_tcp_session(self, in_if, out_if):
3975 Initiates TCP session
3977 :param in_if: Inside interface
3978 :param out_if: Outside interface
3981 # SYN packet in->out
3982 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3983 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3984 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3987 self.pg_enable_capture(self.pg_interfaces)
3989 capture = out_if.get_capture(1)
# translated source port; used as the destination port of out->in packets
3991 self.tcp_port_out = p[TCP].sport
3993 # SYN + ACK packet out->in
3994 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3995 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3996 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3998 out_if.add_stream(p)
3999 self.pg_enable_capture(self.pg_interfaces)
4001 in_if.get_capture(1)
4003 # ACK packet in->out
4004 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4005 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4006 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4009 self.pg_enable_capture(self.pg_interfaces)
4011 out_if.get_capture(1)
4014 self.logger.error("TCP 3 way handshake failed")
# Expects exactly one decoded record and compares four of its fields.
# NOTE(review): the numeric keys (230, 466, 473, 8) are presumably IPFIX
# information element IDs - confirm against the IPFIX decoder.
4017 def verify_ipfix_max_entries_per_user(self, data):
4019 Verify IPFIX maximum entries per user exceeded event
4021 :param data: Decoded IPFIX data records
4023 self.assertEqual(1, len(data))
4026 self.assertEqual(ord(record[230]), 13)
4027 # natQuotaExceededEvent
4028 self.assertEqual('\x03\x00\x00\x00', record[466])
4030 self.assertEqual('\xe8\x03\x00\x00', record[473])
# field 8 must equal the packed IPv4 address of pg0's remote host
4032 self.assertEqual(self.pg0.remote_ip4n, record[8])
# Adds one deterministic mapping and checks the forward/reverse lookups, the
# mapping dump, and that clearing removes the mapping.
4034 def test_deterministic_mode(self):
4035 """ NAT plugin run deterministic mode """
4036 in_addr = '172.16.255.0'
4037 out_addr = '172.17.255.50'
4038 in_addr_t = '172.16.255.20'
4039 in_addr_n = socket.inet_aton(in_addr)
4040 out_addr_n = socket.inet_aton(out_addr)
4041 in_addr_t_n = socket.inet_aton(in_addr_t)
# the plugin must report that it runs in deterministic mode
4045 nat_config = self.vapi.nat_show_config()
4046 self.assertEqual(1, nat_config.deterministic)
4048 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# forward lookup of the inside test address must yield the outside address
4050 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4051 self.assertEqual(rep1.out_addr[:4], out_addr_n)
# reverse lookup with the returned out_port_hi must give the inside address
4052 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4053 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# exactly one mapping is expected, with the values that were added
4055 deterministic_mappings = self.vapi.nat_det_map_dump()
4056 self.assertEqual(len(deterministic_mappings), 1)
4057 dsm = deterministic_mappings[0]
4058 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4059 self.assertEqual(in_plen, dsm.in_plen)
4060 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4061 self.assertEqual(out_plen, dsm.out_plen)
# after clearing, no mapping may remain
4063 self.clear_nat_det()
4064 deterministic_mappings = self.vapi.nat_det_map_dump()
4065 self.assertEqual(len(deterministic_mappings), 0)
# Raises every deterministic NAT timeout by 10 and checks that each value
# read back differs from the one read before.
4067 def test_set_timeouts(self):
4068 """ Set deterministic NAT timeouts """
4069 timeouts_before = self.vapi.nat_det_get_timeouts()
4071 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4072 timeouts_before.tcp_established + 10,
4073 timeouts_before.tcp_transitory + 10,
4074 timeouts_before.icmp + 10)
4076 timeouts_after = self.vapi.nat_det_get_timeouts()
# only inequality is asserted, not the exact new values
4078 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4079 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4080 self.assertNotEqual(timeouts_before.tcp_established,
4081 timeouts_after.tcp_established)
4082 self.assertNotEqual(timeouts_before.tcp_transitory,
4083 timeouts_after.tcp_transitory)
# Translates a TCP/UDP/ICMP stream in both directions through a
# deterministic mapping and then checks the resulting session dump.
4085 def test_det_in(self):
4086 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4088 nat_ip = "10.0.0.10"
4090 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4092 socket.inet_aton(nat_ip),
4094 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4095 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# inside -> outside: sent on pg0, verified on pg1
4099 pkts = self.create_stream_in(self.pg0, self.pg1)
4100 self.pg0.add_stream(pkts)
4101 self.pg_enable_capture(self.pg_interfaces)
4103 capture = self.pg1.get_capture(len(pkts))
4104 self.verify_capture_out(capture, nat_ip)
# outside -> inside: sent on pg1, verified on pg0
4107 pkts = self.create_stream_out(self.pg1, nat_ip)
4108 self.pg1.add_stream(pkts)
4109 self.pg_enable_capture(self.pg_interfaces)
4111 capture = self.pg0.get_capture(len(pkts))
4112 self.verify_capture_in(capture, self.pg0)
# three sessions are expected; the checks below cover the TCP, UDP and
# ICMP values in turn
4115 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4116 self.assertEqual(len(sessions), 3)
4120 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4121 self.assertEqual(s.in_port, self.tcp_port_in)
4122 self.assertEqual(s.out_port, self.tcp_port_out)
4123 self.assertEqual(s.ext_port, self.tcp_external_port)
4127 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4128 self.assertEqual(s.in_port, self.udp_port_in)
4129 self.assertEqual(s.out_port, self.udp_port_out)
4130 self.assertEqual(s.ext_port, self.udp_external_port)
4134 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4135 self.assertEqual(s.in_port, self.icmp_id_in)
4136 self.assertEqual(s.out_port, self.icmp_external_id)
# Two inside hosts on pg0 use one deterministic mapping: checks translation
# in both directions for each host, the mapping's session counter, and
# session closing through the API.
4138 def test_multiple_users(self):
4139 """ Deterministic NAT multiple users """
4141 nat_ip = "10.0.0.10"
4143 external_port = 6303
4145 host0 = self.pg0.remote_hosts[0]
4146 host1 = self.pg0.remote_hosts[1]
4148 self.vapi.nat_det_add_del_map(host0.ip4n,
4150 socket.inet_aton(nat_ip),
4152 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4153 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside
4157 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4158 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4159 TCP(sport=port_in, dport=external_port))
4160 self.pg0.add_stream(p)
4161 self.pg_enable_capture(self.pg_interfaces)
4163 capture = self.pg1.get_capture(1)
4168 self.assertEqual(ip.src, nat_ip)
4169 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4170 self.assertEqual(tcp.dport, external_port)
# translated source port assigned to host0
4171 port_out0 = tcp.sport
4173 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside, using the same inside source port as host0
4177 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4178 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4179 TCP(sport=port_in, dport=external_port))
4180 self.pg0.add_stream(p)
4181 self.pg_enable_capture(self.pg_interfaces)
4183 capture = self.pg1.get_capture(1)
4188 self.assertEqual(ip.src, nat_ip)
4189 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4190 self.assertEqual(tcp.dport, external_port)
# translated source port assigned to host1
4191 port_out1 = tcp.sport
4193 self.logger.error(ppp("Unexpected or invalid packet:", p))
# both sessions are counted on the single mapping
4196 dms = self.vapi.nat_det_map_dump()
4197 self.assertEqual(1, len(dms))
4198 self.assertEqual(2, dms[0].ses_num)
# outside -> host0, addressed to host0's translated port
4201 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4202 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4203 TCP(sport=external_port, dport=port_out0))
4204 self.pg1.add_stream(p)
4205 self.pg_enable_capture(self.pg_interfaces)
4207 capture = self.pg0.get_capture(1)
4212 self.assertEqual(ip.src, self.pg1.remote_ip4)
4213 self.assertEqual(ip.dst, host0.ip4)
4214 self.assertEqual(tcp.dport, port_in)
4215 self.assertEqual(tcp.sport, external_port)
4217 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> host1, addressed to host1's translated port
4221 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4222 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4223 TCP(sport=external_port, dport=port_out1))
4224 self.pg1.add_stream(p)
4225 self.pg_enable_capture(self.pg_interfaces)
4227 capture = self.pg0.get_capture(1)
4232 self.assertEqual(ip.src, self.pg1.remote_ip4)
4233 self.assertEqual(ip.dst, host1.ip4)
4234 self.assertEqual(tcp.dport, port_in)
4235 self.assertEqual(tcp.sport, external_port)
4237 self.logger.error(ppp("Unexpected or invalid packet", p))
4240 # session close api test
4241 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4243 self.pg1.remote_ip4n,
# one of the two sessions must be gone after the "out" close
4245 dms = self.vapi.nat_det_map_dump()
4246 self.assertEqual(dms[0].ses_num, 1)
4248 self.vapi.nat_det_close_session_in(host0.ip4n,
4250 self.pg1.remote_ip4n,
# no session may remain after the "in" close
4252 dms = self.vapi.nat_det_map_dump()
4253 self.assertEqual(dms[0].ses_num, 0)
# Establishes a TCP session, closes it from the inside (FIN in->out, ACK and
# FIN out->in, final ACK in->out) and checks that the mapping's session
# counter is back to zero.
4255 def test_tcp_session_close_detection_in(self):
4256 """ Deterministic NAT TCP session close from inside network """
4257 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4259 socket.inet_aton(self.nat_addr),
4261 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4262 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# the handshake also sets self.tcp_port_out, which the out->in packets use
4265 self.initiate_tcp_session(self.pg0, self.pg1)
4267 # close the session from inside
4269 # FIN packet in -> out
4270 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4271 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4272 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4274 self.pg0.add_stream(p)
4275 self.pg_enable_capture(self.pg_interfaces)
4277 self.pg1.get_capture(1)
4281 # ACK packet out -> in
4282 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4283 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4284 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4288 # FIN packet out -> in
4289 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4290 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4291 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# the packets collected in pkts are sent together on pg1; two packets are
# expected on pg0
4295 self.pg1.add_stream(pkts)
4296 self.pg_enable_capture(self.pg_interfaces)
4298 self.pg0.get_capture(2)
4300 # ACK packet in -> out
4301 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4302 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4303 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4305 self.pg0.add_stream(p)
4306 self.pg_enable_capture(self.pg_interfaces)
4308 self.pg1.get_capture(1)
4310 # Check if deterministic NAT44 closed the session
4311 dms = self.vapi.nat_det_map_dump()
4312 self.assertEqual(0, dms[0].ses_num)
4314 self.logger.error("TCP session termination failed")
4317 def test_tcp_session_close_detection_out(self):
4318 """ Deterministic NAT TCP session close from outside network """
4319 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4321 socket.inet_aton(self.nat_addr),
4323 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4324 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4327 self.initiate_tcp_session(self.pg0, self.pg1)
4329 # close the session from outside
4331 # FIN packet out -> in
4332 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4333 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4334 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4336 self.pg1.add_stream(p)
4337 self.pg_enable_capture(self.pg_interfaces)
4339 self.pg0.get_capture(1)
4343 # ACK packet in -> out
4344 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4345 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4346 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4350 # ACK packet in -> out
4351 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4352 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4353 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4357 self.pg0.add_stream(pkts)
4358 self.pg_enable_capture(self.pg_interfaces)
4360 self.pg1.get_capture(2)
4362 # ACK packet out -> in
4363 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4364 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4365 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4367 self.pg1.add_stream(p)
4368 self.pg_enable_capture(self.pg_interfaces)
4370 self.pg0.get_capture(1)
4372 # Check if deterministic NAT44 closed the session
4373 dms = self.vapi.nat_det_map_dump()
4374 self.assertEqual(0, dms[0].ses_num)
4376 self.logger.error("TCP session termination failed")
4379 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4380 def test_session_timeout(self):
4381 """ Deterministic NAT session timeouts """
4382 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4384 socket.inet_aton(self.nat_addr),
4386 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4387 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4390 self.initiate_tcp_session(self.pg0, self.pg1)
4391 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4392 pkts = self.create_stream_in(self.pg0, self.pg1)
4393 self.pg0.add_stream(pkts)
4394 self.pg_enable_capture(self.pg_interfaces)
4396 capture = self.pg1.get_capture(len(pkts))
4399 dms = self.vapi.nat_det_map_dump()
4400 self.assertEqual(0, dms[0].ses_num)
4402 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4403 def test_session_limit_per_user(self):
4404 """ Deterministic NAT maximum sessions per user limit """
4405 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4407 socket.inet_aton(self.nat_addr),
4409 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4410 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4412 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4413 src_address=self.pg2.local_ip4n,
4415 template_interval=10)
4416 self.vapi.nat_ipfix()
4419 for port in range(1025, 2025):
4420 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4421 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4422 UDP(sport=port, dport=port))
4425 self.pg0.add_stream(pkts)
4426 self.pg_enable_capture(self.pg_interfaces)
4428 capture = self.pg1.get_capture(len(pkts))
4430 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4431 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4432 UDP(sport=3001, dport=3002))
4433 self.pg0.add_stream(p)
4434 self.pg_enable_capture(self.pg_interfaces)
4436 capture = self.pg1.assert_nothing_captured()
4438 # verify ICMP error packet
4439 capture = self.pg0.get_capture(1)
4441 self.assertTrue(p.haslayer(ICMP))
4443 self.assertEqual(icmp.type, 3)
4444 self.assertEqual(icmp.code, 1)
4445 self.assertTrue(icmp.haslayer(IPerror))
4446 inner_ip = icmp[IPerror]
4447 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4448 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4450 dms = self.vapi.nat_det_map_dump()
4452 self.assertEqual(1000, dms[0].ses_num)
4454 # verify IPFIX logging
4455 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4457 capture = self.pg2.get_capture(2)
4458 ipfix = IPFIXDecoder()
4459 # first load template
4461 self.assertTrue(p.haslayer(IPFIX))
4462 if p.haslayer(Template):
4463 ipfix.add_template(p.getlayer(Template))
4464 # verify events in data set
4466 if p.haslayer(Data):
4467 data = ipfix.decode_data_set(p.getlayer(Set))
4468 self.verify_ipfix_max_entries_per_user(data)
4470 def clear_nat_det(self):
4472 Clear deterministic NAT configuration.
4474 self.vapi.nat_ipfix(enable=0)
4475 self.vapi.nat_det_set_timeouts()
4476 deterministic_mappings = self.vapi.nat_det_map_dump()
4477 for dsm in deterministic_mappings:
4478 self.vapi.nat_det_add_del_map(dsm.in_addr,
4484 interfaces = self.vapi.nat44_interface_dump()
4485 for intf in interfaces:
4486 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4491 super(TestDeterministicNAT, self).tearDown()
4492 if not self.vpp_dead:
4493 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4495 self.vapi.cli("show nat44 deterministic mappings"))
4497 self.vapi.cli("show nat44 deterministic timeouts"))
4499 self.vapi.cli("show nat44 deterministic sessions"))
4500 self.clear_nat_det()
4503 class TestNAT64(MethodHolder):
4504 """ NAT64 Test Cases """
def setUpConstants(cls):
    # Run the parent setup first; vpp_cmdline is only extended below, so it
    # is expected to exist once the parent call returns.
    super(TestNAT64, cls).setUpConstants()

    # Extra "nat { ... }" startup stanza setting the NAT64 BIB and session
    # table hash bucket counts.
    nat64_stanza = [
        "nat", "{",
        "nat64 bib hash buckets 128",
        "nat64 st hash buckets 256",
        "}",
    ]
    cls.vpp_cmdline.extend(nat64_stanza)
4513 def setUpClass(cls):
4514 super(TestNAT64, cls).setUpClass()
4517 cls.tcp_port_in = 6303
4518 cls.tcp_port_out = 6303
4519 cls.udp_port_in = 6304
4520 cls.udp_port_out = 6304
4521 cls.icmp_id_in = 6305
4522 cls.icmp_id_out = 6305
4523 cls.nat_addr = '10.0.0.3'
4524 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4526 cls.vrf1_nat_addr = '10.0.10.3'
4527 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4529 cls.ipfix_src_port = 4739
4530 cls.ipfix_domain_id = 1
4532 cls.create_pg_interfaces(range(5))
4533 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4534 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4535 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4537 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4539 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4541 cls.pg0.generate_remote_hosts(2)
4543 for i in cls.ip6_interfaces:
4546 i.configure_ipv6_neighbors()
4548 for i in cls.ip4_interfaces:
4554 cls.pg3.config_ip4()
4555 cls.pg3.resolve_arp()
4556 cls.pg3.config_ip6()
4557 cls.pg3.configure_ipv6_neighbors()
4560 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    # The same packed IPv4 address is passed as both range arguments.
    pool_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
    vapi = self.vapi

    # Add the address and check it is the only one reported by the dump.
    vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr)
    dumped = vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 1)
    self.assertEqual(dumped[0].address, pool_addr)

    # Remove it again; the dump must come back empty.
    vapi.nat64_add_del_pool_addr_range(pool_addr, pool_addr, is_add=0)
    dumped = vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dumped), 0)
def test_interface(self):
    """ Enable/disable NAT64 feature on the interface """
    # pg0 is enabled with the default arguments, pg1 with is_inside=0.
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 2)

    # Both flags must be bound before the loop: they are asserted afterwards
    # even if the dump contains neither interface.
    pg0_found = False
    pg1_found = False
    for intf in interfaces:
        if intf.sw_if_index == self.pg0.sw_if_index:
            self.assertEqual(intf.is_inside, 1)
            pg0_found = True
        elif intf.sw_if_index == self.pg1.sw_if_index:
            self.assertEqual(intf.is_inside, 0)
            pg1_found = True
    self.assertTrue(pg0_found)
    self.assertTrue(pg1_found)

    # The CLI feature listing must name the matching NAT64 node for each
    # interface.
    features = self.vapi.cli("show interface features pg0")
    self.assertIn('nat64-in2out', features)
    features = self.vapi.cli("show interface features pg1")
    self.assertIn('nat64-out2in', features)

    # Disable the feature on both interfaces; the dump must then be empty.
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 0)
4608 def test_static_bib(self):
4609 """ Add/delete static BIB entry """
4610 in_addr = socket.inet_pton(socket.AF_INET6,
4611 '2001:db8:85a3::8a2e:370:7334')
4612 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4615 proto = IP_PROTOS.tcp
4617 self.vapi.nat64_add_del_static_bib(in_addr,
4622 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4627 self.assertEqual(bibe.i_addr, in_addr)
4628 self.assertEqual(bibe.o_addr, out_addr)
4629 self.assertEqual(bibe.i_port, in_port)
4630 self.assertEqual(bibe.o_port, out_port)
4631 self.assertEqual(static_bib_num, 1)
4633 self.vapi.nat64_add_del_static_bib(in_addr,
4639 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4644 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Reply fields, checked in this fixed order in both phases.
    fields = ('udp', 'icmp', 'tcp_trans', 'tcp_est', 'tcp_incoming_syn')

    # verify default values
    defaults = (300, 60, 240, 7440, 6)
    reply = self.vapi.nat64_get_timeouts()
    for name, expected in zip(fields, defaults):
        self.assertEqual(getattr(reply, name), expected)

    # set and verify custom values
    custom = {
        'udp': 200,
        'icmp': 30,
        'tcp_trans': 250,
        'tcp_est': 7450,
        'tcp_incoming_syn': 10,
    }
    self.vapi.nat64_set_timeouts(**custom)
    reply = self.vapi.nat64_get_timeouts()
    for name in fields:
        self.assertEqual(getattr(reply, name), custom[name])
4666 def test_dynamic(self):
4667 """ NAT64 dynamic translation test """
4668 self.tcp_port_in = 6303
4669 self.udp_port_in = 6304
4670 self.icmp_id_in = 6305
4672 ses_num_start = self.nat64_get_ses_num()
4674 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4676 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4677 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4680 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4681 self.pg0.add_stream(pkts)
4682 self.pg_enable_capture(self.pg_interfaces)
4684 capture = self.pg1.get_capture(len(pkts))
4685 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4686 dst_ip=self.pg1.remote_ip4)
4689 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4690 self.pg1.add_stream(pkts)
4691 self.pg_enable_capture(self.pg_interfaces)
4693 capture = self.pg0.get_capture(len(pkts))
4694 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4695 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4698 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4699 self.pg0.add_stream(pkts)
4700 self.pg_enable_capture(self.pg_interfaces)
4702 capture = self.pg1.get_capture(len(pkts))
4703 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4704 dst_ip=self.pg1.remote_ip4)
4707 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4708 self.pg1.add_stream(pkts)
4709 self.pg_enable_capture(self.pg_interfaces)
4711 capture = self.pg0.get_capture(len(pkts))
4712 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4714 ses_num_end = self.nat64_get_ses_num()
4716 self.assertEqual(ses_num_end - ses_num_start, 3)
4718 # tenant with specific VRF
4719 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4720 self.vrf1_nat_addr_n,
4721 vrf_id=self.vrf1_id)
4722 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4724 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4725 self.pg2.add_stream(pkts)
4726 self.pg_enable_capture(self.pg_interfaces)
4728 capture = self.pg1.get_capture(len(pkts))
4729 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4730 dst_ip=self.pg1.remote_ip4)
4732 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4733 self.pg1.add_stream(pkts)
4734 self.pg_enable_capture(self.pg_interfaces)
4736 capture = self.pg2.get_capture(len(pkts))
4737 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4739 def test_static(self):
4740 """ NAT64 static translation test """
4741 self.tcp_port_in = 60303
4742 self.udp_port_in = 60304
4743 self.icmp_id_in = 60305
4744 self.tcp_port_out = 60303
4745 self.udp_port_out = 60304
4746 self.icmp_id_out = 60305
4748 ses_num_start = self.nat64_get_ses_num()
4750 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4752 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4753 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4755 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4760 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4765 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4772 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4773 self.pg0.add_stream(pkts)
4774 self.pg_enable_capture(self.pg_interfaces)
4776 capture = self.pg1.get_capture(len(pkts))
4777 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4778 dst_ip=self.pg1.remote_ip4, same_port=True)
4781 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4782 self.pg1.add_stream(pkts)
4783 self.pg_enable_capture(self.pg_interfaces)
4785 capture = self.pg0.get_capture(len(pkts))
4786 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4787 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4789 ses_num_end = self.nat64_get_ses_num()
4791 self.assertEqual(ses_num_end - ses_num_start, 3)
4793 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4794 def test_session_timeout(self):
4795 """ NAT64 session timeout """
4796 self.icmp_id_in = 1234
4797 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4799 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4800 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4801 self.vapi.nat64_set_timeouts(icmp=5)
4803 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4804 self.pg0.add_stream(pkts)
4805 self.pg_enable_capture(self.pg_interfaces)
4807 capture = self.pg1.get_capture(len(pkts))
4809 ses_num_before_timeout = self.nat64_get_ses_num()
4813 # ICMP session after timeout
4814 ses_num_after_timeout = self.nat64_get_ses_num()
4815 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4817 def test_icmp_error(self):
4818 """ NAT64 ICMP Error message translation """
4819 self.tcp_port_in = 6303
4820 self.udp_port_in = 6304
4821 self.icmp_id_in = 6305
4823 ses_num_start = self.nat64_get_ses_num()
4825 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4827 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4828 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4830 # send some packets to create sessions
4831 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4832 self.pg0.add_stream(pkts)
4833 self.pg_enable_capture(self.pg_interfaces)
4835 capture_ip4 = self.pg1.get_capture(len(pkts))
4836 self.verify_capture_out(capture_ip4,
4837 nat_ip=self.nat_addr,
4838 dst_ip=self.pg1.remote_ip4)
4840 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4841 self.pg1.add_stream(pkts)
4842 self.pg_enable_capture(self.pg_interfaces)
4844 capture_ip6 = self.pg0.get_capture(len(pkts))
4845 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4846 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4847 self.pg0.remote_ip6)
4850 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4851 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4852 ICMPv6DestUnreach(code=1) /
4853 packet[IPv6] for packet in capture_ip6]
4854 self.pg0.add_stream(pkts)
4855 self.pg_enable_capture(self.pg_interfaces)
4857 capture = self.pg1.get_capture(len(pkts))
4858 for packet in capture:
4860 self.assertEqual(packet[IP].src, self.nat_addr)
4861 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4862 self.assertEqual(packet[ICMP].type, 3)
4863 self.assertEqual(packet[ICMP].code, 13)
4864 inner = packet[IPerror]
4865 self.assertEqual(inner.src, self.pg1.remote_ip4)
4866 self.assertEqual(inner.dst, self.nat_addr)
4867 self.check_icmp_checksum(packet)
4868 if inner.haslayer(TCPerror):
4869 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4870 elif inner.haslayer(UDPerror):
4871 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4873 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4875 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4879 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4880 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4881 ICMP(type=3, code=13) /
4882 packet[IP] for packet in capture_ip4]
4883 self.pg1.add_stream(pkts)
4884 self.pg_enable_capture(self.pg_interfaces)
4886 capture = self.pg0.get_capture(len(pkts))
4887 for packet in capture:
4889 self.assertEqual(packet[IPv6].src, ip.src)
4890 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4891 icmp = packet[ICMPv6DestUnreach]
4892 self.assertEqual(icmp.code, 1)
4893 inner = icmp[IPerror6]
4894 self.assertEqual(inner.src, self.pg0.remote_ip6)
4895 self.assertEqual(inner.dst, ip.src)
4896 self.check_icmpv6_checksum(packet)
4897 if inner.haslayer(TCPerror):
4898 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4899 elif inner.haslayer(UDPerror):
4900 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4902 self.assertEqual(inner[ICMPv6EchoRequest].id,
4905 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4908 def test_hairpinning(self):
4909 """ NAT64 hairpinning """
4911 client = self.pg0.remote_hosts[0]
4912 server = self.pg0.remote_hosts[1]
4913 server_tcp_in_port = 22
4914 server_tcp_out_port = 4022
4915 server_udp_in_port = 23
4916 server_udp_out_port = 4023
4917 client_tcp_in_port = 1234
4918 client_udp_in_port = 1235
4919 client_tcp_out_port = 0
4920 client_udp_out_port = 0
4921 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4922 nat_addr_ip6 = ip.src
4924 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4926 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4927 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4929 self.vapi.nat64_add_del_static_bib(server.ip6n,
4932 server_tcp_out_port,
4934 self.vapi.nat64_add_del_static_bib(server.ip6n,
4937 server_udp_out_port,
4942 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4943 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4944 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4946 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4947 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4948 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4950 self.pg0.add_stream(pkts)
4951 self.pg_enable_capture(self.pg_interfaces)
4953 capture = self.pg0.get_capture(len(pkts))
4954 for packet in capture:
4956 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4957 self.assertEqual(packet[IPv6].dst, server.ip6)
4958 if packet.haslayer(TCP):
4959 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4960 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4961 self.check_tcp_checksum(packet)
4962 client_tcp_out_port = packet[TCP].sport
4964 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4965 self.assertEqual(packet[UDP].dport, server_udp_in_port)
4966 self.check_udp_checksum(packet)
4967 client_udp_out_port = packet[UDP].sport
4969 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4974 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4975 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4976 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4978 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4979 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4980 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4982 self.pg0.add_stream(pkts)
4983 self.pg_enable_capture(self.pg_interfaces)
4985 capture = self.pg0.get_capture(len(pkts))
4986 for packet in capture:
4988 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4989 self.assertEqual(packet[IPv6].dst, client.ip6)
4990 if packet.haslayer(TCP):
4991 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4992 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4993 self.check_tcp_checksum(packet)
4995 self.assertEqual(packet[UDP].sport, server_udp_out_port)
4996 self.assertEqual(packet[UDP].dport, client_udp_in_port)
4997 self.check_udp_checksum(packet)
4999 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5004 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5005 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5006 ICMPv6DestUnreach(code=1) /
5007 packet[IPv6] for packet in capture]
5008 self.pg0.add_stream(pkts)
5009 self.pg_enable_capture(self.pg_interfaces)
5011 capture = self.pg0.get_capture(len(pkts))
5012 for packet in capture:
5014 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5015 self.assertEqual(packet[IPv6].dst, server.ip6)
5016 icmp = packet[ICMPv6DestUnreach]
5017 self.assertEqual(icmp.code, 1)
5018 inner = icmp[IPerror6]
5019 self.assertEqual(inner.src, server.ip6)
5020 self.assertEqual(inner.dst, nat_addr_ip6)
5021 self.check_icmpv6_checksum(packet)
5022 if inner.haslayer(TCPerror):
5023 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5024 self.assertEqual(inner[TCPerror].dport,
5025 client_tcp_out_port)
5027 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5028 self.assertEqual(inner[UDPerror].dport,
5029 client_udp_out_port)
5031 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5034 def test_prefix(self):
5035 """ NAT64 Network-Specific Prefix """
5037 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5039 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5040 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5041 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5042 self.vrf1_nat_addr_n,
5043 vrf_id=self.vrf1_id)
5044 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5047 global_pref64 = "2001:db8::"
5048 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5049 global_pref64_len = 32
5050 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5052 prefix = self.vapi.nat64_prefix_dump()
5053 self.assertEqual(len(prefix), 1)
5054 self.assertEqual(prefix[0].prefix, global_pref64_n)
5055 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5056 self.assertEqual(prefix[0].vrf_id, 0)
5058 # Add tenant specific prefix
5059 vrf1_pref64 = "2001:db8:122:300::"
5060 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5061 vrf1_pref64_len = 56
5062 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5064 vrf_id=self.vrf1_id)
5065 prefix = self.vapi.nat64_prefix_dump()
5066 self.assertEqual(len(prefix), 2)
5069 pkts = self.create_stream_in_ip6(self.pg0,
5072 plen=global_pref64_len)
5073 self.pg0.add_stream(pkts)
5074 self.pg_enable_capture(self.pg_interfaces)
5076 capture = self.pg1.get_capture(len(pkts))
5077 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5078 dst_ip=self.pg1.remote_ip4)
5080 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5081 self.pg1.add_stream(pkts)
5082 self.pg_enable_capture(self.pg_interfaces)
5084 capture = self.pg0.get_capture(len(pkts))
5085 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5088 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5090 # Tenant specific prefix
5091 pkts = self.create_stream_in_ip6(self.pg2,
5094 plen=vrf1_pref64_len)
5095 self.pg2.add_stream(pkts)
5096 self.pg_enable_capture(self.pg_interfaces)
5098 capture = self.pg1.get_capture(len(pkts))
5099 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5100 dst_ip=self.pg1.remote_ip4)
5102 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5103 self.pg1.add_stream(pkts)
5104 self.pg_enable_capture(self.pg_interfaces)
5106 capture = self.pg2.get_capture(len(pkts))
5107 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5110 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5112 def test_unknown_proto(self):
5113 """ NAT64 translate packet with unknown protocol """
5115 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5117 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5118 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5119 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5122 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5123 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5124 TCP(sport=self.tcp_port_in, dport=20))
5125 self.pg0.add_stream(p)
5126 self.pg_enable_capture(self.pg_interfaces)
5128 p = self.pg1.get_capture(1)
5130 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5131 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5133 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5134 TCP(sport=1234, dport=1234))
5135 self.pg0.add_stream(p)
5136 self.pg_enable_capture(self.pg_interfaces)
5138 p = self.pg1.get_capture(1)
5141 self.assertEqual(packet[IP].src, self.nat_addr)
5142 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5143 self.assertTrue(packet.haslayer(GRE))
5144 self.check_ip_checksum(packet)
5146 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5150 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5151 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5153 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5154 TCP(sport=1234, dport=1234))
5155 self.pg1.add_stream(p)
5156 self.pg_enable_capture(self.pg_interfaces)
5158 p = self.pg0.get_capture(1)
5161 self.assertEqual(packet[IPv6].src, remote_ip6)
5162 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5163 self.assertEqual(packet[IPv6].nh, 47)
5165 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5168 def test_hairpinning_unknown_proto(self):
5169 """ NAT64 translate packet with unknown protocol - hairpinning """
5171 client = self.pg0.remote_hosts[0]
5172 server = self.pg0.remote_hosts[1]
5173 server_tcp_in_port = 22
5174 server_tcp_out_port = 4022
5175 client_tcp_in_port = 1234
5176 client_tcp_out_port = 1235
5177 server_nat_ip = "10.0.0.100"
5178 client_nat_ip = "10.0.0.110"
5179 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5180 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5181 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5182 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5184 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5186 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5187 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5189 self.vapi.nat64_add_del_static_bib(server.ip6n,
5192 server_tcp_out_port,
5195 self.vapi.nat64_add_del_static_bib(server.ip6n,
5201 self.vapi.nat64_add_del_static_bib(client.ip6n,
5204 client_tcp_out_port,
5208 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5209 IPv6(src=client.ip6, dst=server_nat_ip6) /
5210 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5211 self.pg0.add_stream(p)
5212 self.pg_enable_capture(self.pg_interfaces)
5214 p = self.pg0.get_capture(1)
5216 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5217 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5219 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5220 TCP(sport=1234, dport=1234))
5221 self.pg0.add_stream(p)
5222 self.pg_enable_capture(self.pg_interfaces)
5224 p = self.pg0.get_capture(1)
5227 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5228 self.assertEqual(packet[IPv6].dst, server.ip6)
5229 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5231 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5235 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5236 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5238 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5239 TCP(sport=1234, dport=1234))
5240 self.pg0.add_stream(p)
5241 self.pg_enable_capture(self.pg_interfaces)
5243 p = self.pg0.get_capture(1)
5246 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5247 self.assertEqual(packet[IPv6].dst, client.ip6)
5248 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5250 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5253 def test_one_armed_nat64(self):
5254 """ One armed NAT64 """
5256 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5260 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5262 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5263 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5266 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5267 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5268 TCP(sport=12345, dport=80))
5269 self.pg3.add_stream(p)
5270 self.pg_enable_capture(self.pg_interfaces)
5272 capture = self.pg3.get_capture(1)
5277 self.assertEqual(ip.src, self.nat_addr)
5278 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5279 self.assertNotEqual(tcp.sport, 12345)
5280 external_port = tcp.sport
5281 self.assertEqual(tcp.dport, 80)
5282 self.check_tcp_checksum(p)
5283 self.check_ip_checksum(p)
5285 self.logger.error(ppp("Unexpected or invalid packet:", p))
5289 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5290 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5291 TCP(sport=80, dport=external_port))
5292 self.pg3.add_stream(p)
5293 self.pg_enable_capture(self.pg_interfaces)
5295 capture = self.pg3.get_capture(1)
5300 self.assertEqual(ip.src, remote_host_ip6)
5301 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5302 self.assertEqual(tcp.sport, 80)
5303 self.assertEqual(tcp.dport, 12345)
5304 self.check_tcp_checksum(p)
5306 self.logger.error(ppp("Unexpected or invalid packet:", p))
5309 def test_frag_in_order(self):
5310 """ NAT64 translate fragments arriving in order """
5311 self.tcp_port_in = random.randint(1025, 65535)
5313 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5315 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5316 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5318 reass = self.vapi.nat_reass_dump()
5319 reass_n_start = len(reass)
5323 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5324 self.tcp_port_in, 20, data)
5325 self.pg0.add_stream(pkts)
5326 self.pg_enable_capture(self.pg_interfaces)
5328 frags = self.pg1.get_capture(len(pkts))
5329 p = self.reass_frags_and_verify(frags,
5331 self.pg1.remote_ip4)
5332 self.assertEqual(p[TCP].dport, 20)
5333 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5334 self.tcp_port_out = p[TCP].sport
5335 self.assertEqual(data, p[Raw].load)
5338 data = "A" * 4 + "b" * 16 + "C" * 3
5339 pkts = self.create_stream_frag(self.pg1,
5344 self.pg1.add_stream(pkts)
5345 self.pg_enable_capture(self.pg_interfaces)
5347 frags = self.pg0.get_capture(len(pkts))
5348 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5349 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5350 self.assertEqual(p[TCP].sport, 20)
5351 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5352 self.assertEqual(data, p[Raw].load)
5354 reass = self.vapi.nat_reass_dump()
5355 reass_n_end = len(reass)
5357 self.assertEqual(reass_n_end - reass_n_start, 2)
# Scenario check: client and server are both remote hosts of pg0; fragments
# are injected on pg0 and the translated fragments are captured on pg0 again.
5359 def test_reass_hairpinning(self):
5360 """ NAT64 fragments hairpinning """
5362 client = self.pg0.remote_hosts[0]
5363 server = self.pg0.remote_hosts[1]
5364 server_in_port = random.randint(1025, 65535)
5365 server_out_port = random.randint(1025, 65535)
5366 client_in_port = random.randint(1025, 65535)
# Build '64:ff9b::' + NAT address as an IPv6 source and read it back from
# scapy - presumably to obtain scapy's own text form for later comparison.
5367 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5368 nat_addr_ip6 = ip.src
5370 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5372 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5373 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5375 # add static BIB entry for server
5376 self.vapi.nat64_add_del_static_bib(server.ip6n,
5382 # send packet from host to server
5383 pkts = self.create_stream_frag_ip6(self.pg0,
5388 self.pg0.add_stream(pkts)
5389 self.pg_enable_capture(self.pg_interfaces)
5391 frags = self.pg0.get_capture(len(pkts))
# Reassembled packet is verified against the NAT IPv6 address and the
# server's IPv6 address (presumably source and destination - confirm helper).
5392 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
# Client source port must have been rewritten; destination is the server's
# inside port.
5393 self.assertNotEqual(p[TCP].sport, client_in_port)
5394 self.assertEqual(p[TCP].dport, server_in_port)
5395 self.assertEqual(data, p[Raw].load)
# Scenario check: fragments injected on pg0 are captured and reassembled on
# pg1, then reply fragments injected on pg1 are captured and reassembled on
# pg0.
5397 def test_frag_out_of_order(self):
5398 """ NAT64 translate fragments arriving out of order """
5399 self.tcp_port_in = random.randint(1025, 65535)
# NAT64 setup: address pool, pg0 with the call's default direction
# (presumably inside - confirm) and pg1 explicitly as outside (is_inside=0).
5401 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5403 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5404 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# First direction: fragments go in on pg0 and are expected on pg1.
5408 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5409 self.tcp_port_in, 20, data)
5411 self.pg0.add_stream(pkts)
5412 self.pg_enable_capture(self.pg_interfaces)
5414 frags = self.pg1.get_capture(len(pkts))
5415 p = self.reass_frags_and_verify(frags,
5417 self.pg1.remote_ip4)
# Destination port must be unchanged; the source port must differ from the
# inside port and is remembered as the translated (outside) port.
5418 self.assertEqual(p[TCP].dport, 20)
5419 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5420 self.tcp_port_out = p[TCP].sport
5421 self.assertEqual(data, p[Raw].load)
# Reply direction: fragments go in on pg1 and are expected on pg0.
5424 data = "A" * 4 + "B" * 16 + "C" * 3
5425 pkts = self.create_stream_frag(self.pg1,
5431 self.pg1.add_stream(pkts)
5432 self.pg_enable_capture(self.pg_interfaces)
5434 frags = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: pg1's IPv4 address composed with 64:ff9b::/96.
5435 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5436 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5437 self.assertEqual(p[TCP].sport, 20)
5438 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5439 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # Ask NAT64 to take its pool address from pg4, which has no IPv4
    # address configured at this point.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (query the NAT64 pool itself; the NAT44 address dump says nothing
    # about NAT64 state)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the dump taken *after* the removal, not on a stale list)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
# Runs only when running_extended_tests() is true (see the skipUnless
# decorator). Traffic is injected on pg0, IPFIX export packets are collected
# on pg3 and handed to verify_ipfix_max_sessions / verify_ipfix_max_bibs.
5460 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5461 def test_ipfix_max_bibs_sessions(self):
5462 """ IPFIX logging maximum session and BIB entries exceeded """
5465 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5469 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5471 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5472 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# TCP packets to ports 80 and 22 are built from generated source addresses
# of the form fd01:aa:: followed by the hexadecimal loop index.
5476 for i in range(0, max_bibs):
5477 src = "fd01:aa::%x" % (i)
5478 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5479 IPv6(src=src, dst=remote_host_ip6) /
5480 TCP(sport=12345, dport=80))
5482 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5483 IPv6(src=src, dst=remote_host_ip6) /
5484 TCP(sport=12345, dport=22))
5486 self.pg0.add_stream(pkts)
5487 self.pg_enable_capture(self.pg_interfaces)
# All max_sessions packets are expected to be forwarded to pg1.
5489 self.pg1.get_capture(max_sessions)
# Point the IPFIX exporter at pg3 and configure NAT IPFIX logging.
5491 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5492 src_address=self.pg3.local_ip4n,
5494 template_interval=10)
5495 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5496 src_port=self.ipfix_src_port)
# One more packet (port 25) reusing the last generated source address;
# get_capture(0) asserts nothing is forwarded to pg1 for it.
5498 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5499 IPv6(src=src, dst=remote_host_ip6) /
5500 TCP(sport=12345, dport=25))
5501 self.pg0.add_stream(p)
5502 self.pg_enable_capture(self.pg_interfaces)
5504 self.pg1.get_capture(0)
5505 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Nine IPFIX packets are expected on pg3 after the flush.
5506 capture = self.pg3.get_capture(9)
5507 ipfix = IPFIXDecoder()
5508 # first load template
5510 self.assertTrue(p.haslayer(IPFIX))
5511 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5512 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5513 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5514 self.assertEqual(p[UDP].dport, 4739)
5515 self.assertEqual(p[IPFIX].observationDomainID,
5516 self.ipfix_domain_id)
5517 if p.haslayer(Template):
5518 ipfix.add_template(p.getlayer(Template))
5519 # verify events in data set
5521 if p.haslayer(Data):
5522 data = ipfix.decode_data_set(p.getlayer(Set))
5523 self.verify_ipfix_max_sessions(data, max_sessions)
# Second phase: a packet from pg0.remote_ip6 to port 80 must also produce no
# output on pg1; the single IPFIX packet that follows is checked against
# max_bibs.
5525 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5526 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5527 TCP(sport=12345, dport=80))
5528 self.pg0.add_stream(p)
5529 self.pg_enable_capture(self.pg_interfaces)
5531 self.pg1.get_capture(0)
5532 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5533 capture = self.pg3.get_capture(1)
5534 # verify events in data set
5536 self.assertTrue(p.haslayer(IPFIX))
5537 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5538 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5539 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5540 self.assertEqual(p[UDP].dport, 4739)
5541 self.assertEqual(p[IPFIX].observationDomainID,
5542 self.ipfix_domain_id)
5543 if p.haslayer(Data):
5544 data = ipfix.decode_data_set(p.getlayer(Set))
5545 self.verify_ipfix_max_bibs(data, max_bibs)
# Scenario check: with the IPv6 reassembly limit set to max_frag=0, a single
# fragment injected on pg0 must not reach pg1, and the IPFIX packets
# collected on pg3 are handed to verify_ipfix_max_fragments_ip6.
5547 def test_ipfix_max_frags(self):
5548 """ IPFIX logging maximum fragments pending reassembly exceeded """
5549 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5551 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5552 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# max_frag=0 for IPv6 - presumably so that any pending fragment already
# exceeds the limit; confirm against the nat_set_reass API.
5553 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
# Point the IPFIX exporter at pg3 and configure NAT IPFIX logging.
5554 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5555 src_address=self.pg3.local_ip4n,
5557 template_interval=10)
5558 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5559 src_port=self.ipfix_src_port)
5562 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5563 self.tcp_port_in, 20, data)
# Only the last fragment of the generated stream is sent.
5564 self.pg0.add_stream(pkts[-1])
5565 self.pg_enable_capture(self.pg_interfaces)
# get_capture(0) asserts that nothing is forwarded to pg1.
5567 self.pg1.get_capture(0)
5568 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Nine IPFIX packets are expected on pg3 after the flush.
5569 capture = self.pg3.get_capture(9)
5570 ipfix = IPFIXDecoder()
5571 # first load template
5573 self.assertTrue(p.haslayer(IPFIX))
5574 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5575 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5576 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5577 self.assertEqual(p[UDP].dport, 4739)
5578 self.assertEqual(p[IPFIX].observationDomainID,
5579 self.ipfix_domain_id)
5580 if p.haslayer(Template):
5581 ipfix.add_template(p.getlayer(Template))
5582 # verify events in data set
5584 if p.haslayer(Data):
5585 data = ipfix.decode_data_set(p.getlayer(Set))
5586 self.verify_ipfix_max_fragments_ip6(data, 0,
5587 self.pg0.remote_ip6n)
# Scenario check in two phases: a TCP packet through NAT64 followed by an
# IPFIX flush (10 export packets expected on pg3), then another pool-range
# call followed by a second flush (2 export packets expected).
5589 def test_ipfix_bib_ses(self):
5590 """ IPFIX logging NAT64 BIB/session create and delete events """
5591 self.tcp_port_in = random.randint(1025, 65535)
5592 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5596 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5598 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5599 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Point the IPFIX exporter at pg3 and configure NAT IPFIX logging.
5600 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5601 src_address=self.pg3.local_ip4n,
5603 template_interval=10)
5604 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5605 src_port=self.ipfix_src_port)
# One TCP packet from pg0's remote IPv6 address to port 25 of the composed
# IPv6 address of pg1's IPv4 host.
5608 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5609 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5610 TCP(sport=self.tcp_port_in, dport=25))
5611 self.pg0.add_stream(p)
5612 self.pg_enable_capture(self.pg_interfaces)
# The packet must be forwarded to pg1; its source port is stored as the
# translated (outside) port.
5614 p = self.pg1.get_capture(1)
5615 self.tcp_port_out = p[0][TCP].sport
5616 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5617 capture = self.pg3.get_capture(10)
5618 ipfix = IPFIXDecoder()
5619 # first load template
5621 self.assertTrue(p.haslayer(IPFIX))
5622 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5623 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5624 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5625 self.assertEqual(p[UDP].dport, 4739)
5626 self.assertEqual(p[IPFIX].observationDomainID,
5627 self.ipfix_domain_id)
5628 if p.haslayer(Template):
5629 ipfix.add_template(p.getlayer(Template))
5630 # verify events in data set
5632 if p.haslayer(Data):
5633 data = ipfix.decode_data_set(p.getlayer(Set))
# Element 230 of the first decoded record selects the verifier:
# value 10 -> verify_ipfix_bib(..., 1, ...), value 6 -> verify_ipfix_nat64_ses.
# NOTE(review): 230 is presumably the IPFIX natEvent element id - confirm.
5634 if ord(data[0][230]) == 10:
5635 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5636 elif ord(data[0][230]) == 6:
5637 self.verify_ipfix_nat64_ses(data,
5639 self.pg0.remote_ip6n,
5640 self.pg1.remote_ip4,
5643 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Second phase: another pool-range call, then two more IPFIX packets are
# expected on pg3.
5646 self.pg_enable_capture(self.pg_interfaces)
5647 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5650 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5651 capture = self.pg3.get_capture(2)
5652 # verify events in data set
5654 self.assertTrue(p.haslayer(IPFIX))
5655 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5656 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5657 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5658 self.assertEqual(p[UDP].dport, 4739)
5659 self.assertEqual(p[IPFIX].observationDomainID,
5660 self.ipfix_domain_id)
5661 if p.haslayer(Data):
5662 data = ipfix.decode_data_set(p.getlayer(Set))
# Same dispatch as in the first phase, now on values 11 (verify_ipfix_bib
# with 0) and 7 (verify_ipfix_nat64_ses).
5663 if ord(data[0][230]) == 11:
5664 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5665 elif ord(data[0][230]) == 7:
5666 self.verify_ipfix_nat64_ses(data,
5668 self.pg0.remote_ip6n,
5669 self.pg1.remote_ip4,
5672 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Helper based on the NAT64 session-table dump (vapi.nat64_st_dump).
5674 def nat64_get_ses_num(self):
5676 Return number of active NAT64 sessions.
# The dump is taken with the call's default arguments.
5678 st = self.vapi.nat64_st_dump()
# Cleanup helper: switches NAT IPFIX logging off, resets the IPFIX port and
# domain attributes and the NAT64 timeouts, then walks the interface, BIB,
# pool-address and prefix dumps and issues an add/del call for each entry.
5681 def clear_nat64(self):
5683 Clear NAT64 configuration.
# Disable IPFIX logging using the port/domain currently stored on the test.
5685 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5686 domain_id=self.ipfix_domain_id)
# Reset the stored IPFIX source port and domain id to 4739 and 1.
5687 self.ipfix_src_port = 4739
5688 self.ipfix_domain_id = 1
# Called without arguments - presumably restores default timeouts; confirm.
5690 self.vapi.nat64_set_timeouts()
5692 interfaces = self.vapi.nat64_interface_dump()
5693 for intf in interfaces:
# NOTE(review): entries with is_inside > 1 get an additional add/del call -
# presumably a combined inside+outside flag value; confirm against the API.
5694 if intf.is_inside > 1:
5695 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5698 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# 255 is passed as the dump selector - presumably "all protocols"; confirm.
5702 bib = self.vapi.nat64_bib_dump(255)
5705 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5713 adresses = self.vapi.nat64_pool_addr_dump()
5714 for addr in adresses:
5715 self.vapi.nat64_add_del_pool_addr_range(addr.address,
5720 prefixes = self.vapi.nat64_prefix_dump()
5721 for prefix in prefixes:
5722 self.vapi.nat64_add_del_prefix(prefix.prefix,
5724 vrf_id=prefix.vrf_id,
# Chain to the parent class tearDown first; then, unless VPP is flagged as
# dead, log the output of the NAT64 and virtual-reassembly show commands.
5728 super(TestNAT64, self).tearDown()
5729 if not self.vpp_dead:
5730 self.logger.info(self.vapi.cli("show nat64 pool"))
5731 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5732 self.logger.info(self.vapi.cli("show nat64 prefix"))
5733 self.logger.info(self.vapi.cli("show nat64 bib all"))
5734 self.logger.info(self.vapi.cli("show nat64 session table all"))
5735 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite AFTR-side tests: pg0 carries plain IPv4 (NAT address 10.0.0.3),
# pg1 carries IPv6 with two remote hosts that send IPv4-in-IPv6 packets.
5739 class TestDSlite(MethodHolder):
5740 """ DS-Lite Test Cases """
# Class-level fixture: two pg interfaces, pg0 configured for IPv4 with ARP
# resolved, pg1 configured for IPv6 with two generated remote hosts.
5743 def setUpClass(cls):
5744 super(TestDSlite, cls).setUpClass()
5747 cls.nat_addr = '10.0.0.3'
5748 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5750 cls.create_pg_interfaces(range(2))
5752 cls.pg0.config_ip4()
5753 cls.pg0.resolve_arp()
5755 cls.pg1.config_ip6()
5756 cls.pg1.generate_remote_hosts(2)
5757 cls.pg1.configure_ipv6_neighbors()
5760 super(TestDSlite, cls).tearDownClass()
# Single scenario covering UDP, TCP and ICMP through the tunnel in both
# directions, plus an ICMPv6 echo to the AFTR address itself.
5763 def test_dslite(self):
5764 """ Test DS-Lite """
5765 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
# AFTR tunnel endpoint addresses, converted to packed form for the API call.
5767 aftr_ip4 = '192.0.0.1'
5768 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5769 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5770 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5771 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, tunnel -> IPv4: remote host 0 on pg1 sends IPv4/UDP inside IPv6 to the
# AFTR address; the packet on pg0 must be plain IPv4 from the NAT address
# with a rewritten source port.
5774 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5775 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5776 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5777 UDP(sport=20000, dport=10000))
5778 self.pg1.add_stream(p)
5779 self.pg_enable_capture(self.pg_interfaces)
5781 capture = self.pg0.get_capture(1)
5782 capture = capture[0]
5783 self.assertFalse(capture.haslayer(IPv6))
5784 self.assertEqual(capture[IP].src, self.nat_addr)
5785 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5786 self.assertNotEqual(capture[UDP].sport, 20000)
5787 self.assertEqual(capture[UDP].dport, 10000)
5788 self.check_ip_checksum(capture)
5789 out_port = capture[UDP].sport
# UDP, IPv4 -> tunnel: the reply to the translated port must come out of pg1
# encapsulated in IPv6 from the AFTR address to host 0, with the original
# inner address and port restored.
5791 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5792 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5793 UDP(sport=10000, dport=out_port))
5794 self.pg0.add_stream(p)
5795 self.pg_enable_capture(self.pg_interfaces)
5797 capture = self.pg1.get_capture(1)
5798 capture = capture[0]
5799 self.assertEqual(capture[IPv6].src, aftr_ip6)
5800 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5801 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5802 self.assertEqual(capture[IP].dst, '192.168.1.1')
5803 self.assertEqual(capture[UDP].sport, 10000)
5804 self.assertEqual(capture[UDP].dport, 20000)
5805 self.check_ip_checksum(capture)
# TCP, tunnel -> IPv4: same inner source 192.168.1.1 but sent by remote
# host 1.
5808 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5809 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5810 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5811 TCP(sport=20001, dport=10001))
5812 self.pg1.add_stream(p)
5813 self.pg_enable_capture(self.pg_interfaces)
5815 capture = self.pg0.get_capture(1)
5816 capture = capture[0]
5817 self.assertFalse(capture.haslayer(IPv6))
5818 self.assertEqual(capture[IP].src, self.nat_addr)
5819 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5820 self.assertNotEqual(capture[TCP].sport, 20001)
5821 self.assertEqual(capture[TCP].dport, 10001)
5822 self.check_ip_checksum(capture)
5823 self.check_tcp_checksum(capture)
5824 out_port = capture[TCP].sport
# TCP, IPv4 -> tunnel: the reply must be tunnelled back to remote host 1.
5826 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5827 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5828 TCP(sport=10001, dport=out_port))
5829 self.pg0.add_stream(p)
5830 self.pg_enable_capture(self.pg_interfaces)
5832 capture = self.pg1.get_capture(1)
5833 capture = capture[0]
5834 self.assertEqual(capture[IPv6].src, aftr_ip6)
5835 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5836 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5837 self.assertEqual(capture[IP].dst, '192.168.1.1')
5838 self.assertEqual(capture[TCP].sport, 10001)
5839 self.assertEqual(capture[TCP].dport, 20001)
5840 self.check_ip_checksum(capture)
5841 self.check_tcp_checksum(capture)
# ICMP, tunnel -> IPv4: echo request with id 4000; the id seen on pg0 must
# differ from 4000.
5844 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5845 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5846 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5847 ICMP(id=4000, type='echo-request'))
5848 self.pg1.add_stream(p)
5849 self.pg_enable_capture(self.pg_interfaces)
5851 capture = self.pg0.get_capture(1)
5852 capture = capture[0]
5853 self.assertFalse(capture.haslayer(IPv6))
5854 self.assertEqual(capture[IP].src, self.nat_addr)
5855 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5856 self.assertNotEqual(capture[ICMP].id, 4000)
5857 self.check_ip_checksum(capture)
5858 self.check_icmp_checksum(capture)
5859 out_id = capture[ICMP].id
# ICMP, IPv4 -> tunnel: the echo reply carrying the translated id must come
# back to remote host 1 with id 4000 restored.
5861 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5862 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5863 ICMP(id=out_id, type='echo-reply'))
5864 self.pg0.add_stream(p)
5865 self.pg_enable_capture(self.pg_interfaces)
5867 capture = self.pg1.get_capture(1)
5868 capture = capture[0]
5869 self.assertEqual(capture[IPv6].src, aftr_ip6)
5870 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5871 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5872 self.assertEqual(capture[IP].dst, '192.168.1.1')
5873 self.assertEqual(capture[ICMP].id, 4000)
5874 self.check_ip_checksum(capture)
5875 self.check_icmp_checksum(capture)
5877 # ping DS-Lite AFTR tunnel endpoint address
5878 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5879 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5880 ICMPv6EchoRequest())
5881 self.pg1.add_stream(p)
5882 self.pg_enable_capture(self.pg_interfaces)
# The echo reply is expected on the same interface it was sent from (pg1).
5884 capture = self.pg1.get_capture(1)
5885 self.assertEqual(1, len(capture))
5886 capture = capture[0]
5887 self.assertEqual(capture[IPv6].src, aftr_ip6)
5888 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5889 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test cleanup: chain to the parent tearDown, then, unless VPP is flagged
# as dead, log the DS-Lite show commands.
5892 super(TestDSlite, self).tearDown()
5893 if not self.vpp_dead:
5894 self.logger.info(self.vapi.cli("show dslite pool"))
5896 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5897 self.logger.info(self.vapi.cli("show dslite sessions"))
# DS-Lite CE (B4-side) tests: pg0 carries plain IPv4, pg1 carries the IPv6
# tunnel towards the AFTR.
5900 class TestDSliteCE(MethodHolder):
5901 """ DS-Lite CE Test Cases """
# Extends the VPP command line with a "nat { dslite ce }" section.
5904 def setUpConstants(cls):
5905 super(TestDSliteCE, cls).setUpConstants()
5906 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
# Class-level fixture: two pg interfaces, pg0 configured for IPv4 with ARP
# resolved, pg1 configured for IPv6 with one generated remote host.
5909 def setUpClass(cls):
5910 super(TestDSliteCE, cls).setUpClass()
5913 cls.create_pg_interfaces(range(2))
5915 cls.pg0.config_ip4()
5916 cls.pg0.resolve_arp()
5918 cls.pg1.config_ip6()
5919 cls.pg1.generate_remote_hosts(1)
5920 cls.pg1.configure_ipv6_neighbors()
5923 super(TestDSliteCE, cls).tearDownClass()
# Single scenario: encapsulation of IPv4 from pg0 into the tunnel on pg1,
# decapsulation in the opposite direction, and an ICMPv6 echo to the B4
# address itself.
5926 def test_dslite_ce(self):
5927 """ Test DS-Lite CE """
# B4 and AFTR tunnel endpoint addresses, converted to packed form for the
# API calls.
5929 b4_ip4 = '192.0.0.2'
5930 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
5931 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
5932 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
5933 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
5935 aftr_ip4 = '192.0.0.1'
5936 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5937 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5938 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5939 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# Host (/128) route to the AFTR IPv6 address via pg1's remote address.
5941 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
5942 dst_address_length=128,
5943 next_hop_address=self.pg1.remote_ip6n,
5944 next_hop_sw_if_index=self.pg1.sw_if_index,
# IPv4 -> tunnel: a UDP packet from pg0 must appear on pg1 wrapped in IPv6
# from the B4 address to the AFTR address, inner addresses and ports intact.
5948 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5949 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
5950 UDP(sport=10000, dport=20000))
5951 self.pg0.add_stream(p)
5952 self.pg_enable_capture(self.pg_interfaces)
5954 capture = self.pg1.get_capture(1)
5955 capture = capture[0]
5956 self.assertEqual(capture[IPv6].src, b4_ip6)
5957 self.assertEqual(capture[IPv6].dst, aftr_ip6)
5958 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5959 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
5960 self.assertEqual(capture[UDP].sport, 10000)
5961 self.assertEqual(capture[UDP].dport, 20000)
5962 self.check_ip_checksum(capture)
# Tunnel -> IPv4: an IPv4-in-IPv6 packet from the AFTR to the B4 address must
# appear on pg0 as plain IPv4 with the inner addresses and ports intact.
5965 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5966 IPv6(dst=b4_ip6, src=aftr_ip6) /
5967 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
5968 UDP(sport=20000, dport=10000))
5969 self.pg1.add_stream(p)
5970 self.pg_enable_capture(self.pg_interfaces)
5972 capture = self.pg0.get_capture(1)
5973 capture = capture[0]
5974 self.assertFalse(capture.haslayer(IPv6))
5975 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
5976 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5977 self.assertEqual(capture[UDP].sport, 20000)
5978 self.assertEqual(capture[UDP].dport, 10000)
5979 self.check_ip_checksum(capture)
5981 # ping DS-Lite B4 tunnel endpoint address
5982 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5983 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
5984 ICMPv6EchoRequest())
5985 self.pg1.add_stream(p)
5986 self.pg_enable_capture(self.pg_interfaces)
# The echo reply is expected on the same interface it was sent from (pg1).
5988 capture = self.pg1.get_capture(1)
5989 self.assertEqual(1, len(capture))
5990 capture = capture[0]
5991 self.assertEqual(capture[IPv6].src, b4_ip6)
5992 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5993 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Per-test cleanup: chain to the parent tearDown, then, unless VPP is flagged
# as dead, query the tunnel endpoint show commands.
5996 super(TestDSliteCE, self).tearDown()
5997 if not self.vpp_dead:
5999 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6001 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# Script entry point: when executed directly, run this module's tests through
# unittest using VppTestRunner as the test runner.
6003 if __name__ == '__main__':
6004 unittest.main(testRunner=VppTestRunner)