9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its serialized form, the stored IPv4
    header checksum is removed and the packet is serialized and parsed once
    more; the checksum found after that round trip must equal the one
    carried by the packet under test.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the stored value first (same technique as check_icmp_checksum).
    # Without this the round trip below merely copies the captured checksum
    # and the assertion could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its serialized form, the stored TCP
    checksum is removed and the packet is serialized and parsed once more;
    the checksum found after that round trip must equal the one carried by
    the packet under test.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the stored value first (same technique as check_icmp_checksum).
    # Without this the round trip below merely copies the captured checksum
    # and the assertion could never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its serialized form, the stored UDP
    checksum is removed and the packet is serialized and parsed once more;
    the checksum found after that round trip must equal the one carried by
    the packet under test.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the stored value first (same technique as check_icmp_checksum).
    # Without this the round trip below merely copies the captured checksum
    # and the assertion could never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded (``*error``) layer present in the packet, a fresh
    copy is parsed from the serialized packet, the stored checksum of that
    layer is removed, the copy is serialized and parsed again, and the
    resulting checksum is compared with the one in the original packet.

    An embedded UDP header whose checksum is 0 is not verified.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    # Layers are addressed by class name, exactly as the indexing below
    # already does; scapy's haslayer() accepts a layer name as well as a
    # layer class.
    for layer in ('IPerror', 'TCPerror', 'UDPerror', 'ICMPerror'):
        if not pkt.haslayer(layer):
            continue
        if layer == 'UDPerror' and pkt[layer].chksum == 0:
            continue
        # Always start from a fresh copy. The ICMPerror branch used to
        # reuse whatever `new` an earlier branch left behind, which is
        # unbound when no earlier branch ran.
        new = pkt.__class__(str(pkt))
        del new[layer].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer].chksum, pkt[layer].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    A copy of the packet is parsed from its serialized form, the stored
    ICMP checksum is removed and the copy is serialized and parsed again;
    the resulting checksum must equal the one in the original packet.
    When the packet carries an embedded IPv4 header (ICMP error), the
    embedded checksums are verified via check_icmp_errror_embedded.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination-unreachable, echo-request and echo-reply messages.
    For each of those layers present in the packet, the stored checksum is
    removed from a working copy, the copy is serialized and parsed again
    and the resulting checksum is compared with the original one. For a
    destination-unreachable message the embedded packet checksums are
    verified as well.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer class, layer name used for indexing, verify embedded packet?)
    handled = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    rebuilt = pkt.__class__(str(pkt))
    for layer_cls, layer_name, has_embedded in handled:
        if not pkt.haslayer(layer_cls):
            continue
        del rebuilt[layer_name].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[layer_name].cksum, pkt[layer_name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
137 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param dst_ip: Destination address
144 :param ttl: TTL of generated packets
147 dst_ip = out_if.remote_ip4
151 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153 TCP(sport=self.tcp_port_in, dport=20))
157 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159 UDP(sport=self.udp_port_in, dport=20))
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165 ICMP(id=self.icmp_id_in, type='echo-request'))
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the prefix at the byte positions
    RFC 6052 assigns to the given prefix length (byte 8 of the IPv6 address
    is never used for IPv4 octets).

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96); for any
                 other value the prefix is returned without an embedded
                 IPv4 address
    :returns: IPv4-embedded IPv6 addresses
    """
    # Destination byte of each IPv4 octet, keyed by prefix length.
    octet_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray keeps the byte manipulation valid for both str-based and
    # bytes-based packed addresses.
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for position, octet in zip(octet_positions.get(plen, ()), ip4_n):
        pref_n[position] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    The four IPv4 octets are read from the byte positions RFC 6052 assigns
    to the given prefix length (byte 8 of the IPv6 address never carries an
    IPv4 octet).

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: extracted IPv4 address
    :raises ValueError: if plen is not one of the supported lengths
    """
    # Source byte of each IPv4 octet, keyed by prefix length.
    octet_positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    positions = octet_positions.get(plen)
    if positions is None:
        raise ValueError("unsupported IPv6 prefix length: %r" % (plen,))
    # bytearray keeps the byte manipulation valid for both str-based and
    # bytes-based packed addresses.
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[position] for position in positions)
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
255 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
257 Create IPv6 packet stream for inside network
259 :param in_if: Inside interface
260 :param out_if: Outside interface
261 :param ttl: Hop Limit of generated packets
262 :param pref: NAT64 prefix
263 :param plen: NAT64 prefix length
267 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
269 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
272 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274 TCP(sport=self.tcp_port_in, dport=20))
278 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280 UDP(sport=self.udp_port_in, dport=20))
284 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286 ICMPv6EchoRequest(id=self.icmp_id_in))
291 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292 use_inside_ports=False):
294 Create packet stream for outside network
296 :param out_if: Outside interface
297 :param dst_ip: Destination IP address (Default use global NAT address)
298 :param ttl: TTL of generated packets
299 :param use_inside_ports: Use inside NAT ports as destination ports
300 instead of outside ports
303 dst_ip = self.nat_addr
304 if not use_inside_ports:
305 tcp_port = self.tcp_port_out
306 udp_port = self.udp_port_out
307 icmp_id = self.icmp_id_out
309 tcp_port = self.tcp_port_in
310 udp_port = self.udp_port_in
311 icmp_id = self.icmp_id_in
314 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316 TCP(dport=tcp_port, sport=20))
320 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322 UDP(dport=udp_port, sport=20))
326 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328 ICMP(id=icmp_id, type='echo-reply'))
333 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
335 Create packet stream for outside network
337 :param out_if: Outside interface
338 :param dst_ip: Destination IP address (Default use global NAT address)
339 :param hl: HL of generated packets
343 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345 TCP(dport=self.tcp_port_out, sport=20))
349 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351 UDP(dport=self.udp_port_out, sport=20))
355 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357 ICMPv6EchoReply(id=self.icmp_id_out))
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Sorce port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Sorce port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param fragsize: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Expects exactly one decoded record and checks its event type and pool
    ID fields.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    # Exactly one record was asserted above; `record` was previously used
    # without ever being bound.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    Expects exactly one decoded record and checks its event type, quota
    event sub-type and the reported limit.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    # Exactly one record was asserted above; `record` was previously used
    # without ever being bound.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed in native byte order ("I"), unlike the "!I"
    # used by other IPFIX checks in this file - confirm this is intended.
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    Expects exactly one decoded record and checks its event type, quota
    event sub-type and the reported limit.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    # Exactly one record was asserted above; `record` was previously used
    # without ever being bound.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed in native byte order ("I"), unlike the "!I"
    # used by other IPFIX checks in this file - confirm this is intended.
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    Expects exactly one decoded record and checks its event type, quota
    event sub-type, the reported limit and the source address.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    # Exactly one record was asserted above; `record` was previously used
    # without ever being bound.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed in native byte order ("I"), unlike the "!I"
    # used by other IPFIX checks in this file - confirm this is intended.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    Expects exactly one decoded record and checks its event type, quota
    event sub-type, the reported limit and the source address.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    # Exactly one record was asserted above; `record` was previously used
    # without ever being bound.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed in native byte order ("I"), unlike the "!I"
    # used by other IPFIX checks in this file - confirm this is intended.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
822 def verify_ipfix_bib(self, data, is_create, src_addr):
824 Verify IPFIX NAT64 BIB create and delete events
826 :param data: Decoded IPFIX data records
827 :param is_create: Create event if nonzero value otherwise delete event
828 :param src_addr: IPv6 source address
830 self.assertEqual(1, len(data))
834 self.assertEqual(ord(record[230]), 10)
836 self.assertEqual(ord(record[230]), 11)
838 self.assertEqual(src_addr, record[27])
839 # postNATSourceIPv4Address
840 self.assertEqual(self.nat_addr_n, record[225])
842 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
844 self.assertEqual(struct.pack("!I", 0), record[234])
845 # sourceTransportPort
846 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847 # postNAPTSourceTransportPort
848 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
1035 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1036 for lb_sm in lb_static_mappings:
1037 self.vapi.nat44_add_del_lb_static_mapping(
1038 lb_sm.external_addr,
1039 lb_sm.external_port,
1041 vrf_id=lb_sm.vrf_id,
1042 twice_nat=lb_sm.twice_nat,
1043 out2in_only=lb_sm.out2in_only,
1048 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1049 for id_m in identity_mappings:
1050 self.vapi.nat44_add_del_identity_mapping(
1051 addr_only=id_m.addr_only,
1054 sw_if_index=id_m.sw_if_index,
1056 protocol=id_m.protocol,
1059 adresses = self.vapi.nat44_address_dump()
1060 for addr in adresses:
1061 self.vapi.nat44_add_del_address_range(addr.ip_address,
1063 twice_nat=addr.twice_nat,
1066 self.vapi.nat_set_reass()
1067 self.vapi.nat_set_reass(is_ip6=1)
1069 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1070 local_port=0, external_port=0, vrf_id=0,
1071 is_add=1, external_sw_if_index=0xFFFFFFFF,
1072 proto=0, twice_nat=0, out2in_only=0):
1074 Add/delete NAT44 static mapping
1076 :param local_ip: Local IP address
1077 :param external_ip: External IP address
1078 :param local_port: Local port number (Optional)
1079 :param external_port: External port number (Optional)
1080 :param vrf_id: VRF ID (Default 0)
1081 :param is_add: 1 if add, 0 if delete (Default add)
1082 :param external_sw_if_index: External interface instead of IP address
1083 :param proto: IP protocol (Mandatory if port specified)
1084 :param twice_nat: 1 if translate external host address and port
1085 :param out2in_only: if 1 rule is matching only out2in direction
1088 if local_port and external_port:
1090 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1091 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1092 self.vapi.nat44_add_del_static_mapping(
1095 external_sw_if_index,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    The address is converted to packed binary form and submitted as a
    single-address range (first address == last address).

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed through to the API call
                   (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    nat_addr = socket.inet_pton(socket.AF_INET, ip)
    # vrf_id was accepted by this helper but silently dropped; forward it
    # so a caller-selected VRF actually reaches the API.
    self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
                                          vrf_id=vrf_id,
                                          twice_nat=twice_nat)
1118 def test_dynamic(self):
1119 """ NAT44 dynamic translation test """
1121 self.nat44_add_address(self.nat_addr)
1122 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1123 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1127 pkts = self.create_stream_in(self.pg0, self.pg1)
1128 self.pg0.add_stream(pkts)
1129 self.pg_enable_capture(self.pg_interfaces)
1131 capture = self.pg1.get_capture(len(pkts))
1132 self.verify_capture_out(capture)
1135 pkts = self.create_stream_out(self.pg1)
1136 self.pg1.add_stream(pkts)
1137 self.pg_enable_capture(self.pg_interfaces)
1139 capture = self.pg0.get_capture(len(pkts))
1140 self.verify_capture_in(capture, self.pg0)
1142 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1143 """ NAT44 handling of client packets with TTL=1 """
1145 self.nat44_add_address(self.nat_addr)
1146 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1147 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1150 # Client side - generate traffic
1151 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1152 self.pg0.add_stream(pkts)
1153 self.pg_enable_capture(self.pg_interfaces)
1156 # Client side - verify ICMP type 11 packets
1157 capture = self.pg0.get_capture(len(pkts))
1158 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1160 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1161 """ NAT44 handling of server packets with TTL=1 """
1163 self.nat44_add_address(self.nat_addr)
1164 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1165 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1168 # Client side - create sessions
1169 pkts = self.create_stream_in(self.pg0, self.pg1)
1170 self.pg0.add_stream(pkts)
1171 self.pg_enable_capture(self.pg_interfaces)
1174 # Server side - generate traffic
1175 capture = self.pg1.get_capture(len(pkts))
1176 self.verify_capture_out(capture)
1177 pkts = self.create_stream_out(self.pg1, ttl=1)
1178 self.pg1.add_stream(pkts)
1179 self.pg_enable_capture(self.pg_interfaces)
1182 # Server side - verify ICMP type 11 packets
1183 capture = self.pg1.get_capture(len(pkts))
1184 self.verify_capture_out_with_icmp_errors(capture,
1185 src_ip=self.pg1.local_ip4)
1187 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1188 """ NAT44 handling of error responses to client packets with TTL=2 """
# Flow: inject TTL=2 traffic on pg0, capture it on pg1, answer every captured
# packet with a hand-built ICMP type 11 message sent on pg1, and check what
# arrives back on pg0.
# NOTE(review): the embedded line numbers have gaps, so some statements
# (e.g. the tail of the pg1 feature call and whatever starts the packet
# generator) are not visible in this listing - confirm against the full file.
1190 self.nat44_add_address(self.nat_addr)
1191 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1192 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1195 # Client side - generate traffic
1196 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1197 self.pg0.add_stream(pkts)
1198 self.pg_enable_capture(self.pg_interfaces)
1201 # Server side - simulate ICMP type 11 response
# Each response is addressed to self.nat_addr and carries the IP layer of the
# packet captured on pg1 as its ICMP payload.
1202 capture = self.pg1.get_capture(len(pkts))
1203 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1204 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1205 ICMP(type=11) / packet[IP] for packet in capture]
1206 self.pg1.add_stream(pkts)
1207 self.pg_enable_capture(self.pg_interfaces)
1210 # Client side - verify ICMP type 11 packets
1211 capture = self.pg0.get_capture(len(pkts))
1212 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1214 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1215 """ NAT44 handling of error responses to server packets with TTL=2 """
# Flow: create sessions with pg0 -> pg1 traffic, inject TTL=2 traffic on pg1,
# capture it on pg0, answer every captured packet with a hand-built ICMP
# type 11 message sent on pg0, and check what arrives on pg1.
# NOTE(review): the embedded line numbers have gaps, so some statements
# (e.g. the tail of the pg1 feature call and whatever starts the packet
# generator) are not visible in this listing - confirm against the full file.
1217 self.nat44_add_address(self.nat_addr)
1218 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1219 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1222 # Client side - create sessions
1223 pkts = self.create_stream_in(self.pg0, self.pg1)
1224 self.pg0.add_stream(pkts)
1225 self.pg_enable_capture(self.pg_interfaces)
1228 # Server side - generate traffic
1229 capture = self.pg1.get_capture(len(pkts))
1230 self.verify_capture_out(capture)
1231 pkts = self.create_stream_out(self.pg1, ttl=2)
1232 self.pg1.add_stream(pkts)
1233 self.pg_enable_capture(self.pg_interfaces)
1236 # Client side - simulate ICMP type 11 response
# Each response goes from pg0's remote host to pg1's remote host and carries
# the IP layer of the packet captured on pg0 as its ICMP payload.
1237 capture = self.pg0.get_capture(len(pkts))
1238 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1239 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1240 ICMP(type=11) / packet[IP] for packet in capture]
1241 self.pg0.add_stream(pkts)
1242 self.pg_enable_capture(self.pg_interfaces)
1245 # Server side - verify ICMP type 11 packets
1246 capture = self.pg1.get_capture(len(pkts))
1247 self.verify_capture_out_with_icmp_errors(capture)
1249 def test_ping_out_interface_from_outside(self):
1250 """ Ping NAT44 out interface from outside network """
# Flow: send an ICMP echo-request from pg1's remote host to pg1's own local
# address and expect exactly one echo reply back on pg1.
# NOTE(review): the embedded line numbers have gaps; the assignments of
# `pkts` and `packet`, the tail of the pg1 feature call and the error-handling
# scaffolding around the assertions are not visible in this listing.
1252 self.nat44_add_address(self.nat_addr)
1253 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1254 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1257 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1258 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1259 ICMP(id=self.icmp_id_out, type='echo-request'))
1261 self.pg1.add_stream(pkts)
1262 self.pg_enable_capture(self.pg_interfaces)
1264 capture = self.pg1.get_capture(len(pkts))
1265 self.assertEqual(1, len(capture))
# The reply must have source and destination swapped relative to the request.
# NOTE(review): the request is built with icmp_id_out but the reply id is
# compared with icmp_id_in - confirm the two attributes are meant to match.
1268 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1269 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1270 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1271 self.assertEqual(packet[ICMP].type, 0) # echo reply
1273 self.logger.error(ppp("Unexpected or invalid packet "
1274 "(outside network):", packet))
1277 def test_ping_internal_host_from_outside(self):
1278 """ Ping internal host from outside network """
# Flow: map pg0's remote host to self.nat_addr with a static mapping, send an
# echo-request to self.nat_addr on pg1 and expect it on pg0, then send the
# echo-reply on pg0 and expect it on pg1.
# NOTE(review): the embedded line numbers have gaps, so some statements
# (e.g. the tail of the pg1 feature call and whatever starts the packet
# generator) are not visible in this listing - confirm against the full file.
1280 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1281 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1282 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Request direction: pg1 -> pg0.
1286 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1287 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1288 ICMP(id=self.icmp_id_out, type='echo-request'))
1289 self.pg1.add_stream(pkt)
1290 self.pg_enable_capture(self.pg_interfaces)
1292 capture = self.pg0.get_capture(1)
1293 self.verify_capture_in(capture, self.pg0, packet_num=1)
1294 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Reply direction: pg0 -> pg1.
1297 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1298 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1299 ICMP(id=self.icmp_id_in, type='echo-reply'))
1300 self.pg0.add_stream(pkt)
1301 self.pg_enable_capture(self.pg_interfaces)
1303 capture = self.pg1.get_capture(1)
1304 self.verify_capture_out(capture, same_port=True, packet_num=1)
1305 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1307 def test_forwarding(self):
1308 """ NAT44 forwarding test """
# Flow: enable NAT44 forwarding, add a static mapping from pg0's remote host
# to self.nat_addr, exchange traffic that matches the mapping, then repeat
# with a different pg0 host that has no mapping.
# NOTE(review): the embedded line numbers have gaps; the tails of several
# calls and any try/finally scaffolding around the traffic are not visible
# in this listing - confirm against the full file.
1310 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1311 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1313 self.vapi.nat44_forwarding_enable_disable(1)
# Kept in locals because the same pair is passed to the mapping call again at
# the end of the test.
1315 real_ip = self.pg0.remote_ip4n
1316 alias_ip = self.nat_addr_n
1317 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1318 external_ip=alias_ip)
1321 # in2out - static mapping match
1323 pkts = self.create_stream_out(self.pg1)
1324 self.pg1.add_stream(pkts)
1325 self.pg_enable_capture(self.pg_interfaces)
1327 capture = self.pg0.get_capture(len(pkts))
1328 self.verify_capture_in(capture, self.pg0)
1330 pkts = self.create_stream_in(self.pg0, self.pg1)
1331 self.pg0.add_stream(pkts)
1332 self.pg_enable_capture(self.pg_interfaces)
1334 capture = self.pg1.get_capture(len(pkts))
1335 self.verify_capture_out(capture, same_port=True)
1337 # in2out - no static mapping match
# Temporarily substitute the second remote host for the first; the original
# entry is saved in host0 and written back further down.
1339 host0 = self.pg0.remote_hosts[0]
1340 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1342 pkts = self.create_stream_out(self.pg1,
1343 dst_ip=self.pg0.remote_ip4,
1344 use_inside_ports=True)
1345 self.pg1.add_stream(pkts)
1346 self.pg_enable_capture(self.pg_interfaces)
1348 capture = self.pg0.get_capture(len(pkts))
1349 self.verify_capture_in(capture, self.pg0)
1351 pkts = self.create_stream_in(self.pg0, self.pg1)
1352 self.pg0.add_stream(pkts)
1353 self.pg_enable_capture(self.pg_interfaces)
1355 capture = self.pg1.get_capture(len(pkts))
1356 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
# Restore the remote host entry replaced above.
1359 self.pg0.remote_hosts[0] = host0
# Cleanup: forwarding is switched off again.
# NOTE(review): the final mapping call is cut off after external_ip;
# presumably it removes the mapping added above - confirm.
1362 self.vapi.nat44_forwarding_enable_disable(0)
1363 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1364 external_ip=alias_ip,
1367 def test_static_in(self):
1368 """ 1:1 NAT initialized from inside network """
# Flow: statically map pg0's remote host to nat_ip, send traffic pg0 -> pg1
# first, then pg1 -> pg0 towards nat_ip.
# NOTE(review): the embedded line numbers have gaps, so some statements
# (e.g. the tail of the pg1 feature call and whatever starts the packet
# generator) are not visible in this listing - confirm against the full file.
1370 nat_ip = "10.0.0.10"
# The *_out attributes are set before any stream is built.
# NOTE(review): presumably read by the stream/verify helpers - confirm.
1371 self.tcp_port_out = 6303
1372 self.udp_port_out = 6304
1373 self.icmp_id_out = 6305
1375 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1376 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1377 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside.
1381 pkts = self.create_stream_in(self.pg0, self.pg1)
1382 self.pg0.add_stream(pkts)
1383 self.pg_enable_capture(self.pg_interfaces)
1385 capture = self.pg1.get_capture(len(pkts))
1386 self.verify_capture_out(capture, nat_ip, True)
# Outside -> inside, addressed to the mapped nat_ip.
1389 pkts = self.create_stream_out(self.pg1, nat_ip)
1390 self.pg1.add_stream(pkts)
1391 self.pg_enable_capture(self.pg_interfaces)
1393 capture = self.pg0.get_capture(len(pkts))
1394 self.verify_capture_in(capture, self.pg0)
1396 def test_static_out(self):
1397 """ 1:1 NAT initialized from outside network """
# Flow: statically map pg0's remote host to nat_ip, send traffic pg1 -> pg0
# towards nat_ip first, then pg0 -> pg1.
# NOTE(review): the embedded line numbers have gaps, so some statements
# (e.g. the tail of the pg1 feature call and whatever starts the packet
# generator) are not visible in this listing - confirm against the full file.
1399 nat_ip = "10.0.0.20"
1400 self.tcp_port_out = 6303
1401 self.udp_port_out = 6304
1402 self.icmp_id_out = 6305
1404 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1405 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1406 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside, addressed to the mapped nat_ip.
1410 pkts = self.create_stream_out(self.pg1, nat_ip)
1411 self.pg1.add_stream(pkts)
1412 self.pg_enable_capture(self.pg_interfaces)
1414 capture = self.pg0.get_capture(len(pkts))
1415 self.verify_capture_in(capture, self.pg0)
# Inside -> outside.
1418 pkts = self.create_stream_in(self.pg0, self.pg1)
1419 self.pg0.add_stream(pkts)
1420 self.pg_enable_capture(self.pg_interfaces)
1422 capture = self.pg1.get_capture(len(pkts))
1423 self.verify_capture_out(capture, nat_ip, True)
1425 def test_static_with_port_in(self):
1426 """ 1:1 NAPT initialized from inside network """
# Flow: add one port-level static mapping each for TCP, UDP and ICMP between
# pg0's remote host and self.nat_addr, send traffic pg0 -> pg1 first, then
# pg1 -> pg0.
# NOTE(review): the embedded line numbers have gaps, so some statements
# (e.g. the tail of the pg1 feature call and whatever starts the packet
# generator) are not visible in this listing - confirm against the full file.
1428 self.tcp_port_out = 3606
1429 self.udp_port_out = 3607
1430 self.icmp_id_out = 3608
1432 self.nat44_add_address(self.nat_addr)
# Each mapping pairs the *_in port/id with the *_out value assigned above.
1433 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434 self.tcp_port_in, self.tcp_port_out,
1435 proto=IP_PROTOS.tcp)
1436 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1437 self.udp_port_in, self.udp_port_out,
1438 proto=IP_PROTOS.udp)
1439 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1440 self.icmp_id_in, self.icmp_id_out,
1441 proto=IP_PROTOS.icmp)
1442 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1443 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside.
1447 pkts = self.create_stream_in(self.pg0, self.pg1)
1448 self.pg0.add_stream(pkts)
1449 self.pg_enable_capture(self.pg_interfaces)
1451 capture = self.pg1.get_capture(len(pkts))
1452 self.verify_capture_out(capture)
# Outside -> inside.
1455 pkts = self.create_stream_out(self.pg1)
1456 self.pg1.add_stream(pkts)
1457 self.pg_enable_capture(self.pg_interfaces)
1459 capture = self.pg0.get_capture(len(pkts))
1460 self.verify_capture_in(capture, self.pg0)
1462 def test_static_with_port_out(self):
1463 """ 1:1 NAPT initialized from outside network """
# Flow: add one port-level static mapping each for TCP, UDP and ICMP between
# pg0's remote host and self.nat_addr, send traffic pg1 -> pg0 first, then
# pg0 -> pg1.
# NOTE(review): the embedded line numbers have gaps, so some statements
# (e.g. the tail of the pg1 feature call and whatever starts the packet
# generator) are not visible in this listing - confirm against the full file.
1465 self.tcp_port_out = 30606
1466 self.udp_port_out = 30607
1467 self.icmp_id_out = 30608
1469 self.nat44_add_address(self.nat_addr)
# Each mapping pairs the *_in port/id with the *_out value assigned above.
1470 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1471 self.tcp_port_in, self.tcp_port_out,
1472 proto=IP_PROTOS.tcp)
1473 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1474 self.udp_port_in, self.udp_port_out,
1475 proto=IP_PROTOS.udp)
1476 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1477 self.icmp_id_in, self.icmp_id_out,
1478 proto=IP_PROTOS.icmp)
1479 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1480 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside.
1484 pkts = self.create_stream_out(self.pg1)
1485 self.pg1.add_stream(pkts)
1486 self.pg_enable_capture(self.pg_interfaces)
1488 capture = self.pg0.get_capture(len(pkts))
1489 self.verify_capture_in(capture, self.pg0)
# Inside -> outside.
1492 pkts = self.create_stream_in(self.pg0, self.pg1)
1493 self.pg0.add_stream(pkts)
1494 self.pg_enable_capture(self.pg_interfaces)
1496 capture = self.pg1.get_capture(len(pkts))
1497 self.verify_capture_out(capture)
1499 def test_static_with_port_out2(self):
1500 """ 1:1 NAPT symmetrical rule """
# Flow: with forwarding enabled and a TCP static mapping flagged
# out2in_only=1, exercise four single-packet exchanges: client -> service
# via self.nat_addr, the service's reply, and a client <-> server pair that
# is expected to pass without translation.
# NOTE(review): the embedded line numbers have gaps; the definitions of
# local_port / external_port, the extraction of `p`, `ip` and `tcp` from each
# capture and the error-handling scaffolding are not visible in this listing.
1505 self.vapi.nat44_forwarding_enable_disable(1)
1506 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1507 local_port, external_port,
1508 proto=IP_PROTOS.tcp, out2in_only=1)
1509 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1510 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1513 # from client to service
1514 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1515 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1516 TCP(sport=12345, dport=external_port))
1517 self.pg1.add_stream(p)
1518 self.pg_enable_capture(self.pg_interfaces)
1520 capture = self.pg0.get_capture(1)
# Destination address and port must be rewritten to the local side.
1526 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1527 self.assertEqual(tcp.dport, local_port)
1528 self.check_tcp_checksum(p)
1529 self.check_ip_checksum(p)
1531 self.logger.error(ppp("Unexpected or invalid packet:", p))
1534 # from service back to client
1535 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1536 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1537 TCP(sport=local_port, dport=12345))
1538 self.pg0.add_stream(p)
1539 self.pg_enable_capture(self.pg_interfaces)
1541 capture = self.pg1.get_capture(1)
# Source address and port must be rewritten to the external side.
1546 self.assertEqual(ip.src, self.nat_addr)
1547 self.assertEqual(tcp.sport, external_port)
1548 self.check_tcp_checksum(p)
1549 self.check_ip_checksum(p)
1551 self.logger.error(ppp("Unexpected or invalid packet:", p))
1554 # from client to server (no translation)
# Addressed directly to pg0's remote host instead of self.nat_addr.
1555 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1556 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1557 TCP(sport=12346, dport=local_port))
1558 self.pg1.add_stream(p)
1559 self.pg_enable_capture(self.pg_interfaces)
1561 capture = self.pg0.get_capture(1)
1567 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1568 self.assertEqual(tcp.dport, local_port)
1569 self.check_tcp_checksum(p)
1570 self.check_ip_checksum(p)
1572 self.logger.error(ppp("Unexpected or invalid packet:", p))
1575 # from service back to client (no translation)
1576 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1577 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1578 TCP(sport=local_port, dport=12346))
1579 self.pg0.add_stream(p)
1580 self.pg_enable_capture(self.pg_interfaces)
1582 capture = self.pg1.get_capture(1)
# The source must remain the inside host's own address and port.
1587 self.assertEqual(ip.src, self.pg0.remote_ip4)
1588 self.assertEqual(tcp.sport, local_port)
1589 self.check_tcp_checksum(p)
1590 self.check_ip_checksum(p)
1592 self.logger.error(ppp("Unexpected or invalid packet:", p))
1595 def test_static_vrf_aware(self):
1596 """ 1:1 NAT VRF awareness """
# Flow: add static mappings for pg4's and pg0's remote hosts, then check that
# pg4 -> pg3 traffic is translated to nat_ip1 while pg0 -> pg3 traffic
# produces no capture on pg3.
# NOTE(review): the embedded line numbers have gaps; the trailing arguments of
# both static-mapping calls and of the pg3 feature call are cut off, so the
# VRF values involved are not visible in this listing.
1598 nat_ip1 = "10.0.0.30"
1599 nat_ip2 = "10.0.0.40"
1600 self.tcp_port_out = 6303
1601 self.udp_port_out = 6304
1602 self.icmp_id_out = 6305
1604 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1606 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1608 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1610 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1611 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1613 # inside interface VRF match NAT44 static mapping VRF
1614 pkts = self.create_stream_in(self.pg4, self.pg3)
1615 self.pg4.add_stream(pkts)
1616 self.pg_enable_capture(self.pg_interfaces)
1618 capture = self.pg3.get_capture(len(pkts))
1619 self.verify_capture_out(capture, nat_ip1, True)
1621 # inside interface VRF don't match NAT44 static mapping VRF (packets
1623 pkts = self.create_stream_in(self.pg0, self.pg3)
1624 self.pg0.add_stream(pkts)
1625 self.pg_enable_capture(self.pg_interfaces)
# Nothing may reach pg3 for the pg0-originated traffic.
1627 self.pg3.assert_nothing_captured()
1629 def test_identity_nat(self):
1630 """ Identity NAT """
# Flow: add an identity mapping for pg0's remote host, send one TCP packet
# from pg1's remote host to it and check that addresses and ports arrive on
# pg0 unchanged.
# NOTE(review): the embedded line numbers have gaps; the extraction of `p`,
# `ip` and `tcp` from the capture, the tail of the pg1 feature call and the
# error-handling scaffolding are not visible in this listing.
1632 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1633 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1634 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1637 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1638 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1639 TCP(sport=12345, dport=56789))
1640 self.pg1.add_stream(p)
1641 self.pg_enable_capture(self.pg_interfaces)
1643 capture = self.pg0.get_capture(1)
# All four values must equal what was sent.
1648 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1649 self.assertEqual(ip.src, self.pg1.remote_ip4)
1650 self.assertEqual(tcp.dport, 56789)
1651 self.assertEqual(tcp.sport, 12345)
1652 self.check_tcp_checksum(p)
1653 self.check_ip_checksum(p)
1655 self.logger.error(ppp("Unexpected or invalid packet:", p))
1658 def test_static_lb(self):
1659 """ NAT44 local service load balancing """
# Flow: register a load-balancing static mapping from self.nat_addr to two
# pg0 hosts, check one client -> service packet and its reply, then send one
# packet per client address and compare how many landed on each server.
# NOTE(review): the embedded line numbers have gaps; the definitions of
# external_port / local_port / server, the remaining fields of `locals`, most
# arguments of the lb mapping call, the `pkts` accumulation and the
# server1_n / server2_n counters are not visible in this listing.
1660 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1663 server1 = self.pg0.remote_hosts[0]
1664 server2 = self.pg0.remote_hosts[1]
1666 locals = [{'addr': server1.ip4n,
1669 {'addr': server2.ip4n,
1673 self.nat44_add_address(self.nat_addr)
1674 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1677 local_num=len(locals),
1679 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1680 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1683 # from client to service
1684 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1685 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1686 TCP(sport=12345, dport=external_port))
1687 self.pg1.add_stream(p)
1688 self.pg_enable_capture(self.pg_interfaces)
1690 capture = self.pg0.get_capture(1)
# Either backend is acceptable as the destination of the first packet.
1696 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1697 if ip.dst == server1.ip4:
1701 self.assertEqual(tcp.dport, local_port)
1702 self.check_tcp_checksum(p)
1703 self.check_ip_checksum(p)
1705 self.logger.error(ppp("Unexpected or invalid packet:", p))
1708 # from service back to client
# The reply is sent from `server`; presumably whichever backend received the
# request above - confirm.
1709 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1710 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1711 TCP(sport=local_port, dport=12345))
1712 self.pg0.add_stream(p)
1713 self.pg_enable_capture(self.pg_interfaces)
1715 capture = self.pg1.get_capture(1)
1720 self.assertEqual(ip.src, self.nat_addr)
1721 self.assertEqual(tcp.sport, external_port)
1722 self.check_tcp_checksum(p)
1723 self.check_ip_checksum(p)
1725 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Distribution check: one packet per address produced by ip4_range().
1731 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1733 for client in clients:
1734 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1735 IP(src=client, dst=self.nat_addr) /
1736 TCP(sport=12345, dport=external_port))
1738 self.pg1.add_stream(pkts)
1739 self.pg_enable_capture(self.pg_interfaces)
1741 capture = self.pg0.get_capture(len(pkts))
1743 if p[IP].dst == server1.ip4:
# server1 is expected to receive strictly more packets than server2.
# NOTE(review): presumably because of the per-server weights in `locals`,
# which are not visible here - confirm.
1747 self.assertTrue(server1_n > server2_n)
1749 def test_static_lb_2(self):
1750 """ NAT44 local service load balancing (asymmetrical rule) """
# Flow: with forwarding enabled and a load-balancing static mapping from
# self.nat_addr to two pg0 hosts, check client -> service and its reply, then
# a direct client <-> server1 exchange expected to pass without translation.
# NOTE(review): the embedded line numbers have gaps; the definitions of
# external_port / local_port / server, the remaining fields of `locals`, most
# arguments of the lb mapping call, the extraction of `p`, `ip` and `tcp`
# from each capture and the error-handling scaffolding are not visible here.
1751 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1754 server1 = self.pg0.remote_hosts[0]
1755 server2 = self.pg0.remote_hosts[1]
1757 locals = [{'addr': server1.ip4n,
1760 {'addr': server2.ip4n,
1764 self.vapi.nat44_forwarding_enable_disable(1)
1765 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1769 local_num=len(locals),
1771 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1772 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1775 # from client to service
1776 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1777 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1778 TCP(sport=12345, dport=external_port))
1779 self.pg1.add_stream(p)
1780 self.pg_enable_capture(self.pg_interfaces)
1782 capture = self.pg0.get_capture(1)
# Either backend is acceptable as the destination.
1788 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1789 if ip.dst == server1.ip4:
1793 self.assertEqual(tcp.dport, local_port)
1794 self.check_tcp_checksum(p)
1795 self.check_ip_checksum(p)
1797 self.logger.error(ppp("Unexpected or invalid packet:", p))
1800 # from service back to client
1801 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1802 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1803 TCP(sport=local_port, dport=12345))
1804 self.pg0.add_stream(p)
1805 self.pg_enable_capture(self.pg_interfaces)
1807 capture = self.pg1.get_capture(1)
1812 self.assertEqual(ip.src, self.nat_addr)
1813 self.assertEqual(tcp.sport, external_port)
1814 self.check_tcp_checksum(p)
1815 self.check_ip_checksum(p)
1817 self.logger.error(ppp("Unexpected or invalid packet:", p))
1820 # from client to server (no translation)
# Addressed directly to server1 instead of self.nat_addr.
1821 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1822 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1823 TCP(sport=12346, dport=local_port))
1824 self.pg1.add_stream(p)
1825 self.pg_enable_capture(self.pg_interfaces)
1827 capture = self.pg0.get_capture(1)
1833 self.assertEqual(ip.dst, server1.ip4)
1834 self.assertEqual(tcp.dport, local_port)
1835 self.check_tcp_checksum(p)
1836 self.check_ip_checksum(p)
1838 self.logger.error(ppp("Unexpected or invalid packet:", p))
1841 # from service back to client (no translation)
1842 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1843 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1844 TCP(sport=local_port, dport=12346))
1845 self.pg0.add_stream(p)
1846 self.pg_enable_capture(self.pg_interfaces)
1848 capture = self.pg1.get_capture(1)
# The source must remain server1's own address and port.
1853 self.assertEqual(ip.src, server1.ip4)
1854 self.assertEqual(tcp.sport, local_port)
1855 self.check_tcp_checksum(p)
1856 self.check_ip_checksum(p)
1858 self.logger.error(ppp("Unexpected or invalid packet:", p))
1861 def test_multiple_inside_interfaces(self):
1862 """ NAT44 multiple non-overlapping address space inside interfaces """
# Flow: enable the NAT44 feature on pg0, pg1 and pg3 (pg2 gets none), then
# check untranslated pg0 -> pg1 and pg0 -> pg2 traffic followed by translated
# traffic in both directions between each of pg0 / pg1 and pg3.
# NOTE(review): the embedded line numbers have gaps, so some statements
# (e.g. the tail of the pg3 feature call and whatever starts the packet
# generator) are not visible in this listing - confirm against the full file.
1864 self.nat44_add_address(self.nat_addr)
1865 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1866 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1867 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1870 # between two NAT44 inside interfaces (no translation)
1871 pkts = self.create_stream_in(self.pg0, self.pg1)
1872 self.pg0.add_stream(pkts)
1873 self.pg_enable_capture(self.pg_interfaces)
1875 capture = self.pg1.get_capture(len(pkts))
1876 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1878 # from NAT44 inside to interface without NAT44 feature (no translation)
1879 pkts = self.create_stream_in(self.pg0, self.pg2)
1880 self.pg0.add_stream(pkts)
1881 self.pg_enable_capture(self.pg_interfaces)
1883 capture = self.pg2.get_capture(len(pkts))
1884 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1886 # in2out 1st interface
1887 pkts = self.create_stream_in(self.pg0, self.pg3)
1888 self.pg0.add_stream(pkts)
1889 self.pg_enable_capture(self.pg_interfaces)
1891 capture = self.pg3.get_capture(len(pkts))
1892 self.verify_capture_out(capture)
1894 # out2in 1st interface
1895 pkts = self.create_stream_out(self.pg3)
1896 self.pg3.add_stream(pkts)
1897 self.pg_enable_capture(self.pg_interfaces)
1899 capture = self.pg0.get_capture(len(pkts))
1900 self.verify_capture_in(capture, self.pg0)
1902 # in2out 2nd interface
1903 pkts = self.create_stream_in(self.pg1, self.pg3)
1904 self.pg1.add_stream(pkts)
1905 self.pg_enable_capture(self.pg_interfaces)
1907 capture = self.pg3.get_capture(len(pkts))
1908 self.verify_capture_out(capture)
1910 # out2in 2nd interface
# Return traffic must now arrive on pg1, the most recent sender.
1911 pkts = self.create_stream_out(self.pg3)
1912 self.pg3.add_stream(pkts)
1913 self.pg_enable_capture(self.pg_interfaces)
1915 capture = self.pg1.get_capture(len(pkts))
1916 self.verify_capture_in(capture, self.pg1)
1918 def test_inside_overlapping_interfaces(self):
1919 """ NAT44 multiple inside interfaces with overlapping address space """
# Flow: enable the NAT44 feature on pg3, pg4, pg5 and pg6, add a static
# mapping for pg6's remote host, then exercise pg4 -> pg5, pg4 -> pg6 via the
# static address, and both directions between each of pg4 / pg5 / pg6 and
# pg3, checking the user and session dumps along the way.
# NOTE(review): the embedded line numbers have gaps; the tails of several
# calls, the extraction of `p`, `ip` and `tcp` from the hairpinning capture,
# the loop header that binds `user` and the error-handling scaffolding are
# not visible in this listing - confirm against the full file.
1921 static_nat_ip = "10.0.0.10"
1922 self.nat44_add_address(self.nat_addr)
1923 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1925 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1926 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1927 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1928 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1931 # between NAT44 inside interfaces with same VRF (no translation)
1932 pkts = self.create_stream_in(self.pg4, self.pg5)
1933 self.pg4.add_stream(pkts)
1934 self.pg_enable_capture(self.pg_interfaces)
1936 capture = self.pg5.get_capture(len(pkts))
1937 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1939 # between NAT44 inside interfaces with different VRF (hairpinning)
1940 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1941 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1942 TCP(sport=1234, dport=5678))
1943 self.pg4.add_stream(p)
1944 self.pg_enable_capture(self.pg_interfaces)
1946 capture = self.pg6.get_capture(1)
# The source must become self.nat_addr with a changed source port, and the
# destination must become pg6's real host address.
1951 self.assertEqual(ip.src, self.nat_addr)
1952 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1953 self.assertNotEqual(tcp.sport, 1234)
1954 self.assertEqual(tcp.dport, 5678)
1956 self.logger.error(ppp("Unexpected or invalid packet:", p))
1959 # in2out 1st interface
1960 pkts = self.create_stream_in(self.pg4, self.pg3)
1961 self.pg4.add_stream(pkts)
1962 self.pg_enable_capture(self.pg_interfaces)
1964 capture = self.pg3.get_capture(len(pkts))
1965 self.verify_capture_out(capture)
1967 # out2in 1st interface
1968 pkts = self.create_stream_out(self.pg3)
1969 self.pg3.add_stream(pkts)
1970 self.pg_enable_capture(self.pg_interfaces)
1972 capture = self.pg4.get_capture(len(pkts))
1973 self.verify_capture_in(capture, self.pg4)
1975 # in2out 2nd interface
1976 pkts = self.create_stream_in(self.pg5, self.pg3)
1977 self.pg5.add_stream(pkts)
1978 self.pg_enable_capture(self.pg_interfaces)
1980 capture = self.pg3.get_capture(len(pkts))
1981 self.verify_capture_out(capture)
1983 # out2in 2nd interface
1984 pkts = self.create_stream_out(self.pg3)
1985 self.pg3.add_stream(pkts)
1986 self.pg_enable_capture(self.pg_interfaces)
1988 capture = self.pg5.get_capture(len(pkts))
1989 self.verify_capture_in(capture, self.pg5)
# Session dump for pg5's host: exactly three non-static sessions are expected,
# in the order TCP, UDP, ICMP, whose ports / ids match the *_in and *_out
# attributes.
1992 addresses = self.vapi.nat44_address_dump()
1993 self.assertEqual(len(addresses), 1)
1994 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1995 self.assertEqual(len(sessions), 3)
1996 for session in sessions:
1997 self.assertFalse(session.is_static)
# Only the first four bytes of the inside address field are compared.
1998 self.assertEqual(session.inside_ip_address[0:4],
1999 self.pg5.remote_ip4n)
2000 self.assertEqual(session.outside_ip_address,
2001 addresses[0].ip_address)
2002 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2003 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2004 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2005 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2006 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2007 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2008 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2009 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2010 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2012 # in2out 3rd interface
2013 pkts = self.create_stream_in(self.pg6, self.pg3)
2014 self.pg6.add_stream(pkts)
2015 self.pg_enable_capture(self.pg_interfaces)
2017 capture = self.pg3.get_capture(len(pkts))
2018 self.verify_capture_out(capture, static_nat_ip, True)
2020 # out2in 3rd interface
2021 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2022 self.pg3.add_stream(pkts)
2023 self.pg_enable_capture(self.pg_interfaces)
2025 capture = self.pg6.get_capture(len(pkts))
2026 self.verify_capture_in(capture, self.pg6)
2028 # general user and session dump verifications
2029 users = self.vapi.nat44_user_dump()
2030 self.assertTrue(len(users) >= 3)
2031 addresses = self.vapi.nat44_address_dump()
2032 self.assertEqual(len(addresses), 1)
2034 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2036 for session in sessions:
2037 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: requires total_bytes > total_pkts and total_pkts > 0.
2038 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2039 self.assertTrue(session.protocol in
2040 [IP_PROTOS.tcp, IP_PROTOS.udp,
# Sessions of pg4's host: none may be static.
2044 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2045 self.assertTrue(len(sessions) >= 4)
2046 for session in sessions:
2047 self.assertFalse(session.is_static)
2048 self.assertEqual(session.inside_ip_address[0:4],
2049 self.pg4.remote_ip4n)
2050 self.assertEqual(session.outside_ip_address,
2051 addresses[0].ip_address)
# Sessions of pg6's host: all must be static and use static_nat_ip outside.
2054 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2055 self.assertTrue(len(sessions) >= 3)
2056 for session in sessions:
2057 self.assertTrue(session.is_static)
2058 self.assertEqual(session.inside_ip_address[0:4],
2059 self.pg6.remote_ip4n)
# Compares the first four address bytes with the dotted-quad octets.
# NOTE(review): this relies on map() returning lists and on ord() accepting
# the slice elements (Python 2 behaviour); under Python 3 two map objects
# would never compare equal.
2060 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2061 map(int, static_nat_ip.split('.')))
2062 self.assertTrue(session.inside_port in
2063 [self.tcp_port_in, self.udp_port_in,
2066 def test_hairpinning(self):
2067 """ NAT44 hairpinning - 1:1 NAPT """
# Flow: host and server are both remote hosts on pg0. The host reaches the
# server through self.nat_addr and the server replies through self.nat_addr;
# both packets are injected on pg0 and captured on pg0.
# NOTE(review): the embedded line numbers have gaps; the definition of
# host_in_port, the extraction of `p`, `ip` and `tcp` from each capture, the
# tail of the pg1 feature call and the error-handling scaffolding are not
# visible in this listing - confirm against the full file.
2069 host = self.pg0.remote_hosts[0]
2070 server = self.pg0.remote_hosts[1]
2073 server_in_port = 5678
2074 server_out_port = 8765
2076 self.nat44_add_address(self.nat_addr)
2077 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2078 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2080 # add static mapping for server
2081 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2082 server_in_port, server_out_port,
2083 proto=IP_PROTOS.tcp)
2085 # send packet from host to server
2086 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2087 IP(src=host.ip4, dst=self.nat_addr) /
2088 TCP(sport=host_in_port, dport=server_out_port))
2089 self.pg0.add_stream(p)
2090 self.pg_enable_capture(self.pg_interfaces)
2092 capture = self.pg0.get_capture(1)
# The source is rewritten to self.nat_addr with a new port, and the
# destination to the server's real address and port.
2097 self.assertEqual(ip.src, self.nat_addr)
2098 self.assertEqual(ip.dst, server.ip4)
2099 self.assertNotEqual(tcp.sport, host_in_port)
2100 self.assertEqual(tcp.dport, server_in_port)
2101 self.check_tcp_checksum(p)
# Remember the translated source port; the server's reply is addressed to it.
2102 host_out_port = tcp.sport
2104 self.logger.error(ppp("Unexpected or invalid packet:", p))
2107 # send reply from server to host
2108 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2109 IP(src=server.ip4, dst=self.nat_addr) /
2110 TCP(sport=server_in_port, dport=host_out_port))
2111 self.pg0.add_stream(p)
2112 self.pg_enable_capture(self.pg_interfaces)
2114 capture = self.pg0.get_capture(1)
# The reply must appear to come from self.nat_addr:server_out_port and be
# delivered to the host's original address and port.
2119 self.assertEqual(ip.src, self.nat_addr)
2120 self.assertEqual(ip.dst, host.ip4)
2121 self.assertEqual(tcp.sport, server_out_port)
2122 self.assertEqual(tcp.dport, host_in_port)
2123 self.check_tcp_checksum(p)
2125 self.logger.error(ppp("Unexpected or invalid packet:", p))
2128 def test_hairpinning2(self):
2129 """ NAT44 hairpinning - 1:1 NAT"""
2131 server1_nat_ip = "10.0.0.10"
2132 server2_nat_ip = "10.0.0.11"
2133 host = self.pg0.remote_hosts[0]
2134 server1 = self.pg0.remote_hosts[1]
2135 server2 = self.pg0.remote_hosts[2]
2136 server_tcp_port = 22
2137 server_udp_port = 20
2139 self.nat44_add_address(self.nat_addr)
2140 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2141 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2144 # add static mapping for servers
2145 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2146 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2150 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2151 IP(src=host.ip4, dst=server1_nat_ip) /
2152 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2154 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2155 IP(src=host.ip4, dst=server1_nat_ip) /
2156 UDP(sport=self.udp_port_in, dport=server_udp_port))
2158 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2159 IP(src=host.ip4, dst=server1_nat_ip) /
2160 ICMP(id=self.icmp_id_in, type='echo-request'))
2162 self.pg0.add_stream(pkts)
2163 self.pg_enable_capture(self.pg_interfaces)
2165 capture = self.pg0.get_capture(len(pkts))
2166 for packet in capture:
2168 self.assertEqual(packet[IP].src, self.nat_addr)
2169 self.assertEqual(packet[IP].dst, server1.ip4)
2170 if packet.haslayer(TCP):
2171 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2172 self.assertEqual(packet[TCP].dport, server_tcp_port)
2173 self.tcp_port_out = packet[TCP].sport
2174 self.check_tcp_checksum(packet)
2175 elif packet.haslayer(UDP):
2176 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2177 self.assertEqual(packet[UDP].dport, server_udp_port)
2178 self.udp_port_out = packet[UDP].sport
2180 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2181 self.icmp_id_out = packet[ICMP].id
2183 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2188 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2189 IP(src=server1.ip4, dst=self.nat_addr) /
2190 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2192 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2193 IP(src=server1.ip4, dst=self.nat_addr) /
2194 UDP(sport=server_udp_port, dport=self.udp_port_out))
2196 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2197 IP(src=server1.ip4, dst=self.nat_addr) /
2198 ICMP(id=self.icmp_id_out, type='echo-reply'))
2200 self.pg0.add_stream(pkts)
2201 self.pg_enable_capture(self.pg_interfaces)
2203 capture = self.pg0.get_capture(len(pkts))
2204 for packet in capture:
2206 self.assertEqual(packet[IP].src, server1_nat_ip)
2207 self.assertEqual(packet[IP].dst, host.ip4)
2208 if packet.haslayer(TCP):
2209 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2210 self.assertEqual(packet[TCP].sport, server_tcp_port)
2211 self.check_tcp_checksum(packet)
2212 elif packet.haslayer(UDP):
2213 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2214 self.assertEqual(packet[UDP].sport, server_udp_port)
2216 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2218 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2221 # server2 to server1
2223 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2224 IP(src=server2.ip4, dst=server1_nat_ip) /
2225 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2227 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2228 IP(src=server2.ip4, dst=server1_nat_ip) /
2229 UDP(sport=self.udp_port_in, dport=server_udp_port))
2231 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2232 IP(src=server2.ip4, dst=server1_nat_ip) /
2233 ICMP(id=self.icmp_id_in, type='echo-request'))
2235 self.pg0.add_stream(pkts)
2236 self.pg_enable_capture(self.pg_interfaces)
2238 capture = self.pg0.get_capture(len(pkts))
2239 for packet in capture:
2241 self.assertEqual(packet[IP].src, server2_nat_ip)
2242 self.assertEqual(packet[IP].dst, server1.ip4)
2243 if packet.haslayer(TCP):
2244 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2245 self.assertEqual(packet[TCP].dport, server_tcp_port)
2246 self.tcp_port_out = packet[TCP].sport
2247 self.check_tcp_checksum(packet)
2248 elif packet.haslayer(UDP):
2249 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2250 self.assertEqual(packet[UDP].dport, server_udp_port)
2251 self.udp_port_out = packet[UDP].sport
2253 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2254 self.icmp_id_out = packet[ICMP].id
2256 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2259 # server1 to server2
2261 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262 IP(src=server1.ip4, dst=server2_nat_ip) /
2263 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266 IP(src=server1.ip4, dst=server2_nat_ip) /
2267 UDP(sport=server_udp_port, dport=self.udp_port_out))
2269 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270 IP(src=server1.ip4, dst=server2_nat_ip) /
2271 ICMP(id=self.icmp_id_out, type='echo-reply'))
2273 self.pg0.add_stream(pkts)
2274 self.pg_enable_capture(self.pg_interfaces)
2276 capture = self.pg0.get_capture(len(pkts))
2277 for packet in capture:
2279 self.assertEqual(packet[IP].src, server1_nat_ip)
2280 self.assertEqual(packet[IP].dst, server2.ip4)
2281 if packet.haslayer(TCP):
2282 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2283 self.assertEqual(packet[TCP].sport, server_tcp_port)
2284 self.check_tcp_checksum(packet)
2285 elif packet.haslayer(UDP):
2286 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2287 self.assertEqual(packet[UDP].sport, server_udp_port)
2289 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2291 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2294 def test_max_translations_per_user(self):
2295 """ MAX translations per user - recycle the least recently used """
# Configures NAT44 with self.nat_addr, reads the per-user translation limit
# from the NAT configuration, then builds limit + 5 TCP packets from one
# inside host (pg0) that differ only in source port and checks how many
# packets arrive on pg1.
# NOTE(review): the embedded line numbers skip values in this listing, so
# some original lines are not visible: the call opened on 2299 has no
# visible closing argument, and `pkts` is used without a visible
# definition or append.
2297 self.nat44_add_address(self.nat_addr)
2298 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2299 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2302 # get maximum number of translations per user
2303 nat44_config = self.vapi.nat_show_config()
2305 # send more than maximum number of translations per user packets
2306 pkts_num = nat44_config.max_translations_per_user + 5
# One packet per source port, starting at 1025.
2308 for port in range(0, pkts_num):
2309 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2310 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2311 TCP(sport=1025 + port))
2313 self.pg0.add_stream(pkts)
2314 self.pg_enable_capture(self.pg_interfaces)
2317 # verify number of translated packet
# All pkts_num packets are requested from the capture, i.e. exceeding the
# per-user limit is not expected to drop traffic.
2318 self.pg1.get_capture(pkts_num)
2320 def test_interface_addr(self):
2321 """ Acquire NAT44 addresses from interface """
# Registers pg7 as an address source for NAT44 and checks that the NAT
# address dump tracks pg7's IPv4 configuration: empty before the interface
# has an address, exactly one entry equal to pg7.local_ip4n once it is
# configured, and empty again after the address is removed.
2322 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2324 # no address in NAT pool
2325 adresses = self.vapi.nat44_address_dump()
2326 self.assertEqual(0, len(adresses))
2328 # configure interface address and check NAT address pool
2329 self.pg7.config_ip4()
2330 adresses = self.vapi.nat44_address_dump()
2331 self.assertEqual(1, len(adresses))
# Only the first 4 bytes of the dumped address field are compared with the
# packed IPv4 address of pg7.
2332 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2334 # remove interface address and check NAT address pool
2335 self.pg7.unconfig_ip4()
2336 adresses = self.vapi.nat44_address_dump()
2337 self.assertEqual(0, len(adresses))
2339 def test_interface_addr_static_mapping(self):
2340 """ Static mapping with addresses from interface """
# Creates a static mapping whose external side is bound to interface pg7
# rather than to a literal address, then checks the static-mapping dump in
# three states: before pg7 has an IPv4 address, after it is configured, and
# after it is removed again.
# NOTE(review): original line 2343 (between the opening of the call on 2342
# and its last argument on 2344) is not visible in this listing.
2341 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2342 self.nat44_add_static_mapping(
2344 external_sw_if_index=self.pg7.sw_if_index)
2346 # static mappings with external interface
# Before the interface has an address the mapping is reported with the
# interface index it was created with.
2347 static_mappings = self.vapi.nat44_static_mapping_dump()
2348 self.assertEqual(1, len(static_mappings))
2349 self.assertEqual(self.pg7.sw_if_index,
2350 static_mappings[0].external_sw_if_index)
2352 # configure interface address and check static mappings
2353 self.pg7.config_ip4()
2354 static_mappings = self.vapi.nat44_static_mapping_dump()
2355 self.assertEqual(1, len(static_mappings))
2356 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2357 self.pg7.local_ip4n)
# Once resolved, the mapping carries the concrete address and the interface
# index is expected to read 0xFFFFFFFF (presumably the "no interface"
# sentinel -- confirm against the API definition).
2358 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2360 # remove interface address and check static mappings
2361 self.pg7.unconfig_ip4()
2362 static_mappings = self.vapi.nat44_static_mapping_dump()
2363 self.assertEqual(0, len(static_mappings))
2365 def test_interface_addr_identity_nat(self):
2366 """ Identity NAT with addresses from interface """
# Creates a TCP identity mapping bound to interface pg7 and checks the
# identity-mapping dump before pg7 has an IPv4 address, after it is
# configured, and after it is removed again.
# NOTE(review): the embedded line numbers skip values in this listing
# (2367-2368, 2372, 2374-2375), so `port` is used on 2389 without a visible
# definition and the call opened on 2370 is not visibly closed.
2369 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2370 self.vapi.nat44_add_del_identity_mapping(
2371 sw_if_index=self.pg7.sw_if_index,
2373 protocol=IP_PROTOS.tcp,
2376 # identity mappings with external interface
2377 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2378 self.assertEqual(1, len(identity_mappings))
2379 self.assertEqual(self.pg7.sw_if_index,
2380 identity_mappings[0].sw_if_index)
2382 # configure interface address and check identity mappings
2383 self.pg7.config_ip4()
2384 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2385 self.assertEqual(1, len(identity_mappings))
# NOTE(review): the full ip_address field is compared here, whereas
# test_interface_addr and test_interface_addr_static_mapping compare only
# the [0:4] slice of the dumped address -- confirm the field width matches.
2386 self.assertEqual(identity_mappings[0].ip_address,
2387 self.pg7.local_ip4n)
# After resolution the interface index is expected to read 0xFFFFFFFF
# (presumably the "no interface" sentinel -- confirm).
2388 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2389 self.assertEqual(port, identity_mappings[0].port)
2390 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2392 # remove interface address and check identity mappings
2393 self.pg7.unconfig_ip4()
2394 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2395 self.assertEqual(0, len(identity_mappings))
2397 def test_ipfix_nat44_sess(self):
2398 """ IPFIX logging NAT44 session created/delted """
# Enables NAT44 IPFIX logging towards a collector behind pg3, pushes the
# standard in2out stream from pg0 to pg1, removes the NAT address, flushes
# the exporter and then validates the 9 IPFIX packets captured on pg3.
# NOTE(review): the embedded line numbers skip values in this listing
# (2406, 2409, 2414, 2418, 2426, 2437), so the loop headers that bind `p`
# and some call arguments are not visible.
# These two values are stored on the instance and are also read by
# test_ipfix_addr_exhausted and test_ipfix_max_sessions.
2399 self.ipfix_domain_id = 10
2400 self.ipfix_src_port = 20202
2401 colector_port = 30303
# Tell scapy to dissect UDP payloads sent to the collector port as IPFIX.
# The literal 30303 duplicates the value of colector_port above.
2402 bind_layers(UDP, IPFIX, dport=30303)
2403 self.nat44_add_address(self.nat_addr)
2404 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2405 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2407 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2408 src_address=self.pg3.local_ip4n,
2410 template_interval=10,
2411 collector_port=colector_port)
2412 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2413 src_port=self.ipfix_src_port)
2415 pkts = self.create_stream_in(self.pg0, self.pg1)
2416 self.pg0.add_stream(pkts)
2417 self.pg_enable_capture(self.pg_interfaces)
2419 capture = self.pg1.get_capture(len(pkts))
2420 self.verify_capture_out(capture)
# The pool address is removed before the flush; presumably this is what
# generates the session-delete events named in the docstring -- confirm.
2421 self.nat44_add_address(self.nat_addr, is_add=0)
2422 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2423 capture = self.pg3.get_capture(9)
2424 ipfix = IPFIXDecoder()
2425 # first load template
# Every export packet must come from pg3's local address/port pair, go to
# the configured collector port and carry the configured domain id.
2427 self.assertTrue(p.haslayer(IPFIX))
2428 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2429 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2430 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2431 self.assertEqual(p[UDP].dport, colector_port)
2432 self.assertEqual(p[IPFIX].observationDomainID,
2433 self.ipfix_domain_id)
2434 if p.haslayer(Template):
2435 ipfix.add_template(p.getlayer(Template))
2436 # verify events in data set
# Data sets are decoded with the templates collected above and handed to
# the session-event verifier.
2438 if p.haslayer(Data):
2439 data = ipfix.decode_data_set(p.getlayer(Set))
2440 self.verify_ipfix_nat44_ses(data)
2442 def test_ipfix_addr_exhausted(self):
2443 """ IPFIX logging NAT addresses exhausted """
# Enables the NAT44 feature on pg0/pg1 without adding any NAT pool address
# (no nat44_add_address call is visible in this method), sends one packet
# from the inside, expects nothing to be forwarded to pg1 and then
# validates the IPFIX packets exported towards pg3.
# NOTE(review): self.ipfix_domain_id and self.ipfix_src_port are read but
# not assigned in the visible lines of this method (test_ipfix_nat44_sess
# assigns them) -- confirm they are initialised elsewhere, otherwise this
# test depends on execution order.
# NOTE(review): the embedded line numbers skip values in this listing
# (2446, 2449, 2453, 2456, 2459, 2465, 2476), so the transport layer of the
# probe packet and the loop headers binding `p` are not visible.
2444 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2445 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# No collector_port is passed here, unlike in test_ipfix_nat44_sess.
2447 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2448 src_address=self.pg3.local_ip4n,
2450 template_interval=10)
2451 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2452 src_port=self.ipfix_src_port)
2454 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2455 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2457 self.pg0.add_stream(p)
2458 self.pg_enable_capture(self.pg_interfaces)
# Zero packets are expected on the outside interface.
2460 capture = self.pg1.get_capture(0)
2461 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2462 capture = self.pg3.get_capture(9)
2463 ipfix = IPFIXDecoder()
2464 # first load template
2466 self.assertTrue(p.haslayer(IPFIX))
2467 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2468 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2469 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# 4739 is the IANA-registered IPFIX port.
2470 self.assertEqual(p[UDP].dport, 4739)
2471 self.assertEqual(p[IPFIX].observationDomainID,
2472 self.ipfix_domain_id)
2473 if p.haslayer(Template):
2474 ipfix.add_template(p.getlayer(Template))
2475 # verify events in data set
2477 if p.haslayer(Data):
2478 data = ipfix.decode_data_set(p.getlayer(Set))
2479 self.verify_ipfix_addr_exhausted(data)
# Only runs when running_extended_tests() is true.
2481 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2482 def test_ipfix_max_sessions(self):
2483 """ IPFIX logging maximum session entries exceeded """
# Fills the NAT session table with max_sessions flows from distinct source
# addresses, then enables IPFIX logging, sends one more packet that is
# expected not to be forwarded, and validates the IPFIX packets exported
# towards pg3.
# NOTE(review): self.ipfix_domain_id and self.ipfix_src_port are read but
# not assigned in the visible lines of this method (test_ipfix_nat44_sess
# assigns them) -- confirm they are initialised elsewhere.
# NOTE(review): the embedded line numbers skip values in this listing, so
# the transport layers of the generated packets, the definition of `pkts`
# and the loop headers binding `p` in the verification part are not
# visible.
2484 self.nat44_add_address(self.nat_addr)
2485 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2486 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2489 nat44_config = self.vapi.nat_show_config()
# The session limit is taken to be 10 sessions per translation bucket.
2490 max_sessions = 10 * nat44_config.translation_buckets
2493 for i in range(0, max_sessions):
# Spread i over the last two octets: 10.10.<bits 8..15 of i>.<bits 0..7>.
2494 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2495 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2496 IP(src=src, dst=self.pg1.remote_ip4) /
2499 self.pg0.add_stream(pkts)
2500 self.pg_enable_capture(self.pg_interfaces)
2503 self.pg1.get_capture(max_sessions)
# IPFIX export is configured only after the table has been filled.
2504 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2505 src_address=self.pg3.local_ip4n,
2507 template_interval=10)
2508 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2509 src_port=self.ipfix_src_port)
2511 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2512 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2514 self.pg0.add_stream(p)
2515 self.pg_enable_capture(self.pg_interfaces)
# The extra packet must not appear on the outside interface.
2517 self.pg1.get_capture(0)
2518 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2519 capture = self.pg3.get_capture(9)
2520 ipfix = IPFIXDecoder()
2521 # first load template
2523 self.assertTrue(p.haslayer(IPFIX))
2524 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2525 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2526 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# 4739 is the IANA-registered IPFIX port; no collector_port was passed to
# set_ipfix_exporter above.
2527 self.assertEqual(p[UDP].dport, 4739)
2528 self.assertEqual(p[IPFIX].observationDomainID,
2529 self.ipfix_domain_id)
2530 if p.haslayer(Template):
2531 ipfix.add_template(p.getlayer(Template))
2532 # verify events in data set
2534 if p.haslayer(Data):
2535 data = ipfix.decode_data_set(p.getlayer(Set))
2536 self.verify_ipfix_max_sessions(data, max_sessions)
2538 def test_pool_addr_fib(self):
2539 """ NAT44 add pool addresses to FIB """
2540 static_addr = '10.0.0.10'
2541 self.nat44_add_address(self.nat_addr)
2542 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2543 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2545 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2548 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2549 ARP(op=ARP.who_has, pdst=self.nat_addr,
2550 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2551 self.pg1.add_stream(p)
2552 self.pg_enable_capture(self.pg_interfaces)
2554 capture = self.pg1.get_capture(1)
2555 self.assertTrue(capture[0].haslayer(ARP))
2556 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2559 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2560 ARP(op=ARP.who_has, pdst=static_addr,
2561 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2562 self.pg1.add_stream(p)
2563 self.pg_enable_capture(self.pg_interfaces)
2565 capture = self.pg1.get_capture(1)
2566 self.assertTrue(capture[0].haslayer(ARP))
2567 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2569 # send ARP to non-NAT44 interface
2570 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2571 ARP(op=ARP.who_has, pdst=self.nat_addr,
2572 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2573 self.pg2.add_stream(p)
2574 self.pg_enable_capture(self.pg_interfaces)
2576 capture = self.pg1.get_capture(0)
2578 # remove addresses and verify
2579 self.nat44_add_address(self.nat_addr, is_add=0)
2580 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2583 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2584 ARP(op=ARP.who_has, pdst=self.nat_addr,
2585 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2586 self.pg1.add_stream(p)
2587 self.pg_enable_capture(self.pg_interfaces)
2589 capture = self.pg1.get_capture(0)
2591 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2592 ARP(op=ARP.who_has, pdst=static_addr,
2593 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2594 self.pg1.add_stream(p)
2595 self.pg_enable_capture(self.pg_interfaces)
2597 capture = self.pg1.get_capture(0)
2599 def test_vrf_mode(self):
2600 """ NAT44 tenant VRF aware address pool mode """
# Moves pg0 and pg1 into two separate IPv4 tables, adds one NAT address per
# table, and checks that traffic from pg0 towards pg2 is translated to
# nat_ip1 while traffic from pg1 towards pg2 is translated to nat_ip2. The
# interfaces are moved back to table 0 and both tables deleted at the end.
# NOTE(review): the embedded line numbers skip values in this listing
# (e.g. 2601-2603, 2621-2623), so the definitions of vrf_id1/vrf_id2 and
# the closing argument of the call opened on 2620 are not visible.
2604 nat_ip1 = "10.0.0.10"
2605 nat_ip2 = "10.0.0.11"
# Addresses are removed before the table change and re-applied afterwards.
2607 self.pg0.unconfig_ip4()
2608 self.pg1.unconfig_ip4()
2609 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2610 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2611 self.pg0.set_table_ip4(vrf_id1)
2612 self.pg1.set_table_ip4(vrf_id2)
2613 self.pg0.config_ip4()
2614 self.pg1.config_ip4()
# One pool address per tenant VRF.
2616 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2617 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2618 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2619 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2620 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# Traffic entering on pg0 must leave pg2 with nat_ip1.
2624 pkts = self.create_stream_in(self.pg0, self.pg2)
2625 self.pg0.add_stream(pkts)
2626 self.pg_enable_capture(self.pg_interfaces)
2628 capture = self.pg2.get_capture(len(pkts))
2629 self.verify_capture_out(capture, nat_ip1)
# Traffic entering on pg1 must leave pg2 with nat_ip2.
2632 pkts = self.create_stream_in(self.pg1, self.pg2)
2633 self.pg1.add_stream(pkts)
2634 self.pg_enable_capture(self.pg_interfaces)
2636 capture = self.pg2.get_capture(len(pkts))
2637 self.verify_capture_out(capture, nat_ip2)
# Cleanup: back to table 0, then delete the two tables.
# NOTE(review): unlike the setup above, no config_ip4() call is visible
# after set_table_ip4(0) -- confirm the interfaces are re-addressed
# elsewhere (e.g. in tearDown).
2639 self.pg0.unconfig_ip4()
2640 self.pg1.unconfig_ip4()
2641 self.pg0.set_table_ip4(0)
2642 self.pg1.set_table_ip4(0)
2643 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2644 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2646 def test_vrf_feature_independent(self):
2647 """ NAT44 tenant VRF independent address pool mode """
# Adds nat_ip1 without a VRF and nat_ip2 bound to VRF 99, then checks that
# traffic from both pg0 and pg1 towards pg2 is translated to nat_ip1, i.e.
# the address bound to VRF 99 is not expected to be used for either
# interface.
# NOTE(review): the embedded line numbers skip values in this listing
# (e.g. 2657-2659), so the closing argument of the call opened on 2656 is
# not visible.
2649 nat_ip1 = "10.0.0.10"
2650 nat_ip2 = "10.0.0.11"
2652 self.nat44_add_address(nat_ip1)
2653 self.nat44_add_address(nat_ip2, vrf_id=99)
2654 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2655 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2656 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2 is expected to use nat_ip1.
2660 pkts = self.create_stream_in(self.pg0, self.pg2)
2661 self.pg0.add_stream(pkts)
2662 self.pg_enable_capture(self.pg_interfaces)
2664 capture = self.pg2.get_capture(len(pkts))
2665 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2 is expected to use nat_ip1 as well.
2668 pkts = self.create_stream_in(self.pg1, self.pg2)
2669 self.pg1.add_stream(pkts)
2670 self.pg_enable_capture(self.pg_interfaces)
2672 capture = self.pg2.get_capture(len(pkts))
2673 self.verify_capture_out(capture, nat_ip1)
2675 def test_dynamic_ipless_interfaces(self):
2676 """ NAT44 interfaces without configured IP address """
# Runs dynamic NAT44 between pg7 (inside) and pg8: neighbor entries and
# /32 routes for the remote hosts are installed by hand (no config_ip4()
# call is made here), then the standard in2out stream and the matching
# out2in stream are sent and verified.
# NOTE(review): the embedded line numbers skip values in this listing, so
# the closing arguments of the calls opened on 2678, 2682 and 2698 are not
# visible.
# Static neighbor (MAC/IP) entries for the hosts behind pg7 and pg8.
2678 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2679 mactobinary(self.pg7.remote_mac),
2680 self.pg7.remote_ip4n,
2682 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2683 mactobinary(self.pg8.remote_mac),
2684 self.pg8.remote_ip4n,
# Host routes (/32) pointing at each remote host through its interface.
2687 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2688 dst_address_length=32,
2689 next_hop_address=self.pg7.remote_ip4n,
2690 next_hop_sw_if_index=self.pg7.sw_if_index)
2691 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2692 dst_address_length=32,
2693 next_hop_address=self.pg8.remote_ip4n,
2694 next_hop_sw_if_index=self.pg8.sw_if_index)
2696 self.nat44_add_address(self.nat_addr)
2697 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2698 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8 direction.
2702 pkts = self.create_stream_in(self.pg7, self.pg8)
2703 self.pg7.add_stream(pkts)
2704 self.pg_enable_capture(self.pg_interfaces)
2706 capture = self.pg8.get_capture(len(pkts))
2707 self.verify_capture_out(capture)
# pg8 -> pg7 direction, addressed to the NAT address.
2710 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2711 self.pg8.add_stream(pkts)
2712 self.pg_enable_capture(self.pg_interfaces)
2714 capture = self.pg7.get_capture(len(pkts))
2715 self.verify_capture_in(capture, self.pg7)
2717 def test_static_ipless_interfaces(self):
2718 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same manual neighbor/route setup on pg7 and pg8 as in
# test_dynamic_ipless_interfaces, but with a 1:1 static mapping between
# pg7's remote host and self.nat_addr instead of a dynamic pool. The
# outside-to-inside stream is sent first, then inside-to-outside.
# NOTE(review): the embedded line numbers skip values in this listing, so
# the closing arguments of the calls opened on 2720, 2724 and 2740 are not
# visible.
# Static neighbor (MAC/IP) entries for the hosts behind pg7 and pg8.
2720 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2721 mactobinary(self.pg7.remote_mac),
2722 self.pg7.remote_ip4n,
2724 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2725 mactobinary(self.pg8.remote_mac),
2726 self.pg8.remote_ip4n,
# Host routes (/32) pointing at each remote host through its interface.
2729 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2730 dst_address_length=32,
2731 next_hop_address=self.pg7.remote_ip4n,
2732 next_hop_sw_if_index=self.pg7.sw_if_index)
2733 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2734 dst_address_length=32,
2735 next_hop_address=self.pg8.remote_ip4n,
2736 next_hop_sw_if_index=self.pg8.sw_if_index)
2738 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2739 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2740 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 first: with a static mapping no prior session is needed.
2744 pkts = self.create_stream_out(self.pg8)
2745 self.pg8.add_stream(pkts)
2746 self.pg_enable_capture(self.pg_interfaces)
2748 capture = self.pg7.get_capture(len(pkts))
2749 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8; the third argument (True) is passed positionally to
# verify_capture_out -- presumably "same port expected", confirm against
# the helper's signature.
2752 pkts = self.create_stream_in(self.pg7, self.pg8)
2753 self.pg7.add_stream(pkts)
2754 self.pg_enable_capture(self.pg_interfaces)
2756 capture = self.pg8.get_capture(len(pkts))
2757 self.verify_capture_out(capture, self.nat_addr, True)
2759 def test_static_with_port_ipless_interfaces(self):
2760 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Same manual neighbor/route setup on pg7 and pg8 as in the other
# "ipless" tests, combined with per-protocol static port mappings (TCP,
# UDP, ICMP) from pg7's remote host to self.nat_addr. The
# outside-to-inside stream is sent first, then inside-to-outside.
# NOTE(review): the embedded line numbers skip values in this listing, so
# the closing arguments of the calls opened on 2766, 2770 and 2795 are not
# visible.
# Fixed external ports/id used by the static mappings below; they are
# stored on the instance (the verify helpers presumably read them --
# confirm).
2762 self.tcp_port_out = 30606
2763 self.udp_port_out = 30607
2764 self.icmp_id_out = 30608
2766 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2767 mactobinary(self.pg7.remote_mac),
2768 self.pg7.remote_ip4n,
2770 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2771 mactobinary(self.pg8.remote_mac),
2772 self.pg8.remote_ip4n,
2775 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2776 dst_address_length=32,
2777 next_hop_address=self.pg7.remote_ip4n,
2778 next_hop_sw_if_index=self.pg7.sw_if_index)
2779 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2780 dst_address_length=32,
2781 next_hop_address=self.pg8.remote_ip4n,
2782 next_hop_sw_if_index=self.pg8.sw_if_index)
2784 self.nat44_add_address(self.nat_addr)
# One mapping per protocol: (inside host, in port/id) <-> (nat_addr, out
# port/id).
2785 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2786 self.tcp_port_in, self.tcp_port_out,
2787 proto=IP_PROTOS.tcp)
2788 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2789 self.udp_port_in, self.udp_port_out,
2790 proto=IP_PROTOS.udp)
2791 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2792 self.icmp_id_in, self.icmp_id_out,
2793 proto=IP_PROTOS.icmp)
2794 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2795 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 direction.
2799 pkts = self.create_stream_out(self.pg8)
2800 self.pg8.add_stream(pkts)
2801 self.pg_enable_capture(self.pg_interfaces)
2803 capture = self.pg7.get_capture(len(pkts))
2804 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 direction.
2807 pkts = self.create_stream_in(self.pg7, self.pg8)
2808 self.pg7.add_stream(pkts)
2809 self.pg_enable_capture(self.pg_interfaces)
2811 capture = self.pg8.get_capture(len(pkts))
2812 self.verify_capture_out(capture)
2814 def test_static_unknown_proto(self):
2815 """ 1:1 NAT translate packet with unknown protocol """
# With a 1:1 static mapping pg0.remote_ip4 <-> nat_ip, sends a tunnelled
# packet in each direction and checks that only the outer IP addresses are
# rewritten, the GRE layer is still present and the IP checksum is valid.
# NOTE(review): the embedded line numbers skip values in this listing
# (e.g. 2825, 2832-2833, 2838, 2845, 2852-2853, 2858), so the layer between
# the outer and inner IP headers (the assertions expect GRE), the binding
# of `packet`, and the try/except around the assertions are not visible.
2816 nat_ip = "10.0.0.10"
2817 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2818 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2819 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside: outer source must become nat_ip.
2823 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2824 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2826 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2827 TCP(sport=1234, dport=1234))
2828 self.pg0.add_stream(p)
2829 self.pg_enable_capture(self.pg_interfaces)
2831 p = self.pg1.get_capture(1)
2834 self.assertEqual(packet[IP].src, nat_ip)
2835 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2836 self.assertTrue(packet.haslayer(GRE))
2837 self.check_ip_checksum(packet)
2839 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside: outer destination nat_ip must become pg0.remote_ip4.
2843 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2844 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2846 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2847 TCP(sport=1234, dport=1234))
2848 self.pg1.add_stream(p)
2849 self.pg_enable_capture(self.pg_interfaces)
2851 p = self.pg0.get_capture(1)
2854 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2855 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2856 self.assertTrue(packet.haslayer(GRE))
2857 self.check_ip_checksum(packet)
2859 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2862 def test_hairpinning_static_unknown_proto(self):
2863 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Hairpinning variant: host and server both sit behind pg0 and each has a
# 1:1 static mapping. A tunnelled packet sent to the peer's NAT address
# must come back out of pg0 with the source rewritten to the sender's NAT
# address and the destination rewritten to the peer's real address.
# NOTE(review): the embedded line numbers skip values in this listing
# (e.g. 2880, 2887-2888, 2893, 2900, 2907-2908, 2913), so the layer between
# the outer and inner IP headers (the assertions expect GRE), the binding
# of `packet`, and the try/except around the assertions are not visible.
2865 host = self.pg0.remote_hosts[0]
2866 server = self.pg0.remote_hosts[1]
2868 host_nat_ip = "10.0.0.10"
2869 server_nat_ip = "10.0.0.11"
2871 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2872 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2873 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2874 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server (addressed to the server's NAT address).
2878 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2879 IP(src=host.ip4, dst=server_nat_ip) /
2881 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2882 TCP(sport=1234, dport=1234))
2883 self.pg0.add_stream(p)
2884 self.pg_enable_capture(self.pg_interfaces)
# The translated packet is captured on the same interface it entered.
2886 p = self.pg0.get_capture(1)
2889 self.assertEqual(packet[IP].src, host_nat_ip)
2890 self.assertEqual(packet[IP].dst, server.ip4)
2891 self.assertTrue(packet.haslayer(GRE))
2892 self.check_ip_checksum(packet)
2894 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host (addressed to the host's NAT address).
2898 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2899 IP(src=server.ip4, dst=host_nat_ip) /
2901 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2902 TCP(sport=1234, dport=1234))
2903 self.pg0.add_stream(p)
2904 self.pg_enable_capture(self.pg_interfaces)
2906 p = self.pg0.get_capture(1)
2909 self.assertEqual(packet[IP].src, server_nat_ip)
2910 self.assertEqual(packet[IP].dst, host.ip4)
2911 self.assertTrue(packet.haslayer(GRE))
2912 self.check_ip_checksum(packet)
2914 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2917 def test_unknown_proto(self):
2918 """ NAT44 translate packet with unknown protocol """
# Dynamic NAT44 variant of the unknown-protocol test: a plain TCP packet
# is sent from the inside first, then a tunnelled packet in each direction
# is checked for outer-address rewriting, presence of the GRE layer and a
# valid IP checksum.
# NOTE(review): the embedded line numbers skip values in this listing
# (e.g. 2935, 2942-2943, 2948, 2955, 2962-2963, 2968), so the layer between
# the outer and inner IP headers (the assertions expect GRE), the binding
# of `packet`, and the try/except around the assertions are not visible.
2919 self.nat44_add_address(self.nat_addr)
2920 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2921 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# A TCP packet is sent first; presumably this creates the dynamic
# translation that the unknown-protocol packet then reuses -- confirm.
2925 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2926 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2927 TCP(sport=self.tcp_port_in, dport=20))
2928 self.pg0.add_stream(p)
2929 self.pg_enable_capture(self.pg_interfaces)
2931 p = self.pg1.get_capture(1)
# Inside -> outside tunnelled packet: outer source must become nat_addr.
2933 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2934 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2936 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2937 TCP(sport=1234, dport=1234))
2938 self.pg0.add_stream(p)
2939 self.pg_enable_capture(self.pg_interfaces)
2941 p = self.pg1.get_capture(1)
2944 self.assertEqual(packet[IP].src, self.nat_addr)
2945 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2946 self.assertTrue(packet.haslayer(GRE))
2947 self.check_ip_checksum(packet)
2949 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside: outer destination nat_addr must become pg0.remote_ip4.
2953 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2954 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2956 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2957 TCP(sport=1234, dport=1234))
2958 self.pg1.add_stream(p)
2959 self.pg_enable_capture(self.pg_interfaces)
2961 p = self.pg0.get_capture(1)
2964 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2965 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2966 self.assertTrue(packet.haslayer(GRE))
2967 self.check_ip_checksum(packet)
2969 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2972 def test_hairpinning_unknown_proto(self):
2973 """ NAT44 translate packet with unknown protocol - hairpinning """
# Hairpinning with a dynamically translated host and a statically mapped
# server, both behind pg0. A TCP packet from host to the server's NAT
# address is sent first, then tunnelled packets in both directions are
# checked on pg0.
# NOTE(review): the embedded line numbers skip values in this listing
# (e.g. 2976-2977, 3001, 3008-3009, 3014, 3021, 3028-3029, 3034), so the
# definition of host_in_port, the layer between the outer and inner IP
# headers (the assertions expect GRE), the binding of `packet`, and the
# try/except around the assertions are not visible.
2974 host = self.pg0.remote_hosts[0]
2975 server = self.pg0.remote_hosts[1]
# server_in_port is not referenced again in the visible lines of this
# method.
2978 server_in_port = 5678
2979 server_out_port = 8765
2980 server_nat_ip = "10.0.0.11"
2982 self.nat44_add_address(self.nat_addr)
2983 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2984 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2987 # add static mapping for server
2988 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# A TCP packet host -> server is sent first; presumably it creates the
# host's dynamic translation used below -- confirm.
2991 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2992 IP(src=host.ip4, dst=server_nat_ip) /
2993 TCP(sport=host_in_port, dport=server_out_port))
2994 self.pg0.add_stream(p)
2995 self.pg_enable_capture(self.pg_interfaces)
2997 capture = self.pg0.get_capture(1)
# host -> server tunnelled packet: source becomes nat_addr, destination
# becomes the server's real address.
2999 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3000 IP(src=host.ip4, dst=server_nat_ip) /
3002 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3003 TCP(sport=1234, dport=1234))
3004 self.pg0.add_stream(p)
3005 self.pg_enable_capture(self.pg_interfaces)
3007 p = self.pg0.get_capture(1)
3010 self.assertEqual(packet[IP].src, self.nat_addr)
3011 self.assertEqual(packet[IP].dst, server.ip4)
3012 self.assertTrue(packet.haslayer(GRE))
3013 self.check_ip_checksum(packet)
3015 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host tunnelled packet addressed to nat_addr: source becomes the
# server's NAT address, destination becomes the host's real address.
3019 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3020 IP(src=server.ip4, dst=self.nat_addr) /
3022 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3023 TCP(sport=1234, dport=1234))
3024 self.pg0.add_stream(p)
3025 self.pg_enable_capture(self.pg_interfaces)
3027 p = self.pg0.get_capture(1)
3030 self.assertEqual(packet[IP].src, server_nat_ip)
3031 self.assertEqual(packet[IP].dst, host.ip4)
3032 self.assertTrue(packet.haslayer(GRE))
3033 self.check_ip_checksum(packet)
3035 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3038 def test_output_feature(self):
3039 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3 and verifies three
# flows: pg0 -> pg3 is translated, pg3 -> pg0 is translated back, and
# pg2 -> pg0 (pg2 has no NAT feature configured here) passes untranslated.
# NOTE(review): the embedded line numbers skip values in this listing
# (e.g. 3044-3046), so the closing argument of the call opened on 3043 is
# not visible.
3040 self.nat44_add_address(self.nat_addr)
3041 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3042 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3043 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3.
3047 pkts = self.create_stream_in(self.pg0, self.pg3)
3048 self.pg0.add_stream(pkts)
3049 self.pg_enable_capture(self.pg_interfaces)
3051 capture = self.pg3.get_capture(len(pkts))
3052 self.verify_capture_out(capture)
# pg3 -> pg0.
3055 pkts = self.create_stream_out(self.pg3)
3056 self.pg3.add_stream(pkts)
3057 self.pg_enable_capture(self.pg_interfaces)
3059 capture = self.pg0.get_capture(len(pkts))
3060 self.verify_capture_in(capture, self.pg0)
3062 # from non-NAT interface to NAT inside interface
3063 pkts = self.create_stream_in(self.pg2, self.pg0)
3064 self.pg2.add_stream(pkts)
3065 self.pg_enable_capture(self.pg_interfaces)
3067 capture = self.pg0.get_capture(len(pkts))
3068 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3070 def test_output_feature_vrf_aware(self):
3071 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# Uses one NAT address per VRF (10 and 20) with the output feature on pg4,
# pg6 and pg3. Traffic pg4 <-> pg3 must use nat_ip_vrf10 and traffic
# pg6 <-> pg3 must use nat_ip_vrf20, in both directions.
# NOTE(review): the embedded line numbers skip values in this listing
# (e.g. 3079, 3084, 3091), so the last argument of each ip_add_del_route
# call and of the call opened on 3090 is not visible. Presumably the two
# otherwise identical routes differ only in their table id -- confirm.
3072 nat_ip_vrf10 = "10.0.0.10"
3073 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes towards pg3's remote host.
3075 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3076 dst_address_length=32,
3077 next_hop_address=self.pg3.remote_ip4n,
3078 next_hop_sw_if_index=self.pg3.sw_if_index,
3080 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3081 dst_address_length=32,
3082 next_hop_address=self.pg3.remote_ip4n,
3083 next_hop_sw_if_index=self.pg3.sw_if_index,
3086 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3087 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3088 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3089 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3090 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3 must be translated to nat_ip_vrf10.
3094 pkts = self.create_stream_in(self.pg4, self.pg3)
3095 self.pg4.add_stream(pkts)
3096 self.pg_enable_capture(self.pg_interfaces)
3098 capture = self.pg3.get_capture(len(pkts))
3099 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> pg4, addressed to nat_ip_vrf10.
3102 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3103 self.pg3.add_stream(pkts)
3104 self.pg_enable_capture(self.pg_interfaces)
3106 capture = self.pg4.get_capture(len(pkts))
3107 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3 must be translated to nat_ip_vrf20.
3110 pkts = self.create_stream_in(self.pg6, self.pg3)
3111 self.pg6.add_stream(pkts)
3112 self.pg_enable_capture(self.pg_interfaces)
3114 capture = self.pg3.get_capture(len(pkts))
3115 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> pg6, addressed to nat_ip_vrf20.
3118 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3119 self.pg3.add_stream(pkts)
3120 self.pg_enable_capture(self.pg_interfaces)
3122 capture = self.pg6.get_capture(len(pkts))
3123 self.verify_capture_in(capture, self.pg6)
3125 def test_output_feature_hairpinning(self):
3126 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Hairpinning through the output feature: host and server both sit behind
# pg0, and the server is reachable via a static TCP port mapping on
# self.nat_addr. Checks the translated request captured on pg0 and then
# the translated reply.
# NOTE(review): the embedded line numbers skip values in this listing
# (e.g. 3129-3130, 3152-3155, 3162, 3174-3177, 3183), so the definition of
# host_in_port, the bindings of `ip` and `tcp`, the rebinding of `p` to the
# captured packet, and the try/except around the assertions are not
# visible.
3127 host = self.pg0.remote_hosts[0]
3128 server = self.pg0.remote_hosts[1]
3131 server_in_port = 5678
3132 server_out_port = 8765
3134 self.nat44_add_address(self.nat_addr)
3135 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3136 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3139 # add static mapping for server
3140 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3141 server_in_port, server_out_port,
3142 proto=IP_PROTOS.tcp)
3144 # send packet from host to server
3145 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3146 IP(src=host.ip4, dst=self.nat_addr) /
3147 TCP(sport=host_in_port, dport=server_out_port))
3148 self.pg0.add_stream(p)
3149 self.pg_enable_capture(self.pg_interfaces)
3151 capture = self.pg0.get_capture(1)
# Request as seen by the server: source is the NAT address with a
# translated (different) port, destination is the server's real
# address/port.
3156 self.assertEqual(ip.src, self.nat_addr)
3157 self.assertEqual(ip.dst, server.ip4)
3158 self.assertNotEqual(tcp.sport, host_in_port)
3159 self.assertEqual(tcp.dport, server_in_port)
3160 self.check_tcp_checksum(p)
# Remember the translated source port; the reply below targets it.
3161 host_out_port = tcp.sport
3163 self.logger.error(ppp("Unexpected or invalid packet:", p))
3166 # send reply from server to host
3167 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3168 IP(src=server.ip4, dst=self.nat_addr) /
3169 TCP(sport=server_in_port, dport=host_out_port))
3170 self.pg0.add_stream(p)
3171 self.pg_enable_capture(self.pg_interfaces)
3173 capture = self.pg0.get_capture(1)
# Reply as seen by the host: it appears to come from the NAT address and
# the server's external port, addressed to the host's original port.
3178 self.assertEqual(ip.src, self.nat_addr)
3179 self.assertEqual(ip.dst, host.ip4)
3180 self.assertEqual(tcp.sport, server_out_port)
3181 self.assertEqual(tcp.dport, host_in_port)
3182 self.check_tcp_checksum(p)
3184 self.logger.error(ppp("Unexpected or invalid packet:", p))
3187 def test_one_armed_nat44(self):
3188 """ One armed NAT44 """
# "One armed" setup: the NAT44 feature is enabled twice on the same
# interface (pg9), and both the local and the remote host are reached
# through pg9. Checks the translated request and the de-translated reply,
# both captured on pg9.
# NOTE(review): the embedded line numbers skip values in this listing
# (e.g. 3196-3198, 3206-3209, 3217, 3229-3232, 3239), so the closing
# argument of the call opened on 3195, the bindings of `ip` and `tcp`, the
# rebinding of `p` to the captured packet, and the try/except around the
# assertions are not visible.
3189 remote_host = self.pg9.remote_hosts[0]
3190 local_host = self.pg9.remote_hosts[1]
3193 self.nat44_add_address(self.nat_addr)
# Same interface for both feature calls; the second call's extra argument
# is not visible in this listing.
3194 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3195 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host request.
3199 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3200 IP(src=local_host.ip4, dst=remote_host.ip4) /
3201 TCP(sport=12345, dport=80))
3202 self.pg9.add_stream(p)
3203 self.pg_enable_capture(self.pg_interfaces)
3205 capture = self.pg9.get_capture(1)
# Source must be rewritten to the NAT address with a new source port;
# the destination is unchanged.
3210 self.assertEqual(ip.src, self.nat_addr)
3211 self.assertEqual(ip.dst, remote_host.ip4)
3212 self.assertNotEqual(tcp.sport, 12345)
# Remember the allocated external port for the reply below.
3213 external_port = tcp.sport
3214 self.assertEqual(tcp.dport, 80)
3215 self.check_tcp_checksum(p)
3216 self.check_ip_checksum(p)
3218 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> NAT address reply on the allocated external port.
3222 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3223 IP(src=remote_host.ip4, dst=self.nat_addr) /
3224 TCP(sport=80, dport=external_port))
3225 self.pg9.add_stream(p)
3226 self.pg_enable_capture(self.pg_interfaces)
3228 capture = self.pg9.get_capture(1)
# Destination must be restored to the local host's address and original
# port.
3233 self.assertEqual(ip.src, remote_host.ip4)
3234 self.assertEqual(ip.dst, local_host.ip4)
3235 self.assertEqual(tcp.sport, 80)
3236 self.assertEqual(tcp.dport, 12345)
3237 self.check_tcp_checksum(p)
3238 self.check_ip_checksum(p)
3240 self.logger.error(ppp("Unexpected or invalid packet:", p))
3243 def test_one_armed_nat44_static(self):
3244 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3245 remote_host = self.pg9.remote_hosts[0]
3246 local_host = self.pg9.remote_hosts[1]
3251 self.vapi.nat44_forwarding_enable_disable(1)
3252 self.nat44_add_address(self.nat_addr, twice_nat=1)
3253 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3254 local_port, external_port,
3255 proto=IP_PROTOS.tcp, out2in_only=1,
3257 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3258 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3261 # from client to service
3262 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3263 IP(src=remote_host.ip4, dst=self.nat_addr) /
3264 TCP(sport=12345, dport=external_port))
3265 self.pg9.add_stream(p)
3266 self.pg_enable_capture(self.pg_interfaces)
3268 capture = self.pg9.get_capture(1)
3274 self.assertEqual(ip.dst, local_host.ip4)
3275 self.assertEqual(ip.src, self.nat_addr)
3276 self.assertEqual(tcp.dport, local_port)
3277 self.assertNotEqual(tcp.sport, 12345)
3278 eh_port_in = tcp.sport
3279 self.check_tcp_checksum(p)
3280 self.check_ip_checksum(p)
3282 self.logger.error(ppp("Unexpected or invalid packet:", p))
3285 # from service back to client
3286 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3287 IP(src=local_host.ip4, dst=self.nat_addr) /
3288 TCP(sport=local_port, dport=eh_port_in))
3289 self.pg9.add_stream(p)
3290 self.pg_enable_capture(self.pg_interfaces)
3292 capture = self.pg9.get_capture(1)
3297 self.assertEqual(ip.src, self.nat_addr)
3298 self.assertEqual(ip.dst, remote_host.ip4)
3299 self.assertEqual(tcp.sport, external_port)
3300 self.assertEqual(tcp.dport, 12345)
3301 self.check_tcp_checksum(p)
3302 self.check_ip_checksum(p)
3304 self.logger.error(ppp("Unexpected or invalid packet:", p))
3307 def test_del_session(self):
3308 """ Delete NAT44 session """
3309 self.nat44_add_address(self.nat_addr)
3310 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3311 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3314 pkts = self.create_stream_in(self.pg0, self.pg1)
3315 self.pg0.add_stream(pkts)
3316 self.pg_enable_capture(self.pg_interfaces)
3318 capture = self.pg1.get_capture(len(pkts))
3320 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3321 nsessions = len(sessions)
3323 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3324 sessions[0].inside_port,
3325 sessions[0].protocol)
3326 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3327 sessions[1].outside_port,
3328 sessions[1].protocol,
3331 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3332 self.assertEqual(nsessions - len(sessions), 2)
def test_set_get_reass(self):
    """ NAT44 set/get virtual fragmentation reassembly """
    before = self.vapi.nat_get_reass()

    # Values requested from VPP; the same values must be read back.
    want_timeout = before.ip4_timeout + 5
    want_max_reass = before.ip4_max_reass * 2
    want_max_frag = before.ip4_max_frag * 2
    self.vapi.nat_set_reass(timeout=want_timeout,
                            max_reass=want_max_reass,
                            max_frag=want_max_frag)

    after = self.vapi.nat_get_reass()
    readback = ((want_timeout, after.ip4_timeout),
                (want_max_reass, after.ip4_max_reass),
                (want_max_frag, after.ip4_max_frag))
    for expected, actual in readback:
        self.assertEqual(expected, actual)

    # Switching on drop_frag must be visible in the next query.
    self.vapi.nat_set_reass(drop_frag=1)
    drop_cfg = self.vapi.nat_get_reass()
    self.assertTrue(drop_cfg.ip4_drop_frag)
3351 def test_frag_in_order(self):
3352 """ NAT44 translate fragments arriving in order """
3353 self.nat44_add_address(self.nat_addr)
3354 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3355 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3358 data = "A" * 4 + "B" * 16 + "C" * 3
3359 self.tcp_port_in = random.randint(1025, 65535)
3361 reass = self.vapi.nat_reass_dump()
3362 reass_n_start = len(reass)
3365 pkts = self.create_stream_frag(self.pg0,
3366 self.pg1.remote_ip4,
3370 self.pg0.add_stream(pkts)
3371 self.pg_enable_capture(self.pg_interfaces)
3373 frags = self.pg1.get_capture(len(pkts))
3374 p = self.reass_frags_and_verify(frags,
3376 self.pg1.remote_ip4)
3377 self.assertEqual(p[TCP].dport, 20)
3378 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3379 self.tcp_port_out = p[TCP].sport
3380 self.assertEqual(data, p[Raw].load)
3383 pkts = self.create_stream_frag(self.pg1,
3388 self.pg1.add_stream(pkts)
3389 self.pg_enable_capture(self.pg_interfaces)
3391 frags = self.pg0.get_capture(len(pkts))
3392 p = self.reass_frags_and_verify(frags,
3393 self.pg1.remote_ip4,
3394 self.pg0.remote_ip4)
3395 self.assertEqual(p[TCP].sport, 20)
3396 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3397 self.assertEqual(data, p[Raw].load)
3399 reass = self.vapi.nat_reass_dump()
3400 reass_n_end = len(reass)
3402 self.assertEqual(reass_n_end - reass_n_start, 2)
3404 def test_reass_hairpinning(self):
3405 """ NAT44 fragments hairpinning """
3406 host = self.pg0.remote_hosts[0]
3407 server = self.pg0.remote_hosts[1]
3408 host_in_port = random.randint(1025, 65535)
3410 server_in_port = random.randint(1025, 65535)
3411 server_out_port = random.randint(1025, 65535)
3412 data = "A" * 4 + "B" * 16 + "C" * 3
3414 self.nat44_add_address(self.nat_addr)
3415 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3416 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3418 # add static mapping for server
3419 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3420 server_in_port, server_out_port,
3421 proto=IP_PROTOS.tcp)
3423 # send packet from host to server
3424 pkts = self.create_stream_frag(self.pg0,
3429 self.pg0.add_stream(pkts)
3430 self.pg_enable_capture(self.pg_interfaces)
3432 frags = self.pg0.get_capture(len(pkts))
3433 p = self.reass_frags_and_verify(frags,
3436 self.assertNotEqual(p[TCP].sport, host_in_port)
3437 self.assertEqual(p[TCP].dport, server_in_port)
3438 self.assertEqual(data, p[Raw].load)
3440 def test_frag_out_of_order(self):
3441 """ NAT44 translate fragments arriving out of order """
3442 self.nat44_add_address(self.nat_addr)
3443 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3444 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3447 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the random port generated on the next line is discarded, so
# the self.tcp_port_in checks below compare against whatever value it
# already holds. test_frag_in_order assigns the same expression to
# self.tcp_port_in - presumably that assignment was intended here too;
# confirm and fix.
3448 random.randint(1025, 65535)
3451 pkts = self.create_stream_frag(self.pg0,
3452 self.pg1.remote_ip4,
3457 self.pg0.add_stream(pkts)
3458 self.pg_enable_capture(self.pg_interfaces)
3460 frags = self.pg1.get_capture(len(pkts))
3461 p = self.reass_frags_and_verify(frags,
3463 self.pg1.remote_ip4)
3464 self.assertEqual(p[TCP].dport, 20)
3465 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3466 self.tcp_port_out = p[TCP].sport
3467 self.assertEqual(data, p[Raw].load)
3470 pkts = self.create_stream_frag(self.pg1,
3476 self.pg1.add_stream(pkts)
3477 self.pg_enable_capture(self.pg_interfaces)
3479 frags = self.pg0.get_capture(len(pkts))
3480 p = self.reass_frags_and_verify(frags,
3481 self.pg1.remote_ip4,
3482 self.pg0.remote_ip4)
3483 self.assertEqual(p[TCP].sport, 20)
3484 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3485 self.assertEqual(data, p[Raw].load)
3487 def test_port_restricted(self):
3488 """ Port restricted NAT44 (MAP-E CE) """
3489 self.nat44_add_address(self.nat_addr)
3490 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3491 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3493 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3494 "psid-offset 6 psid-len 6")
3496 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3497 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3498 TCP(sport=4567, dport=22))
3499 self.pg0.add_stream(p)
3500 self.pg_enable_capture(self.pg_interfaces)
3502 capture = self.pg1.get_capture(1)
3507 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3508 self.assertEqual(ip.src, self.nat_addr)
3509 self.assertEqual(tcp.dport, 22)
3510 self.assertNotEqual(tcp.sport, 4567)
3511 self.assertEqual((tcp.sport >> 6) & 63, 10)
3512 self.check_tcp_checksum(p)
3513 self.check_ip_checksum(p)
3515 self.logger.error(ppp("Unexpected or invalid packet:", p))
3518 def test_twice_nat(self):
3520 twice_nat_addr = '10.0.1.3'
3525 self.nat44_add_address(self.nat_addr)
3526 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3527 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3528 port_in, port_out, proto=IP_PROTOS.tcp,
3530 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3531 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3534 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3535 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3536 TCP(sport=eh_port_out, dport=port_out))
3537 self.pg1.add_stream(p)
3538 self.pg_enable_capture(self.pg_interfaces)
3540 capture = self.pg0.get_capture(1)
3545 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3546 self.assertEqual(ip.src, twice_nat_addr)
3547 self.assertEqual(tcp.dport, port_in)
3548 self.assertNotEqual(tcp.sport, eh_port_out)
3549 eh_port_in = tcp.sport
3550 self.check_tcp_checksum(p)
3551 self.check_ip_checksum(p)
3553 self.logger.error(ppp("Unexpected or invalid packet:", p))
3556 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3557 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3558 TCP(sport=port_in, dport=eh_port_in))
3559 self.pg0.add_stream(p)
3560 self.pg_enable_capture(self.pg_interfaces)
3562 capture = self.pg1.get_capture(1)
3567 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3568 self.assertEqual(ip.src, self.nat_addr)
3569 self.assertEqual(tcp.dport, eh_port_out)
3570 self.assertEqual(tcp.sport, port_out)
3571 self.check_tcp_checksum(p)
3572 self.check_ip_checksum(p)
3574 self.logger.error(ppp("Unexpected or invalid packet:", p))
3577 def test_twice_nat_lb(self):
3578 """ Twice NAT44 local service load balancing """
3579 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3580 twice_nat_addr = '10.0.1.3'
3585 server1 = self.pg0.remote_hosts[0]
3586 server2 = self.pg0.remote_hosts[1]
3588 locals = [{'addr': server1.ip4n,
3591 {'addr': server2.ip4n,
3595 self.nat44_add_address(self.nat_addr)
3596 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3598 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3602 local_num=len(locals),
3604 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3605 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3608 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3609 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3610 TCP(sport=eh_port_out, dport=external_port))
3611 self.pg1.add_stream(p)
3612 self.pg_enable_capture(self.pg_interfaces)
3614 capture = self.pg0.get_capture(1)
3620 self.assertEqual(ip.src, twice_nat_addr)
3621 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3622 if ip.dst == server1.ip4:
3626 self.assertNotEqual(tcp.sport, eh_port_out)
3627 eh_port_in = tcp.sport
3628 self.assertEqual(tcp.dport, local_port)
3629 self.check_tcp_checksum(p)
3630 self.check_ip_checksum(p)
3632 self.logger.error(ppp("Unexpected or invalid packet:", p))
3635 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3636 IP(src=server.ip4, dst=twice_nat_addr) /
3637 TCP(sport=local_port, dport=eh_port_in))
3638 self.pg0.add_stream(p)
3639 self.pg_enable_capture(self.pg_interfaces)
3641 capture = self.pg1.get_capture(1)
3646 self.assertEqual(ip.src, self.nat_addr)
3647 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3648 self.assertEqual(tcp.sport, external_port)
3649 self.assertEqual(tcp.dport, eh_port_out)
3650 self.check_tcp_checksum(p)
3651 self.check_ip_checksum(p)
3653 self.logger.error(ppp("Unexpected or invalid packet:", p))
def test_twice_nat_interface_addr(self):
    """ Acquire twice NAT44 addresses from interface """
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)

    # pg7 has no IPv4 address yet, so the NAT pool must be empty
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # once pg7 gets an address, the pool holds exactly that address,
    # flagged as a twice-NAT address
    self.pg7.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    entry = pool[0]
    self.assertEqual(entry.ip_address[0:4], self.pg7.local_ip4n)
    self.assertEqual(entry.twice_nat, 1)

    # removing the interface address empties the pool again
    self.pg7.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
3676 def test_ipfix_max_frags(self):
3677 """ IPFIX logging maximum fragments pending reassembly exceeded """
3678 self.nat44_add_address(self.nat_addr)
3679 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3680 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3682 self.vapi.nat_set_reass(max_frag=0)
3683 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3684 src_address=self.pg3.local_ip4n,
3686 template_interval=10)
3687 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3688 src_port=self.ipfix_src_port)
3690 data = "A" * 4 + "B" * 16 + "C" * 3
3691 self.tcp_port_in = random.randint(1025, 65535)
3692 pkts = self.create_stream_frag(self.pg0,
3693 self.pg1.remote_ip4,
3697 self.pg0.add_stream(pkts[-1])
3698 self.pg_enable_capture(self.pg_interfaces)
3700 frags = self.pg1.get_capture(0)
3701 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3702 capture = self.pg3.get_capture(9)
3703 ipfix = IPFIXDecoder()
3704 # first load template
3706 self.assertTrue(p.haslayer(IPFIX))
3707 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3708 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3709 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3710 self.assertEqual(p[UDP].dport, 4739)
3711 self.assertEqual(p[IPFIX].observationDomainID,
3712 self.ipfix_domain_id)
3713 if p.haslayer(Template):
3714 ipfix.add_template(p.getlayer(Template))
3715 # verify events in data set
3717 if p.haslayer(Data):
3718 data = ipfix.decode_data_set(p.getlayer(Set))
3719 self.verify_ipfix_max_fragments_ip4(data, 0,
3720 self.pg0.remote_ip4n)
3723 super(TestNAT44, self).tearDown()
3724 if not self.vpp_dead:
3725 self.logger.info(self.vapi.cli("show nat44 verbose"))
3726 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3727 self.vapi.cli("nat addr-port-assignment-alg default")
3731 class TestNAT44Out2InDPO(MethodHolder):
3732 """ NAT44 Test Cases using out2in DPO """
def setUpConstants(cls):
    """ Append the ``nat { out2in dpo }`` stanza to the VPP command line """
    super(TestNAT44Out2InDPO, cls).setUpConstants()
    nat_stanza = ["nat", "{", "out2in dpo", "}"]
    cls.vpp_cmdline.extend(nat_stanza)
3740 def setUpClass(cls):
3741 super(TestNAT44Out2InDPO, cls).setUpClass()
3744 cls.tcp_port_in = 6303
3745 cls.tcp_port_out = 6303
3746 cls.udp_port_in = 6304
3747 cls.udp_port_out = 6304
3748 cls.icmp_id_in = 6305
3749 cls.icmp_id_out = 6305
3750 cls.nat_addr = '10.0.0.3'
3751 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3752 cls.dst_ip4 = '192.168.70.1'
3754 cls.create_pg_interfaces(range(2))
3757 cls.pg0.config_ip4()
3758 cls.pg0.resolve_arp()
3761 cls.pg1.config_ip6()
3762 cls.pg1.resolve_ndp()
3764 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3765 dst_address_length=0,
3766 next_hop_address=cls.pg1.remote_ip6n,
3767 next_hop_sw_if_index=cls.pg1.sw_if_index)
3770 super(TestNAT44Out2InDPO, cls).tearDownClass()
3773 def configure_xlat(self):
3774 self.dst_ip6_pfx = '1:2:3::'
3775 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3777 self.dst_ip6_pfx_len = 96
3778 self.src_ip6_pfx = '4:5:6::'
3779 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3781 self.src_ip6_pfx_len = 96
3782 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3783 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3784 '\x00\x00\x00\x00', 0, is_translation=1,
3787 def test_464xlat_ce(self):
3788 """ Test 464XLAT CE with NAT44 """
3790 self.configure_xlat()
3792 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3793 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
3795 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3796 self.dst_ip6_pfx_len)
3797 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3798 self.src_ip6_pfx_len)
3801 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3802 self.pg0.add_stream(pkts)
3803 self.pg_enable_capture(self.pg_interfaces)
3805 capture = self.pg1.get_capture(len(pkts))
3806 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3809 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3811 self.pg1.add_stream(pkts)
3812 self.pg_enable_capture(self.pg_interfaces)
3814 capture = self.pg0.get_capture(len(pkts))
3815 self.verify_capture_in(capture, self.pg0)
3817 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3819 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3820 self.nat_addr_n, is_add=0)
3822 def test_464xlat_ce_no_nat(self):
3823 """ Test 464XLAT CE without NAT44 """
3825 self.configure_xlat()
3827 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3828 self.dst_ip6_pfx_len)
3829 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3830 self.src_ip6_pfx_len)
3832 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3833 self.pg0.add_stream(pkts)
3834 self.pg_enable_capture(self.pg_interfaces)
3836 capture = self.pg1.get_capture(len(pkts))
3837 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3838 nat_ip=out_dst_ip6, same_port=True)
3840 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3841 self.pg1.add_stream(pkts)
3842 self.pg_enable_capture(self.pg_interfaces)
3844 capture = self.pg0.get_capture(len(pkts))
3845 self.verify_capture_in(capture, self.pg0)
3848 class TestDeterministicNAT(MethodHolder):
3849 """ Deterministic NAT Test Cases """
def setUpConstants(cls):
    """ Append the ``nat { deterministic }`` stanza to the VPP command line """
    super(TestDeterministicNAT, cls).setUpConstants()
    nat_stanza = ["nat", "{", "deterministic", "}"]
    cls.vpp_cmdline.extend(nat_stanza)
3857 def setUpClass(cls):
3858 super(TestDeterministicNAT, cls).setUpClass()
3861 cls.tcp_port_in = 6303
3862 cls.tcp_external_port = 6303
3863 cls.udp_port_in = 6304
3864 cls.udp_external_port = 6304
3865 cls.icmp_id_in = 6305
3866 cls.nat_addr = '10.0.0.3'
3868 cls.create_pg_interfaces(range(3))
3869 cls.interfaces = list(cls.pg_interfaces)
3871 for i in cls.interfaces:
3876 cls.pg0.generate_remote_hosts(2)
3877 cls.pg0.configure_ipv4_neighbors()
3880 super(TestDeterministicNAT, cls).tearDownClass()
3883 def create_stream_in(self, in_if, out_if, ttl=64):
3885 Create packet stream for inside network
3887 :param in_if: Inside interface
3888 :param out_if: Outside interface
3889 :param ttl: TTL of generated packets
3893 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3894 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3895 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3899 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3900 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3901 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3905 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3906 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3907 ICMP(id=self.icmp_id_in, type='echo-request'))
3912 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3914 Create packet stream for outside network
3916 :param out_if: Outside interface
3917 :param dst_ip: Destination IP address (Default use global NAT address)
3918 :param ttl: TTL of generated packets
3921 dst_ip = self.nat_addr
3924 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3925 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3926 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3930 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3931 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3932 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3936 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3937 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3938 ICMP(id=self.icmp_external_id, type='echo-reply'))
3943 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3945 Verify captured packets on outside network
3947 :param capture: Captured packets
3948 :param nat_ip: Translated IP address (Default use global NAT address)
3949 :param same_port: Sorce port number is not translated (Default False)
3950 :param packet_num: Expected number of packets (Default 3)
3953 nat_ip = self.nat_addr
3954 self.assertEqual(packet_num, len(capture))
3955 for packet in capture:
3957 self.assertEqual(packet[IP].src, nat_ip)
3958 if packet.haslayer(TCP):
3959 self.tcp_port_out = packet[TCP].sport
3960 elif packet.haslayer(UDP):
3961 self.udp_port_out = packet[UDP].sport
3963 self.icmp_external_id = packet[ICMP].id
3965 self.logger.error(ppp("Unexpected or invalid packet "
3966 "(outside network):", packet))
3969 def initiate_tcp_session(self, in_if, out_if):
3971 Initiates TCP session
3973 :param in_if: Inside interface
3974 :param out_if: Outside interface
3977 # SYN packet in->out
3978 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3979 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3980 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3983 self.pg_enable_capture(self.pg_interfaces)
3985 capture = out_if.get_capture(1)
3987 self.tcp_port_out = p[TCP].sport
3989 # SYN + ACK packet out->in
3990 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3991 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3992 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3994 out_if.add_stream(p)
3995 self.pg_enable_capture(self.pg_interfaces)
3997 in_if.get_capture(1)
3999 # ACK packet in->out
4000 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4001 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4002 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4005 self.pg_enable_capture(self.pg_interfaces)
4007 out_if.get_capture(1)
4010 self.logger.error("TCP 3 way handshake failed")
4013 def verify_ipfix_max_entries_per_user(self, data):
4015 Verify IPFIX maximum entries per user exceeded event
4017 :param data: Decoded IPFIX data records
4019 self.assertEqual(1, len(data))
4022 self.assertEqual(ord(record[230]), 13)
4023 # natQuotaExceededEvent
4024 self.assertEqual('\x03\x00\x00\x00', record[466])
4026 self.assertEqual('\xe8\x03\x00\x00', record[473])
4028 self.assertEqual(self.pg0.remote_ip4n, record[8])
4030 def test_deterministic_mode(self):
4031 """ NAT plugin run deterministic mode """
4032 in_addr = '172.16.255.0'
4033 out_addr = '172.17.255.50'
4034 in_addr_t = '172.16.255.20'
4035 in_addr_n = socket.inet_aton(in_addr)
4036 out_addr_n = socket.inet_aton(out_addr)
4037 in_addr_t_n = socket.inet_aton(in_addr_t)
4041 nat_config = self.vapi.nat_show_config()
4042 self.assertEqual(1, nat_config.deterministic)
4044 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4046 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4047 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4048 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4049 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4051 deterministic_mappings = self.vapi.nat_det_map_dump()
4052 self.assertEqual(len(deterministic_mappings), 1)
4053 dsm = deterministic_mappings[0]
4054 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4055 self.assertEqual(in_plen, dsm.in_plen)
4056 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4057 self.assertEqual(out_plen, dsm.out_plen)
4059 self.clear_nat_det()
4060 deterministic_mappings = self.vapi.nat_det_map_dump()
4061 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    before = self.vapi.nat_det_get_timeouts()

    # Bump every timeout by 10; positional order is udp, tcp_established,
    # tcp_transitory, icmp.
    bump = 10
    self.vapi.nat_det_set_timeouts(before.udp + bump,
                                   before.tcp_established + bump,
                                   before.tcp_transitory + bump,
                                   before.icmp + bump)

    after = self.vapi.nat_det_get_timeouts()

    # Each timeout reported by VPP must differ from its previous value.
    for field in ("udp", "icmp", "tcp_established", "tcp_transitory"):
        self.assertNotEqual(getattr(before, field), getattr(after, field))
4081 def test_det_in(self):
4082 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4084 nat_ip = "10.0.0.10"
4086 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4088 socket.inet_aton(nat_ip),
4090 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4091 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4095 pkts = self.create_stream_in(self.pg0, self.pg1)
4096 self.pg0.add_stream(pkts)
4097 self.pg_enable_capture(self.pg_interfaces)
4099 capture = self.pg1.get_capture(len(pkts))
4100 self.verify_capture_out(capture, nat_ip)
4103 pkts = self.create_stream_out(self.pg1, nat_ip)
4104 self.pg1.add_stream(pkts)
4105 self.pg_enable_capture(self.pg_interfaces)
4107 capture = self.pg0.get_capture(len(pkts))
4108 self.verify_capture_in(capture, self.pg0)
4111 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4112 self.assertEqual(len(sessions), 3)
4116 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4117 self.assertEqual(s.in_port, self.tcp_port_in)
4118 self.assertEqual(s.out_port, self.tcp_port_out)
4119 self.assertEqual(s.ext_port, self.tcp_external_port)
4123 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4124 self.assertEqual(s.in_port, self.udp_port_in)
4125 self.assertEqual(s.out_port, self.udp_port_out)
4126 self.assertEqual(s.ext_port, self.udp_external_port)
4130 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4131 self.assertEqual(s.in_port, self.icmp_id_in)
4132 self.assertEqual(s.out_port, self.icmp_external_id)
4134 def test_multiple_users(self):
4135 """ Deterministic NAT multiple users """
4137 nat_ip = "10.0.0.10"
4139 external_port = 6303
4141 host0 = self.pg0.remote_hosts[0]
4142 host1 = self.pg0.remote_hosts[1]
4144 self.vapi.nat_det_add_del_map(host0.ip4n,
4146 socket.inet_aton(nat_ip),
4148 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4149 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4153 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4154 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4155 TCP(sport=port_in, dport=external_port))
4156 self.pg0.add_stream(p)
4157 self.pg_enable_capture(self.pg_interfaces)
4159 capture = self.pg1.get_capture(1)
4164 self.assertEqual(ip.src, nat_ip)
4165 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4166 self.assertEqual(tcp.dport, external_port)
4167 port_out0 = tcp.sport
4169 self.logger.error(ppp("Unexpected or invalid packet:", p))
4173 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4174 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4175 TCP(sport=port_in, dport=external_port))
4176 self.pg0.add_stream(p)
4177 self.pg_enable_capture(self.pg_interfaces)
4179 capture = self.pg1.get_capture(1)
4184 self.assertEqual(ip.src, nat_ip)
4185 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4186 self.assertEqual(tcp.dport, external_port)
4187 port_out1 = tcp.sport
4189 self.logger.error(ppp("Unexpected or invalid packet:", p))
4192 dms = self.vapi.nat_det_map_dump()
4193 self.assertEqual(1, len(dms))
4194 self.assertEqual(2, dms[0].ses_num)
4197 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4198 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4199 TCP(sport=external_port, dport=port_out0))
4200 self.pg1.add_stream(p)
4201 self.pg_enable_capture(self.pg_interfaces)
4203 capture = self.pg0.get_capture(1)
4208 self.assertEqual(ip.src, self.pg1.remote_ip4)
4209 self.assertEqual(ip.dst, host0.ip4)
4210 self.assertEqual(tcp.dport, port_in)
4211 self.assertEqual(tcp.sport, external_port)
4213 self.logger.error(ppp("Unexpected or invalid packet:", p))
4217 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4218 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4219 TCP(sport=external_port, dport=port_out1))
4220 self.pg1.add_stream(p)
4221 self.pg_enable_capture(self.pg_interfaces)
4223 capture = self.pg0.get_capture(1)
4228 self.assertEqual(ip.src, self.pg1.remote_ip4)
4229 self.assertEqual(ip.dst, host1.ip4)
4230 self.assertEqual(tcp.dport, port_in)
4231 self.assertEqual(tcp.sport, external_port)
4233 self.logger.error(ppp("Unexpected or invalid packet", p))
4236 # session close api test
4237 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4239 self.pg1.remote_ip4n,
4241 dms = self.vapi.nat_det_map_dump()
4242 self.assertEqual(dms[0].ses_num, 1)
4244 self.vapi.nat_det_close_session_in(host0.ip4n,
4246 self.pg1.remote_ip4n,
4248 dms = self.vapi.nat_det_map_dump()
4249 self.assertEqual(dms[0].ses_num, 0)
4251 def test_tcp_session_close_detection_in(self):
4252 """ Deterministic NAT TCP session close from inside network """
4253 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4255 socket.inet_aton(self.nat_addr),
4257 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4258 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4261 self.initiate_tcp_session(self.pg0, self.pg1)
4263 # close the session from inside
4265 # FIN packet in -> out
4266 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4267 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4268 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4270 self.pg0.add_stream(p)
4271 self.pg_enable_capture(self.pg_interfaces)
4273 self.pg1.get_capture(1)
4277 # ACK packet out -> in
4278 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4279 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4280 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4284 # FIN packet out -> in
4285 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4286 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4287 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4291 self.pg1.add_stream(pkts)
4292 self.pg_enable_capture(self.pg_interfaces)
4294 self.pg0.get_capture(2)
4296 # ACK packet in -> out
4297 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4298 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4299 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4301 self.pg0.add_stream(p)
4302 self.pg_enable_capture(self.pg_interfaces)
4304 self.pg1.get_capture(1)
4306 # Check if deterministic NAT44 closed the session
4307 dms = self.vapi.nat_det_map_dump()
4308 self.assertEqual(0, dms[0].ses_num)
4310 self.logger.error("TCP session termination failed")
4313 def test_tcp_session_close_detection_out(self):
4314 """ Deterministic NAT TCP session close from outside network """
# Overview: same setup as the inside-initiated variant (deterministic
# mapping for pg0's remote host, NAT44 feature on pg0 and pg1, session
# opened by initiate_tcp_session()), but the first closing packet is
# injected on pg1. The test ends by asserting that the first dumped
# mapping reports ses_num == 0.
# NOTE(review): this listing's own numbering skips lines all through the
# method (e.g. 4316, 4318, 4321, 4331, 4334); remaining call arguments,
# TCP flags, the construction of `pkts` and the construct enclosing the
# final logger.error() are not visible - confirm against the full file.
4315 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4317 socket.inet_aton(self.nat_addr),
4319 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4320 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4323 self.initiate_tcp_session(self.pg0, self.pg1)
4325 # close the session from outside
4327 # FIN packet out -> in
4328 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4329 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4330 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# Injected on pg1; one packet is requested from pg0's capture.
4332 self.pg1.add_stream(p)
4333 self.pg_enable_capture(self.pg_interfaces)
4335 self.pg0.get_capture(1)
4339 # ACK packet in -> out
4340 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4341 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4342 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the comment below repeats the previous one word for word.
# By symmetry with test_tcp_session_close_detection_in (ACK followed by
# FIN) this second packet is presumably the FIN in -> out, but the flags
# argument is not visible in this listing - confirm and correct the
# comment if so.
4346 # ACK packet in -> out
4347 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4348 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4349 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# `pkts` is injected on pg0 and two packets are requested from pg1.
4353 self.pg0.add_stream(pkts)
4354 self.pg_enable_capture(self.pg_interfaces)
4356 self.pg1.get_capture(2)
4358 # ACK packet out -> in
4359 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4360 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4361 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4363 self.pg1.add_stream(p)
4364 self.pg_enable_capture(self.pg_interfaces)
4366 self.pg0.get_capture(1)
4368 # Check if deterministic NAT44 closed the session
4369 dms = self.vapi.nat_det_map_dump()
4370 self.assertEqual(0, dms[0].ses_num)
# NOTE(review): presumably the body of an exception handler whose header
# (around 4371) is not in this listing - confirm.
4372 self.logger.error("TCP session termination failed")
4375 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4376 def test_session_timeout(self):
4377 """ Deterministic NAT session timeouts """
# Only runs when running_extended_tests() is true (see the decorator).
# Overview: create the deterministic mapping, enable NAT44 on pg0/pg1,
# open a TCP session, set all four deterministic-NAT timeouts to 5, push
# the create_stream_in() packets through pg0 -> pg1, and then assert that
# the first dumped mapping reports ses_num == 0.
# NOTE(review): units of the timeout values are not visible here
# (presumably seconds). The listing also skips 4393-4394 between the
# capture and the final check - presumably a wait for the timeouts to
# expire; confirm against the complete file.
4378 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4380 socket.inet_aton(self.nat_addr),
4382 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4383 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4386 self.initiate_tcp_session(self.pg0, self.pg1)
4387 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4388 pkts = self.create_stream_in(self.pg0, self.pg1)
4389 self.pg0.add_stream(pkts)
4390 self.pg_enable_capture(self.pg_interfaces)
# `capture` is not read again in the visible lines; the call is what
# requests len(pkts) packets from pg1.
4392 capture = self.pg1.get_capture(len(pkts))
4395 dms = self.vapi.nat_det_map_dump()
4396 self.assertEqual(0, dms[0].ses_num)
4398 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4399 def test_session_limit_per_user(self):
4400 """ Deterministic NAT maximum sessions per user limit """
# Only runs when running_extended_tests() is true (see the decorator).
# Overview:
#   1. deterministic mapping + NAT44 feature on pg0/pg1, IPFIX exporter
#      pointed at pg2, NAT IPFIX logging switched on via nat_ipfix();
#   2. UDP packets for source ports 1025..2024 (1000 ports) are sent
#      pg0 -> pg1 and all of them are expected on pg1;
#   3. one more UDP packet (3001 -> 3002) must not reach pg1 and must
#      trigger an ICMP error back on pg0;
#   4. the mapping must still report 1000 sessions;
#   5. after "ipfix flush", two packets are taken from pg2 and decoded.
# NOTE(review): this listing's own numbering skips lines throughout
# (e.g. 4402, 4404, 4407, 4410, 4413-4414, 4419-4420, 4436, 4438, 4456,
# 4461). The creation/filling of `pkts`, the bindings of `p` and `icmp`
# used in the ICMP checks and the loop headers around the IPFIX checks
# are not visible - confirm against the complete file.
4401 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4403 socket.inet_aton(self.nat_addr),
4405 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4406 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4408 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4409 src_address=self.pg2.local_ip4n,
4411 template_interval=10)
4412 self.vapi.nat_ipfix()
# range(1025, 2025) yields exactly 1000 distinct source ports; the final
# assertion on ses_num below expects the same number.
4415 for port in range(1025, 2025):
4416 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4417 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4418 UDP(sport=port, dport=port))
4421 self.pg0.add_stream(pkts)
4422 self.pg_enable_capture(self.pg_interfaces)
4424 capture = self.pg1.get_capture(len(pkts))
# One additional flow from the same inside host.
4426 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4427 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4428 UDP(sport=3001, dport=3002))
4429 self.pg0.add_stream(p)
4430 self.pg_enable_capture(self.pg_interfaces)
# The value bound to `capture` here is overwritten by the pg0 capture
# below before it is read.
4432 capture = self.pg1.assert_nothing_captured()
4434 # verify ICMP error packet
4435 capture = self.pg0.get_capture(1)
# NOTE(review): `p` and `icmp` are presumably rebound from `capture` on
# lines 4436/4438, which this listing omits - confirm.
4437 self.assertTrue(p.haslayer(ICMP))
# ICMP type 3 / code 1 is "destination unreachable / host unreachable"
# (RFC 792); the embedded header must carry the rejected flow's ports.
4439 self.assertEqual(icmp.type, 3)
4440 self.assertEqual(icmp.code, 1)
4441 self.assertTrue(icmp.haslayer(IPerror))
4442 inner_ip = icmp[IPerror]
4443 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4444 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4446 dms = self.vapi.nat_det_map_dump()
4448 self.assertEqual(1000, dms[0].ses_num)
4450 # verify IPFIX logging
4451 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4453 capture = self.pg2.get_capture(2)
4454 ipfix = IPFIXDecoder()
4455 # first load template
# NOTE(review): the checks below presumably run inside loops over
# `capture` whose headers (4456, 4461) are not in this listing.
4457 self.assertTrue(p.haslayer(IPFIX))
4458 if p.haslayer(Template):
4459 ipfix.add_template(p.getlayer(Template))
4460 # verify events in data set
4462 if p.haslayer(Data):
4463 data = ipfix.decode_data_set(p.getlayer(Set))
4464 self.verify_ipfix_max_entries_per_user(data)
4466 def clear_nat_det(self):
4468 Clear deterministic NAT configuration.
# Steps visible here: switch NAT IPFIX logging off, call
# nat_det_set_timeouts() with no arguments (presumably restoring the
# defaults - TODO confirm), then walk the dumped deterministic mappings
# and the dumped NAT44 interfaces and issue one add/del call per entry.
# NOTE(review): the docstring delimiters and the trailing arguments of
# both add/del calls (4475-4479, 4483-4484) are not in this listing, so
# the "delete" direction of those calls cannot be confirmed from here.
4470 self.vapi.nat_ipfix(enable=0)
4471 self.vapi.nat_det_set_timeouts()
4472 deterministic_mappings = self.vapi.nat_det_map_dump()
4473 for dsm in deterministic_mappings:
4474 self.vapi.nat_det_add_del_map(dsm.in_addr,
4480 interfaces = self.vapi.nat44_interface_dump()
4481 for intf in interfaces:
4482 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4487 super(TestDeterministicNAT, self).tearDown()
# NOTE(review): the `def` line owning these statements (4486) is not in
# this listing; from the super() call it is presumably
# TestDeterministicNAT.tearDown - confirm.
# When self.vpp_dead is false, the "show nat44 detail" CLI output is
# logged and clear_nat_det() is then called. Whether the clear call sits
# inside the `if` cannot be told from this listing, which has lost its
# indentation.
4488 if not self.vpp_dead:
4489 self.logger.info(self.vapi.cli("show nat44 detail"))
4490 self.clear_nat_det()
4493 class TestNAT64(MethodHolder):
4494 """ NAT64 Test Cases """
4497 def setUpConstants(cls):
# Runs the parent setUpConstants() and then appends a `nat { ... }`
# section to cls.vpp_cmdline that sets the NAT64 BIB hash to 128 buckets
# and the NAT64 session-table hash to 256 buckets.
# NOTE(review): the decorator line (4496) is not in this listing;
# presumably @classmethod given the `cls` parameter - confirm.
4498 super(TestNAT64, cls).setUpConstants()
4499 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4500 "nat64 st hash buckets 256", "}"])
4503 def setUpClass(cls):
# Class-level fixture for the NAT64 tests. Visible steps:
#   - default inside/outside TCP, UDP ports and ICMP ids;
#   - two NAT pool addresses (default and "vrf1") kept both as text and
#     in packed form via socket.inet_pton();
#   - IPFIX source port and domain id;
#   - five pg interfaces: pg0 and pg2 go into ip6_interfaces, pg1 into
#     ip4_interfaces, pg3 is configured for both IPv4 and IPv6, and pg4
#     gets no address here;
#   - an IPv6 table for cls.vrf1_id, to which pg2 is bound;
#   - two remote hosts generated on pg0.
# NOTE(review): this listing's own numbering skips lines (e.g. 4502,
# 4505-4506, 4515, 4518, 4534-4535, 4539-4543, 4549). The assignment of
# cls.vrf1_id, most of both per-interface loop bodies and the construct
# enclosing the final tearDownClass() call are not visible - confirm
# against the complete file.
4504 super(TestNAT64, cls).setUpClass()
4507 cls.tcp_port_in = 6303
4508 cls.tcp_port_out = 6303
4509 cls.udp_port_in = 6304
4510 cls.udp_port_out = 6304
4511 cls.icmp_id_in = 6305
4512 cls.icmp_id_out = 6305
4513 cls.nat_addr = '10.0.0.3'
4514 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4516 cls.vrf1_nat_addr = '10.0.10.3'
4517 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4519 cls.ipfix_src_port = 4739
4520 cls.ipfix_domain_id = 1
4522 cls.create_pg_interfaces(range(5))
# list(...) copies the slice so that append() below does not depend on
# the type returned by slicing cls.pg_interfaces.
4523 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4524 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4525 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4527 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4529 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4531 cls.pg0.generate_remote_hosts(2)
4533 for i in cls.ip6_interfaces:
4536 i.configure_ipv6_neighbors()
4538 for i in cls.ip4_interfaces:
4544 cls.pg3.config_ip4()
4545 cls.pg3.resolve_arp()
4546 cls.pg3.config_ip6()
4547 cls.pg3.configure_ipv6_neighbors()
# NOTE(review): presumably the body of an exception handler that tears
# the class down when setup fails; its header is not in this listing.
4550 super(TestNAT64, cls).tearDownClass()
4553 def test_pool(self):
4554 """ Add/delete address to NAT64 pool """
# Adds the single address 1.2.3.4 as a one-element range (start == end),
# checks that the pool dump then holds exactly that packed address,
# removes it again with is_add=0 and checks that the dump is empty.
4555 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4557 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4559 addresses = self.vapi.nat64_pool_addr_dump()
4560 self.assertEqual(len(addresses), 1)
4561 self.assertEqual(addresses[0].address, nat_addr)
4563 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4565 addresses = self.vapi.nat64_pool_addr_dump()
4566 self.assertEqual(len(addresses), 0)
4568 def test_interface(self):
4569 """ Enable/disable NAT64 feature on the interface """
# Enables NAT64 on pg0 (default direction) and on pg1 with is_inside=0,
# then verifies it three ways: the interface dump has two entries with
# the expected is_inside values, the "show interface features" CLI output
# mentions nat64-in2out for pg0 and nat64-out2in for pg1, and after
# removing both the dump is empty.
# NOTE(review): pg0_found / pg1_found are asserted below but their
# assignments (4575-4576, 4580, 4583) are not in this listing - confirm
# against the complete file.
4570 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4571 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4573 interfaces = self.vapi.nat64_interface_dump()
4574 self.assertEqual(len(interfaces), 2)
4577 for intf in interfaces:
4578 if intf.sw_if_index == self.pg0.sw_if_index:
4579 self.assertEqual(intf.is_inside, 1)
4581 elif intf.sw_if_index == self.pg1.sw_if_index:
4582 self.assertEqual(intf.is_inside, 0)
4584 self.assertTrue(pg0_found)
4585 self.assertTrue(pg1_found)
# str.find() returns -1 when the feature name is absent from the output.
4587 features = self.vapi.cli("show interface features pg0")
4588 self.assertNotEqual(features.find('nat64-in2out'), -1)
4589 features = self.vapi.cli("show interface features pg1")
4590 self.assertNotEqual(features.find('nat64-out2in'), -1)
4592 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4593 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4595 interfaces = self.vapi.nat64_interface_dump()
4596 self.assertEqual(len(interfaces), 0)
4598 def test_static_bib(self):
4599 """ Add/delete static BIB entry """
# Adds a static BIB entry for a fixed IPv6 inside address / IPv4 outside
# address, dumps the TCP BIB and checks the entry's addresses and ports
# and that exactly one static entry was counted; then issues a second
# add/del call, dumps again and expects the static count to be 0.
# NOTE(review): this listing's own numbering skips lines (4603-4604,
# 4608-4611, 4613-4616, 4624-4628, 4630-4633). The definitions of
# in_port / out_port, the full argument lists of both add/del calls and
# the loops that bind `bibe` and compute `static_bib_num` are not
# visible - confirm against the complete file.
4600 in_addr = socket.inet_pton(socket.AF_INET6,
4601 '2001:db8:85a3::8a2e:370:7334')
4602 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
# `proto` is not read in the visible lines (the dumps pass IP_PROTOS.tcp
# directly); presumably it is passed to the add/del calls - confirm.
4605 proto = IP_PROTOS.tcp
4607 self.vapi.nat64_add_del_static_bib(in_addr,
4612 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4617 self.assertEqual(bibe.i_addr, in_addr)
4618 self.assertEqual(bibe.o_addr, out_addr)
4619 self.assertEqual(bibe.i_port, in_port)
4620 self.assertEqual(bibe.o_port, out_port)
4621 self.assertEqual(static_bib_num, 1)
4623 self.vapi.nat64_add_del_static_bib(in_addr,
4629 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4634 self.assertEqual(static_bib_num, 0)
4636 def test_set_timeouts(self):
4637 """ Set NAT64 timeouts """
# First asserts the values reported before any change in this test
# (udp 300, icmp 60, tcp_trans 240, tcp_est 7440, tcp_incoming_syn 6),
# then sets all five to custom values and reads them back.
# NOTE(review): the first half passes only if no earlier step left
# non-default timeouts behind; the reset mechanism is not visible in
# this method - confirm that teardown restores the defaults.
4638 # verify default values
4639 timeouts = self.vapi.nat64_get_timeouts()
4640 self.assertEqual(timeouts.udp, 300)
4641 self.assertEqual(timeouts.icmp, 60)
4642 self.assertEqual(timeouts.tcp_trans, 240)
4643 self.assertEqual(timeouts.tcp_est, 7440)
4644 self.assertEqual(timeouts.tcp_incoming_syn, 6)
4646 # set and verify custom values
4647 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4648 tcp_est=7450, tcp_incoming_syn=10)
4649 timeouts = self.vapi.nat64_get_timeouts()
4650 self.assertEqual(timeouts.udp, 200)
4651 self.assertEqual(timeouts.icmp, 30)
4652 self.assertEqual(timeouts.tcp_trans, 250)
4653 self.assertEqual(timeouts.tcp_est, 7450)
4654 self.assertEqual(timeouts.tcp_incoming_syn, 10)
4656 def test_dynamic(self):
4657 """ NAT64 dynamic translation test """
# Overview:
#   1. add self.nat_addr to the pool, NAT64 on pg0 (default direction)
#      and on pg1 with is_inside=0;
#   2. send the create_stream_in_ip6() packets pg0 -> pg1 and the
#      create_stream_out() packets pg1 -> pg0, verify both directions,
#      then repeat both directions once more;
#   3. require that the session count grew by exactly 3 (presumably one
#      TCP, one UDP and one ICMP session - the stream helpers are not
#      visible here);
#   4. repeat in -> out and out -> in for pg2 using the vrf1 pool address.
# NOTE(review): the listing's own numbering skips lines (e.g. 4665,
# 4669, 4673, 4678), including the tail of the first pool call and,
# presumably, the pg_start() calls - confirm against the complete file.
4658 self.tcp_port_in = 6303
4659 self.udp_port_in = 6304
4660 self.icmp_id_in = 6305
4662 ses_num_start = self.nat64_get_ses_num()
4664 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4666 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4667 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4670 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4671 self.pg0.add_stream(pkts)
4672 self.pg_enable_capture(self.pg_interfaces)
4674 capture = self.pg1.get_capture(len(pkts))
4675 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4676 dst_ip=self.pg1.remote_ip4)
4679 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4680 self.pg1.add_stream(pkts)
4681 self.pg_enable_capture(self.pg_interfaces)
4683 capture = self.pg0.get_capture(len(pkts))
# A scapy IPv6 object is built only to read back, via ip[IPv6].src, the
# address formed by appending pg1's IPv4 address to "64:ff9b::" (the
# NAT64 well-known prefix, RFC 6052). That value is the expected source
# of the translated packets, and `ip` is reused further down.
4684 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4685 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4688 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4689 self.pg0.add_stream(pkts)
4690 self.pg_enable_capture(self.pg_interfaces)
4692 capture = self.pg1.get_capture(len(pkts))
4693 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4694 dst_ip=self.pg1.remote_ip4)
4697 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4698 self.pg1.add_stream(pkts)
4699 self.pg_enable_capture(self.pg_interfaces)
4701 capture = self.pg0.get_capture(len(pkts))
4702 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4704 ses_num_end = self.nat64_get_ses_num()
4706 self.assertEqual(ses_num_end - ses_num_start, 3)
4708 # tenant with specific VRF
4709 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4710 self.vrf1_nat_addr_n,
4711 vrf_id=self.vrf1_id)
4712 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4714 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4715 self.pg2.add_stream(pkts)
4716 self.pg_enable_capture(self.pg_interfaces)
4718 capture = self.pg1.get_capture(len(pkts))
4719 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4720 dst_ip=self.pg1.remote_ip4)
4722 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4723 self.pg1.add_stream(pkts)
4724 self.pg_enable_capture(self.pg_interfaces)
4726 capture = self.pg2.get_capture(len(pkts))
4727 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4729 def test_static(self):
4730 """ NAT64 static translation test """
# Overview: use identical inside and outside port/id values (60303,
# 60304, 60305), add the pool address and both interfaces, register
# three static BIB entries for pg0's remote IPv6 address, then verify
# in -> out (with same_port=True) and out -> in translation and require
# that the session count grew by exactly 3.
# NOTE(review): the argument lists of the three static BIB calls
# (4746-4749, 4751-4754, 4756-4760) and the tail of the pool call are
# not in this listing; presumably one entry each for TCP, UDP and ICMP -
# confirm against the complete file.
4731 self.tcp_port_in = 60303
4732 self.udp_port_in = 60304
4733 self.icmp_id_in = 60305
4734 self.tcp_port_out = 60303
4735 self.udp_port_out = 60304
4736 self.icmp_id_out = 60305
4738 ses_num_start = self.nat64_get_ses_num()
4740 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4742 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4743 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4745 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4750 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4755 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4762 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4763 self.pg0.add_stream(pkts)
4764 self.pg_enable_capture(self.pg_interfaces)
4766 capture = self.pg1.get_capture(len(pkts))
4767 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4768 dst_ip=self.pg1.remote_ip4, same_port=True)
4771 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4772 self.pg1.add_stream(pkts)
4773 self.pg_enable_capture(self.pg_interfaces)
4775 capture = self.pg0.get_capture(len(pkts))
# Expected IPv6 source: "64:ff9b::" followed by pg1's IPv4 address, read
# back from a scapy IPv6 object.
4776 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4777 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4779 ses_num_end = self.nat64_get_ses_num()
4781 self.assertEqual(ses_num_end - ses_num_start, 3)
4783 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4784 def test_session_timeout(self):
4785 """ NAT64 session timeout """
# Only runs when running_extended_tests() is true (see the decorator).
# Overview: configure pool and interfaces, set only the ICMP timeout
# to 5, create sessions with the create_stream_in_ip6() packets, record
# the session count, and later require that the count has changed.
# NOTE(review): units of the timeout are not visible (presumably
# seconds). The listing skips 4800-4802 between the two session counts -
# presumably a wait longer than the ICMP timeout; confirm. Note that the
# final check only requires the two counts to differ, not that the count
# dropped by a specific amount.
4786 self.icmp_id_in = 1234
4787 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4789 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4790 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4791 self.vapi.nat64_set_timeouts(icmp=5)
4793 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4794 self.pg0.add_stream(pkts)
4795 self.pg_enable_capture(self.pg_interfaces)
4797 capture = self.pg1.get_capture(len(pkts))
4799 ses_num_before_timeout = self.nat64_get_ses_num()
4803 # ICMP session after timeout
4804 ses_num_after_timeout = self.nat64_get_ses_num()
4805 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4807 def test_icmp_error(self):
4808 """ NAT64 ICMP Error message translation """
# Overview:
#   1. create sessions in both directions and keep both captures
#      (capture_ip4 = what left pg1, capture_ip6 = what left pg0);
#   2. in -> out: wrap the IPv6 layer of every packet from capture_ip6 in
#      an ICMPv6 Destination Unreachable (code 1) sent from pg0 and
#      expect an ICMPv4 type 3 / code 13 error on pg1 whose embedded
#      header shows the translated addresses and ports;
#   3. out -> in: wrap the IP layer of every packet from capture_ip4 in
#      an ICMPv4 type 3 / code 13 error sent from pg1 and expect an
#      ICMPv6 Destination Unreachable (code 1) on pg0.
# NOTE(review): this listing's own numbering skips lines (e.g. 4816,
# 4849, 4862, 4864, 4878, 4891, 4893-4894). The try/else/except headers
# around the per-packet checks and the second argument of the last
# assertEqual() are not visible - confirm against the complete file.
4809 self.tcp_port_in = 6303
4810 self.udp_port_in = 6304
4811 self.icmp_id_in = 6305
# `ses_num_start` is not read again in the visible lines of this method.
4813 ses_num_start = self.nat64_get_ses_num()
4815 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4817 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4818 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4820 # send some packets to create sessions
4821 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4822 self.pg0.add_stream(pkts)
4823 self.pg_enable_capture(self.pg_interfaces)
4825 capture_ip4 = self.pg1.get_capture(len(pkts))
4826 self.verify_capture_out(capture_ip4,
4827 nat_ip=self.nat_addr,
4828 dst_ip=self.pg1.remote_ip4)
4830 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4831 self.pg1.add_stream(pkts)
4832 self.pg_enable_capture(self.pg_interfaces)
4834 capture_ip6 = self.pg0.get_capture(len(pkts))
4835 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4836 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4837 self.pg0.remote_ip6)
# in -> out: one ICMPv6 error per previously received IPv6 packet, each
# carrying that packet's IPv6 layer as the offending datagram.
4840 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4841 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4842 ICMPv6DestUnreach(code=1) /
4843 packet[IPv6] for packet in capture_ip6]
4844 self.pg0.add_stream(pkts)
4845 self.pg_enable_capture(self.pg_interfaces)
4847 capture = self.pg1.get_capture(len(pkts))
4848 for packet in capture:
4850 self.assertEqual(packet[IP].src, self.nat_addr)
4851 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4852 self.assertEqual(packet[ICMP].type, 3)
4853 self.assertEqual(packet[ICMP].code, 13)
4854 inner = packet[IPerror]
4855 self.assertEqual(inner.src, self.pg1.remote_ip4)
4856 self.assertEqual(inner.dst, self.nat_addr)
4857 self.check_icmp_checksum(packet)
4858 if inner.haslayer(TCPerror):
4859 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4860 elif inner.haslayer(UDPerror):
4861 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4863 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4865 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# out -> in: the mirror image, built from the IPv4 packets captured on
# pg1 during session creation.
4869 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4870 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4871 ICMP(type=3, code=13) /
4872 packet[IP] for packet in capture_ip4]
4873 self.pg1.add_stream(pkts)
4874 self.pg_enable_capture(self.pg_interfaces)
4876 capture = self.pg0.get_capture(len(pkts))
4877 for packet in capture:
4879 self.assertEqual(packet[IPv6].src, ip.src)
4880 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4881 icmp = packet[ICMPv6DestUnreach]
4882 self.assertEqual(icmp.code, 1)
4883 inner = icmp[IPerror6]
4884 self.assertEqual(inner.src, self.pg0.remote_ip6)
4885 self.assertEqual(inner.dst, ip.src)
4886 self.check_icmpv6_checksum(packet)
4887 if inner.haslayer(TCPerror):
4888 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4889 elif inner.haslayer(UDPerror):
4890 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4892 self.assertEqual(inner[ICMPv6EchoRequest].id,
4895 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4898 def test_hairpinning(self):
4899 """ NAT64 hairpinning """
# Client and server are both remote hosts on pg0 (remote_hosts[0] and
# [1]); every stream in this test is injected on pg0 and captured on pg0.
# Overview:
#   1. static BIB entries publish the server's TCP and UDP ports;
#   2. client -> server: packets addressed to the NAT address (as IPv6)
#      must arrive with the NAT address as source, the server as
#      destination, the server's inside port and a source port different
#      from the client's; that source port is remembered;
#   3. server -> client: replies to the remembered ports must arrive with
#      the server's outside port and the client's inside port;
#   4. ICMPv6 errors wrapping the packets from step 3 must reach the
#      server with matching embedded addresses and ports.
# NOTE(review): this listing's own numbering skips lines (e.g. 4915,
# 4920-4921, 4923, 4931, 4935, 4939, 4945, 4953, 4958). Parts of the
# static BIB argument lists, the creation/filling of `pkts` and the
# try/elif/except headers around the per-packet checks are not visible -
# confirm against the complete file.
4901 client = self.pg0.remote_hosts[0]
4902 server = self.pg0.remote_hosts[1]
4903 server_tcp_in_port = 22
4904 server_tcp_out_port = 4022
4905 server_udp_in_port = 23
4906 server_udp_out_port = 4023
4907 client_tcp_in_port = 1234
4908 client_udp_in_port = 1235
# Placeholders; overwritten below with the source ports seen on the
# translated client -> server packets.
4909 client_tcp_out_port = 0
4910 client_udp_out_port = 0
# IPv6 text form of "64:ff9b::" + NAT IPv4 address, read back from scapy.
4911 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4912 nat_addr_ip6 = ip.src
4914 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4916 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4917 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4919 self.vapi.nat64_add_del_static_bib(server.ip6n,
4922 server_tcp_out_port,
4924 self.vapi.nat64_add_del_static_bib(server.ip6n,
4927 server_udp_out_port,
4932 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4933 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4934 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4936 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4937 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4938 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4940 self.pg0.add_stream(pkts)
4941 self.pg_enable_capture(self.pg_interfaces)
4943 capture = self.pg0.get_capture(len(pkts))
4944 for packet in capture:
4946 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4947 self.assertEqual(packet[IPv6].dst, server.ip6)
4948 if packet.haslayer(TCP):
4949 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4950 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4951 self.check_tcp_checksum(packet)
4952 client_tcp_out_port = packet[TCP].sport
4954 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4955 self.assertEqual(packet[UDP].dport, server_udp_in_port)
4956 self.check_udp_checksum(packet)
4957 client_udp_out_port = packet[UDP].sport
4959 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Replies from the server to the ports learned above.
4964 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4965 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4966 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4968 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4969 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4970 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4972 self.pg0.add_stream(pkts)
4973 self.pg_enable_capture(self.pg_interfaces)
4975 capture = self.pg0.get_capture(len(pkts))
4976 for packet in capture:
4978 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4979 self.assertEqual(packet[IPv6].dst, client.ip6)
4980 if packet.haslayer(TCP):
4981 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4982 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4983 self.check_tcp_checksum(packet)
4985 self.assertEqual(packet[UDP].sport, server_udp_out_port)
4986 self.assertEqual(packet[UDP].dport, client_udp_in_port)
4987 self.check_udp_checksum(packet)
4989 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMPv6 errors from the client, each carrying the IPv6 layer of one of
# the reply packets just captured.
4994 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4995 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4996 ICMPv6DestUnreach(code=1) /
4997 packet[IPv6] for packet in capture]
4998 self.pg0.add_stream(pkts)
4999 self.pg_enable_capture(self.pg_interfaces)
5001 capture = self.pg0.get_capture(len(pkts))
5002 for packet in capture:
5004 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5005 self.assertEqual(packet[IPv6].dst, server.ip6)
5006 icmp = packet[ICMPv6DestUnreach]
5007 self.assertEqual(icmp.code, 1)
5008 inner = icmp[IPerror6]
5009 self.assertEqual(inner.src, server.ip6)
5010 self.assertEqual(inner.dst, nat_addr_ip6)
5011 self.check_icmpv6_checksum(packet)
5012 if inner.haslayer(TCPerror):
5013 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5014 self.assertEqual(inner[TCPerror].dport,
5015 client_tcp_out_port)
5017 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5018 self.assertEqual(inner[UDPerror].dport,
5019 client_udp_out_port)
5021 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5024 def test_prefix(self):
5025 """ NAT64 Network-Specific Prefix """
# Overview:
#   1. pool address + interfaces for the default table (pg0/pg1) and a
#      second pool address for vrf1 with pg2 as an extra NAT64 interface;
#   2. add a /32 prefix 2001:db8:: without a vrf_id and check that the
#      dump reports it with vrf_id 0;
#   3. add a /56 prefix 2001:db8:122:300:: for vrf1 and check that the
#      dump now has two entries;
#   4. translate pg0 <-> pg1 using the first prefix and pg2 <-> pg1 using
#      the vrf1 prefix, composing the expected IPv6 address with
#      compose_ip6().
# NOTE(review): this listing's own numbering skips lines (e.g. 5028,
# 5036, 5053, 5060-5061, 5076-5077, 5082-5083, 5098-5099); parts of
# several argument lists are not visible - confirm against the full file.
5027 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5029 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5030 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5031 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5032 self.vrf1_nat_addr_n,
5033 vrf_id=self.vrf1_id)
5034 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5037 global_pref64 = "2001:db8::"
5038 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5039 global_pref64_len = 32
5040 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5042 prefix = self.vapi.nat64_prefix_dump()
5043 self.assertEqual(len(prefix), 1)
5044 self.assertEqual(prefix[0].prefix, global_pref64_n)
5045 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5046 self.assertEqual(prefix[0].vrf_id, 0)
5048 # Add tenant specific prefix
5049 vrf1_pref64 = "2001:db8:122:300::"
5050 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5051 vrf1_pref64_len = 56
5052 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5054 vrf_id=self.vrf1_id)
5055 prefix = self.vapi.nat64_prefix_dump()
5056 self.assertEqual(len(prefix), 2)
# Traffic for the default table, using the /32 prefix.
5059 pkts = self.create_stream_in_ip6(self.pg0,
5062 plen=global_pref64_len)
5063 self.pg0.add_stream(pkts)
5064 self.pg_enable_capture(self.pg_interfaces)
5066 capture = self.pg1.get_capture(len(pkts))
5067 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5068 dst_ip=self.pg1.remote_ip4)
5070 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5071 self.pg1.add_stream(pkts)
5072 self.pg_enable_capture(self.pg_interfaces)
5074 capture = self.pg0.get_capture(len(pkts))
# Despite its name, `dst_ip` is passed to verify_capture_in_ip6() in the
# same position where other tests pass the expected IPv6 *source*.
5075 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5078 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5080 # Tenant specific prefix
5081 pkts = self.create_stream_in_ip6(self.pg2,
5084 plen=vrf1_pref64_len)
5085 self.pg2.add_stream(pkts)
5086 self.pg_enable_capture(self.pg_interfaces)
5088 capture = self.pg1.get_capture(len(pkts))
5089 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5090 dst_ip=self.pg1.remote_ip4)
5092 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5093 self.pg1.add_stream(pkts)
5094 self.pg_enable_capture(self.pg_interfaces)
5096 capture = self.pg2.get_capture(len(pkts))
5097 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5100 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5102 def test_unknown_proto(self):
5103 """ NAT64 translate packet with unknown protocol """
# Overview:
#   1. a TCP packet pg0 -> pg1 is sent first (one packet expected on pg1);
#   2. in -> out: an IPv6 packet with next header 47 (GRE) carrying an
#      inner IP/TCP packet is sent from pg0; the packet seen on pg1 must
#      have the NAT address as source, pg1's remote host as destination,
#      a GRE layer and a valid IP checksum;
#   3. out -> in: the reverse packet from pg1 must arrive on pg0 with the
#      composed IPv6 address as source and next header 47.
# NOTE(review): this listing's own numbering skips lines (e.g. 5106,
# 5117, 5122, 5129-5130, 5135, 5142, 5149-5150). The layer between the
# outer and inner IP headers (presumably GRE()), the binding of `packet`
# from the capture and the try/except headers are not visible - confirm
# against the complete file.
5105 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5107 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5108 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5109 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5112 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5113 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5114 TCP(sport=self.tcp_port_in, dport=20))
5115 self.pg0.add_stream(p)
5116 self.pg_enable_capture(self.pg_interfaces)
5118 p = self.pg1.get_capture(1)
5120 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5121 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5123 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5124 TCP(sport=1234, dport=1234))
5125 self.pg0.add_stream(p)
5126 self.pg_enable_capture(self.pg_interfaces)
5128 p = self.pg1.get_capture(1)
5131 self.assertEqual(packet[IP].src, self.nat_addr)
5132 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5133 self.assertTrue(packet.haslayer(GRE))
5134 self.check_ip_checksum(packet)
5136 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5140 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5141 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5143 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5144 TCP(sport=1234, dport=1234))
5145 self.pg1.add_stream(p)
5146 self.pg_enable_capture(self.pg_interfaces)
5148 p = self.pg0.get_capture(1)
5151 self.assertEqual(packet[IPv6].src, remote_ip6)
5152 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
# 47 is the IP protocol number of GRE; the hairpinning variant of this
# test spells the same value as IP_PROTOS.gre.
5153 self.assertEqual(packet[IPv6].nh, 47)
5155 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5158 def test_hairpinning_unknown_proto(self):
5159 """ NAT64 translate packet with unknown protocol - hairpinning """
# Client and server are both remote hosts on pg0; every stream is
# injected on pg0 and captured on pg0.
# Overview:
#   1. three static BIB entries are added (two for the server, one for
#      the client) using dedicated NAT addresses 10.0.0.100 (server) and
#      10.0.0.110 (client);
#   2. a TCP packet client -> server is sent first;
#   3. a GRE-typed IPv6 packet client -> server must arrive at the server
#      with the client's NAT IPv6 address as source;
#   4. a GRE-typed IPv6 packet server -> client must arrive at the client
#      with the server's NAT IPv6 address as source.
# NOTE(review): this listing's own numbering skips lines (e.g. 5175,
# 5180-5181, 5183-5184, 5186-5190, 5192-5193, 5195-5196, 5208,
# 5215-5216, 5220). Most static BIB arguments, the layer between the
# outer IPv6 and inner IP headers (presumably GRE()), the binding of
# `packet` and the try/except headers are not visible - confirm.
5161 client = self.pg0.remote_hosts[0]
5162 server = self.pg0.remote_hosts[1]
5163 server_tcp_in_port = 22
5164 server_tcp_out_port = 4022
5165 client_tcp_in_port = 1234
5166 client_tcp_out_port = 1235
5167 server_nat_ip = "10.0.0.100"
5168 client_nat_ip = "10.0.0.110"
5169 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5170 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5171 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5172 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5174 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5176 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5177 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5179 self.vapi.nat64_add_del_static_bib(server.ip6n,
5182 server_tcp_out_port,
5185 self.vapi.nat64_add_del_static_bib(server.ip6n,
5191 self.vapi.nat64_add_del_static_bib(client.ip6n,
5194 client_tcp_out_port,
5198 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5199 IPv6(src=client.ip6, dst=server_nat_ip6) /
5200 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5201 self.pg0.add_stream(p)
5202 self.pg_enable_capture(self.pg_interfaces)
5204 p = self.pg0.get_capture(1)
5206 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5207 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5209 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5210 TCP(sport=1234, dport=1234))
5211 self.pg0.add_stream(p)
5212 self.pg_enable_capture(self.pg_interfaces)
5214 p = self.pg0.get_capture(1)
5217 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5218 self.assertEqual(packet[IPv6].dst, server.ip6)
5219 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5221 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5225 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5226 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5228 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5229 TCP(sport=1234, dport=1234))
5230 self.pg0.add_stream(p)
5231 self.pg_enable_capture(self.pg_interfaces)
5233 p = self.pg0.get_capture(1)
5236 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5237 self.assertEqual(packet[IPv6].dst, client.ip6)
5238 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5240 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5243 def test_one_armed_nat64(self):
5244 """ One armed NAT64 """
# pg3 is registered twice: once with the default direction and once with
# is_inside=0, so every packet in this test enters and leaves through
# pg3.
# Overview:
#   1. IPv6 TCP packet (12345 -> 80) to the composed IPv6 address of
#      pg3's remote IPv4 host; the translated packet must have the NAT
#      address as source, a source port other than 12345 (remembered as
#      external_port) and valid TCP/IP checksums;
#   2. IPv4 TCP reply (80 -> external_port) to the NAT address; the
#      translated packet must be IPv6 from the composed address to pg3's
#      remote IPv6 host with ports 80 -> 12345.
# NOTE(review): this listing's own numbering skips lines (e.g.
# 5247-5249, 5251, 5263-5266, 5274, 5286-5289, 5295). The remaining
# compose_ip6() arguments, the bindings of `p`, `ip` and `tcp` from the
# capture and the try/except headers are not visible - confirm.
5246 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5250 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5252 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5253 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5256 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5257 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5258 TCP(sport=12345, dport=80))
5259 self.pg3.add_stream(p)
5260 self.pg_enable_capture(self.pg_interfaces)
5262 capture = self.pg3.get_capture(1)
5267 self.assertEqual(ip.src, self.nat_addr)
5268 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5269 self.assertNotEqual(tcp.sport, 12345)
5270 external_port = tcp.sport
5271 self.assertEqual(tcp.dport, 80)
5272 self.check_tcp_checksum(p)
5273 self.check_ip_checksum(p)
5275 self.logger.error(ppp("Unexpected or invalid packet:", p))
5279 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5280 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5281 TCP(sport=80, dport=external_port))
5282 self.pg3.add_stream(p)
5283 self.pg_enable_capture(self.pg_interfaces)
5285 capture = self.pg3.get_capture(1)
5290 self.assertEqual(ip.src, remote_host_ip6)
5291 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5292 self.assertEqual(tcp.sport, 80)
5293 self.assertEqual(tcp.dport, 12345)
5294 self.check_tcp_checksum(p)
5296 self.logger.error(ppp("Unexpected or invalid packet:", p))
5299 def test_frag_in_order(self):
5300 """ NAT64 translate fragments arriving in order """
# Overview:
#   1. pick a random inside TCP port, configure pool and interfaces and
#      record how many reassembly entries nat_reass_dump() reports;
#   2. in -> out: send the create_stream_frag_ip6() fragments from pg0,
#      reassemble what arrives on pg1 and check destination port 20, a
#      translated source port (remembered in self.tcp_port_out) and an
#      unchanged payload;
#   3. out -> in: send create_stream_frag() fragments from pg1,
#      reassemble what arrives on pg0 and check ports and payload;
#   4. require that the reassembly dump grew by exactly 2 entries.
# NOTE(review): this listing's own numbering skips lines (e.g. 5304,
# 5310-5312, 5320, 5330-5333). The first definition of `data` and
# several call arguments are not visible - confirm against the full file.
5301 self.tcp_port_in = random.randint(1025, 65535)
5303 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5305 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5306 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5308 reass = self.vapi.nat_reass_dump()
5309 reass_n_start = len(reass)
5313 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5314 self.tcp_port_in, 20, data)
5315 self.pg0.add_stream(pkts)
5316 self.pg_enable_capture(self.pg_interfaces)
5318 frags = self.pg1.get_capture(len(pkts))
5319 p = self.reass_frags_and_verify(frags,
5321 self.pg1.remote_ip4)
5322 self.assertEqual(p[TCP].dport, 20)
5323 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5324 self.tcp_port_out = p[TCP].sport
5325 self.assertEqual(data, p[Raw].load)
# 23-byte payload for the reverse direction. The lowercase "b" differs
# from the "B" used in test_frag_out_of_order; the payload is compared
# with itself after reassembly, so the case does not affect the result.
5328 data = "A" * 4 + "b" * 16 + "C" * 3
5329 pkts = self.create_stream_frag(self.pg1,
5334 self.pg1.add_stream(pkts)
5335 self.pg_enable_capture(self.pg_interfaces)
5337 frags = self.pg0.get_capture(len(pkts))
5338 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5339 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5340 self.assertEqual(p[TCP].sport, 20)
5341 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5342 self.assertEqual(data, p[Raw].load)
5344 reass = self.vapi.nat_reass_dump()
5345 reass_n_end = len(reass)
5347 self.assertEqual(reass_n_end - reass_n_start, 2)
5349 def test_reass_hairpinning(self):
5350 """ NAT64 fragments hairpinning """
# Client and server are both remote hosts on pg0. A static BIB entry is
# added for the server, fragments are sent from pg0 and captured on pg0
# again, and the reassembled packet must have the NAT IPv6 address as
# source, the server as destination, the server's inside port, a source
# port different from the client's and an unchanged payload.
# NOTE(review): this listing's own numbering skips lines (e.g. 5361,
# 5367-5371, 5374-5377). The static BIB arguments, the
# create_stream_frag_ip6() arguments and the definition of `data` are
# not visible - confirm against the complete file.
5352 client = self.pg0.remote_hosts[0]
5353 server = self.pg0.remote_hosts[1]
# The three ports are drawn independently at random from 1025..65535;
# nothing visible here prevents two of them from coinciding.
5354 server_in_port = random.randint(1025, 65535)
5355 server_out_port = random.randint(1025, 65535)
5356 client_in_port = random.randint(1025, 65535)
5357 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5358 nat_addr_ip6 = ip.src
5360 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5362 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5363 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5365 # add static BIB entry for server
5366 self.vapi.nat64_add_del_static_bib(server.ip6n,
5372 # send packet from host to server
5373 pkts = self.create_stream_frag_ip6(self.pg0,
5378 self.pg0.add_stream(pkts)
5379 self.pg_enable_capture(self.pg_interfaces)
5381 frags = self.pg0.get_capture(len(pkts))
5382 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5383 self.assertNotEqual(p[TCP].sport, client_in_port)
5384 self.assertEqual(p[TCP].dport, server_in_port)
5385 self.assertEqual(data, p[Raw].load)
5387 def test_frag_out_of_order(self):
5388 """ NAT64 translate fragments arriving out of order """
5389 self.tcp_port_in = random.randint(1025, 65535)
5391 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5393 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5394 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5398 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5399 self.tcp_port_in, 20, data)
5401 self.pg0.add_stream(pkts)
5402 self.pg_enable_capture(self.pg_interfaces)
5404 frags = self.pg1.get_capture(len(pkts))
5405 p = self.reass_frags_and_verify(frags,
5407 self.pg1.remote_ip4)
5408 self.assertEqual(p[TCP].dport, 20)
5409 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5410 self.tcp_port_out = p[TCP].sport
5411 self.assertEqual(data, p[Raw].load)
5414 data = "A" * 4 + "B" * 16 + "C" * 3
5415 pkts = self.create_stream_frag(self.pg1,
5421 self.pg1.add_stream(pkts)
5422 self.pg_enable_capture(self.pg_interfaces)
5424 frags = self.pg0.get_capture(len(pkts))
5425 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5426 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5427 self.assertEqual(p[TCP].sport, 20)
5428 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5429 self.assertEqual(data, p[Raw].load)
5431 def test_interface_addr(self):
5432 """ Acquire NAT64 pool addresses from interface """
5433 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5435 # no address in NAT64 pool
5436 adresses = self.vapi.nat44_address_dump()
5437 self.assertEqual(0, len(adresses))
5439 # configure interface address and check NAT64 address pool
5440 self.pg4.config_ip4()
5441 addresses = self.vapi.nat64_pool_addr_dump()
5442 self.assertEqual(len(addresses), 1)
5443 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5445 # remove interface address and check NAT64 address pool
5446 self.pg4.unconfig_ip4()
5447 addresses = self.vapi.nat64_pool_addr_dump()
5448 self.assertEqual(0, len(adresses))
5450 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5451 def test_ipfix_max_bibs_sessions(self):
5452 """ IPFIX logging maximum session and BIB entries exceeded """
5455 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5459 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5461 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5462 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5466 for i in range(0, max_bibs):
5467 src = "fd01:aa::%x" % (i)
5468 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5469 IPv6(src=src, dst=remote_host_ip6) /
5470 TCP(sport=12345, dport=80))
5472 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5473 IPv6(src=src, dst=remote_host_ip6) /
5474 TCP(sport=12345, dport=22))
5476 self.pg0.add_stream(pkts)
5477 self.pg_enable_capture(self.pg_interfaces)
5479 self.pg1.get_capture(max_sessions)
5481 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5482 src_address=self.pg3.local_ip4n,
5484 template_interval=10)
5485 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5486 src_port=self.ipfix_src_port)
5488 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5489 IPv6(src=src, dst=remote_host_ip6) /
5490 TCP(sport=12345, dport=25))
5491 self.pg0.add_stream(p)
5492 self.pg_enable_capture(self.pg_interfaces)
5494 self.pg1.get_capture(0)
5495 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5496 capture = self.pg3.get_capture(9)
5497 ipfix = IPFIXDecoder()
5498 # first load template
5500 self.assertTrue(p.haslayer(IPFIX))
5501 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5502 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5503 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5504 self.assertEqual(p[UDP].dport, 4739)
5505 self.assertEqual(p[IPFIX].observationDomainID,
5506 self.ipfix_domain_id)
5507 if p.haslayer(Template):
5508 ipfix.add_template(p.getlayer(Template))
5509 # verify events in data set
5511 if p.haslayer(Data):
5512 data = ipfix.decode_data_set(p.getlayer(Set))
5513 self.verify_ipfix_max_sessions(data, max_sessions)
5515 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5516 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5517 TCP(sport=12345, dport=80))
5518 self.pg0.add_stream(p)
5519 self.pg_enable_capture(self.pg_interfaces)
5521 self.pg1.get_capture(0)
5522 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5523 capture = self.pg3.get_capture(1)
5524 # verify events in data set
5526 self.assertTrue(p.haslayer(IPFIX))
5527 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5528 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5529 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5530 self.assertEqual(p[UDP].dport, 4739)
5531 self.assertEqual(p[IPFIX].observationDomainID,
5532 self.ipfix_domain_id)
5533 if p.haslayer(Data):
5534 data = ipfix.decode_data_set(p.getlayer(Set))
5535 self.verify_ipfix_max_bibs(data, max_bibs)
5537 def test_ipfix_max_frags(self):
5538 """ IPFIX logging maximum fragments pending reassembly exceeded """
5539 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5541 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5542 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5543 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5544 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5545 src_address=self.pg3.local_ip4n,
5547 template_interval=10)
5548 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5549 src_port=self.ipfix_src_port)
5552 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5553 self.tcp_port_in, 20, data)
5554 self.pg0.add_stream(pkts[-1])
5555 self.pg_enable_capture(self.pg_interfaces)
5557 self.pg1.get_capture(0)
5558 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5559 capture = self.pg3.get_capture(9)
5560 ipfix = IPFIXDecoder()
5561 # first load template
5563 self.assertTrue(p.haslayer(IPFIX))
5564 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5565 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5566 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5567 self.assertEqual(p[UDP].dport, 4739)
5568 self.assertEqual(p[IPFIX].observationDomainID,
5569 self.ipfix_domain_id)
5570 if p.haslayer(Template):
5571 ipfix.add_template(p.getlayer(Template))
5572 # verify events in data set
5574 if p.haslayer(Data):
5575 data = ipfix.decode_data_set(p.getlayer(Set))
5576 self.verify_ipfix_max_fragments_ip6(data, 0,
5577 self.pg0.remote_ip6n)
5579 def test_ipfix_bib_ses(self):
5580 """ IPFIX logging NAT64 BIB/session create and delete events """
5581 self.tcp_port_in = random.randint(1025, 65535)
5582 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5586 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5588 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5589 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5590 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5591 src_address=self.pg3.local_ip4n,
5593 template_interval=10)
5594 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5595 src_port=self.ipfix_src_port)
5598 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5599 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5600 TCP(sport=self.tcp_port_in, dport=25))
5601 self.pg0.add_stream(p)
5602 self.pg_enable_capture(self.pg_interfaces)
5604 p = self.pg1.get_capture(1)
5605 self.tcp_port_out = p[0][TCP].sport
5606 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5607 capture = self.pg3.get_capture(10)
5608 ipfix = IPFIXDecoder()
5609 # first load template
5611 self.assertTrue(p.haslayer(IPFIX))
5612 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5613 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5614 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5615 self.assertEqual(p[UDP].dport, 4739)
5616 self.assertEqual(p[IPFIX].observationDomainID,
5617 self.ipfix_domain_id)
5618 if p.haslayer(Template):
5619 ipfix.add_template(p.getlayer(Template))
5620 # verify events in data set
5622 if p.haslayer(Data):
5623 data = ipfix.decode_data_set(p.getlayer(Set))
5624 if ord(data[0][230]) == 10:
5625 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5626 elif ord(data[0][230]) == 6:
5627 self.verify_ipfix_nat64_ses(data,
5629 self.pg0.remote_ip6n,
5630 self.pg1.remote_ip4,
5633 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5636 self.pg_enable_capture(self.pg_interfaces)
5637 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5640 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5641 capture = self.pg3.get_capture(2)
5642 # verify events in data set
5644 self.assertTrue(p.haslayer(IPFIX))
5645 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5646 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5647 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5648 self.assertEqual(p[UDP].dport, 4739)
5649 self.assertEqual(p[IPFIX].observationDomainID,
5650 self.ipfix_domain_id)
5651 if p.haslayer(Data):
5652 data = ipfix.decode_data_set(p.getlayer(Set))
5653 if ord(data[0][230]) == 11:
5654 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5655 elif ord(data[0][230]) == 7:
5656 self.verify_ipfix_nat64_ses(data,
5658 self.pg0.remote_ip6n,
5659 self.pg1.remote_ip4,
5662 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5664 def nat64_get_ses_num(self):
5666 Return number of active NAT64 sessions.
5668 st = self.vapi.nat64_st_dump()
5671 def clear_nat64(self):
5673 Clear NAT64 configuration.
5675 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5676 domain_id=self.ipfix_domain_id)
5677 self.ipfix_src_port = 4739
5678 self.ipfix_domain_id = 1
5680 self.vapi.nat64_set_timeouts()
5682 interfaces = self.vapi.nat64_interface_dump()
5683 for intf in interfaces:
5684 if intf.is_inside > 1:
5685 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5688 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5692 bib = self.vapi.nat64_bib_dump(255)
5695 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5703 adresses = self.vapi.nat64_pool_addr_dump()
5704 for addr in adresses:
5705 self.vapi.nat64_add_del_pool_addr_range(addr.address,
5710 prefixes = self.vapi.nat64_prefix_dump()
5711 for prefix in prefixes:
5712 self.vapi.nat64_add_del_prefix(prefix.prefix,
5714 vrf_id=prefix.vrf_id,
5718 super(TestNAT64, self).tearDown()
5719 if not self.vpp_dead:
5720 self.logger.info(self.vapi.cli("show nat64 pool"))
5721 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5722 self.logger.info(self.vapi.cli("show nat64 prefix"))
5723 self.logger.info(self.vapi.cli("show nat64 bib all"))
5724 self.logger.info(self.vapi.cli("show nat64 session table all"))
5725 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5729 class TestDSlite(MethodHolder):
5730 """ DS-Lite Test Cases """
5733 def setUpClass(cls):
5734 super(TestDSlite, cls).setUpClass()
5737 cls.nat_addr = '10.0.0.3'
5738 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5740 cls.create_pg_interfaces(range(2))
5742 cls.pg0.config_ip4()
5743 cls.pg0.resolve_arp()
5745 cls.pg1.config_ip6()
5746 cls.pg1.generate_remote_hosts(2)
5747 cls.pg1.configure_ipv6_neighbors()
5750 super(TestDSlite, cls).tearDownClass()
5753 def test_dslite(self):
5754 """ Test DS-Lite """
5755 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5757 aftr_ip4 = '192.0.0.1'
5758 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5759 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5760 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5761 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5764 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5765 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5766 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5767 UDP(sport=20000, dport=10000))
5768 self.pg1.add_stream(p)
5769 self.pg_enable_capture(self.pg_interfaces)
5771 capture = self.pg0.get_capture(1)
5772 capture = capture[0]
5773 self.assertFalse(capture.haslayer(IPv6))
5774 self.assertEqual(capture[IP].src, self.nat_addr)
5775 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5776 self.assertNotEqual(capture[UDP].sport, 20000)
5777 self.assertEqual(capture[UDP].dport, 10000)
5778 self.check_ip_checksum(capture)
5779 out_port = capture[UDP].sport
5781 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5782 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5783 UDP(sport=10000, dport=out_port))
5784 self.pg0.add_stream(p)
5785 self.pg_enable_capture(self.pg_interfaces)
5787 capture = self.pg1.get_capture(1)
5788 capture = capture[0]
5789 self.assertEqual(capture[IPv6].src, aftr_ip6)
5790 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5791 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5792 self.assertEqual(capture[IP].dst, '192.168.1.1')
5793 self.assertEqual(capture[UDP].sport, 10000)
5794 self.assertEqual(capture[UDP].dport, 20000)
5795 self.check_ip_checksum(capture)
5798 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5799 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5800 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5801 TCP(sport=20001, dport=10001))
5802 self.pg1.add_stream(p)
5803 self.pg_enable_capture(self.pg_interfaces)
5805 capture = self.pg0.get_capture(1)
5806 capture = capture[0]
5807 self.assertFalse(capture.haslayer(IPv6))
5808 self.assertEqual(capture[IP].src, self.nat_addr)
5809 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5810 self.assertNotEqual(capture[TCP].sport, 20001)
5811 self.assertEqual(capture[TCP].dport, 10001)
5812 self.check_ip_checksum(capture)
5813 self.check_tcp_checksum(capture)
5814 out_port = capture[TCP].sport
5816 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5817 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5818 TCP(sport=10001, dport=out_port))
5819 self.pg0.add_stream(p)
5820 self.pg_enable_capture(self.pg_interfaces)
5822 capture = self.pg1.get_capture(1)
5823 capture = capture[0]
5824 self.assertEqual(capture[IPv6].src, aftr_ip6)
5825 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5826 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5827 self.assertEqual(capture[IP].dst, '192.168.1.1')
5828 self.assertEqual(capture[TCP].sport, 10001)
5829 self.assertEqual(capture[TCP].dport, 20001)
5830 self.check_ip_checksum(capture)
5831 self.check_tcp_checksum(capture)
5834 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5835 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5836 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5837 ICMP(id=4000, type='echo-request'))
5838 self.pg1.add_stream(p)
5839 self.pg_enable_capture(self.pg_interfaces)
5841 capture = self.pg0.get_capture(1)
5842 capture = capture[0]
5843 self.assertFalse(capture.haslayer(IPv6))
5844 self.assertEqual(capture[IP].src, self.nat_addr)
5845 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5846 self.assertNotEqual(capture[ICMP].id, 4000)
5847 self.check_ip_checksum(capture)
5848 self.check_icmp_checksum(capture)
5849 out_id = capture[ICMP].id
5851 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5852 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5853 ICMP(id=out_id, type='echo-reply'))
5854 self.pg0.add_stream(p)
5855 self.pg_enable_capture(self.pg_interfaces)
5857 capture = self.pg1.get_capture(1)
5858 capture = capture[0]
5859 self.assertEqual(capture[IPv6].src, aftr_ip6)
5860 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5861 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5862 self.assertEqual(capture[IP].dst, '192.168.1.1')
5863 self.assertEqual(capture[ICMP].id, 4000)
5864 self.check_ip_checksum(capture)
5865 self.check_icmp_checksum(capture)
5867 # ping DS-Lite AFTR tunnel endpoint address
5868 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5869 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5870 ICMPv6EchoRequest())
5871 self.pg1.add_stream(p)
5872 self.pg_enable_capture(self.pg_interfaces)
5874 capture = self.pg1.get_capture(1)
5875 self.assertEqual(1, len(capture))
5876 capture = capture[0]
5877 self.assertEqual(capture[IPv6].src, aftr_ip6)
5878 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5879 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5882 super(TestDSlite, self).tearDown()
5883 if not self.vpp_dead:
5884 self.logger.info(self.vapi.cli("show dslite pool"))
5886 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5887 self.logger.info(self.vapi.cli("show dslite sessions"))
5890 class TestDSliteCE(MethodHolder):
5891 """ DS-Lite CE Test Cases """
def setUpConstants(cls):
    """
    Add the DS-Lite CE NAT section to the VPP command line

    Runs the parent class setUpConstants first and then appends the
    tokens of a ``nat { dslite ce }`` section to cls.vpp_cmdline.

    :param cls: Test case class whose vpp_cmdline is extended
    """
    super(TestDSliteCE, cls).setUpConstants()
    # extend() mutates the existing list in place, so whatever list the
    # parent call left on the class is the one that receives the tokens
    dslite_ce_section = ["nat", "{", "dslite ce", "}"]
    cls.vpp_cmdline.extend(dslite_ce_section)
5899 def setUpClass(cls):
5900 super(TestDSliteCE, cls).setUpClass()
5903 cls.create_pg_interfaces(range(2))
5905 cls.pg0.config_ip4()
5906 cls.pg0.resolve_arp()
5908 cls.pg1.config_ip6()
5909 cls.pg1.generate_remote_hosts(1)
5910 cls.pg1.configure_ipv6_neighbors()
5913 super(TestDSliteCE, cls).tearDownClass()
5916 def test_dslite_ce(self):
5917 """ Test DS-Lite CE """
5919 b4_ip4 = '192.0.0.2'
5920 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
5921 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
5922 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
5923 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
5925 aftr_ip4 = '192.0.0.1'
5926 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5927 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5928 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5929 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5931 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
5932 dst_address_length=128,
5933 next_hop_address=self.pg1.remote_ip6n,
5934 next_hop_sw_if_index=self.pg1.sw_if_index,
5938 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5939 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
5940 UDP(sport=10000, dport=20000))
5941 self.pg0.add_stream(p)
5942 self.pg_enable_capture(self.pg_interfaces)
5944 capture = self.pg1.get_capture(1)
5945 capture = capture[0]
5946 self.assertEqual(capture[IPv6].src, b4_ip6)
5947 self.assertEqual(capture[IPv6].dst, aftr_ip6)
5948 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5949 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
5950 self.assertEqual(capture[UDP].sport, 10000)
5951 self.assertEqual(capture[UDP].dport, 20000)
5952 self.check_ip_checksum(capture)
5955 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5956 IPv6(dst=b4_ip6, src=aftr_ip6) /
5957 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
5958 UDP(sport=20000, dport=10000))
5959 self.pg1.add_stream(p)
5960 self.pg_enable_capture(self.pg_interfaces)
5962 capture = self.pg0.get_capture(1)
5963 capture = capture[0]
5964 self.assertFalse(capture.haslayer(IPv6))
5965 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
5966 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5967 self.assertEqual(capture[UDP].sport, 20000)
5968 self.assertEqual(capture[UDP].dport, 10000)
5969 self.check_ip_checksum(capture)
5971 # ping DS-Lite B4 tunnel endpoint address
5972 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5973 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
5974 ICMPv6EchoRequest())
5975 self.pg1.add_stream(p)
5976 self.pg_enable_capture(self.pg_interfaces)
5978 capture = self.pg1.get_capture(1)
5979 self.assertEqual(1, len(capture))
5980 capture = capture[0]
5981 self.assertEqual(capture[IPv6].src, b4_ip6)
5982 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5983 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5986 super(TestDSliteCE, self).tearDown()
5987 if not self.vpp_dead:
5989 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5991 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
# When this file is executed as a script, hand control to unittest and
# have it run the collected test cases with VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)