9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
36 def check_ip_checksum(self, pkt):
38 Check IP checksum of the packet
40 :param pkt: Packet to check IP checksum
42 new = pkt.__class__(str(pkt))
44 new = new.__class__(str(new))
45 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
47 def check_tcp_checksum(self, pkt):
49 Check TCP checksum in IP packet
51 :param pkt: Packet to check TCP checksum
53 new = pkt.__class__(str(pkt))
55 new = new.__class__(str(new))
56 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
58 def check_udp_checksum(self, pkt):
60 Check UDP checksum in IP packet
62 :param pkt: Packet to check UDP checksum
64 new = pkt.__class__(str(pkt))
66 new = new.__class__(str(new))
67 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
69 def check_icmp_errror_embedded(self, pkt):
71 Check ICMP error embeded packet checksum
73 :param pkt: Packet to check ICMP error embeded packet checksum
75 if pkt.haslayer(IPerror):
76 new = pkt.__class__(str(pkt))
77 del new['IPerror'].chksum
78 new = new.__class__(str(new))
79 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
81 if pkt.haslayer(TCPerror):
82 new = pkt.__class__(str(pkt))
83 del new['TCPerror'].chksum
84 new = new.__class__(str(new))
85 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
87 if pkt.haslayer(UDPerror):
88 if pkt['UDPerror'].chksum != 0:
89 new = pkt.__class__(str(pkt))
90 del new['UDPerror'].chksum
91 new = new.__class__(str(new))
92 self.assertEqual(new['UDPerror'].chksum,
93 pkt['UDPerror'].chksum)
95 if pkt.haslayer(ICMPerror):
96 del new['ICMPerror'].chksum
97 new = new.__class__(str(new))
98 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
100 def check_icmp_checksum(self, pkt):
102 Check ICMP checksum in IPv4 packet
104 :param pkt: Packet to check ICMP checksum
106 new = pkt.__class__(str(pkt))
107 del new['ICMP'].chksum
108 new = new.__class__(str(new))
109 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110 if pkt.haslayer(IPerror):
111 self.check_icmp_errror_embedded(pkt)
113 def check_icmpv6_checksum(self, pkt):
115 Check ICMPv6 checksum in IPv4 packet
117 :param pkt: Packet to check ICMPv6 checksum
119 new = pkt.__class__(str(pkt))
120 if pkt.haslayer(ICMPv6DestUnreach):
121 del new['ICMPv6DestUnreach'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124 pkt['ICMPv6DestUnreach'].cksum)
125 self.check_icmp_errror_embedded(pkt)
126 if pkt.haslayer(ICMPv6EchoRequest):
127 del new['ICMPv6EchoRequest'].cksum
128 new = new.__class__(str(new))
129 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130 pkt['ICMPv6EchoRequest'].cksum)
131 if pkt.haslayer(ICMPv6EchoReply):
132 del new['ICMPv6EchoReply'].cksum
133 new = new.__class__(str(new))
134 self.assertEqual(new['ICMPv6EchoReply'].cksum,
135 pkt['ICMPv6EchoReply'].cksum)
137 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param dst_ip: Destination address
144 :param ttl: TTL of generated packets
147 dst_ip = out_if.remote_ip4
151 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153 TCP(sport=self.tcp_port_in, dport=20))
157 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159 UDP(sport=self.udp_port_in, dport=20))
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165 ICMP(id=self.icmp_id_in, type='echo-request'))
170 def compose_ip6(self, ip4, pref, plen):
172 Compose IPv4-embedded IPv6 addresses
174 :param ip4: IPv4 address
175 :param pref: IPv6 prefix
176 :param plen: IPv6 prefix length
177 :returns: IPv4-embedded IPv6 addresses
179 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
195 pref_n[10] = ip4_n[3]
199 pref_n[10] = ip4_n[2]
200 pref_n[11] = ip4_n[3]
203 pref_n[10] = ip4_n[1]
204 pref_n[11] = ip4_n[2]
205 pref_n[12] = ip4_n[3]
207 pref_n[12] = ip4_n[0]
208 pref_n[13] = ip4_n[1]
209 pref_n[14] = ip4_n[2]
210 pref_n[15] = ip4_n[3]
211 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
213 def extract_ip4(self, ip6, plen):
215 Extract IPv4 address embedded in IPv6 addresses
217 :param ip6: IPv6 address
218 :param plen: IPv6 prefix length
219 :returns: extracted IPv4 address
221 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
253 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
255 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
257 Create IPv6 packet stream for inside network
259 :param in_if: Inside interface
260 :param out_if: Outside interface
261 :param ttl: Hop Limit of generated packets
262 :param pref: NAT64 prefix
263 :param plen: NAT64 prefix length
267 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
269 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
272 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274 TCP(sport=self.tcp_port_in, dport=20))
278 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280 UDP(sport=self.udp_port_in, dport=20))
284 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286 ICMPv6EchoRequest(id=self.icmp_id_in))
291 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292 use_inside_ports=False):
294 Create packet stream for outside network
296 :param out_if: Outside interface
297 :param dst_ip: Destination IP address (Default use global NAT address)
298 :param ttl: TTL of generated packets
299 :param use_inside_ports: Use inside NAT ports as destination ports
300 instead of outside ports
303 dst_ip = self.nat_addr
304 if not use_inside_ports:
305 tcp_port = self.tcp_port_out
306 udp_port = self.udp_port_out
307 icmp_id = self.icmp_id_out
309 tcp_port = self.tcp_port_in
310 udp_port = self.udp_port_in
311 icmp_id = self.icmp_id_in
314 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316 TCP(dport=tcp_port, sport=20))
320 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322 UDP(dport=udp_port, sport=20))
326 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328 ICMP(id=icmp_id, type='echo-reply'))
333 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
335 Create packet stream for outside network
337 :param out_if: Outside interface
338 :param dst_ip: Destination IP address (Default use global NAT address)
339 :param hl: HL of generated packets
343 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345 TCP(dport=self.tcp_port_out, sport=20))
349 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351 UDP(dport=self.udp_port_out, sport=20))
355 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357 ICMPv6EchoReply(id=self.icmp_id_out))
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Sorce port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Sorce port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param fragsize: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
739 def verify_ipfix_addr_exhausted(self, data):
741 Verify IPFIX NAT addresses event
743 :param data: Decoded IPFIX data records
745 self.assertEqual(1, len(data))
748 self.assertEqual(ord(record[230]), 3)
750 self.assertEqual(struct.pack("!I", 0), record[283])
752 def verify_ipfix_max_sessions(self, data, limit):
754 Verify IPFIX maximum session entries exceeded event
756 :param data: Decoded IPFIX data records
757 :param limit: Number of maximum session entries that can be created.
759 self.assertEqual(1, len(data))
762 self.assertEqual(ord(record[230]), 13)
763 # natQuotaExceededEvent
764 self.assertEqual(struct.pack("I", 1), record[466])
766 self.assertEqual(struct.pack("I", limit), record[471])
768 def verify_ipfix_max_bibs(self, data, limit):
770 Verify IPFIX maximum BIB entries exceeded event
772 :param data: Decoded IPFIX data records
773 :param limit: Number of maximum BIB entries that can be created.
775 self.assertEqual(1, len(data))
778 self.assertEqual(ord(record[230]), 13)
779 # natQuotaExceededEvent
780 self.assertEqual(struct.pack("I", 2), record[466])
782 self.assertEqual(struct.pack("I", limit), record[472])
784 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
786 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
788 :param data: Decoded IPFIX data records
789 :param limit: Number of maximum fragments pending reassembly
790 :param src_addr: IPv6 source address
792 self.assertEqual(1, len(data))
795 self.assertEqual(ord(record[230]), 13)
796 # natQuotaExceededEvent
797 self.assertEqual(struct.pack("I", 5), record[466])
798 # maxFragmentsPendingReassembly
799 self.assertEqual(struct.pack("I", limit), record[475])
801 self.assertEqual(src_addr, record[27])
803 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
805 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
807 :param data: Decoded IPFIX data records
808 :param limit: Number of maximum fragments pending reassembly
809 :param src_addr: IPv4 source address
811 self.assertEqual(1, len(data))
814 self.assertEqual(ord(record[230]), 13)
815 # natQuotaExceededEvent
816 self.assertEqual(struct.pack("I", 5), record[466])
817 # maxFragmentsPendingReassembly
818 self.assertEqual(struct.pack("I", limit), record[475])
820 self.assertEqual(src_addr, record[8])
822 def verify_ipfix_bib(self, data, is_create, src_addr):
824 Verify IPFIX NAT64 BIB create and delete events
826 :param data: Decoded IPFIX data records
827 :param is_create: Create event if nonzero value otherwise delete event
828 :param src_addr: IPv6 source address
830 self.assertEqual(1, len(data))
834 self.assertEqual(ord(record[230]), 10)
836 self.assertEqual(ord(record[230]), 11)
838 self.assertEqual(src_addr, record[27])
839 # postNATSourceIPv4Address
840 self.assertEqual(self.nat_addr_n, record[225])
842 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
844 self.assertEqual(struct.pack("!I", 0), record[234])
845 # sourceTransportPort
846 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847 # postNAPTSourceTransportPort
848 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1034 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1035 for lb_sm in lb_static_mappings:
1036 self.vapi.nat44_add_del_lb_static_mapping(
1037 lb_sm.external_addr,
1038 lb_sm.external_port,
1040 vrf_id=lb_sm.vrf_id,
1041 twice_nat=lb_sm.twice_nat,
1042 out2in_only=lb_sm.out2in_only,
1047 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1048 for id_m in identity_mappings:
1049 self.vapi.nat44_add_del_identity_mapping(
1050 addr_only=id_m.addr_only,
1053 sw_if_index=id_m.sw_if_index,
1055 protocol=id_m.protocol,
1058 adresses = self.vapi.nat44_address_dump()
1059 for addr in adresses:
1060 self.vapi.nat44_add_del_address_range(addr.ip_address,
1062 twice_nat=addr.twice_nat,
1065 self.vapi.nat_set_reass()
1066 self.vapi.nat_set_reass(is_ip6=1)
1068 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1069 local_port=0, external_port=0, vrf_id=0,
1070 is_add=1, external_sw_if_index=0xFFFFFFFF,
1071 proto=0, twice_nat=0):
1073 Add/delete NAT44 static mapping
1075 :param local_ip: Local IP address
1076 :param external_ip: External IP address
1077 :param local_port: Local port number (Optional)
1078 :param external_port: External port number (Optional)
1079 :param vrf_id: VRF ID (Default 0)
1080 :param is_add: 1 if add, 0 if delete (Default add)
1081 :param external_sw_if_index: External interface instead of IP address
1082 :param proto: IP protocol (Mandatory if port specified)
1083 :param twice_nat: 1 if translate external host address and port
1086 if local_port and external_port:
1088 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1089 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1090 self.vapi.nat44_add_del_static_mapping(
1093 external_sw_if_index,
1102 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1104 Add/delete NAT44 address
1106 :param ip: IP address
1107 :param is_add: 1 if add, 0 if delete (Default add)
1108 :param twice_nat: twice NAT address for extenal hosts
1110 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1111 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1113 twice_nat=twice_nat)
1115 def test_dynamic(self):
1116 """ NAT44 dynamic translation test """
1118 self.nat44_add_address(self.nat_addr)
1119 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1120 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1124 pkts = self.create_stream_in(self.pg0, self.pg1)
1125 self.pg0.add_stream(pkts)
1126 self.pg_enable_capture(self.pg_interfaces)
1128 capture = self.pg1.get_capture(len(pkts))
1129 self.verify_capture_out(capture)
1132 pkts = self.create_stream_out(self.pg1)
1133 self.pg1.add_stream(pkts)
1134 self.pg_enable_capture(self.pg_interfaces)
1136 capture = self.pg0.get_capture(len(pkts))
1137 self.verify_capture_in(capture, self.pg0)
1139 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1140 """ NAT44 handling of client packets with TTL=1 """
1142 self.nat44_add_address(self.nat_addr)
1143 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1144 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1147 # Client side - generate traffic
1148 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1149 self.pg0.add_stream(pkts)
1150 self.pg_enable_capture(self.pg_interfaces)
1153 # Client side - verify ICMP type 11 packets
1154 capture = self.pg0.get_capture(len(pkts))
1155 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1157 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1158 """ NAT44 handling of server packets with TTL=1 """
1160 self.nat44_add_address(self.nat_addr)
1161 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1162 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1165 # Client side - create sessions
1166 pkts = self.create_stream_in(self.pg0, self.pg1)
1167 self.pg0.add_stream(pkts)
1168 self.pg_enable_capture(self.pg_interfaces)
1171 # Server side - generate traffic
1172 capture = self.pg1.get_capture(len(pkts))
1173 self.verify_capture_out(capture)
1174 pkts = self.create_stream_out(self.pg1, ttl=1)
1175 self.pg1.add_stream(pkts)
1176 self.pg_enable_capture(self.pg_interfaces)
1179 # Server side - verify ICMP type 11 packets
1180 capture = self.pg1.get_capture(len(pkts))
1181 self.verify_capture_out_with_icmp_errors(capture,
1182 src_ip=self.pg1.local_ip4)
1184 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1185 """ NAT44 handling of error responses to client packets with TTL=2 """
1187 self.nat44_add_address(self.nat_addr)
1188 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1189 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1192 # Client side - generate traffic
1193 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1194 self.pg0.add_stream(pkts)
1195 self.pg_enable_capture(self.pg_interfaces)
1198 # Server side - simulate ICMP type 11 response
1199 capture = self.pg1.get_capture(len(pkts))
1200 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1201 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1202 ICMP(type=11) / packet[IP] for packet in capture]
1203 self.pg1.add_stream(pkts)
1204 self.pg_enable_capture(self.pg_interfaces)
1207 # Client side - verify ICMP type 11 packets
1208 capture = self.pg0.get_capture(len(pkts))
1209 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1211 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1212 """ NAT44 handling of error responses to server packets with TTL=2 """
1214 self.nat44_add_address(self.nat_addr)
1215 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1216 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1219 # Client side - create sessions
1220 pkts = self.create_stream_in(self.pg0, self.pg1)
1221 self.pg0.add_stream(pkts)
1222 self.pg_enable_capture(self.pg_interfaces)
1225 # Server side - generate traffic
1226 capture = self.pg1.get_capture(len(pkts))
1227 self.verify_capture_out(capture)
1228 pkts = self.create_stream_out(self.pg1, ttl=2)
1229 self.pg1.add_stream(pkts)
1230 self.pg_enable_capture(self.pg_interfaces)
1233 # Client side - simulate ICMP type 11 response
1234 capture = self.pg0.get_capture(len(pkts))
1235 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1236 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1237 ICMP(type=11) / packet[IP] for packet in capture]
1238 self.pg0.add_stream(pkts)
1239 self.pg_enable_capture(self.pg_interfaces)
1242 # Server side - verify ICMP type 11 packets
1243 capture = self.pg1.get_capture(len(pkts))
1244 self.verify_capture_out_with_icmp_errors(capture)
1246 def test_ping_out_interface_from_outside(self):
1247 """ Ping NAT44 out interface from outside network """
1249 self.nat44_add_address(self.nat_addr)
1250 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1251 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1254 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1255 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1256 ICMP(id=self.icmp_id_out, type='echo-request'))
1258 self.pg1.add_stream(pkts)
1259 self.pg_enable_capture(self.pg_interfaces)
1261 capture = self.pg1.get_capture(len(pkts))
1262 self.assertEqual(1, len(capture))
1265 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1266 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1267 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1268 self.assertEqual(packet[ICMP].type, 0) # echo reply
1270 self.logger.error(ppp("Unexpected or invalid packet "
1271 "(outside network):", packet))
1274 def test_ping_internal_host_from_outside(self):
1275 """ Ping internal host from outside network """
1277 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1278 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1279 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1283 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1284 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1285 ICMP(id=self.icmp_id_out, type='echo-request'))
1286 self.pg1.add_stream(pkt)
1287 self.pg_enable_capture(self.pg_interfaces)
1289 capture = self.pg0.get_capture(1)
1290 self.verify_capture_in(capture, self.pg0, packet_num=1)
1291 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1294 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1295 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1296 ICMP(id=self.icmp_id_in, type='echo-reply'))
1297 self.pg0.add_stream(pkt)
1298 self.pg_enable_capture(self.pg_interfaces)
1300 capture = self.pg1.get_capture(1)
1301 self.verify_capture_out(capture, same_port=True, packet_num=1)
1302 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1304 def test_forwarding(self):
1305 """ NAT44 forwarding test """
1307 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1308 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1310 self.vapi.nat44_forwarding_enable_disable(1)
1312 real_ip = self.pg0.remote_ip4n
1313 alias_ip = self.nat_addr_n
1314 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1315 external_ip=alias_ip)
1318 # in2out - static mapping match
1320 pkts = self.create_stream_out(self.pg1)
1321 self.pg1.add_stream(pkts)
1322 self.pg_enable_capture(self.pg_interfaces)
1324 capture = self.pg0.get_capture(len(pkts))
1325 self.verify_capture_in(capture, self.pg0)
1327 pkts = self.create_stream_in(self.pg0, self.pg1)
1328 self.pg0.add_stream(pkts)
1329 self.pg_enable_capture(self.pg_interfaces)
1331 capture = self.pg1.get_capture(len(pkts))
1332 self.verify_capture_out(capture, same_port=True)
1334 # in2out - no static mapping match
1336 host0 = self.pg0.remote_hosts[0]
1337 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1339 pkts = self.create_stream_out(self.pg1,
1340 dst_ip=self.pg0.remote_ip4,
1341 use_inside_ports=True)
1342 self.pg1.add_stream(pkts)
1343 self.pg_enable_capture(self.pg_interfaces)
1345 capture = self.pg0.get_capture(len(pkts))
1346 self.verify_capture_in(capture, self.pg0)
1348 pkts = self.create_stream_in(self.pg0, self.pg1)
1349 self.pg0.add_stream(pkts)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg1.get_capture(len(pkts))
1353 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1356 self.pg0.remote_hosts[0] = host0
1359 self.vapi.nat44_forwarding_enable_disable(0)
1360 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1361 external_ip=alias_ip,
1364 def test_static_in(self):
1365 """ 1:1 NAT initialized from inside network """
1367 nat_ip = "10.0.0.10"
1368 self.tcp_port_out = 6303
1369 self.udp_port_out = 6304
1370 self.icmp_id_out = 6305
1372 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1373 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1374 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1378 pkts = self.create_stream_in(self.pg0, self.pg1)
1379 self.pg0.add_stream(pkts)
1380 self.pg_enable_capture(self.pg_interfaces)
1382 capture = self.pg1.get_capture(len(pkts))
1383 self.verify_capture_out(capture, nat_ip, True)
1386 pkts = self.create_stream_out(self.pg1, nat_ip)
1387 self.pg1.add_stream(pkts)
1388 self.pg_enable_capture(self.pg_interfaces)
1390 capture = self.pg0.get_capture(len(pkts))
1391 self.verify_capture_in(capture, self.pg0)
1393 def test_static_out(self):
1394 """ 1:1 NAT initialized from outside network """
1396 nat_ip = "10.0.0.20"
1397 self.tcp_port_out = 6303
1398 self.udp_port_out = 6304
1399 self.icmp_id_out = 6305
1401 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1402 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1403 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1407 pkts = self.create_stream_out(self.pg1, nat_ip)
1408 self.pg1.add_stream(pkts)
1409 self.pg_enable_capture(self.pg_interfaces)
1411 capture = self.pg0.get_capture(len(pkts))
1412 self.verify_capture_in(capture, self.pg0)
1415 pkts = self.create_stream_in(self.pg0, self.pg1)
1416 self.pg0.add_stream(pkts)
1417 self.pg_enable_capture(self.pg_interfaces)
1419 capture = self.pg1.get_capture(len(pkts))
1420 self.verify_capture_out(capture, nat_ip, True)
1422 def test_static_with_port_in(self):
1423 """ 1:1 NAPT initialized from inside network """
1425 self.tcp_port_out = 3606
1426 self.udp_port_out = 3607
1427 self.icmp_id_out = 3608
1429 self.nat44_add_address(self.nat_addr)
1430 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1431 self.tcp_port_in, self.tcp_port_out,
1432 proto=IP_PROTOS.tcp)
1433 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434 self.udp_port_in, self.udp_port_out,
1435 proto=IP_PROTOS.udp)
1436 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1437 self.icmp_id_in, self.icmp_id_out,
1438 proto=IP_PROTOS.icmp)
1439 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1440 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1444 pkts = self.create_stream_in(self.pg0, self.pg1)
1445 self.pg0.add_stream(pkts)
1446 self.pg_enable_capture(self.pg_interfaces)
1448 capture = self.pg1.get_capture(len(pkts))
1449 self.verify_capture_out(capture)
1452 pkts = self.create_stream_out(self.pg1)
1453 self.pg1.add_stream(pkts)
1454 self.pg_enable_capture(self.pg_interfaces)
1456 capture = self.pg0.get_capture(len(pkts))
1457 self.verify_capture_in(capture, self.pg0)
1459 def test_static_with_port_out(self):
1460 """ 1:1 NAPT initialized from outside network """
1462 self.tcp_port_out = 30606
1463 self.udp_port_out = 30607
1464 self.icmp_id_out = 30608
1466 self.nat44_add_address(self.nat_addr)
1467 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1468 self.tcp_port_in, self.tcp_port_out,
1469 proto=IP_PROTOS.tcp)
1470 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1471 self.udp_port_in, self.udp_port_out,
1472 proto=IP_PROTOS.udp)
1473 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1474 self.icmp_id_in, self.icmp_id_out,
1475 proto=IP_PROTOS.icmp)
1476 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1477 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1481 pkts = self.create_stream_out(self.pg1)
1482 self.pg1.add_stream(pkts)
1483 self.pg_enable_capture(self.pg_interfaces)
1485 capture = self.pg0.get_capture(len(pkts))
1486 self.verify_capture_in(capture, self.pg0)
1489 pkts = self.create_stream_in(self.pg0, self.pg1)
1490 self.pg0.add_stream(pkts)
1491 self.pg_enable_capture(self.pg_interfaces)
1493 capture = self.pg1.get_capture(len(pkts))
1494 self.verify_capture_out(capture)
1496 def test_static_vrf_aware(self):
1497 """ 1:1 NAT VRF awareness """
1499 nat_ip1 = "10.0.0.30"
1500 nat_ip2 = "10.0.0.40"
1501 self.tcp_port_out = 6303
1502 self.udp_port_out = 6304
1503 self.icmp_id_out = 6305
1505 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1507 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1509 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1511 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1512 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1514 # inside interface VRF match NAT44 static mapping VRF
1515 pkts = self.create_stream_in(self.pg4, self.pg3)
1516 self.pg4.add_stream(pkts)
1517 self.pg_enable_capture(self.pg_interfaces)
1519 capture = self.pg3.get_capture(len(pkts))
1520 self.verify_capture_out(capture, nat_ip1, True)
1522 # inside interface VRF don't match NAT44 static mapping VRF (packets
1524 pkts = self.create_stream_in(self.pg0, self.pg3)
1525 self.pg0.add_stream(pkts)
1526 self.pg_enable_capture(self.pg_interfaces)
1528 self.pg3.assert_nothing_captured()
1530 def test_identity_nat(self):
1531 """ Identity NAT """
1533 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1534 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1535 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1538 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1539 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1540 TCP(sport=12345, dport=56789))
1541 self.pg1.add_stream(p)
1542 self.pg_enable_capture(self.pg_interfaces)
1544 capture = self.pg0.get_capture(1)
1549 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1550 self.assertEqual(ip.src, self.pg1.remote_ip4)
1551 self.assertEqual(tcp.dport, 56789)
1552 self.assertEqual(tcp.sport, 12345)
1553 self.check_tcp_checksum(p)
1554 self.check_ip_checksum(p)
1556 self.logger.error(ppp("Unexpected or invalid packet:", p))
1559 def test_static_lb(self):
1560 """ NAT44 local service load balancing """
1561 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1564 server1 = self.pg0.remote_hosts[0]
1565 server2 = self.pg0.remote_hosts[1]
1567 locals = [{'addr': server1.ip4n,
1570 {'addr': server2.ip4n,
1574 self.nat44_add_address(self.nat_addr)
1575 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1578 local_num=len(locals),
1580 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1581 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1584 # from client to service
1585 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1586 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1587 TCP(sport=12345, dport=external_port))
1588 self.pg1.add_stream(p)
1589 self.pg_enable_capture(self.pg_interfaces)
1591 capture = self.pg0.get_capture(1)
1597 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1598 if ip.dst == server1.ip4:
1602 self.assertEqual(tcp.dport, local_port)
1603 self.check_tcp_checksum(p)
1604 self.check_ip_checksum(p)
1606 self.logger.error(ppp("Unexpected or invalid packet:", p))
1609 # from service back to client
1610 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1611 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1612 TCP(sport=local_port, dport=12345))
1613 self.pg0.add_stream(p)
1614 self.pg_enable_capture(self.pg_interfaces)
1616 capture = self.pg1.get_capture(1)
1621 self.assertEqual(ip.src, self.nat_addr)
1622 self.assertEqual(tcp.sport, external_port)
1623 self.check_tcp_checksum(p)
1624 self.check_ip_checksum(p)
1626 self.logger.error(ppp("Unexpected or invalid packet:", p))
1632 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1634 for client in clients:
1635 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1636 IP(src=client, dst=self.nat_addr) /
1637 TCP(sport=12345, dport=external_port))
1639 self.pg1.add_stream(pkts)
1640 self.pg_enable_capture(self.pg_interfaces)
1642 capture = self.pg0.get_capture(len(pkts))
1644 if p[IP].dst == server1.ip4:
1648 self.assertTrue(server1_n > server2_n)
1650 def test_static_lb_2(self):
1651 """ NAT44 local service load balancing (asymmetrical rule) """
1652 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1655 server1 = self.pg0.remote_hosts[0]
1656 server2 = self.pg0.remote_hosts[1]
1658 locals = [{'addr': server1.ip4n,
1661 {'addr': server2.ip4n,
1665 self.vapi.nat44_forwarding_enable_disable(1)
1666 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1670 local_num=len(locals),
1672 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1673 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1676 # from client to service
1677 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1678 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1679 TCP(sport=12345, dport=external_port))
1680 self.pg1.add_stream(p)
1681 self.pg_enable_capture(self.pg_interfaces)
1683 capture = self.pg0.get_capture(1)
1689 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1690 if ip.dst == server1.ip4:
1694 self.assertEqual(tcp.dport, local_port)
1695 self.check_tcp_checksum(p)
1696 self.check_ip_checksum(p)
1698 self.logger.error(ppp("Unexpected or invalid packet:", p))
1701 # from service back to client
1702 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1703 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1704 TCP(sport=local_port, dport=12345))
1705 self.pg0.add_stream(p)
1706 self.pg_enable_capture(self.pg_interfaces)
1708 capture = self.pg1.get_capture(1)
1713 self.assertEqual(ip.src, self.nat_addr)
1714 self.assertEqual(tcp.sport, external_port)
1715 self.check_tcp_checksum(p)
1716 self.check_ip_checksum(p)
1718 self.logger.error(ppp("Unexpected or invalid packet:", p))
1721 # from client to server (no translation)
1722 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1723 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1724 TCP(sport=12346, dport=local_port))
1725 self.pg1.add_stream(p)
1726 self.pg_enable_capture(self.pg_interfaces)
1728 capture = self.pg0.get_capture(1)
1734 self.assertEqual(ip.dst, server1.ip4)
1735 self.assertEqual(tcp.dport, local_port)
1736 self.check_tcp_checksum(p)
1737 self.check_ip_checksum(p)
1739 self.logger.error(ppp("Unexpected or invalid packet:", p))
1742 # from service back to client (no translation)
1743 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1744 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1745 TCP(sport=local_port, dport=12346))
1746 self.pg0.add_stream(p)
1747 self.pg_enable_capture(self.pg_interfaces)
1749 capture = self.pg1.get_capture(1)
1754 self.assertEqual(ip.src, server1.ip4)
1755 self.assertEqual(tcp.sport, local_port)
1756 self.check_tcp_checksum(p)
1757 self.check_ip_checksum(p)
1759 self.logger.error(ppp("Unexpected or invalid packet:", p))
1762 def test_multiple_inside_interfaces(self):
1763 """ NAT44 multiple non-overlapping address space inside interfaces """
1765 self.nat44_add_address(self.nat_addr)
1766 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1767 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1768 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1771 # between two NAT44 inside interfaces (no translation)
1772 pkts = self.create_stream_in(self.pg0, self.pg1)
1773 self.pg0.add_stream(pkts)
1774 self.pg_enable_capture(self.pg_interfaces)
1776 capture = self.pg1.get_capture(len(pkts))
1777 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1779 # from NAT44 inside to interface without NAT44 feature (no translation)
1780 pkts = self.create_stream_in(self.pg0, self.pg2)
1781 self.pg0.add_stream(pkts)
1782 self.pg_enable_capture(self.pg_interfaces)
1784 capture = self.pg2.get_capture(len(pkts))
1785 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1787 # in2out 1st interface
1788 pkts = self.create_stream_in(self.pg0, self.pg3)
1789 self.pg0.add_stream(pkts)
1790 self.pg_enable_capture(self.pg_interfaces)
1792 capture = self.pg3.get_capture(len(pkts))
1793 self.verify_capture_out(capture)
1795 # out2in 1st interface
1796 pkts = self.create_stream_out(self.pg3)
1797 self.pg3.add_stream(pkts)
1798 self.pg_enable_capture(self.pg_interfaces)
1800 capture = self.pg0.get_capture(len(pkts))
1801 self.verify_capture_in(capture, self.pg0)
1803 # in2out 2nd interface
1804 pkts = self.create_stream_in(self.pg1, self.pg3)
1805 self.pg1.add_stream(pkts)
1806 self.pg_enable_capture(self.pg_interfaces)
1808 capture = self.pg3.get_capture(len(pkts))
1809 self.verify_capture_out(capture)
1811 # out2in 2nd interface
1812 pkts = self.create_stream_out(self.pg3)
1813 self.pg3.add_stream(pkts)
1814 self.pg_enable_capture(self.pg_interfaces)
1816 capture = self.pg1.get_capture(len(pkts))
1817 self.verify_capture_in(capture, self.pg1)
1819 def test_inside_overlapping_interfaces(self):
1820 """ NAT44 multiple inside interfaces with overlapping address space """
1822 static_nat_ip = "10.0.0.10"
1823 self.nat44_add_address(self.nat_addr)
1824 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1826 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1827 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1828 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1829 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1832 # between NAT44 inside interfaces with same VRF (no translation)
1833 pkts = self.create_stream_in(self.pg4, self.pg5)
1834 self.pg4.add_stream(pkts)
1835 self.pg_enable_capture(self.pg_interfaces)
1837 capture = self.pg5.get_capture(len(pkts))
1838 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1840 # between NAT44 inside interfaces with different VRF (hairpinning)
1841 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1842 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1843 TCP(sport=1234, dport=5678))
1844 self.pg4.add_stream(p)
1845 self.pg_enable_capture(self.pg_interfaces)
1847 capture = self.pg6.get_capture(1)
1852 self.assertEqual(ip.src, self.nat_addr)
1853 self.assertEqual(ip.dst, self.pg6.remote_ip4)
1854 self.assertNotEqual(tcp.sport, 1234)
1855 self.assertEqual(tcp.dport, 5678)
1857 self.logger.error(ppp("Unexpected or invalid packet:", p))
1860 # in2out 1st interface
1861 pkts = self.create_stream_in(self.pg4, self.pg3)
1862 self.pg4.add_stream(pkts)
1863 self.pg_enable_capture(self.pg_interfaces)
1865 capture = self.pg3.get_capture(len(pkts))
1866 self.verify_capture_out(capture)
1868 # out2in 1st interface
1869 pkts = self.create_stream_out(self.pg3)
1870 self.pg3.add_stream(pkts)
1871 self.pg_enable_capture(self.pg_interfaces)
1873 capture = self.pg4.get_capture(len(pkts))
1874 self.verify_capture_in(capture, self.pg4)
1876 # in2out 2nd interface
1877 pkts = self.create_stream_in(self.pg5, self.pg3)
1878 self.pg5.add_stream(pkts)
1879 self.pg_enable_capture(self.pg_interfaces)
1881 capture = self.pg3.get_capture(len(pkts))
1882 self.verify_capture_out(capture)
1884 # out2in 2nd interface
1885 pkts = self.create_stream_out(self.pg3)
1886 self.pg3.add_stream(pkts)
1887 self.pg_enable_capture(self.pg_interfaces)
1889 capture = self.pg5.get_capture(len(pkts))
1890 self.verify_capture_in(capture, self.pg5)
1893 addresses = self.vapi.nat44_address_dump()
1894 self.assertEqual(len(addresses), 1)
1895 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1896 self.assertEqual(len(sessions), 3)
1897 for session in sessions:
1898 self.assertFalse(session.is_static)
1899 self.assertEqual(session.inside_ip_address[0:4],
1900 self.pg5.remote_ip4n)
1901 self.assertEqual(session.outside_ip_address,
1902 addresses[0].ip_address)
1903 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1904 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1905 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1906 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1907 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1908 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1909 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1910 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1911 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1913 # in2out 3rd interface
1914 pkts = self.create_stream_in(self.pg6, self.pg3)
1915 self.pg6.add_stream(pkts)
1916 self.pg_enable_capture(self.pg_interfaces)
1918 capture = self.pg3.get_capture(len(pkts))
1919 self.verify_capture_out(capture, static_nat_ip, True)
1921 # out2in 3rd interface
1922 pkts = self.create_stream_out(self.pg3, static_nat_ip)
1923 self.pg3.add_stream(pkts)
1924 self.pg_enable_capture(self.pg_interfaces)
1926 capture = self.pg6.get_capture(len(pkts))
1927 self.verify_capture_in(capture, self.pg6)
1929 # general user and session dump verifications
1930 users = self.vapi.nat44_user_dump()
1931 self.assertTrue(len(users) >= 3)
1932 addresses = self.vapi.nat44_address_dump()
1933 self.assertEqual(len(addresses), 1)
1935 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1937 for session in sessions:
1938 self.assertEqual(user.ip_address, session.inside_ip_address)
1939 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1940 self.assertTrue(session.protocol in
1941 [IP_PROTOS.tcp, IP_PROTOS.udp,
1945 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1946 self.assertTrue(len(sessions) >= 4)
1947 for session in sessions:
1948 self.assertFalse(session.is_static)
1949 self.assertEqual(session.inside_ip_address[0:4],
1950 self.pg4.remote_ip4n)
1951 self.assertEqual(session.outside_ip_address,
1952 addresses[0].ip_address)
1955 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1956 self.assertTrue(len(sessions) >= 3)
1957 for session in sessions:
1958 self.assertTrue(session.is_static)
1959 self.assertEqual(session.inside_ip_address[0:4],
1960 self.pg6.remote_ip4n)
1961 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1962 map(int, static_nat_ip.split('.')))
1963 self.assertTrue(session.inside_port in
1964 [self.tcp_port_in, self.udp_port_in,
1967 def test_hairpinning(self):
1968 """ NAT44 hairpinning - 1:1 NAPT """
1970 host = self.pg0.remote_hosts[0]
1971 server = self.pg0.remote_hosts[1]
1974 server_in_port = 5678
1975 server_out_port = 8765
1977 self.nat44_add_address(self.nat_addr)
1978 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1979 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1981 # add static mapping for server
1982 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1983 server_in_port, server_out_port,
1984 proto=IP_PROTOS.tcp)
1986 # send packet from host to server
1987 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1988 IP(src=host.ip4, dst=self.nat_addr) /
1989 TCP(sport=host_in_port, dport=server_out_port))
1990 self.pg0.add_stream(p)
1991 self.pg_enable_capture(self.pg_interfaces)
1993 capture = self.pg0.get_capture(1)
1998 self.assertEqual(ip.src, self.nat_addr)
1999 self.assertEqual(ip.dst, server.ip4)
2000 self.assertNotEqual(tcp.sport, host_in_port)
2001 self.assertEqual(tcp.dport, server_in_port)
2002 self.check_tcp_checksum(p)
2003 host_out_port = tcp.sport
2005 self.logger.error(ppp("Unexpected or invalid packet:", p))
2008 # send reply from server to host
2009 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2010 IP(src=server.ip4, dst=self.nat_addr) /
2011 TCP(sport=server_in_port, dport=host_out_port))
2012 self.pg0.add_stream(p)
2013 self.pg_enable_capture(self.pg_interfaces)
2015 capture = self.pg0.get_capture(1)
2020 self.assertEqual(ip.src, self.nat_addr)
2021 self.assertEqual(ip.dst, host.ip4)
2022 self.assertEqual(tcp.sport, server_out_port)
2023 self.assertEqual(tcp.dport, host_in_port)
2024 self.check_tcp_checksum(p)
2026 self.logger.error(ppp("Unexpected or invalid packet:"), p)
2029 def test_hairpinning2(self):
2030 """ NAT44 hairpinning - 1:1 NAT"""
2032 server1_nat_ip = "10.0.0.10"
2033 server2_nat_ip = "10.0.0.11"
2034 host = self.pg0.remote_hosts[0]
2035 server1 = self.pg0.remote_hosts[1]
2036 server2 = self.pg0.remote_hosts[2]
2037 server_tcp_port = 22
2038 server_udp_port = 20
2040 self.nat44_add_address(self.nat_addr)
2041 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2042 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2045 # add static mapping for servers
2046 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2047 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2051 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2052 IP(src=host.ip4, dst=server1_nat_ip) /
2053 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2055 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2056 IP(src=host.ip4, dst=server1_nat_ip) /
2057 UDP(sport=self.udp_port_in, dport=server_udp_port))
2059 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2060 IP(src=host.ip4, dst=server1_nat_ip) /
2061 ICMP(id=self.icmp_id_in, type='echo-request'))
2063 self.pg0.add_stream(pkts)
2064 self.pg_enable_capture(self.pg_interfaces)
2066 capture = self.pg0.get_capture(len(pkts))
2067 for packet in capture:
2069 self.assertEqual(packet[IP].src, self.nat_addr)
2070 self.assertEqual(packet[IP].dst, server1.ip4)
2071 if packet.haslayer(TCP):
2072 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2073 self.assertEqual(packet[TCP].dport, server_tcp_port)
2074 self.tcp_port_out = packet[TCP].sport
2075 self.check_tcp_checksum(packet)
2076 elif packet.haslayer(UDP):
2077 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2078 self.assertEqual(packet[UDP].dport, server_udp_port)
2079 self.udp_port_out = packet[UDP].sport
2081 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2082 self.icmp_id_out = packet[ICMP].id
2084 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2089 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2090 IP(src=server1.ip4, dst=self.nat_addr) /
2091 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2093 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2094 IP(src=server1.ip4, dst=self.nat_addr) /
2095 UDP(sport=server_udp_port, dport=self.udp_port_out))
2097 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2098 IP(src=server1.ip4, dst=self.nat_addr) /
2099 ICMP(id=self.icmp_id_out, type='echo-reply'))
2101 self.pg0.add_stream(pkts)
2102 self.pg_enable_capture(self.pg_interfaces)
2104 capture = self.pg0.get_capture(len(pkts))
2105 for packet in capture:
2107 self.assertEqual(packet[IP].src, server1_nat_ip)
2108 self.assertEqual(packet[IP].dst, host.ip4)
2109 if packet.haslayer(TCP):
2110 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2111 self.assertEqual(packet[TCP].sport, server_tcp_port)
2112 self.check_tcp_checksum(packet)
2113 elif packet.haslayer(UDP):
2114 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2115 self.assertEqual(packet[UDP].sport, server_udp_port)
2117 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2119 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2122 # server2 to server1
2124 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2125 IP(src=server2.ip4, dst=server1_nat_ip) /
2126 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2128 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2129 IP(src=server2.ip4, dst=server1_nat_ip) /
2130 UDP(sport=self.udp_port_in, dport=server_udp_port))
2132 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2133 IP(src=server2.ip4, dst=server1_nat_ip) /
2134 ICMP(id=self.icmp_id_in, type='echo-request'))
2136 self.pg0.add_stream(pkts)
2137 self.pg_enable_capture(self.pg_interfaces)
2139 capture = self.pg0.get_capture(len(pkts))
2140 for packet in capture:
2142 self.assertEqual(packet[IP].src, server2_nat_ip)
2143 self.assertEqual(packet[IP].dst, server1.ip4)
2144 if packet.haslayer(TCP):
2145 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2146 self.assertEqual(packet[TCP].dport, server_tcp_port)
2147 self.tcp_port_out = packet[TCP].sport
2148 self.check_tcp_checksum(packet)
2149 elif packet.haslayer(UDP):
2150 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2151 self.assertEqual(packet[UDP].dport, server_udp_port)
2152 self.udp_port_out = packet[UDP].sport
2154 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2155 self.icmp_id_out = packet[ICMP].id
2157 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2160 # server1 to server2
2162 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2163 IP(src=server1.ip4, dst=server2_nat_ip) /
2164 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2166 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2167 IP(src=server1.ip4, dst=server2_nat_ip) /
2168 UDP(sport=server_udp_port, dport=self.udp_port_out))
2170 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2171 IP(src=server1.ip4, dst=server2_nat_ip) /
2172 ICMP(id=self.icmp_id_out, type='echo-reply'))
2174 self.pg0.add_stream(pkts)
2175 self.pg_enable_capture(self.pg_interfaces)
2177 capture = self.pg0.get_capture(len(pkts))
2178 for packet in capture:
2180 self.assertEqual(packet[IP].src, server1_nat_ip)
2181 self.assertEqual(packet[IP].dst, server2.ip4)
2182 if packet.haslayer(TCP):
2183 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2184 self.assertEqual(packet[TCP].sport, server_tcp_port)
2185 self.check_tcp_checksum(packet)
2186 elif packet.haslayer(UDP):
2187 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2188 self.assertEqual(packet[UDP].sport, server_udp_port)
2190 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2192 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2195 def test_max_translations_per_user(self):
2196 """ MAX translations per user - recycle the least recently used """
2198 self.nat44_add_address(self.nat_addr)
2199 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2200 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2203 # get maximum number of translations per user
2204 nat44_config = self.vapi.nat_show_config()
2206 # send more than maximum number of translations per user packets
2207 pkts_num = nat44_config.max_translations_per_user + 5
2209 for port in range(0, pkts_num):
2210 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2211 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2212 TCP(sport=1025 + port))
2214 self.pg0.add_stream(pkts)
2215 self.pg_enable_capture(self.pg_interfaces)
2218 # verify number of translated packet
2219 self.pg1.get_capture(pkts_num)
2221 def test_interface_addr(self):
2222 """ Acquire NAT44 addresses from interface """
2223 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2225 # no address in NAT pool
2226 adresses = self.vapi.nat44_address_dump()
2227 self.assertEqual(0, len(adresses))
2229 # configure interface address and check NAT address pool
2230 self.pg7.config_ip4()
2231 adresses = self.vapi.nat44_address_dump()
2232 self.assertEqual(1, len(adresses))
2233 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2235 # remove interface address and check NAT address pool
2236 self.pg7.unconfig_ip4()
2237 adresses = self.vapi.nat44_address_dump()
2238 self.assertEqual(0, len(adresses))
2240 def test_interface_addr_static_mapping(self):
2241 """ Static mapping with addresses from interface """
2242 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2243 self.nat44_add_static_mapping(
2245 external_sw_if_index=self.pg7.sw_if_index)
2247 # static mappings with external interface
2248 static_mappings = self.vapi.nat44_static_mapping_dump()
2249 self.assertEqual(1, len(static_mappings))
2250 self.assertEqual(self.pg7.sw_if_index,
2251 static_mappings[0].external_sw_if_index)
2253 # configure interface address and check static mappings
2254 self.pg7.config_ip4()
2255 static_mappings = self.vapi.nat44_static_mapping_dump()
2256 self.assertEqual(1, len(static_mappings))
2257 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2258 self.pg7.local_ip4n)
2259 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2261 # remove interface address and check static mappings
2262 self.pg7.unconfig_ip4()
2263 static_mappings = self.vapi.nat44_static_mapping_dump()
2264 self.assertEqual(0, len(static_mappings))
2266 def test_interface_addr_identity_nat(self):
2267 """ Identity NAT with addresses from interface """
2270 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2271 self.vapi.nat44_add_del_identity_mapping(
2272 sw_if_index=self.pg7.sw_if_index,
2274 protocol=IP_PROTOS.tcp,
2277 # identity mappings with external interface
2278 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2279 self.assertEqual(1, len(identity_mappings))
2280 self.assertEqual(self.pg7.sw_if_index,
2281 identity_mappings[0].sw_if_index)
2283 # configure interface address and check identity mappings
2284 self.pg7.config_ip4()
2285 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2286 self.assertEqual(1, len(identity_mappings))
2287 self.assertEqual(identity_mappings[0].ip_address,
2288 self.pg7.local_ip4n)
2289 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2290 self.assertEqual(port, identity_mappings[0].port)
2291 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2293 # remove interface address and check identity mappings
2294 self.pg7.unconfig_ip4()
2295 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2296 self.assertEqual(0, len(identity_mappings))
2298 def test_ipfix_nat44_sess(self):
2299 """ IPFIX logging NAT44 session created/delted """
2300 self.ipfix_domain_id = 10
2301 self.ipfix_src_port = 20202
2302 colector_port = 30303
2303 bind_layers(UDP, IPFIX, dport=30303)
2304 self.nat44_add_address(self.nat_addr)
2305 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2306 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2308 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2309 src_address=self.pg3.local_ip4n,
2311 template_interval=10,
2312 collector_port=colector_port)
2313 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2314 src_port=self.ipfix_src_port)
2316 pkts = self.create_stream_in(self.pg0, self.pg1)
2317 self.pg0.add_stream(pkts)
2318 self.pg_enable_capture(self.pg_interfaces)
2320 capture = self.pg1.get_capture(len(pkts))
2321 self.verify_capture_out(capture)
2322 self.nat44_add_address(self.nat_addr, is_add=0)
2323 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2324 capture = self.pg3.get_capture(9)
2325 ipfix = IPFIXDecoder()
2326 # first load template
2328 self.assertTrue(p.haslayer(IPFIX))
2329 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2330 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2331 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2332 self.assertEqual(p[UDP].dport, colector_port)
2333 self.assertEqual(p[IPFIX].observationDomainID,
2334 self.ipfix_domain_id)
2335 if p.haslayer(Template):
2336 ipfix.add_template(p.getlayer(Template))
2337 # verify events in data set
2339 if p.haslayer(Data):
2340 data = ipfix.decode_data_set(p.getlayer(Set))
2341 self.verify_ipfix_nat44_ses(data)
2343 def test_ipfix_addr_exhausted(self):
2344 """ IPFIX logging NAT addresses exhausted """
2345 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2346 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2348 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2349 src_address=self.pg3.local_ip4n,
2351 template_interval=10)
2352 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2353 src_port=self.ipfix_src_port)
2355 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2356 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2358 self.pg0.add_stream(p)
2359 self.pg_enable_capture(self.pg_interfaces)
2361 capture = self.pg1.get_capture(0)
2362 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2363 capture = self.pg3.get_capture(9)
2364 ipfix = IPFIXDecoder()
2365 # first load template
2367 self.assertTrue(p.haslayer(IPFIX))
2368 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2369 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2370 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2371 self.assertEqual(p[UDP].dport, 4739)
2372 self.assertEqual(p[IPFIX].observationDomainID,
2373 self.ipfix_domain_id)
2374 if p.haslayer(Template):
2375 ipfix.add_template(p.getlayer(Template))
2376 # verify events in data set
2378 if p.haslayer(Data):
2379 data = ipfix.decode_data_set(p.getlayer(Set))
2380 self.verify_ipfix_addr_exhausted(data)
2382 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2383 def test_ipfix_max_sessions(self):
2384 """ IPFIX logging maximum session entries exceeded """
2385 self.nat44_add_address(self.nat_addr)
2386 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2387 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2390 nat44_config = self.vapi.nat_show_config()
2391 max_sessions = 10 * nat44_config.translation_buckets
2394 for i in range(0, max_sessions):
2395 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2396 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2397 IP(src=src, dst=self.pg1.remote_ip4) /
2400 self.pg0.add_stream(pkts)
2401 self.pg_enable_capture(self.pg_interfaces)
2404 self.pg1.get_capture(max_sessions)
2405 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2406 src_address=self.pg3.local_ip4n,
2408 template_interval=10)
2409 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2410 src_port=self.ipfix_src_port)
2412 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2413 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2415 self.pg0.add_stream(p)
2416 self.pg_enable_capture(self.pg_interfaces)
2418 self.pg1.get_capture(0)
2419 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2420 capture = self.pg3.get_capture(9)
2421 ipfix = IPFIXDecoder()
2422 # first load template
2424 self.assertTrue(p.haslayer(IPFIX))
2425 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2426 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2427 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2428 self.assertEqual(p[UDP].dport, 4739)
2429 self.assertEqual(p[IPFIX].observationDomainID,
2430 self.ipfix_domain_id)
2431 if p.haslayer(Template):
2432 ipfix.add_template(p.getlayer(Template))
2433 # verify events in data set
2435 if p.haslayer(Data):
2436 data = ipfix.decode_data_set(p.getlayer(Set))
2437 self.verify_ipfix_max_sessions(data, max_sessions)
2439 def test_pool_addr_fib(self):
2440 """ NAT44 add pool addresses to FIB """
2441 static_addr = '10.0.0.10'
2442 self.nat44_add_address(self.nat_addr)
2443 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2444 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2446 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2449 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2450 ARP(op=ARP.who_has, pdst=self.nat_addr,
2451 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2452 self.pg1.add_stream(p)
2453 self.pg_enable_capture(self.pg_interfaces)
2455 capture = self.pg1.get_capture(1)
2456 self.assertTrue(capture[0].haslayer(ARP))
2457 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2460 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2461 ARP(op=ARP.who_has, pdst=static_addr,
2462 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2463 self.pg1.add_stream(p)
2464 self.pg_enable_capture(self.pg_interfaces)
2466 capture = self.pg1.get_capture(1)
2467 self.assertTrue(capture[0].haslayer(ARP))
2468 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2470 # send ARP to non-NAT44 interface
2471 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2472 ARP(op=ARP.who_has, pdst=self.nat_addr,
2473 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2474 self.pg2.add_stream(p)
2475 self.pg_enable_capture(self.pg_interfaces)
2477 capture = self.pg1.get_capture(0)
2479 # remove addresses and verify
2480 self.nat44_add_address(self.nat_addr, is_add=0)
2481 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2484 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2485 ARP(op=ARP.who_has, pdst=self.nat_addr,
2486 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2487 self.pg1.add_stream(p)
2488 self.pg_enable_capture(self.pg_interfaces)
2490 capture = self.pg1.get_capture(0)
2492 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2493 ARP(op=ARP.who_has, pdst=static_addr,
2494 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2495 self.pg1.add_stream(p)
2496 self.pg_enable_capture(self.pg_interfaces)
2498 capture = self.pg1.get_capture(0)
2500 def test_vrf_mode(self):
2501 """ NAT44 tenant VRF aware address pool mode """
2505 nat_ip1 = "10.0.0.10"
2506 nat_ip2 = "10.0.0.11"
2508 self.pg0.unconfig_ip4()
2509 self.pg1.unconfig_ip4()
2510 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2511 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2512 self.pg0.set_table_ip4(vrf_id1)
2513 self.pg1.set_table_ip4(vrf_id2)
2514 self.pg0.config_ip4()
2515 self.pg1.config_ip4()
2517 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2518 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2519 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2520 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2521 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2525 pkts = self.create_stream_in(self.pg0, self.pg2)
2526 self.pg0.add_stream(pkts)
2527 self.pg_enable_capture(self.pg_interfaces)
2529 capture = self.pg2.get_capture(len(pkts))
2530 self.verify_capture_out(capture, nat_ip1)
2533 pkts = self.create_stream_in(self.pg1, self.pg2)
2534 self.pg1.add_stream(pkts)
2535 self.pg_enable_capture(self.pg_interfaces)
2537 capture = self.pg2.get_capture(len(pkts))
2538 self.verify_capture_out(capture, nat_ip2)
2540 self.pg0.unconfig_ip4()
2541 self.pg1.unconfig_ip4()
2542 self.pg0.set_table_ip4(0)
2543 self.pg1.set_table_ip4(0)
2544 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2545 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2547 def test_vrf_feature_independent(self):
2548 """ NAT44 tenant VRF independent address pool mode """
2550 nat_ip1 = "10.0.0.10"
2551 nat_ip2 = "10.0.0.11"
2553 self.nat44_add_address(nat_ip1)
2554 self.nat44_add_address(nat_ip2, vrf_id=99)
2555 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2556 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2557 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2561 pkts = self.create_stream_in(self.pg0, self.pg2)
2562 self.pg0.add_stream(pkts)
2563 self.pg_enable_capture(self.pg_interfaces)
2565 capture = self.pg2.get_capture(len(pkts))
2566 self.verify_capture_out(capture, nat_ip1)
2569 pkts = self.create_stream_in(self.pg1, self.pg2)
2570 self.pg1.add_stream(pkts)
2571 self.pg_enable_capture(self.pg_interfaces)
2573 capture = self.pg2.get_capture(len(pkts))
2574 self.verify_capture_out(capture, nat_ip1)
2576 def test_dynamic_ipless_interfaces(self):
2577 """ NAT44 interfaces without configured IP address """
2579 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2580 mactobinary(self.pg7.remote_mac),
2581 self.pg7.remote_ip4n,
2583 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2584 mactobinary(self.pg8.remote_mac),
2585 self.pg8.remote_ip4n,
2588 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2589 dst_address_length=32,
2590 next_hop_address=self.pg7.remote_ip4n,
2591 next_hop_sw_if_index=self.pg7.sw_if_index)
2592 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2593 dst_address_length=32,
2594 next_hop_address=self.pg8.remote_ip4n,
2595 next_hop_sw_if_index=self.pg8.sw_if_index)
2597 self.nat44_add_address(self.nat_addr)
2598 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2599 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2603 pkts = self.create_stream_in(self.pg7, self.pg8)
2604 self.pg7.add_stream(pkts)
2605 self.pg_enable_capture(self.pg_interfaces)
2607 capture = self.pg8.get_capture(len(pkts))
2608 self.verify_capture_out(capture)
2611 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2612 self.pg8.add_stream(pkts)
2613 self.pg_enable_capture(self.pg_interfaces)
2615 capture = self.pg7.get_capture(len(pkts))
2616 self.verify_capture_in(capture, self.pg7)
2618 def test_static_ipless_interfaces(self):
2619 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2621 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2622 mactobinary(self.pg7.remote_mac),
2623 self.pg7.remote_ip4n,
2625 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2626 mactobinary(self.pg8.remote_mac),
2627 self.pg8.remote_ip4n,
2630 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2631 dst_address_length=32,
2632 next_hop_address=self.pg7.remote_ip4n,
2633 next_hop_sw_if_index=self.pg7.sw_if_index)
2634 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2635 dst_address_length=32,
2636 next_hop_address=self.pg8.remote_ip4n,
2637 next_hop_sw_if_index=self.pg8.sw_if_index)
2639 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2640 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2641 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2645 pkts = self.create_stream_out(self.pg8)
2646 self.pg8.add_stream(pkts)
2647 self.pg_enable_capture(self.pg_interfaces)
2649 capture = self.pg7.get_capture(len(pkts))
2650 self.verify_capture_in(capture, self.pg7)
2653 pkts = self.create_stream_in(self.pg7, self.pg8)
2654 self.pg7.add_stream(pkts)
2655 self.pg_enable_capture(self.pg_interfaces)
2657 capture = self.pg8.get_capture(len(pkts))
2658 self.verify_capture_out(capture, self.nat_addr, True)
2660 def test_static_with_port_ipless_interfaces(self):
2661 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2663 self.tcp_port_out = 30606
2664 self.udp_port_out = 30607
2665 self.icmp_id_out = 30608
2667 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2668 mactobinary(self.pg7.remote_mac),
2669 self.pg7.remote_ip4n,
2671 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2672 mactobinary(self.pg8.remote_mac),
2673 self.pg8.remote_ip4n,
2676 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2677 dst_address_length=32,
2678 next_hop_address=self.pg7.remote_ip4n,
2679 next_hop_sw_if_index=self.pg7.sw_if_index)
2680 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2681 dst_address_length=32,
2682 next_hop_address=self.pg8.remote_ip4n,
2683 next_hop_sw_if_index=self.pg8.sw_if_index)
2685 self.nat44_add_address(self.nat_addr)
2686 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2687 self.tcp_port_in, self.tcp_port_out,
2688 proto=IP_PROTOS.tcp)
2689 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2690 self.udp_port_in, self.udp_port_out,
2691 proto=IP_PROTOS.udp)
2692 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2693 self.icmp_id_in, self.icmp_id_out,
2694 proto=IP_PROTOS.icmp)
2695 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2696 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2700 pkts = self.create_stream_out(self.pg8)
2701 self.pg8.add_stream(pkts)
2702 self.pg_enable_capture(self.pg_interfaces)
2704 capture = self.pg7.get_capture(len(pkts))
2705 self.verify_capture_in(capture, self.pg7)
2708 pkts = self.create_stream_in(self.pg7, self.pg8)
2709 self.pg7.add_stream(pkts)
2710 self.pg_enable_capture(self.pg_interfaces)
2712 capture = self.pg8.get_capture(len(pkts))
2713 self.verify_capture_out(capture)
2715 def test_static_unknown_proto(self):
2716 """ 1:1 NAT translate packet with unknown protocol """
2717 nat_ip = "10.0.0.10"
2718 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2719 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2720 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2724 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2725 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2727 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2728 TCP(sport=1234, dport=1234))
2729 self.pg0.add_stream(p)
2730 self.pg_enable_capture(self.pg_interfaces)
2732 p = self.pg1.get_capture(1)
2735 self.assertEqual(packet[IP].src, nat_ip)
2736 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2737 self.assertTrue(packet.haslayer(GRE))
2738 self.check_ip_checksum(packet)
2740 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2744 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2745 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2747 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2748 TCP(sport=1234, dport=1234))
2749 self.pg1.add_stream(p)
2750 self.pg_enable_capture(self.pg_interfaces)
2752 p = self.pg0.get_capture(1)
2755 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2756 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2757 self.assertTrue(packet.haslayer(GRE))
2758 self.check_ip_checksum(packet)
2760 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2763 def test_hairpinning_static_unknown_proto(self):
2764 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2766 host = self.pg0.remote_hosts[0]
2767 server = self.pg0.remote_hosts[1]
2769 host_nat_ip = "10.0.0.10"
2770 server_nat_ip = "10.0.0.11"
2772 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2773 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2774 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2775 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2779 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2780 IP(src=host.ip4, dst=server_nat_ip) /
2782 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2783 TCP(sport=1234, dport=1234))
2784 self.pg0.add_stream(p)
2785 self.pg_enable_capture(self.pg_interfaces)
2787 p = self.pg0.get_capture(1)
2790 self.assertEqual(packet[IP].src, host_nat_ip)
2791 self.assertEqual(packet[IP].dst, server.ip4)
2792 self.assertTrue(packet.haslayer(GRE))
2793 self.check_ip_checksum(packet)
2795 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2799 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2800 IP(src=server.ip4, dst=host_nat_ip) /
2802 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2803 TCP(sport=1234, dport=1234))
2804 self.pg0.add_stream(p)
2805 self.pg_enable_capture(self.pg_interfaces)
2807 p = self.pg0.get_capture(1)
2810 self.assertEqual(packet[IP].src, server_nat_ip)
2811 self.assertEqual(packet[IP].dst, host.ip4)
2812 self.assertTrue(packet.haslayer(GRE))
2813 self.check_ip_checksum(packet)
2815 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2818 def test_unknown_proto(self):
2819 """ NAT44 translate packet with unknown protocol """
2820 self.nat44_add_address(self.nat_addr)
2821 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2822 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2826 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2827 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2828 TCP(sport=self.tcp_port_in, dport=20))
2829 self.pg0.add_stream(p)
2830 self.pg_enable_capture(self.pg_interfaces)
2832 p = self.pg1.get_capture(1)
2834 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2835 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2837 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2838 TCP(sport=1234, dport=1234))
2839 self.pg0.add_stream(p)
2840 self.pg_enable_capture(self.pg_interfaces)
2842 p = self.pg1.get_capture(1)
2845 self.assertEqual(packet[IP].src, self.nat_addr)
2846 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2847 self.assertTrue(packet.haslayer(GRE))
2848 self.check_ip_checksum(packet)
2850 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2854 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2855 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2857 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2858 TCP(sport=1234, dport=1234))
2859 self.pg1.add_stream(p)
2860 self.pg_enable_capture(self.pg_interfaces)
2862 p = self.pg0.get_capture(1)
2865 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2866 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2867 self.assertTrue(packet.haslayer(GRE))
2868 self.check_ip_checksum(packet)
2870 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2873 def test_hairpinning_unknown_proto(self):
2874 """ NAT44 translate packet with unknown protocol - hairpinning """
2875 host = self.pg0.remote_hosts[0]
2876 server = self.pg0.remote_hosts[1]
2879 server_in_port = 5678
2880 server_out_port = 8765
2881 server_nat_ip = "10.0.0.11"
2883 self.nat44_add_address(self.nat_addr)
2884 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2885 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2888 # add static mapping for server
2889 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2892 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2893 IP(src=host.ip4, dst=server_nat_ip) /
2894 TCP(sport=host_in_port, dport=server_out_port))
2895 self.pg0.add_stream(p)
2896 self.pg_enable_capture(self.pg_interfaces)
2898 capture = self.pg0.get_capture(1)
2900 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2901 IP(src=host.ip4, dst=server_nat_ip) /
2903 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2904 TCP(sport=1234, dport=1234))
2905 self.pg0.add_stream(p)
2906 self.pg_enable_capture(self.pg_interfaces)
2908 p = self.pg0.get_capture(1)
2911 self.assertEqual(packet[IP].src, self.nat_addr)
2912 self.assertEqual(packet[IP].dst, server.ip4)
2913 self.assertTrue(packet.haslayer(GRE))
2914 self.check_ip_checksum(packet)
2916 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2920 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2921 IP(src=server.ip4, dst=self.nat_addr) /
2923 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2924 TCP(sport=1234, dport=1234))
2925 self.pg0.add_stream(p)
2926 self.pg_enable_capture(self.pg_interfaces)
2928 p = self.pg0.get_capture(1)
2931 self.assertEqual(packet[IP].src, server_nat_ip)
2932 self.assertEqual(packet[IP].dst, host.ip4)
2933 self.assertTrue(packet.haslayer(GRE))
2934 self.check_ip_checksum(packet)
2936 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2939 def test_output_feature(self):
2940 """ NAT44 interface output feature (in2out postrouting) """
2941 self.nat44_add_address(self.nat_addr)
2942 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2943 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2944 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2948 pkts = self.create_stream_in(self.pg0, self.pg3)
2949 self.pg0.add_stream(pkts)
2950 self.pg_enable_capture(self.pg_interfaces)
2952 capture = self.pg3.get_capture(len(pkts))
2953 self.verify_capture_out(capture)
2956 pkts = self.create_stream_out(self.pg3)
2957 self.pg3.add_stream(pkts)
2958 self.pg_enable_capture(self.pg_interfaces)
2960 capture = self.pg0.get_capture(len(pkts))
2961 self.verify_capture_in(capture, self.pg0)
2963 # from non-NAT interface to NAT inside interface
2964 pkts = self.create_stream_in(self.pg2, self.pg0)
2965 self.pg2.add_stream(pkts)
2966 self.pg_enable_capture(self.pg_interfaces)
2968 capture = self.pg0.get_capture(len(pkts))
2969 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2971 def test_output_feature_vrf_aware(self):
2972 """ NAT44 interface output feature VRF aware (in2out postrouting) """
2973 nat_ip_vrf10 = "10.0.0.10"
2974 nat_ip_vrf20 = "10.0.0.20"
2976 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2977 dst_address_length=32,
2978 next_hop_address=self.pg3.remote_ip4n,
2979 next_hop_sw_if_index=self.pg3.sw_if_index,
2981 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2982 dst_address_length=32,
2983 next_hop_address=self.pg3.remote_ip4n,
2984 next_hop_sw_if_index=self.pg3.sw_if_index,
2987 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2988 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2989 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2990 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2991 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2995 pkts = self.create_stream_in(self.pg4, self.pg3)
2996 self.pg4.add_stream(pkts)
2997 self.pg_enable_capture(self.pg_interfaces)
2999 capture = self.pg3.get_capture(len(pkts))
3000 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3003 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3004 self.pg3.add_stream(pkts)
3005 self.pg_enable_capture(self.pg_interfaces)
3007 capture = self.pg4.get_capture(len(pkts))
3008 self.verify_capture_in(capture, self.pg4)
3011 pkts = self.create_stream_in(self.pg6, self.pg3)
3012 self.pg6.add_stream(pkts)
3013 self.pg_enable_capture(self.pg_interfaces)
3015 capture = self.pg3.get_capture(len(pkts))
3016 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3019 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3020 self.pg3.add_stream(pkts)
3021 self.pg_enable_capture(self.pg_interfaces)
3023 capture = self.pg6.get_capture(len(pkts))
3024 self.verify_capture_in(capture, self.pg6)
3026 def test_output_feature_hairpinning(self):
3027 """ NAT44 interface output feature hairpinning (in2out postrouting) """
3028 host = self.pg0.remote_hosts[0]
3029 server = self.pg0.remote_hosts[1]
3032 server_in_port = 5678
3033 server_out_port = 8765
3035 self.nat44_add_address(self.nat_addr)
3036 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3037 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3040 # add static mapping for server
3041 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3042 server_in_port, server_out_port,
3043 proto=IP_PROTOS.tcp)
3045 # send packet from host to server
3046 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3047 IP(src=host.ip4, dst=self.nat_addr) /
3048 TCP(sport=host_in_port, dport=server_out_port))
3049 self.pg0.add_stream(p)
3050 self.pg_enable_capture(self.pg_interfaces)
3052 capture = self.pg0.get_capture(1)
3057 self.assertEqual(ip.src, self.nat_addr)
3058 self.assertEqual(ip.dst, server.ip4)
3059 self.assertNotEqual(tcp.sport, host_in_port)
3060 self.assertEqual(tcp.dport, server_in_port)
3061 self.check_tcp_checksum(p)
3062 host_out_port = tcp.sport
3064 self.logger.error(ppp("Unexpected or invalid packet:", p))
3067 # send reply from server to host
3068 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3069 IP(src=server.ip4, dst=self.nat_addr) /
3070 TCP(sport=server_in_port, dport=host_out_port))
3071 self.pg0.add_stream(p)
3072 self.pg_enable_capture(self.pg_interfaces)
3074 capture = self.pg0.get_capture(1)
3079 self.assertEqual(ip.src, self.nat_addr)
3080 self.assertEqual(ip.dst, host.ip4)
3081 self.assertEqual(tcp.sport, server_out_port)
3082 self.assertEqual(tcp.dport, host_in_port)
3083 self.check_tcp_checksum(p)
3085 self.logger.error(ppp("Unexpected or invalid packet:"), p)
3088 def test_one_armed_nat44(self):
3089 """ One armed NAT44 """
3090 remote_host = self.pg9.remote_hosts[0]
3091 local_host = self.pg9.remote_hosts[1]
3094 self.nat44_add_address(self.nat_addr)
3095 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3096 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3100 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3101 IP(src=local_host.ip4, dst=remote_host.ip4) /
3102 TCP(sport=12345, dport=80))
3103 self.pg9.add_stream(p)
3104 self.pg_enable_capture(self.pg_interfaces)
3106 capture = self.pg9.get_capture(1)
3111 self.assertEqual(ip.src, self.nat_addr)
3112 self.assertEqual(ip.dst, remote_host.ip4)
3113 self.assertNotEqual(tcp.sport, 12345)
3114 external_port = tcp.sport
3115 self.assertEqual(tcp.dport, 80)
3116 self.check_tcp_checksum(p)
3117 self.check_ip_checksum(p)
3119 self.logger.error(ppp("Unexpected or invalid packet:", p))
3123 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3124 IP(src=remote_host.ip4, dst=self.nat_addr) /
3125 TCP(sport=80, dport=external_port))
3126 self.pg9.add_stream(p)
3127 self.pg_enable_capture(self.pg_interfaces)
3129 capture = self.pg9.get_capture(1)
3134 self.assertEqual(ip.src, remote_host.ip4)
3135 self.assertEqual(ip.dst, local_host.ip4)
3136 self.assertEqual(tcp.sport, 80)
3137 self.assertEqual(tcp.dport, 12345)
3138 self.check_tcp_checksum(p)
3139 self.check_ip_checksum(p)
3141 self.logger.error(ppp("Unexpected or invalid packet:", p))
3144 def test_del_session(self):
3145 """ Delete NAT44 session """
3146 self.nat44_add_address(self.nat_addr)
3147 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3148 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3151 pkts = self.create_stream_in(self.pg0, self.pg1)
3152 self.pg0.add_stream(pkts)
3153 self.pg_enable_capture(self.pg_interfaces)
3155 capture = self.pg1.get_capture(len(pkts))
3157 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3158 nsessions = len(sessions)
3160 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3161 sessions[0].inside_port,
3162 sessions[0].protocol)
3163 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3164 sessions[1].outside_port,
3165 sessions[1].protocol,
3168 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3169 self.assertEqual(nsessions - len(sessions), 2)
3171 def test_set_get_reass(self):
3172 """ NAT44 set/get virtual fragmentation reassembly """
3173 reas_cfg1 = self.vapi.nat_get_reass()
3175 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3176 max_reass=reas_cfg1.ip4_max_reass * 2,
3177 max_frag=reas_cfg1.ip4_max_frag * 2)
3179 reas_cfg2 = self.vapi.nat_get_reass()
3181 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3182 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3183 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3185 self.vapi.nat_set_reass(drop_frag=1)
3186 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3188 def test_frag_in_order(self):
3189 """ NAT44 translate fragments arriving in order """
3190 self.nat44_add_address(self.nat_addr)
3191 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3192 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3195 data = "A" * 4 + "B" * 16 + "C" * 3
3196 self.tcp_port_in = random.randint(1025, 65535)
3198 reass = self.vapi.nat_reass_dump()
3199 reass_n_start = len(reass)
3202 pkts = self.create_stream_frag(self.pg0,
3203 self.pg1.remote_ip4,
3207 self.pg0.add_stream(pkts)
3208 self.pg_enable_capture(self.pg_interfaces)
3210 frags = self.pg1.get_capture(len(pkts))
3211 p = self.reass_frags_and_verify(frags,
3213 self.pg1.remote_ip4)
3214 self.assertEqual(p[TCP].dport, 20)
3215 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3216 self.tcp_port_out = p[TCP].sport
3217 self.assertEqual(data, p[Raw].load)
3220 pkts = self.create_stream_frag(self.pg1,
3225 self.pg1.add_stream(pkts)
3226 self.pg_enable_capture(self.pg_interfaces)
3228 frags = self.pg0.get_capture(len(pkts))
3229 p = self.reass_frags_and_verify(frags,
3230 self.pg1.remote_ip4,
3231 self.pg0.remote_ip4)
3232 self.assertEqual(p[TCP].sport, 20)
3233 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3234 self.assertEqual(data, p[Raw].load)
3236 reass = self.vapi.nat_reass_dump()
3237 reass_n_end = len(reass)
3239 self.assertEqual(reass_n_end - reass_n_start, 2)
3241 def test_reass_hairpinning(self):
3242 """ NAT44 fragments hairpinning """
3243 host = self.pg0.remote_hosts[0]
3244 server = self.pg0.remote_hosts[1]
3245 host_in_port = random.randint(1025, 65535)
3247 server_in_port = random.randint(1025, 65535)
3248 server_out_port = random.randint(1025, 65535)
3249 data = "A" * 4 + "B" * 16 + "C" * 3
3251 self.nat44_add_address(self.nat_addr)
3252 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3253 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3255 # add static mapping for server
3256 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3257 server_in_port, server_out_port,
3258 proto=IP_PROTOS.tcp)
3260 # send packet from host to server
3261 pkts = self.create_stream_frag(self.pg0,
3266 self.pg0.add_stream(pkts)
3267 self.pg_enable_capture(self.pg_interfaces)
3269 frags = self.pg0.get_capture(len(pkts))
3270 p = self.reass_frags_and_verify(frags,
3273 self.assertNotEqual(p[TCP].sport, host_in_port)
3274 self.assertEqual(p[TCP].dport, server_in_port)
3275 self.assertEqual(data, p[Raw].load)
3277 def test_frag_out_of_order(self):
3278 """ NAT44 translate fragments arriving out of order """
3279 self.nat44_add_address(self.nat_addr)
3280 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3281 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3284 data = "A" * 4 + "B" * 16 + "C" * 3
3285 random.randint(1025, 65535)
3288 pkts = self.create_stream_frag(self.pg0,
3289 self.pg1.remote_ip4,
3294 self.pg0.add_stream(pkts)
3295 self.pg_enable_capture(self.pg_interfaces)
3297 frags = self.pg1.get_capture(len(pkts))
3298 p = self.reass_frags_and_verify(frags,
3300 self.pg1.remote_ip4)
3301 self.assertEqual(p[TCP].dport, 20)
3302 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3303 self.tcp_port_out = p[TCP].sport
3304 self.assertEqual(data, p[Raw].load)
3307 pkts = self.create_stream_frag(self.pg1,
3313 self.pg1.add_stream(pkts)
3314 self.pg_enable_capture(self.pg_interfaces)
3316 frags = self.pg0.get_capture(len(pkts))
3317 p = self.reass_frags_and_verify(frags,
3318 self.pg1.remote_ip4,
3319 self.pg0.remote_ip4)
3320 self.assertEqual(p[TCP].sport, 20)
3321 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3322 self.assertEqual(data, p[Raw].load)
3324 def test_port_restricted(self):
3325 """ Port restricted NAT44 (MAP-E CE) """
3326 self.nat44_add_address(self.nat_addr)
3327 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3328 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3330 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3331 "psid-offset 6 psid-len 6")
3333 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3334 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3335 TCP(sport=4567, dport=22))
3336 self.pg0.add_stream(p)
3337 self.pg_enable_capture(self.pg_interfaces)
3339 capture = self.pg1.get_capture(1)
3344 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3345 self.assertEqual(ip.src, self.nat_addr)
3346 self.assertEqual(tcp.dport, 22)
3347 self.assertNotEqual(tcp.sport, 4567)
3348 self.assertEqual((tcp.sport >> 6) & 63, 10)
3349 self.check_tcp_checksum(p)
3350 self.check_ip_checksum(p)
3352 self.logger.error(ppp("Unexpected or invalid packet:", p))
3355 def test_twice_nat(self):
3357 twice_nat_addr = '10.0.1.3'
3362 self.nat44_add_address(self.nat_addr)
3363 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3364 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3365 port_in, port_out, proto=IP_PROTOS.tcp,
3367 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3368 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3371 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3372 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3373 TCP(sport=eh_port_out, dport=port_out))
3374 self.pg1.add_stream(p)
3375 self.pg_enable_capture(self.pg_interfaces)
3377 capture = self.pg0.get_capture(1)
3382 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3383 self.assertEqual(ip.src, twice_nat_addr)
3384 self.assertEqual(tcp.dport, port_in)
3385 self.assertNotEqual(tcp.sport, eh_port_out)
3386 eh_port_in = tcp.sport
3387 self.check_tcp_checksum(p)
3388 self.check_ip_checksum(p)
3390 self.logger.error(ppp("Unexpected or invalid packet:", p))
3393 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3394 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3395 TCP(sport=port_in, dport=eh_port_in))
3396 self.pg0.add_stream(p)
3397 self.pg_enable_capture(self.pg_interfaces)
3399 capture = self.pg1.get_capture(1)
3404 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3405 self.assertEqual(ip.src, self.nat_addr)
3406 self.assertEqual(tcp.dport, eh_port_out)
3407 self.assertEqual(tcp.sport, port_out)
3408 self.check_tcp_checksum(p)
3409 self.check_ip_checksum(p)
3411 self.logger.error(ppp("Unexpected or invalid packet:", p))
3414 def test_twice_nat_lb(self):
3415 """ Twice NAT44 local service load balancing """
3416 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3417 twice_nat_addr = '10.0.1.3'
3422 server1 = self.pg0.remote_hosts[0]
3423 server2 = self.pg0.remote_hosts[1]
3425 locals = [{'addr': server1.ip4n,
3428 {'addr': server2.ip4n,
3432 self.nat44_add_address(self.nat_addr)
3433 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3435 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3439 local_num=len(locals),
3441 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3442 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3445 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3446 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3447 TCP(sport=eh_port_out, dport=external_port))
3448 self.pg1.add_stream(p)
3449 self.pg_enable_capture(self.pg_interfaces)
3451 capture = self.pg0.get_capture(1)
3457 self.assertEqual(ip.src, twice_nat_addr)
3458 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3459 if ip.dst == server1.ip4:
3463 self.assertNotEqual(tcp.sport, eh_port_out)
3464 eh_port_in = tcp.sport
3465 self.assertEqual(tcp.dport, local_port)
3466 self.check_tcp_checksum(p)
3467 self.check_ip_checksum(p)
3469 self.logger.error(ppp("Unexpected or invalid packet:", p))
3472 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3473 IP(src=server.ip4, dst=twice_nat_addr) /
3474 TCP(sport=local_port, dport=eh_port_in))
3475 self.pg0.add_stream(p)
3476 self.pg_enable_capture(self.pg_interfaces)
3478 capture = self.pg1.get_capture(1)
3483 self.assertEqual(ip.src, self.nat_addr)
3484 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3485 self.assertEqual(tcp.sport, external_port)
3486 self.assertEqual(tcp.dport, eh_port_out)
3487 self.check_tcp_checksum(p)
3488 self.check_ip_checksum(p)
3490 self.logger.error(ppp("Unexpected or invalid packet:", p))
3493 def test_twice_nat_interface_addr(self):
3494 """ Acquire twice NAT44 addresses from interface """
3495 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3497 # no address in NAT pool
3498 adresses = self.vapi.nat44_address_dump()
3499 self.assertEqual(0, len(adresses))
3501 # configure interface address and check NAT address pool
3502 self.pg7.config_ip4()
3503 adresses = self.vapi.nat44_address_dump()
3504 self.assertEqual(1, len(adresses))
3505 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3506 self.assertEqual(adresses[0].twice_nat, 1)
3508 # remove interface address and check NAT address pool
3509 self.pg7.unconfig_ip4()
3510 adresses = self.vapi.nat44_address_dump()
3511 self.assertEqual(0, len(adresses))
3513 def test_ipfix_max_frags(self):
3514 """ IPFIX logging maximum fragments pending reassembly exceeded """
3515 self.nat44_add_address(self.nat_addr)
3516 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3517 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3519 self.vapi.nat_set_reass(max_frag=0)
3520 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3521 src_address=self.pg3.local_ip4n,
3523 template_interval=10)
3524 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3525 src_port=self.ipfix_src_port)
3527 data = "A" * 4 + "B" * 16 + "C" * 3
3528 self.tcp_port_in = random.randint(1025, 65535)
3529 pkts = self.create_stream_frag(self.pg0,
3530 self.pg1.remote_ip4,
3534 self.pg0.add_stream(pkts[-1])
3535 self.pg_enable_capture(self.pg_interfaces)
3537 frags = self.pg1.get_capture(0)
3538 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3539 capture = self.pg3.get_capture(9)
3540 ipfix = IPFIXDecoder()
3541 # first load template
3543 self.assertTrue(p.haslayer(IPFIX))
3544 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3545 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3546 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3547 self.assertEqual(p[UDP].dport, 4739)
3548 self.assertEqual(p[IPFIX].observationDomainID,
3549 self.ipfix_domain_id)
3550 if p.haslayer(Template):
3551 ipfix.add_template(p.getlayer(Template))
3552 # verify events in data set
3554 if p.haslayer(Data):
3555 data = ipfix.decode_data_set(p.getlayer(Set))
3556 self.verify_ipfix_max_fragments_ip4(data, 0,
3557 self.pg0.remote_ip4n)
3560 super(TestNAT44, self).tearDown()
3561 if not self.vpp_dead:
3562 self.logger.info(self.vapi.cli("show nat44 verbose"))
3563 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3564 self.vapi.cli("nat addr-port-assignment-alg default")
3568 class TestNAT44Out2InDPO(MethodHolder):
3569 """ NAT44 Test Cases using out2in DPO """
3572 def setUpConstants(cls):
3573 super(TestNAT44Out2InDPO, cls).setUpConstants()
3574 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
3577 def setUpClass(cls):
3578 super(TestNAT44Out2InDPO, cls).setUpClass()
3581 cls.tcp_port_in = 6303
3582 cls.tcp_port_out = 6303
3583 cls.udp_port_in = 6304
3584 cls.udp_port_out = 6304
3585 cls.icmp_id_in = 6305
3586 cls.icmp_id_out = 6305
3587 cls.nat_addr = '10.0.0.3'
3588 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3589 cls.dst_ip4 = '192.168.70.1'
3591 cls.create_pg_interfaces(range(2))
3594 cls.pg0.config_ip4()
3595 cls.pg0.resolve_arp()
3598 cls.pg1.config_ip6()
3599 cls.pg1.resolve_ndp()
3601 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3602 dst_address_length=0,
3603 next_hop_address=cls.pg1.remote_ip6n,
3604 next_hop_sw_if_index=cls.pg1.sw_if_index)
3607 super(TestNAT44Out2InDPO, cls).tearDownClass()
3610 def configure_xlat(self):
3611 self.dst_ip6_pfx = '1:2:3::'
3612 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3614 self.dst_ip6_pfx_len = 96
3615 self.src_ip6_pfx = '4:5:6::'
3616 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3618 self.src_ip6_pfx_len = 96
3619 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3620 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3621 '\x00\x00\x00\x00', 0, is_translation=1,
3624 def test_464xlat_ce(self):
3625 """ Test 464XLAT CE with NAT44 """
3627 self.configure_xlat()
3629 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3630 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
3632 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3633 self.dst_ip6_pfx_len)
3634 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3635 self.src_ip6_pfx_len)
3638 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3639 self.pg0.add_stream(pkts)
3640 self.pg_enable_capture(self.pg_interfaces)
3642 capture = self.pg1.get_capture(len(pkts))
3643 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3646 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3648 self.pg1.add_stream(pkts)
3649 self.pg_enable_capture(self.pg_interfaces)
3651 capture = self.pg0.get_capture(len(pkts))
3652 self.verify_capture_in(capture, self.pg0)
3654 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3656 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
3657 self.nat_addr_n, is_add=0)
3659 def test_464xlat_ce_no_nat(self):
3660 """ Test 464XLAT CE without NAT44 """
3662 self.configure_xlat()
3664 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3665 self.dst_ip6_pfx_len)
3666 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3667 self.src_ip6_pfx_len)
3669 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3670 self.pg0.add_stream(pkts)
3671 self.pg_enable_capture(self.pg_interfaces)
3673 capture = self.pg1.get_capture(len(pkts))
3674 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3675 nat_ip=out_dst_ip6, same_port=True)
3677 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3678 self.pg1.add_stream(pkts)
3679 self.pg_enable_capture(self.pg_interfaces)
3681 capture = self.pg0.get_capture(len(pkts))
3682 self.verify_capture_in(capture, self.pg0)
3685 class TestDeterministicNAT(MethodHolder):
3686 """ Deterministic NAT Test Cases """
3689 def setUpConstants(cls):
3690 super(TestDeterministicNAT, cls).setUpConstants()
3691 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
3694 def setUpClass(cls):
3695 super(TestDeterministicNAT, cls).setUpClass()
3698 cls.tcp_port_in = 6303
3699 cls.tcp_external_port = 6303
3700 cls.udp_port_in = 6304
3701 cls.udp_external_port = 6304
3702 cls.icmp_id_in = 6305
3703 cls.nat_addr = '10.0.0.3'
3705 cls.create_pg_interfaces(range(3))
3706 cls.interfaces = list(cls.pg_interfaces)
3708 for i in cls.interfaces:
3713 cls.pg0.generate_remote_hosts(2)
3714 cls.pg0.configure_ipv4_neighbors()
3717 super(TestDeterministicNAT, cls).tearDownClass()
3720 def create_stream_in(self, in_if, out_if, ttl=64):
3722 Create packet stream for inside network
3724 :param in_if: Inside interface
3725 :param out_if: Outside interface
3726 :param ttl: TTL of generated packets
3730 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3731 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3732 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
3736 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3737 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3738 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
3742 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
3743 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
3744 ICMP(id=self.icmp_id_in, type='echo-request'))
3749 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
3751 Create packet stream for outside network
3753 :param out_if: Outside interface
3754 :param dst_ip: Destination IP address (Default use global NAT address)
3755 :param ttl: TTL of generated packets
3758 dst_ip = self.nat_addr
3761 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3762 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3763 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
3767 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3768 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3769 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
3773 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
3774 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
3775 ICMP(id=self.icmp_external_id, type='echo-reply'))
3780 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
3782 Verify captured packets on outside network
3784 :param capture: Captured packets
3785 :param nat_ip: Translated IP address (Default use global NAT address)
3786 :param same_port: Sorce port number is not translated (Default False)
3787 :param packet_num: Expected number of packets (Default 3)
3790 nat_ip = self.nat_addr
3791 self.assertEqual(packet_num, len(capture))
3792 for packet in capture:
3794 self.assertEqual(packet[IP].src, nat_ip)
3795 if packet.haslayer(TCP):
3796 self.tcp_port_out = packet[TCP].sport
3797 elif packet.haslayer(UDP):
3798 self.udp_port_out = packet[UDP].sport
3800 self.icmp_external_id = packet[ICMP].id
3802 self.logger.error(ppp("Unexpected or invalid packet "
3803 "(outside network):", packet))
3806 def initiate_tcp_session(self, in_if, out_if):
3808 Initiates TCP session
3810 :param in_if: Inside interface
3811 :param out_if: Outside interface
3814 # SYN packet in->out
3815 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3816 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3817 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3820 self.pg_enable_capture(self.pg_interfaces)
3822 capture = out_if.get_capture(1)
3824 self.tcp_port_out = p[TCP].sport
3826 # SYN + ACK packet out->in
3827 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
3828 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
3829 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
3831 out_if.add_stream(p)
3832 self.pg_enable_capture(self.pg_interfaces)
3834 in_if.get_capture(1)
3836 # ACK packet in->out
3837 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
3838 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
3839 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3842 self.pg_enable_capture(self.pg_interfaces)
3844 out_if.get_capture(1)
3847 self.logger.error("TCP 3 way handshake failed")
3850 def verify_ipfix_max_entries_per_user(self, data):
3852 Verify IPFIX maximum entries per user exceeded event
3854 :param data: Decoded IPFIX data records
3856 self.assertEqual(1, len(data))
3859 self.assertEqual(ord(record[230]), 13)
3860 # natQuotaExceededEvent
3861 self.assertEqual('\x03\x00\x00\x00', record[466])
3863 self.assertEqual('\xe8\x03\x00\x00', record[473])
3865 self.assertEqual(self.pg0.remote_ip4n, record[8])
3867 def test_deterministic_mode(self):
3868 """ NAT plugin run deterministic mode """
3869 in_addr = '172.16.255.0'
3870 out_addr = '172.17.255.50'
3871 in_addr_t = '172.16.255.20'
3872 in_addr_n = socket.inet_aton(in_addr)
3873 out_addr_n = socket.inet_aton(out_addr)
3874 in_addr_t_n = socket.inet_aton(in_addr_t)
3878 nat_config = self.vapi.nat_show_config()
3879 self.assertEqual(1, nat_config.deterministic)
3881 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
3883 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
3884 self.assertEqual(rep1.out_addr[:4], out_addr_n)
3885 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
3886 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
3888 deterministic_mappings = self.vapi.nat_det_map_dump()
3889 self.assertEqual(len(deterministic_mappings), 1)
3890 dsm = deterministic_mappings[0]
3891 self.assertEqual(in_addr_n, dsm.in_addr[:4])
3892 self.assertEqual(in_plen, dsm.in_plen)
3893 self.assertEqual(out_addr_n, dsm.out_addr[:4])
3894 self.assertEqual(out_plen, dsm.out_plen)
3896 self.clear_nat_det()
3897 deterministic_mappings = self.vapi.nat_det_map_dump()
3898 self.assertEqual(len(deterministic_mappings), 0)
3900 def test_set_timeouts(self):
3901 """ Set deterministic NAT timeouts """
3902 timeouts_before = self.vapi.nat_det_get_timeouts()
3904 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
3905 timeouts_before.tcp_established + 10,
3906 timeouts_before.tcp_transitory + 10,
3907 timeouts_before.icmp + 10)
3909 timeouts_after = self.vapi.nat_det_get_timeouts()
3911 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
3912 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
3913 self.assertNotEqual(timeouts_before.tcp_established,
3914 timeouts_after.tcp_established)
3915 self.assertNotEqual(timeouts_before.tcp_transitory,
3916 timeouts_after.tcp_transitory)
3918 def test_det_in(self):
3919 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
3921 nat_ip = "10.0.0.10"
3923 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
3925 socket.inet_aton(nat_ip),
3927 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3928 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3932 pkts = self.create_stream_in(self.pg0, self.pg1)
3933 self.pg0.add_stream(pkts)
3934 self.pg_enable_capture(self.pg_interfaces)
3936 capture = self.pg1.get_capture(len(pkts))
3937 self.verify_capture_out(capture, nat_ip)
3940 pkts = self.create_stream_out(self.pg1, nat_ip)
3941 self.pg1.add_stream(pkts)
3942 self.pg_enable_capture(self.pg_interfaces)
3944 capture = self.pg0.get_capture(len(pkts))
3945 self.verify_capture_in(capture, self.pg0)
3948 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
3949 self.assertEqual(len(sessions), 3)
3953 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3954 self.assertEqual(s.in_port, self.tcp_port_in)
3955 self.assertEqual(s.out_port, self.tcp_port_out)
3956 self.assertEqual(s.ext_port, self.tcp_external_port)
3960 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3961 self.assertEqual(s.in_port, self.udp_port_in)
3962 self.assertEqual(s.out_port, self.udp_port_out)
3963 self.assertEqual(s.ext_port, self.udp_external_port)
3967 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
3968 self.assertEqual(s.in_port, self.icmp_id_in)
3969 self.assertEqual(s.out_port, self.icmp_external_id)
3971 def test_multiple_users(self):
3972 """ Deterministic NAT multiple users """
3974 nat_ip = "10.0.0.10"
3976 external_port = 6303
3978 host0 = self.pg0.remote_hosts[0]
3979 host1 = self.pg0.remote_hosts[1]
3981 self.vapi.nat_det_add_del_map(host0.ip4n,
3983 socket.inet_aton(nat_ip),
3985 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3986 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3990 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
3991 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
3992 TCP(sport=port_in, dport=external_port))
3993 self.pg0.add_stream(p)
3994 self.pg_enable_capture(self.pg_interfaces)
3996 capture = self.pg1.get_capture(1)
4001 self.assertEqual(ip.src, nat_ip)
4002 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4003 self.assertEqual(tcp.dport, external_port)
4004 port_out0 = tcp.sport
4006 self.logger.error(ppp("Unexpected or invalid packet:", p))
4010 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4011 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4012 TCP(sport=port_in, dport=external_port))
4013 self.pg0.add_stream(p)
4014 self.pg_enable_capture(self.pg_interfaces)
4016 capture = self.pg1.get_capture(1)
4021 self.assertEqual(ip.src, nat_ip)
4022 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4023 self.assertEqual(tcp.dport, external_port)
4024 port_out1 = tcp.sport
4026 self.logger.error(ppp("Unexpected or invalid packet:", p))
4029 dms = self.vapi.nat_det_map_dump()
4030 self.assertEqual(1, len(dms))
4031 self.assertEqual(2, dms[0].ses_num)
4034 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4035 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4036 TCP(sport=external_port, dport=port_out0))
4037 self.pg1.add_stream(p)
4038 self.pg_enable_capture(self.pg_interfaces)
4040 capture = self.pg0.get_capture(1)
4045 self.assertEqual(ip.src, self.pg1.remote_ip4)
4046 self.assertEqual(ip.dst, host0.ip4)
4047 self.assertEqual(tcp.dport, port_in)
4048 self.assertEqual(tcp.sport, external_port)
4050 self.logger.error(ppp("Unexpected or invalid packet:", p))
4054 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4055 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4056 TCP(sport=external_port, dport=port_out1))
4057 self.pg1.add_stream(p)
4058 self.pg_enable_capture(self.pg_interfaces)
4060 capture = self.pg0.get_capture(1)
4065 self.assertEqual(ip.src, self.pg1.remote_ip4)
4066 self.assertEqual(ip.dst, host1.ip4)
4067 self.assertEqual(tcp.dport, port_in)
4068 self.assertEqual(tcp.sport, external_port)
4070 self.logger.error(ppp("Unexpected or invalid packet", p))
4073 # session close api test
4074 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4076 self.pg1.remote_ip4n,
4078 dms = self.vapi.nat_det_map_dump()
4079 self.assertEqual(dms[0].ses_num, 1)
4081 self.vapi.nat_det_close_session_in(host0.ip4n,
4083 self.pg1.remote_ip4n,
4085 dms = self.vapi.nat_det_map_dump()
4086 self.assertEqual(dms[0].ses_num, 0)
4088 def test_tcp_session_close_detection_in(self):
4089 """ Deterministic NAT TCP session close from inside network """
4090 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4092 socket.inet_aton(self.nat_addr),
4094 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4095 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4098 self.initiate_tcp_session(self.pg0, self.pg1)
4100 # close the session from inside
4102 # FIN packet in -> out
4103 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4104 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4105 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4107 self.pg0.add_stream(p)
4108 self.pg_enable_capture(self.pg_interfaces)
4110 self.pg1.get_capture(1)
4114 # ACK packet out -> in
4115 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4116 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4117 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4121 # FIN packet out -> in
4122 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4123 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4124 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4128 self.pg1.add_stream(pkts)
4129 self.pg_enable_capture(self.pg_interfaces)
4131 self.pg0.get_capture(2)
4133 # ACK packet in -> out
4134 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4135 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4136 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4138 self.pg0.add_stream(p)
4139 self.pg_enable_capture(self.pg_interfaces)
4141 self.pg1.get_capture(1)
4143 # Check if deterministic NAT44 closed the session
4144 dms = self.vapi.nat_det_map_dump()
4145 self.assertEqual(0, dms[0].ses_num)
4147 self.logger.error("TCP session termination failed")
4150 def test_tcp_session_close_detection_out(self):
4151 """ Deterministic NAT TCP session close from outside network """
4152 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4154 socket.inet_aton(self.nat_addr),
4156 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4157 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4160 self.initiate_tcp_session(self.pg0, self.pg1)
4162 # close the session from outside
4164 # FIN packet out -> in
4165 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4166 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4167 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4169 self.pg1.add_stream(p)
4170 self.pg_enable_capture(self.pg_interfaces)
4172 self.pg0.get_capture(1)
4176 # ACK packet in -> out
4177 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4178 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4179 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4183 # ACK packet in -> out
4184 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4185 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4186 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4190 self.pg0.add_stream(pkts)
4191 self.pg_enable_capture(self.pg_interfaces)
4193 self.pg1.get_capture(2)
4195 # ACK packet out -> in
4196 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4197 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4198 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4200 self.pg1.add_stream(p)
4201 self.pg_enable_capture(self.pg_interfaces)
4203 self.pg0.get_capture(1)
4205 # Check if deterministic NAT44 closed the session
4206 dms = self.vapi.nat_det_map_dump()
4207 self.assertEqual(0, dms[0].ses_num)
4209 self.logger.error("TCP session termination failed")
4212 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4213 def test_session_timeout(self):
4214 """ Deterministic NAT session timeouts """
4215 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4217 socket.inet_aton(self.nat_addr),
4219 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4220 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4223 self.initiate_tcp_session(self.pg0, self.pg1)
4224 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4225 pkts = self.create_stream_in(self.pg0, self.pg1)
4226 self.pg0.add_stream(pkts)
4227 self.pg_enable_capture(self.pg_interfaces)
4229 capture = self.pg1.get_capture(len(pkts))
4232 dms = self.vapi.nat_det_map_dump()
4233 self.assertEqual(0, dms[0].ses_num)
4235 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4236 def test_session_limit_per_user(self):
4237 """ Deterministic NAT maximum sessions per user limit """
4238 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4240 socket.inet_aton(self.nat_addr),
4242 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4243 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4245 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4246 src_address=self.pg2.local_ip4n,
4248 template_interval=10)
4249 self.vapi.nat_ipfix()
4252 for port in range(1025, 2025):
4253 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4254 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4255 UDP(sport=port, dport=port))
4258 self.pg0.add_stream(pkts)
4259 self.pg_enable_capture(self.pg_interfaces)
4261 capture = self.pg1.get_capture(len(pkts))
4263 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4264 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4265 UDP(sport=3001, dport=3002))
4266 self.pg0.add_stream(p)
4267 self.pg_enable_capture(self.pg_interfaces)
4269 capture = self.pg1.assert_nothing_captured()
4271 # verify ICMP error packet
4272 capture = self.pg0.get_capture(1)
4274 self.assertTrue(p.haslayer(ICMP))
4276 self.assertEqual(icmp.type, 3)
4277 self.assertEqual(icmp.code, 1)
4278 self.assertTrue(icmp.haslayer(IPerror))
4279 inner_ip = icmp[IPerror]
4280 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4281 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4283 dms = self.vapi.nat_det_map_dump()
4285 self.assertEqual(1000, dms[0].ses_num)
4287 # verify IPFIX logging
4288 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4290 capture = self.pg2.get_capture(2)
4291 ipfix = IPFIXDecoder()
4292 # first load template
4294 self.assertTrue(p.haslayer(IPFIX))
4295 if p.haslayer(Template):
4296 ipfix.add_template(p.getlayer(Template))
4297 # verify events in data set
4299 if p.haslayer(Data):
4300 data = ipfix.decode_data_set(p.getlayer(Set))
4301 self.verify_ipfix_max_entries_per_user(data)
4303 def clear_nat_det(self):
4305 Clear deterministic NAT configuration.
4307 self.vapi.nat_ipfix(enable=0)
4308 self.vapi.nat_det_set_timeouts()
4309 deterministic_mappings = self.vapi.nat_det_map_dump()
4310 for dsm in deterministic_mappings:
4311 self.vapi.nat_det_add_del_map(dsm.in_addr,
4317 interfaces = self.vapi.nat44_interface_dump()
4318 for intf in interfaces:
4319 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4324 super(TestDeterministicNAT, self).tearDown()
4325 if not self.vpp_dead:
4326 self.logger.info(self.vapi.cli("show nat44 detail"))
4327 self.clear_nat_det()
4330 class TestNAT64(MethodHolder):
4331 """ NAT64 Test Cases """
4334 def setUpConstants(cls):
4335 super(TestNAT64, cls).setUpConstants()
4336 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4337 "nat64 st hash buckets 256", "}"])
4340 def setUpClass(cls):
4341 super(TestNAT64, cls).setUpClass()
4344 cls.tcp_port_in = 6303
4345 cls.tcp_port_out = 6303
4346 cls.udp_port_in = 6304
4347 cls.udp_port_out = 6304
4348 cls.icmp_id_in = 6305
4349 cls.icmp_id_out = 6305
4350 cls.nat_addr = '10.0.0.3'
4351 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4353 cls.vrf1_nat_addr = '10.0.10.3'
4354 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4356 cls.ipfix_src_port = 4739
4357 cls.ipfix_domain_id = 1
4359 cls.create_pg_interfaces(range(5))
4360 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4361 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4362 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4364 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4366 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4368 cls.pg0.generate_remote_hosts(2)
4370 for i in cls.ip6_interfaces:
4373 i.configure_ipv6_neighbors()
4375 for i in cls.ip4_interfaces:
4381 cls.pg3.config_ip4()
4382 cls.pg3.resolve_arp()
4383 cls.pg3.config_ip6()
4384 cls.pg3.configure_ipv6_neighbors()
4387 super(TestNAT64, cls).tearDownClass()
4390 def test_pool(self):
4391 """ Add/delete address to NAT64 pool """
4392 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4394 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4396 addresses = self.vapi.nat64_pool_addr_dump()
4397 self.assertEqual(len(addresses), 1)
4398 self.assertEqual(addresses[0].address, nat_addr)
4400 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4402 addresses = self.vapi.nat64_pool_addr_dump()
4403 self.assertEqual(len(addresses), 0)
4405 def test_interface(self):
4406 """ Enable/disable NAT64 feature on the interface """
4407 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4408 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4410 interfaces = self.vapi.nat64_interface_dump()
4411 self.assertEqual(len(interfaces), 2)
4414 for intf in interfaces:
4415 if intf.sw_if_index == self.pg0.sw_if_index:
4416 self.assertEqual(intf.is_inside, 1)
4418 elif intf.sw_if_index == self.pg1.sw_if_index:
4419 self.assertEqual(intf.is_inside, 0)
4421 self.assertTrue(pg0_found)
4422 self.assertTrue(pg1_found)
4424 features = self.vapi.cli("show interface features pg0")
4425 self.assertNotEqual(features.find('nat64-in2out'), -1)
4426 features = self.vapi.cli("show interface features pg1")
4427 self.assertNotEqual(features.find('nat64-out2in'), -1)
4429 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4430 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4432 interfaces = self.vapi.nat64_interface_dump()
4433 self.assertEqual(len(interfaces), 0)
4435 def test_static_bib(self):
4436 """ Add/delete static BIB entry """
4437 in_addr = socket.inet_pton(socket.AF_INET6,
4438 '2001:db8:85a3::8a2e:370:7334')
4439 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4442 proto = IP_PROTOS.tcp
4444 self.vapi.nat64_add_del_static_bib(in_addr,
4449 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4454 self.assertEqual(bibe.i_addr, in_addr)
4455 self.assertEqual(bibe.o_addr, out_addr)
4456 self.assertEqual(bibe.i_port, in_port)
4457 self.assertEqual(bibe.o_port, out_port)
4458 self.assertEqual(static_bib_num, 1)
4460 self.vapi.nat64_add_del_static_bib(in_addr,
4466 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4471 self.assertEqual(static_bib_num, 0)
4473 def test_set_timeouts(self):
4474 """ Set NAT64 timeouts """
4475 # verify default values
4476 timeouts = self.vapi.nat64_get_timeouts()
4477 self.assertEqual(timeouts.udp, 300)
4478 self.assertEqual(timeouts.icmp, 60)
4479 self.assertEqual(timeouts.tcp_trans, 240)
4480 self.assertEqual(timeouts.tcp_est, 7440)
4481 self.assertEqual(timeouts.tcp_incoming_syn, 6)
4483 # set and verify custom values
4484 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4485 tcp_est=7450, tcp_incoming_syn=10)
4486 timeouts = self.vapi.nat64_get_timeouts()
4487 self.assertEqual(timeouts.udp, 200)
4488 self.assertEqual(timeouts.icmp, 30)
4489 self.assertEqual(timeouts.tcp_trans, 250)
4490 self.assertEqual(timeouts.tcp_est, 7450)
4491 self.assertEqual(timeouts.tcp_incoming_syn, 10)
4493 def test_dynamic(self):
4494 """ NAT64 dynamic translation test """
4495 self.tcp_port_in = 6303
4496 self.udp_port_in = 6304
4497 self.icmp_id_in = 6305
4499 ses_num_start = self.nat64_get_ses_num()
4501 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4503 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4504 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4507 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4508 self.pg0.add_stream(pkts)
4509 self.pg_enable_capture(self.pg_interfaces)
4511 capture = self.pg1.get_capture(len(pkts))
4512 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4513 dst_ip=self.pg1.remote_ip4)
4516 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4517 self.pg1.add_stream(pkts)
4518 self.pg_enable_capture(self.pg_interfaces)
4520 capture = self.pg0.get_capture(len(pkts))
4521 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4522 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4525 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4526 self.pg0.add_stream(pkts)
4527 self.pg_enable_capture(self.pg_interfaces)
4529 capture = self.pg1.get_capture(len(pkts))
4530 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4531 dst_ip=self.pg1.remote_ip4)
4534 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4535 self.pg1.add_stream(pkts)
4536 self.pg_enable_capture(self.pg_interfaces)
4538 capture = self.pg0.get_capture(len(pkts))
4539 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4541 ses_num_end = self.nat64_get_ses_num()
4543 self.assertEqual(ses_num_end - ses_num_start, 3)
4545 # tenant with specific VRF
4546 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4547 self.vrf1_nat_addr_n,
4548 vrf_id=self.vrf1_id)
4549 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4551 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4552 self.pg2.add_stream(pkts)
4553 self.pg_enable_capture(self.pg_interfaces)
4555 capture = self.pg1.get_capture(len(pkts))
4556 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4557 dst_ip=self.pg1.remote_ip4)
4559 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4560 self.pg1.add_stream(pkts)
4561 self.pg_enable_capture(self.pg_interfaces)
4563 capture = self.pg2.get_capture(len(pkts))
4564 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4566 def test_static(self):
4567 """ NAT64 static translation test """
4568 self.tcp_port_in = 60303
4569 self.udp_port_in = 60304
4570 self.icmp_id_in = 60305
4571 self.tcp_port_out = 60303
4572 self.udp_port_out = 60304
4573 self.icmp_id_out = 60305
4575 ses_num_start = self.nat64_get_ses_num()
4577 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4579 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4580 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4582 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4587 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4592 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4599 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4600 self.pg0.add_stream(pkts)
4601 self.pg_enable_capture(self.pg_interfaces)
4603 capture = self.pg1.get_capture(len(pkts))
4604 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4605 dst_ip=self.pg1.remote_ip4, same_port=True)
4608 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4609 self.pg1.add_stream(pkts)
4610 self.pg_enable_capture(self.pg_interfaces)
4612 capture = self.pg0.get_capture(len(pkts))
4613 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4614 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4616 ses_num_end = self.nat64_get_ses_num()
4618 self.assertEqual(ses_num_end - ses_num_start, 3)
4620 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4621 def test_session_timeout(self):
4622 """ NAT64 session timeout """
4623 self.icmp_id_in = 1234
4624 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4626 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4627 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4628 self.vapi.nat64_set_timeouts(icmp=5)
4630 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4631 self.pg0.add_stream(pkts)
4632 self.pg_enable_capture(self.pg_interfaces)
4634 capture = self.pg1.get_capture(len(pkts))
4636 ses_num_before_timeout = self.nat64_get_ses_num()
4640 # ICMP session after timeout
4641 ses_num_after_timeout = self.nat64_get_ses_num()
4642 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
4644 def test_icmp_error(self):
4645 """ NAT64 ICMP Error message translation """
4646 self.tcp_port_in = 6303
4647 self.udp_port_in = 6304
4648 self.icmp_id_in = 6305
4650 ses_num_start = self.nat64_get_ses_num()
4652 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4654 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4655 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4657 # send some packets to create sessions
4658 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4659 self.pg0.add_stream(pkts)
4660 self.pg_enable_capture(self.pg_interfaces)
4662 capture_ip4 = self.pg1.get_capture(len(pkts))
4663 self.verify_capture_out(capture_ip4,
4664 nat_ip=self.nat_addr,
4665 dst_ip=self.pg1.remote_ip4)
4667 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4668 self.pg1.add_stream(pkts)
4669 self.pg_enable_capture(self.pg_interfaces)
4671 capture_ip6 = self.pg0.get_capture(len(pkts))
4672 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4673 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
4674 self.pg0.remote_ip6)
4677 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4678 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
4679 ICMPv6DestUnreach(code=1) /
4680 packet[IPv6] for packet in capture_ip6]
4681 self.pg0.add_stream(pkts)
4682 self.pg_enable_capture(self.pg_interfaces)
4684 capture = self.pg1.get_capture(len(pkts))
4685 for packet in capture:
4687 self.assertEqual(packet[IP].src, self.nat_addr)
4688 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4689 self.assertEqual(packet[ICMP].type, 3)
4690 self.assertEqual(packet[ICMP].code, 13)
4691 inner = packet[IPerror]
4692 self.assertEqual(inner.src, self.pg1.remote_ip4)
4693 self.assertEqual(inner.dst, self.nat_addr)
4694 self.check_icmp_checksum(packet)
4695 if inner.haslayer(TCPerror):
4696 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
4697 elif inner.haslayer(UDPerror):
4698 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
4700 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
4702 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4706 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4707 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4708 ICMP(type=3, code=13) /
4709 packet[IP] for packet in capture_ip4]
4710 self.pg1.add_stream(pkts)
4711 self.pg_enable_capture(self.pg_interfaces)
4713 capture = self.pg0.get_capture(len(pkts))
4714 for packet in capture:
4716 self.assertEqual(packet[IPv6].src, ip.src)
4717 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4718 icmp = packet[ICMPv6DestUnreach]
4719 self.assertEqual(icmp.code, 1)
4720 inner = icmp[IPerror6]
4721 self.assertEqual(inner.src, self.pg0.remote_ip6)
4722 self.assertEqual(inner.dst, ip.src)
4723 self.check_icmpv6_checksum(packet)
4724 if inner.haslayer(TCPerror):
4725 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
4726 elif inner.haslayer(UDPerror):
4727 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
4729 self.assertEqual(inner[ICMPv6EchoRequest].id,
4732 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4735 def test_hairpinning(self):
4736 """ NAT64 hairpinning """
4738 client = self.pg0.remote_hosts[0]
4739 server = self.pg0.remote_hosts[1]
4740 server_tcp_in_port = 22
4741 server_tcp_out_port = 4022
4742 server_udp_in_port = 23
4743 server_udp_out_port = 4023
4744 client_tcp_in_port = 1234
4745 client_udp_in_port = 1235
4746 client_tcp_out_port = 0
4747 client_udp_out_port = 0
4748 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
4749 nat_addr_ip6 = ip.src
4751 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4753 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4754 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4756 self.vapi.nat64_add_del_static_bib(server.ip6n,
4759 server_tcp_out_port,
4761 self.vapi.nat64_add_del_static_bib(server.ip6n,
4764 server_udp_out_port,
4769 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4770 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4771 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
4773 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4774 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4775 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
4777 self.pg0.add_stream(pkts)
4778 self.pg_enable_capture(self.pg_interfaces)
4780 capture = self.pg0.get_capture(len(pkts))
4781 for packet in capture:
4783 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4784 self.assertEqual(packet[IPv6].dst, server.ip6)
4785 if packet.haslayer(TCP):
4786 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
4787 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
4788 self.check_tcp_checksum(packet)
4789 client_tcp_out_port = packet[TCP].sport
4791 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
4792 self.assertEqual(packet[UDP].dport, server_udp_in_port)
4793 self.check_udp_checksum(packet)
4794 client_udp_out_port = packet[UDP].sport
4796 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4801 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4802 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4803 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
4805 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4806 IPv6(src=server.ip6, dst=nat_addr_ip6) /
4807 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
4809 self.pg0.add_stream(pkts)
4810 self.pg_enable_capture(self.pg_interfaces)
4812 capture = self.pg0.get_capture(len(pkts))
4813 for packet in capture:
4815 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4816 self.assertEqual(packet[IPv6].dst, client.ip6)
4817 if packet.haslayer(TCP):
4818 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
4819 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
4820 self.check_tcp_checksum(packet)
4822 self.assertEqual(packet[UDP].sport, server_udp_out_port)
4823 self.assertEqual(packet[UDP].dport, client_udp_in_port)
4824 self.check_udp_checksum(packet)
4826 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4831 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4832 IPv6(src=client.ip6, dst=nat_addr_ip6) /
4833 ICMPv6DestUnreach(code=1) /
4834 packet[IPv6] for packet in capture]
4835 self.pg0.add_stream(pkts)
4836 self.pg_enable_capture(self.pg_interfaces)
4838 capture = self.pg0.get_capture(len(pkts))
4839 for packet in capture:
4841 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
4842 self.assertEqual(packet[IPv6].dst, server.ip6)
4843 icmp = packet[ICMPv6DestUnreach]
4844 self.assertEqual(icmp.code, 1)
4845 inner = icmp[IPerror6]
4846 self.assertEqual(inner.src, server.ip6)
4847 self.assertEqual(inner.dst, nat_addr_ip6)
4848 self.check_icmpv6_checksum(packet)
4849 if inner.haslayer(TCPerror):
4850 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
4851 self.assertEqual(inner[TCPerror].dport,
4852 client_tcp_out_port)
4854 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
4855 self.assertEqual(inner[UDPerror].dport,
4856 client_udp_out_port)
4858 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4861 def test_prefix(self):
4862 """ NAT64 Network-Specific Prefix """
4864 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4866 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4867 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4868 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4869 self.vrf1_nat_addr_n,
4870 vrf_id=self.vrf1_id)
4871 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4874 global_pref64 = "2001:db8::"
4875 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
4876 global_pref64_len = 32
4877 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
4879 prefix = self.vapi.nat64_prefix_dump()
4880 self.assertEqual(len(prefix), 1)
4881 self.assertEqual(prefix[0].prefix, global_pref64_n)
4882 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
4883 self.assertEqual(prefix[0].vrf_id, 0)
4885 # Add tenant specific prefix
4886 vrf1_pref64 = "2001:db8:122:300::"
4887 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
4888 vrf1_pref64_len = 56
4889 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
4891 vrf_id=self.vrf1_id)
4892 prefix = self.vapi.nat64_prefix_dump()
4893 self.assertEqual(len(prefix), 2)
4896 pkts = self.create_stream_in_ip6(self.pg0,
4899 plen=global_pref64_len)
4900 self.pg0.add_stream(pkts)
4901 self.pg_enable_capture(self.pg_interfaces)
4903 capture = self.pg1.get_capture(len(pkts))
4904 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4905 dst_ip=self.pg1.remote_ip4)
4907 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4908 self.pg1.add_stream(pkts)
4909 self.pg_enable_capture(self.pg_interfaces)
4911 capture = self.pg0.get_capture(len(pkts))
4912 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4915 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
4917 # Tenant specific prefix
4918 pkts = self.create_stream_in_ip6(self.pg2,
4921 plen=vrf1_pref64_len)
4922 self.pg2.add_stream(pkts)
4923 self.pg_enable_capture(self.pg_interfaces)
4925 capture = self.pg1.get_capture(len(pkts))
4926 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4927 dst_ip=self.pg1.remote_ip4)
4929 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4930 self.pg1.add_stream(pkts)
4931 self.pg_enable_capture(self.pg_interfaces)
4933 capture = self.pg2.get_capture(len(pkts))
4934 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
4937 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
4939 def test_unknown_proto(self):
4940 """ NAT64 translate packet with unknown protocol """
4942 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4944 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4945 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4946 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
4949 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4950 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
4951 TCP(sport=self.tcp_port_in, dport=20))
4952 self.pg0.add_stream(p)
4953 self.pg_enable_capture(self.pg_interfaces)
4955 p = self.pg1.get_capture(1)
4957 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4958 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
4960 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
4961 TCP(sport=1234, dport=1234))
4962 self.pg0.add_stream(p)
4963 self.pg_enable_capture(self.pg_interfaces)
4965 p = self.pg1.get_capture(1)
4968 self.assertEqual(packet[IP].src, self.nat_addr)
4969 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
4970 self.assertTrue(packet.haslayer(GRE))
4971 self.check_ip_checksum(packet)
4973 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4977 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
4978 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4980 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
4981 TCP(sport=1234, dport=1234))
4982 self.pg1.add_stream(p)
4983 self.pg_enable_capture(self.pg_interfaces)
4985 p = self.pg0.get_capture(1)
4988 self.assertEqual(packet[IPv6].src, remote_ip6)
4989 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
4990 self.assertEqual(packet[IPv6].nh, 47)
4992 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4995 def test_hairpinning_unknown_proto(self):
4996 """ NAT64 translate packet with unknown protocol - hairpinning """
4998 client = self.pg0.remote_hosts[0]
4999 server = self.pg0.remote_hosts[1]
5000 server_tcp_in_port = 22
5001 server_tcp_out_port = 4022
5002 client_tcp_in_port = 1234
5003 client_tcp_out_port = 1235
5004 server_nat_ip = "10.0.0.100"
5005 client_nat_ip = "10.0.0.110"
5006 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5007 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5008 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5009 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5011 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5013 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5014 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5016 self.vapi.nat64_add_del_static_bib(server.ip6n,
5019 server_tcp_out_port,
5022 self.vapi.nat64_add_del_static_bib(server.ip6n,
5028 self.vapi.nat64_add_del_static_bib(client.ip6n,
5031 client_tcp_out_port,
5035 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5036 IPv6(src=client.ip6, dst=server_nat_ip6) /
5037 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5038 self.pg0.add_stream(p)
5039 self.pg_enable_capture(self.pg_interfaces)
5041 p = self.pg0.get_capture(1)
5043 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5044 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5046 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5047 TCP(sport=1234, dport=1234))
5048 self.pg0.add_stream(p)
5049 self.pg_enable_capture(self.pg_interfaces)
5051 p = self.pg0.get_capture(1)
5054 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5055 self.assertEqual(packet[IPv6].dst, server.ip6)
5056 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5058 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5062 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5063 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5065 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5066 TCP(sport=1234, dport=1234))
5067 self.pg0.add_stream(p)
5068 self.pg_enable_capture(self.pg_interfaces)
5070 p = self.pg0.get_capture(1)
5073 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5074 self.assertEqual(packet[IPv6].dst, client.ip6)
5075 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5077 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5080 def test_one_armed_nat64(self):
5081 """ One armed NAT64 """
5083 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5087 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5089 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5090 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5093 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5094 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5095 TCP(sport=12345, dport=80))
5096 self.pg3.add_stream(p)
5097 self.pg_enable_capture(self.pg_interfaces)
5099 capture = self.pg3.get_capture(1)
5104 self.assertEqual(ip.src, self.nat_addr)
5105 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5106 self.assertNotEqual(tcp.sport, 12345)
5107 external_port = tcp.sport
5108 self.assertEqual(tcp.dport, 80)
5109 self.check_tcp_checksum(p)
5110 self.check_ip_checksum(p)
5112 self.logger.error(ppp("Unexpected or invalid packet:", p))
5116 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5117 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5118 TCP(sport=80, dport=external_port))
5119 self.pg3.add_stream(p)
5120 self.pg_enable_capture(self.pg_interfaces)
5122 capture = self.pg3.get_capture(1)
5127 self.assertEqual(ip.src, remote_host_ip6)
5128 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5129 self.assertEqual(tcp.sport, 80)
5130 self.assertEqual(tcp.dport, 12345)
5131 self.check_tcp_checksum(p)
5133 self.logger.error(ppp("Unexpected or invalid packet:", p))
5136 def test_frag_in_order(self):
5137 """ NAT64 translate fragments arriving in order """
5138 self.tcp_port_in = random.randint(1025, 65535)
5140 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5142 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5143 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5145 reass = self.vapi.nat_reass_dump()
5146 reass_n_start = len(reass)
5150 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5151 self.tcp_port_in, 20, data)
5152 self.pg0.add_stream(pkts)
5153 self.pg_enable_capture(self.pg_interfaces)
5155 frags = self.pg1.get_capture(len(pkts))
5156 p = self.reass_frags_and_verify(frags,
5158 self.pg1.remote_ip4)
5159 self.assertEqual(p[TCP].dport, 20)
5160 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5161 self.tcp_port_out = p[TCP].sport
5162 self.assertEqual(data, p[Raw].load)
5165 data = "A" * 4 + "b" * 16 + "C" * 3
5166 pkts = self.create_stream_frag(self.pg1,
5171 self.pg1.add_stream(pkts)
5172 self.pg_enable_capture(self.pg_interfaces)
5174 frags = self.pg0.get_capture(len(pkts))
5175 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5176 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5177 self.assertEqual(p[TCP].sport, 20)
5178 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5179 self.assertEqual(data, p[Raw].load)
5181 reass = self.vapi.nat_reass_dump()
5182 reass_n_end = len(reass)
5184 self.assertEqual(reass_n_end - reass_n_start, 2)
5186 def test_reass_hairpinning(self):
5187 """ NAT64 fragments hairpinning """
5189 client = self.pg0.remote_hosts[0]
5190 server = self.pg0.remote_hosts[1]
5191 server_in_port = random.randint(1025, 65535)
5192 server_out_port = random.randint(1025, 65535)
5193 client_in_port = random.randint(1025, 65535)
5194 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5195 nat_addr_ip6 = ip.src
5197 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5199 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5200 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5202 # add static BIB entry for server
5203 self.vapi.nat64_add_del_static_bib(server.ip6n,
5209 # send packet from host to server
5210 pkts = self.create_stream_frag_ip6(self.pg0,
5215 self.pg0.add_stream(pkts)
5216 self.pg_enable_capture(self.pg_interfaces)
5218 frags = self.pg0.get_capture(len(pkts))
5219 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5220 self.assertNotEqual(p[TCP].sport, client_in_port)
5221 self.assertEqual(p[TCP].dport, server_in_port)
5222 self.assertEqual(data, p[Raw].load)
5224 def test_frag_out_of_order(self):
5225 """ NAT64 translate fragments arriving out of order """
5226 self.tcp_port_in = random.randint(1025, 65535)
5228 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5230 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5231 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5235 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5236 self.tcp_port_in, 20, data)
5238 self.pg0.add_stream(pkts)
5239 self.pg_enable_capture(self.pg_interfaces)
5241 frags = self.pg1.get_capture(len(pkts))
5242 p = self.reass_frags_and_verify(frags,
5244 self.pg1.remote_ip4)
5245 self.assertEqual(p[TCP].dport, 20)
5246 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5247 self.tcp_port_out = p[TCP].sport
5248 self.assertEqual(data, p[Raw].load)
5251 data = "A" * 4 + "B" * 16 + "C" * 3
5252 pkts = self.create_stream_frag(self.pg1,
5258 self.pg1.add_stream(pkts)
5259 self.pg_enable_capture(self.pg_interfaces)
5261 frags = self.pg0.get_capture(len(pkts))
5262 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5263 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5264 self.assertEqual(p[TCP].sport, 20)
5265 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5266 self.assertEqual(data, p[Raw].load)
5268 def test_interface_addr(self):
5269 """ Acquire NAT64 pool addresses from interface """
5270 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5272 # no address in NAT64 pool
5273 adresses = self.vapi.nat44_address_dump()
5274 self.assertEqual(0, len(adresses))
5276 # configure interface address and check NAT64 address pool
5277 self.pg4.config_ip4()
5278 addresses = self.vapi.nat64_pool_addr_dump()
5279 self.assertEqual(len(addresses), 1)
5280 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5282 # remove interface address and check NAT64 address pool
5283 self.pg4.unconfig_ip4()
5284 addresses = self.vapi.nat64_pool_addr_dump()
5285 self.assertEqual(0, len(adresses))
5287 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5288 def test_ipfix_max_bibs_sessions(self):
5289 """ IPFIX logging maximum session and BIB entries exceeded """
5292 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5296 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5298 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5299 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5303 for i in range(0, max_bibs):
5304 src = "fd01:aa::%x" % (i)
5305 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5306 IPv6(src=src, dst=remote_host_ip6) /
5307 TCP(sport=12345, dport=80))
5309 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5310 IPv6(src=src, dst=remote_host_ip6) /
5311 TCP(sport=12345, dport=22))
5313 self.pg0.add_stream(pkts)
5314 self.pg_enable_capture(self.pg_interfaces)
5316 self.pg1.get_capture(max_sessions)
5318 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5319 src_address=self.pg3.local_ip4n,
5321 template_interval=10)
5322 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5323 src_port=self.ipfix_src_port)
5325 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5326 IPv6(src=src, dst=remote_host_ip6) /
5327 TCP(sport=12345, dport=25))
5328 self.pg0.add_stream(p)
5329 self.pg_enable_capture(self.pg_interfaces)
5331 self.pg1.get_capture(0)
5332 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5333 capture = self.pg3.get_capture(9)
5334 ipfix = IPFIXDecoder()
5335 # first load template
5337 self.assertTrue(p.haslayer(IPFIX))
5338 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5339 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5340 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5341 self.assertEqual(p[UDP].dport, 4739)
5342 self.assertEqual(p[IPFIX].observationDomainID,
5343 self.ipfix_domain_id)
5344 if p.haslayer(Template):
5345 ipfix.add_template(p.getlayer(Template))
5346 # verify events in data set
5348 if p.haslayer(Data):
5349 data = ipfix.decode_data_set(p.getlayer(Set))
5350 self.verify_ipfix_max_sessions(data, max_sessions)
5352 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5353 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5354 TCP(sport=12345, dport=80))
5355 self.pg0.add_stream(p)
5356 self.pg_enable_capture(self.pg_interfaces)
5358 self.pg1.get_capture(0)
5359 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5360 capture = self.pg3.get_capture(1)
5361 # verify events in data set
5363 self.assertTrue(p.haslayer(IPFIX))
5364 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5365 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5366 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5367 self.assertEqual(p[UDP].dport, 4739)
5368 self.assertEqual(p[IPFIX].observationDomainID,
5369 self.ipfix_domain_id)
5370 if p.haslayer(Data):
5371 data = ipfix.decode_data_set(p.getlayer(Set))
5372 self.verify_ipfix_max_bibs(data, max_bibs)
5374 def test_ipfix_max_frags(self):
5375 """ IPFIX logging maximum fragments pending reassembly exceeded """
5376 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5378 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5379 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5380 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5381 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5382 src_address=self.pg3.local_ip4n,
5384 template_interval=10)
5385 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5386 src_port=self.ipfix_src_port)
5389 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5390 self.tcp_port_in, 20, data)
5391 self.pg0.add_stream(pkts[-1])
5392 self.pg_enable_capture(self.pg_interfaces)
5394 self.pg1.get_capture(0)
5395 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5396 capture = self.pg3.get_capture(9)
5397 ipfix = IPFIXDecoder()
5398 # first load template
5400 self.assertTrue(p.haslayer(IPFIX))
5401 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5402 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5403 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5404 self.assertEqual(p[UDP].dport, 4739)
5405 self.assertEqual(p[IPFIX].observationDomainID,
5406 self.ipfix_domain_id)
5407 if p.haslayer(Template):
5408 ipfix.add_template(p.getlayer(Template))
5409 # verify events in data set
5411 if p.haslayer(Data):
5412 data = ipfix.decode_data_set(p.getlayer(Set))
5413 self.verify_ipfix_max_fragments_ip6(data, 0,
5414 self.pg0.remote_ip6n)
5416 def test_ipfix_bib_ses(self):
5417 """ IPFIX logging NAT64 BIB/session create and delete events """
5418 self.tcp_port_in = random.randint(1025, 65535)
5419 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5423 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5425 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5426 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5427 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5428 src_address=self.pg3.local_ip4n,
5430 template_interval=10)
5431 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5432 src_port=self.ipfix_src_port)
5435 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5436 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5437 TCP(sport=self.tcp_port_in, dport=25))
5438 self.pg0.add_stream(p)
5439 self.pg_enable_capture(self.pg_interfaces)
5441 p = self.pg1.get_capture(1)
5442 self.tcp_port_out = p[0][TCP].sport
5443 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5444 capture = self.pg3.get_capture(10)
5445 ipfix = IPFIXDecoder()
5446 # first load template
5448 self.assertTrue(p.haslayer(IPFIX))
5449 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5450 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5451 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5452 self.assertEqual(p[UDP].dport, 4739)
5453 self.assertEqual(p[IPFIX].observationDomainID,
5454 self.ipfix_domain_id)
5455 if p.haslayer(Template):
5456 ipfix.add_template(p.getlayer(Template))
5457 # verify events in data set
5459 if p.haslayer(Data):
5460 data = ipfix.decode_data_set(p.getlayer(Set))
5461 if ord(data[0][230]) == 10:
5462 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5463 elif ord(data[0][230]) == 6:
5464 self.verify_ipfix_nat64_ses(data,
5466 self.pg0.remote_ip6n,
5467 self.pg1.remote_ip4,
5470 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5473 self.pg_enable_capture(self.pg_interfaces)
5474 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5477 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5478 capture = self.pg3.get_capture(2)
5479 # verify events in data set
5481 self.assertTrue(p.haslayer(IPFIX))
5482 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5483 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5484 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5485 self.assertEqual(p[UDP].dport, 4739)
5486 self.assertEqual(p[IPFIX].observationDomainID,
5487 self.ipfix_domain_id)
5488 if p.haslayer(Data):
5489 data = ipfix.decode_data_set(p.getlayer(Set))
5490 if ord(data[0][230]) == 11:
5491 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5492 elif ord(data[0][230]) == 7:
5493 self.verify_ipfix_nat64_ses(data,
5495 self.pg0.remote_ip6n,
5496 self.pg1.remote_ip4,
5499 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5501 def nat64_get_ses_num(self):
5503 Return number of active NAT64 sessions.
5505 st = self.vapi.nat64_st_dump()
5508 def clear_nat64(self):
5510 Clear NAT64 configuration.
5512 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5513 domain_id=self.ipfix_domain_id)
5514 self.ipfix_src_port = 4739
5515 self.ipfix_domain_id = 1
5517 self.vapi.nat64_set_timeouts()
5519 interfaces = self.vapi.nat64_interface_dump()
5520 for intf in interfaces:
5521 if intf.is_inside > 1:
5522 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5525 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5529 bib = self.vapi.nat64_bib_dump(255)
5532 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5540 adresses = self.vapi.nat64_pool_addr_dump()
5541 for addr in adresses:
5542 self.vapi.nat64_add_del_pool_addr_range(addr.address,
5547 prefixes = self.vapi.nat64_prefix_dump()
5548 for prefix in prefixes:
5549 self.vapi.nat64_add_del_prefix(prefix.prefix,
5551 vrf_id=prefix.vrf_id,
5555 super(TestNAT64, self).tearDown()
5556 if not self.vpp_dead:
5557 self.logger.info(self.vapi.cli("show nat64 pool"))
5558 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5559 self.logger.info(self.vapi.cli("show nat64 prefix"))
5560 self.logger.info(self.vapi.cli("show nat64 bib all"))
5561 self.logger.info(self.vapi.cli("show nat64 session table all"))
5562 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5566 class TestDSlite(MethodHolder):
5567 """ DS-Lite Test Cases """
5570 def setUpClass(cls):
5571 super(TestDSlite, cls).setUpClass()
5574 cls.nat_addr = '10.0.0.3'
5575 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5577 cls.create_pg_interfaces(range(2))
5579 cls.pg0.config_ip4()
5580 cls.pg0.resolve_arp()
5582 cls.pg1.config_ip6()
5583 cls.pg1.generate_remote_hosts(2)
5584 cls.pg1.configure_ipv6_neighbors()
5587 super(TestDSlite, cls).tearDownClass()
5590 def test_dslite(self):
5591 """ Test DS-Lite """
5592 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5594 aftr_ip4 = '192.0.0.1'
5595 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5596 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5597 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5598 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5601 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5602 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5603 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5604 UDP(sport=20000, dport=10000))
5605 self.pg1.add_stream(p)
5606 self.pg_enable_capture(self.pg_interfaces)
5608 capture = self.pg0.get_capture(1)
5609 capture = capture[0]
5610 self.assertFalse(capture.haslayer(IPv6))
5611 self.assertEqual(capture[IP].src, self.nat_addr)
5612 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5613 self.assertNotEqual(capture[UDP].sport, 20000)
5614 self.assertEqual(capture[UDP].dport, 10000)
5615 self.check_ip_checksum(capture)
5616 out_port = capture[UDP].sport
5618 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5619 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5620 UDP(sport=10000, dport=out_port))
5621 self.pg0.add_stream(p)
5622 self.pg_enable_capture(self.pg_interfaces)
5624 capture = self.pg1.get_capture(1)
5625 capture = capture[0]
5626 self.assertEqual(capture[IPv6].src, aftr_ip6)
5627 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5628 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5629 self.assertEqual(capture[IP].dst, '192.168.1.1')
5630 self.assertEqual(capture[UDP].sport, 10000)
5631 self.assertEqual(capture[UDP].dport, 20000)
5632 self.check_ip_checksum(capture)
5635 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5636 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5637 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5638 TCP(sport=20001, dport=10001))
5639 self.pg1.add_stream(p)
5640 self.pg_enable_capture(self.pg_interfaces)
5642 capture = self.pg0.get_capture(1)
5643 capture = capture[0]
5644 self.assertFalse(capture.haslayer(IPv6))
5645 self.assertEqual(capture[IP].src, self.nat_addr)
5646 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5647 self.assertNotEqual(capture[TCP].sport, 20001)
5648 self.assertEqual(capture[TCP].dport, 10001)
5649 self.check_ip_checksum(capture)
5650 self.check_tcp_checksum(capture)
5651 out_port = capture[TCP].sport
5653 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5654 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5655 TCP(sport=10001, dport=out_port))
5656 self.pg0.add_stream(p)
5657 self.pg_enable_capture(self.pg_interfaces)
5659 capture = self.pg1.get_capture(1)
5660 capture = capture[0]
5661 self.assertEqual(capture[IPv6].src, aftr_ip6)
5662 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5663 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5664 self.assertEqual(capture[IP].dst, '192.168.1.1')
5665 self.assertEqual(capture[TCP].sport, 10001)
5666 self.assertEqual(capture[TCP].dport, 20001)
5667 self.check_ip_checksum(capture)
5668 self.check_tcp_checksum(capture)
5671 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5672 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5673 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5674 ICMP(id=4000, type='echo-request'))
5675 self.pg1.add_stream(p)
5676 self.pg_enable_capture(self.pg_interfaces)
5678 capture = self.pg0.get_capture(1)
5679 capture = capture[0]
5680 self.assertFalse(capture.haslayer(IPv6))
5681 self.assertEqual(capture[IP].src, self.nat_addr)
5682 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5683 self.assertNotEqual(capture[ICMP].id, 4000)
5684 self.check_ip_checksum(capture)
5685 self.check_icmp_checksum(capture)
5686 out_id = capture[ICMP].id
5688 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5689 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5690 ICMP(id=out_id, type='echo-reply'))
5691 self.pg0.add_stream(p)
5692 self.pg_enable_capture(self.pg_interfaces)
5694 capture = self.pg1.get_capture(1)
5695 capture = capture[0]
5696 self.assertEqual(capture[IPv6].src, aftr_ip6)
5697 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5698 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5699 self.assertEqual(capture[IP].dst, '192.168.1.1')
5700 self.assertEqual(capture[ICMP].id, 4000)
5701 self.check_ip_checksum(capture)
5702 self.check_icmp_checksum(capture)
5704 # ping DS-Lite AFTR tunnel endpoint address
5705 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5706 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
5707 ICMPv6EchoRequest())
5708 self.pg1.add_stream(p)
5709 self.pg_enable_capture(self.pg_interfaces)
5711 capture = self.pg1.get_capture(1)
5712 self.assertEqual(1, len(capture))
5713 capture = capture[0]
5714 self.assertEqual(capture[IPv6].src, aftr_ip6)
5715 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
5716 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5719 super(TestDSlite, self).tearDown()
5720 if not self.vpp_dead:
5721 self.logger.info(self.vapi.cli("show dslite pool"))
5723 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5724 self.logger.info(self.vapi.cli("show dslite sessions"))
5727 class TestDSliteCE(MethodHolder):
5728 """ DS-Lite CE Test Cases """
5731 def setUpConstants(cls):
5732 super(TestDSliteCE, cls).setUpConstants()
5733 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
5736 def setUpClass(cls):
5737 super(TestDSliteCE, cls).setUpClass()
5740 cls.create_pg_interfaces(range(2))
5742 cls.pg0.config_ip4()
5743 cls.pg0.resolve_arp()
5745 cls.pg1.config_ip6()
5746 cls.pg1.generate_remote_hosts(1)
5747 cls.pg1.configure_ipv6_neighbors()
5750 super(TestDSliteCE, cls).tearDownClass()
5753 def test_dslite_ce(self):
5754 """ Test DS-Lite CE """
5756 b4_ip4 = '192.0.0.2'
5757 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
5758 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
5759 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
5760 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
5762 aftr_ip4 = '192.0.0.1'
5763 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5764 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5765 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5766 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5768 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
5769 dst_address_length=128,
5770 next_hop_address=self.pg1.remote_ip6n,
5771 next_hop_sw_if_index=self.pg1.sw_if_index,
5775 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5776 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
5777 UDP(sport=10000, dport=20000))
5778 self.pg0.add_stream(p)
5779 self.pg_enable_capture(self.pg_interfaces)
5781 capture = self.pg1.get_capture(1)
5782 capture = capture[0]
5783 self.assertEqual(capture[IPv6].src, b4_ip6)
5784 self.assertEqual(capture[IPv6].dst, aftr_ip6)
5785 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5786 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
5787 self.assertEqual(capture[UDP].sport, 10000)
5788 self.assertEqual(capture[UDP].dport, 20000)
5789 self.check_ip_checksum(capture)
5792 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5793 IPv6(dst=b4_ip6, src=aftr_ip6) /
5794 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
5795 UDP(sport=20000, dport=10000))
5796 self.pg1.add_stream(p)
5797 self.pg_enable_capture(self.pg_interfaces)
5799 capture = self.pg0.get_capture(1)
5800 capture = capture[0]
5801 self.assertFalse(capture.haslayer(IPv6))
5802 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
5803 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5804 self.assertEqual(capture[UDP].sport, 20000)
5805 self.assertEqual(capture[UDP].dport, 10000)
5806 self.check_ip_checksum(capture)
5808 # ping DS-Lite B4 tunnel endpoint address
5809 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5810 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
5811 ICMPv6EchoRequest())
5812 self.pg1.add_stream(p)
5813 self.pg_enable_capture(self.pg_interfaces)
5815 capture = self.pg1.get_capture(1)
5816 self.assertEqual(1, len(capture))
5817 capture = capture[0]
5818 self.assertEqual(capture[IPv6].src, b4_ip6)
5819 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5820 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
5823 super(TestDSliteCE, self).tearDown()
5824 if not self.vpp_dead:
5826 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
5828 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
5830 if __name__ == '__main__':
5831 unittest.main(testRunner=VppTestRunner)