9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
36 def check_ip_checksum(self, pkt):
38 Check IP checksum of the packet
40 :param pkt: Packet to check IP checksum
42 new = pkt.__class__(str(pkt))
44 new = new.__class__(str(new))
45 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
47 def check_tcp_checksum(self, pkt):
49 Check TCP checksum in IP packet
51 :param pkt: Packet to check TCP checksum
53 new = pkt.__class__(str(pkt))
55 new = new.__class__(str(new))
56 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
58 def check_udp_checksum(self, pkt):
60 Check UDP checksum in IP packet
62 :param pkt: Packet to check UDP checksum
64 new = pkt.__class__(str(pkt))
66 new = new.__class__(str(new))
67 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
69 def check_icmp_errror_embedded(self, pkt):
71 Check ICMP error embeded packet checksum
73 :param pkt: Packet to check ICMP error embeded packet checksum
75 if pkt.haslayer(IPerror):
76 new = pkt.__class__(str(pkt))
77 del new['IPerror'].chksum
78 new = new.__class__(str(new))
79 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
81 if pkt.haslayer(TCPerror):
82 new = pkt.__class__(str(pkt))
83 del new['TCPerror'].chksum
84 new = new.__class__(str(new))
85 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
87 if pkt.haslayer(UDPerror):
88 if pkt['UDPerror'].chksum != 0:
89 new = pkt.__class__(str(pkt))
90 del new['UDPerror'].chksum
91 new = new.__class__(str(new))
92 self.assertEqual(new['UDPerror'].chksum,
93 pkt['UDPerror'].chksum)
95 if pkt.haslayer(ICMPerror):
96 del new['ICMPerror'].chksum
97 new = new.__class__(str(new))
98 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
100 def check_icmp_checksum(self, pkt):
102 Check ICMP checksum in IPv4 packet
104 :param pkt: Packet to check ICMP checksum
106 new = pkt.__class__(str(pkt))
107 del new['ICMP'].chksum
108 new = new.__class__(str(new))
109 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110 if pkt.haslayer(IPerror):
111 self.check_icmp_errror_embedded(pkt)
113 def check_icmpv6_checksum(self, pkt):
115 Check ICMPv6 checksum in IPv4 packet
117 :param pkt: Packet to check ICMPv6 checksum
119 new = pkt.__class__(str(pkt))
120 if pkt.haslayer(ICMPv6DestUnreach):
121 del new['ICMPv6DestUnreach'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124 pkt['ICMPv6DestUnreach'].cksum)
125 self.check_icmp_errror_embedded(pkt)
126 if pkt.haslayer(ICMPv6EchoRequest):
127 del new['ICMPv6EchoRequest'].cksum
128 new = new.__class__(str(new))
129 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130 pkt['ICMPv6EchoRequest'].cksum)
131 if pkt.haslayer(ICMPv6EchoReply):
132 del new['ICMPv6EchoReply'].cksum
133 new = new.__class__(str(new))
134 self.assertEqual(new['ICMPv6EchoReply'].cksum,
135 pkt['ICMPv6EchoReply'].cksum)
137 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param dst_ip: Destination address
144 :param ttl: TTL of generated packets
147 dst_ip = out_if.remote_ip4
151 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153 TCP(sport=self.tcp_port_in, dport=20))
157 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159 UDP(sport=self.udp_port_in, dport=20))
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165 ICMP(id=self.icmp_id_in, type='echo-request'))
170 def compose_ip6(self, ip4, pref, plen):
172 Compose IPv4-embedded IPv6 addresses
174 :param ip4: IPv4 address
175 :param pref: IPv6 prefix
176 :param plen: IPv6 prefix length
177 :returns: IPv4-embedded IPv6 addresses
179 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
195 pref_n[10] = ip4_n[3]
199 pref_n[10] = ip4_n[2]
200 pref_n[11] = ip4_n[3]
203 pref_n[10] = ip4_n[1]
204 pref_n[11] = ip4_n[2]
205 pref_n[12] = ip4_n[3]
207 pref_n[12] = ip4_n[0]
208 pref_n[13] = ip4_n[1]
209 pref_n[14] = ip4_n[2]
210 pref_n[15] = ip4_n[3]
211 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
213 def extract_ip4(self, ip6, plen):
215 Extract IPv4 address embedded in IPv6 addresses
217 :param ip6: IPv6 address
218 :param plen: IPv6 prefix length
219 :returns: extracted IPv4 address
221 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
253 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
255 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
257 Create IPv6 packet stream for inside network
259 :param in_if: Inside interface
260 :param out_if: Outside interface
261 :param ttl: Hop Limit of generated packets
262 :param pref: NAT64 prefix
263 :param plen: NAT64 prefix length
267 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
269 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
272 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274 TCP(sport=self.tcp_port_in, dport=20))
278 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280 UDP(sport=self.udp_port_in, dport=20))
284 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286 ICMPv6EchoRequest(id=self.icmp_id_in))
291 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292 use_inside_ports=False):
294 Create packet stream for outside network
296 :param out_if: Outside interface
297 :param dst_ip: Destination IP address (Default use global NAT address)
298 :param ttl: TTL of generated packets
299 :param use_inside_ports: Use inside NAT ports as destination ports
300 instead of outside ports
303 dst_ip = self.nat_addr
304 if not use_inside_ports:
305 tcp_port = self.tcp_port_out
306 udp_port = self.udp_port_out
307 icmp_id = self.icmp_id_out
309 tcp_port = self.tcp_port_in
310 udp_port = self.udp_port_in
311 icmp_id = self.icmp_id_in
314 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316 TCP(dport=tcp_port, sport=20))
320 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322 UDP(dport=udp_port, sport=20))
326 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328 ICMP(id=icmp_id, type='echo-reply'))
333 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
335 Create packet stream for outside network
337 :param out_if: Outside interface
338 :param dst_ip: Destination IP address (Default use global NAT address)
339 :param hl: HL of generated packets
343 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345 TCP(dport=self.tcp_port_out, sport=20))
349 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351 UDP(dport=self.udp_port_out, sport=20))
355 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357 ICMPv6EchoReply(id=self.icmp_id_out))
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Sorce port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Sorce port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param fragsize: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
739 def verify_ipfix_addr_exhausted(self, data):
741 Verify IPFIX NAT addresses event
743 :param data: Decoded IPFIX data records
745 self.assertEqual(1, len(data))
748 self.assertEqual(ord(record[230]), 3)
750 self.assertEqual(struct.pack("!I", 0), record[283])
752 def verify_ipfix_max_sessions(self, data, limit):
754 Verify IPFIX maximum session entries exceeded event
756 :param data: Decoded IPFIX data records
757 :param limit: Number of maximum session entries that can be created.
759 self.assertEqual(1, len(data))
762 self.assertEqual(ord(record[230]), 13)
763 # natQuotaExceededEvent
764 self.assertEqual(struct.pack("I", 1), record[466])
766 self.assertEqual(struct.pack("I", limit), record[471])
768 def verify_ipfix_max_bibs(self, data, limit):
770 Verify IPFIX maximum BIB entries exceeded event
772 :param data: Decoded IPFIX data records
773 :param limit: Number of maximum BIB entries that can be created.
775 self.assertEqual(1, len(data))
778 self.assertEqual(ord(record[230]), 13)
779 # natQuotaExceededEvent
780 self.assertEqual(struct.pack("I", 2), record[466])
782 self.assertEqual(struct.pack("I", limit), record[472])
784 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
786 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
788 :param data: Decoded IPFIX data records
789 :param limit: Number of maximum fragments pending reassembly
790 :param src_addr: IPv6 source address
792 self.assertEqual(1, len(data))
795 self.assertEqual(ord(record[230]), 13)
796 # natQuotaExceededEvent
797 self.assertEqual(struct.pack("I", 5), record[466])
798 # maxFragmentsPendingReassembly
799 self.assertEqual(struct.pack("I", limit), record[475])
801 self.assertEqual(src_addr, record[27])
803 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
805 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
807 :param data: Decoded IPFIX data records
808 :param limit: Number of maximum fragments pending reassembly
809 :param src_addr: IPv4 source address
811 self.assertEqual(1, len(data))
814 self.assertEqual(ord(record[230]), 13)
815 # natQuotaExceededEvent
816 self.assertEqual(struct.pack("I", 5), record[466])
817 # maxFragmentsPendingReassembly
818 self.assertEqual(struct.pack("I", limit), record[475])
820 self.assertEqual(src_addr, record[8])
822 def verify_ipfix_bib(self, data, is_create, src_addr):
824 Verify IPFIX NAT64 BIB create and delete events
826 :param data: Decoded IPFIX data records
827 :param is_create: Create event if nonzero value otherwise delete event
828 :param src_addr: IPv6 source address
830 self.assertEqual(1, len(data))
834 self.assertEqual(ord(record[230]), 10)
836 self.assertEqual(ord(record[230]), 11)
838 self.assertEqual(src_addr, record[27])
839 # postNATSourceIPv4Address
840 self.assertEqual(self.nat_addr_n, record[225])
842 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
844 self.assertEqual(struct.pack("!I", 0), record[234])
845 # sourceTransportPort
846 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847 # postNAPTSourceTransportPort
848 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
1036 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037 for lb_sm in lb_static_mappings:
1038 self.vapi.nat44_add_del_lb_static_mapping(
1039 lb_sm.external_addr,
1040 lb_sm.external_port,
1042 vrf_id=lb_sm.vrf_id,
1043 twice_nat=lb_sm.twice_nat,
1044 out2in_only=lb_sm.out2in_only,
1050 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051 for id_m in identity_mappings:
1052 self.vapi.nat44_add_del_identity_mapping(
1053 addr_only=id_m.addr_only,
1056 sw_if_index=id_m.sw_if_index,
1058 protocol=id_m.protocol,
1061 adresses = self.vapi.nat44_address_dump()
1062 for addr in adresses:
1063 self.vapi.nat44_add_del_address_range(addr.ip_address,
1065 twice_nat=addr.twice_nat,
1068 self.vapi.nat_set_reass()
1069 self.vapi.nat_set_reass(is_ip6=1)
1071 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072 local_port=0, external_port=0, vrf_id=0,
1073 is_add=1, external_sw_if_index=0xFFFFFFFF,
1074 proto=0, twice_nat=0, out2in_only=0, tag=""):
1076 Add/delete NAT44 static mapping
1078 :param local_ip: Local IP address
1079 :param external_ip: External IP address
1080 :param local_port: Local port number (Optional)
1081 :param external_port: External port number (Optional)
1082 :param vrf_id: VRF ID (Default 0)
1083 :param is_add: 1 if add, 0 if delete (Default add)
1084 :param external_sw_if_index: External interface instead of IP address
1085 :param proto: IP protocol (Mandatory if port specified)
1086 :param twice_nat: 1 if translate external host address and port
1087 :param out2in_only: if 1 rule is matching only out2in direction
1088 :param tag: Opaque string tag
1091 if local_port and external_port:
1093 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095 self.vapi.nat44_add_del_static_mapping(
1098 external_sw_if_index,
1109 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1111 Add/delete NAT44 address
1113 :param ip: IP address
1114 :param is_add: 1 if add, 0 if delete (Default add)
1115 :param twice_nat: twice NAT address for extenal hosts
1117 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1118 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1120 twice_nat=twice_nat)
1122 def test_dynamic(self):
1123 """ NAT44 dynamic translation test """
1125 self.nat44_add_address(self.nat_addr)
1126 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1131 pkts = self.create_stream_in(self.pg0, self.pg1)
1132 self.pg0.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1135 capture = self.pg1.get_capture(len(pkts))
1136 self.verify_capture_out(capture)
1139 pkts = self.create_stream_out(self.pg1)
1140 self.pg1.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
1143 capture = self.pg0.get_capture(len(pkts))
1144 self.verify_capture_in(capture, self.pg0)
1146 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147 """ NAT44 handling of client packets with TTL=1 """
1149 self.nat44_add_address(self.nat_addr)
1150 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1154 # Client side - generate traffic
1155 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156 self.pg0.add_stream(pkts)
1157 self.pg_enable_capture(self.pg_interfaces)
1160 # Client side - verify ICMP type 11 packets
1161 capture = self.pg0.get_capture(len(pkts))
1162 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1164 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165 """ NAT44 handling of server packets with TTL=1 """
1167 self.nat44_add_address(self.nat_addr)
1168 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1172 # Client side - create sessions
1173 pkts = self.create_stream_in(self.pg0, self.pg1)
1174 self.pg0.add_stream(pkts)
1175 self.pg_enable_capture(self.pg_interfaces)
1178 # Server side - generate traffic
1179 capture = self.pg1.get_capture(len(pkts))
1180 self.verify_capture_out(capture)
1181 pkts = self.create_stream_out(self.pg1, ttl=1)
1182 self.pg1.add_stream(pkts)
1183 self.pg_enable_capture(self.pg_interfaces)
1186 # Server side - verify ICMP type 11 packets
1187 capture = self.pg1.get_capture(len(pkts))
1188 self.verify_capture_out_with_icmp_errors(capture,
1189 src_ip=self.pg1.local_ip4)
1191 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1192 """ NAT44 handling of error responses to client packets with TTL=2 """
1194 self.nat44_add_address(self.nat_addr)
1195 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1196 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1199 # Client side - generate traffic
1200 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1201 self.pg0.add_stream(pkts)
1202 self.pg_enable_capture(self.pg_interfaces)
1205 # Server side - simulate ICMP type 11 response
1206 capture = self.pg1.get_capture(len(pkts))
1207 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1208 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1209 ICMP(type=11) / packet[IP] for packet in capture]
1210 self.pg1.add_stream(pkts)
1211 self.pg_enable_capture(self.pg_interfaces)
1214 # Client side - verify ICMP type 11 packets
1215 capture = self.pg0.get_capture(len(pkts))
1216 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1218 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1219 """ NAT44 handling of error responses to server packets with TTL=2 """
1221 self.nat44_add_address(self.nat_addr)
1222 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1223 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1226 # Client side - create sessions
1227 pkts = self.create_stream_in(self.pg0, self.pg1)
1228 self.pg0.add_stream(pkts)
1229 self.pg_enable_capture(self.pg_interfaces)
1232 # Server side - generate traffic
1233 capture = self.pg1.get_capture(len(pkts))
1234 self.verify_capture_out(capture)
1235 pkts = self.create_stream_out(self.pg1, ttl=2)
1236 self.pg1.add_stream(pkts)
1237 self.pg_enable_capture(self.pg_interfaces)
1240 # Client side - simulate ICMP type 11 response
1241 capture = self.pg0.get_capture(len(pkts))
1242 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1244 ICMP(type=11) / packet[IP] for packet in capture]
1245 self.pg0.add_stream(pkts)
1246 self.pg_enable_capture(self.pg_interfaces)
1249 # Server side - verify ICMP type 11 packets
1250 capture = self.pg1.get_capture(len(pkts))
1251 self.verify_capture_out_with_icmp_errors(capture)
1253 def test_ping_out_interface_from_outside(self):
1254 """ Ping NAT44 out interface from outside network """
1256 self.nat44_add_address(self.nat_addr)
1257 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1261 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1263 ICMP(id=self.icmp_id_out, type='echo-request'))
1265 self.pg1.add_stream(pkts)
1266 self.pg_enable_capture(self.pg_interfaces)
1268 capture = self.pg1.get_capture(len(pkts))
1269 self.assertEqual(1, len(capture))
1272 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1273 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1274 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1275 self.assertEqual(packet[ICMP].type, 0) # echo reply
1277 self.logger.error(ppp("Unexpected or invalid packet "
1278 "(outside network):", packet))
1281 def test_ping_internal_host_from_outside(self):
1282 """ Ping internal host from outside network """
1284 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1285 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1290 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1291 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1292 ICMP(id=self.icmp_id_out, type='echo-request'))
1293 self.pg1.add_stream(pkt)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 capture = self.pg0.get_capture(1)
1297 self.verify_capture_in(capture, self.pg0, packet_num=1)
1298 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1301 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1302 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1303 ICMP(id=self.icmp_id_in, type='echo-reply'))
1304 self.pg0.add_stream(pkt)
1305 self.pg_enable_capture(self.pg_interfaces)
1307 capture = self.pg1.get_capture(1)
1308 self.verify_capture_out(capture, same_port=True, packet_num=1)
1309 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1311 def test_forwarding(self):
1312 """ NAT44 forwarding test """
1314 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1315 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1317 self.vapi.nat44_forwarding_enable_disable(1)
1319 real_ip = self.pg0.remote_ip4n
1320 alias_ip = self.nat_addr_n
1321 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322 external_ip=alias_ip)
1325 # in2out - static mapping match
1327 pkts = self.create_stream_out(self.pg1)
1328 self.pg1.add_stream(pkts)
1329 self.pg_enable_capture(self.pg_interfaces)
1331 capture = self.pg0.get_capture(len(pkts))
1332 self.verify_capture_in(capture, self.pg0)
1334 pkts = self.create_stream_in(self.pg0, self.pg1)
1335 self.pg0.add_stream(pkts)
1336 self.pg_enable_capture(self.pg_interfaces)
1338 capture = self.pg1.get_capture(len(pkts))
1339 self.verify_capture_out(capture, same_port=True)
1341 # in2out - no static mapping match
1343 host0 = self.pg0.remote_hosts[0]
1344 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1346 pkts = self.create_stream_out(self.pg1,
1347 dst_ip=self.pg0.remote_ip4,
1348 use_inside_ports=True)
1349 self.pg1.add_stream(pkts)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg0.get_capture(len(pkts))
1353 self.verify_capture_in(capture, self.pg0)
1355 pkts = self.create_stream_in(self.pg0, self.pg1)
1356 self.pg0.add_stream(pkts)
1357 self.pg_enable_capture(self.pg_interfaces)
1359 capture = self.pg1.get_capture(len(pkts))
1360 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1363 self.pg0.remote_hosts[0] = host0
1366 self.vapi.nat44_forwarding_enable_disable(0)
1367 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1368 external_ip=alias_ip,
1371 def test_static_in(self):
1372 """ 1:1 NAT initialized from inside network """
1374 nat_ip = "10.0.0.10"
1375 self.tcp_port_out = 6303
1376 self.udp_port_out = 6304
1377 self.icmp_id_out = 6305
1379 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1381 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1383 sm = self.vapi.nat44_static_mapping_dump()
1384 self.assertEqual(len(sm), 1)
1385 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1386 self.assertEqual(sm[0].protocol, 0)
1387 self.assertEqual(sm[0].local_port, 0)
1388 self.assertEqual(sm[0].external_port, 0)
1391 pkts = self.create_stream_in(self.pg0, self.pg1)
1392 self.pg0.add_stream(pkts)
1393 self.pg_enable_capture(self.pg_interfaces)
1395 capture = self.pg1.get_capture(len(pkts))
1396 self.verify_capture_out(capture, nat_ip, True)
1399 pkts = self.create_stream_out(self.pg1, nat_ip)
1400 self.pg1.add_stream(pkts)
1401 self.pg_enable_capture(self.pg_interfaces)
1403 capture = self.pg0.get_capture(len(pkts))
1404 self.verify_capture_in(capture, self.pg0)
1406 def test_static_out(self):
1407 """ 1:1 NAT initialized from outside network """
1409 nat_ip = "10.0.0.20"
1410 self.tcp_port_out = 6303
1411 self.udp_port_out = 6304
1412 self.icmp_id_out = 6305
1415 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1416 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1417 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1419 sm = self.vapi.nat44_static_mapping_dump()
1420 self.assertEqual(len(sm), 1)
1421 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1424 pkts = self.create_stream_out(self.pg1, nat_ip)
1425 self.pg1.add_stream(pkts)
1426 self.pg_enable_capture(self.pg_interfaces)
1428 capture = self.pg0.get_capture(len(pkts))
1429 self.verify_capture_in(capture, self.pg0)
1432 pkts = self.create_stream_in(self.pg0, self.pg1)
1433 self.pg0.add_stream(pkts)
1434 self.pg_enable_capture(self.pg_interfaces)
1436 capture = self.pg1.get_capture(len(pkts))
1437 self.verify_capture_out(capture, nat_ip, True)
1439 def test_static_with_port_in(self):
1440 """ 1:1 NAPT initialized from inside network """
1442 self.tcp_port_out = 3606
1443 self.udp_port_out = 3607
1444 self.icmp_id_out = 3608
1446 self.nat44_add_address(self.nat_addr)
1447 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448 self.tcp_port_in, self.tcp_port_out,
1449 proto=IP_PROTOS.tcp)
1450 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1451 self.udp_port_in, self.udp_port_out,
1452 proto=IP_PROTOS.udp)
1453 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1454 self.icmp_id_in, self.icmp_id_out,
1455 proto=IP_PROTOS.icmp)
1456 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1457 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1461 pkts = self.create_stream_in(self.pg0, self.pg1)
1462 self.pg0.add_stream(pkts)
1463 self.pg_enable_capture(self.pg_interfaces)
1465 capture = self.pg1.get_capture(len(pkts))
1466 self.verify_capture_out(capture)
1469 pkts = self.create_stream_out(self.pg1)
1470 self.pg1.add_stream(pkts)
1471 self.pg_enable_capture(self.pg_interfaces)
1473 capture = self.pg0.get_capture(len(pkts))
1474 self.verify_capture_in(capture, self.pg0)
1476 def test_static_with_port_out(self):
1477 """ 1:1 NAPT initialized from outside network """
1479 self.tcp_port_out = 30606
1480 self.udp_port_out = 30607
1481 self.icmp_id_out = 30608
1483 self.nat44_add_address(self.nat_addr)
1484 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1485 self.tcp_port_in, self.tcp_port_out,
1486 proto=IP_PROTOS.tcp)
1487 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1488 self.udp_port_in, self.udp_port_out,
1489 proto=IP_PROTOS.udp)
1490 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1491 self.icmp_id_in, self.icmp_id_out,
1492 proto=IP_PROTOS.icmp)
1493 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1494 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1498 pkts = self.create_stream_out(self.pg1)
1499 self.pg1.add_stream(pkts)
1500 self.pg_enable_capture(self.pg_interfaces)
1502 capture = self.pg0.get_capture(len(pkts))
1503 self.verify_capture_in(capture, self.pg0)
1506 pkts = self.create_stream_in(self.pg0, self.pg1)
1507 self.pg0.add_stream(pkts)
1508 self.pg_enable_capture(self.pg_interfaces)
1510 capture = self.pg1.get_capture(len(pkts))
1511 self.verify_capture_out(capture)
1513 def test_static_with_port_out2(self):
1514 """ 1:1 NAPT symmetrical rule """
1519 self.vapi.nat44_forwarding_enable_disable(1)
1520 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1521 local_port, external_port,
1522 proto=IP_PROTOS.tcp, out2in_only=1)
1523 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1524 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1527 # from client to service
1528 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1529 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1530 TCP(sport=12345, dport=external_port))
1531 self.pg1.add_stream(p)
1532 self.pg_enable_capture(self.pg_interfaces)
1534 capture = self.pg0.get_capture(1)
1540 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1541 self.assertEqual(tcp.dport, local_port)
1542 self.check_tcp_checksum(p)
1543 self.check_ip_checksum(p)
1545 self.logger.error(ppp("Unexpected or invalid packet:", p))
1549 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1550 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1551 ICMP(type=11) / capture[0][IP])
1552 self.pg0.add_stream(p)
1553 self.pg_enable_capture(self.pg_interfaces)
1555 capture = self.pg1.get_capture(1)
1558 self.assertEqual(p[IP].src, self.nat_addr)
1560 self.assertEqual(inner.dst, self.nat_addr)
1561 self.assertEqual(inner[TCPerror].dport, external_port)
1563 self.logger.error(ppp("Unexpected or invalid packet:", p))
1566 # from service back to client
1567 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1568 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1569 TCP(sport=local_port, dport=12345))
1570 self.pg0.add_stream(p)
1571 self.pg_enable_capture(self.pg_interfaces)
1573 capture = self.pg1.get_capture(1)
1578 self.assertEqual(ip.src, self.nat_addr)
1579 self.assertEqual(tcp.sport, external_port)
1580 self.check_tcp_checksum(p)
1581 self.check_ip_checksum(p)
1583 self.logger.error(ppp("Unexpected or invalid packet:", p))
1587 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1588 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1589 ICMP(type=11) / capture[0][IP])
1590 self.pg1.add_stream(p)
1591 self.pg_enable_capture(self.pg_interfaces)
1593 capture = self.pg0.get_capture(1)
1596 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1598 self.assertEqual(inner.src, self.pg0.remote_ip4)
1599 self.assertEqual(inner[TCPerror].sport, local_port)
1601 self.logger.error(ppp("Unexpected or invalid packet:", p))
1604 # from client to server (no translation)
1605 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1606 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1607 TCP(sport=12346, dport=local_port))
1608 self.pg1.add_stream(p)
1609 self.pg_enable_capture(self.pg_interfaces)
1611 capture = self.pg0.get_capture(1)
1617 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1618 self.assertEqual(tcp.dport, local_port)
1619 self.check_tcp_checksum(p)
1620 self.check_ip_checksum(p)
1622 self.logger.error(ppp("Unexpected or invalid packet:", p))
1625 # from service back to client (no translation)
1626 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1627 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1628 TCP(sport=local_port, dport=12346))
1629 self.pg0.add_stream(p)
1630 self.pg_enable_capture(self.pg_interfaces)
1632 capture = self.pg1.get_capture(1)
1637 self.assertEqual(ip.src, self.pg0.remote_ip4)
1638 self.assertEqual(tcp.sport, local_port)
1639 self.check_tcp_checksum(p)
1640 self.check_ip_checksum(p)
1642 self.logger.error(ppp("Unexpected or invalid packet:", p))
1645 def test_static_vrf_aware(self):
1646 """ 1:1 NAT VRF awareness """
1648 nat_ip1 = "10.0.0.30"
1649 nat_ip2 = "10.0.0.40"
1650 self.tcp_port_out = 6303
1651 self.udp_port_out = 6304
1652 self.icmp_id_out = 6305
1654 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1656 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1658 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1660 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1661 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1663 # inside interface VRF match NAT44 static mapping VRF
1664 pkts = self.create_stream_in(self.pg4, self.pg3)
1665 self.pg4.add_stream(pkts)
1666 self.pg_enable_capture(self.pg_interfaces)
1668 capture = self.pg3.get_capture(len(pkts))
1669 self.verify_capture_out(capture, nat_ip1, True)
1671 # inside interface VRF don't match NAT44 static mapping VRF (packets
1673 pkts = self.create_stream_in(self.pg0, self.pg3)
1674 self.pg0.add_stream(pkts)
1675 self.pg_enable_capture(self.pg_interfaces)
1677 self.pg3.assert_nothing_captured()
1679 def test_dynamic_to_static(self):
1680 """ Switch from dynamic translation to 1:1NAT """
1681 nat_ip = "10.0.0.10"
1682 self.tcp_port_out = 6303
1683 self.udp_port_out = 6304
1684 self.icmp_id_out = 6305
1686 self.nat44_add_address(self.nat_addr)
1687 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1688 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1692 pkts = self.create_stream_in(self.pg0, self.pg1)
1693 self.pg0.add_stream(pkts)
1694 self.pg_enable_capture(self.pg_interfaces)
1696 capture = self.pg1.get_capture(len(pkts))
1697 self.verify_capture_out(capture)
1700 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1701 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1702 self.assertEqual(len(sessions), 0)
1703 pkts = self.create_stream_in(self.pg0, self.pg1)
1704 self.pg0.add_stream(pkts)
1705 self.pg_enable_capture(self.pg_interfaces)
1707 capture = self.pg1.get_capture(len(pkts))
1708 self.verify_capture_out(capture, nat_ip, True)
1710 def test_identity_nat(self):
1711 """ Identity NAT """
1713 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1714 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1715 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1718 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1719 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1720 TCP(sport=12345, dport=56789))
1721 self.pg1.add_stream(p)
1722 self.pg_enable_capture(self.pg_interfaces)
1724 capture = self.pg0.get_capture(1)
1729 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1730 self.assertEqual(ip.src, self.pg1.remote_ip4)
1731 self.assertEqual(tcp.dport, 56789)
1732 self.assertEqual(tcp.sport, 12345)
1733 self.check_tcp_checksum(p)
1734 self.check_ip_checksum(p)
1736 self.logger.error(ppp("Unexpected or invalid packet:", p))
1739 def test_static_lb(self):
1740 """ NAT44 local service load balancing """
1741 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1744 server1 = self.pg0.remote_hosts[0]
1745 server2 = self.pg0.remote_hosts[1]
1747 locals = [{'addr': server1.ip4n,
1750 {'addr': server2.ip4n,
1754 self.nat44_add_address(self.nat_addr)
1755 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1758 local_num=len(locals),
1760 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1761 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1764 # from client to service
1765 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1766 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1767 TCP(sport=12345, dport=external_port))
1768 self.pg1.add_stream(p)
1769 self.pg_enable_capture(self.pg_interfaces)
1771 capture = self.pg0.get_capture(1)
1777 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1778 if ip.dst == server1.ip4:
1782 self.assertEqual(tcp.dport, local_port)
1783 self.check_tcp_checksum(p)
1784 self.check_ip_checksum(p)
1786 self.logger.error(ppp("Unexpected or invalid packet:", p))
1789 # from service back to client
1790 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1791 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1792 TCP(sport=local_port, dport=12345))
1793 self.pg0.add_stream(p)
1794 self.pg_enable_capture(self.pg_interfaces)
1796 capture = self.pg1.get_capture(1)
1801 self.assertEqual(ip.src, self.nat_addr)
1802 self.assertEqual(tcp.sport, external_port)
1803 self.check_tcp_checksum(p)
1804 self.check_ip_checksum(p)
1806 self.logger.error(ppp("Unexpected or invalid packet:", p))
1812 clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1814 for client in clients:
1815 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1816 IP(src=client, dst=self.nat_addr) /
1817 TCP(sport=12345, dport=external_port))
1819 self.pg1.add_stream(pkts)
1820 self.pg_enable_capture(self.pg_interfaces)
1822 capture = self.pg0.get_capture(len(pkts))
1824 if p[IP].dst == server1.ip4:
1828 self.assertTrue(server1_n > server2_n)
1830 def test_static_lb_2(self):
1831 """ NAT44 local service load balancing (asymmetrical rule) """
1832 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1835 server1 = self.pg0.remote_hosts[0]
1836 server2 = self.pg0.remote_hosts[1]
1838 locals = [{'addr': server1.ip4n,
1841 {'addr': server2.ip4n,
1845 self.vapi.nat44_forwarding_enable_disable(1)
1846 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1850 local_num=len(locals),
1852 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1853 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1856 # from client to service
1857 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1858 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1859 TCP(sport=12345, dport=external_port))
1860 self.pg1.add_stream(p)
1861 self.pg_enable_capture(self.pg_interfaces)
1863 capture = self.pg0.get_capture(1)
1869 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1870 if ip.dst == server1.ip4:
1874 self.assertEqual(tcp.dport, local_port)
1875 self.check_tcp_checksum(p)
1876 self.check_ip_checksum(p)
1878 self.logger.error(ppp("Unexpected or invalid packet:", p))
1881 # from service back to client
1882 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1883 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1884 TCP(sport=local_port, dport=12345))
1885 self.pg0.add_stream(p)
1886 self.pg_enable_capture(self.pg_interfaces)
1888 capture = self.pg1.get_capture(1)
1893 self.assertEqual(ip.src, self.nat_addr)
1894 self.assertEqual(tcp.sport, external_port)
1895 self.check_tcp_checksum(p)
1896 self.check_ip_checksum(p)
1898 self.logger.error(ppp("Unexpected or invalid packet:", p))
1901 # from client to server (no translation)
1902 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1903 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1904 TCP(sport=12346, dport=local_port))
1905 self.pg1.add_stream(p)
1906 self.pg_enable_capture(self.pg_interfaces)
1908 capture = self.pg0.get_capture(1)
1914 self.assertEqual(ip.dst, server1.ip4)
1915 self.assertEqual(tcp.dport, local_port)
1916 self.check_tcp_checksum(p)
1917 self.check_ip_checksum(p)
1919 self.logger.error(ppp("Unexpected or invalid packet:", p))
1922 # from service back to client (no translation)
1923 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1924 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1925 TCP(sport=local_port, dport=12346))
1926 self.pg0.add_stream(p)
1927 self.pg_enable_capture(self.pg_interfaces)
1929 capture = self.pg1.get_capture(1)
1934 self.assertEqual(ip.src, server1.ip4)
1935 self.assertEqual(tcp.sport, local_port)
1936 self.check_tcp_checksum(p)
1937 self.check_ip_checksum(p)
1939 self.logger.error(ppp("Unexpected or invalid packet:", p))
1942 def test_multiple_inside_interfaces(self):
1943 """ NAT44 multiple non-overlapping address space inside interfaces """
1945 self.nat44_add_address(self.nat_addr)
1946 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1947 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1948 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1951 # between two NAT44 inside interfaces (no translation)
1952 pkts = self.create_stream_in(self.pg0, self.pg1)
1953 self.pg0.add_stream(pkts)
1954 self.pg_enable_capture(self.pg_interfaces)
1956 capture = self.pg1.get_capture(len(pkts))
1957 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1959 # from NAT44 inside to interface without NAT44 feature (no translation)
1960 pkts = self.create_stream_in(self.pg0, self.pg2)
1961 self.pg0.add_stream(pkts)
1962 self.pg_enable_capture(self.pg_interfaces)
1964 capture = self.pg2.get_capture(len(pkts))
1965 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1967 # in2out 1st interface
1968 pkts = self.create_stream_in(self.pg0, self.pg3)
1969 self.pg0.add_stream(pkts)
1970 self.pg_enable_capture(self.pg_interfaces)
1972 capture = self.pg3.get_capture(len(pkts))
1973 self.verify_capture_out(capture)
1975 # out2in 1st interface
1976 pkts = self.create_stream_out(self.pg3)
1977 self.pg3.add_stream(pkts)
1978 self.pg_enable_capture(self.pg_interfaces)
1980 capture = self.pg0.get_capture(len(pkts))
1981 self.verify_capture_in(capture, self.pg0)
1983 # in2out 2nd interface
1984 pkts = self.create_stream_in(self.pg1, self.pg3)
1985 self.pg1.add_stream(pkts)
1986 self.pg_enable_capture(self.pg_interfaces)
1988 capture = self.pg3.get_capture(len(pkts))
1989 self.verify_capture_out(capture)
1991 # out2in 2nd interface
1992 pkts = self.create_stream_out(self.pg3)
1993 self.pg3.add_stream(pkts)
1994 self.pg_enable_capture(self.pg_interfaces)
1996 capture = self.pg1.get_capture(len(pkts))
1997 self.verify_capture_in(capture, self.pg1)
1999 def test_inside_overlapping_interfaces(self):
2000 """ NAT44 multiple inside interfaces with overlapping address space """
2002 static_nat_ip = "10.0.0.10"
2003 self.nat44_add_address(self.nat_addr)
2004 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
2006 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
2007 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
2008 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
2009 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
2012 # between NAT44 inside interfaces with same VRF (no translation)
2013 pkts = self.create_stream_in(self.pg4, self.pg5)
2014 self.pg4.add_stream(pkts)
2015 self.pg_enable_capture(self.pg_interfaces)
2017 capture = self.pg5.get_capture(len(pkts))
2018 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
2020 # between NAT44 inside interfaces with different VRF (hairpinning)
2021 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2022 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2023 TCP(sport=1234, dport=5678))
2024 self.pg4.add_stream(p)
2025 self.pg_enable_capture(self.pg_interfaces)
2027 capture = self.pg6.get_capture(1)
2032 self.assertEqual(ip.src, self.nat_addr)
2033 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2034 self.assertNotEqual(tcp.sport, 1234)
2035 self.assertEqual(tcp.dport, 5678)
2037 self.logger.error(ppp("Unexpected or invalid packet:", p))
2040 # in2out 1st interface
2041 pkts = self.create_stream_in(self.pg4, self.pg3)
2042 self.pg4.add_stream(pkts)
2043 self.pg_enable_capture(self.pg_interfaces)
2045 capture = self.pg3.get_capture(len(pkts))
2046 self.verify_capture_out(capture)
2048 # out2in 1st interface
2049 pkts = self.create_stream_out(self.pg3)
2050 self.pg3.add_stream(pkts)
2051 self.pg_enable_capture(self.pg_interfaces)
2053 capture = self.pg4.get_capture(len(pkts))
2054 self.verify_capture_in(capture, self.pg4)
2056 # in2out 2nd interface
2057 pkts = self.create_stream_in(self.pg5, self.pg3)
2058 self.pg5.add_stream(pkts)
2059 self.pg_enable_capture(self.pg_interfaces)
2061 capture = self.pg3.get_capture(len(pkts))
2062 self.verify_capture_out(capture)
2064 # out2in 2nd interface
2065 pkts = self.create_stream_out(self.pg3)
2066 self.pg3.add_stream(pkts)
2067 self.pg_enable_capture(self.pg_interfaces)
2069 capture = self.pg5.get_capture(len(pkts))
2070 self.verify_capture_in(capture, self.pg5)
2073 addresses = self.vapi.nat44_address_dump()
2074 self.assertEqual(len(addresses), 1)
2075 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2076 self.assertEqual(len(sessions), 3)
2077 for session in sessions:
2078 self.assertFalse(session.is_static)
2079 self.assertEqual(session.inside_ip_address[0:4],
2080 self.pg5.remote_ip4n)
2081 self.assertEqual(session.outside_ip_address,
2082 addresses[0].ip_address)
2083 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2084 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2085 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2086 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2087 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2088 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2089 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2090 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2091 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2093 # in2out 3rd interface
2094 pkts = self.create_stream_in(self.pg6, self.pg3)
2095 self.pg6.add_stream(pkts)
2096 self.pg_enable_capture(self.pg_interfaces)
2098 capture = self.pg3.get_capture(len(pkts))
2099 self.verify_capture_out(capture, static_nat_ip, True)
2101 # out2in 3rd interface
2102 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2103 self.pg3.add_stream(pkts)
2104 self.pg_enable_capture(self.pg_interfaces)
2106 capture = self.pg6.get_capture(len(pkts))
2107 self.verify_capture_in(capture, self.pg6)
2109 # general user and session dump verifications
2110 users = self.vapi.nat44_user_dump()
2111 self.assertTrue(len(users) >= 3)
2112 addresses = self.vapi.nat44_address_dump()
2113 self.assertEqual(len(addresses), 1)
2115 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2117 for session in sessions:
2118 self.assertEqual(user.ip_address, session.inside_ip_address)
2119 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2120 self.assertTrue(session.protocol in
2121 [IP_PROTOS.tcp, IP_PROTOS.udp,
2125 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2126 self.assertTrue(len(sessions) >= 4)
2127 for session in sessions:
2128 self.assertFalse(session.is_static)
2129 self.assertEqual(session.inside_ip_address[0:4],
2130 self.pg4.remote_ip4n)
2131 self.assertEqual(session.outside_ip_address,
2132 addresses[0].ip_address)
2135 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2136 self.assertTrue(len(sessions) >= 3)
2137 for session in sessions:
2138 self.assertTrue(session.is_static)
2139 self.assertEqual(session.inside_ip_address[0:4],
2140 self.pg6.remote_ip4n)
2141 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2142 map(int, static_nat_ip.split('.')))
2143 self.assertTrue(session.inside_port in
2144 [self.tcp_port_in, self.udp_port_in,
2147 def test_hairpinning(self):
2148 """ NAT44 hairpinning - 1:1 NAPT """
2150 host = self.pg0.remote_hosts[0]
2151 server = self.pg0.remote_hosts[1]
2154 server_in_port = 5678
2155 server_out_port = 8765
2157 self.nat44_add_address(self.nat_addr)
2158 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2159 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2161 # add static mapping for server
2162 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2163 server_in_port, server_out_port,
2164 proto=IP_PROTOS.tcp)
2166 # send packet from host to server
2167 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2168 IP(src=host.ip4, dst=self.nat_addr) /
2169 TCP(sport=host_in_port, dport=server_out_port))
2170 self.pg0.add_stream(p)
2171 self.pg_enable_capture(self.pg_interfaces)
2173 capture = self.pg0.get_capture(1)
2178 self.assertEqual(ip.src, self.nat_addr)
2179 self.assertEqual(ip.dst, server.ip4)
2180 self.assertNotEqual(tcp.sport, host_in_port)
2181 self.assertEqual(tcp.dport, server_in_port)
2182 self.check_tcp_checksum(p)
2183 host_out_port = tcp.sport
2185 self.logger.error(ppp("Unexpected or invalid packet:", p))
2188 # send reply from server to host
2189 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2190 IP(src=server.ip4, dst=self.nat_addr) /
2191 TCP(sport=server_in_port, dport=host_out_port))
2192 self.pg0.add_stream(p)
2193 self.pg_enable_capture(self.pg_interfaces)
2195 capture = self.pg0.get_capture(1)
2200 self.assertEqual(ip.src, self.nat_addr)
2201 self.assertEqual(ip.dst, host.ip4)
2202 self.assertEqual(tcp.sport, server_out_port)
2203 self.assertEqual(tcp.dport, host_in_port)
2204 self.check_tcp_checksum(p)
2206 self.logger.error(ppp("Unexpected or invalid packet:", p))
2209 def test_hairpinning2(self):
2210 """ NAT44 hairpinning - 1:1 NAT"""
2212 server1_nat_ip = "10.0.0.10"
2213 server2_nat_ip = "10.0.0.11"
2214 host = self.pg0.remote_hosts[0]
2215 server1 = self.pg0.remote_hosts[1]
2216 server2 = self.pg0.remote_hosts[2]
2217 server_tcp_port = 22
2218 server_udp_port = 20
2220 self.nat44_add_address(self.nat_addr)
2221 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2222 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2225 # add static mapping for servers
2226 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2227 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2231 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2232 IP(src=host.ip4, dst=server1_nat_ip) /
2233 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2235 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2236 IP(src=host.ip4, dst=server1_nat_ip) /
2237 UDP(sport=self.udp_port_in, dport=server_udp_port))
2239 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2240 IP(src=host.ip4, dst=server1_nat_ip) /
2241 ICMP(id=self.icmp_id_in, type='echo-request'))
2243 self.pg0.add_stream(pkts)
2244 self.pg_enable_capture(self.pg_interfaces)
2246 capture = self.pg0.get_capture(len(pkts))
2247 for packet in capture:
2249 self.assertEqual(packet[IP].src, self.nat_addr)
2250 self.assertEqual(packet[IP].dst, server1.ip4)
2251 if packet.haslayer(TCP):
2252 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2253 self.assertEqual(packet[TCP].dport, server_tcp_port)
2254 self.tcp_port_out = packet[TCP].sport
2255 self.check_tcp_checksum(packet)
2256 elif packet.haslayer(UDP):
2257 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2258 self.assertEqual(packet[UDP].dport, server_udp_port)
2259 self.udp_port_out = packet[UDP].sport
2261 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2262 self.icmp_id_out = packet[ICMP].id
2264 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2269 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2270 IP(src=server1.ip4, dst=self.nat_addr) /
2271 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2273 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2274 IP(src=server1.ip4, dst=self.nat_addr) /
2275 UDP(sport=server_udp_port, dport=self.udp_port_out))
2277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2278 IP(src=server1.ip4, dst=self.nat_addr) /
2279 ICMP(id=self.icmp_id_out, type='echo-reply'))
2281 self.pg0.add_stream(pkts)
2282 self.pg_enable_capture(self.pg_interfaces)
2284 capture = self.pg0.get_capture(len(pkts))
2285 for packet in capture:
2287 self.assertEqual(packet[IP].src, server1_nat_ip)
2288 self.assertEqual(packet[IP].dst, host.ip4)
2289 if packet.haslayer(TCP):
2290 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2291 self.assertEqual(packet[TCP].sport, server_tcp_port)
2292 self.check_tcp_checksum(packet)
2293 elif packet.haslayer(UDP):
2294 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2295 self.assertEqual(packet[UDP].sport, server_udp_port)
2297 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2299 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2302 # server2 to server1
2304 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2305 IP(src=server2.ip4, dst=server1_nat_ip) /
2306 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2308 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2309 IP(src=server2.ip4, dst=server1_nat_ip) /
2310 UDP(sport=self.udp_port_in, dport=server_udp_port))
2312 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2313 IP(src=server2.ip4, dst=server1_nat_ip) /
2314 ICMP(id=self.icmp_id_in, type='echo-request'))
2316 self.pg0.add_stream(pkts)
2317 self.pg_enable_capture(self.pg_interfaces)
2319 capture = self.pg0.get_capture(len(pkts))
2320 for packet in capture:
2322 self.assertEqual(packet[IP].src, server2_nat_ip)
2323 self.assertEqual(packet[IP].dst, server1.ip4)
2324 if packet.haslayer(TCP):
2325 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2326 self.assertEqual(packet[TCP].dport, server_tcp_port)
2327 self.tcp_port_out = packet[TCP].sport
2328 self.check_tcp_checksum(packet)
2329 elif packet.haslayer(UDP):
2330 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2331 self.assertEqual(packet[UDP].dport, server_udp_port)
2332 self.udp_port_out = packet[UDP].sport
2334 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2335 self.icmp_id_out = packet[ICMP].id
2337 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2340 # server1 to server2
2342 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2343 IP(src=server1.ip4, dst=server2_nat_ip) /
2344 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2346 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2347 IP(src=server1.ip4, dst=server2_nat_ip) /
2348 UDP(sport=server_udp_port, dport=self.udp_port_out))
2350 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2351 IP(src=server1.ip4, dst=server2_nat_ip) /
2352 ICMP(id=self.icmp_id_out, type='echo-reply'))
2354 self.pg0.add_stream(pkts)
2355 self.pg_enable_capture(self.pg_interfaces)
2357 capture = self.pg0.get_capture(len(pkts))
2358 for packet in capture:
2360 self.assertEqual(packet[IP].src, server1_nat_ip)
2361 self.assertEqual(packet[IP].dst, server2.ip4)
2362 if packet.haslayer(TCP):
2363 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2364 self.assertEqual(packet[TCP].sport, server_tcp_port)
2365 self.check_tcp_checksum(packet)
2366 elif packet.haslayer(UDP):
2367 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2368 self.assertEqual(packet[UDP].sport, server_udp_port)
2370 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2372 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2375 def test_max_translations_per_user(self):
2376 """ MAX translations per user - recycle the least recently used """
2378 self.nat44_add_address(self.nat_addr)
2379 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2380 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2383 # get maximum number of translations per user
2384 nat44_config = self.vapi.nat_show_config()
2386 # send more than maximum number of translations per user packets
2387 pkts_num = nat44_config.max_translations_per_user + 5
2389 for port in range(0, pkts_num):
2390 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2391 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2392 TCP(sport=1025 + port))
2394 self.pg0.add_stream(pkts)
2395 self.pg_enable_capture(self.pg_interfaces)
2398 # verify number of translated packet
2399 self.pg1.get_capture(pkts_num)
2401 def test_interface_addr(self):
2402 """ Acquire NAT44 addresses from interface """
2403 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2405 # no address in NAT pool
2406 adresses = self.vapi.nat44_address_dump()
2407 self.assertEqual(0, len(adresses))
2409 # configure interface address and check NAT address pool
2410 self.pg7.config_ip4()
2411 adresses = self.vapi.nat44_address_dump()
2412 self.assertEqual(1, len(adresses))
2413 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2415 # remove interface address and check NAT address pool
2416 self.pg7.unconfig_ip4()
2417 adresses = self.vapi.nat44_address_dump()
2418 self.assertEqual(0, len(adresses))
2420 def test_interface_addr_static_mapping(self):
2421 """ Static mapping with addresses from interface """
2424 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2425 self.nat44_add_static_mapping(
2427 external_sw_if_index=self.pg7.sw_if_index,
2430 # static mappings with external interface
2431 static_mappings = self.vapi.nat44_static_mapping_dump()
2432 self.assertEqual(1, len(static_mappings))
2433 self.assertEqual(self.pg7.sw_if_index,
2434 static_mappings[0].external_sw_if_index)
2435 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2437 # configure interface address and check static mappings
2438 self.pg7.config_ip4()
2439 static_mappings = self.vapi.nat44_static_mapping_dump()
2440 self.assertEqual(1, len(static_mappings))
2441 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2442 self.pg7.local_ip4n)
2443 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2444 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2446 # remove interface address and check static mappings
2447 self.pg7.unconfig_ip4()
2448 static_mappings = self.vapi.nat44_static_mapping_dump()
2449 self.assertEqual(0, len(static_mappings))
2451 def test_interface_addr_identity_nat(self):
2452 """ Identity NAT with addresses from interface """
2455 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2456 self.vapi.nat44_add_del_identity_mapping(
2457 sw_if_index=self.pg7.sw_if_index,
2459 protocol=IP_PROTOS.tcp,
2462 # identity mappings with external interface
2463 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2464 self.assertEqual(1, len(identity_mappings))
2465 self.assertEqual(self.pg7.sw_if_index,
2466 identity_mappings[0].sw_if_index)
2468 # configure interface address and check identity mappings
2469 self.pg7.config_ip4()
2470 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2471 self.assertEqual(1, len(identity_mappings))
2472 self.assertEqual(identity_mappings[0].ip_address,
2473 self.pg7.local_ip4n)
2474 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2475 self.assertEqual(port, identity_mappings[0].port)
2476 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2478 # remove interface address and check identity mappings
2479 self.pg7.unconfig_ip4()
2480 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2481 self.assertEqual(0, len(identity_mappings))
2483 def test_ipfix_nat44_sess(self):
2484 """ IPFIX logging NAT44 session created/delted """
2485 self.ipfix_domain_id = 10
2486 self.ipfix_src_port = 20202
2487 colector_port = 30303
2488 bind_layers(UDP, IPFIX, dport=30303)
2489 self.nat44_add_address(self.nat_addr)
2490 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2491 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2493 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2494 src_address=self.pg3.local_ip4n,
2496 template_interval=10,
2497 collector_port=colector_port)
2498 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2499 src_port=self.ipfix_src_port)
2501 pkts = self.create_stream_in(self.pg0, self.pg1)
2502 self.pg0.add_stream(pkts)
2503 self.pg_enable_capture(self.pg_interfaces)
2505 capture = self.pg1.get_capture(len(pkts))
2506 self.verify_capture_out(capture)
2507 self.nat44_add_address(self.nat_addr, is_add=0)
2508 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2509 capture = self.pg3.get_capture(9)
2510 ipfix = IPFIXDecoder()
2511 # first load template
2513 self.assertTrue(p.haslayer(IPFIX))
2514 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2515 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2516 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2517 self.assertEqual(p[UDP].dport, colector_port)
2518 self.assertEqual(p[IPFIX].observationDomainID,
2519 self.ipfix_domain_id)
2520 if p.haslayer(Template):
2521 ipfix.add_template(p.getlayer(Template))
2522 # verify events in data set
2524 if p.haslayer(Data):
2525 data = ipfix.decode_data_set(p.getlayer(Set))
2526 self.verify_ipfix_nat44_ses(data)
2528 def test_ipfix_addr_exhausted(self):
2529 """ IPFIX logging NAT addresses exhausted """
2530 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2531 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2533 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2534 src_address=self.pg3.local_ip4n,
2536 template_interval=10)
2537 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2538 src_port=self.ipfix_src_port)
2540 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2541 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2543 self.pg0.add_stream(p)
2544 self.pg_enable_capture(self.pg_interfaces)
2546 capture = self.pg1.get_capture(0)
2547 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2548 capture = self.pg3.get_capture(9)
2549 ipfix = IPFIXDecoder()
2550 # first load template
2552 self.assertTrue(p.haslayer(IPFIX))
2553 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2554 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2555 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2556 self.assertEqual(p[UDP].dport, 4739)
2557 self.assertEqual(p[IPFIX].observationDomainID,
2558 self.ipfix_domain_id)
2559 if p.haslayer(Template):
2560 ipfix.add_template(p.getlayer(Template))
2561 # verify events in data set
2563 if p.haslayer(Data):
2564 data = ipfix.decode_data_set(p.getlayer(Set))
2565 self.verify_ipfix_addr_exhausted(data)
2567 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2568 def test_ipfix_max_sessions(self):
2569 """ IPFIX logging maximum session entries exceeded """
2570 self.nat44_add_address(self.nat_addr)
2571 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2572 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2575 nat44_config = self.vapi.nat_show_config()
2576 max_sessions = 10 * nat44_config.translation_buckets
2579 for i in range(0, max_sessions):
2580 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2581 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2582 IP(src=src, dst=self.pg1.remote_ip4) /
2585 self.pg0.add_stream(pkts)
2586 self.pg_enable_capture(self.pg_interfaces)
2589 self.pg1.get_capture(max_sessions)
2590 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2591 src_address=self.pg3.local_ip4n,
2593 template_interval=10)
2594 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2595 src_port=self.ipfix_src_port)
2597 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2598 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2600 self.pg0.add_stream(p)
2601 self.pg_enable_capture(self.pg_interfaces)
2603 self.pg1.get_capture(0)
2604 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2605 capture = self.pg3.get_capture(9)
2606 ipfix = IPFIXDecoder()
2607 # first load template
2609 self.assertTrue(p.haslayer(IPFIX))
2610 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2611 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2612 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2613 self.assertEqual(p[UDP].dport, 4739)
2614 self.assertEqual(p[IPFIX].observationDomainID,
2615 self.ipfix_domain_id)
2616 if p.haslayer(Template):
2617 ipfix.add_template(p.getlayer(Template))
2618 # verify events in data set
2620 if p.haslayer(Data):
2621 data = ipfix.decode_data_set(p.getlayer(Set))
2622 self.verify_ipfix_max_sessions(data, max_sessions)
2624 def test_pool_addr_fib(self):
2625 """ NAT44 add pool addresses to FIB """
2626 static_addr = '10.0.0.10'
2627 self.nat44_add_address(self.nat_addr)
2628 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2629 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2631 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2634 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2635 ARP(op=ARP.who_has, pdst=self.nat_addr,
2636 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2637 self.pg1.add_stream(p)
2638 self.pg_enable_capture(self.pg_interfaces)
2640 capture = self.pg1.get_capture(1)
2641 self.assertTrue(capture[0].haslayer(ARP))
2642 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2645 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2646 ARP(op=ARP.who_has, pdst=static_addr,
2647 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2648 self.pg1.add_stream(p)
2649 self.pg_enable_capture(self.pg_interfaces)
2651 capture = self.pg1.get_capture(1)
2652 self.assertTrue(capture[0].haslayer(ARP))
2653 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2655 # send ARP to non-NAT44 interface
2656 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2657 ARP(op=ARP.who_has, pdst=self.nat_addr,
2658 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2659 self.pg2.add_stream(p)
2660 self.pg_enable_capture(self.pg_interfaces)
2662 capture = self.pg1.get_capture(0)
2664 # remove addresses and verify
2665 self.nat44_add_address(self.nat_addr, is_add=0)
2666 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2669 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2670 ARP(op=ARP.who_has, pdst=self.nat_addr,
2671 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2672 self.pg1.add_stream(p)
2673 self.pg_enable_capture(self.pg_interfaces)
2675 capture = self.pg1.get_capture(0)
2677 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2678 ARP(op=ARP.who_has, pdst=static_addr,
2679 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2680 self.pg1.add_stream(p)
2681 self.pg_enable_capture(self.pg_interfaces)
2683 capture = self.pg1.get_capture(0)
2685 def test_vrf_mode(self):
2686 """ NAT44 tenant VRF aware address pool mode """
2690 nat_ip1 = "10.0.0.10"
2691 nat_ip2 = "10.0.0.11"
2693 self.pg0.unconfig_ip4()
2694 self.pg1.unconfig_ip4()
2695 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2696 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2697 self.pg0.set_table_ip4(vrf_id1)
2698 self.pg1.set_table_ip4(vrf_id2)
2699 self.pg0.config_ip4()
2700 self.pg1.config_ip4()
2702 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2703 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2704 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2705 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2706 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2710 pkts = self.create_stream_in(self.pg0, self.pg2)
2711 self.pg0.add_stream(pkts)
2712 self.pg_enable_capture(self.pg_interfaces)
2714 capture = self.pg2.get_capture(len(pkts))
2715 self.verify_capture_out(capture, nat_ip1)
2718 pkts = self.create_stream_in(self.pg1, self.pg2)
2719 self.pg1.add_stream(pkts)
2720 self.pg_enable_capture(self.pg_interfaces)
2722 capture = self.pg2.get_capture(len(pkts))
2723 self.verify_capture_out(capture, nat_ip2)
2725 self.pg0.unconfig_ip4()
2726 self.pg1.unconfig_ip4()
2727 self.pg0.set_table_ip4(0)
2728 self.pg1.set_table_ip4(0)
2729 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2730 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2732 def test_vrf_feature_independent(self):
2733 """ NAT44 tenant VRF independent address pool mode """
2735 nat_ip1 = "10.0.0.10"
2736 nat_ip2 = "10.0.0.11"
2738 self.nat44_add_address(nat_ip1)
2739 self.nat44_add_address(nat_ip2, vrf_id=99)
2740 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2741 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2742 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2746 pkts = self.create_stream_in(self.pg0, self.pg2)
2747 self.pg0.add_stream(pkts)
2748 self.pg_enable_capture(self.pg_interfaces)
2750 capture = self.pg2.get_capture(len(pkts))
2751 self.verify_capture_out(capture, nat_ip1)
2754 pkts = self.create_stream_in(self.pg1, self.pg2)
2755 self.pg1.add_stream(pkts)
2756 self.pg_enable_capture(self.pg_interfaces)
2758 capture = self.pg2.get_capture(len(pkts))
2759 self.verify_capture_out(capture, nat_ip1)
2761 def test_dynamic_ipless_interfaces(self):
2762 """ NAT44 interfaces without configured IP address """
2764 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2765 mactobinary(self.pg7.remote_mac),
2766 self.pg7.remote_ip4n,
2768 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2769 mactobinary(self.pg8.remote_mac),
2770 self.pg8.remote_ip4n,
2773 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2774 dst_address_length=32,
2775 next_hop_address=self.pg7.remote_ip4n,
2776 next_hop_sw_if_index=self.pg7.sw_if_index)
2777 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2778 dst_address_length=32,
2779 next_hop_address=self.pg8.remote_ip4n,
2780 next_hop_sw_if_index=self.pg8.sw_if_index)
2782 self.nat44_add_address(self.nat_addr)
2783 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2784 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2788 pkts = self.create_stream_in(self.pg7, self.pg8)
2789 self.pg7.add_stream(pkts)
2790 self.pg_enable_capture(self.pg_interfaces)
2792 capture = self.pg8.get_capture(len(pkts))
2793 self.verify_capture_out(capture)
2796 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2797 self.pg8.add_stream(pkts)
2798 self.pg_enable_capture(self.pg_interfaces)
2800 capture = self.pg7.get_capture(len(pkts))
2801 self.verify_capture_in(capture, self.pg7)
2803 def test_static_ipless_interfaces(self):
2804 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2806 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2807 mactobinary(self.pg7.remote_mac),
2808 self.pg7.remote_ip4n,
2810 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2811 mactobinary(self.pg8.remote_mac),
2812 self.pg8.remote_ip4n,
2815 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2816 dst_address_length=32,
2817 next_hop_address=self.pg7.remote_ip4n,
2818 next_hop_sw_if_index=self.pg7.sw_if_index)
2819 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2820 dst_address_length=32,
2821 next_hop_address=self.pg8.remote_ip4n,
2822 next_hop_sw_if_index=self.pg8.sw_if_index)
2824 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2825 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2826 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2830 pkts = self.create_stream_out(self.pg8)
2831 self.pg8.add_stream(pkts)
2832 self.pg_enable_capture(self.pg_interfaces)
2834 capture = self.pg7.get_capture(len(pkts))
2835 self.verify_capture_in(capture, self.pg7)
2838 pkts = self.create_stream_in(self.pg7, self.pg8)
2839 self.pg7.add_stream(pkts)
2840 self.pg_enable_capture(self.pg_interfaces)
2842 capture = self.pg8.get_capture(len(pkts))
2843 self.verify_capture_out(capture, self.nat_addr, True)
2845 def test_static_with_port_ipless_interfaces(self):
2846 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2848 self.tcp_port_out = 30606
2849 self.udp_port_out = 30607
2850 self.icmp_id_out = 30608
2852 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2853 mactobinary(self.pg7.remote_mac),
2854 self.pg7.remote_ip4n,
2856 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2857 mactobinary(self.pg8.remote_mac),
2858 self.pg8.remote_ip4n,
2861 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2862 dst_address_length=32,
2863 next_hop_address=self.pg7.remote_ip4n,
2864 next_hop_sw_if_index=self.pg7.sw_if_index)
2865 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2866 dst_address_length=32,
2867 next_hop_address=self.pg8.remote_ip4n,
2868 next_hop_sw_if_index=self.pg8.sw_if_index)
2870 self.nat44_add_address(self.nat_addr)
2871 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2872 self.tcp_port_in, self.tcp_port_out,
2873 proto=IP_PROTOS.tcp)
2874 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2875 self.udp_port_in, self.udp_port_out,
2876 proto=IP_PROTOS.udp)
2877 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2878 self.icmp_id_in, self.icmp_id_out,
2879 proto=IP_PROTOS.icmp)
2880 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2881 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2885 pkts = self.create_stream_out(self.pg8)
2886 self.pg8.add_stream(pkts)
2887 self.pg_enable_capture(self.pg_interfaces)
2889 capture = self.pg7.get_capture(len(pkts))
2890 self.verify_capture_in(capture, self.pg7)
2893 pkts = self.create_stream_in(self.pg7, self.pg8)
2894 self.pg7.add_stream(pkts)
2895 self.pg_enable_capture(self.pg_interfaces)
2897 capture = self.pg8.get_capture(len(pkts))
2898 self.verify_capture_out(capture)
2900 def test_static_unknown_proto(self):
2901 """ 1:1 NAT translate packet with unknown protocol """
2902 nat_ip = "10.0.0.10"
2903 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2904 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2905 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2909 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2910 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2912 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2913 TCP(sport=1234, dport=1234))
2914 self.pg0.add_stream(p)
2915 self.pg_enable_capture(self.pg_interfaces)
2917 p = self.pg1.get_capture(1)
2920 self.assertEqual(packet[IP].src, nat_ip)
2921 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2922 self.assertTrue(packet.haslayer(GRE))
2923 self.check_ip_checksum(packet)
2925 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2929 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2930 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2932 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2933 TCP(sport=1234, dport=1234))
2934 self.pg1.add_stream(p)
2935 self.pg_enable_capture(self.pg_interfaces)
2937 p = self.pg0.get_capture(1)
2940 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2941 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2942 self.assertTrue(packet.haslayer(GRE))
2943 self.check_ip_checksum(packet)
2945 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2948 def test_hairpinning_static_unknown_proto(self):
2949 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2951 host = self.pg0.remote_hosts[0]
2952 server = self.pg0.remote_hosts[1]
2954 host_nat_ip = "10.0.0.10"
2955 server_nat_ip = "10.0.0.11"
2957 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2958 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2959 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2960 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2964 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2965 IP(src=host.ip4, dst=server_nat_ip) /
2967 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2968 TCP(sport=1234, dport=1234))
2969 self.pg0.add_stream(p)
2970 self.pg_enable_capture(self.pg_interfaces)
2972 p = self.pg0.get_capture(1)
2975 self.assertEqual(packet[IP].src, host_nat_ip)
2976 self.assertEqual(packet[IP].dst, server.ip4)
2977 self.assertTrue(packet.haslayer(GRE))
2978 self.check_ip_checksum(packet)
2980 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2984 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2985 IP(src=server.ip4, dst=host_nat_ip) /
2987 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2988 TCP(sport=1234, dport=1234))
2989 self.pg0.add_stream(p)
2990 self.pg_enable_capture(self.pg_interfaces)
2992 p = self.pg0.get_capture(1)
2995 self.assertEqual(packet[IP].src, server_nat_ip)
2996 self.assertEqual(packet[IP].dst, host.ip4)
2997 self.assertTrue(packet.haslayer(GRE))
2998 self.check_ip_checksum(packet)
3000 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3003 def test_unknown_proto(self):
3004 """ NAT44 translate packet with unknown protocol """
3005 self.nat44_add_address(self.nat_addr)
3006 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3007 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3011 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3012 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3013 TCP(sport=self.tcp_port_in, dport=20))
3014 self.pg0.add_stream(p)
3015 self.pg_enable_capture(self.pg_interfaces)
3017 p = self.pg1.get_capture(1)
3019 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3020 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3022 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3023 TCP(sport=1234, dport=1234))
3024 self.pg0.add_stream(p)
3025 self.pg_enable_capture(self.pg_interfaces)
3027 p = self.pg1.get_capture(1)
3030 self.assertEqual(packet[IP].src, self.nat_addr)
3031 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3032 self.assertTrue(packet.haslayer(GRE))
3033 self.check_ip_checksum(packet)
3035 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3039 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3040 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3042 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3043 TCP(sport=1234, dport=1234))
3044 self.pg1.add_stream(p)
3045 self.pg_enable_capture(self.pg_interfaces)
3047 p = self.pg0.get_capture(1)
3050 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3051 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3052 self.assertTrue(packet.haslayer(GRE))
3053 self.check_ip_checksum(packet)
3055 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3058 def test_hairpinning_unknown_proto(self):
3059 """ NAT44 translate packet with unknown protocol - hairpinning """
3060 host = self.pg0.remote_hosts[0]
3061 server = self.pg0.remote_hosts[1]
3064 server_in_port = 5678
3065 server_out_port = 8765
3066 server_nat_ip = "10.0.0.11"
3068 self.nat44_add_address(self.nat_addr)
3069 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3070 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3073 # add static mapping for server
3074 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3077 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3078 IP(src=host.ip4, dst=server_nat_ip) /
3079 TCP(sport=host_in_port, dport=server_out_port))
3080 self.pg0.add_stream(p)
3081 self.pg_enable_capture(self.pg_interfaces)
3083 capture = self.pg0.get_capture(1)
3085 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3086 IP(src=host.ip4, dst=server_nat_ip) /
3088 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3089 TCP(sport=1234, dport=1234))
3090 self.pg0.add_stream(p)
3091 self.pg_enable_capture(self.pg_interfaces)
3093 p = self.pg0.get_capture(1)
3096 self.assertEqual(packet[IP].src, self.nat_addr)
3097 self.assertEqual(packet[IP].dst, server.ip4)
3098 self.assertTrue(packet.haslayer(GRE))
3099 self.check_ip_checksum(packet)
3101 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3105 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3106 IP(src=server.ip4, dst=self.nat_addr) /
3108 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3109 TCP(sport=1234, dport=1234))
3110 self.pg0.add_stream(p)
3111 self.pg_enable_capture(self.pg_interfaces)
3113 p = self.pg0.get_capture(1)
3116 self.assertEqual(packet[IP].src, server_nat_ip)
3117 self.assertEqual(packet[IP].dst, host.ip4)
3118 self.assertTrue(packet.haslayer(GRE))
3119 self.check_ip_checksum(packet)
3121 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3124 def test_output_feature(self):
3125 """ NAT44 interface output feature (in2out postrouting) """
3126 self.nat44_add_address(self.nat_addr)
3127 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3128 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3129 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3133 pkts = self.create_stream_in(self.pg0, self.pg3)
3134 self.pg0.add_stream(pkts)
3135 self.pg_enable_capture(self.pg_interfaces)
3137 capture = self.pg3.get_capture(len(pkts))
3138 self.verify_capture_out(capture)
3141 pkts = self.create_stream_out(self.pg3)
3142 self.pg3.add_stream(pkts)
3143 self.pg_enable_capture(self.pg_interfaces)
3145 capture = self.pg0.get_capture(len(pkts))
3146 self.verify_capture_in(capture, self.pg0)
3148 # from non-NAT interface to NAT inside interface
3149 pkts = self.create_stream_in(self.pg2, self.pg0)
3150 self.pg2.add_stream(pkts)
3151 self.pg_enable_capture(self.pg_interfaces)
3153 capture = self.pg0.get_capture(len(pkts))
3154 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3156 def test_output_feature_vrf_aware(self):
3157 """ NAT44 interface output feature VRF aware (in2out postrouting) """
3158 nat_ip_vrf10 = "10.0.0.10"
3159 nat_ip_vrf20 = "10.0.0.20"
3161 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3162 dst_address_length=32,
3163 next_hop_address=self.pg3.remote_ip4n,
3164 next_hop_sw_if_index=self.pg3.sw_if_index,
3166 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3167 dst_address_length=32,
3168 next_hop_address=self.pg3.remote_ip4n,
3169 next_hop_sw_if_index=self.pg3.sw_if_index,
3172 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3173 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3174 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3175 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3176 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3180 pkts = self.create_stream_in(self.pg4, self.pg3)
3181 self.pg4.add_stream(pkts)
3182 self.pg_enable_capture(self.pg_interfaces)
3184 capture = self.pg3.get_capture(len(pkts))
3185 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3188 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3189 self.pg3.add_stream(pkts)
3190 self.pg_enable_capture(self.pg_interfaces)
3192 capture = self.pg4.get_capture(len(pkts))
3193 self.verify_capture_in(capture, self.pg4)
3196 pkts = self.create_stream_in(self.pg6, self.pg3)
3197 self.pg6.add_stream(pkts)
3198 self.pg_enable_capture(self.pg_interfaces)
3200 capture = self.pg3.get_capture(len(pkts))
3201 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3204 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3205 self.pg3.add_stream(pkts)
3206 self.pg_enable_capture(self.pg_interfaces)
3208 capture = self.pg6.get_capture(len(pkts))
3209 self.verify_capture_in(capture, self.pg6)
3211 def test_output_feature_hairpinning(self):
3212 """ NAT44 interface output feature hairpinning (in2out postrouting) """
3213 host = self.pg0.remote_hosts[0]
3214 server = self.pg0.remote_hosts[1]
3217 server_in_port = 5678
3218 server_out_port = 8765
3220 self.nat44_add_address(self.nat_addr)
3221 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3222 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3225 # add static mapping for server
3226 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3227 server_in_port, server_out_port,
3228 proto=IP_PROTOS.tcp)
3230 # send packet from host to server
3231 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3232 IP(src=host.ip4, dst=self.nat_addr) /
3233 TCP(sport=host_in_port, dport=server_out_port))
3234 self.pg0.add_stream(p)
3235 self.pg_enable_capture(self.pg_interfaces)
3237 capture = self.pg0.get_capture(1)
3242 self.assertEqual(ip.src, self.nat_addr)
3243 self.assertEqual(ip.dst, server.ip4)
3244 self.assertNotEqual(tcp.sport, host_in_port)
3245 self.assertEqual(tcp.dport, server_in_port)
3246 self.check_tcp_checksum(p)
3247 host_out_port = tcp.sport
3249 self.logger.error(ppp("Unexpected or invalid packet:", p))
3252 # send reply from server to host
3253 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3254 IP(src=server.ip4, dst=self.nat_addr) /
3255 TCP(sport=server_in_port, dport=host_out_port))
3256 self.pg0.add_stream(p)
3257 self.pg_enable_capture(self.pg_interfaces)
3259 capture = self.pg0.get_capture(1)
3264 self.assertEqual(ip.src, self.nat_addr)
3265 self.assertEqual(ip.dst, host.ip4)
3266 self.assertEqual(tcp.sport, server_out_port)
3267 self.assertEqual(tcp.dport, host_in_port)
3268 self.check_tcp_checksum(p)
3270 self.logger.error(ppp("Unexpected or invalid packet:", p))
3273 def test_output_feature_and_service(self):
3274 """ NAT44 interface output feature and services """
3275 external_addr = '1.2.3.4'
3279 self.vapi.nat44_forwarding_enable_disable(1)
3280 self.nat44_add_address(self.nat_addr)
3281 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3282 local_port, external_port,
3283 proto=IP_PROTOS.tcp, out2in_only=1)
3284 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3285 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3288 # from client to service
3289 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3290 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3291 TCP(sport=12345, dport=external_port))
3292 self.pg1.add_stream(p)
3293 self.pg_enable_capture(self.pg_interfaces)
3295 capture = self.pg0.get_capture(1)
3301 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3302 self.assertEqual(tcp.dport, local_port)
3303 self.check_tcp_checksum(p)
3304 self.check_ip_checksum(p)
3306 self.logger.error(ppp("Unexpected or invalid packet:", p))
3309 # from service back to client
3310 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3311 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3312 TCP(sport=local_port, dport=12345))
3313 self.pg0.add_stream(p)
3314 self.pg_enable_capture(self.pg_interfaces)
3316 capture = self.pg1.get_capture(1)
3321 self.assertEqual(ip.src, external_addr)
3322 self.assertEqual(tcp.sport, external_port)
3323 self.check_tcp_checksum(p)
3324 self.check_ip_checksum(p)
3326 self.logger.error(ppp("Unexpected or invalid packet:", p))
3329 # from local network host to external network
3331 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3332 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3333 TCP(sport=23456, dport=34567))
3334 self.pg0.add_stream(p)
3335 self.pg_enable_capture(self.pg_interfaces)
3337 capture = self.pg1.get_capture(1)
3342 self.assertEqual(ip.src, self.nat_addr)
3343 self.assertNotEqual(tcp.sport, 23456)
3344 ext_port = tcp.sport
3345 self.check_tcp_checksum(p)
3346 self.check_ip_checksum(p)
3348 self.logger.error(ppp("Unexpected or invalid packet:", p))
3351 # from external network back to local network host
3352 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3353 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3354 TCP(sport=34567, dport=ext_port))
3355 self.pg1.add_stream(p)
3356 self.pg_enable_capture(self.pg_interfaces)
3358 capture = self.pg0.get_capture(1)
3364 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3365 self.assertEqual(tcp.dport, 23456)
3366 self.check_tcp_checksum(p)
3367 self.check_ip_checksum(p)
3369 self.logger.error(ppp("Unexpected or invalid packet:", p))
3372 def test_one_armed_nat44(self):
3373 """ One armed NAT44 """
3374 remote_host = self.pg9.remote_hosts[0]
3375 local_host = self.pg9.remote_hosts[1]
3378 self.nat44_add_address(self.nat_addr)
3379 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3380 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3384 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3385 IP(src=local_host.ip4, dst=remote_host.ip4) /
3386 TCP(sport=12345, dport=80))
3387 self.pg9.add_stream(p)
3388 self.pg_enable_capture(self.pg_interfaces)
3390 capture = self.pg9.get_capture(1)
3395 self.assertEqual(ip.src, self.nat_addr)
3396 self.assertEqual(ip.dst, remote_host.ip4)
3397 self.assertNotEqual(tcp.sport, 12345)
3398 external_port = tcp.sport
3399 self.assertEqual(tcp.dport, 80)
3400 self.check_tcp_checksum(p)
3401 self.check_ip_checksum(p)
3403 self.logger.error(ppp("Unexpected or invalid packet:", p))
3407 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3408 IP(src=remote_host.ip4, dst=self.nat_addr) /
3409 TCP(sport=80, dport=external_port))
3410 self.pg9.add_stream(p)
3411 self.pg_enable_capture(self.pg_interfaces)
3413 capture = self.pg9.get_capture(1)
3418 self.assertEqual(ip.src, remote_host.ip4)
3419 self.assertEqual(ip.dst, local_host.ip4)
3420 self.assertEqual(tcp.sport, 80)
3421 self.assertEqual(tcp.dport, 12345)
3422 self.check_tcp_checksum(p)
3423 self.check_ip_checksum(p)
3425 self.logger.error(ppp("Unexpected or invalid packet:", p))
3428 def test_one_armed_nat44_static(self):
3429 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3430 remote_host = self.pg9.remote_hosts[0]
3431 local_host = self.pg9.remote_hosts[1]
3436 self.vapi.nat44_forwarding_enable_disable(1)
3437 self.nat44_add_address(self.nat_addr, twice_nat=1)
3438 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3439 local_port, external_port,
3440 proto=IP_PROTOS.tcp, out2in_only=1,
3442 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3443 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3446 # from client to service
3447 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3448 IP(src=remote_host.ip4, dst=self.nat_addr) /
3449 TCP(sport=12345, dport=external_port))
3450 self.pg9.add_stream(p)
3451 self.pg_enable_capture(self.pg_interfaces)
3453 capture = self.pg9.get_capture(1)
3459 self.assertEqual(ip.dst, local_host.ip4)
3460 self.assertEqual(ip.src, self.nat_addr)
3461 self.assertEqual(tcp.dport, local_port)
3462 self.assertNotEqual(tcp.sport, 12345)
3463 eh_port_in = tcp.sport
3464 self.check_tcp_checksum(p)
3465 self.check_ip_checksum(p)
3467 self.logger.error(ppp("Unexpected or invalid packet:", p))
3470 # from service back to client
3471 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3472 IP(src=local_host.ip4, dst=self.nat_addr) /
3473 TCP(sport=local_port, dport=eh_port_in))
3474 self.pg9.add_stream(p)
3475 self.pg_enable_capture(self.pg_interfaces)
3477 capture = self.pg9.get_capture(1)
3482 self.assertEqual(ip.src, self.nat_addr)
3483 self.assertEqual(ip.dst, remote_host.ip4)
3484 self.assertEqual(tcp.sport, external_port)
3485 self.assertEqual(tcp.dport, 12345)
3486 self.check_tcp_checksum(p)
3487 self.check_ip_checksum(p)
3489 self.logger.error(ppp("Unexpected or invalid packet:", p))
3492 def test_del_session(self):
3493 """ Delete NAT44 session """
3494 self.nat44_add_address(self.nat_addr)
3495 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3496 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3499 pkts = self.create_stream_in(self.pg0, self.pg1)
3500 self.pg0.add_stream(pkts)
3501 self.pg_enable_capture(self.pg_interfaces)
3503 capture = self.pg1.get_capture(len(pkts))
3505 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3506 nsessions = len(sessions)
3508 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3509 sessions[0].inside_port,
3510 sessions[0].protocol)
3511 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3512 sessions[1].outside_port,
3513 sessions[1].protocol,
3516 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3517 self.assertEqual(nsessions - len(sessions), 2)
3519 def test_set_get_reass(self):
3520 """ NAT44 set/get virtual fragmentation reassembly """
3521 reas_cfg1 = self.vapi.nat_get_reass()
3523 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3524 max_reass=reas_cfg1.ip4_max_reass * 2,
3525 max_frag=reas_cfg1.ip4_max_frag * 2)
3527 reas_cfg2 = self.vapi.nat_get_reass()
3529 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3530 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3531 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3533 self.vapi.nat_set_reass(drop_frag=1)
3534 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3536 def test_frag_in_order(self):
3537 """ NAT44 translate fragments arriving in order """
3538 self.nat44_add_address(self.nat_addr)
3539 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3540 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3543 data = "A" * 4 + "B" * 16 + "C" * 3
3544 self.tcp_port_in = random.randint(1025, 65535)
3546 reass = self.vapi.nat_reass_dump()
3547 reass_n_start = len(reass)
3550 pkts = self.create_stream_frag(self.pg0,
3551 self.pg1.remote_ip4,
3555 self.pg0.add_stream(pkts)
3556 self.pg_enable_capture(self.pg_interfaces)
3558 frags = self.pg1.get_capture(len(pkts))
3559 p = self.reass_frags_and_verify(frags,
3561 self.pg1.remote_ip4)
3562 self.assertEqual(p[TCP].dport, 20)
3563 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3564 self.tcp_port_out = p[TCP].sport
3565 self.assertEqual(data, p[Raw].load)
3568 pkts = self.create_stream_frag(self.pg1,
3573 self.pg1.add_stream(pkts)
3574 self.pg_enable_capture(self.pg_interfaces)
3576 frags = self.pg0.get_capture(len(pkts))
3577 p = self.reass_frags_and_verify(frags,
3578 self.pg1.remote_ip4,
3579 self.pg0.remote_ip4)
3580 self.assertEqual(p[TCP].sport, 20)
3581 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3582 self.assertEqual(data, p[Raw].load)
3584 reass = self.vapi.nat_reass_dump()
3585 reass_n_end = len(reass)
3587 self.assertEqual(reass_n_end - reass_n_start, 2)
3589 def test_reass_hairpinning(self):
3590 """ NAT44 fragments hairpinning """
3591 host = self.pg0.remote_hosts[0]
3592 server = self.pg0.remote_hosts[1]
3593 host_in_port = random.randint(1025, 65535)
3595 server_in_port = random.randint(1025, 65535)
3596 server_out_port = random.randint(1025, 65535)
3597 data = "A" * 4 + "B" * 16 + "C" * 3
3599 self.nat44_add_address(self.nat_addr)
3600 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3601 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3603 # add static mapping for server
3604 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3605 server_in_port, server_out_port,
3606 proto=IP_PROTOS.tcp)
3608 # send packet from host to server
3609 pkts = self.create_stream_frag(self.pg0,
3614 self.pg0.add_stream(pkts)
3615 self.pg_enable_capture(self.pg_interfaces)
3617 frags = self.pg0.get_capture(len(pkts))
3618 p = self.reass_frags_and_verify(frags,
3621 self.assertNotEqual(p[TCP].sport, host_in_port)
3622 self.assertEqual(p[TCP].dport, server_in_port)
3623 self.assertEqual(data, p[Raw].load)
3625 def test_frag_out_of_order(self):
3626 """ NAT44 translate fragments arriving out of order """
3627 self.nat44_add_address(self.nat_addr)
3628 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3629 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3632 data = "A" * 4 + "B" * 16 + "C" * 3
3633 random.randint(1025, 65535)
3636 pkts = self.create_stream_frag(self.pg0,
3637 self.pg1.remote_ip4,
3642 self.pg0.add_stream(pkts)
3643 self.pg_enable_capture(self.pg_interfaces)
3645 frags = self.pg1.get_capture(len(pkts))
3646 p = self.reass_frags_and_verify(frags,
3648 self.pg1.remote_ip4)
3649 self.assertEqual(p[TCP].dport, 20)
3650 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3651 self.tcp_port_out = p[TCP].sport
3652 self.assertEqual(data, p[Raw].load)
3655 pkts = self.create_stream_frag(self.pg1,
3661 self.pg1.add_stream(pkts)
3662 self.pg_enable_capture(self.pg_interfaces)
3664 frags = self.pg0.get_capture(len(pkts))
3665 p = self.reass_frags_and_verify(frags,
3666 self.pg1.remote_ip4,
3667 self.pg0.remote_ip4)
3668 self.assertEqual(p[TCP].sport, 20)
3669 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3670 self.assertEqual(data, p[Raw].load)
3672 def test_port_restricted(self):
3673 """ Port restricted NAT44 (MAP-E CE) """
3674 self.nat44_add_address(self.nat_addr)
3675 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3676 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3678 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3679 "psid-offset 6 psid-len 6")
3681 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3682 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3683 TCP(sport=4567, dport=22))
3684 self.pg0.add_stream(p)
3685 self.pg_enable_capture(self.pg_interfaces)
3687 capture = self.pg1.get_capture(1)
3692 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3693 self.assertEqual(ip.src, self.nat_addr)
3694 self.assertEqual(tcp.dport, 22)
3695 self.assertNotEqual(tcp.sport, 4567)
3696 self.assertEqual((tcp.sport >> 6) & 63, 10)
3697 self.check_tcp_checksum(p)
3698 self.check_ip_checksum(p)
3700 self.logger.error(ppp("Unexpected or invalid packet:", p))
3703 def test_twice_nat(self):
3705 twice_nat_addr = '10.0.1.3'
3710 self.nat44_add_address(self.nat_addr)
3711 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3712 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3713 port_in, port_out, proto=IP_PROTOS.tcp,
3715 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3716 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3719 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3720 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3721 TCP(sport=eh_port_out, dport=port_out))
3722 self.pg1.add_stream(p)
3723 self.pg_enable_capture(self.pg_interfaces)
3725 capture = self.pg0.get_capture(1)
3730 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3731 self.assertEqual(ip.src, twice_nat_addr)
3732 self.assertEqual(tcp.dport, port_in)
3733 self.assertNotEqual(tcp.sport, eh_port_out)
3734 eh_port_in = tcp.sport
3735 self.check_tcp_checksum(p)
3736 self.check_ip_checksum(p)
3738 self.logger.error(ppp("Unexpected or invalid packet:", p))
3741 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3742 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3743 TCP(sport=port_in, dport=eh_port_in))
3744 self.pg0.add_stream(p)
3745 self.pg_enable_capture(self.pg_interfaces)
3747 capture = self.pg1.get_capture(1)
3752 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3753 self.assertEqual(ip.src, self.nat_addr)
3754 self.assertEqual(tcp.dport, eh_port_out)
3755 self.assertEqual(tcp.sport, port_out)
3756 self.check_tcp_checksum(p)
3757 self.check_ip_checksum(p)
3759 self.logger.error(ppp("Unexpected or invalid packet:", p))
3762 def test_twice_nat_lb(self):
3763 """ Twice NAT44 local service load balancing """
3764 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3765 twice_nat_addr = '10.0.1.3'
3770 server1 = self.pg0.remote_hosts[0]
3771 server2 = self.pg0.remote_hosts[1]
3773 locals = [{'addr': server1.ip4n,
3776 {'addr': server2.ip4n,
3780 self.nat44_add_address(self.nat_addr)
3781 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3783 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3787 local_num=len(locals),
3789 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3790 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3793 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3794 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3795 TCP(sport=eh_port_out, dport=external_port))
3796 self.pg1.add_stream(p)
3797 self.pg_enable_capture(self.pg_interfaces)
3799 capture = self.pg0.get_capture(1)
3805 self.assertEqual(ip.src, twice_nat_addr)
3806 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3807 if ip.dst == server1.ip4:
3811 self.assertNotEqual(tcp.sport, eh_port_out)
3812 eh_port_in = tcp.sport
3813 self.assertEqual(tcp.dport, local_port)
3814 self.check_tcp_checksum(p)
3815 self.check_ip_checksum(p)
3817 self.logger.error(ppp("Unexpected or invalid packet:", p))
3820 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3821 IP(src=server.ip4, dst=twice_nat_addr) /
3822 TCP(sport=local_port, dport=eh_port_in))
3823 self.pg0.add_stream(p)
3824 self.pg_enable_capture(self.pg_interfaces)
3826 capture = self.pg1.get_capture(1)
3831 self.assertEqual(ip.src, self.nat_addr)
3832 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3833 self.assertEqual(tcp.sport, external_port)
3834 self.assertEqual(tcp.dport, eh_port_out)
3835 self.check_tcp_checksum(p)
3836 self.check_ip_checksum(p)
3838 self.logger.error(ppp("Unexpected or invalid packet:", p))
3841 def test_twice_nat_interface_addr(self):
3842 """ Acquire twice NAT44 addresses from interface """
3843 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3845 # no address in NAT pool
3846 adresses = self.vapi.nat44_address_dump()
3847 self.assertEqual(0, len(adresses))
3849 # configure interface address and check NAT address pool
3850 self.pg7.config_ip4()
3851 adresses = self.vapi.nat44_address_dump()
3852 self.assertEqual(1, len(adresses))
3853 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3854 self.assertEqual(adresses[0].twice_nat, 1)
3856 # remove interface address and check NAT address pool
3857 self.pg7.unconfig_ip4()
3858 adresses = self.vapi.nat44_address_dump()
3859 self.assertEqual(0, len(adresses))
3861 def test_ipfix_max_frags(self):
3862 """ IPFIX logging maximum fragments pending reassembly exceeded """
3863 self.nat44_add_address(self.nat_addr)
3864 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3865 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3867 self.vapi.nat_set_reass(max_frag=0)
3868 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3869 src_address=self.pg3.local_ip4n,
3871 template_interval=10)
3872 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3873 src_port=self.ipfix_src_port)
3875 data = "A" * 4 + "B" * 16 + "C" * 3
3876 self.tcp_port_in = random.randint(1025, 65535)
3877 pkts = self.create_stream_frag(self.pg0,
3878 self.pg1.remote_ip4,
3882 self.pg0.add_stream(pkts[-1])
3883 self.pg_enable_capture(self.pg_interfaces)
3885 frags = self.pg1.get_capture(0)
3886 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3887 capture = self.pg3.get_capture(9)
3888 ipfix = IPFIXDecoder()
3889 # first load template
3891 self.assertTrue(p.haslayer(IPFIX))
3892 self.assertEqual(p[IP].src, self.pg3.local_ip4)
3893 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
3894 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
3895 self.assertEqual(p[UDP].dport, 4739)
3896 self.assertEqual(p[IPFIX].observationDomainID,
3897 self.ipfix_domain_id)
3898 if p.haslayer(Template):
3899 ipfix.add_template(p.getlayer(Template))
3900 # verify events in data set
3902 if p.haslayer(Data):
3903 data = ipfix.decode_data_set(p.getlayer(Set))
3904 self.verify_ipfix_max_fragments_ip4(data, 0,
3905 self.pg0.remote_ip4n)
3908 super(TestNAT44, self).tearDown()
3909 if not self.vpp_dead:
3910 self.logger.info(self.vapi.cli("show nat44 addresses"))
3911 self.logger.info(self.vapi.cli("show nat44 interfaces"))
3912 self.logger.info(self.vapi.cli("show nat44 static mappings"))
3913 self.logger.info(self.vapi.cli("show nat44 interface address"))
3914 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3915 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
3916 self.vapi.cli("nat addr-port-assignment-alg default")
3920 class TestNAT44Out2InDPO(MethodHolder):
3921 """ NAT44 Test Cases using out2in DPO """
3924 def setUpConstants(cls):
3925 super(TestNAT44Out2InDPO, cls).setUpConstants()
3926 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
3929 def setUpClass(cls):
3930 super(TestNAT44Out2InDPO, cls).setUpClass()
3933 cls.tcp_port_in = 6303
3934 cls.tcp_port_out = 6303
3935 cls.udp_port_in = 6304
3936 cls.udp_port_out = 6304
3937 cls.icmp_id_in = 6305
3938 cls.icmp_id_out = 6305
3939 cls.nat_addr = '10.0.0.3'
3940 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3941 cls.dst_ip4 = '192.168.70.1'
3943 cls.create_pg_interfaces(range(2))
3946 cls.pg0.config_ip4()
3947 cls.pg0.resolve_arp()
3950 cls.pg1.config_ip6()
3951 cls.pg1.resolve_ndp()
3953 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
3954 dst_address_length=0,
3955 next_hop_address=cls.pg1.remote_ip6n,
3956 next_hop_sw_if_index=cls.pg1.sw_if_index)
3959 super(TestNAT44Out2InDPO, cls).tearDownClass()
3962 def configure_xlat(self):
3963 self.dst_ip6_pfx = '1:2:3::'
3964 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3966 self.dst_ip6_pfx_len = 96
3967 self.src_ip6_pfx = '4:5:6::'
3968 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3970 self.src_ip6_pfx_len = 96
3971 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3972 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3973 '\x00\x00\x00\x00', 0, is_translation=1,
3976 def test_464xlat_ce(self):
3977 """ Test 464XLAT CE with NAT44 """
3979 self.configure_xlat()
3981 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3982 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
3984 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3985 self.dst_ip6_pfx_len)
3986 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3987 self.src_ip6_pfx_len)
3990 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3991 self.pg0.add_stream(pkts)
3992 self.pg_enable_capture(self.pg_interfaces)
3994 capture = self.pg1.get_capture(len(pkts))
3995 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3998 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4000 self.pg1.add_stream(pkts)
4001 self.pg_enable_capture(self.pg_interfaces)
4003 capture = self.pg0.get_capture(len(pkts))
4004 self.verify_capture_in(capture, self.pg0)
4006 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4008 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4009 self.nat_addr_n, is_add=0)
4011 def test_464xlat_ce_no_nat(self):
4012 """ Test 464XLAT CE without NAT44 """
4014 self.configure_xlat()
4016 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4017 self.dst_ip6_pfx_len)
4018 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4019 self.src_ip6_pfx_len)
4021 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4022 self.pg0.add_stream(pkts)
4023 self.pg_enable_capture(self.pg_interfaces)
4025 capture = self.pg1.get_capture(len(pkts))
4026 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4027 nat_ip=out_dst_ip6, same_port=True)
4029 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4030 self.pg1.add_stream(pkts)
4031 self.pg_enable_capture(self.pg_interfaces)
4033 capture = self.pg0.get_capture(len(pkts))
4034 self.verify_capture_in(capture, self.pg0)
4037 class TestDeterministicNAT(MethodHolder):
4038 """ Deterministic NAT Test Cases """
4041 def setUpConstants(cls):
4042 super(TestDeterministicNAT, cls).setUpConstants()
4043 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4046 def setUpClass(cls):
4047 super(TestDeterministicNAT, cls).setUpClass()
4050 cls.tcp_port_in = 6303
4051 cls.tcp_external_port = 6303
4052 cls.udp_port_in = 6304
4053 cls.udp_external_port = 6304
4054 cls.icmp_id_in = 6305
4055 cls.nat_addr = '10.0.0.3'
4057 cls.create_pg_interfaces(range(3))
4058 cls.interfaces = list(cls.pg_interfaces)
4060 for i in cls.interfaces:
4065 cls.pg0.generate_remote_hosts(2)
4066 cls.pg0.configure_ipv4_neighbors()
4069 super(TestDeterministicNAT, cls).tearDownClass()
4072 def create_stream_in(self, in_if, out_if, ttl=64):
4074 Create packet stream for inside network
4076 :param in_if: Inside interface
4077 :param out_if: Outside interface
4078 :param ttl: TTL of generated packets
4082 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4083 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4084 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
4088 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4089 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4090 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
4094 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4095 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4096 ICMP(id=self.icmp_id_in, type='echo-request'))
4101 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4103 Create packet stream for outside network
4105 :param out_if: Outside interface
4106 :param dst_ip: Destination IP address (Default use global NAT address)
4107 :param ttl: TTL of generated packets
4110 dst_ip = self.nat_addr
4113 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4114 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4115 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4119 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4120 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4121 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4125 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4126 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4127 ICMP(id=self.icmp_external_id, type='echo-reply'))
4132 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4134 Verify captured packets on outside network
4136 :param capture: Captured packets
4137 :param nat_ip: Translated IP address (Default use global NAT address)
4138 :param same_port: Sorce port number is not translated (Default False)
4139 :param packet_num: Expected number of packets (Default 3)
4142 nat_ip = self.nat_addr
4143 self.assertEqual(packet_num, len(capture))
4144 for packet in capture:
4146 self.assertEqual(packet[IP].src, nat_ip)
4147 if packet.haslayer(TCP):
4148 self.tcp_port_out = packet[TCP].sport
4149 elif packet.haslayer(UDP):
4150 self.udp_port_out = packet[UDP].sport
4152 self.icmp_external_id = packet[ICMP].id
4154 self.logger.error(ppp("Unexpected or invalid packet "
4155 "(outside network):", packet))
4158 def initiate_tcp_session(self, in_if, out_if):
4160 Initiates TCP session
4162 :param in_if: Inside interface
4163 :param out_if: Outside interface
4166 # SYN packet in->out
4167 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4168 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4169 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4172 self.pg_enable_capture(self.pg_interfaces)
4174 capture = out_if.get_capture(1)
4176 self.tcp_port_out = p[TCP].sport
4178 # SYN + ACK packet out->in
4179 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4180 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4181 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4183 out_if.add_stream(p)
4184 self.pg_enable_capture(self.pg_interfaces)
4186 in_if.get_capture(1)
4188 # ACK packet in->out
4189 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4190 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4191 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4194 self.pg_enable_capture(self.pg_interfaces)
4196 out_if.get_capture(1)
4199 self.logger.error("TCP 3 way handshake failed")
4202 def verify_ipfix_max_entries_per_user(self, data):
4204 Verify IPFIX maximum entries per user exceeded event
4206 :param data: Decoded IPFIX data records
4208 self.assertEqual(1, len(data))
4211 self.assertEqual(ord(record[230]), 13)
4212 # natQuotaExceededEvent
4213 self.assertEqual('\x03\x00\x00\x00', record[466])
4215 self.assertEqual('\xe8\x03\x00\x00', record[473])
4217 self.assertEqual(self.pg0.remote_ip4n, record[8])
4219 def test_deterministic_mode(self):
4220 """ NAT plugin run deterministic mode """
4221 in_addr = '172.16.255.0'
4222 out_addr = '172.17.255.50'
4223 in_addr_t = '172.16.255.20'
4224 in_addr_n = socket.inet_aton(in_addr)
4225 out_addr_n = socket.inet_aton(out_addr)
4226 in_addr_t_n = socket.inet_aton(in_addr_t)
4230 nat_config = self.vapi.nat_show_config()
4231 self.assertEqual(1, nat_config.deterministic)
4233 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4235 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4236 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4237 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4238 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4240 deterministic_mappings = self.vapi.nat_det_map_dump()
4241 self.assertEqual(len(deterministic_mappings), 1)
4242 dsm = deterministic_mappings[0]
4243 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4244 self.assertEqual(in_plen, dsm.in_plen)
4245 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4246 self.assertEqual(out_plen, dsm.out_plen)
4248 self.clear_nat_det()
4249 deterministic_mappings = self.vapi.nat_det_map_dump()
4250 self.assertEqual(len(deterministic_mappings), 0)
4252 def test_set_timeouts(self):
4253 """ Set deterministic NAT timeouts """
4254 timeouts_before = self.vapi.nat_det_get_timeouts()
4256 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4257 timeouts_before.tcp_established + 10,
4258 timeouts_before.tcp_transitory + 10,
4259 timeouts_before.icmp + 10)
4261 timeouts_after = self.vapi.nat_det_get_timeouts()
4263 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4264 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4265 self.assertNotEqual(timeouts_before.tcp_established,
4266 timeouts_after.tcp_established)
4267 self.assertNotEqual(timeouts_before.tcp_transitory,
4268 timeouts_after.tcp_transitory)
4270 def test_det_in(self):
4271 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4273 nat_ip = "10.0.0.10"
4275 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4277 socket.inet_aton(nat_ip),
4279 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4280 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4284 pkts = self.create_stream_in(self.pg0, self.pg1)
4285 self.pg0.add_stream(pkts)
4286 self.pg_enable_capture(self.pg_interfaces)
4288 capture = self.pg1.get_capture(len(pkts))
4289 self.verify_capture_out(capture, nat_ip)
4292 pkts = self.create_stream_out(self.pg1, nat_ip)
4293 self.pg1.add_stream(pkts)
4294 self.pg_enable_capture(self.pg_interfaces)
4296 capture = self.pg0.get_capture(len(pkts))
4297 self.verify_capture_in(capture, self.pg0)
4300 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4301 self.assertEqual(len(sessions), 3)
4305 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4306 self.assertEqual(s.in_port, self.tcp_port_in)
4307 self.assertEqual(s.out_port, self.tcp_port_out)
4308 self.assertEqual(s.ext_port, self.tcp_external_port)
4312 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4313 self.assertEqual(s.in_port, self.udp_port_in)
4314 self.assertEqual(s.out_port, self.udp_port_out)
4315 self.assertEqual(s.ext_port, self.udp_external_port)
4319 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4320 self.assertEqual(s.in_port, self.icmp_id_in)
4321 self.assertEqual(s.out_port, self.icmp_external_id)
4323 def test_multiple_users(self):
4324 """ Deterministic NAT multiple users """
4326 nat_ip = "10.0.0.10"
4328 external_port = 6303
4330 host0 = self.pg0.remote_hosts[0]
4331 host1 = self.pg0.remote_hosts[1]
4333 self.vapi.nat_det_add_del_map(host0.ip4n,
4335 socket.inet_aton(nat_ip),
4337 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4338 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4342 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4343 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4344 TCP(sport=port_in, dport=external_port))
4345 self.pg0.add_stream(p)
4346 self.pg_enable_capture(self.pg_interfaces)
4348 capture = self.pg1.get_capture(1)
4353 self.assertEqual(ip.src, nat_ip)
4354 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4355 self.assertEqual(tcp.dport, external_port)
4356 port_out0 = tcp.sport
4358 self.logger.error(ppp("Unexpected or invalid packet:", p))
4362 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4363 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4364 TCP(sport=port_in, dport=external_port))
4365 self.pg0.add_stream(p)
4366 self.pg_enable_capture(self.pg_interfaces)
4368 capture = self.pg1.get_capture(1)
4373 self.assertEqual(ip.src, nat_ip)
4374 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4375 self.assertEqual(tcp.dport, external_port)
4376 port_out1 = tcp.sport
4378 self.logger.error(ppp("Unexpected or invalid packet:", p))
4381 dms = self.vapi.nat_det_map_dump()
4382 self.assertEqual(1, len(dms))
4383 self.assertEqual(2, dms[0].ses_num)
4386 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4387 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4388 TCP(sport=external_port, dport=port_out0))
4389 self.pg1.add_stream(p)
4390 self.pg_enable_capture(self.pg_interfaces)
4392 capture = self.pg0.get_capture(1)
4397 self.assertEqual(ip.src, self.pg1.remote_ip4)
4398 self.assertEqual(ip.dst, host0.ip4)
4399 self.assertEqual(tcp.dport, port_in)
4400 self.assertEqual(tcp.sport, external_port)
4402 self.logger.error(ppp("Unexpected or invalid packet:", p))
4406 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4407 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4408 TCP(sport=external_port, dport=port_out1))
4409 self.pg1.add_stream(p)
4410 self.pg_enable_capture(self.pg_interfaces)
4412 capture = self.pg0.get_capture(1)
4417 self.assertEqual(ip.src, self.pg1.remote_ip4)
4418 self.assertEqual(ip.dst, host1.ip4)
4419 self.assertEqual(tcp.dport, port_in)
4420 self.assertEqual(tcp.sport, external_port)
4422 self.logger.error(ppp("Unexpected or invalid packet", p))
4425 # session close api test
4426 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4428 self.pg1.remote_ip4n,
4430 dms = self.vapi.nat_det_map_dump()
4431 self.assertEqual(dms[0].ses_num, 1)
4433 self.vapi.nat_det_close_session_in(host0.ip4n,
4435 self.pg1.remote_ip4n,
4437 dms = self.vapi.nat_det_map_dump()
4438 self.assertEqual(dms[0].ses_num, 0)
4440 def test_tcp_session_close_detection_in(self):
4441 """ Deterministic NAT TCP session close from inside network """
4442 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4444 socket.inet_aton(self.nat_addr),
4446 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4447 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4450 self.initiate_tcp_session(self.pg0, self.pg1)
4452 # close the session from inside
4454 # FIN packet in -> out
4455 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4456 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4457 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4459 self.pg0.add_stream(p)
4460 self.pg_enable_capture(self.pg_interfaces)
4462 self.pg1.get_capture(1)
4466 # ACK packet out -> in
4467 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4468 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4469 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4473 # FIN packet out -> in
4474 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4475 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4476 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4480 self.pg1.add_stream(pkts)
4481 self.pg_enable_capture(self.pg_interfaces)
4483 self.pg0.get_capture(2)
4485 # ACK packet in -> out
4486 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4487 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4488 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4490 self.pg0.add_stream(p)
4491 self.pg_enable_capture(self.pg_interfaces)
4493 self.pg1.get_capture(1)
4495 # Check if deterministic NAT44 closed the session
4496 dms = self.vapi.nat_det_map_dump()
4497 self.assertEqual(0, dms[0].ses_num)
4499 self.logger.error("TCP session termination failed")
4502 def test_tcp_session_close_detection_out(self):
4503 """ Deterministic NAT TCP session close from outside network """
4504 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4506 socket.inet_aton(self.nat_addr),
4508 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4509 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4512 self.initiate_tcp_session(self.pg0, self.pg1)
4514 # close the session from outside
4516 # FIN packet out -> in
4517 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4518 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4519 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4521 self.pg1.add_stream(p)
4522 self.pg_enable_capture(self.pg_interfaces)
4524 self.pg0.get_capture(1)
4528 # ACK packet in -> out
4529 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4530 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4531 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4535 # ACK packet in -> out
4536 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4537 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4538 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4542 self.pg0.add_stream(pkts)
4543 self.pg_enable_capture(self.pg_interfaces)
4545 self.pg1.get_capture(2)
4547 # ACK packet out -> in
4548 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4549 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4550 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4552 self.pg1.add_stream(p)
4553 self.pg_enable_capture(self.pg_interfaces)
4555 self.pg0.get_capture(1)
4557 # Check if deterministic NAT44 closed the session
4558 dms = self.vapi.nat_det_map_dump()
4559 self.assertEqual(0, dms[0].ses_num)
4561 self.logger.error("TCP session termination failed")
4564 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4565 def test_session_timeout(self):
4566 """ Deterministic NAT session timeouts """
4567 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4569 socket.inet_aton(self.nat_addr),
4571 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4572 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4575 self.initiate_tcp_session(self.pg0, self.pg1)
4576 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4577 pkts = self.create_stream_in(self.pg0, self.pg1)
4578 self.pg0.add_stream(pkts)
4579 self.pg_enable_capture(self.pg_interfaces)
4581 capture = self.pg1.get_capture(len(pkts))
4584 dms = self.vapi.nat_det_map_dump()
4585 self.assertEqual(0, dms[0].ses_num)
4587 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4588 def test_session_limit_per_user(self):
4589 """ Deterministic NAT maximum sessions per user limit """
4590 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4592 socket.inet_aton(self.nat_addr),
4594 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4595 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4597 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4598 src_address=self.pg2.local_ip4n,
4600 template_interval=10)
4601 self.vapi.nat_ipfix()
4604 for port in range(1025, 2025):
4605 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4606 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4607 UDP(sport=port, dport=port))
4610 self.pg0.add_stream(pkts)
4611 self.pg_enable_capture(self.pg_interfaces)
4613 capture = self.pg1.get_capture(len(pkts))
4615 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4616 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4617 UDP(sport=3001, dport=3002))
4618 self.pg0.add_stream(p)
4619 self.pg_enable_capture(self.pg_interfaces)
4621 capture = self.pg1.assert_nothing_captured()
4623 # verify ICMP error packet
4624 capture = self.pg0.get_capture(1)
4626 self.assertTrue(p.haslayer(ICMP))
4628 self.assertEqual(icmp.type, 3)
4629 self.assertEqual(icmp.code, 1)
4630 self.assertTrue(icmp.haslayer(IPerror))
4631 inner_ip = icmp[IPerror]
4632 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4633 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4635 dms = self.vapi.nat_det_map_dump()
4637 self.assertEqual(1000, dms[0].ses_num)
4639 # verify IPFIX logging
4640 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4642 capture = self.pg2.get_capture(2)
4643 ipfix = IPFIXDecoder()
4644 # first load template
4646 self.assertTrue(p.haslayer(IPFIX))
4647 if p.haslayer(Template):
4648 ipfix.add_template(p.getlayer(Template))
4649 # verify events in data set
4651 if p.haslayer(Data):
4652 data = ipfix.decode_data_set(p.getlayer(Set))
4653 self.verify_ipfix_max_entries_per_user(data)
4655 def clear_nat_det(self):
4657 Clear deterministic NAT configuration.
4659 self.vapi.nat_ipfix(enable=0)
4660 self.vapi.nat_det_set_timeouts()
4661 deterministic_mappings = self.vapi.nat_det_map_dump()
4662 for dsm in deterministic_mappings:
4663 self.vapi.nat_det_add_del_map(dsm.in_addr,
4669 interfaces = self.vapi.nat44_interface_dump()
4670 for intf in interfaces:
4671 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4676 super(TestDeterministicNAT, self).tearDown()
4677 if not self.vpp_dead:
4678 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4680 self.vapi.cli("show nat44 deterministic mappings"))
4682 self.vapi.cli("show nat44 deterministic timeouts"))
4684 self.vapi.cli("show nat44 deterministic sessions"))
4685 self.clear_nat_det()
4688 class TestNAT64(MethodHolder):
4689 """ NAT64 Test Cases """
4692 def setUpConstants(cls):
4693 super(TestNAT64, cls).setUpConstants()
4694 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4695 "nat64 st hash buckets 256", "}"])
4698 def setUpClass(cls):
4699 super(TestNAT64, cls).setUpClass()
4702 cls.tcp_port_in = 6303
4703 cls.tcp_port_out = 6303
4704 cls.udp_port_in = 6304
4705 cls.udp_port_out = 6304
4706 cls.icmp_id_in = 6305
4707 cls.icmp_id_out = 6305
4708 cls.nat_addr = '10.0.0.3'
4709 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4711 cls.vrf1_nat_addr = '10.0.10.3'
4712 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4714 cls.ipfix_src_port = 4739
4715 cls.ipfix_domain_id = 1
4717 cls.create_pg_interfaces(range(5))
4718 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4719 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4720 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4722 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4724 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4726 cls.pg0.generate_remote_hosts(2)
4728 for i in cls.ip6_interfaces:
4731 i.configure_ipv6_neighbors()
4733 for i in cls.ip4_interfaces:
4739 cls.pg3.config_ip4()
4740 cls.pg3.resolve_arp()
4741 cls.pg3.config_ip6()
4742 cls.pg3.configure_ipv6_neighbors()
4745 super(TestNAT64, cls).tearDownClass()
4748 def test_pool(self):
4749 """ Add/delete address to NAT64 pool """
4750 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4752 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4754 addresses = self.vapi.nat64_pool_addr_dump()
4755 self.assertEqual(len(addresses), 1)
4756 self.assertEqual(addresses[0].address, nat_addr)
4758 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4760 addresses = self.vapi.nat64_pool_addr_dump()
4761 self.assertEqual(len(addresses), 0)
4763 def test_interface(self):
4764 """ Enable/disable NAT64 feature on the interface """
4765 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4766 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4768 interfaces = self.vapi.nat64_interface_dump()
4769 self.assertEqual(len(interfaces), 2)
4772 for intf in interfaces:
4773 if intf.sw_if_index == self.pg0.sw_if_index:
4774 self.assertEqual(intf.is_inside, 1)
4776 elif intf.sw_if_index == self.pg1.sw_if_index:
4777 self.assertEqual(intf.is_inside, 0)
4779 self.assertTrue(pg0_found)
4780 self.assertTrue(pg1_found)
4782 features = self.vapi.cli("show interface features pg0")
4783 self.assertNotEqual(features.find('nat64-in2out'), -1)
4784 features = self.vapi.cli("show interface features pg1")
4785 self.assertNotEqual(features.find('nat64-out2in'), -1)
4787 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4788 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4790 interfaces = self.vapi.nat64_interface_dump()
4791 self.assertEqual(len(interfaces), 0)
4793 def test_static_bib(self):
4794 """ Add/delete static BIB entry """
4795 in_addr = socket.inet_pton(socket.AF_INET6,
4796 '2001:db8:85a3::8a2e:370:7334')
4797 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4800 proto = IP_PROTOS.tcp
4802 self.vapi.nat64_add_del_static_bib(in_addr,
4807 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4812 self.assertEqual(bibe.i_addr, in_addr)
4813 self.assertEqual(bibe.o_addr, out_addr)
4814 self.assertEqual(bibe.i_port, in_port)
4815 self.assertEqual(bibe.o_port, out_port)
4816 self.assertEqual(static_bib_num, 1)
4818 self.vapi.nat64_add_del_static_bib(in_addr,
4824 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4829 self.assertEqual(static_bib_num, 0)
4831 def test_set_timeouts(self):
4832 """ Set NAT64 timeouts """
4833 # verify default values
4834 timeouts = self.vapi.nat64_get_timeouts()
4835 self.assertEqual(timeouts.udp, 300)
4836 self.assertEqual(timeouts.icmp, 60)
4837 self.assertEqual(timeouts.tcp_trans, 240)
4838 self.assertEqual(timeouts.tcp_est, 7440)
4839 self.assertEqual(timeouts.tcp_incoming_syn, 6)
4841 # set and verify custom values
4842 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4843 tcp_est=7450, tcp_incoming_syn=10)
4844 timeouts = self.vapi.nat64_get_timeouts()
4845 self.assertEqual(timeouts.udp, 200)
4846 self.assertEqual(timeouts.icmp, 30)
4847 self.assertEqual(timeouts.tcp_trans, 250)
4848 self.assertEqual(timeouts.tcp_est, 7450)
4849 self.assertEqual(timeouts.tcp_incoming_syn, 10)
4851 def test_dynamic(self):
4852 """ NAT64 dynamic translation test """
4853 self.tcp_port_in = 6303
4854 self.udp_port_in = 6304
4855 self.icmp_id_in = 6305
4857 ses_num_start = self.nat64_get_ses_num()
4859 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4861 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4862 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4865 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4866 self.pg0.add_stream(pkts)
4867 self.pg_enable_capture(self.pg_interfaces)
4869 capture = self.pg1.get_capture(len(pkts))
4870 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4871 dst_ip=self.pg1.remote_ip4)
4874 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4875 self.pg1.add_stream(pkts)
4876 self.pg_enable_capture(self.pg_interfaces)
4878 capture = self.pg0.get_capture(len(pkts))
4879 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4880 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4883 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4884 self.pg0.add_stream(pkts)
4885 self.pg_enable_capture(self.pg_interfaces)
4887 capture = self.pg1.get_capture(len(pkts))
4888 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4889 dst_ip=self.pg1.remote_ip4)
4892 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4893 self.pg1.add_stream(pkts)
4894 self.pg_enable_capture(self.pg_interfaces)
4896 capture = self.pg0.get_capture(len(pkts))
4897 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4899 ses_num_end = self.nat64_get_ses_num()
4901 self.assertEqual(ses_num_end - ses_num_start, 3)
4903 # tenant with specific VRF
4904 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
4905 self.vrf1_nat_addr_n,
4906 vrf_id=self.vrf1_id)
4907 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
4909 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
4910 self.pg2.add_stream(pkts)
4911 self.pg_enable_capture(self.pg_interfaces)
4913 capture = self.pg1.get_capture(len(pkts))
4914 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
4915 dst_ip=self.pg1.remote_ip4)
4917 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
4918 self.pg1.add_stream(pkts)
4919 self.pg_enable_capture(self.pg_interfaces)
4921 capture = self.pg2.get_capture(len(pkts))
4922 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
4924 def test_static(self):
4925 """ NAT64 static translation test """
4926 self.tcp_port_in = 60303
4927 self.udp_port_in = 60304
4928 self.icmp_id_in = 60305
4929 self.tcp_port_out = 60303
4930 self.udp_port_out = 60304
4931 self.icmp_id_out = 60305
4933 ses_num_start = self.nat64_get_ses_num()
4935 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4937 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4938 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4940 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4945 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4950 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
4957 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4958 self.pg0.add_stream(pkts)
4959 self.pg_enable_capture(self.pg_interfaces)
4961 capture = self.pg1.get_capture(len(pkts))
4962 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4963 dst_ip=self.pg1.remote_ip4, same_port=True)
4966 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4967 self.pg1.add_stream(pkts)
4968 self.pg_enable_capture(self.pg_interfaces)
4970 capture = self.pg0.get_capture(len(pkts))
4971 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4972 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4974 ses_num_end = self.nat64_get_ses_num()
4976 self.assertEqual(ses_num_end - ses_num_start, 3)
4978 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4979 def test_session_timeout(self):
4980 """ NAT64 session timeout """
4981 self.icmp_id_in = 1234
4982 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4984 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4985 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4986 self.vapi.nat64_set_timeouts(icmp=5)
4988 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4989 self.pg0.add_stream(pkts)
4990 self.pg_enable_capture(self.pg_interfaces)
4992 capture = self.pg1.get_capture(len(pkts))
4994 ses_num_before_timeout = self.nat64_get_ses_num()
4998 # ICMP session after timeout
4999 ses_num_after_timeout = self.nat64_get_ses_num()
5000 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5002 def test_icmp_error(self):
5003 """ NAT64 ICMP Error message translation """
5004 self.tcp_port_in = 6303
5005 self.udp_port_in = 6304
5006 self.icmp_id_in = 6305
5008 ses_num_start = self.nat64_get_ses_num()
5010 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5012 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5013 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5015 # send some packets to create sessions
5016 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5017 self.pg0.add_stream(pkts)
5018 self.pg_enable_capture(self.pg_interfaces)
5020 capture_ip4 = self.pg1.get_capture(len(pkts))
5021 self.verify_capture_out(capture_ip4,
5022 nat_ip=self.nat_addr,
5023 dst_ip=self.pg1.remote_ip4)
5025 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5026 self.pg1.add_stream(pkts)
5027 self.pg_enable_capture(self.pg_interfaces)
5029 capture_ip6 = self.pg0.get_capture(len(pkts))
5030 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5031 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5032 self.pg0.remote_ip6)
5035 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5036 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5037 ICMPv6DestUnreach(code=1) /
5038 packet[IPv6] for packet in capture_ip6]
5039 self.pg0.add_stream(pkts)
5040 self.pg_enable_capture(self.pg_interfaces)
5042 capture = self.pg1.get_capture(len(pkts))
5043 for packet in capture:
5045 self.assertEqual(packet[IP].src, self.nat_addr)
5046 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5047 self.assertEqual(packet[ICMP].type, 3)
5048 self.assertEqual(packet[ICMP].code, 13)
5049 inner = packet[IPerror]
5050 self.assertEqual(inner.src, self.pg1.remote_ip4)
5051 self.assertEqual(inner.dst, self.nat_addr)
5052 self.check_icmp_checksum(packet)
5053 if inner.haslayer(TCPerror):
5054 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5055 elif inner.haslayer(UDPerror):
5056 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5058 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5060 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5064 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5065 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5066 ICMP(type=3, code=13) /
5067 packet[IP] for packet in capture_ip4]
5068 self.pg1.add_stream(pkts)
5069 self.pg_enable_capture(self.pg_interfaces)
5071 capture = self.pg0.get_capture(len(pkts))
5072 for packet in capture:
5074 self.assertEqual(packet[IPv6].src, ip.src)
5075 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5076 icmp = packet[ICMPv6DestUnreach]
5077 self.assertEqual(icmp.code, 1)
5078 inner = icmp[IPerror6]
5079 self.assertEqual(inner.src, self.pg0.remote_ip6)
5080 self.assertEqual(inner.dst, ip.src)
5081 self.check_icmpv6_checksum(packet)
5082 if inner.haslayer(TCPerror):
5083 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5084 elif inner.haslayer(UDPerror):
5085 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5087 self.assertEqual(inner[ICMPv6EchoRequest].id,
5090 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5093 def test_hairpinning(self):
5094 """ NAT64 hairpinning """
5096 client = self.pg0.remote_hosts[0]
5097 server = self.pg0.remote_hosts[1]
5098 server_tcp_in_port = 22
5099 server_tcp_out_port = 4022
5100 server_udp_in_port = 23
5101 server_udp_out_port = 4023
5102 client_tcp_in_port = 1234
5103 client_udp_in_port = 1235
5104 client_tcp_out_port = 0
5105 client_udp_out_port = 0
5106 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5107 nat_addr_ip6 = ip.src
5109 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5111 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5112 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5114 self.vapi.nat64_add_del_static_bib(server.ip6n,
5117 server_tcp_out_port,
5119 self.vapi.nat64_add_del_static_bib(server.ip6n,
5122 server_udp_out_port,
5127 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5128 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5129 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5131 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5132 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5133 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5135 self.pg0.add_stream(pkts)
5136 self.pg_enable_capture(self.pg_interfaces)
5138 capture = self.pg0.get_capture(len(pkts))
5139 for packet in capture:
5141 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5142 self.assertEqual(packet[IPv6].dst, server.ip6)
5143 if packet.haslayer(TCP):
5144 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5145 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5146 self.check_tcp_checksum(packet)
5147 client_tcp_out_port = packet[TCP].sport
5149 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5150 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5151 self.check_udp_checksum(packet)
5152 client_udp_out_port = packet[UDP].sport
5154 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5159 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5160 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5161 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5163 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5164 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5165 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5167 self.pg0.add_stream(pkts)
5168 self.pg_enable_capture(self.pg_interfaces)
5170 capture = self.pg0.get_capture(len(pkts))
5171 for packet in capture:
5173 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5174 self.assertEqual(packet[IPv6].dst, client.ip6)
5175 if packet.haslayer(TCP):
5176 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5177 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5178 self.check_tcp_checksum(packet)
5180 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5181 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5182 self.check_udp_checksum(packet)
5184 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5189 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5190 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5191 ICMPv6DestUnreach(code=1) /
5192 packet[IPv6] for packet in capture]
5193 self.pg0.add_stream(pkts)
5194 self.pg_enable_capture(self.pg_interfaces)
5196 capture = self.pg0.get_capture(len(pkts))
5197 for packet in capture:
5199 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5200 self.assertEqual(packet[IPv6].dst, server.ip6)
5201 icmp = packet[ICMPv6DestUnreach]
5202 self.assertEqual(icmp.code, 1)
5203 inner = icmp[IPerror6]
5204 self.assertEqual(inner.src, server.ip6)
5205 self.assertEqual(inner.dst, nat_addr_ip6)
5206 self.check_icmpv6_checksum(packet)
5207 if inner.haslayer(TCPerror):
5208 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5209 self.assertEqual(inner[TCPerror].dport,
5210 client_tcp_out_port)
5212 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5213 self.assertEqual(inner[UDPerror].dport,
5214 client_udp_out_port)
5216 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5219 def test_prefix(self):
5220 """ NAT64 Network-Specific Prefix """
5222 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5224 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5225 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5226 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5227 self.vrf1_nat_addr_n,
5228 vrf_id=self.vrf1_id)
5229 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5232 global_pref64 = "2001:db8::"
5233 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5234 global_pref64_len = 32
5235 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5237 prefix = self.vapi.nat64_prefix_dump()
5238 self.assertEqual(len(prefix), 1)
5239 self.assertEqual(prefix[0].prefix, global_pref64_n)
5240 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5241 self.assertEqual(prefix[0].vrf_id, 0)
5243 # Add tenant specific prefix
5244 vrf1_pref64 = "2001:db8:122:300::"
5245 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5246 vrf1_pref64_len = 56
5247 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5249 vrf_id=self.vrf1_id)
5250 prefix = self.vapi.nat64_prefix_dump()
5251 self.assertEqual(len(prefix), 2)
5254 pkts = self.create_stream_in_ip6(self.pg0,
5257 plen=global_pref64_len)
5258 self.pg0.add_stream(pkts)
5259 self.pg_enable_capture(self.pg_interfaces)
5261 capture = self.pg1.get_capture(len(pkts))
5262 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5263 dst_ip=self.pg1.remote_ip4)
5265 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5266 self.pg1.add_stream(pkts)
5267 self.pg_enable_capture(self.pg_interfaces)
5269 capture = self.pg0.get_capture(len(pkts))
5270 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5273 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5275 # Tenant specific prefix
5276 pkts = self.create_stream_in_ip6(self.pg2,
5279 plen=vrf1_pref64_len)
5280 self.pg2.add_stream(pkts)
5281 self.pg_enable_capture(self.pg_interfaces)
5283 capture = self.pg1.get_capture(len(pkts))
5284 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5285 dst_ip=self.pg1.remote_ip4)
5287 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5288 self.pg1.add_stream(pkts)
5289 self.pg_enable_capture(self.pg_interfaces)
5291 capture = self.pg2.get_capture(len(pkts))
5292 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5295 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5297 def test_unknown_proto(self):
5298 """ NAT64 translate packet with unknown protocol """
5300 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5302 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5303 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5304 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5307 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5308 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5309 TCP(sport=self.tcp_port_in, dport=20))
5310 self.pg0.add_stream(p)
5311 self.pg_enable_capture(self.pg_interfaces)
5313 p = self.pg1.get_capture(1)
5315 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5316 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5318 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5319 TCP(sport=1234, dport=1234))
5320 self.pg0.add_stream(p)
5321 self.pg_enable_capture(self.pg_interfaces)
5323 p = self.pg1.get_capture(1)
5326 self.assertEqual(packet[IP].src, self.nat_addr)
5327 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5328 self.assertTrue(packet.haslayer(GRE))
5329 self.check_ip_checksum(packet)
5331 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5335 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5336 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5338 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5339 TCP(sport=1234, dport=1234))
5340 self.pg1.add_stream(p)
5341 self.pg_enable_capture(self.pg_interfaces)
5343 p = self.pg0.get_capture(1)
5346 self.assertEqual(packet[IPv6].src, remote_ip6)
5347 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5348 self.assertEqual(packet[IPv6].nh, 47)
5350 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5353 def test_hairpinning_unknown_proto(self):
5354 """ NAT64 translate packet with unknown protocol - hairpinning """
5356 client = self.pg0.remote_hosts[0]
5357 server = self.pg0.remote_hosts[1]
5358 server_tcp_in_port = 22
5359 server_tcp_out_port = 4022
5360 client_tcp_in_port = 1234
5361 client_tcp_out_port = 1235
5362 server_nat_ip = "10.0.0.100"
5363 client_nat_ip = "10.0.0.110"
5364 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5365 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5366 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5367 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5369 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5371 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5372 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5374 self.vapi.nat64_add_del_static_bib(server.ip6n,
5377 server_tcp_out_port,
5380 self.vapi.nat64_add_del_static_bib(server.ip6n,
5386 self.vapi.nat64_add_del_static_bib(client.ip6n,
5389 client_tcp_out_port,
5393 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5394 IPv6(src=client.ip6, dst=server_nat_ip6) /
5395 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5396 self.pg0.add_stream(p)
5397 self.pg_enable_capture(self.pg_interfaces)
5399 p = self.pg0.get_capture(1)
5401 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5402 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5404 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5405 TCP(sport=1234, dport=1234))
5406 self.pg0.add_stream(p)
5407 self.pg_enable_capture(self.pg_interfaces)
5409 p = self.pg0.get_capture(1)
5412 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5413 self.assertEqual(packet[IPv6].dst, server.ip6)
5414 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5416 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5420 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5421 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5423 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5424 TCP(sport=1234, dport=1234))
5425 self.pg0.add_stream(p)
5426 self.pg_enable_capture(self.pg_interfaces)
5428 p = self.pg0.get_capture(1)
5431 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5432 self.assertEqual(packet[IPv6].dst, client.ip6)
5433 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5435 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5438 def test_one_armed_nat64(self):
5439 """ One armed NAT64 """
5441 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5445 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5447 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5448 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5451 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5452 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5453 TCP(sport=12345, dport=80))
5454 self.pg3.add_stream(p)
5455 self.pg_enable_capture(self.pg_interfaces)
5457 capture = self.pg3.get_capture(1)
5462 self.assertEqual(ip.src, self.nat_addr)
5463 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5464 self.assertNotEqual(tcp.sport, 12345)
5465 external_port = tcp.sport
5466 self.assertEqual(tcp.dport, 80)
5467 self.check_tcp_checksum(p)
5468 self.check_ip_checksum(p)
5470 self.logger.error(ppp("Unexpected or invalid packet:", p))
5474 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5475 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5476 TCP(sport=80, dport=external_port))
5477 self.pg3.add_stream(p)
5478 self.pg_enable_capture(self.pg_interfaces)
5480 capture = self.pg3.get_capture(1)
5485 self.assertEqual(ip.src, remote_host_ip6)
5486 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5487 self.assertEqual(tcp.sport, 80)
5488 self.assertEqual(tcp.dport, 12345)
5489 self.check_tcp_checksum(p)
5491 self.logger.error(ppp("Unexpected or invalid packet:", p))
5494 def test_frag_in_order(self):
5495 """ NAT64 translate fragments arriving in order """
5496 self.tcp_port_in = random.randint(1025, 65535)
5498 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5500 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5501 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5503 reass = self.vapi.nat_reass_dump()
5504 reass_n_start = len(reass)
5508 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5509 self.tcp_port_in, 20, data)
5510 self.pg0.add_stream(pkts)
5511 self.pg_enable_capture(self.pg_interfaces)
5513 frags = self.pg1.get_capture(len(pkts))
5514 p = self.reass_frags_and_verify(frags,
5516 self.pg1.remote_ip4)
5517 self.assertEqual(p[TCP].dport, 20)
5518 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5519 self.tcp_port_out = p[TCP].sport
5520 self.assertEqual(data, p[Raw].load)
5523 data = "A" * 4 + "b" * 16 + "C" * 3
5524 pkts = self.create_stream_frag(self.pg1,
5529 self.pg1.add_stream(pkts)
5530 self.pg_enable_capture(self.pg_interfaces)
5532 frags = self.pg0.get_capture(len(pkts))
5533 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5534 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5535 self.assertEqual(p[TCP].sport, 20)
5536 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5537 self.assertEqual(data, p[Raw].load)
5539 reass = self.vapi.nat_reass_dump()
5540 reass_n_end = len(reass)
5542 self.assertEqual(reass_n_end - reass_n_start, 2)
5544 def test_reass_hairpinning(self):
5545 """ NAT64 fragments hairpinning """
5547 client = self.pg0.remote_hosts[0]
5548 server = self.pg0.remote_hosts[1]
5549 server_in_port = random.randint(1025, 65535)
5550 server_out_port = random.randint(1025, 65535)
5551 client_in_port = random.randint(1025, 65535)
5552 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5553 nat_addr_ip6 = ip.src
5555 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5557 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5558 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5560 # add static BIB entry for server
5561 self.vapi.nat64_add_del_static_bib(server.ip6n,
5567 # send packet from host to server
5568 pkts = self.create_stream_frag_ip6(self.pg0,
5573 self.pg0.add_stream(pkts)
5574 self.pg_enable_capture(self.pg_interfaces)
5576 frags = self.pg0.get_capture(len(pkts))
5577 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5578 self.assertNotEqual(p[TCP].sport, client_in_port)
5579 self.assertEqual(p[TCP].dport, server_in_port)
5580 self.assertEqual(data, p[Raw].load)
5582 def test_frag_out_of_order(self):
5583 """ NAT64 translate fragments arriving out of order """
5584 self.tcp_port_in = random.randint(1025, 65535)
5586 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5588 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5589 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5593 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5594 self.tcp_port_in, 20, data)
5596 self.pg0.add_stream(pkts)
5597 self.pg_enable_capture(self.pg_interfaces)
5599 frags = self.pg1.get_capture(len(pkts))
5600 p = self.reass_frags_and_verify(frags,
5602 self.pg1.remote_ip4)
5603 self.assertEqual(p[TCP].dport, 20)
5604 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5605 self.tcp_port_out = p[TCP].sport
5606 self.assertEqual(data, p[Raw].load)
5609 data = "A" * 4 + "B" * 16 + "C" * 3
5610 pkts = self.create_stream_frag(self.pg1,
5616 self.pg1.add_stream(pkts)
5617 self.pg_enable_capture(self.pg_interfaces)
5619 frags = self.pg0.get_capture(len(pkts))
5620 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5621 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5622 self.assertEqual(p[TCP].sport, 20)
5623 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5624 self.assertEqual(data, p[Raw].load)
5626 def test_interface_addr(self):
5627 """ Acquire NAT64 pool addresses from interface """
5628 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5630 # no address in NAT64 pool
5631 adresses = self.vapi.nat44_address_dump()
5632 self.assertEqual(0, len(adresses))
5634 # configure interface address and check NAT64 address pool
5635 self.pg4.config_ip4()
5636 addresses = self.vapi.nat64_pool_addr_dump()
5637 self.assertEqual(len(addresses), 1)
5638 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5640 # remove interface address and check NAT64 address pool
5641 self.pg4.unconfig_ip4()
5642 addresses = self.vapi.nat64_pool_addr_dump()
5643 self.assertEqual(0, len(adresses))
5645 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5646 def test_ipfix_max_bibs_sessions(self):
5647 """ IPFIX logging maximum session and BIB entries exceeded """
5650 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5654 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5656 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5657 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5661 for i in range(0, max_bibs):
5662 src = "fd01:aa::%x" % (i)
5663 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5664 IPv6(src=src, dst=remote_host_ip6) /
5665 TCP(sport=12345, dport=80))
5667 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5668 IPv6(src=src, dst=remote_host_ip6) /
5669 TCP(sport=12345, dport=22))
5671 self.pg0.add_stream(pkts)
5672 self.pg_enable_capture(self.pg_interfaces)
5674 self.pg1.get_capture(max_sessions)
5676 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5677 src_address=self.pg3.local_ip4n,
5679 template_interval=10)
5680 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5681 src_port=self.ipfix_src_port)
5683 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5684 IPv6(src=src, dst=remote_host_ip6) /
5685 TCP(sport=12345, dport=25))
5686 self.pg0.add_stream(p)
5687 self.pg_enable_capture(self.pg_interfaces)
5689 self.pg1.get_capture(0)
5690 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5691 capture = self.pg3.get_capture(9)
5692 ipfix = IPFIXDecoder()
5693 # first load template
5695 self.assertTrue(p.haslayer(IPFIX))
5696 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5697 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5698 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5699 self.assertEqual(p[UDP].dport, 4739)
5700 self.assertEqual(p[IPFIX].observationDomainID,
5701 self.ipfix_domain_id)
5702 if p.haslayer(Template):
5703 ipfix.add_template(p.getlayer(Template))
5704 # verify events in data set
5706 if p.haslayer(Data):
5707 data = ipfix.decode_data_set(p.getlayer(Set))
5708 self.verify_ipfix_max_sessions(data, max_sessions)
5710 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5711 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5712 TCP(sport=12345, dport=80))
5713 self.pg0.add_stream(p)
5714 self.pg_enable_capture(self.pg_interfaces)
5716 self.pg1.get_capture(0)
5717 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5718 capture = self.pg3.get_capture(1)
5719 # verify events in data set
5721 self.assertTrue(p.haslayer(IPFIX))
5722 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5723 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5724 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5725 self.assertEqual(p[UDP].dport, 4739)
5726 self.assertEqual(p[IPFIX].observationDomainID,
5727 self.ipfix_domain_id)
5728 if p.haslayer(Data):
5729 data = ipfix.decode_data_set(p.getlayer(Set))
5730 self.verify_ipfix_max_bibs(data, max_bibs)
5732 def test_ipfix_max_frags(self):
5733 """ IPFIX logging maximum fragments pending reassembly exceeded """
5734 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5736 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5737 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5738 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5739 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5740 src_address=self.pg3.local_ip4n,
5742 template_interval=10)
5743 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5744 src_port=self.ipfix_src_port)
5747 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5748 self.tcp_port_in, 20, data)
5749 self.pg0.add_stream(pkts[-1])
5750 self.pg_enable_capture(self.pg_interfaces)
5752 self.pg1.get_capture(0)
5753 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5754 capture = self.pg3.get_capture(9)
5755 ipfix = IPFIXDecoder()
5756 # first load template
5758 self.assertTrue(p.haslayer(IPFIX))
5759 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5760 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5761 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5762 self.assertEqual(p[UDP].dport, 4739)
5763 self.assertEqual(p[IPFIX].observationDomainID,
5764 self.ipfix_domain_id)
5765 if p.haslayer(Template):
5766 ipfix.add_template(p.getlayer(Template))
5767 # verify events in data set
5769 if p.haslayer(Data):
5770 data = ipfix.decode_data_set(p.getlayer(Set))
5771 self.verify_ipfix_max_fragments_ip6(data, 0,
5772 self.pg0.remote_ip6n)
5774 def test_ipfix_bib_ses(self):
5775 """ IPFIX logging NAT64 BIB/session create and delete events """
5776 self.tcp_port_in = random.randint(1025, 65535)
5777 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5781 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5783 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5784 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5785 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5786 src_address=self.pg3.local_ip4n,
5788 template_interval=10)
5789 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5790 src_port=self.ipfix_src_port)
5793 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5794 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5795 TCP(sport=self.tcp_port_in, dport=25))
5796 self.pg0.add_stream(p)
5797 self.pg_enable_capture(self.pg_interfaces)
5799 p = self.pg1.get_capture(1)
5800 self.tcp_port_out = p[0][TCP].sport
5801 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5802 capture = self.pg3.get_capture(10)
5803 ipfix = IPFIXDecoder()
5804 # first load template
5806 self.assertTrue(p.haslayer(IPFIX))
5807 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5808 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5809 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5810 self.assertEqual(p[UDP].dport, 4739)
5811 self.assertEqual(p[IPFIX].observationDomainID,
5812 self.ipfix_domain_id)
5813 if p.haslayer(Template):
5814 ipfix.add_template(p.getlayer(Template))
5815 # verify events in data set
5817 if p.haslayer(Data):
5818 data = ipfix.decode_data_set(p.getlayer(Set))
5819 if ord(data[0][230]) == 10:
5820 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5821 elif ord(data[0][230]) == 6:
5822 self.verify_ipfix_nat64_ses(data,
5824 self.pg0.remote_ip6n,
5825 self.pg1.remote_ip4,
5828 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5831 self.pg_enable_capture(self.pg_interfaces)
5832 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5835 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5836 capture = self.pg3.get_capture(2)
5837 # verify events in data set
5839 self.assertTrue(p.haslayer(IPFIX))
5840 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5841 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5842 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5843 self.assertEqual(p[UDP].dport, 4739)
5844 self.assertEqual(p[IPFIX].observationDomainID,
5845 self.ipfix_domain_id)
5846 if p.haslayer(Data):
5847 data = ipfix.decode_data_set(p.getlayer(Set))
5848 if ord(data[0][230]) == 11:
5849 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5850 elif ord(data[0][230]) == 7:
5851 self.verify_ipfix_nat64_ses(data,
5853 self.pg0.remote_ip6n,
5854 self.pg1.remote_ip4,
5857 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5859 def nat64_get_ses_num(self):
5861 Return number of active NAT64 sessions.
5863 st = self.vapi.nat64_st_dump()
5866 def clear_nat64(self):
5868 Clear NAT64 configuration.
5870 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5871 domain_id=self.ipfix_domain_id)
5872 self.ipfix_src_port = 4739
5873 self.ipfix_domain_id = 1
5875 self.vapi.nat64_set_timeouts()
5877 interfaces = self.vapi.nat64_interface_dump()
5878 for intf in interfaces:
5879 if intf.is_inside > 1:
5880 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5883 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5887 bib = self.vapi.nat64_bib_dump(255)
5890 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
5898 adresses = self.vapi.nat64_pool_addr_dump()
5899 for addr in adresses:
5900 self.vapi.nat64_add_del_pool_addr_range(addr.address,
5905 prefixes = self.vapi.nat64_prefix_dump()
5906 for prefix in prefixes:
5907 self.vapi.nat64_add_del_prefix(prefix.prefix,
5909 vrf_id=prefix.vrf_id,
5913 super(TestNAT64, self).tearDown()
5914 if not self.vpp_dead:
5915 self.logger.info(self.vapi.cli("show nat64 pool"))
5916 self.logger.info(self.vapi.cli("show nat64 interfaces"))
5917 self.logger.info(self.vapi.cli("show nat64 prefix"))
5918 self.logger.info(self.vapi.cli("show nat64 bib all"))
5919 self.logger.info(self.vapi.cli("show nat64 session table all"))
5920 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
5924 class TestDSlite(MethodHolder):
5925 """ DS-Lite Test Cases """
5928 def setUpClass(cls):
5929 super(TestDSlite, cls).setUpClass()
5932 cls.nat_addr = '10.0.0.3'
5933 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
5935 cls.create_pg_interfaces(range(2))
5937 cls.pg0.config_ip4()
5938 cls.pg0.resolve_arp()
5940 cls.pg1.config_ip6()
5941 cls.pg1.generate_remote_hosts(2)
5942 cls.pg1.configure_ipv6_neighbors()
5945 super(TestDSlite, cls).tearDownClass()
5948 def test_dslite(self):
5949 """ Test DS-Lite """
5950 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
5952 aftr_ip4 = '192.0.0.1'
5953 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
5954 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
5955 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
5956 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
5959 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5960 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
5961 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5962 UDP(sport=20000, dport=10000))
5963 self.pg1.add_stream(p)
5964 self.pg_enable_capture(self.pg_interfaces)
5966 capture = self.pg0.get_capture(1)
5967 capture = capture[0]
5968 self.assertFalse(capture.haslayer(IPv6))
5969 self.assertEqual(capture[IP].src, self.nat_addr)
5970 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
5971 self.assertNotEqual(capture[UDP].sport, 20000)
5972 self.assertEqual(capture[UDP].dport, 10000)
5973 self.check_ip_checksum(capture)
5974 out_port = capture[UDP].sport
5976 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5977 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
5978 UDP(sport=10000, dport=out_port))
5979 self.pg0.add_stream(p)
5980 self.pg_enable_capture(self.pg_interfaces)
5982 capture = self.pg1.get_capture(1)
5983 capture = capture[0]
5984 self.assertEqual(capture[IPv6].src, aftr_ip6)
5985 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
5986 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
5987 self.assertEqual(capture[IP].dst, '192.168.1.1')
5988 self.assertEqual(capture[UDP].sport, 10000)
5989 self.assertEqual(capture[UDP].dport, 20000)
5990 self.check_ip_checksum(capture)
5993 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5994 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
5995 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
5996 TCP(sport=20001, dport=10001))
5997 self.pg1.add_stream(p)
5998 self.pg_enable_capture(self.pg_interfaces)
6000 capture = self.pg0.get_capture(1)
6001 capture = capture[0]
6002 self.assertFalse(capture.haslayer(IPv6))
6003 self.assertEqual(capture[IP].src, self.nat_addr)
6004 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6005 self.assertNotEqual(capture[TCP].sport, 20001)
6006 self.assertEqual(capture[TCP].dport, 10001)
6007 self.check_ip_checksum(capture)
6008 self.check_tcp_checksum(capture)
6009 out_port = capture[TCP].sport
6011 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6012 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6013 TCP(sport=10001, dport=out_port))
6014 self.pg0.add_stream(p)
6015 self.pg_enable_capture(self.pg_interfaces)
6017 capture = self.pg1.get_capture(1)
6018 capture = capture[0]
6019 self.assertEqual(capture[IPv6].src, aftr_ip6)
6020 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6021 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6022 self.assertEqual(capture[IP].dst, '192.168.1.1')
6023 self.assertEqual(capture[TCP].sport, 10001)
6024 self.assertEqual(capture[TCP].dport, 20001)
6025 self.check_ip_checksum(capture)
6026 self.check_tcp_checksum(capture)
6029 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6030 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6031 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6032 ICMP(id=4000, type='echo-request'))
6033 self.pg1.add_stream(p)
6034 self.pg_enable_capture(self.pg_interfaces)
6036 capture = self.pg0.get_capture(1)
6037 capture = capture[0]
6038 self.assertFalse(capture.haslayer(IPv6))
6039 self.assertEqual(capture[IP].src, self.nat_addr)
6040 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6041 self.assertNotEqual(capture[ICMP].id, 4000)
6042 self.check_ip_checksum(capture)
6043 self.check_icmp_checksum(capture)
6044 out_id = capture[ICMP].id
6046 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6047 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6048 ICMP(id=out_id, type='echo-reply'))
6049 self.pg0.add_stream(p)
6050 self.pg_enable_capture(self.pg_interfaces)
6052 capture = self.pg1.get_capture(1)
6053 capture = capture[0]
6054 self.assertEqual(capture[IPv6].src, aftr_ip6)
6055 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6056 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6057 self.assertEqual(capture[IP].dst, '192.168.1.1')
6058 self.assertEqual(capture[ICMP].id, 4000)
6059 self.check_ip_checksum(capture)
6060 self.check_icmp_checksum(capture)
6062 # ping DS-Lite AFTR tunnel endpoint address
6063 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6064 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6065 ICMPv6EchoRequest())
6066 self.pg1.add_stream(p)
6067 self.pg_enable_capture(self.pg_interfaces)
6069 capture = self.pg1.get_capture(1)
6070 self.assertEqual(1, len(capture))
6071 capture = capture[0]
6072 self.assertEqual(capture[IPv6].src, aftr_ip6)
6073 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6074 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6077 super(TestDSlite, self).tearDown()
6078 if not self.vpp_dead:
6079 self.logger.info(self.vapi.cli("show dslite pool"))
6081 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6082 self.logger.info(self.vapi.cli("show dslite sessions"))
6085 class TestDSliteCE(MethodHolder):
6086 """ DS-Lite CE Test Cases """
6089 def setUpConstants(cls):
6090 super(TestDSliteCE, cls).setUpConstants()
6091 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
6094 def setUpClass(cls):
6095 super(TestDSliteCE, cls).setUpClass()
6098 cls.create_pg_interfaces(range(2))
6100 cls.pg0.config_ip4()
6101 cls.pg0.resolve_arp()
6103 cls.pg1.config_ip6()
6104 cls.pg1.generate_remote_hosts(1)
6105 cls.pg1.configure_ipv6_neighbors()
6108 super(TestDSliteCE, cls).tearDownClass()
6111 def test_dslite_ce(self):
6112 """ Test DS-Lite CE """
6114 b4_ip4 = '192.0.0.2'
6115 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6116 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6117 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6118 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6120 aftr_ip4 = '192.0.0.1'
6121 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6122 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6123 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6124 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6126 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6127 dst_address_length=128,
6128 next_hop_address=self.pg1.remote_ip6n,
6129 next_hop_sw_if_index=self.pg1.sw_if_index,
6133 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6134 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6135 UDP(sport=10000, dport=20000))
6136 self.pg0.add_stream(p)
6137 self.pg_enable_capture(self.pg_interfaces)
6139 capture = self.pg1.get_capture(1)
6140 capture = capture[0]
6141 self.assertEqual(capture[IPv6].src, b4_ip6)
6142 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6143 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6144 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6145 self.assertEqual(capture[UDP].sport, 10000)
6146 self.assertEqual(capture[UDP].dport, 20000)
6147 self.check_ip_checksum(capture)
6150 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6151 IPv6(dst=b4_ip6, src=aftr_ip6) /
6152 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6153 UDP(sport=20000, dport=10000))
6154 self.pg1.add_stream(p)
6155 self.pg_enable_capture(self.pg_interfaces)
6157 capture = self.pg0.get_capture(1)
6158 capture = capture[0]
6159 self.assertFalse(capture.haslayer(IPv6))
6160 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6161 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6162 self.assertEqual(capture[UDP].sport, 20000)
6163 self.assertEqual(capture[UDP].dport, 10000)
6164 self.check_ip_checksum(capture)
6166 # ping DS-Lite B4 tunnel endpoint address
6167 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6168 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6169 ICMPv6EchoRequest())
6170 self.pg1.add_stream(p)
6171 self.pg_enable_capture(self.pg_interfaces)
6173 capture = self.pg1.get_capture(1)
6174 self.assertEqual(1, len(capture))
6175 capture = capture[0]
6176 self.assertEqual(capture[IPv6].src, b4_ip6)
6177 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6178 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6181 super(TestDSliteCE, self).tearDown()
6182 if not self.vpp_dead:
6184 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6186 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
6189 class TestNAT66(MethodHolder):
6190 """ NAT66 Test Cases """
6193 def setUpClass(cls):
6194 super(TestNAT66, cls).setUpClass()
6197 cls.nat_addr = 'fd01:ff::2'
6198 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6200 cls.create_pg_interfaces(range(2))
6201 cls.interfaces = list(cls.pg_interfaces)
6203 for i in cls.interfaces:
6206 i.configure_ipv6_neighbors()
6209 super(TestNAT66, cls).tearDownClass()
6212 def test_static(self):
6213 """ 1:1 NAT66 test """
6214 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6215 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6216 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
6221 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6222 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6225 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6226 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6229 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6230 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6231 ICMPv6EchoRequest())
6233 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6234 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6235 GRE() / IP() / TCP())
6237 self.pg0.add_stream(pkts)
6238 self.pg_enable_capture(self.pg_interfaces)
6240 capture = self.pg1.get_capture(len(pkts))
6241 for packet in capture:
6243 self.assertEqual(packet[IPv6].src, self.nat_addr)
6244 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6245 if packet.haslayer(TCP):
6246 self.check_tcp_checksum(packet)
6247 elif packet.haslayer(UDP):
6248 self.check_udp_checksum(packet)
6249 elif packet.haslayer(ICMPv6EchoRequest):
6250 self.check_icmpv6_checksum(packet)
6252 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6257 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6258 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6261 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6262 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6265 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6266 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6269 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6270 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6271 GRE() / IP() / TCP())
6273 self.pg1.add_stream(pkts)
6274 self.pg_enable_capture(self.pg_interfaces)
6276 capture = self.pg0.get_capture(len(pkts))
6277 for packet in capture:
6279 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6280 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6281 if packet.haslayer(TCP):
6282 self.check_tcp_checksum(packet)
6283 elif packet.haslayer(UDP):
6284 self.check_udp_checksum(packet)
6285 elif packet.haslayer(ICMPv6EchoReply):
6286 self.check_icmpv6_checksum(packet)
6288 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6291 sm = self.vapi.nat66_static_mapping_dump()
6292 self.assertEqual(len(sm), 1)
6293 self.assertEqual(sm[0].total_pkts, 8)
6295 def clear_nat66(self):
6297 Clear NAT66 configuration.
6299 interfaces = self.vapi.nat66_interface_dump()
6300 for intf in interfaces:
6301 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6305 static_mappings = self.vapi.nat66_static_mapping_dump()
6306 for sm in static_mappings:
6307 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6308 sm.external_ip_address,
6313 super(TestNAT66, self).tearDown()
6314 if not self.vpp_dead:
6315 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6316 self.logger.info(self.vapi.cli("show nat66 static mappings"))
6319 if __name__ == '__main__':
6320 unittest.main(testRunner=VppTestRunner)