9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its raw bytes, the stored checksum of the
    'IP' layer is removed and the packet is built again so that the
    checksum field is filled in afresh; the result must equal the value
    carried by the captured packet.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the captured value first: without this the rebuilt packet simply
    # echoes the captured checksum back and the assertion can never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its raw bytes, the stored checksum of the
    'TCP' layer is removed and the packet is built again so that the
    checksum field is filled in afresh; the result must equal the value
    carried by the captured packet.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the captured value first: without this the rebuilt packet simply
    # echoes the captured checksum back and the assertion can never fail.
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its raw bytes, the stored checksum of the
    'UDP' layer is removed and the packet is built again so that the
    checksum field is filled in afresh; the result must equal the value
    carried by the captured packet.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the captured value first: without this the rebuilt packet simply
    # echoes the captured checksum back and the assertion can never fail.
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded layer present in the packet (IPerror, TCPerror,
    UDPerror, ICMPerror) the packet is re-parsed from its raw bytes, the
    stored checksum of that layer is removed, the packet is rebuilt and the
    regenerated checksum is compared with the captured one.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    embedded_layers = (
        (IPerror, 'IPerror'),
        (TCPerror, 'TCPerror'),
        (UDPerror, 'UDPerror'),
        (ICMPerror, 'ICMPerror'),
    )
    for layer_cls, layer_name in embedded_layers:
        if not pkt.haslayer(layer_cls):
            continue
        # An embedded UDP checksum of 0 is not verified (presumably it
        # means "no checksum was computed" - confirm against the sender).
        if layer_cls is UDPerror and pkt['UDPerror'].chksum == 0:
            continue
        # Always start from the captured packet: every layer is verified
        # independently and never relies on a copy left behind by an
        # earlier branch (which may not have run at all).
        new = pkt.__class__(str(pkt))
        del new[layer_name].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer_name].chksum, pkt[layer_name].chksum)
def check_icmp_checksum(self, pkt):
    """
    Verify the checksum of the 'ICMP' layer of a packet

    A copy of the packet is parsed from its raw bytes, the stored ICMP
    checksum is removed and the copy is rebuilt; the regenerated checksum
    must match the captured one. Packets that also carry an IPerror layer
    additionally get their embedded checksums verified.

    :param pkt: Packet to check ICMP checksum
    """
    packet_cls = pkt.__class__
    rebuilt = packet_cls(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    # ICMP error message: the quoted original packet has checksums too.
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Verify the ICMPv6 checksum of a packet

    Handles the ICMPv6DestUnreach, ICMPv6EchoRequest and ICMPv6EchoReply
    layers. For each one present, the stored checksum is removed from a
    re-parsed copy, the copy is rebuilt and the regenerated checksum is
    compared with the captured one. A destination-unreachable message
    additionally gets its embedded packet checksums verified.

    :param pkt: Packet to check ICMPv6 checksum
    """
    # (layer class, layer name used for lookup, verify embedded packet?)
    checked_layers = (
        (ICMPv6DestUnreach, 'ICMPv6DestUnreach', True),
        (ICMPv6EchoRequest, 'ICMPv6EchoRequest', False),
        (ICMPv6EchoReply, 'ICMPv6EchoReply', False),
    )
    # The working copy is created once and carried from layer to layer.
    rebuilt = pkt.__class__(str(pkt))
    for layer_cls, layer_name, has_embedded in checked_layers:
        if not pkt.haslayer(layer_cls):
            continue
        del rebuilt[layer_name].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[layer_name].cksum, pkt[layer_name].cksum)
        if has_embedded:
            self.check_icmp_errror_embedded(pkt)
137 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param dst_ip: Destination address
144 :param ttl: TTL of generated packets
147 dst_ip = out_if.remote_ip4
151 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153 TCP(sport=self.tcp_port_in, dport=20))
157 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159 UDP(sport=self.udp_port_in, dport=20))
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165 ICMP(id=self.icmp_id_in, type='echo-request'))
170 def compose_ip6(self, ip4, pref, plen):
172 Compose IPv4-embedded IPv6 addresses
174 :param ip4: IPv4 address
175 :param pref: IPv6 prefix
176 :param plen: IPv6 prefix length
177 :returns: IPv4-embedded IPv6 addresses
179 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
195 pref_n[10] = ip4_n[3]
199 pref_n[10] = ip4_n[2]
200 pref_n[11] = ip4_n[3]
203 pref_n[10] = ip4_n[1]
204 pref_n[11] = ip4_n[2]
205 pref_n[12] = ip4_n[3]
207 pref_n[12] = ip4_n[0]
208 pref_n[13] = ip4_n[1]
209 pref_n[14] = ip4_n[2]
210 pref_n[15] = ip4_n[3]
211 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
213 def extract_ip4(self, ip6, plen):
215 Extract IPv4 address embedded in IPv6 addresses
217 :param ip6: IPv6 address
218 :param plen: IPv6 prefix length
219 :returns: extracted IPv4 address
221 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
253 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
255 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
257 Create IPv6 packet stream for inside network
259 :param in_if: Inside interface
260 :param out_if: Outside interface
261 :param ttl: Hop Limit of generated packets
262 :param pref: NAT64 prefix
263 :param plen: NAT64 prefix length
267 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
269 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
272 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274 TCP(sport=self.tcp_port_in, dport=20))
278 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280 UDP(sport=self.udp_port_in, dport=20))
284 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286 ICMPv6EchoRequest(id=self.icmp_id_in))
291 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292 use_inside_ports=False):
294 Create packet stream for outside network
296 :param out_if: Outside interface
297 :param dst_ip: Destination IP address (Default use global NAT address)
298 :param ttl: TTL of generated packets
299 :param use_inside_ports: Use inside NAT ports as destination ports
300 instead of outside ports
303 dst_ip = self.nat_addr
304 if not use_inside_ports:
305 tcp_port = self.tcp_port_out
306 udp_port = self.udp_port_out
307 icmp_id = self.icmp_id_out
309 tcp_port = self.tcp_port_in
310 udp_port = self.udp_port_in
311 icmp_id = self.icmp_id_in
314 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316 TCP(dport=tcp_port, sport=20))
320 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322 UDP(dport=udp_port, sport=20))
326 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328 ICMP(id=icmp_id, type='echo-reply'))
333 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
335 Create packet stream for outside network
337 :param out_if: Outside interface
338 :param dst_ip: Destination IP address (Default use global NAT address)
339 :param hl: HL of generated packets
343 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345 TCP(dport=self.tcp_port_out, sport=20))
349 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351 UDP(dport=self.udp_port_out, sport=20))
355 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357 ICMPv6EchoReply(id=self.icmp_id_out))
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Sorce port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Sorce port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param fragsize: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    Expects exactly one record whose natEvent field (230) is 3 and whose
    field 283 is a zero 32-bit value in network byte order.

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    # The single record asserted above is the one being inspected.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    Expects exactly one record with natEvent (230) equal to 13,
    natQuotaExceededEvent (466) equal to 1 and field 471 equal to *limit*.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    # The single record asserted above is the one being inspected.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed with native byte order ("I"), unlike the "!I"
    # used for other fields in this file - confirm this matches the exporter.
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    Expects exactly one record with natEvent (230) equal to 13,
    natQuotaExceededEvent (466) equal to 2 and field 472 equal to *limit*.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    # The single record asserted above is the one being inspected.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed with native byte order ("I"), unlike the "!I"
    # used for other fields in this file - confirm this matches the exporter.
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    Expects exactly one record with natEvent (230) equal to 13,
    natQuotaExceededEvent (466) equal to 5,
    maxFragmentsPendingReassembly (475) equal to *limit* and field 27
    equal to *src_addr*.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    # The single record asserted above is the one being inspected.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed with native byte order ("I"), unlike the "!I"
    # used for other fields in this file - confirm this matches the exporter.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    Expects exactly one record with natEvent (230) equal to 13,
    natQuotaExceededEvent (466) equal to 5,
    maxFragmentsPendingReassembly (475) equal to *limit* and field 8
    equal to *src_addr*.

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    # The single record asserted above is the one being inspected.
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    # NOTE(review): packed with native byte order ("I"), unlike the "!I"
    # used for other fields in this file - confirm this matches the exporter.
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    Expects exactly one record. Its natEvent field (230) must be 10 for a
    create event and 11 for a delete event; the remaining fields are
    compared with *src_addr* and with the NAT address / TCP ports stored
    on the test case (nat_addr_n, tcp_port_in, tcp_port_out).

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    # The single record asserted above is the one being inspected.
    record = data[0]
    # natEvent: the expected code depends on the kind of event; asserting
    # both codes unconditionally could never succeed.
    if is_create:
        self.assertEqual(ord(record[230]), 10)
    else:
        self.assertEqual(ord(record[230]), 11)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
1035 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1036 for lb_sm in lb_static_mappings:
1037 self.vapi.nat44_add_del_lb_static_mapping(
1038 lb_sm.external_addr,
1039 lb_sm.external_port,
1041 vrf_id=lb_sm.vrf_id,
1042 twice_nat=lb_sm.twice_nat,
1043 out2in_only=lb_sm.out2in_only,
1048 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1049 for id_m in identity_mappings:
1050 self.vapi.nat44_add_del_identity_mapping(
1051 addr_only=id_m.addr_only,
1054 sw_if_index=id_m.sw_if_index,
1056 protocol=id_m.protocol,
1059 adresses = self.vapi.nat44_address_dump()
1060 for addr in adresses:
1061 self.vapi.nat44_add_del_address_range(addr.ip_address,
1063 twice_nat=addr.twice_nat,
1066 self.vapi.nat_set_reass()
1067 self.vapi.nat_set_reass(is_ip6=1)
1069 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1070 local_port=0, external_port=0, vrf_id=0,
1071 is_add=1, external_sw_if_index=0xFFFFFFFF,
1072 proto=0, twice_nat=0, out2in_only=0):
1074 Add/delete NAT44 static mapping
1076 :param local_ip: Local IP address
1077 :param external_ip: External IP address
1078 :param local_port: Local port number (Optional)
1079 :param external_port: External port number (Optional)
1080 :param vrf_id: VRF ID (Default 0)
1081 :param is_add: 1 if add, 0 if delete (Default add)
1082 :param external_sw_if_index: External interface instead of IP address
1083 :param proto: IP protocol (Mandatory if port specified)
1084 :param twice_nat: 1 if translate external host address and port
1085 :param out2in_only: if 1 rule is matching only out2in direction
1088 if local_port and external_port:
1090 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1091 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1092 self.vapi.nat44_add_del_static_mapping(
1095 external_sw_if_index,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    The address is converted to its packed binary form and passed as both
    the first and the last address of a one-address range.

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID forwarded to the API call (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    nat_addr = socket.inet_pton(socket.AF_INET, ip)
    # vrf_id must be forwarded explicitly, otherwise the caller's value is
    # silently ignored.
    self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
                                          vrf_id=vrf_id,
                                          twice_nat=twice_nat)
1118 def test_dynamic(self):
1119 """ NAT44 dynamic translation test """
1121 self.nat44_add_address(self.nat_addr)
1122 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1123 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1127 pkts = self.create_stream_in(self.pg0, self.pg1)
1128 self.pg0.add_stream(pkts)
1129 self.pg_enable_capture(self.pg_interfaces)
1131 capture = self.pg1.get_capture(len(pkts))
1132 self.verify_capture_out(capture)
1135 pkts = self.create_stream_out(self.pg1)
1136 self.pg1.add_stream(pkts)
1137 self.pg_enable_capture(self.pg_interfaces)
1139 capture = self.pg0.get_capture(len(pkts))
1140 self.verify_capture_in(capture, self.pg0)
1142 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1143 """ NAT44 handling of client packets with TTL=1 """
1145 self.nat44_add_address(self.nat_addr)
1146 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1147 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1150 # Client side - generate traffic
1151 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1152 self.pg0.add_stream(pkts)
1153 self.pg_enable_capture(self.pg_interfaces)
1156 # Client side - verify ICMP type 11 packets
1157 capture = self.pg0.get_capture(len(pkts))
1158 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1160 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1161 """ NAT44 handling of server packets with TTL=1 """
1163 self.nat44_add_address(self.nat_addr)
1164 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1165 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1168 # Client side - create sessions
1169 pkts = self.create_stream_in(self.pg0, self.pg1)
1170 self.pg0.add_stream(pkts)
1171 self.pg_enable_capture(self.pg_interfaces)
1174 # Server side - generate traffic
1175 capture = self.pg1.get_capture(len(pkts))
1176 self.verify_capture_out(capture)
1177 pkts = self.create_stream_out(self.pg1, ttl=1)
1178 self.pg1.add_stream(pkts)
1179 self.pg_enable_capture(self.pg_interfaces)
1182 # Server side - verify ICMP type 11 packets
1183 capture = self.pg1.get_capture(len(pkts))
1184 self.verify_capture_out_with_icmp_errors(capture,
1185 src_ip=self.pg1.local_ip4)
1187 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1188 """ NAT44 handling of error responses to client packets with TTL=2 """
# Sends the inside stream with ttl=2, captures it on pg1, then builds one
# ICMP type 11 packet per captured packet (embedding that packet's IP layer)
# addressed to nat_addr, and checks what arrives back on pg0.
# NOTE(review): the embedded line numbers skip (e.g. 1192 -> 1195), so some
# source lines are not shown; the pg1 feature call below is visibly left
# open. Confirm the missing arguments/statements against the full file.
1190 self.nat44_add_address(self.nat_addr)
1191 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1192 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1195 # Client side - generate traffic
1196 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1197 self.pg0.add_stream(pkts)
1198 self.pg_enable_capture(self.pg_interfaces)
1201 # Server side - simulate ICMP type 11 response
1202 capture = self.pg1.get_capture(len(pkts))
# Each error packet carries the captured packet's IP layer as its payload.
1203 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1204 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1205 ICMP(type=11) / packet[IP] for packet in capture]
1206 self.pg1.add_stream(pkts)
1207 self.pg_enable_capture(self.pg_interfaces)
1210 # Client side - verify ICMP type 11 packets
1211 capture = self.pg0.get_capture(len(pkts))
1212 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1214 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1215 """ NAT44 handling of error responses to server packets with TTL=2 """
# Three phases: create sessions with an inside stream (pg0 -> pg1), send an
# outside stream with ttl=2 from pg1 and capture it on pg0, then answer each
# captured packet from pg0 with an ICMP type 11 packet and verify the
# result on pg1.
# NOTE(review): the embedded line numbers skip (e.g. 1219 -> 1222), so some
# source lines are not shown; the pg1 feature call below is visibly left
# open. Confirm the missing arguments/statements against the full file.
1217 self.nat44_add_address(self.nat_addr)
1218 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1219 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1222 # Client side - create sessions
1223 pkts = self.create_stream_in(self.pg0, self.pg1)
1224 self.pg0.add_stream(pkts)
1225 self.pg_enable_capture(self.pg_interfaces)
1228 # Server side - generate traffic
1229 capture = self.pg1.get_capture(len(pkts))
1230 self.verify_capture_out(capture)
1231 pkts = self.create_stream_out(self.pg1, ttl=2)
1232 self.pg1.add_stream(pkts)
1233 self.pg_enable_capture(self.pg_interfaces)
1236 # Client side - simulate ICMP type 11 response
1237 capture = self.pg0.get_capture(len(pkts))
# Each error packet carries the captured packet's IP layer as its payload
# and is addressed to pg1's remote host.
1238 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1239 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1240 ICMP(type=11) / packet[IP] for packet in capture]
1241 self.pg0.add_stream(pkts)
1242 self.pg_enable_capture(self.pg_interfaces)
1245 # Server side - verify ICMP type 11 packets
1246 capture = self.pg1.get_capture(len(pkts))
1247 self.verify_capture_out_with_icmp_errors(capture)
1249 def test_ping_out_interface_from_outside(self):
1250 """ Ping NAT44 out interface from outside network """
# Builds an ICMP echo-request from pg1's remote host to pg1's own local
# address and asserts on the single packet captured back on pg1.
# NOTE(review): the embedded line numbers skip (e.g. 1259 -> 1261,
# 1265 -> 1268), so some source lines are not shown. In the visible lines
# `pkts` and `packet` are used without being assigned (presumably
# `pkts = [p]` and `packet = capture[0]`), and the logging call at 1273
# presumably sits in an exception handler - confirm against the full file.
1252 self.nat44_add_address(self.nat_addr)
1253 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1254 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1257 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1258 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1259 ICMP(id=self.icmp_id_out, type='echo-request'))
1261 self.pg1.add_stream(pkts)
1262 self.pg_enable_capture(self.pg_interfaces)
1264 capture = self.pg1.get_capture(len(pkts))
1265 self.assertEqual(1, len(capture))
# The reply must have source/destination swapped relative to the request.
# Note the request is sent with icmp_id_out while the reply id is compared
# against icmp_id_in.
1268 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1269 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1270 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1271 self.assertEqual(packet[ICMP].type, 0) # echo reply
1273 self.logger.error(ppp("Unexpected or invalid packet "
1274 "(outside network):", packet))
1277 def test_ping_internal_host_from_outside(self):
1278 """ Ping internal host from outside network """
# Adds a static mapping pg0.remote_ip4 <-> nat_addr, sends an echo-request
# from pg1's remote host to nat_addr and expects it on pg0; then sends an
# echo-reply from pg0's remote host and expects it on pg1.
# NOTE(review): the embedded line numbers skip (e.g. 1282 -> 1286), so some
# source lines are not shown; the pg1 feature call below is visibly left
# open. Confirm the missing arguments/statements against the full file.
1280 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1281 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1282 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Request direction: outside host -> nat_addr, captured on pg0.
1286 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1287 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1288 ICMP(id=self.icmp_id_out, type='echo-request'))
1289 self.pg1.add_stream(pkt)
1290 self.pg_enable_capture(self.pg_interfaces)
1292 capture = self.pg0.get_capture(1)
1293 self.verify_capture_in(capture, self.pg0, packet_num=1)
1294 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Reply direction: inside host -> outside host, captured on pg1.
1297 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1298 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1299 ICMP(id=self.icmp_id_in, type='echo-reply'))
1300 self.pg0.add_stream(pkt)
1301 self.pg_enable_capture(self.pg_interfaces)
1303 capture = self.pg1.get_capture(1)
1304 self.verify_capture_out(capture, same_port=True, packet_num=1)
1305 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1307 def test_forwarding(self):
1308 """ NAT44 forwarding test """
# Enables NAT44 forwarding and a static mapping (pg0.remote_ip4n <->
# nat_addr_n), then exercises traffic in both directions twice: once for the
# host covered by the static mapping and once after remote_hosts[0] has been
# replaced by remote_hosts[1].
# NOTE(review): the embedded line numbers skip (e.g. 1311 -> 1313,
# 1356 -> 1359, 1364 -> 1367), so some source lines are not shown and
# several calls below are visibly left open. Whether the restore/cleanup at
# the end is wrapped in try/finally cannot be seen here - confirm against
# the full file.
1310 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1311 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1313 self.vapi.nat44_forwarding_enable_disable(1)
1315 real_ip = self.pg0.remote_ip4n
1316 alias_ip = self.nat_addr_n
1317 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1318 external_ip=alias_ip)
1321 # in2out - static mapping match
1323 pkts = self.create_stream_out(self.pg1)
1324 self.pg1.add_stream(pkts)
1325 self.pg_enable_capture(self.pg_interfaces)
1327 capture = self.pg0.get_capture(len(pkts))
1328 self.verify_capture_in(capture, self.pg0)
1330 pkts = self.create_stream_in(self.pg0, self.pg1)
1331 self.pg0.add_stream(pkts)
1332 self.pg_enable_capture(self.pg_interfaces)
1334 capture = self.pg1.get_capture(len(pkts))
1335 self.verify_capture_out(capture, same_port=True)
1337 # in2out - no static mapping match
# Save the first remote host and temporarily put the second one in its
# place; it is restored from `host0` further down (source line 1359).
1339 host0 = self.pg0.remote_hosts[0]
1340 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1342 pkts = self.create_stream_out(self.pg1,
1343 dst_ip=self.pg0.remote_ip4,
1344 use_inside_ports=True)
1345 self.pg1.add_stream(pkts)
1346 self.pg_enable_capture(self.pg_interfaces)
1348 capture = self.pg0.get_capture(len(pkts))
1349 self.verify_capture_in(capture, self.pg0)
1351 pkts = self.create_stream_in(self.pg0, self.pg1)
1352 self.pg0.add_stream(pkts)
1353 self.pg_enable_capture(self.pg_interfaces)
1355 capture = self.pg1.get_capture(len(pkts))
1356 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1359 self.pg0.remote_hosts[0] = host0
# Cleanup: forwarding is switched off and the static-mapping API is called
# again with the same addresses (remaining arguments not visible here;
# presumably a delete flag - confirm).
1362 self.vapi.nat44_forwarding_enable_disable(0)
1363 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1364 external_ip=alias_ip,
1367 def test_static_in(self):
1368 """ 1:1 NAT initialized from inside network """
# Maps pg0.remote_ip4 to the fixed address 10.0.0.10, sends the inside
# stream first (verified on pg1), then the outside stream addressed to
# nat_ip (verified on pg0).
# NOTE(review): the embedded line numbers skip (e.g. 1377 -> 1381), so some
# source lines are not shown; the pg1 feature call below is visibly left
# open. Confirm the missing arguments/statements against the full file.
1370 nat_ip = "10.0.0.10"
# The expected outside ports/id are set on the instance before the streams
# are created and verified.
1371 self.tcp_port_out = 6303
1372 self.udp_port_out = 6304
1373 self.icmp_id_out = 6305
1375 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1376 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1377 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1381 pkts = self.create_stream_in(self.pg0, self.pg1)
1382 self.pg0.add_stream(pkts)
1383 self.pg_enable_capture(self.pg_interfaces)
1385 capture = self.pg1.get_capture(len(pkts))
1386 self.verify_capture_out(capture, nat_ip, True)
1389 pkts = self.create_stream_out(self.pg1, nat_ip)
1390 self.pg1.add_stream(pkts)
1391 self.pg_enable_capture(self.pg_interfaces)
1393 capture = self.pg0.get_capture(len(pkts))
1394 self.verify_capture_in(capture, self.pg0)
1396 def test_static_out(self):
1397 """ 1:1 NAT initialized from outside network """
# Same static-mapping scenario as the inside-initiated variant but with
# nat_ip 10.0.0.20 and the order reversed: the outside stream is sent first
# (verified on pg0), then the inside stream (verified on pg1).
# NOTE(review): the embedded line numbers skip (e.g. 1406 -> 1410), so some
# source lines are not shown; the pg1 feature call below is visibly left
# open. Confirm the missing arguments/statements against the full file.
1399 nat_ip = "10.0.0.20"
1400 self.tcp_port_out = 6303
1401 self.udp_port_out = 6304
1402 self.icmp_id_out = 6305
1404 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1405 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1406 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1410 pkts = self.create_stream_out(self.pg1, nat_ip)
1411 self.pg1.add_stream(pkts)
1412 self.pg_enable_capture(self.pg_interfaces)
1414 capture = self.pg0.get_capture(len(pkts))
1415 self.verify_capture_in(capture, self.pg0)
1418 pkts = self.create_stream_in(self.pg0, self.pg1)
1419 self.pg0.add_stream(pkts)
1420 self.pg_enable_capture(self.pg_interfaces)
1422 capture = self.pg1.get_capture(len(pkts))
1423 self.verify_capture_out(capture, nat_ip, True)
1425 def test_static_with_port_in(self):
1426 """ 1:1 NAPT initialized from inside network """
# Registers one static mapping per protocol (TCP, UDP, ICMP) between
# pg0.remote_ip4 and nat_addr using the instance's *_in / *_out port values,
# then sends the inside stream (verified on pg1) followed by the outside
# stream (verified on pg0).
# NOTE(review): the embedded line numbers skip (e.g. 1443 -> 1447), so some
# source lines are not shown; the pg1 feature call below is visibly left
# open. Confirm the missing arguments/statements against the full file.
1428 self.tcp_port_out = 3606
1429 self.udp_port_out = 3607
1430 self.icmp_id_out = 3608
1432 self.nat44_add_address(self.nat_addr)
1433 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434 self.tcp_port_in, self.tcp_port_out,
1435 proto=IP_PROTOS.tcp)
1436 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1437 self.udp_port_in, self.udp_port_out,
1438 proto=IP_PROTOS.udp)
# For ICMP the identifier pair is passed in the port argument positions.
1439 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1440 self.icmp_id_in, self.icmp_id_out,
1441 proto=IP_PROTOS.icmp)
1442 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1443 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1447 pkts = self.create_stream_in(self.pg0, self.pg1)
1448 self.pg0.add_stream(pkts)
1449 self.pg_enable_capture(self.pg_interfaces)
1451 capture = self.pg1.get_capture(len(pkts))
1452 self.verify_capture_out(capture)
1455 pkts = self.create_stream_out(self.pg1)
1456 self.pg1.add_stream(pkts)
1457 self.pg_enable_capture(self.pg_interfaces)
1459 capture = self.pg0.get_capture(len(pkts))
1460 self.verify_capture_in(capture, self.pg0)
1462 def test_static_with_port_out(self):
1463 """ 1:1 NAPT initialized from outside network """
# Registers per-protocol static mappings (TCP, UDP, ICMP) between
# pg0.remote_ip4 and nat_addr, then sends the outside stream first
# (verified on pg0) followed by the inside stream (verified on pg1).
# NOTE(review): the embedded line numbers skip (e.g. 1480 -> 1484), so some
# source lines are not shown; the pg1 feature call below is visibly left
# open. Confirm the missing arguments/statements against the full file.
1465 self.tcp_port_out = 30606
1466 self.udp_port_out = 30607
1467 self.icmp_id_out = 30608
1469 self.nat44_add_address(self.nat_addr)
1470 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1471 self.tcp_port_in, self.tcp_port_out,
1472 proto=IP_PROTOS.tcp)
1473 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1474 self.udp_port_in, self.udp_port_out,
1475 proto=IP_PROTOS.udp)
# For ICMP the identifier pair is passed in the port argument positions.
1476 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1477 self.icmp_id_in, self.icmp_id_out,
1478 proto=IP_PROTOS.icmp)
1479 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1480 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1484 pkts = self.create_stream_out(self.pg1)
1485 self.pg1.add_stream(pkts)
1486 self.pg_enable_capture(self.pg_interfaces)
1488 capture = self.pg0.get_capture(len(pkts))
1489 self.verify_capture_in(capture, self.pg0)
1492 pkts = self.create_stream_in(self.pg0, self.pg1)
1493 self.pg0.add_stream(pkts)
1494 self.pg_enable_capture(self.pg_interfaces)
1496 capture = self.pg1.get_capture(len(pkts))
1497 self.verify_capture_out(capture)
1499 def test_static_with_port_out2(self):
1500 """ 1:1 NAPT symmetrical rule """
# With forwarding enabled and a TCP static mapping created with
# out2in_only=1, four single-packet exchanges are checked: client->service
# via nat_addr, service->client, client->server addressed directly, and
# server->client for that direct flow.
# NOTE(review): the embedded line numbers skip (e.g. 1500 -> 1505,
# 1520 -> 1526), so some source lines are not shown. In the visible lines
# `local_port`, `external_port`, `ip` and `tcp` are used without being
# assigned, and the checksum helpers receive `p`, which as shown is the
# packet that was sent; presumably `p` is rebound to the captured packet on
# a skipped line and the logger calls sit in exception handlers - confirm
# against the full file.
1505 self.vapi.nat44_forwarding_enable_disable(1)
1506 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1507 local_port, external_port,
1508 proto=IP_PROTOS.tcp, out2in_only=1)
1509 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1510 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1513 # from client to service
1514 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1515 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1516 TCP(sport=12345, dport=external_port))
1517 self.pg1.add_stream(p)
1518 self.pg_enable_capture(self.pg_interfaces)
1520 capture = self.pg0.get_capture(1)
# Destination must be rewritten to the inside host and local port.
1526 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1527 self.assertEqual(tcp.dport, local_port)
1528 self.check_tcp_checksum(p)
1529 self.check_ip_checksum(p)
1531 self.logger.error(ppp("Unexpected or invalid packet:", p))
1534 # from service back to client
1535 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1536 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1537 TCP(sport=local_port, dport=12345))
1538 self.pg0.add_stream(p)
1539 self.pg_enable_capture(self.pg_interfaces)
1541 capture = self.pg1.get_capture(1)
# Source must be rewritten to nat_addr and the external port.
1546 self.assertEqual(ip.src, self.nat_addr)
1547 self.assertEqual(tcp.sport, external_port)
1548 self.check_tcp_checksum(p)
1549 self.check_ip_checksum(p)
1551 self.logger.error(ppp("Unexpected or invalid packet:", p))
1554 # from client to server (no translation)
1555 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1556 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1557 TCP(sport=12346, dport=local_port))
1558 self.pg1.add_stream(p)
1559 self.pg_enable_capture(self.pg_interfaces)
1561 capture = self.pg0.get_capture(1)
1567 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1568 self.assertEqual(tcp.dport, local_port)
1569 self.check_tcp_checksum(p)
1570 self.check_ip_checksum(p)
1572 self.logger.error(ppp("Unexpected or invalid packet:", p))
1575 # from service back to client (no translation)
1576 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1577 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1578 TCP(sport=local_port, dport=12346))
1579 self.pg0.add_stream(p)
1580 self.pg_enable_capture(self.pg_interfaces)
1582 capture = self.pg1.get_capture(1)
# Source address and port must be left untouched for this flow.
1587 self.assertEqual(ip.src, self.pg0.remote_ip4)
1588 self.assertEqual(tcp.sport, local_port)
1589 self.check_tcp_checksum(p)
1590 self.check_ip_checksum(p)
1592 self.logger.error(ppp("Unexpected or invalid packet:", p))
1595 def test_static_vrf_aware(self):
1596 """ 1:1 NAT VRF awareness """
# Creates two static mappings (pg4.remote_ip4 -> 10.0.0.30 and
# pg0.remote_ip4 -> 10.0.0.40), then checks that traffic from pg4 is
# translated to nat_ip1 on pg3 while traffic from pg0 produces nothing on
# pg3.
# NOTE(review): the embedded line numbers skip (e.g. 1604 -> 1606 -> 1608),
# so the trailing arguments of both static mappings and of the pg3 feature
# call are not shown (presumably the VRF ids and the outside flag) -
# confirm against the full file.
1598 nat_ip1 = "10.0.0.30"
1599 nat_ip2 = "10.0.0.40"
1600 self.tcp_port_out = 6303
1601 self.udp_port_out = 6304
1602 self.icmp_id_out = 6305
1604 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1606 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1608 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1610 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1611 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1613 # inside interface VRF match NAT44 static mapping VRF
1614 pkts = self.create_stream_in(self.pg4, self.pg3)
1615 self.pg4.add_stream(pkts)
1616 self.pg_enable_capture(self.pg_interfaces)
1618 capture = self.pg3.get_capture(len(pkts))
1619 self.verify_capture_out(capture, nat_ip1, True)
1621 # inside interface VRF don't match NAT44 static mapping VRF (packets
1623 pkts = self.create_stream_in(self.pg0, self.pg3)
1624 self.pg0.add_stream(pkts)
1625 self.pg_enable_capture(self.pg_interfaces)
# Negative check: nothing may show up on pg3 for the pg0-originated stream.
1627 self.pg3.assert_nothing_captured()
1629 def test_identity_nat(self):
1630 """ Identity NAT """
# Adds an identity mapping for pg0.remote_ip4n, sends one TCP packet from
# pg1's remote host directly to pg0's remote host and asserts that
# addresses and ports arrive on pg0 unchanged.
# NOTE(review): the embedded line numbers skip (e.g. 1634 -> 1637,
# 1643 -> 1648), so some source lines are not shown. `ip` and `tcp` are used
# without a visible assignment and `p` as shown is the sent packet;
# presumably `p`, `ip`, `tcp` are taken from `capture` on skipped lines and
# the logger call sits in an exception handler - confirm against the full
# file.
1632 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1633 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1634 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1637 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1638 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1639 TCP(sport=12345, dport=56789))
1640 self.pg1.add_stream(p)
1641 self.pg_enable_capture(self.pg_interfaces)
1643 capture = self.pg0.get_capture(1)
# All four tuple fields must equal what was sent.
1648 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1649 self.assertEqual(ip.src, self.pg1.remote_ip4)
1650 self.assertEqual(tcp.dport, 56789)
1651 self.assertEqual(tcp.sport, 12345)
1652 self.check_tcp_checksum(p)
1653 self.check_ip_checksum(p)
1655 self.logger.error(ppp("Unexpected or invalid packet:", p))
1658 def test_static_lb(self):
1659 """ NAT44 local service load balancing """
# Configures a load-balancing static mapping on nat_addr with two local
# back ends (remote_hosts[0] and remote_hosts[1] of pg0), checks one
# client->service->client round trip, then sends one packet per client
# address from ip4_range and finally asserts that server1 was selected more
# often than server2.
# NOTE(review): the embedded line numbers skip (e.g. 1660 -> 1663,
# 1666 -> 1669, 1690 -> 1696), so some source lines are not shown. Not
# visible here: the assignments of `external_port`, `local_port`, `server`,
# `pkts`, `server1_n`, `server2_n`, `ip`, `tcp`; the remaining keys of each
# `locals` entry (presumably port and weight); and the loop that counts
# per-server hits. Confirm against the full file.
1660 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1663 server1 = self.pg0.remote_hosts[0]
1664 server2 = self.pg0.remote_hosts[1]
1666 locals = [{'addr': server1.ip4n,
1669 {'addr': server2.ip4n,
1673 self.nat44_add_address(self.nat_addr)
1674 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1677 local_num=len(locals),
1679 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1680 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1683 # from client to service
1684 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1685 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1686 TCP(sport=12345, dport=external_port))
1687 self.pg1.add_stream(p)
1688 self.pg_enable_capture(self.pg_interfaces)
1690 capture = self.pg0.get_capture(1)
# Either back end is an acceptable destination for the first packet.
1696 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1697 if ip.dst == server1.ip4:
1701 self.assertEqual(tcp.dport, local_port)
1702 self.check_tcp_checksum(p)
1703 self.check_ip_checksum(p)
1705 self.logger.error(ppp("Unexpected or invalid packet:", p))
1708 # from service back to client
1709 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1710 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1711 TCP(sport=local_port, dport=12345))
1712 self.pg0.add_stream(p)
1713 self.pg_enable_capture(self.pg_interfaces)
1715 capture = self.pg1.get_capture(1)
1720 self.assertEqual(ip.src, self.nat_addr)
1721 self.assertEqual(tcp.sport, external_port)
1722 self.check_tcp_checksum(p)
1723 self.check_ip_checksum(p)
1725 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Distribution check: one packet per generated client source address.
1731 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1733 for client in clients:
1734 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1735 IP(src=client, dst=self.nat_addr) /
1736 TCP(sport=12345, dport=external_port))
1738 self.pg1.add_stream(pkts)
1739 self.pg_enable_capture(self.pg_interfaces)
1741 capture = self.pg0.get_capture(len(pkts))
1743 if p[IP].dst == server1.ip4:
1747 self.assertTrue(server1_n > server2_n)
1749 def test_static_lb_2(self):
1750 """ NAT44 local service load balancing (asymmetrical rule) """
# With forwarding enabled and a load-balancing static mapping on nat_addr
# (back ends: remote_hosts[0] and remote_hosts[1] of pg0), four
# single-packet exchanges are checked: client->service via nat_addr,
# service->client, client->server1 addressed directly, and server1->client
# for that direct flow.
# NOTE(review): the embedded line numbers skip (e.g. 1751 -> 1754,
# 1757 -> 1760, 1782 -> 1788), so some source lines are not shown. Not
# visible here: the assignments of `external_port`, `local_port`, `server`,
# `ip`, `tcp`; the remaining keys of each `locals` entry; and the remaining
# arguments of the mapping call. `p` as shown is the sent packet and is
# presumably rebound to the captured one on skipped lines - confirm against
# the full file.
1751 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1754 server1 = self.pg0.remote_hosts[0]
1755 server2 = self.pg0.remote_hosts[1]
1757 locals = [{'addr': server1.ip4n,
1760 {'addr': server2.ip4n,
1764 self.vapi.nat44_forwarding_enable_disable(1)
1765 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1769 local_num=len(locals),
1771 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1772 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1775 # from client to service
1776 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1777 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1778 TCP(sport=12345, dport=external_port))
1779 self.pg1.add_stream(p)
1780 self.pg_enable_capture(self.pg_interfaces)
1782 capture = self.pg0.get_capture(1)
# Either back end is an acceptable destination for the first packet.
1788 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1789 if ip.dst == server1.ip4:
1793 self.assertEqual(tcp.dport, local_port)
1794 self.check_tcp_checksum(p)
1795 self.check_ip_checksum(p)
1797 self.logger.error(ppp("Unexpected or invalid packet:", p))
1800 # from service back to client
1801 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1802 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1803 TCP(sport=local_port, dport=12345))
1804 self.pg0.add_stream(p)
1805 self.pg_enable_capture(self.pg_interfaces)
1807 capture = self.pg1.get_capture(1)
1812 self.assertEqual(ip.src, self.nat_addr)
1813 self.assertEqual(tcp.sport, external_port)
1814 self.check_tcp_checksum(p)
1815 self.check_ip_checksum(p)
1817 self.logger.error(ppp("Unexpected or invalid packet:", p))
1820 # from client to server (no translation)
1821 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1822 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1823 TCP(sport=12346, dport=local_port))
1824 self.pg1.add_stream(p)
1825 self.pg_enable_capture(self.pg_interfaces)
1827 capture = self.pg0.get_capture(1)
1833 self.assertEqual(ip.dst, server1.ip4)
1834 self.assertEqual(tcp.dport, local_port)
1835 self.check_tcp_checksum(p)
1836 self.check_ip_checksum(p)
1838 self.logger.error(ppp("Unexpected or invalid packet:", p))
1841 # from service back to client (no translation)
1842 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1843 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1844 TCP(sport=local_port, dport=12346))
1845 self.pg0.add_stream(p)
1846 self.pg_enable_capture(self.pg_interfaces)
1848 capture = self.pg1.get_capture(1)
# Source address and port must be left untouched for this flow.
1853 self.assertEqual(ip.src, server1.ip4)
1854 self.assertEqual(tcp.sport, local_port)
1855 self.check_tcp_checksum(p)
1856 self.check_ip_checksum(p)
1858 self.logger.error(ppp("Unexpected or invalid packet:", p))
1861 def test_multiple_inside_interfaces(self):
1862 """ NAT44 multiple non-overlapping address space inside interfaces """
# The NAT44 feature is enabled on pg0, pg1 and pg3 (pg2 gets no feature
# call). The test then checks, in order: pg0->pg1 and pg0->pg2 without
# translation, and translated in2out/out2in round trips through pg3 for pg0
# and then for pg1.
# NOTE(review): the embedded line numbers skip (e.g. 1867 -> 1870), so some
# source lines are not shown; the pg3 feature call below is visibly left
# open (presumably marking pg3 as the outside interface). Confirm against
# the full file.
1864 self.nat44_add_address(self.nat_addr)
1865 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1866 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1867 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1870 # between two NAT44 inside interfaces (no translation)
1871 pkts = self.create_stream_in(self.pg0, self.pg1)
1872 self.pg0.add_stream(pkts)
1873 self.pg_enable_capture(self.pg_interfaces)
1875 capture = self.pg1.get_capture(len(pkts))
1876 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1878 # from NAT44 inside to interface without NAT44 feature (no translation)
1879 pkts = self.create_stream_in(self.pg0, self.pg2)
1880 self.pg0.add_stream(pkts)
1881 self.pg_enable_capture(self.pg_interfaces)
1883 capture = self.pg2.get_capture(len(pkts))
1884 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1886 # in2out 1st interface
1887 pkts = self.create_stream_in(self.pg0, self.pg3)
1888 self.pg0.add_stream(pkts)
1889 self.pg_enable_capture(self.pg_interfaces)
1891 capture = self.pg3.get_capture(len(pkts))
1892 self.verify_capture_out(capture)
1894 # out2in 1st interface
1895 pkts = self.create_stream_out(self.pg3)
1896 self.pg3.add_stream(pkts)
1897 self.pg_enable_capture(self.pg_interfaces)
1899 capture = self.pg0.get_capture(len(pkts))
1900 self.verify_capture_in(capture, self.pg0)
1902 # in2out 2nd interface
1903 pkts = self.create_stream_in(self.pg1, self.pg3)
1904 self.pg1.add_stream(pkts)
1905 self.pg_enable_capture(self.pg_interfaces)
1907 capture = self.pg3.get_capture(len(pkts))
1908 self.verify_capture_out(capture)
1910 # out2in 2nd interface
1911 pkts = self.create_stream_out(self.pg3)
1912 self.pg3.add_stream(pkts)
1913 self.pg_enable_capture(self.pg_interfaces)
1915 capture = self.pg1.get_capture(len(pkts))
1916 self.verify_capture_in(capture, self.pg1)
1918 def test_inside_overlapping_interfaces(self):
1919 """ NAT44 multiple inside interfaces with overlapping address space """
# Uses pg3 together with pg4, pg5 and pg6 plus a static mapping for
# pg6.remote_ip4 (10.0.0.10). Checks, in order: pg4->pg5 without
# translation, a hairpinned TCP packet pg4->static_nat_ip arriving on pg6,
# translated round trips through pg3 for pg4, pg5 and pg6, and the contents
# of the NAT44 address/user/session dumps.
# NOTE(review): the embedded line numbers skip (e.g. 1923 -> 1925,
# 1928 -> 1931, 1946 -> 1951, 2032 -> 2034), so some source lines are not
# shown. Not visible here: the trailing arguments of the pg3 feature call,
# of the static mapping and of the per-user session dump; the assignments
# of `ip`/`tcp`; and the loop that binds `user`. Confirm against the full
# file.
1921 static_nat_ip = "10.0.0.10"
1922 self.nat44_add_address(self.nat_addr)
1923 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1925 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1926 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1927 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1928 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1931 # between NAT44 inside interfaces with same VRF (no translation)
1932 pkts = self.create_stream_in(self.pg4, self.pg5)
1933 self.pg4.add_stream(pkts)
1934 self.pg_enable_capture(self.pg_interfaces)
1936 capture = self.pg5.get_capture(len(pkts))
1937 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1939 # between NAT44 inside interfaces with different VRF (hairpinning)
1940 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1941 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1942 TCP(sport=1234, dport=5678))
1943 self.pg4.add_stream(p)
1944 self.pg_enable_capture(self.pg_interfaces)
1946 capture = self.pg6.get_capture(1)
# Source is expected to be rewritten to nat_addr with a different source
# port, destination to pg6's remote host with the port unchanged.
1951 self.assertEqual(ip.src, self.nat_addr)
1952 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1953 self.assertNotEqual(tcp.sport, 1234)
1954 self.assertEqual(tcp.dport, 5678)
1956 self.logger.error(ppp("Unexpected or invalid packet:", p))
1959 # in2out 1st interface
1960 pkts = self.create_stream_in(self.pg4, self.pg3)
1961 self.pg4.add_stream(pkts)
1962 self.pg_enable_capture(self.pg_interfaces)
1964 capture = self.pg3.get_capture(len(pkts))
1965 self.verify_capture_out(capture)
1967 # out2in 1st interface
1968 pkts = self.create_stream_out(self.pg3)
1969 self.pg3.add_stream(pkts)
1970 self.pg_enable_capture(self.pg_interfaces)
1972 capture = self.pg4.get_capture(len(pkts))
1973 self.verify_capture_in(capture, self.pg4)
1975 # in2out 2nd interface
1976 pkts = self.create_stream_in(self.pg5, self.pg3)
1977 self.pg5.add_stream(pkts)
1978 self.pg_enable_capture(self.pg_interfaces)
1980 capture = self.pg3.get_capture(len(pkts))
1981 self.verify_capture_out(capture)
1983 # out2in 2nd interface
1984 pkts = self.create_stream_out(self.pg3)
1985 self.pg3.add_stream(pkts)
1986 self.pg_enable_capture(self.pg_interfaces)
1988 capture = self.pg5.get_capture(len(pkts))
1989 self.verify_capture_in(capture, self.pg5)
# Session dump for the pg5 user: exactly three dynamic sessions are
# expected, and the index-based asserts below rely on the dump returning
# them in TCP, UDP, ICMP order.
1992 addresses = self.vapi.nat44_address_dump()
1993 self.assertEqual(len(addresses), 1)
1994 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1995 self.assertEqual(len(sessions), 3)
1996 for session in sessions:
1997 self.assertFalse(session.is_static)
# Only the first four bytes of the dumped inside address are compared.
1998 self.assertEqual(session.inside_ip_address[0:4],
1999 self.pg5.remote_ip4n)
2000 self.assertEqual(session.outside_ip_address,
2001 addresses[0].ip_address)
2002 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2003 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2004 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2005 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2006 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2007 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2008 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2009 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2010 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2012 # in2out 3rd interface
2013 pkts = self.create_stream_in(self.pg6, self.pg3)
2014 self.pg6.add_stream(pkts)
2015 self.pg_enable_capture(self.pg_interfaces)
2017 capture = self.pg3.get_capture(len(pkts))
2018 self.verify_capture_out(capture, static_nat_ip, True)
2020 # out2in 3rd interface
2021 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2022 self.pg3.add_stream(pkts)
2023 self.pg_enable_capture(self.pg_interfaces)
2025 capture = self.pg6.get_capture(len(pkts))
2026 self.verify_capture_in(capture, self.pg6)
2028 # general user and session dump verifications
2029 users = self.vapi.nat44_user_dump()
2030 self.assertTrue(len(users) >= 3)
2031 addresses = self.vapi.nat44_address_dump()
2032 self.assertEqual(len(addresses), 1)
2034 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2036 for session in sessions:
2037 self.assertEqual(user.ip_address, session.inside_ip_address)
# Chained comparison: requires total_pkts > 0 and total_bytes > total_pkts.
2038 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2039 self.assertTrue(session.protocol in
2040 [IP_PROTOS.tcp, IP_PROTOS.udp,
# pg4 user: at least four sessions, none of them static.
2044 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2045 self.assertTrue(len(sessions) >= 4)
2046 for session in sessions:
2047 self.assertFalse(session.is_static)
2048 self.assertEqual(session.inside_ip_address[0:4],
2049 self.pg4.remote_ip4n)
2050 self.assertEqual(session.outside_ip_address,
2051 addresses[0].ip_address)
# pg6 user (second dump argument is 20 here, 10 above): at least three
# sessions, all static, with the outside address equal to static_nat_ip.
2054 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2055 self.assertTrue(len(sessions) >= 3)
2056 for session in sessions:
2057 self.assertTrue(session.is_static)
2058 self.assertEqual(session.inside_ip_address[0:4],
2059 self.pg6.remote_ip4n)
# NOTE(review): comparing two map() results is only meaningful where map()
# returns a list (Python 2) - confirm the target interpreter.
2060 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2061 map(int, static_nat_ip.split('.')))
2062 self.assertTrue(session.inside_port in
2063 [self.tcp_port_in, self.udp_port_in,
2066 def test_hairpinning(self):
2067 """ NAT44 hairpinning - 1:1 NAPT """
# Host and server are both remote hosts on pg0. A TCP static mapping
# exposes the server on nat_addr:server_out_port; the host sends to that
# address and the translated packet is expected back on pg0 towards the
# server, then the server's reply is expected back on pg0 towards the host.
# NOTE(review): the embedded line numbers skip (e.g. 2070 -> 2073,
# 2092 -> 2097), so some source lines are not shown. Not visible here: the
# assignment of `host_in_port` and of `ip`/`tcp`, and the trailing argument
# of the pg1 feature call. `p` as shown is the sent packet and is presumably
# rebound to the captured one on skipped lines - confirm against the full
# file.
2069 host = self.pg0.remote_hosts[0]
2070 server = self.pg0.remote_hosts[1]
2073 server_in_port = 5678
2074 server_out_port = 8765
2076 self.nat44_add_address(self.nat_addr)
2077 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2078 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2080 # add static mapping for server
2081 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2082 server_in_port, server_out_port,
2083 proto=IP_PROTOS.tcp)
2085 # send packet from host to server
2086 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2087 IP(src=host.ip4, dst=self.nat_addr) /
2088 TCP(sport=host_in_port, dport=server_out_port))
2089 self.pg0.add_stream(p)
2090 self.pg_enable_capture(self.pg_interfaces)
# Both directions are captured on pg0 because host and server share it.
2092 capture = self.pg0.get_capture(1)
2097 self.assertEqual(ip.src, self.nat_addr)
2098 self.assertEqual(ip.dst, server.ip4)
2099 self.assertNotEqual(tcp.sport, host_in_port)
2100 self.assertEqual(tcp.dport, server_in_port)
2101 self.check_tcp_checksum(p)
# Remember the translated source port; the server's reply is sent to it.
2102 host_out_port = tcp.sport
2104 self.logger.error(ppp("Unexpected or invalid packet:", p))
2107 # send reply from server to host
2108 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2109 IP(src=server.ip4, dst=self.nat_addr) /
2110 TCP(sport=server_in_port, dport=host_out_port))
2111 self.pg0.add_stream(p)
2112 self.pg_enable_capture(self.pg_interfaces)
2114 capture = self.pg0.get_capture(1)
2119 self.assertEqual(ip.src, self.nat_addr)
2120 self.assertEqual(ip.dst, host.ip4)
2121 self.assertEqual(tcp.sport, server_out_port)
2122 self.assertEqual(tcp.dport, host_in_port)
2123 self.check_tcp_checksum(p)
2125 self.logger.error(ppp("Unexpected or invalid packet:", p))
2128 def test_hairpinning2(self):
2129 """ NAT44 hairpinning - 1:1 NAT"""
2131 server1_nat_ip = "10.0.0.10"
2132 server2_nat_ip = "10.0.0.11"
2133 host = self.pg0.remote_hosts[0]
2134 server1 = self.pg0.remote_hosts[1]
2135 server2 = self.pg0.remote_hosts[2]
2136 server_tcp_port = 22
2137 server_udp_port = 20
2139 self.nat44_add_address(self.nat_addr)
2140 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2141 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2144 # add static mapping for servers
2145 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2146 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2150 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2151 IP(src=host.ip4, dst=server1_nat_ip) /
2152 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2154 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2155 IP(src=host.ip4, dst=server1_nat_ip) /
2156 UDP(sport=self.udp_port_in, dport=server_udp_port))
2158 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2159 IP(src=host.ip4, dst=server1_nat_ip) /
2160 ICMP(id=self.icmp_id_in, type='echo-request'))
2162 self.pg0.add_stream(pkts)
2163 self.pg_enable_capture(self.pg_interfaces)
2165 capture = self.pg0.get_capture(len(pkts))
2166 for packet in capture:
2168 self.assertEqual(packet[IP].src, self.nat_addr)
2169 self.assertEqual(packet[IP].dst, server1.ip4)
2170 if packet.haslayer(TCP):
2171 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2172 self.assertEqual(packet[TCP].dport, server_tcp_port)
2173 self.tcp_port_out = packet[TCP].sport
2174 self.check_tcp_checksum(packet)
2175 elif packet.haslayer(UDP):
2176 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2177 self.assertEqual(packet[UDP].dport, server_udp_port)
2178 self.udp_port_out = packet[UDP].sport
2180 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2181 self.icmp_id_out = packet[ICMP].id
2183 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2188 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2189 IP(src=server1.ip4, dst=self.nat_addr) /
2190 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2192 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2193 IP(src=server1.ip4, dst=self.nat_addr) /
2194 UDP(sport=server_udp_port, dport=self.udp_port_out))
2196 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2197 IP(src=server1.ip4, dst=self.nat_addr) /
2198 ICMP(id=self.icmp_id_out, type='echo-reply'))
2200 self.pg0.add_stream(pkts)
2201 self.pg_enable_capture(self.pg_interfaces)
2203 capture = self.pg0.get_capture(len(pkts))
2204 for packet in capture:
2206 self.assertEqual(packet[IP].src, server1_nat_ip)
2207 self.assertEqual(packet[IP].dst, host.ip4)
2208 if packet.haslayer(TCP):
2209 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2210 self.assertEqual(packet[TCP].sport, server_tcp_port)
2211 self.check_tcp_checksum(packet)
2212 elif packet.haslayer(UDP):
2213 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2214 self.assertEqual(packet[UDP].sport, server_udp_port)
2216 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2218 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2221 # server2 to server1
2223 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2224 IP(src=server2.ip4, dst=server1_nat_ip) /
2225 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2227 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2228 IP(src=server2.ip4, dst=server1_nat_ip) /
2229 UDP(sport=self.udp_port_in, dport=server_udp_port))
2231 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2232 IP(src=server2.ip4, dst=server1_nat_ip) /
2233 ICMP(id=self.icmp_id_in, type='echo-request'))
2235 self.pg0.add_stream(pkts)
2236 self.pg_enable_capture(self.pg_interfaces)
2238 capture = self.pg0.get_capture(len(pkts))
2239 for packet in capture:
2241 self.assertEqual(packet[IP].src, server2_nat_ip)
2242 self.assertEqual(packet[IP].dst, server1.ip4)
2243 if packet.haslayer(TCP):
2244 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2245 self.assertEqual(packet[TCP].dport, server_tcp_port)
2246 self.tcp_port_out = packet[TCP].sport
2247 self.check_tcp_checksum(packet)
2248 elif packet.haslayer(UDP):
2249 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2250 self.assertEqual(packet[UDP].dport, server_udp_port)
2251 self.udp_port_out = packet[UDP].sport
2253 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2254 self.icmp_id_out = packet[ICMP].id
2256 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2259 # server1 to server2
2261 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262 IP(src=server1.ip4, dst=server2_nat_ip) /
2263 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266 IP(src=server1.ip4, dst=server2_nat_ip) /
2267 UDP(sport=server_udp_port, dport=self.udp_port_out))
2269 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270 IP(src=server1.ip4, dst=server2_nat_ip) /
2271 ICMP(id=self.icmp_id_out, type='echo-reply'))
2273 self.pg0.add_stream(pkts)
2274 self.pg_enable_capture(self.pg_interfaces)
2276 capture = self.pg0.get_capture(len(pkts))
2277 for packet in capture:
2279 self.assertEqual(packet[IP].src, server1_nat_ip)
2280 self.assertEqual(packet[IP].dst, server2.ip4)
2281 if packet.haslayer(TCP):
2282 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2283 self.assertEqual(packet[TCP].sport, server_tcp_port)
2284 self.check_tcp_checksum(packet)
2285 elif packet.haslayer(UDP):
2286 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2287 self.assertEqual(packet[UDP].sport, server_udp_port)
2289 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2291 self.logger.error(ppp("Unexpected or invalid packet:", packet))
def test_max_translations_per_user(self):
    """ MAX translations per user - recycle the least recently used """
    # One inside host opens more TCP flows than the per-user translation
    # limit reported by the NAT plugin; every packet is still expected on
    # the outside interface.

    self.nat44_add_address(self.nat_addr)
    # pg0 is the inside interface, pg1 the outside one
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # get maximum number of translations per user
    nat44_config = self.vapi.nat_show_config()

    # send more than maximum number of translations per user packets;
    # each packet uses a different TCP source port, i.e. a distinct flow
    pkts_num = nat44_config.max_translations_per_user + 5
    pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=1025 + port))
            for port in range(pkts_num)]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # verify number of translated packet
    self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface """
    # The NAT44 pool is tied to pg7; the pool dump is checked before the
    # interface address is configured, after it is configured and after
    # it is removed again.
    nat_if = self.pg7
    self.vapi.nat44_add_interface_addr(nat_if.sw_if_index)

    # no address in NAT pool
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # configure interface address and check NAT address pool:
    # the single pool entry must be the interface's own address
    nat_if.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], nat_if.local_ip4n)

    # remove interface address and check NAT address pool
    nat_if.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
def test_interface_addr_static_mapping(self):
    """ Static mapping with addresses from interface """
    # Inside address of the mapping; the assertions below only look at the
    # external side.
    # NOTE(review): the literal was missing from the reviewed listing;
    # '1.2.3.4' is a placeholder - confirm the intended address.
    local_ip = '1.2.3.4'

    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    self.nat44_add_static_mapping(
        local_ip,
        external_sw_if_index=self.pg7.sw_if_index)

    # static mappings with external interface: while pg7 has no address
    # the mapping is reported with the interface index
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(1, len(static_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     static_mappings[0].external_sw_if_index)

    # configure interface address and check static mappings: the mapping
    # now carries pg7's address and no interface index (0xFFFFFFFF)
    self.pg7.config_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(1, len(static_mappings))
    self.assertEqual(static_mappings[0].external_ip_address[0:4],
                     self.pg7.local_ip4n)
    self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)

    # remove interface address and check static mappings
    self.pg7.unconfig_ip4()
    static_mappings = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(0, len(static_mappings))
def test_interface_addr_identity_nat(self):
    """ Identity NAT with addresses from interface """

    # TCP port of the identity mapping, compared against the dump below.
    # NOTE(review): the assignment was missing from the reviewed listing;
    # 53053 is a placeholder - confirm the intended port.
    port = 53053
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
    # addr_only=0 because the mapping is for a specific port/protocol
    self.vapi.nat44_add_del_identity_mapping(
        sw_if_index=self.pg7.sw_if_index,
        port=port,
        protocol=IP_PROTOS.tcp,
        addr_only=0)

    # identity mappings with external interface
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(self.pg7.sw_if_index,
                     identity_mappings[0].sw_if_index)

    # configure interface address and check identity mappings: the
    # mapping now carries pg7's address and no interface index
    self.pg7.config_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(1, len(identity_mappings))
    self.assertEqual(identity_mappings[0].ip_address,
                     self.pg7.local_ip4n)
    self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
    self.assertEqual(port, identity_mappings[0].port)
    self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)

    # remove interface address and check identity mappings
    self.pg7.unconfig_ip4()
    identity_mappings = self.vapi.nat44_identity_mapping_dump()
    self.assertEqual(0, len(identity_mappings))
def test_ipfix_nat44_sess(self):
    """ IPFIX logging NAT44 session created/deleted """
    self.ipfix_domain_id = 10
    self.ipfix_src_port = 20202
    collector_port = 30303
    # make scapy dissect UDP sent to the non-default collector port as
    # IPFIX
    bind_layers(UDP, IPFIX, dport=collector_port)
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # NOTE(review): path_mtu was missing from the reviewed listing; 512 is
    # a placeholder - confirm the intended value.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10,
                                 collector_port=collector_port)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    # create sessions with in2out traffic ...
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    # ... then remove the pool address and flush the exporter so the
    # logged events reach the collector on pg3
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(9)
    ipfix = IPFIXDecoder()
    # first load template (separate pass, so that data sets can be
    # decoded whatever the packet order)
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        self.assertEqual(p[UDP].dport, collector_port)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_nat44_ses(data)
def test_ipfix_addr_exhausted(self):
    """ IPFIX logging NAT addresses exhausted """
    # No NAT pool address is added here, so the in2out packet below cannot
    # be translated and nothing is expected on pg1.
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # NOTE(review): path_mtu was missing from the reviewed listing; 512 is
    # a placeholder - confirm the intended value.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    # NOTE(review): the transport layer was missing from the reviewed
    # listing; TCP with source port 3025 is a placeholder - confirm.
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=3025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(9)
    ipfix = IPFIXDecoder()
    # first load template (separate pass, so that data sets can be
    # decoded whatever the packet order)
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        # 4739: the exporter is left on its default collector port
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_addr_exhausted(data)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_ipfix_max_sessions(self):
    """ IPFIX logging maximum session entries exceeded """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    nat44_config = self.vapi.nat_show_config()
    max_sessions = 10 * nat44_config.translation_buckets

    # Fill the session table: one packet per distinct inside source
    # address 10.10.<hi>.<lo>, derived from the loop counter.
    # NOTE(review): the transport layer was missing from the reviewed
    # listing; TCP with source port 1025 is a placeholder - confirm.
    pkts = []
    for i in range(0, max_sessions):
        src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=src, dst=self.pg1.remote_ip4) /
             TCP(sport=1025))
        pkts.append(p)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    self.pg1.get_capture(max_sessions)
    # Logging is enabled only now, after the table has been filled.
    # NOTE(review): path_mtu=512 is a placeholder - confirm.
    self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                 src_address=self.pg3.local_ip4n,
                                 path_mtu=512,
                                 template_interval=10)
    self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
                        src_port=self.ipfix_src_port)

    # one more flow: expected to be dropped (nothing captured on pg1)
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=1025))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)
    self.vapi.cli("ipfix flush")  # FIXME this should be an API call
    capture = self.pg3.get_capture(9)
    ipfix = IPFIXDecoder()
    # first load template (separate pass, so that data sets can be
    # decoded whatever the packet order)
    for p in capture:
        self.assertTrue(p.haslayer(IPFIX))
        self.assertEqual(p[IP].src, self.pg3.local_ip4)
        self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
        self.assertEqual(p[UDP].sport, self.ipfix_src_port)
        # 4739: the exporter is left on its default collector port
        self.assertEqual(p[UDP].dport, 4739)
        self.assertEqual(p[IPFIX].observationDomainID,
                         self.ipfix_domain_id)
        if p.haslayer(Template):
            ipfix.add_template(p.getlayer(Template))
    # verify events in data set
    for p in capture:
        if p.haslayer(Data):
            data = ipfix.decode_data_set(p.getlayer(Set))
            self.verify_ipfix_max_sessions(data, max_sessions)
def test_pool_addr_fib(self):
    """ NAT44 add pool addresses to FIB """
    # ARP requests for the pool address and for the 1:1 static address
    # must be answered on the outside interface while those addresses are
    # configured, and must go unanswered once they are removed.
    static_addr = '10.0.0.10'
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)

    # NAT44 address
    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.nat_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    # assertEqual, not assertTrue: assertTrue would take ARP.is_at as the
    # failure message and never compare the opcode
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # 1:1 NAT address
    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=static_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.assertTrue(capture[0].haslayer(ARP))
    self.assertEqual(capture[0][ARP].op, ARP.is_at)

    # send ARP to non-NAT44 interface
    # NOTE(review): the request is injected on pg2 but the empty capture
    # is checked on pg1 - confirm whether pg2 was intended.
    p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.nat_addr,
             psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
    self.pg2.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)

    # remove addresses and verify
    self.nat44_add_address(self.nat_addr, is_add=0)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
                                  is_add=0)

    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=self.nat_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)

    p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
         ARP(op=ARP.who_has, pdst=static_addr,
             psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(0)
def test_vrf_mode(self):
    """ NAT44 tenant VRF aware address pool mode """
    # Two inside interfaces live in different VRFs, each VRF has its own
    # pool address; traffic from each must leave with the address of its
    # own VRF.

    # NOTE(review): the table ids were missing from the reviewed listing;
    # 1 and 2 are placeholders - confirm the intended values.
    vrf_id1 = 1
    vrf_id2 = 2
    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    # the address is removed before the interface is moved to another
    # table and configured again afterwards
    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.vapi.ip_table_add_del(vrf_id1, is_add=1)
    self.vapi.ip_table_add_del(vrf_id2, is_add=1)
    self.pg0.set_table_ip4(vrf_id1)
    self.pg1.set_table_ip4(vrf_id2)
    self.pg0.config_ip4()
    self.pg1.config_ip4()

    self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
    self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
                                              is_inside=0)

    # first VRF
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second VRF
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip2)

    # put both interfaces back into the default table and drop the
    # test tables
    self.pg0.unconfig_ip4()
    self.pg1.unconfig_ip4()
    self.pg0.set_table_ip4(0)
    self.pg1.set_table_ip4(0)
    self.vapi.ip_table_add_del(vrf_id1, is_add=0)
    self.vapi.ip_table_add_del(vrf_id2, is_add=0)
def test_vrf_feature_independent(self):
    """ NAT44 tenant VRF independent address pool mode """
    # nat_ip1 is added without a VRF, nat_ip2 is bound to VRF 99, which
    # neither inside interface is placed in by this test; traffic from
    # both inside interfaces is expected to use nat_ip1.

    nat_ip1 = "10.0.0.10"
    nat_ip2 = "10.0.0.11"

    self.nat44_add_address(nat_ip1)
    self.nat44_add_address(nat_ip2, vrf_id=99)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
                                              is_inside=0)

    # first VRF
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)

    # second VRF
    pkts = self.create_stream_in(self.pg1, self.pg2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1)
def test_dynamic_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address """
    # No config_ip4() is called on pg7/pg8 in this test, so reachability
    # of the peers is set up by hand: a neighbour entry plus a /32 host
    # route per interface.

    # NOTE(review): the last argument of both neighbour calls was missing
    # from the reviewed listing; is_static=1 is assumed - confirm.
    self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                  mactobinary(self.pg7.remote_mac),
                                  self.pg7.remote_ip4n,
                                  is_static=1)
    self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                  mactobinary(self.pg8.remote_mac),
                                  self.pg8.remote_ip4n,
                                  is_static=1)

    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index)

    # dynamic NAT: pg7 inside, pg8 outside
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg8, self.nat_addr)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)
def test_static_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address - 1:1 NAT """
    # No config_ip4() is called on pg7/pg8 in this test, so reachability
    # of the peers is set up by hand: a neighbour entry plus a /32 host
    # route per interface.

    # NOTE(review): the last argument of both neighbour calls was missing
    # from the reviewed listing; is_static=1 is assumed - confirm.
    self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                  mactobinary(self.pg7.remote_mac),
                                  self.pg7.remote_ip4n,
                                  is_static=1)
    self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                  mactobinary(self.pg8.remote_mac),
                                  self.pg8.remote_ip4n,
                                  is_static=1)

    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index)

    # address-only static mapping: pg7's host <-> nat_addr
    self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
                                              is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture, self.nat_addr, True)
def test_static_with_port_ipless_interfaces(self):
    """ NAT44 interfaces without configured IP address - 1:1 NAPT """

    # outside ports/id used by the static mappings below
    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    # No config_ip4() is called on pg7/pg8 in this test, so reachability
    # of the peers is set up by hand: a neighbour entry plus a /32 host
    # route per interface.
    # NOTE(review): the last argument of both neighbour calls was missing
    # from the reviewed listing; is_static=1 is assumed - confirm.
    self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
                                  mactobinary(self.pg7.remote_mac),
                                  self.pg7.remote_ip4n,
                                  is_static=1)
    self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
                                  mactobinary(self.pg8.remote_mac),
                                  self.pg8.remote_ip4n,
                                  is_static=1)

    self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg7.remote_ip4n,
                               next_hop_sw_if_index=self.pg7.sw_if_index)
    self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg8.remote_ip4n,
                               next_hop_sw_if_index=self.pg8.sw_if_index)

    # one port-level static mapping per protocol, all on nat_addr
    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
                                              is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg8)
    self.pg8.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg7.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg7)

    # in2out
    pkts = self.create_stream_in(self.pg7, self.pg8)
    self.pg7.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg8.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol """
    # GRE is the protocol under test here: with an address-only static
    # mapping only the outer IP addresses are expected to be rewritten.
    nat_ip = "10.0.0.10"
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, nat_ip)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_static_unknown_proto(self):
    """ 1:1 NAT translate packet with unknown protocol - hairpinning """
    # Host and server both sit behind pg0 and each has an address-only
    # static mapping; GRE packets addressed to the peer's NAT address must
    # come back out of pg0 with both outer addresses rewritten.

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]

    host_nat_ip = "10.0.0.10"
    server_nat_ip = "10.0.0.11"

    self.nat44_add_static_mapping(host.ip4, host_nat_ip)
    self.nat44_add_static_mapping(server.ip4, server_nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # host to server
    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, host_nat_ip)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=host_nat_ip) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_unknown_proto(self):
    """ NAT44 translate packet with unknown protocol """
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    # A TCP packet goes first; presumably this creates a session for the
    # host before the GRE packet is sent - confirm.
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=self.tcp_port_in, dport=20))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # out2in
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
        self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_hairpinning_unknown_proto(self):
    """ NAT44 translate packet with unknown protocol - hairpinning """
    # Host and server both sit behind pg0. The server has an address-only
    # static mapping, the host is translated from the pool address.
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the host port was missing from the reviewed listing;
    # 1234 is a placeholder - confirm the intended value.
    host_in_port = 1234
    server_out_port = 8765
    server_nat_ip = "10.0.0.11"

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, server_nat_ip)

    # host to server
    # A TCP packet goes first; presumably this creates a session for the
    # host before the GRE packet is sent - confirm.
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg0.get_capture(1)

    p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
         IP(src=host.ip4, dst=server_nat_ip) /
         GRE() /
         IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.nat_addr)
        self.assertEqual(packet[IP].dst, server.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise

    # server to host
    p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         GRE() /
         IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
         TCP(sport=1234, dport=1234))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, server_nat_ip)
        self.assertEqual(packet[IP].dst, host.ip4)
        self.assertTrue(packet.haslayer(GRE))
        self.check_ip_checksum(packet)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", packet))
        raise
def test_output_feature(self):
    """ NAT44 interface output feature (in2out postrouting) """
    # pg0 and pg1 are inside, pg3 is outside; pg2 has no NAT feature.
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
                                                     is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # from non-NAT interface to NAT inside interface
    pkts = self.create_stream_in(self.pg2, self.pg0)
    self.pg2.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg2, self.pg0)
def test_output_feature_vrf_aware(self):
    """ NAT44 interface output feature VRF aware (in2out postrouting) """
    nat_ip_vrf10 = "10.0.0.10"
    nat_ip_vrf20 = "10.0.0.20"

    # Host route towards the outside peer in each tenant table.
    # NOTE(review): the last argument of both route calls was missing from
    # the reviewed listing; table_id=10/20 is inferred from the vrf_id
    # values of the pool addresses below - confirm.
    self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg3.remote_ip4n,
                               next_hop_sw_if_index=self.pg3.sw_if_index,
                               table_id=10)
    self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
                               dst_address_length=32,
                               next_hop_address=self.pg3.remote_ip4n,
                               next_hop_sw_if_index=self.pg3.sw_if_index,
                               table_id=20)

    # one pool address per VRF; pg4 and pg6 inside, pg3 outside
    self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
    self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
    self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
                                                     is_inside=0)

    # in2out VRF 10
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)

    # out2in VRF 10
    pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out VRF 20
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)

    # out2in VRF 20
    pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)
def test_output_feature_hairpinning(self):
    """ NAT44 interface output feature hairpinning (in2out postrouting) """
    # Host and server both sit behind pg0; the host reaches the server
    # through the server's static mapping on the NAT address.
    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    # NOTE(review): the host port was missing from the reviewed listing;
    # 1234 is a placeholder - confirm the intended value.
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
                                                     is_inside=0)

    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # remember the translated source port for the reply below
        host_out_port = tcp.sport
    except Exception:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_one_armed_nat44(self):
    """ One armed NAT44 """
    # A single interface (pg9) carries both NAT roles: the feature is
    # enabled on it as inside and as outside.
    remote_host = self.pg9.remote_hosts[0]
    local_host = self.pg9.remote_hosts[1]

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
                                              is_inside=0)

    # in2out
    p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
         IP(src=local_host.ip4, dst=remote_host.ip4) /
         TCP(sport=12345, dport=80))
    self.pg9.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg9.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, remote_host.ip4)
        self.assertNotEqual(tcp.sport, 12345)
        # remember the translated source port for the reply below
        external_port = tcp.sport
        self.assertEqual(tcp.dport, 80)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        # log the offending packet, then let the failure propagate
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # out2in
    p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
         IP(src=remote_host.ip4, dst=self.nat_addr) /
         TCP(sport=80, dport=external_port))
    self.pg9.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg9.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, remote_host.ip4)
        self.assertEqual(ip.dst, local_host.ip4)
        self.assertEqual(tcp.sport, 80)
        self.assertEqual(tcp.dport, 12345)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
3243 def test_del_session(self):
3244 """ Delete NAT44 session """
# NOTE(review): the nat44_interface_add_del_feature(self.pg1...) call and the
# second nat44_del_session(...) call are left open in this listing - their
# trailing arguments are not visible; confirm against the full file.
3245 self.nat44_add_address(self.nat_addr)
3246 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3247 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# create sessions by pushing the standard inside stream from pg0 to pg1
3250 pkts = self.create_stream_in(self.pg0, self.pg1)
3251 self.pg0.add_stream(pkts)
3252 self.pg_enable_capture(self.pg_interfaces)
3254 capture = self.pg1.get_capture(len(pkts))
# session count for the pg0 host before any deletion
3256 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3257 nsessions = len(sessions)
# delete the first session by its inside address/port ...
3259 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3260 sessions[0].inside_port,
3261 sessions[0].protocol)
# ... and the second one by its outside address/port
3262 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3263 sessions[1].outside_port,
3264 sessions[1].protocol,
# exactly two sessions must have disappeared from the dump
3267 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3268 self.assertEqual(nsessions - len(sessions), 2)
def test_set_get_reass(self):
    """ NAT44 set/get virtual fragmentation reassembly """
    # Derive the new settings from whatever is currently configured so the
    # check does not depend on the defaults.
    initial = self.vapi.nat_get_reass()
    want_timeout = initial.ip4_timeout + 5
    want_max_reass = initial.ip4_max_reass * 2
    want_max_frag = initial.ip4_max_frag * 2

    self.vapi.nat_set_reass(timeout=want_timeout,
                            max_reass=want_max_reass,
                            max_frag=want_max_frag)

    # Read the configuration back and compare field by field.
    updated = self.vapi.nat_get_reass()
    self.assertEqual(want_timeout, updated.ip4_timeout)
    self.assertEqual(want_max_reass, updated.ip4_max_reass)
    self.assertEqual(want_max_frag, updated.ip4_max_frag)

    # Switching on drop_frag must be reflected by the next read as well.
    self.vapi.nat_set_reass(drop_frag=1)
    refreshed = self.vapi.nat_get_reass()
    self.assertTrue(refreshed.ip4_drop_frag)
3287 def test_frag_in_order(self):
3288 """ NAT44 translate fragments arriving in order """
# NOTE(review): several calls below (nat44_interface_add_del_feature for pg1,
# both create_stream_frag calls, the first reass_frags_and_verify call) are
# missing some of their argument lines in this listing; confirm against the
# full file before relying on the visible arguments.
3289 self.nat44_add_address(self.nat_addr)
3290 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3291 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload that the reassembled packets are compared against
3294 data = "A" * 4 + "B" * 16 + "C" * 3
3295 self.tcp_port_in = random.randint(1025, 65535)
# number of reassembly entries before the test traffic
3297 reass = self.vapi.nat_reass_dump()
3298 reass_n_start = len(reass)
# fragments sent on pg0, captured on pg1
3301 pkts = self.create_stream_frag(self.pg0,
3302 self.pg1.remote_ip4,
3306 self.pg0.add_stream(pkts)
3307 self.pg_enable_capture(self.pg_interfaces)
3309 frags = self.pg1.get_capture(len(pkts))
3310 p = self.reass_frags_and_verify(frags,
3312 self.pg1.remote_ip4)
3313 self.assertEqual(p[TCP].dport, 20)
3314 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3315 self.tcp_port_out = p[TCP].sport
3316 self.assertEqual(data, p[Raw].load)
# fragments sent on pg1, captured on pg0
3319 pkts = self.create_stream_frag(self.pg1,
3324 self.pg1.add_stream(pkts)
3325 self.pg_enable_capture(self.pg_interfaces)
3327 frags = self.pg0.get_capture(len(pkts))
3328 p = self.reass_frags_and_verify(frags,
3329 self.pg1.remote_ip4,
3330 self.pg0.remote_ip4)
3331 self.assertEqual(p[TCP].sport, 20)
3332 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3333 self.assertEqual(data, p[Raw].load)
# the reassembly dump must have grown by exactly two entries
3335 reass = self.vapi.nat_reass_dump()
3336 reass_n_end = len(reass)
3338 self.assertEqual(reass_n_end - reass_n_start, 2)
3340 def test_reass_hairpinning(self):
3341 """ NAT44 fragments hairpinning """
# NOTE(review): the nat44_interface_add_del_feature(self.pg1...),
# create_stream_frag(...) and reass_frags_and_verify(...) calls are missing
# argument lines in this listing; confirm against the full file.
# Host and server are both remote hosts of pg0; fragments are sent on pg0 and
# captured on pg0.
3342 host = self.pg0.remote_hosts[0]
3343 server = self.pg0.remote_hosts[1]
3344 host_in_port = random.randint(1025, 65535)
3346 server_in_port = random.randint(1025, 65535)
3347 server_out_port = random.randint(1025, 65535)
3348 data = "A" * 4 + "B" * 16 + "C" * 3
3350 self.nat44_add_address(self.nat_addr)
3351 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3352 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3354 # add static mapping for server
3355 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3356 server_in_port, server_out_port,
3357 proto=IP_PROTOS.tcp)
3359 # send packet from host to server
3360 pkts = self.create_stream_frag(self.pg0,
3365 self.pg0.add_stream(pkts)
3366 self.pg_enable_capture(self.pg_interfaces)
3368 frags = self.pg0.get_capture(len(pkts))
3369 p = self.reass_frags_and_verify(frags,
# reassembled packet: new source port, server's inside port, payload intact
3372 self.assertNotEqual(p[TCP].sport, host_in_port)
3373 self.assertEqual(p[TCP].dport, server_in_port)
3374 self.assertEqual(data, p[Raw].load)
3376 def test_frag_out_of_order(self):
3377 """ NAT44 translate fragments arriving out of order """
# NOTE(review): several calls below are missing argument lines in this
# listing (nat44_interface_add_del_feature for pg1, create_stream_frag,
# the first reass_frags_and_verify); confirm against the full file.
3378 self.nat44_add_address(self.nat_addr)
3379 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3380 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3383 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of this call is discarded. test_frag_in_order
# assigns the same expression to self.tcp_port_in, which this test reads
# further down - presumably the assignment was intended here too; verify.
3384 random.randint(1025, 65535)
# fragments sent on pg0, captured on pg1
3387 pkts = self.create_stream_frag(self.pg0,
3388 self.pg1.remote_ip4,
3393 self.pg0.add_stream(pkts)
3394 self.pg_enable_capture(self.pg_interfaces)
3396 frags = self.pg1.get_capture(len(pkts))
3397 p = self.reass_frags_and_verify(frags,
3399 self.pg1.remote_ip4)
3400 self.assertEqual(p[TCP].dport, 20)
3401 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3402 self.tcp_port_out = p[TCP].sport
3403 self.assertEqual(data, p[Raw].load)
# fragments sent on pg1, captured on pg0
3406 pkts = self.create_stream_frag(self.pg1,
3412 self.pg1.add_stream(pkts)
3413 self.pg_enable_capture(self.pg_interfaces)
3415 frags = self.pg0.get_capture(len(pkts))
3416 p = self.reass_frags_and_verify(frags,
3417 self.pg1.remote_ip4,
3418 self.pg0.remote_ip4)
3419 self.assertEqual(p[TCP].sport, 20)
3420 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3421 self.assertEqual(data, p[Raw].load)
3423 def test_port_restricted(self):
3424 """ Port restricted NAT44 (MAP-E CE) """
# NOTE(review): ip and tcp are read below without a visible assignment and
# the nat44_interface_add_del_feature(self.pg1...) call is left open in this
# listing; confirm against the full file.
3425 self.nat44_add_address(self.nat_addr)
3426 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3427 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# switch port allocation to the map-e algorithm via the debug CLI
3429 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3430 "psid-offset 6 psid-len 6")
3432 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3433 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3434 TCP(sport=4567, dport=22))
3435 self.pg0.add_stream(p)
3436 self.pg_enable_capture(self.pg_interfaces)
3438 capture = self.pg1.get_capture(1)
3443 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3444 self.assertEqual(ip.src, self.nat_addr)
3445 self.assertEqual(tcp.dport, 22)
3446 self.assertNotEqual(tcp.sport, 4567)
# six bits of the translated source port, starting at bit 6, must equal 10 -
# presumably the psid / psid-offset / psid-len given to the CLI above
3447 self.assertEqual((tcp.sport >> 6) & 63, 10)
3448 self.check_tcp_checksum(p)
3449 self.check_ip_checksum(p)
3451 self.logger.error(ppp("Unexpected or invalid packet:", p))
3454 def test_twice_nat(self):
# NOTE(review): port_in, port_out, eh_port_out, ip and tcp are read below
# without a visible assignment, and the nat44_add_static_mapping(...) and
# nat44_interface_add_del_feature(self.pg1...) calls are left open in this
# listing; confirm against the full file.
3456 twice_nat_addr = '10.0.1.3'
# one regular pool address plus one address flagged for twice-NAT
3461 self.nat44_add_address(self.nat_addr)
3462 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3463 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3464 port_in, port_out, proto=IP_PROTOS.tcp,
3466 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3467 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg1 -> NAT address: both source and destination are expected to be rewritten
3470 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3471 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3472 TCP(sport=eh_port_out, dport=port_out))
3473 self.pg1.add_stream(p)
3474 self.pg_enable_capture(self.pg_interfaces)
3476 capture = self.pg0.get_capture(1)
3481 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3482 self.assertEqual(ip.src, twice_nat_addr)
3483 self.assertEqual(tcp.dport, port_in)
3484 self.assertNotEqual(tcp.sport, eh_port_out)
# translated source port; the reply below is addressed to it
3485 eh_port_in = tcp.sport
3486 self.check_tcp_checksum(p)
3487 self.check_ip_checksum(p)
3489 self.logger.error(ppp("Unexpected or invalid packet:", p))
# pg0 -> twice-NAT address: reply must reach pg1 with the original addressing
3492 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3493 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3494 TCP(sport=port_in, dport=eh_port_in))
3495 self.pg0.add_stream(p)
3496 self.pg_enable_capture(self.pg_interfaces)
3498 capture = self.pg1.get_capture(1)
3503 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3504 self.assertEqual(ip.src, self.nat_addr)
3505 self.assertEqual(tcp.dport, eh_port_out)
3506 self.assertEqual(tcp.sport, port_out)
3507 self.check_tcp_checksum(p)
3508 self.check_ip_checksum(p)
3510 self.logger.error(ppp("Unexpected or invalid packet:", p))
3513 def test_twice_nat_lb(self):
3514 """ Twice NAT44 local service load balancing """
# NOTE(review): external_port, eh_port_out, local_port, server, ip and tcp are
# read below without a visible assignment; the `locals` list literal, the
# nat44_add_del_lb_static_mapping(...) call and the body of the
# `if ip.dst == server1.ip4:` branch are incomplete in this listing; confirm
# against the full file.
3515 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3516 twice_nat_addr = '10.0.1.3'
# the two candidate back-end servers are both remote hosts of pg0
3521 server1 = self.pg0.remote_hosts[0]
3522 server2 = self.pg0.remote_hosts[1]
3524 locals = [{'addr': server1.ip4n,
3527 {'addr': server2.ip4n,
3531 self.nat44_add_address(self.nat_addr)
3532 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3534 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3538 local_num=len(locals),
3540 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3541 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg1 -> NAT address / external port
3544 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3545 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3546 TCP(sport=eh_port_out, dport=external_port))
3547 self.pg1.add_stream(p)
3548 self.pg_enable_capture(self.pg_interfaces)
3550 capture = self.pg0.get_capture(1)
# source becomes the twice-NAT address; destination may be either server
3556 self.assertEqual(ip.src, twice_nat_addr)
3557 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3558 if ip.dst == server1.ip4:
3562 self.assertNotEqual(tcp.sport, eh_port_out)
3563 eh_port_in = tcp.sport
3564 self.assertEqual(tcp.dport, local_port)
3565 self.check_tcp_checksum(p)
3566 self.check_ip_checksum(p)
3568 self.logger.error(ppp("Unexpected or invalid packet:", p))
# reply from the selected server to the twice-NAT address
3571 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3572 IP(src=server.ip4, dst=twice_nat_addr) /
3573 TCP(sport=local_port, dport=eh_port_in))
3574 self.pg0.add_stream(p)
3575 self.pg_enable_capture(self.pg_interfaces)
3577 capture = self.pg1.get_capture(1)
3582 self.assertEqual(ip.src, self.nat_addr)
3583 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3584 self.assertEqual(tcp.sport, external_port)
3585 self.assertEqual(tcp.dport, eh_port_out)
3586 self.check_tcp_checksum(p)
3587 self.check_ip_checksum(p)
3589 self.logger.error(ppp("Unexpected or invalid packet:", p))
def test_twice_nat_interface_addr(self):
    """ Acquire twice NAT44 addresses from interface """
    self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)

    # no address in NAT pool
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # configure interface address and check NAT address pool: the single
    # entry must carry pg7's local address and the twice-NAT flag
    self.pg7.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    entry = pool[0]
    self.assertEqual(entry.ip_address[:4], self.pg7.local_ip4n)
    self.assertEqual(entry.twice_nat, 1)

    # remove interface address and check NAT address pool
    self.pg7.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
3612 def test_ipfix_max_frags(self):
3613 """ IPFIX logging maximum fragments pending reassembly exceeded """
# NOTE(review): p is read below without a visible assignment (the loops over
# `capture` are presumably among the lines missing from this listing), and
# the nat44_interface_add_del_feature(self.pg1...), set_ipfix_exporter(...)
# and create_stream_frag(...) calls are missing argument lines; confirm
# against the full file.
3614 self.nat44_add_address(self.nat_addr)
3615 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3616 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# max_frag=0 together with sending only the last fragment below is what is
# expected to produce the logged event; nothing may be forwarded to pg1
3618 self.vapi.nat_set_reass(max_frag=0)
3619 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3620 src_address=self.pg3.local_ip4n,
3622 template_interval=10)
3623 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3624 src_port=self.ipfix_src_port)
3626 data = "A" * 4 + "B" * 16 + "C" * 3
3627 self.tcp_port_in = random.randint(1025, 65535)
3628 pkts = self.create_stream_frag(self.pg0,
3629 self.pg1.remote_ip4,
# only the final fragment of the stream is injected
3633 self.pg0.add_stream(pkts[-1])
3634 self.pg_enable_capture(self.pg_interfaces)
3636 frags = self.pg1.get_capture(0)
3637 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# the IPFIX export is collected on pg3
3638 capture = self.pg3.get_capture(9)
3639 ipfix = IPFIXDecoder()
3640 # first load template
3642 self.assertTrue(p.haslayer(IPFIX))
3643 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3644 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3645 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3646 self.assertEqual(p[UDP].dport, 4739)
3647 self.assertEqual(p[IPFIX].observationDomainID,
3648 self.ipfix_domain_id)
3649 if p.haslayer(Template):
3650 ipfix.add_template(p.getlayer(Template))
3651 # verify events in data set
3653 if p.haslayer(Data):
# note: `data` is rebound here from the payload string to the decoded records
3654 data = ipfix.decode_data_set(p.getlayer(Set))
3655 self.verify_ipfix_max_fragments_ip4(data, 0,
3656 self.pg0.remote_ip4n)
# NOTE(review): the `def` line of this method is not present in this listing;
# judging by the super() call it is TestNAT44.tearDown - confirm.
3659 super(TestNAT44, self).tearDown()
# only talk to VPP if it is still running
3660 if not self.vpp_dead:
# record the NAT44 and virtual-reassembly state in the test log
3661 self.logger.info(self.vapi.cli("show nat44 verbose"))
3662 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# restore the default port allocation algorithm (test_port_restricted
# switches it to map-e)
3663 self.vapi.cli("nat addr-port-assignment-alg default")
3667 class TestNAT44Out2InDPO(MethodHolder):
3668 """ NAT44 Test Cases using out2in DPO """
# NOTE(review): this listing is missing lines throughout the class (decorator
# lines, at least one `def` line before the tearDownClass() call, and the
# trailing arguments of several multi-line calls); confirm every call against
# the full file.
3671 def setUpConstants(cls):
3672 super(TestNAT44Out2InDPO, cls).setUpConstants()
# start VPP with the NAT plugin's "out2in dpo" option
3673 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
3676 def setUpClass(cls):
3677 super(TestNAT44Out2InDPO, cls).setUpClass()
# inside and outside ports/ids are deliberately identical in this class
3680 cls.tcp_port_in = 6303
3681 cls.tcp_port_out = 6303
3682 cls.udp_port_in = 6304
3683 cls.udp_port_out = 6304
3684 cls.icmp_id_in = 6305
3685 cls.icmp_id_out = 6305
3686 cls.nat_addr = '10.0.0.3'
3687 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3688 cls.dst_ip4 = '192.168.70.1'
3690 cls.create_pg_interfaces(range(2))
# pg0 is set up for IPv4, pg1 for IPv6
3693 cls.pg0.config_ip4()
3694 cls.pg0.resolve_arp()
3697 cls.pg1.config_ip6()
3698 cls.pg1.resolve_ndp()
# all-zero prefix of length 0: an IPv6 default route via pg1's remote address
3700 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3701 dst_address_length=0,
3702 next_hop_address=cls.pg1.remote_ip6n,
3703 next_hop_sw_if_index=cls.pg1.sw_if_index)
3706 super(TestNAT44Out2InDPO, cls).tearDownClass()
3709 def configure_xlat(self):
# two /96 IPv6 prefixes, kept both as text and in packed binary form, are
# passed to map_add_domain with is_translation=1
3710 self.dst_ip6_pfx = '1:2:3::'
3711 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3713 self.dst_ip6_pfx_len = 96
3714 self.src_ip6_pfx = '4:5:6::'
3715 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3717 self.src_ip6_pfx_len = 96
3718 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3719 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3720 '\x00\x00\x00\x00', 0, is_translation=1,
3723 def test_464xlat_ce(self):
3724 """ Test 464XLAT CE with NAT44 """
3726 self.configure_xlat()
3728 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3729 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
# expected IPv6 addresses on the pg1 side: the IPv4 destination embedded in
# the destination prefix, and the NAT address embedded in the source prefix
3731 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3732 self.dst_ip6_pfx_len)
3733 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3734 self.src_ip6_pfx_len)
# IPv4 stream in on pg0, verified as IPv6 on pg1
3737 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3738 self.pg0.add_stream(pkts)
3739 self.pg_enable_capture(self.pg_interfaces)
3741 capture = self.pg1.get_capture(len(pkts))
3742 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
# IPv6 stream in on pg1, verified as inside traffic on pg0
3745 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3747 self.pg1.add_stream(pkts)
3748 self.pg_enable_capture(self.pg_interfaces)
3750 capture = self.pg0.get_capture(len(pkts))
3751 self.verify_capture_in(capture, self.pg0)
# undo the NAT44 configuration made at the top of this test
3753 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3755 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3756 self.nat_addr_n, is_add=0)
3758 def test_464xlat_ce_no_nat(self):
3759 """ Test 464XLAT CE without NAT44 """
3761 self.configure_xlat()
# without NAT44 the embedded source address is pg0's own remote address
3763 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3764 self.dst_ip6_pfx_len)
3765 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3766 self.src_ip6_pfx_len)
3768 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3769 self.pg0.add_stream(pkts)
3770 self.pg_enable_capture(self.pg_interfaces)
3772 capture = self.pg1.get_capture(len(pkts))
3773 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3774 nat_ip=out_dst_ip6, same_port=True)
3776 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3777 self.pg1.add_stream(pkts)
3778 self.pg_enable_capture(self.pg_interfaces)
3780 capture = self.pg0.get_capture(len(pkts))
3781 self.verify_capture_in(capture, self.pg0)
3784 class TestDeterministicNAT(MethodHolder):
3785 """ Deterministic NAT Test Cases """
def setUpConstants(cls):
    """ Add the ``nat { deterministic }`` stanza to the VPP command line """
    super(TestDeterministicNAT, cls).setUpConstants()
    nat_stanza = ["nat", "{", "deterministic", "}"]
    cls.vpp_cmdline.extend(nat_stanza)
3793 def setUpClass(cls):
# NOTE(review): the body of the `for i in cls.interfaces:` loop and whatever
# statement guards the tearDownClass() call at the end are not visible in
# this listing; confirm against the full file.
3794 super(TestDeterministicNAT, cls).setUpClass()
# port and id constants shared by the stream builders of this class
3797 cls.tcp_port_in = 6303
3798 cls.tcp_external_port = 6303
3799 cls.udp_port_in = 6304
3800 cls.udp_external_port = 6304
3801 cls.icmp_id_in = 6305
3802 cls.nat_addr = '10.0.0.3'
3804 cls.create_pg_interfaces(range(3))
3805 cls.interfaces = list(cls.pg_interfaces)
3807 for i in cls.interfaces:
# pg0 gets two remote hosts (used by test_multiple_users)
3812 cls.pg0.generate_remote_hosts(2)
3813 cls.pg0.configure_ipv4_neighbors()
3816 super(TestDeterministicNAT, cls).tearDownClass()
3819 def create_stream_in(self, in_if, out_if, ttl=64):
3821 Create packet stream for inside network
3823 :param in_if: Inside interface
3824 :param out_if: Outside interface
3825 :param ttl: TTL of generated packets
# NOTE(review): the docstring delimiters, the collection of the packets built
# below and the return statement are not visible in this listing; confirm
# against the full file.
# TCP packet: in_if remote host -> out_if remote host
3829 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3830 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3831 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
# UDP packet, same addressing
3835 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3836 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3837 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
# ICMP echo request, same addressing
3841 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3842 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3843 ICMP(id=self.icmp_id_in, type='echo-request'))
3848 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3850 Create packet stream for outside network
3852 :param out_if: Outside interface
3853 :param dst_ip: Destination IP address (Default use global NAT address)
3854 :param ttl: TTL of generated packets
# NOTE(review): the docstring delimiters, the condition guarding the dst_ip
# default, the collection of the packets built below and the return statement
# are not visible in this listing; confirm against the full file.
# self.tcp_port_out, self.udp_port_out and self.icmp_external_id are assigned
# in verify_capture_out(), so this builder relies on an earlier capture check.
3857 dst_ip = self.nat_addr
# TCP packet: out_if remote host -> dst_ip
3860 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3861 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3862 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
# UDP packet, same addressing
3866 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3867 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3868 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
# ICMP echo reply, same addressing
3872 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3873 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3874 ICMP(id=self.icmp_external_id, type='echo-reply'))
3879 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3881 Verify captured packets on outside network
3883 :param capture: Captured packets
3884 :param nat_ip: Translated IP address (Default use global NAT address)
3885 :param same_port: Sorce port number is not translated (Default False)
3886 :param packet_num: Expected number of packets (Default 3)
# NOTE(review): the ':param same_port' line above describes a parameter that
# is not in this method's signature (and misspells "Source").
# NOTE(review): the docstring delimiters, the condition guarding the nat_ip
# default, and the statements around the logger.error(...) call are not
# visible in this listing; confirm against the full file.
3889 nat_ip = self.nat_addr
3890 self.assertEqual(packet_num, len(capture))
3891 for packet in capture:
3893 self.assertEqual(packet[IP].src, nat_ip)
# remember the translated port/id per protocol; create_stream_out() reads
# these attributes when building the reverse-direction stream
3894 if packet.haslayer(TCP):
3895 self.tcp_port_out = packet[TCP].sport
3896 elif packet.haslayer(UDP):
3897 self.udp_port_out = packet[UDP].sport
3899 self.icmp_external_id = packet[ICMP].id
3901 self.logger.error(ppp("Unexpected or invalid packet "
3902 "(outside network):", packet))
3905 def initiate_tcp_session(self, in_if, out_if):
3907 Initiates TCP session
3909 :param in_if: Inside interface
3910 :param out_if: Outside interface
# NOTE(review): the docstring delimiters, the TCP flags arguments of all
# three packets, the in_if.add_stream(...) calls and the statements around
# the final logger.error(...) are not visible in this listing; confirm
# against the full file.
3913 # SYN packet in->out
3914 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3915 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3916 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3919 self.pg_enable_capture(self.pg_interfaces)
3921 capture = out_if.get_capture(1)
# NOTE(review): as listed, p is still the packet that was sent; presumably it
# is rebound to the captured packet on a line missing here - verify.
3923 self.tcp_port_out = p[TCP].sport
3925 # SYN + ACK packet out->in
3926 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3927 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3928 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3930 out_if.add_stream(p)
3931 self.pg_enable_capture(self.pg_interfaces)
3933 in_if.get_capture(1)
3935 # ACK packet in->out
3936 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3937 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3938 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3941 self.pg_enable_capture(self.pg_interfaces)
3943 out_if.get_capture(1)
3946 self.logger.error("TCP 3 way handshake failed")
3949 def verify_ipfix_max_entries_per_user(self, data):
3951 Verify IPFIX maximum entries per user exceeded event
3953 :param data: Decoded IPFIX data records
# NOTE(review): the docstring delimiters and the assignment of `record` are
# not visible in this listing; presumably record is the single entry of
# `data` - confirm against the full file.
# exactly one record is expected
3955 self.assertEqual(1, len(data))
# the record is indexed by numeric field id (230, 466, 473, 8); values are
# compared as raw byte strings except field 230, which is a single byte
3958 self.assertEqual(ord(record[230]), 13)
3959 # natQuotaExceededEvent
3960 self.assertEqual('\x03\x00\x00\x00', record[466])
3962 self.assertEqual('\xe8\x03\x00\x00', record[473])
# field 8 must carry the packed IPv4 address of the pg0 remote host
3964 self.assertEqual(self.pg0.remote_ip4n, record[8])
3966 def test_deterministic_mode(self):
3967 """ NAT plugin run deterministic mode """
# NOTE(review): in_plen and out_plen are read below without a visible
# assignment in this listing; confirm against the full file.
3968 in_addr = '172.16.255.0'
3969 out_addr = '172.17.255.50'
3970 in_addr_t = '172.16.255.20'
# packed 4-byte forms of the addresses, as passed to the binary API
3971 in_addr_n = socket.inet_aton(in_addr)
3972 out_addr_n = socket.inet_aton(out_addr)
3973 in_addr_t_n = socket.inet_aton(in_addr_t)
# the plugin must report that it runs in deterministic mode
3977 nat_config = self.vapi.nat_show_config()
3978 self.assertEqual(1, nat_config.deterministic)
3980 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# forward lookup of an inside address, then reverse lookup of the result,
# must round-trip back to the same inside address
3982 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3983 self.assertEqual(rep1.out_addr[:4], out_addr_n)
3984 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3985 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
# the dump must contain exactly the mapping added above
3987 deterministic_mappings = self.vapi.nat_det_map_dump()
3988 self.assertEqual(len(deterministic_mappings), 1)
3989 dsm = deterministic_mappings[0]
3990 self.assertEqual(in_addr_n, dsm.in_addr[:4])
3991 self.assertEqual(in_plen, dsm.in_plen)
3992 self.assertEqual(out_addr_n, dsm.out_addr[:4])
3993 self.assertEqual(out_plen, dsm.out_plen)
# after clearing, no mapping may remain
3995 self.clear_nat_det()
3996 deterministic_mappings = self.vapi.nat_det_map_dump()
3997 self.assertEqual(len(deterministic_mappings), 0)
def test_set_timeouts(self):
    """ Set deterministic NAT timeouts """
    timeouts_before = self.vapi.nat_det_get_timeouts()

    # Every timeout is raised by 10 relative to the current configuration,
    # so the check is independent of the defaults.
    self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
                                   timeouts_before.tcp_established + 10,
                                   timeouts_before.tcp_transitory + 10,
                                   timeouts_before.icmp + 10)

    timeouts_after = self.vapi.nat_det_get_timeouts()

    # Assert the exact requested values rather than merely "changed":
    # a plain inequality would also pass if a value ended up in the wrong
    # field or was stored incorrectly.
    self.assertEqual(timeouts_before.udp + 10, timeouts_after.udp)
    self.assertEqual(timeouts_before.icmp + 10, timeouts_after.icmp)
    self.assertEqual(timeouts_before.tcp_established + 10,
                     timeouts_after.tcp_established)
    self.assertEqual(timeouts_before.tcp_transitory + 10,
                     timeouts_after.tcp_transitory)
4017 def test_det_in(self):
4018 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
# NOTE(review): `s` is read below without a visible assignment, and the
# nat_det_add_del_map(...) and nat44_interface_add_del_feature(self.pg1...)
# calls are missing argument lines in this listing; confirm against the
# full file.
4020 nat_ip = "10.0.0.10"
4022 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4024 socket.inet_aton(nat_ip),
4026 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4027 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# inside stream: sent on pg0, verified on pg1 (this also records the
# translated ports/ids used by create_stream_out below)
4031 pkts = self.create_stream_in(self.pg0, self.pg1)
4032 self.pg0.add_stream(pkts)
4033 self.pg_enable_capture(self.pg_interfaces)
4035 capture = self.pg1.get_capture(len(pkts))
4036 self.verify_capture_out(capture, nat_ip)
# outside stream: sent on pg1, verified on pg0
4039 pkts = self.create_stream_out(self.pg1, nat_ip)
4040 self.pg1.add_stream(pkts)
4041 self.pg_enable_capture(self.pg_interfaces)
4043 capture = self.pg0.get_capture(len(pkts))
4044 self.verify_capture_in(capture, self.pg0)
# one session per protocol is expected for the pg0 host
4047 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4048 self.assertEqual(len(sessions), 3)
# TCP session
4052 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4053 self.assertEqual(s.in_port, self.tcp_port_in)
4054 self.assertEqual(s.out_port, self.tcp_port_out)
4055 self.assertEqual(s.ext_port, self.tcp_external_port)
# UDP session
4059 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4060 self.assertEqual(s.in_port, self.udp_port_in)
4061 self.assertEqual(s.out_port, self.udp_port_out)
4062 self.assertEqual(s.ext_port, self.udp_external_port)
# ICMP session (ids take the place of ports)
4066 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4067 self.assertEqual(s.in_port, self.icmp_id_in)
4068 self.assertEqual(s.out_port, self.icmp_external_id)
4070 def test_multiple_users(self):
4071 """ Deterministic NAT multiple users """
# NOTE(review): port_in, ip and tcp are read below without a visible
# assignment, and the nat_det_add_del_map(...),
# nat44_interface_add_del_feature(self.pg1...) and both
# nat_det_close_session_*(...) calls are missing argument lines in this
# listing; confirm against the full file.
4073 nat_ip = "10.0.0.10"
4075 external_port = 6303
# two inside users, both remote hosts of pg0
4077 host0 = self.pg0.remote_hosts[0]
4078 host1 = self.pg0.remote_hosts[1]
4080 self.vapi.nat_det_add_del_map(host0.ip4n,
4082 socket.inet_aton(nat_ip),
4084 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4085 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host0 -> outside
4089 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4090 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4091 TCP(sport=port_in, dport=external_port))
4092 self.pg0.add_stream(p)
4093 self.pg_enable_capture(self.pg_interfaces)
4095 capture = self.pg1.get_capture(1)
4100 self.assertEqual(ip.src, nat_ip)
4101 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4102 self.assertEqual(tcp.dport, external_port)
# outside port allocated for host0
4103 port_out0 = tcp.sport
4105 self.logger.error(ppp("Unexpected or invalid packet:", p))
# host1 -> outside, same inside source port as host0
4109 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4110 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4111 TCP(sport=port_in, dport=external_port))
4112 self.pg0.add_stream(p)
4113 self.pg_enable_capture(self.pg_interfaces)
4115 capture = self.pg1.get_capture(1)
4120 self.assertEqual(ip.src, nat_ip)
4121 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4122 self.assertEqual(tcp.dport, external_port)
# outside port allocated for host1
4123 port_out1 = tcp.sport
4125 self.logger.error(ppp("Unexpected or invalid packet:", p))
# a single mapping that now holds two sessions
4128 dms = self.vapi.nat_det_map_dump()
4129 self.assertEqual(1, len(dms))
4130 self.assertEqual(2, dms[0].ses_num)
# outside -> host0's outside port; must be delivered to host0
4133 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4134 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4135 TCP(sport=external_port, dport=port_out0))
4136 self.pg1.add_stream(p)
4137 self.pg_enable_capture(self.pg_interfaces)
4139 capture = self.pg0.get_capture(1)
4144 self.assertEqual(ip.src, self.pg1.remote_ip4)
4145 self.assertEqual(ip.dst, host0.ip4)
4146 self.assertEqual(tcp.dport, port_in)
4147 self.assertEqual(tcp.sport, external_port)
4149 self.logger.error(ppp("Unexpected or invalid packet:", p))
# outside -> host1's outside port; must be delivered to host1
4153 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4154 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4155 TCP(sport=external_port, dport=port_out1))
4156 self.pg1.add_stream(p)
4157 self.pg_enable_capture(self.pg_interfaces)
4159 capture = self.pg0.get_capture(1)
4164 self.assertEqual(ip.src, self.pg1.remote_ip4)
4165 self.assertEqual(ip.dst, host1.ip4)
4166 self.assertEqual(tcp.dport, port_in)
4167 self.assertEqual(tcp.sport, external_port)
4169 self.logger.error(ppp("Unexpected or invalid packet", p))
4172 # session close api test
# closing by outside address must leave one session ...
4173 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4175 self.pg1.remote_ip4n,
4177 dms = self.vapi.nat_det_map_dump()
4178 self.assertEqual(dms[0].ses_num, 1)
# ... and closing by inside address must leave none
4180 self.vapi.nat_det_close_session_in(host0.ip4n,
4182 self.pg1.remote_ip4n,
4184 dms = self.vapi.nat_det_map_dump()
4185 self.assertEqual(dms[0].ses_num, 0)
4187 def test_tcp_session_close_detection_in(self):
4188 """ Deterministic NAT TCP session close from inside network """
# NOTE(review): `pkts` is read below without a visible assignment; the TCP
# flags arguments of every packet, parts of the nat_det_add_del_map(...) and
# nat44_interface_add_del_feature(self.pg1...) calls, and the statements
# around the final logger.error(...) are not visible in this listing;
# confirm against the full file.
4189 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4191 socket.inet_aton(self.nat_addr),
4193 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4194 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# open the session first; this also sets self.tcp_port_out
4197 self.initiate_tcp_session(self.pg0, self.pg1)
4199 # close the session from inside
4201 # FIN packet in -> out
4202 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4203 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4204 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4206 self.pg0.add_stream(p)
4207 self.pg_enable_capture(self.pg_interfaces)
4209 self.pg1.get_capture(1)
4213 # ACK packet out -> in
4214 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4215 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4216 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4220 # FIN packet out -> in
4221 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4222 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4223 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
# two packets are expected back on pg0 for the stream sent on pg1
4227 self.pg1.add_stream(pkts)
4228 self.pg_enable_capture(self.pg_interfaces)
4230 self.pg0.get_capture(2)
4232 # ACK packet in -> out
4233 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4234 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4235 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4237 self.pg0.add_stream(p)
4238 self.pg_enable_capture(self.pg_interfaces)
4240 self.pg1.get_capture(1)
4242 # Check if deterministic NAT44 closed the session
4243 dms = self.vapi.nat_det_map_dump()
4244 self.assertEqual(0, dms[0].ses_num)
4246 self.logger.error("TCP session termination failed")
4249 def test_tcp_session_close_detection_out(self):
4250 """ Deterministic NAT TCP session close from outside network """
# NOTE(review): `pkts` is read below without a visible assignment; the TCP
# flags arguments of every packet, parts of the nat_det_add_del_map(...) and
# nat44_interface_add_del_feature(self.pg1...) calls, and the statements
# around the final logger.error(...) are not visible in this listing;
# confirm against the full file.
4251 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4253 socket.inet_aton(self.nat_addr),
4255 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4256 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# open the session first; this also sets self.tcp_port_out
4259 self.initiate_tcp_session(self.pg0, self.pg1)
4261 # close the session from outside
4263 # FIN packet out -> in
4264 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4265 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4266 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4268 self.pg1.add_stream(p)
4269 self.pg_enable_capture(self.pg_interfaces)
4271 self.pg0.get_capture(1)
4275 # ACK packet in -> out
4276 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4277 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4278 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# NOTE(review): the label below repeats "ACK"; the mirrored step in
# test_tcp_session_close_detection_in is labelled "FIN" - the flags line is
# not visible here, so verify which one this packet actually is.
4282 # ACK packet in -> out
4283 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4284 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4285 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
# two packets are expected on pg1 for the stream sent on pg0
4289 self.pg0.add_stream(pkts)
4290 self.pg_enable_capture(self.pg_interfaces)
4292 self.pg1.get_capture(2)
4294 # ACK packet out -> in
4295 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4296 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4297 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4299 self.pg1.add_stream(p)
4300 self.pg_enable_capture(self.pg_interfaces)
4302 self.pg0.get_capture(1)
4304 # Check if deterministic NAT44 closed the session
4305 dms = self.vapi.nat_det_map_dump()
4306 self.assertEqual(0, dms[0].ses_num)
4308 self.logger.error("TCP session termination failed")
# only runs when the extended test set is enabled
4311 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4312 def test_session_timeout(self):
4313 """ Deterministic NAT session timeouts """
# NOTE(review): parts of the nat_det_add_del_map(...) and
# nat44_interface_add_del_feature(self.pg1...) calls are not visible in this
# listing, and neither is any wait between the capture and the final dump;
# confirm against the full file.
4314 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4316 socket.inet_aton(self.nat_addr),
4318 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4319 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4322 self.initiate_tcp_session(self.pg0, self.pg1)
# all four timeouts are set to 5 before the TCP/UDP/ICMP stream is sent
4323 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4324 pkts = self.create_stream_in(self.pg0, self.pg1)
4325 self.pg0.add_stream(pkts)
4326 self.pg_enable_capture(self.pg_interfaces)
4328 capture = self.pg1.get_capture(len(pkts))
# every session is expected to be gone by the time the mapping is dumped
4331 dms = self.vapi.nat_det_map_dump()
4332 self.assertEqual(0, dms[0].ses_num)
# Extended-only test: opens 1000 UDP sessions from one inside host, then checks
# that one more packet is not forwarded, that an ICMP error comes back on pg0,
# and that the event is exported over IPFIX on pg2.
4334 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4335 def test_session_limit_per_user(self):
4336 """ Deterministic NAT maximum sessions per user limit """
4337 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4339 socket.inet_aton(self.nat_addr),
4341 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4342 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Point the IPFIX exporter at the pg2 peer and turn on NAT IPFIX logging.
4344 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4345 src_address=self.pg2.local_ip4n,
4347 template_interval=10)
4348 self.vapi.nat_ipfix()
# One UDP packet per source port 1025..2024, i.e. 1000 distinct flows.
4351 for port in range(1025, 2025):
4352 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4353 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4354 UDP(sport=port, dport=port))
4357 self.pg0.add_stream(pkts)
4358 self.pg_enable_capture(self.pg_interfaces)
4360 capture = self.pg1.get_capture(len(pkts))
# One additional flow (3001 -> 3002) that must not be translated.
4362 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4363 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4364 UDP(sport=3001, dport=3002))
4365 self.pg0.add_stream(p)
4366 self.pg_enable_capture(self.pg_interfaces)
4368 capture = self.pg1.assert_nothing_captured()
4370 # verify ICMP error packet
4371 capture = self.pg0.get_capture(1)
4373 self.assertTrue(p.haslayer(ICMP))
# Expected error is ICMP type 3, code 1, quoting the rejected UDP packet.
4375 self.assertEqual(icmp.type, 3)
4376 self.assertEqual(icmp.code, 1)
4377 self.assertTrue(icmp.haslayer(IPerror))
4378 inner_ip = icmp[IPerror]
4379 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4380 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4382 dms = self.vapi.nat_det_map_dump()
# Session count must equal the 1000 flows created by the loop above.
4384 self.assertEqual(1000, dms[0].ses_num)
4386 # verify IPFIX logging
4387 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4389 capture = self.pg2.get_capture(2)
4390 ipfix = IPFIXDecoder()
4391 # first load template
4393 self.assertTrue(p.haslayer(IPFIX))
4394 if p.haslayer(Template):
4395 ipfix.add_template(p.getlayer(Template))
4396 # verify events in data set
4398 if p.haslayer(Data):
4399 data = ipfix.decode_data_set(p.getlayer(Set))
4400 self.verify_ipfix_max_entries_per_user(data)
# Cleanup helper: disables NAT IPFIX logging, calls nat_det_set_timeouts() with
# no arguments (presumably restoring defaults - confirm), then walks the
# deterministic map dump and the NAT44 interface dump issuing an add/del call
# for each entry.
4402 def clear_nat_det(self):
4404 Clear deterministic NAT configuration.
4406 self.vapi.nat_ipfix(enable=0)
4407 self.vapi.nat_det_set_timeouts()
4408 deterministic_mappings = self.vapi.nat_det_map_dump()
4409 for dsm in deterministic_mappings:
4410 self.vapi.nat_det_add_del_map(dsm.in_addr,
4416 interfaces = self.vapi.nat44_interface_dump()
4417 for intf in interfaces:
4418 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# Per-test teardown of TestDeterministicNAT: run the parent teardown first and,
# only while VPP is still alive, log the NAT44 state and clear the
# deterministic NAT configuration.
4423 super(TestDeterministicNAT, self).tearDown()
4424 if not self.vpp_dead:
4425 self.logger.info(self.vapi.cli("show nat44 detail"))
4426 self.clear_nat_det()
4429 class TestNAT64(MethodHolder):
4430 """ NAT64 Test Cases """
# Extends the VPP command line built by the parent class with a "nat { ... }"
# section that sets the NAT64 BIB and session-table hash bucket counts.
4433 def setUpConstants(cls):
4434 super(TestNAT64, cls).setUpConstants()
4435 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4436 "nat64 st hash buckets 256", "}"])
# Class-level fixture for the NAT64 tests: fixes the ports/ids and NAT pool
# addresses used by the tests, creates five pg interfaces and splits them into
# IPv6-side (pg0, pg2) and IPv4-side (pg1) groups; pg3 gets both address
# families configured.
4439 def setUpClass(cls):
4440 super(TestNAT64, cls).setUpClass()
4443 cls.tcp_port_in = 6303
4444 cls.tcp_port_out = 6303
4445 cls.udp_port_in = 6304
4446 cls.udp_port_out = 6304
4447 cls.icmp_id_in = 6305
4448 cls.icmp_id_out = 6305
4449 cls.nat_addr = '10.0.0.3'
# Packed (network byte order) form of the pool address for API calls.
4450 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4452 cls.vrf1_nat_addr = '10.0.10.3'
4453 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4455 cls.ipfix_src_port = 4739
4456 cls.ipfix_domain_id = 1
4458 cls.create_pg_interfaces(range(5))
# ip6_interfaces = [pg0, pg2]; ip4_interfaces = [pg1].
4459 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4460 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4461 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
# Create the IPv6 table for vrf1_id and move pg2 into it.
4463 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4465 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
# Two remote hosts on pg0 are used as client/server by the hairpinning tests.
4467 cls.pg0.generate_remote_hosts(2)
4469 for i in cls.ip6_interfaces:
4472 i.configure_ipv6_neighbors()
4474 for i in cls.ip4_interfaces:
4480 cls.pg3.config_ip4()
4481 cls.pg3.resolve_arp()
4482 cls.pg3.config_ip6()
4483 cls.pg3.configure_ipv6_neighbors()
# NOTE(review): presumably the failure path of the setup - confirm.
4486 super(TestNAT64, cls).tearDownClass()
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    # A one-address range: the same packed IPv4 address is used as both the
    # start and the end of the range.
    pool_ip = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # Add the address and make sure it is the only pool entry.
    self.vapi.nat64_add_del_pool_addr_range(pool_ip, pool_ip)
    dump_after_add = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dump_after_add), 1)
    self.assertEqual(dump_after_add[0].address, pool_ip)

    # Remove it again; the pool must be empty afterwards.
    self.vapi.nat64_add_del_pool_addr_range(pool_ip, pool_ip, is_add=0)
    dump_after_del = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(dump_after_del), 0)
# Enables NAT64 on pg0 (inside, the default) and pg1 (is_inside=0), verifies
# both through the interface dump and the CLI feature listing, then disables
# the feature on both and expects an empty dump.
4504 def test_interface(self):
4505 """ Enable/disable NAT64 feature on the interface """
4506 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4507 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4509 interfaces = self.vapi.nat64_interface_dump()
4510 self.assertEqual(len(interfaces), 2)
# Match each dumped entry to pg0/pg1 and check its inside/outside role.
4513 for intf in interfaces:
4514 if intf.sw_if_index == self.pg0.sw_if_index:
4515 self.assertEqual(intf.is_inside, 1)
4517 elif intf.sw_if_index == self.pg1.sw_if_index:
4518 self.assertEqual(intf.is_inside, 0)
4520 self.assertTrue(pg0_found)
4521 self.assertTrue(pg1_found)
# The CLI output must list the in2out node on pg0 and the out2in node on pg1.
4523 features = self.vapi.cli("show interface features pg0")
4524 self.assertNotEqual(features.find('nat64-in2out'), -1)
4525 features = self.vapi.cli("show interface features pg1")
4526 self.assertNotEqual(features.find('nat64-out2in'), -1)
4528 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4529 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4531 interfaces = self.vapi.nat64_interface_dump()
4532 self.assertEqual(len(interfaces), 0)
# Adds a static TCP BIB entry, checks the dumped entry's addresses and ports and
# that exactly one static entry is counted, then issues a second add/del call
# and expects the static count to be zero.
4534 def test_static_bib(self):
4535 """ Add/delete static BIB entry """
# Packed IPv6 (inside) and IPv4 (outside) addresses for the entry.
4536 in_addr = socket.inet_pton(socket.AF_INET6,
4537 '2001:db8:85a3::8a2e:370:7334')
4538 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4541 proto = IP_PROTOS.tcp
4543 self.vapi.nat64_add_del_static_bib(in_addr,
4548 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4553 self.assertEqual(bibe.i_addr, in_addr)
4554 self.assertEqual(bibe.o_addr, out_addr)
4555 self.assertEqual(bibe.i_port, in_port)
4556 self.assertEqual(bibe.o_port, out_port)
4557 self.assertEqual(static_bib_num, 1)
# NOTE(review): presumably the delete call (is_add=0) - confirm its arguments.
4559 self.vapi.nat64_add_del_static_bib(in_addr,
4565 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4570 self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # Timeout fields in the order they are checked.
    fields = ('udp', 'icmp', 'tcp_trans', 'tcp_est', 'tcp_incoming_syn')

    # verify default values
    defaults = (300, 60, 240, 7440, 6)
    reply = self.vapi.nat64_get_timeouts()
    for name, expected in zip(fields, defaults):
        self.assertEqual(getattr(reply, name), expected)

    # set and verify custom values
    custom = {'udp': 200, 'icmp': 30, 'tcp_trans': 250,
              'tcp_est': 7450, 'tcp_incoming_syn': 10}
    self.vapi.nat64_set_timeouts(**custom)
    reply = self.vapi.nat64_get_timeouts()
    for name in fields:
        self.assertEqual(getattr(reply, name), custom[name])
# Dynamic NAT64 translation: sends IPv6 -> IPv4 and IPv4 -> IPv6 streams twice
# between pg0 and pg1, checks that three sessions were added, then repeats one
# round trip for pg2 using the VRF-specific pool address.
4592 def test_dynamic(self):
4593 """ NAT64 dynamic translation test """
4594 self.tcp_port_in = 6303
4595 self.udp_port_in = 6304
4596 self.icmp_id_in = 6305
# Baseline session count, compared against the count after the traffic.
4598 ses_num_start = self.nat64_get_ses_num()
4600 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4602 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4603 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# First pass, IPv6 side (pg0) -> IPv4 side (pg1).
4606 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4607 self.pg0.add_stream(pkts)
4608 self.pg_enable_capture(self.pg_interfaces)
4610 capture = self.pg1.get_capture(len(pkts))
4611 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4612 dst_ip=self.pg1.remote_ip4)
# First pass, IPv4 side (pg1) -> IPv6 side (pg0).
4615 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4616 self.pg1.add_stream(pkts)
4617 self.pg_enable_capture(self.pg_interfaces)
4619 capture = self.pg0.get_capture(len(pkts))
# Build the expected IPv6 source by appending the IPv4 address to 64:ff9b::
# and letting scapy normalise the textual form.
4620 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4621 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
# Second pass in both directions.
4624 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4625 self.pg0.add_stream(pkts)
4626 self.pg_enable_capture(self.pg_interfaces)
4628 capture = self.pg1.get_capture(len(pkts))
4629 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4630 dst_ip=self.pg1.remote_ip4)
4633 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4634 self.pg1.add_stream(pkts)
4635 self.pg_enable_capture(self.pg_interfaces)
4637 capture = self.pg0.get_capture(len(pkts))
4638 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4640 ses_num_end = self.nat64_get_ses_num()
# Exactly three new sessions are expected for the traffic above.
4642 self.assertEqual(ses_num_end - ses_num_start, 3)
4644 # tenant with specific VRF
4645 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4646 self.vrf1_nat_addr_n,
4647 vrf_id=self.vrf1_id)
4648 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4650 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4651 self.pg2.add_stream(pkts)
4652 self.pg_enable_capture(self.pg_interfaces)
4654 capture = self.pg1.get_capture(len(pkts))
4655 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4656 dst_ip=self.pg1.remote_ip4)
4658 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4659 self.pg1.add_stream(pkts)
4660 self.pg_enable_capture(self.pg_interfaces)
4662 capture = self.pg2.get_capture(len(pkts))
4663 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# Static NAT64 translation: issues three static BIB calls for the pg0 host,
# sends traffic in both directions and expects the outside ports to equal the
# inside ones (same_port=True) and three new sessions.
4665 def test_static(self):
4666 """ NAT64 static translation test """
# Inside and outside ports/ids are deliberately identical for this test.
4667 self.tcp_port_in = 60303
4668 self.udp_port_in = 60304
4669 self.icmp_id_in = 60305
4670 self.tcp_port_out = 60303
4671 self.udp_port_out = 60304
4672 self.icmp_id_out = 60305
4674 ses_num_start = self.nat64_get_ses_num()
4676 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4678 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4679 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# NOTE(review): presumably one static BIB entry per protocol (TCP, UDP, ICMP) -
# confirm against the call arguments.
4681 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4686 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4691 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
# IPv6 side -> IPv4 side.
4698 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4699 self.pg0.add_stream(pkts)
4700 self.pg_enable_capture(self.pg_interfaces)
4702 capture = self.pg1.get_capture(len(pkts))
4703 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4704 dst_ip=self.pg1.remote_ip4, same_port=True)
# IPv4 side -> IPv6 side.
4707 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4708 self.pg1.add_stream(pkts)
4709 self.pg_enable_capture(self.pg_interfaces)
4711 capture = self.pg0.get_capture(len(pkts))
4712 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4713 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4715 ses_num_end = self.nat64_get_ses_num()
4717 self.assertEqual(ses_num_end - ses_num_start, 3)
# Extended-only test: sets the NAT64 ICMP timeout to 5, creates sessions and
# expects the session count read later to differ from the count read right
# after the traffic.
4719 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4720 def test_session_timeout(self):
4721 """ NAT64 session timeout """
4722 self.icmp_id_in = 1234
4723 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4725 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4726 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4727 self.vapi.nat64_set_timeouts(icmp=5)
4729 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4730 self.pg0.add_stream(pkts)
4731 self.pg_enable_capture(self.pg_interfaces)
4733 capture = self.pg1.get_capture(len(pkts))
4735 ses_num_before_timeout = self.nat64_get_ses_num()
# NOTE(review): a wait longer than the ICMP timeout is presumably required
# between the two session-count reads - confirm it is present.
4739 # ICMP session after timeout
4740 ses_num_after_timeout = self.nat64_get_ses_num()
4741 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# ICMP error translation: first creates sessions with normal traffic, then
# wraps the previously captured packets in ICMPv6 / ICMPv4 destination
# unreachable messages and verifies the translated error on the other side.
4743 def test_icmp_error(self):
4744 """ NAT64 ICMP Error message translation """
4745 self.tcp_port_in = 6303
4746 self.udp_port_in = 6304
4747 self.icmp_id_in = 6305
4749 ses_num_start = self.nat64_get_ses_num()
4751 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4753 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4754 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4756 # send some packets to create sessions
4757 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4758 self.pg0.add_stream(pkts)
4759 self.pg_enable_capture(self.pg_interfaces)
# Keep the translated IPv4 packets; they are embedded in ICMPv4 errors below.
4761 capture_ip4 = self.pg1.get_capture(len(pkts))
4762 self.verify_capture_out(capture_ip4,
4763 nat_ip=self.nat_addr,
4764 dst_ip=self.pg1.remote_ip4)
4766 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4767 self.pg1.add_stream(pkts)
4768 self.pg_enable_capture(self.pg_interfaces)
# Keep the translated IPv6 packets; they are embedded in ICMPv6 errors below.
4770 capture_ip6 = self.pg0.get_capture(len(pkts))
4771 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4772 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4773 self.pg0.remote_ip6)
# IPv6 -> IPv4 direction: ICMPv6 destination unreachable (code 1) quoting each
# captured IPv6 packet.
4776 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4777 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4778 ICMPv6DestUnreach(code=1) /
4779 packet[IPv6] for packet in capture_ip6]
4780 self.pg0.add_stream(pkts)
4781 self.pg_enable_capture(self.pg_interfaces)
4783 capture = self.pg1.get_capture(len(pkts))
4784 for packet in capture:
# Expected translation is ICMPv4 type 3, code 13 with the quoted packet
# addressed from the pg1 host to the NAT address.
4786 self.assertEqual(packet[IP].src, self.nat_addr)
4787 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4788 self.assertEqual(packet[ICMP].type, 3)
4789 self.assertEqual(packet[ICMP].code, 13)
4790 inner = packet[IPerror]
4791 self.assertEqual(inner.src, self.pg1.remote_ip4)
4792 self.assertEqual(inner.dst, self.nat_addr)
4793 self.check_icmp_checksum(packet)
4794 if inner.haslayer(TCPerror):
4795 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4796 elif inner.haslayer(UDPerror):
4797 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4799 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4801 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# IPv4 -> IPv6 direction: ICMPv4 type 3, code 13 quoting each captured IPv4
# packet.
4805 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4806 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4807 ICMP(type=3, code=13) /
4808 packet[IP] for packet in capture_ip4]
4809 self.pg1.add_stream(pkts)
4810 self.pg_enable_capture(self.pg_interfaces)
4812 capture = self.pg0.get_capture(len(pkts))
4813 for packet in capture:
# Expected translation is ICMPv6 destination unreachable, code 1.
4815 self.assertEqual(packet[IPv6].src, ip.src)
4816 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4817 icmp = packet[ICMPv6DestUnreach]
4818 self.assertEqual(icmp.code, 1)
4819 inner = icmp[IPerror6]
4820 self.assertEqual(inner.src, self.pg0.remote_ip6)
4821 self.assertEqual(inner.dst, ip.src)
4822 self.check_icmpv6_checksum(packet)
4823 if inner.haslayer(TCPerror):
4824 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4825 elif inner.haslayer(UDPerror):
4826 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4828 self.assertEqual(inner[ICMPv6EchoRequest].id,
4831 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning: client and server are both remote hosts on pg0. Traffic sent to
# the NAT address must come back out of pg0 translated, in both directions,
# and an ICMPv6 error quoting the reply must be translated as well.
4834 def test_hairpinning(self):
4835 """ NAT64 hairpinning """
4837 client = self.pg0.remote_hosts[0]
4838 server = self.pg0.remote_hosts[1]
4839 server_tcp_in_port = 22
4840 server_tcp_out_port = 4022
4841 server_udp_in_port = 23
4842 server_udp_out_port = 4023
4843 client_tcp_in_port = 1234
4844 client_udp_in_port = 1235
# Filled in from the first capture with the ports the NAT allocated.
4845 client_tcp_out_port = 0
4846 client_udp_out_port = 0
# IPv6 textual form of the NAT address under the 64:ff9b:: prefix.
4847 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4848 nat_addr_ip6 = ip.src
4850 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4852 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4853 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries for the server's TCP and UDP outside ports.
4855 self.vapi.nat64_add_del_static_bib(server.ip6n,
4858 server_tcp_out_port,
4860 self.vapi.nat64_add_del_static_bib(server.ip6n,
4863 server_udp_out_port,
# Client -> server (addressed to the NAT address and the server's out ports).
4868 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4869 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4870 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4872 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4873 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4874 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4876 self.pg0.add_stream(pkts)
4877 self.pg_enable_capture(self.pg_interfaces)
4879 capture = self.pg0.get_capture(len(pkts))
4880 for packet in capture:
# Source must be rewritten to the NAT address with a new port; destination
# must be the server's real address and in port.
4882 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4883 self.assertEqual(packet[IPv6].dst, server.ip6)
4884 if packet.haslayer(TCP):
4885 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4886 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4887 self.check_tcp_checksum(packet)
4888 client_tcp_out_port = packet[TCP].sport
4890 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4891 self.assertEqual(packet[UDP].dport, server_udp_in_port)
4892 self.check_udp_checksum(packet)
4893 client_udp_out_port = packet[UDP].sport
4895 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> client (addressed to the NAT address and the client's out ports).
4900 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4901 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4902 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4904 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4905 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4906 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4908 self.pg0.add_stream(pkts)
4909 self.pg_enable_capture(self.pg_interfaces)
4911 capture = self.pg0.get_capture(len(pkts))
4912 for packet in capture:
4914 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4915 self.assertEqual(packet[IPv6].dst, client.ip6)
4916 if packet.haslayer(TCP):
4917 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4918 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4919 self.check_tcp_checksum(packet)
4921 self.assertEqual(packet[UDP].sport, server_udp_out_port)
4922 self.assertEqual(packet[UDP].dport, client_udp_in_port)
4923 self.check_udp_checksum(packet)
4925 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMPv6 destination unreachable from the client, quoting each packet of the
# previous capture.
4930 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4931 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4932 ICMPv6DestUnreach(code=1) /
4933 packet[IPv6] for packet in capture]
4934 self.pg0.add_stream(pkts)
4935 self.pg_enable_capture(self.pg_interfaces)
4937 capture = self.pg0.get_capture(len(pkts))
4938 for packet in capture:
# The error must reach the server with the quoted packet translated back to
# the server's view (server in port, client out port).
4940 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4941 self.assertEqual(packet[IPv6].dst, server.ip6)
4942 icmp = packet[ICMPv6DestUnreach]
4943 self.assertEqual(icmp.code, 1)
4944 inner = icmp[IPerror6]
4945 self.assertEqual(inner.src, server.ip6)
4946 self.assertEqual(inner.dst, nat_addr_ip6)
4947 self.check_icmpv6_checksum(packet)
4948 if inner.haslayer(TCPerror):
4949 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4950 self.assertEqual(inner[TCPerror].dport,
4951 client_tcp_out_port)
4953 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4954 self.assertEqual(inner[UDPerror].dport,
4955 client_udp_out_port)
4957 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Network-specific prefixes: configures a /32 prefix (dumped with vrf_id 0) and
# a /56 prefix for vrf1_id, checks the prefix dump, then runs traffic in both
# directions for pg0 and for pg2 using the matching prefix.
4960 def test_prefix(self):
4961 """ NAT64 Network-Specific Prefix """
4963 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4965 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4966 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4967 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4968 self.vrf1_nat_addr_n,
4969 vrf_id=self.vrf1_id)
4970 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Global prefix, added without a vrf_id argument.
4973 global_pref64 = "2001:db8::"
4974 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4975 global_pref64_len = 32
4976 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4978 prefix = self.vapi.nat64_prefix_dump()
4979 self.assertEqual(len(prefix), 1)
4980 self.assertEqual(prefix[0].prefix, global_pref64_n)
4981 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4982 self.assertEqual(prefix[0].vrf_id, 0)
4984 # Add tenant specific prefix
4985 vrf1_pref64 = "2001:db8:122:300::"
4986 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4987 vrf1_pref64_len = 56
4988 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4990 vrf_id=self.vrf1_id)
4991 prefix = self.vapi.nat64_prefix_dump()
4992 self.assertEqual(len(prefix), 2)
# Traffic using the global prefix (pg0 <-> pg1).
4995 pkts = self.create_stream_in_ip6(self.pg0,
4998 plen=global_pref64_len)
4999 self.pg0.add_stream(pkts)
5000 self.pg_enable_capture(self.pg_interfaces)
5002 capture = self.pg1.get_capture(len(pkts))
5003 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5004 dst_ip=self.pg1.remote_ip4)
5006 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5007 self.pg1.add_stream(pkts)
5008 self.pg_enable_capture(self.pg_interfaces)
5010 capture = self.pg0.get_capture(len(pkts))
# Expected IPv6 source of the reply, composed from the pg1 host's IPv4 address.
5011 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5014 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5016 # Tenant specific prefix
5017 pkts = self.create_stream_in_ip6(self.pg2,
5020 plen=vrf1_pref64_len)
5021 self.pg2.add_stream(pkts)
5022 self.pg_enable_capture(self.pg_interfaces)
5024 capture = self.pg1.get_capture(len(pkts))
5025 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5026 dst_ip=self.pg1.remote_ip4)
5028 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5029 self.pg1.add_stream(pkts)
5030 self.pg_enable_capture(self.pg_interfaces)
5032 capture = self.pg2.get_capture(len(pkts))
5033 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5036 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Unknown-protocol handling: after a TCP packet from pg0 to pg1, sends an IPv6
# packet with next header 47 and checks the translated IPv4 packet carries GRE,
# then sends the reverse direction and checks the IPv6 result has nh 47.
5038 def test_unknown_proto(self):
5039 """ NAT64 translate packet with unknown protocol """
5041 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5043 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5044 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 representation of the pg1 host under the 64:ff9b::/96 prefix.
5045 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# Plain TCP packet first.
5048 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5049 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5050 TCP(sport=self.tcp_port_in, dport=20))
5051 self.pg0.add_stream(p)
5052 self.pg_enable_capture(self.pg_interfaces)
5054 p = self.pg1.get_capture(1)
# IPv6 -> IPv4 with next header 47 (checked as GRE on the IPv4 side below).
5056 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5057 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5059 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5060 TCP(sport=1234, dport=1234))
5061 self.pg0.add_stream(p)
5062 self.pg_enable_capture(self.pg_interfaces)
5064 p = self.pg1.get_capture(1)
5067 self.assertEqual(packet[IP].src, self.nat_addr)
5068 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5069 self.assertTrue(packet.haslayer(GRE))
5070 self.check_ip_checksum(packet)
5072 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# IPv4 -> IPv6 direction.
5076 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5077 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5079 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5080 TCP(sport=1234, dport=1234))
5081 self.pg1.add_stream(p)
5082 self.pg_enable_capture(self.pg_interfaces)
5084 p = self.pg0.get_capture(1)
5087 self.assertEqual(packet[IPv6].src, remote_ip6)
5088 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5089 self.assertEqual(packet[IPv6].nh, 47)
5091 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning with an unknown protocol: client and server are both on pg0 and
# each has its own NAT address. After a TCP packet from client to server, GRE
# packets are sent in both directions and must come back out of pg0 with the
# sender's NAT address as IPv6 source.
5094 def test_hairpinning_unknown_proto(self):
5095 """ NAT64 translate packet with unknown protocol - hairpinning """
5097 client = self.pg0.remote_hosts[0]
5098 server = self.pg0.remote_hosts[1]
5099 server_tcp_in_port = 22
5100 server_tcp_out_port = 4022
5101 client_tcp_in_port = 1234
5102 client_tcp_out_port = 1235
5103 server_nat_ip = "10.0.0.100"
5104 client_nat_ip = "10.0.0.110"
# Packed IPv4 forms for API calls and 64:ff9b::/96 IPv6 forms for packets.
5105 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5106 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5107 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5108 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5110 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5112 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5113 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static BIB entries: two for the server, one for the client.
5115 self.vapi.nat64_add_del_static_bib(server.ip6n,
5118 server_tcp_out_port,
5121 self.vapi.nat64_add_del_static_bib(server.ip6n,
5127 self.vapi.nat64_add_del_static_bib(client.ip6n,
5130 client_tcp_out_port,
# Plain TCP packet client -> server first.
5134 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5135 IPv6(src=client.ip6, dst=server_nat_ip6) /
5136 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5137 self.pg0.add_stream(p)
5138 self.pg_enable_capture(self.pg_interfaces)
5140 p = self.pg0.get_capture(1)
# GRE client -> server.
5142 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5143 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5145 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5146 TCP(sport=1234, dport=1234))
5147 self.pg0.add_stream(p)
5148 self.pg_enable_capture(self.pg_interfaces)
5150 p = self.pg0.get_capture(1)
5153 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5154 self.assertEqual(packet[IPv6].dst, server.ip6)
5155 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5157 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# GRE server -> client.
5161 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5162 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5164 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5165 TCP(sport=1234, dport=1234))
5166 self.pg0.add_stream(p)
5167 self.pg_enable_capture(self.pg_interfaces)
5169 p = self.pg0.get_capture(1)
5172 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5173 self.assertEqual(packet[IPv6].dst, client.ip6)
5174 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5176 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# One-armed NAT64: pg3 is enabled as both the inside and the outside NAT64
# interface, so translated packets leave through the interface they arrived on.
5179 def test_one_armed_nat64(self):
5180 """ One armed NAT64 """
5182 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5186 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5188 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5189 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 -> IPv4: TCP 12345 -> 80 sent into pg3.
5192 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5193 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5194 TCP(sport=12345, dport=80))
5195 self.pg3.add_stream(p)
5196 self.pg_enable_capture(self.pg_interfaces)
5198 capture = self.pg3.get_capture(1)
# Source must become the NAT address with a different source port, which is
# remembered for the reply.
5203 self.assertEqual(ip.src, self.nat_addr)
5204 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5205 self.assertNotEqual(tcp.sport, 12345)
5206 external_port = tcp.sport
5207 self.assertEqual(tcp.dport, 80)
5208 self.check_tcp_checksum(p)
5209 self.check_ip_checksum(p)
5211 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 -> IPv6: reply addressed to the NAT address and the allocated port.
5215 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5216 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5217 TCP(sport=80, dport=external_port))
5218 self.pg3.add_stream(p)
5219 self.pg_enable_capture(self.pg_interfaces)
5221 capture = self.pg3.get_capture(1)
5226 self.assertEqual(ip.src, remote_host_ip6)
5227 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5228 self.assertEqual(tcp.sport, 80)
5229 self.assertEqual(tcp.dport, 12345)
5230 self.check_tcp_checksum(p)
5232 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Fragment translation, fragments sent in order: a fragmented TCP payload goes
# IPv6 -> IPv4, then a fragmented reply goes IPv4 -> IPv6; each is reassembled
# and compared with the data sent. Finally the number of reassembly entries is
# expected to have grown by two.
5235 def test_frag_in_order(self):
5236 """ NAT64 translate fragments arriving in order """
5237 self.tcp_port_in = random.randint(1025, 65535)
5239 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5241 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5242 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Baseline number of reassembly entries.
5244 reass = self.vapi.nat_reass_dump()
5245 reass_n_start = len(reass)
# IPv6 -> IPv4 fragments.
5249 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5250 self.tcp_port_in, 20, data)
5251 self.pg0.add_stream(pkts)
5252 self.pg_enable_capture(self.pg_interfaces)
5254 frags = self.pg1.get_capture(len(pkts))
5255 p = self.reass_frags_and_verify(frags,
5257 self.pg1.remote_ip4)
5258 self.assertEqual(p[TCP].dport, 20)
5259 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port for the reply direction.
5260 self.tcp_port_out = p[TCP].sport
5261 self.assertEqual(data, p[Raw].load)
# IPv4 -> IPv6 fragments.
5264 data = "A" * 4 + "b" * 16 + "C" * 3
5265 pkts = self.create_stream_frag(self.pg1,
5270 self.pg1.add_stream(pkts)
5271 self.pg_enable_capture(self.pg_interfaces)
5273 frags = self.pg0.get_capture(len(pkts))
5274 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5275 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5276 self.assertEqual(p[TCP].sport, 20)
5277 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5278 self.assertEqual(data, p[Raw].load)
5280 reass = self.vapi.nat_reass_dump()
5281 reass_n_end = len(reass)
5283 self.assertEqual(reass_n_end - reass_n_start, 2)
# Fragmented hairpinning: a fragmented TCP packet from one pg0 host to another
# (via the NAT address and a static BIB entry for the server) must come back
# out of pg0, reassemble correctly and carry the original payload.
5285 def test_reass_hairpinning(self):
5286 """ NAT64 fragments hairpinning """
5288 client = self.pg0.remote_hosts[0]
5289 server = self.pg0.remote_hosts[1]
# Ports are picked at random from the non-privileged range.
5290 server_in_port = random.randint(1025, 65535)
5291 server_out_port = random.randint(1025, 65535)
5292 client_in_port = random.randint(1025, 65535)
5293 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5294 nat_addr_ip6 = ip.src
5296 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5298 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5299 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5301 # add static BIB entry for server
5302 self.vapi.nat64_add_del_static_bib(server.ip6n,
5308 # send packet from host to server
5309 pkts = self.create_stream_frag_ip6(self.pg0,
5314 self.pg0.add_stream(pkts)
5315 self.pg_enable_capture(self.pg_interfaces)
5317 frags = self.pg0.get_capture(len(pkts))
5318 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
# Source port must have been translated; destination is the server's in port.
5319 self.assertNotEqual(p[TCP].sport, client_in_port)
5320 self.assertEqual(p[TCP].dport, server_in_port)
5321 self.assertEqual(data, p[Raw].load)
# Fragment translation with fragments out of order: same round trip as the
# in-order fragment test, with reassembly and payload checks in both
# directions.
# NOTE(review): the step that reorders the fragments is not visible among the
# statements below - confirm it is present before each add_stream call.
5323 def test_frag_out_of_order(self):
5324 """ NAT64 translate fragments arriving out of order """
5325 self.tcp_port_in = random.randint(1025, 65535)
5327 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5329 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5330 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# IPv6 -> IPv4 fragments.
5334 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5335 self.tcp_port_in, 20, data)
5337 self.pg0.add_stream(pkts)
5338 self.pg_enable_capture(self.pg_interfaces)
5340 frags = self.pg1.get_capture(len(pkts))
5341 p = self.reass_frags_and_verify(frags,
5343 self.pg1.remote_ip4)
5344 self.assertEqual(p[TCP].dport, 20)
5345 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port for the reply direction.
5346 self.tcp_port_out = p[TCP].sport
5347 self.assertEqual(data, p[Raw].load)
# IPv4 -> IPv6 fragments.
5350 data = "A" * 4 + "B" * 16 + "C" * 3
5351 pkts = self.create_stream_frag(self.pg1,
5357 self.pg1.add_stream(pkts)
5358 self.pg_enable_capture(self.pg_interfaces)
5360 frags = self.pg0.get_capture(len(pkts))
5361 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5362 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5363 self.assertEqual(p[TCP].sport, 20)
5364 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5365 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    # Ask NAT64 to take its pool address from pg4; pg4 has no IPv4 address
    # configured yet at this point in the test.
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (query the NAT64 pool itself; the NAT44 address dump says nothing
    # about the NAT64 pool this test is about)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump, not on the result of the first query, so
    # the removal is actually verified)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
5386 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5387 def test_ipfix_max_bibs_sessions(self):
5388 """ IPFIX logging maximum session and BIB entries exceeded """
5391 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5395 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5397 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5398 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5402 for i in range(0, max_bibs):
5403 src = "fd01:aa::%x" % (i)
5404 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5405 IPv6(src=src, dst=remote_host_ip6) /
5406 TCP(sport=12345, dport=80))
5408 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5409 IPv6(src=src, dst=remote_host_ip6) /
5410 TCP(sport=12345, dport=22))
5412 self.pg0.add_stream(pkts)
5413 self.pg_enable_capture(self.pg_interfaces)
5415 self.pg1.get_capture(max_sessions)
5417 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5418 src_address=self.pg3.local_ip4n,
5420 template_interval=10)
5421 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5422 src_port=self.ipfix_src_port)
5424 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5425 IPv6(src=src, dst=remote_host_ip6) /
5426 TCP(sport=12345, dport=25))
5427 self.pg0.add_stream(p)
5428 self.pg_enable_capture(self.pg_interfaces)
5430 self.pg1.get_capture(0)
5431 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5432 capture = self.pg3.get_capture(9)
5433 ipfix = IPFIXDecoder()
5434 # first load template
5436 self.assertTrue(p.haslayer(IPFIX))
5437 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5438 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5439 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5440 self.assertEqual(p[UDP].dport, 4739)
5441 self.assertEqual(p[IPFIX].observationDomainID,
5442 self.ipfix_domain_id)
5443 if p.haslayer(Template):
5444 ipfix.add_template(p.getlayer(Template))
5445 # verify events in data set
5447 if p.haslayer(Data):
5448 data = ipfix.decode_data_set(p.getlayer(Set))
5449 self.verify_ipfix_max_sessions(data, max_sessions)
5451 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5452 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5453 TCP(sport=12345, dport=80))
5454 self.pg0.add_stream(p)
5455 self.pg_enable_capture(self.pg_interfaces)
5457 self.pg1.get_capture(0)
5458 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5459 capture = self.pg3.get_capture(1)
5460 # verify events in data set
5462 self.assertTrue(p.haslayer(IPFIX))
5463 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5464 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5465 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5466 self.assertEqual(p[UDP].dport, 4739)
5467 self.assertEqual(p[IPFIX].observationDomainID,
5468 self.ipfix_domain_id)
5469 if p.haslayer(Data):
5470 data = ipfix.decode_data_set(p.getlayer(Set))
5471 self.verify_ipfix_max_bibs(data, max_bibs)
5473 def test_ipfix_max_frags(self):
5474 """ IPFIX logging maximum fragments pending reassembly exceeded """
5475 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5477 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5478 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5479 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5480 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5481 src_address=self.pg3.local_ip4n,
5483 template_interval=10)
5484 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5485 src_port=self.ipfix_src_port)
5488 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5489 self.tcp_port_in, 20, data)
5490 self.pg0.add_stream(pkts[-1])
5491 self.pg_enable_capture(self.pg_interfaces)
5493 self.pg1.get_capture(0)
5494 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5495 capture = self.pg3.get_capture(9)
5496 ipfix = IPFIXDecoder()
5497 # first load template
5499 self.assertTrue(p.haslayer(IPFIX))
5500 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5501 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5502 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5503 self.assertEqual(p[UDP].dport, 4739)
5504 self.assertEqual(p[IPFIX].observationDomainID,
5505 self.ipfix_domain_id)
5506 if p.haslayer(Template):
5507 ipfix.add_template(p.getlayer(Template))
5508 # verify events in data set
5510 if p.haslayer(Data):
5511 data = ipfix.decode_data_set(p.getlayer(Set))
5512 self.verify_ipfix_max_fragments_ip6(data, 0,
5513 self.pg0.remote_ip6n)
5515 def test_ipfix_bib_ses(self):
5516 """ IPFIX logging NAT64 BIB/session create and delete events """
5517 self.tcp_port_in = random.randint(1025, 65535)
5518 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5522 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5524 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5525 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5526 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5527 src_address=self.pg3.local_ip4n,
5529 template_interval=10)
5530 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5531 src_port=self.ipfix_src_port)
5534 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5535 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5536 TCP(sport=self.tcp_port_in, dport=25))
5537 self.pg0.add_stream(p)
5538 self.pg_enable_capture(self.pg_interfaces)
5540 p = self.pg1.get_capture(1)
5541 self.tcp_port_out = p[0][TCP].sport
5542 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5543 capture = self.pg3.get_capture(10)
5544 ipfix = IPFIXDecoder()
5545 # first load template
5547 self.assertTrue(p.haslayer(IPFIX))
5548 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5549 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5550 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5551 self.assertEqual(p[UDP].dport, 4739)
5552 self.assertEqual(p[IPFIX].observationDomainID,
5553 self.ipfix_domain_id)
5554 if p.haslayer(Template):
5555 ipfix.add_template(p.getlayer(Template))
5556 # verify events in data set
5558 if p.haslayer(Data):
5559 data = ipfix.decode_data_set(p.getlayer(Set))
5560 if ord(data[0][230]) == 10:
5561 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5562 elif ord(data[0][230]) == 6:
5563 self.verify_ipfix_nat64_ses(data,
5565 self.pg0.remote_ip6n,
5566 self.pg1.remote_ip4,
5569 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5572 self.pg_enable_capture(self.pg_interfaces)
5573 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5576 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5577 capture = self.pg3.get_capture(2)
5578 # verify events in data set
5580 self.assertTrue(p.haslayer(IPFIX))
5581 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5582 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5583 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5584 self.assertEqual(p[UDP].dport, 4739)
5585 self.assertEqual(p[IPFIX].observationDomainID,
5586 self.ipfix_domain_id)
5587 if p.haslayer(Data):
5588 data = ipfix.decode_data_set(p.getlayer(Set))
5589 if ord(data[0][230]) == 11:
5590 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5591 elif ord(data[0][230]) == 7:
5592 self.verify_ipfix_nat64_ses(data,
5594 self.pg0.remote_ip6n,
5595 self.pg1.remote_ip4,
5598 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5600 def nat64_get_ses_num(self):
5602 Return number of active NAT64 sessions.
5604 st = self.vapi.nat64_st_dump()
5607 def clear_nat64(self):
5609 Clear NAT64 configuration.
5611 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5612 domain_id=self.ipfix_domain_id)
5613 self.ipfix_src_port = 4739
5614 self.ipfix_domain_id = 1
5616 self.vapi.nat64_set_timeouts()
5618 interfaces = self.vapi.nat64_interface_dump()
5619 for intf in interfaces:
5620 if intf.is_inside > 1:
5621 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5624 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5628 bib = self.vapi.nat64_bib_dump(255)
5631 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5639 adresses = self.vapi.nat64_pool_addr_dump()
5640 for addr in adresses:
5641 self.vapi.nat64_add_del_pool_addr_range(addr.address,
5646 prefixes = self.vapi.nat64_prefix_dump()
5647 for prefix in prefixes:
5648 self.vapi.nat64_add_del_prefix(prefix.prefix,
5650 vrf_id=prefix.vrf_id,
5654 super(TestNAT64, self).tearDown()
5655 if not self.vpp_dead:
5656 self.logger.info(self.vapi.cli("show nat64 pool"))
5657 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5658 self.logger.info(self.vapi.cli("show nat64 prefix"))
5659 self.logger.info(self.vapi.cli("show nat64 bib all"))
5660 self.logger.info(self.vapi.cli("show nat64 session table all"))
5661 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5665 class TestDSlite(MethodHolder):
5666 """ DS-Lite Test Cases """
5669 def setUpClass(cls):
5670 super(TestDSlite, cls).setUpClass()
5673 cls.nat_addr = '10.0.0.3'
5674 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5676 cls.create_pg_interfaces(range(2))
5678 cls.pg0.config_ip4()
5679 cls.pg0.resolve_arp()
5681 cls.pg1.config_ip6()
5682 cls.pg1.generate_remote_hosts(2)
5683 cls.pg1.configure_ipv6_neighbors()
5686 super(TestDSlite, cls).tearDownClass()
5689 def test_dslite(self):
5690 """ Test DS-Lite """
5691 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5693 aftr_ip4 = '192.0.0.1'
5694 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5695 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5696 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5697 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5700 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5701 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5702 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5703 UDP(sport=20000, dport=10000))
5704 self.pg1.add_stream(p)
5705 self.pg_enable_capture(self.pg_interfaces)
5707 capture = self.pg0.get_capture(1)
5708 capture = capture[0]
5709 self.assertFalse(capture.haslayer(IPv6))
5710 self.assertEqual(capture[IP].src, self.nat_addr)
5711 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5712 self.assertNotEqual(capture[UDP].sport, 20000)
5713 self.assertEqual(capture[UDP].dport, 10000)
5714 self.check_ip_checksum(capture)
5715 out_port = capture[UDP].sport
5717 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5718 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5719 UDP(sport=10000, dport=out_port))
5720 self.pg0.add_stream(p)
5721 self.pg_enable_capture(self.pg_interfaces)
5723 capture = self.pg1.get_capture(1)
5724 capture = capture[0]
5725 self.assertEqual(capture[IPv6].src, aftr_ip6)
5726 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5727 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5728 self.assertEqual(capture[IP].dst, '192.168.1.1')
5729 self.assertEqual(capture[UDP].sport, 10000)
5730 self.assertEqual(capture[UDP].dport, 20000)
5731 self.check_ip_checksum(capture)
5734 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5735 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5736 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5737 TCP(sport=20001, dport=10001))
5738 self.pg1.add_stream(p)
5739 self.pg_enable_capture(self.pg_interfaces)
5741 capture = self.pg0.get_capture(1)
5742 capture = capture[0]
5743 self.assertFalse(capture.haslayer(IPv6))
5744 self.assertEqual(capture[IP].src, self.nat_addr)
5745 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5746 self.assertNotEqual(capture[TCP].sport, 20001)
5747 self.assertEqual(capture[TCP].dport, 10001)
5748 self.check_ip_checksum(capture)
5749 self.check_tcp_checksum(capture)
5750 out_port = capture[TCP].sport
5752 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5753 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5754 TCP(sport=10001, dport=out_port))
5755 self.pg0.add_stream(p)
5756 self.pg_enable_capture(self.pg_interfaces)
5758 capture = self.pg1.get_capture(1)
5759 capture = capture[0]
5760 self.assertEqual(capture[IPv6].src, aftr_ip6)
5761 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5762 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5763 self.assertEqual(capture[IP].dst, '192.168.1.1')
5764 self.assertEqual(capture[TCP].sport, 10001)
5765 self.assertEqual(capture[TCP].dport, 20001)
5766 self.check_ip_checksum(capture)
5767 self.check_tcp_checksum(capture)
5770 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5771 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5772 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5773 ICMP(id=4000, type='echo-request'))
5774 self.pg1.add_stream(p)
5775 self.pg_enable_capture(self.pg_interfaces)
5777 capture = self.pg0.get_capture(1)
5778 capture = capture[0]
5779 self.assertFalse(capture.haslayer(IPv6))
5780 self.assertEqual(capture[IP].src, self.nat_addr)
5781 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5782 self.assertNotEqual(capture[ICMP].id, 4000)
5783 self.check_ip_checksum(capture)
5784 self.check_icmp_checksum(capture)
5785 out_id = capture[ICMP].id
5787 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5788 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5789 ICMP(id=out_id, type='echo-reply'))
5790 self.pg0.add_stream(p)
5791 self.pg_enable_capture(self.pg_interfaces)
5793 capture = self.pg1.get_capture(1)
5794 capture = capture[0]
5795 self.assertEqual(capture[IPv6].src, aftr_ip6)
5796 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5797 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5798 self.assertEqual(capture[IP].dst, '192.168.1.1')
5799 self.assertEqual(capture[ICMP].id, 4000)
5800 self.check_ip_checksum(capture)
5801 self.check_icmp_checksum(capture)
5803 # ping DS-Lite AFTR tunnel endpoint address
5804 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5805 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5806 ICMPv6EchoRequest())
5807 self.pg1.add_stream(p)
5808 self.pg_enable_capture(self.pg_interfaces)
5810 capture = self.pg1.get_capture(1)
5811 self.assertEqual(1, len(capture))
5812 capture = capture[0]
5813 self.assertEqual(capture[IPv6].src, aftr_ip6)
5814 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5815 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5818 super(TestDSlite, self).tearDown()
5819 if not self.vpp_dead:
5820 self.logger.info(self.vapi.cli("show dslite pool"))
5822 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5823 self.logger.info(self.vapi.cli("show dslite sessions"))
5826 class TestDSliteCE(MethodHolder):
5827 """ DS-Lite CE Test Cases """
def setUpConstants(cls):
    """ Add the "dslite ce" NAT section to the VPP command line """
    nat_section = ["nat", "{", "dslite ce", "}"]
    # let the parent class build its command line first, then append ours
    super(TestDSliteCE, cls).setUpConstants()
    cls.vpp_cmdline.extend(nat_section)
5835 def setUpClass(cls):
5836 super(TestDSliteCE, cls).setUpClass()
5839 cls.create_pg_interfaces(range(2))
5841 cls.pg0.config_ip4()
5842 cls.pg0.resolve_arp()
5844 cls.pg1.config_ip6()
5845 cls.pg1.generate_remote_hosts(1)
5846 cls.pg1.configure_ipv6_neighbors()
5849 super(TestDSliteCE, cls).tearDownClass()
5852 def test_dslite_ce(self):
5853 """ Test DS-Lite CE """
5855 b4_ip4 = '192.0.0.2'
5856 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
5857 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
5858 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
5859 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
5861 aftr_ip4 = '192.0.0.1'
5862 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5863 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5864 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5865 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5867 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
5868 dst_address_length=128,
5869 next_hop_address=self.pg1.remote_ip6n,
5870 next_hop_sw_if_index=self.pg1.sw_if_index,
5874 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5875 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
5876 UDP(sport=10000, dport=20000))
5877 self.pg0.add_stream(p)
5878 self.pg_enable_capture(self.pg_interfaces)
5880 capture = self.pg1.get_capture(1)
5881 capture = capture[0]
5882 self.assertEqual(capture[IPv6].src, b4_ip6)
5883 self.assertEqual(capture[IPv6].dst, aftr_ip6)
5884 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5885 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
5886 self.assertEqual(capture[UDP].sport, 10000)
5887 self.assertEqual(capture[UDP].dport, 20000)
5888 self.check_ip_checksum(capture)
5891 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5892 IPv6(dst=b4_ip6, src=aftr_ip6) /
5893 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
5894 UDP(sport=20000, dport=10000))
5895 self.pg1.add_stream(p)
5896 self.pg_enable_capture(self.pg_interfaces)
5898 capture = self.pg0.get_capture(1)
5899 capture = capture[0]
5900 self.assertFalse(capture.haslayer(IPv6))
5901 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
5902 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5903 self.assertEqual(capture[UDP].sport, 20000)
5904 self.assertEqual(capture[UDP].dport, 10000)
5905 self.check_ip_checksum(capture)
5907 # ping DS-Lite B4 tunnel endpoint address
5908 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5909 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
5910 ICMPv6EchoRequest())
5911 self.pg1.add_stream(p)
5912 self.pg_enable_capture(self.pg_interfaces)
5914 capture = self.pg1.get_capture(1)
5915 self.assertEqual(1, len(capture))
5916 capture = capture[0]
5917 self.assertEqual(capture[IPv6].src, b4_ip6)
5918 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5919 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5922 super(TestDSliteCE, self).tearDown()
5923 if not self.vpp_dead:
5925 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5927 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
if __name__ == '__main__':
    # Run as a script: execute this module's tests using VppTestRunner.
    unittest.main(testRunner=VppTestRunner)