9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
# Base class that collects the packet-construction and capture-verification
# helpers used by the NAT test classes that derive from it (e.g. TestNAT44).
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
# NOTE(review): the two calls below simply chain to the parent class. The
# `def` headers that should enclose them are not present in this listing --
# presumably a classmethod setUpClass(cls) and tearDown(self); confirm
# against the original file.
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
def check_ip_checksum(self, pkt):
    """
    Check IP checksum of the packet

    The packet is re-parsed from its serialized form, the parsed checksum is
    dropped so that it gets recomputed on the next build, and the recomputed
    value is compared with the one carried by the original packet.

    :param pkt: Packet to check IP checksum
    """
    new = pkt.__class__(str(pkt))
    # Without dropping the parsed value the rebuild would just echo it back
    # and the comparison below could never fail.
    del new['IP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
def check_tcp_checksum(self, pkt):
    """
    Check TCP checksum in IP packet

    The packet is re-parsed from its serialized form, the parsed TCP
    checksum is dropped so that it gets recomputed on the next build, and
    the recomputed value is compared with the one in the original packet.

    :param pkt: Packet to check TCP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed value, otherwise the rebuild only echoes it back
    del new['TCP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
def check_udp_checksum(self, pkt):
    """
    Check UDP checksum in IP packet

    The packet is re-parsed from its serialized form, the parsed UDP
    checksum is dropped so that it gets recomputed on the next build, and
    the recomputed value is compared with the one in the original packet.

    :param pkt: Packet to check UDP checksum
    """
    new = pkt.__class__(str(pkt))
    # Drop the parsed value, otherwise the rebuild only echoes it back
    del new['UDP'].chksum
    new = new.__class__(str(new))
    self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
def check_icmp_errror_embedded(self, pkt):
    """
    Check ICMP error embedded packet checksum

    For every embedded layer present (IPerror, TCPerror, UDPerror,
    ICMPerror) the packet is re-parsed, that layer's checksum is dropped and
    rebuilt, and the rebuilt value is compared with the one in the packet.

    :param pkt: Packet to check ICMP error embedded packet checksum
    """
    for layer in ('IPerror', 'TCPerror', 'UDPerror', 'ICMPerror'):
        if not pkt.haslayer(layer):
            continue
        # An embedded UDP checksum of 0 is not verified
        if layer == 'UDPerror' and pkt[layer].chksum == 0:
            continue
        # Always start from a fresh copy of the packet: every layer is
        # checked independently instead of reusing (or, when no earlier
        # layer matched, never having bound) the copy of a previous check.
        new = pkt.__class__(str(pkt))
        del new[layer].chksum
        new = new.__class__(str(new))
        self.assertEqual(new[layer].chksum, pkt[layer].chksum)
def check_icmp_checksum(self, pkt):
    """
    Check ICMP checksum in IPv4 packet

    The ICMP checksum is rebuilt from a re-parsed copy of the packet and
    compared with the original one; when the packet embeds an offending
    packet (IPerror layer) the embedded checksums are verified as well.

    :param pkt: Packet to check ICMP checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    del rebuilt['ICMP'].chksum
    rebuilt = rebuilt.__class__(str(rebuilt))
    self.assertEqual(rebuilt['ICMP'].chksum, pkt['ICMP'].chksum)
    if not pkt.haslayer(IPerror):
        return
    self.check_icmp_errror_embedded(pkt)
def check_icmpv6_checksum(self, pkt):
    """
    Check ICMPv6 checksum in IPv6 packet

    Handles destination unreachable, echo request and echo reply messages;
    for destination unreachable the embedded packet is checked as well.

    :param pkt: Packet to check ICMPv6 checksum
    """
    rebuilt = pkt.__class__(str(pkt))
    for layer in (ICMPv6DestUnreach, ICMPv6EchoRequest, ICMPv6EchoReply):
        if not pkt.haslayer(layer):
            continue
        name = layer.__name__
        del rebuilt[name].cksum
        rebuilt = rebuilt.__class__(str(rebuilt))
        self.assertEqual(rebuilt[name].cksum, pkt[name].cksum)
        if layer is ICMPv6DestUnreach:
            self.check_icmp_errror_embedded(pkt)
def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
    """
    Create packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param dst_ip: Destination address (Default use remote address of the
                   outside interface)
    :param ttl: TTL of generated packets
    :returns: List with one TCP, one UDP and one ICMP echo request packet
    """
    if dst_ip is None:
        dst_ip = out_if.remote_ip4

    def _frame(l4):
        # Same L2/L3 headers for every packet, only the L4 part differs
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
                l4)

    return [
        _frame(TCP(sport=self.tcp_port_in, dport=20)),
        _frame(UDP(sport=self.udp_port_in, dport=20)),
        _frame(ICMP(id=self.icmp_id_in, type='echo-request')),
    ]
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    :param ip4: IPv4 address
    :param pref: IPv6 prefix
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: IPv4-embedded IPv6 addresses
    :raises ValueError: if the prefix length is not one of the above
    """
    # Octets of the IPv6 address that receive the four IPv4 octets for each
    # prefix length (RFC 6052, section 2.2). Octet 8 is reserved and is
    # therefore never written.
    positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in positions:
        raise ValueError("unsupported NAT64 prefix length: %r" % (plen,))
    # bytearray keeps the octet manipulation identical on Python 2 and 3
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for pos, octet in zip(positions[plen], ip4_n):
        pref_n[pos] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    :param ip6: IPv6 address
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: extracted IPv4 address
    :raises ValueError: if the prefix length is not one of the above
    """
    # Octets of the IPv6 address that carry the four IPv4 octets for each
    # prefix length (RFC 6052, section 2.2); octet 8 is reserved and skipped.
    positions = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in positions:
        raise ValueError("unsupported NAT64 prefix length: %r" % (plen,))
    # bytearray keeps the octet manipulation identical on Python 2 and 3
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[pos] for pos in positions[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
    """
    Create IPv6 packet stream for inside network

    :param in_if: Inside interface
    :param out_if: Outside interface
    :param hlim: Hop Limit of generated packets
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length (used only when pref is given)
    :returns: List with one TCP, one UDP and one ICMPv6 echo request packet
    """
    if pref is None:
        dst = ''.join(['64:ff9b::', out_if.remote_ip4])
    else:
        dst = self.compose_ip6(out_if.remote_ip4, pref, plen)

    def _frame(l4):
        # Same L2/L3 headers for every packet, only the L4 part differs
        return (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
                l4)

    return [
        _frame(TCP(sport=self.tcp_port_in, dport=20)),
        _frame(UDP(sport=self.udp_port_in, dport=20)),
        _frame(ICMPv6EchoRequest(id=self.icmp_id_in)),
    ]
def create_stream_out(self, out_if, dst_ip=None, ttl=64,
                      use_inside_ports=False):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param dst_ip: Destination IP address (Default use global NAT address)
    :param ttl: TTL of generated packets
    :param use_inside_ports: Use inside NAT ports as destination ports
           instead of outside ports
    :returns: List with one TCP, one UDP and one ICMP echo reply packet
    """
    if dst_ip is None:
        dst_ip = self.nat_addr
    if not use_inside_ports:
        tcp_port = self.tcp_port_out
        udp_port = self.udp_port_out
        icmp_id = self.icmp_id_out
    else:
        tcp_port = self.tcp_port_in
        udp_port = self.udp_port_in
        icmp_id = self.icmp_id_in

    def _frame(l4):
        # Same L2/L3 headers for every packet, only the L4 part differs
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
                l4)

    return [
        _frame(TCP(dport=tcp_port, sport=20)),
        _frame(UDP(dport=udp_port, sport=20)),
        _frame(ICMP(id=icmp_id, type='echo-reply')),
    ]
def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
    """
    Create packet stream for outside network

    :param out_if: Outside interface
    :param src_ip: Source IPv6 address
    :param dst_ip: Destination IPv6 address
    :param hl: HL of generated packets
    :returns: List with one TCP, one UDP and one ICMPv6 echo reply packet
    """
    def _frame(l4):
        # Same L2/L3 headers for every packet, only the L4 part differs
        return (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
                l4)

    return [
        _frame(TCP(dport=self.tcp_port_out, sport=20)),
        _frame(UDP(dport=self.udp_port_out, sport=20)),
        _frame(ICMPv6EchoReply(id=self.icmp_id_out)),
    ]
def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                       packet_num=3, dst_ip=None, is_ip6=False):
    """
    Verify captured packets on outside network

    As a side effect the observed source ports / ICMP id are stored in
    self.tcp_port_out, self.udp_port_out and self.icmp_id_out.

    :param capture: Captured packets
    :param nat_ip: Translated IP address (Default use global NAT address)
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    :param is_ip6: If L3 protocol is IPv6 (Default False)
    """
    if is_ip6:
        IP46 = IPv6
        ICMP46 = ICMPv6EchoRequest
    else:
        IP46 = IP
        ICMP46 = ICMP
    if nat_ip is None:
        nat_ip = self.nat_addr
    # The port/id has to equal the inside value only when not translated
    check_port = self.assertEqual if same_port else self.assertNotEqual
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            # Only IPv4 carries a header checksum
            if not is_ip6:
                self.check_ip_checksum(packet)
            self.assertEqual(packet[IP46].src, nat_ip)
            if dst_ip is not None:
                self.assertEqual(packet[IP46].dst, dst_ip)
            if packet.haslayer(TCP):
                check_port(packet[TCP].sport, self.tcp_port_in)
                self.tcp_port_out = packet[TCP].sport
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                check_port(packet[UDP].sport, self.udp_port_in)
                self.udp_port_out = packet[UDP].sport
            else:
                check_port(packet[ICMP46].id, self.icmp_id_in)
                self.icmp_id_out = packet[ICMP46].id
                if is_ip6:
                    self.check_icmpv6_checksum(packet)
                else:
                    self.check_icmp_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
                           packet_num=3, dst_ip=None):
    """
    Verify captured packets on outside network

    Thin wrapper around verify_capture_out with IPv6 selected.

    :param capture: Captured packets
    :param nat_ip: Translated IP address
    :param same_port: Source port number is not translated (Default False)
    :param packet_num: Expected number of packets (Default 3)
    :param dst_ip: Destination IP address (Default do not verify)
    """
    return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
                                   dst_ip, True)
def verify_capture_in(self, capture, in_if, packet_num=3):
    """
    Verify captured packets on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.check_ip_checksum(packet)
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                self.check_icmp_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
    """
    Verify captured IPv6 packets on inside network

    :param capture: Captured packets
    :param src_ip: Source IP
    :param dst_ip: Destination IP address
    :param packet_num: Expected number of packets (Default 3)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, src_ip)
            self.assertEqual(packet[IPv6].dst, dst_ip)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                self.check_tcp_checksum(packet)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].dport, self.udp_port_in)
                self.check_udp_checksum(packet)
            else:
                self.assertEqual(packet[ICMPv6EchoReply].id,
                                 self.icmp_id_in)
                self.check_icmpv6_checksum(packet)
        except Exception:
            # Log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_no_translation(self, capture, ingress_if, egress_if):
    """
    Verify captured packet that don't have to be translated

    Addresses, source ports and ICMP id must still carry the inside
    (untranslated) values.

    :param capture: Captured packets
    :param ingress_if: Ingress interface
    :param egress_if: Egress interface
    """
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
            self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
            if packet.haslayer(TCP):
                self.assertEqual(packet[TCP].sport, self.tcp_port_in)
            elif packet.haslayer(UDP):
                self.assertEqual(packet[UDP].sport, self.udp_port_in)
            else:
                self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                        packet_num=3, icmp_type=11):
    """
    Verify captured packets with ICMP errors on outside network

    :param capture: Captured packets
    :param src_ip: Translated IP address or IP address of VPP
                   (Default use global NAT address)
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    if src_ip is None:
        src_ip = self.nat_addr
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, src_ip)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # The embedded packet must carry the outside port / id
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].dport,
                                 self.tcp_port_out)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].dport,
                                 self.udp_port_out)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
        except Exception:
            # Log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(outside network):", packet))
            raise
def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                       icmp_type=11):
    """
    Verify captured packets with ICMP errors on inside network

    :param capture: Captured packets
    :param in_if: Inside interface
    :param packet_num: Expected number of packets (Default 3)
    :param icmp_type: Type of error ICMP packet
                      we are expecting (Default 11)
    """
    self.assertEqual(packet_num, len(capture))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].dst, in_if.remote_ip4)
            self.assertTrue(packet.haslayer(ICMP))
            icmp = packet[ICMP]
            self.assertEqual(icmp.type, icmp_type)
            self.assertTrue(icmp.haslayer(IPerror))
            # The embedded packet must carry the inside port / id
            inner_ip = icmp[IPerror]
            if inner_ip.haslayer(TCPerror):
                self.assertEqual(inner_ip[TCPerror].sport,
                                 self.tcp_port_in)
            elif inner_ip.haslayer(UDPerror):
                self.assertEqual(inner_ip[UDPerror].sport,
                                 self.udp_port_in)
            else:
                self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
        except Exception:
            # Log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet "
                                  "(inside network):", packet))
            raise
def create_stream_frag(self, src_if, dst, sport, dport, data):
    """
    Create fragmented packet stream

    The payload is split into three IPv4 fragments at offsets 0, 3 and 5
    (in units of 8 octets).

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :returns: List with the three fragments
    """
    ip_id = random.randint(0, 65535)
    # Build the unfragmented packet once to learn the TCP checksum that
    # covers the whole payload; it is reused in the first fragment.
    whole = (IP(src=src_if.remote_ip4, dst=dst) /
             TCP(sport=sport, dport=dport) /
             Raw(data))
    whole = whole.__class__(str(whole))
    chksum = whole['TCP'].chksum

    # First fragment: 20 octet TCP header + 4 octets of data = 24 octets,
    # so the next fragment starts at offset 3 (24 / 8).
    first = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
             IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0,
                id=ip_id) /
             TCP(sport=sport, dport=dport, chksum=chksum) /
             Raw(data[0:4]))
    # Second fragment: 16 octets, ending at offset 5 (40 / 8)
    second = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3,
                 id=ip_id, proto=IP_PROTOS.tcp) /
              Raw(data[4:20]))
    # Last fragment: remaining payload, no "more fragments" flag
    third = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
             IP(src=src_if.remote_ip4, dst=dst, frag=5,
                proto=IP_PROTOS.tcp, id=ip_id) /
             Raw(data[20:]))
    return [first, second, third]
def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
                           pref=None, plen=0, frag_size=128):
    """
    Create fragmented packet stream

    :param src_if: Source interface
    :param dst: Destination IPv4 address
    :param sport: Source TCP port
    :param dport: Destination TCP port
    :param data: Payload data
    :param pref: NAT64 prefix (Default use 64:ff9b::)
    :param plen: NAT64 prefix length (used only when pref is given)
    :param frag_size: size of fragments
    :returns: List of fragments produced by scapy's fragment6
    """
    if pref is None:
        dst_ip6 = ''.join(['64:ff9b::', dst])
    else:
        dst_ip6 = self.compose_ip6(dst, pref, plen)

    p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
         IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
         IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
         TCP(sport=sport, dport=dport) /
         Raw(data))

    return fragment6(p, frag_size)
def reass_frags_and_verify(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv4 address to verify
    :param dst: Destination IPv4 address to verify

    :returns: Reassembled IPv4 packet
    """
    payload = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IP].src, src)
        self.assertEqual(p[IP].dst, dst)
        self.check_ip_checksum(p)
        # Fragment offset is expressed in units of 8 octets
        payload.seek(p[IP].frag * 8)
        payload.write(p[IP].payload)
    ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
            proto=frags[0][IP].proto)
    if ip.proto == IP_PROTOS.tcp:
        p = (ip / TCP(payload.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.proto == IP_PROTOS.udp:
        p = (ip / UDP(payload.getvalue()))
    # For any other protocol the last captured fragment is returned as is
    return p
def reass_frags_and_verify_ip6(self, frags, src, dst):
    """
    Reassemble and verify fragmented packet

    :param frags: Captured fragments
    :param src: Source IPv6 address to verify
    :param dst: Destination IPv6 address to verify

    :returns: Reassembled IPv6 packet
    """
    payload = StringIO.StringIO()
    for p in frags:
        self.assertEqual(p[IPv6].src, src)
        self.assertEqual(p[IPv6].dst, dst)
        # Fragment offset is expressed in units of 8 octets
        payload.seek(p[IPv6ExtHdrFragment].offset * 8)
        payload.write(p[IPv6ExtHdrFragment].payload)
    ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
              nh=frags[0][IPv6ExtHdrFragment].nh)
    if ip.nh == IP_PROTOS.tcp:
        p = (ip / TCP(payload.getvalue()))
        self.check_tcp_checksum(p)
    elif ip.nh == IP_PROTOS.udp:
        p = (ip / UDP(payload.getvalue()))
    # For any other protocol the last captured fragment is returned as is
    return p
def verify_ipfix_nat44_ses(self, data):
    """
    Verify IPFIX NAT44 session create/delete event

    Expects six records: three session create and three session delete
    events. Record keys are IPFIX information element ids.

    :param data: Decoded IPFIX data records
    """
    nat44_ses_create_num = 0
    nat44_ses_delete_num = 0
    self.assertEqual(6, len(data))
    for record in data:
        # natEvent
        self.assertIn(ord(record[230]), [4, 5])
        if ord(record[230]) == 4:
            nat44_ses_create_num += 1
        else:
            nat44_ses_delete_num += 1
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])
        # postNATSourceIPv4Address
        self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
                         record[225])
        # ingressVRFID
        self.assertEqual(struct.pack("!I", 0), record[234])
        # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
        if IP_PROTOS.icmp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
            self.assertEqual(struct.pack("!H", self.icmp_id_out),
                             record[227])
        elif IP_PROTOS.tcp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.tcp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.tcp_port_out),
                             record[227])
        elif IP_PROTOS.udp == ord(record[4]):
            self.assertEqual(struct.pack("!H", self.udp_port_in),
                             record[7])
            self.assertEqual(struct.pack("!H", self.udp_port_out),
                             record[227])
        else:
            self.fail("Invalid protocol")
    self.assertEqual(3, nat44_ses_create_num)
    self.assertEqual(3, nat44_ses_delete_num)
def verify_ipfix_addr_exhausted(self, data):
    """
    Verify IPFIX NAT addresses event

    :param data: Decoded IPFIX data records
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 3)
    # natPoolID
    self.assertEqual(struct.pack("!I", 0), record[283])
def verify_ipfix_max_sessions(self, data, limit):
    """
    Verify IPFIX maximum session entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum session entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 1), record[466])
    # maxSessionEntries
    self.assertEqual(struct.pack("I", limit), record[471])
def verify_ipfix_max_bibs(self, data, limit):
    """
    Verify IPFIX maximum BIB entries exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum BIB entries that can be created.
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 2), record[466])
    # maxBIBEntries
    self.assertEqual(struct.pack("I", limit), record[472])
def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
    """
    Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event

    :param data: Decoded IPFIX data records
    :param limit: Number of maximum fragments pending reassembly
    :param src_addr: IPv4 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    self.assertEqual(ord(record[230]), 13)
    # natQuotaExceededEvent
    self.assertEqual(struct.pack("I", 5), record[466])
    # maxFragmentsPendingReassembly
    self.assertEqual(struct.pack("I", limit), record[475])
    # sourceIPv4Address
    self.assertEqual(src_addr, record[8])
def verify_ipfix_bib(self, data, is_create, src_addr):
    """
    Verify IPFIX NAT64 BIB create and delete events

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    if is_create:
        self.assertEqual(ord(record[230]), 10)
    else:
        self.assertEqual(ord(record[230]), 11)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
                           dst_port):
    """
    Verify IPFIX NAT64 session create and delete events

    :param data: Decoded IPFIX data records
    :param is_create: Create event if nonzero value otherwise delete event
    :param src_addr: IPv6 source address
    :param dst_addr: IPv4 destination address
    :param dst_port: destination TCP port
    """
    self.assertEqual(1, len(data))
    record = data[0]
    # natEvent
    if is_create:
        self.assertEqual(ord(record[230]), 6)
    else:
        self.assertEqual(ord(record[230]), 7)
    # sourceIPv6Address
    self.assertEqual(src_addr, record[27])
    # destinationIPv6Address
    # NOTE(review): the prefix arguments were cut off in the reviewed
    # listing; the well-known 64:ff9b::/96 prefix used as the default
    # elsewhere in this class is assumed here -- confirm.
    self.assertEqual(socket.inet_pton(socket.AF_INET6,
                                      self.compose_ip6(dst_addr,
                                                       '64:ff9b::',
                                                       96)),
                     record[28])
    # postNATSourceIPv4Address
    self.assertEqual(self.nat_addr_n, record[225])
    # postNATDestinationIPv4Address
    self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
                     record[226])
    # protocolIdentifier
    self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
    # ingressVRFID
    self.assertEqual(struct.pack("!I", 0), record[234])
    # sourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
    # postNAPTSourceTransportPort
    self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
    # destinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[11])
    # postNAPTDestinationTransportPort
    self.assertEqual(struct.pack("!H", dst_port), record[228])
# NAT44 test class built on the MethodHolder helpers. The lines below are the
# class-level fixture: default inside/outside ports and ICMP ids, the NAT
# pool address, IPFIX settings, ten pg interfaces and VRF tables 10 and 20.
# NOTE(review): the `def setUpClass`/`def tearDownClass` headers and several
# statements (loop bodies, call continuations) are absent from this listing.
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
# Inside and outside values start out identical; the verify_capture_out
# helper overwrites the *_out values with what it observes.
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
# Packed (binary) form of the NAT pool address
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
# pg0..pg3 are the regular interfaces
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
# pg4..pg6 get overlapping addresses, separated by IP tables 10 and 20
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
# NOTE(review): `i` in the three `_local_ip4n` assignments below is whatever
# the earlier `for i in cls.interfaces` loop left behind, not pg4/pg5/pg6 --
# this looks like it should use the interface's own local_ip4; confirm.
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
# pg9 gets 10.0.0.1 as an address (the call's remaining arguments are not
# shown) and its remote hosts are then rewritten to share 10.0.0.2.
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
# NOTE(review): this assigns pg4._remote_ip4 although the surrounding lines
# configure pg9 -- possibly a typo for cls.pg9; confirm.
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
# Walks the NAT44 state reported by the various *_dump() API calls and feeds
# each entry back into the matching add/del call, then resets IPFIX and
# reassembly settings.
# NOTE(review): most calls are cut off in this listing; the delete flag
# (presumably is_add=0) sits on continuation lines that are not shown.
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
# Host routes (/32) towards the pg7 and pg8 remote addresses
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
# Neighbor entries of pg7 and pg8
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
# NAT44 interface addresses
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
# Disable IPFIX logging using the current settings, then restore defaults
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
# NAT44 interface features; interfaces reporting is_inside > 1 get an extra
# add_del_feature call before the common one
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# NAT44 output features
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
# Static mappings
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
1034 external_sw_if_index=sm.external_sw_if_index,
# Load-balancing static mappings
1037 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1038 for lb_sm in lb_static_mappings:
1039 self.vapi.nat44_add_del_lb_static_mapping(
1040 lb_sm.external_addr,
1041 lb_sm.external_port,
1043 vrf_id=lb_sm.vrf_id,
1044 twice_nat=lb_sm.twice_nat,
1045 out2in_only=lb_sm.out2in_only,
# Identity mappings
1051 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1052 for id_m in identity_mappings:
1053 self.vapi.nat44_add_del_identity_mapping(
1054 addr_only=id_m.addr_only,
1057 sw_if_index=id_m.sw_if_index,
1059 protocol=id_m.protocol,
# NAT pool addresses
1062 adresses = self.vapi.nat44_address_dump()
1063 for addr in adresses:
1064 self.vapi.nat44_add_del_address_range(addr.ip_address,
1066 twice_nat=addr.twice_nat,
# Reassembly settings for IPv4 and IPv6, called with the API defaults
1069 self.vapi.nat_set_reass()
1070 self.vapi.nat_set_reass(is_ip6=1)
# Convenience wrapper around vapi.nat44_add_del_static_mapping that accepts
# textual IPv4 addresses and converts them to packed form.
# NOTE(review): the statements guarded by the `if` and most of the call's
# arguments are not present in this listing; presumably the `if` switches
# between address-only and address-and-port mappings -- confirm.
1072 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1073 local_port=0, external_port=0, vrf_id=0,
1074 is_add=1, external_sw_if_index=0xFFFFFFFF,
1075 proto=0, twice_nat=0, out2in_only=0, tag=""):
1077 Add/delete NAT44 static mapping
1079 :param local_ip: Local IP address
1080 :param external_ip: External IP address
1081 :param local_port: Local port number (Optional)
1082 :param external_port: External port number (Optional)
1083 :param vrf_id: VRF ID (Default 0)
1084 :param is_add: 1 if add, 0 if delete (Default add)
1085 :param external_sw_if_index: External interface instead of IP address
1086 :param proto: IP protocol (Mandatory if port specified)
1087 :param twice_nat: 1 if translate external host address and port
1088 :param out2in_only: if 1 rule is matching only out2in direction
1089 :param tag: Opaque string tag
# Both ports have to be non-zero for this branch to be taken
1092 if local_port and external_port:
# Packed (binary) forms of the two addresses
1094 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1095 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1096 self.vapi.nat44_add_del_static_mapping(
1099 external_sw_if_index,
def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
    """
    Add/delete NAT44 address

    :param ip: IP address
    :param is_add: 1 if add, 0 if delete (Default add)
    :param vrf_id: VRF ID passed on to the API (Default 0xFFFFFFFF)
    :param twice_nat: twice NAT address for external hosts
    """
    nat_addr = socket.inet_pton(socket.AF_INET, ip)
    # A single address is expressed as a range whose first and last
    # addresses are the same.
    self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
                                          vrf_id=vrf_id,
                                          twice_nat=twice_nat)
# Sends the default inside stream from pg0 towards pg1, verifies the
# translated packets captured on pg1, then sends the outside stream back and
# verifies what arrives on pg0.
# NOTE(review): call continuations and any step between enabling capture and
# reading it are not present in this listing.
1123 def test_dynamic(self):
1124 """ NAT44 dynamic translation test """
# NAT pool address plus NAT44 feature on pg0 and pg1 (the second call is cut
# off; presumably it marks pg1 as the outside interface -- confirm)
1126 self.nat44_add_address(self.nat_addr)
1127 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1128 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside to outside direction
1132 pkts = self.create_stream_in(self.pg0, self.pg1)
1133 self.pg0.add_stream(pkts)
1134 self.pg_enable_capture(self.pg_interfaces)
1136 capture = self.pg1.get_capture(len(pkts))
1137 self.verify_capture_out(capture)
# Outside to inside direction
1140 pkts = self.create_stream_out(self.pg1)
1141 self.pg1.add_stream(pkts)
1142 self.pg_enable_capture(self.pg_interfaces)
1144 capture = self.pg0.get_capture(len(pkts))
1145 self.verify_capture_in(capture, self.pg0)
# Sends the inside stream with TTL=1 and expects one ICMP error per packet
# back on pg0 (verify_capture_in_with_icmp_errors defaults to type 11).
# NOTE(review): call continuations and any step between enabling capture and
# reading it are not present in this listing.
1147 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1148 """ NAT44 handling of client packets with TTL=1 """
# NAT pool address plus NAT44 feature on pg0 and pg1 (second call cut off)
1150 self.nat44_add_address(self.nat_addr)
1151 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1152 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1155 # Client side - generate traffic
1156 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1157 self.pg0.add_stream(pkts)
1158 self.pg_enable_capture(self.pg_interfaces)
1161 # Client side - verify ICMP type 11 packets
1162 capture = self.pg0.get_capture(len(pkts))
1163 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
# Creates sessions with a normal inside stream, then sends the outside
# stream with TTL=1 and expects ICMP errors on pg1 whose source is pg1's
# local address.
# NOTE(review): call continuations and any step between enabling capture and
# reading it are not present in this listing.
1165 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1166 """ NAT44 handling of server packets with TTL=1 """
# NAT pool address plus NAT44 feature on pg0 and pg1 (second call cut off)
1168 self.nat44_add_address(self.nat_addr)
1169 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1170 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1173 # Client side - create sessions
1174 pkts = self.create_stream_in(self.pg0, self.pg1)
1175 self.pg0.add_stream(pkts)
1176 self.pg_enable_capture(self.pg_interfaces)
1179 # Server side - generate traffic
# verify_capture_out also records the outside ports/id that
# create_stream_out uses for the reply stream
1180 capture = self.pg1.get_capture(len(pkts))
1181 self.verify_capture_out(capture)
1182 pkts = self.create_stream_out(self.pg1, ttl=1)
1183 self.pg1.add_stream(pkts)
1184 self.pg_enable_capture(self.pg_interfaces)
1187 # Server side - verify ICMP type 11 packets
1188 capture = self.pg1.get_capture(len(pkts))
1189 self.verify_capture_out_with_icmp_errors(capture,
1190 src_ip=self.pg1.local_ip4)
# Sends the inside stream with TTL=2, answers every packet captured on pg1
# with an ICMP type 11 error addressed to the NAT address, and verifies the
# errors that arrive back on pg0.
# NOTE(review): call continuations and any step between enabling capture and
# reading it are not present in this listing.
1192 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1193 """ NAT44 handling of error responses to client packets with TTL=2 """
# NAT pool address plus NAT44 feature on pg0 and pg1 (second call cut off)
1195 self.nat44_add_address(self.nat_addr)
1196 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1197 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1200 # Client side - generate traffic
1201 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1202 self.pg0.add_stream(pkts)
1203 self.pg_enable_capture(self.pg_interfaces)
1206 # Server side - simulate ICMP type 11 response
# Each error embeds the IP layer of the captured (translated) packet
1207 capture = self.pg1.get_capture(len(pkts))
1208 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1209 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1210 ICMP(type=11) / packet[IP] for packet in capture]
1211 self.pg1.add_stream(pkts)
1212 self.pg_enable_capture(self.pg_interfaces)
1215 # Client side - verify ICMP type 11 packets
1216 capture = self.pg0.get_capture(len(pkts))
1217 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
def test_dynamic_icmp_errors_out2in_ttl_2(self):
    """ NAT44 handling of error responses to server packets with TTL=2 """
    # Creates sessions from the inside, sends the outside stream with
    # ttl=2, wraps every packet captured on pg0 in an ICMP type 11 message
    # addressed to the server, and checks what arrives on pg1.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The is_inside=0 argument and the pg_start()
    # calls were restored from those gaps - confirm against upstream.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # Client side - create sessions
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - generate traffic
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
    pkts = self.create_stream_out(self.pg1, ttl=2)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Client side - simulate ICMP type 11 response
    capture = self.pg0.get_capture(len(pkts))
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
            ICMP(type=11) / packet[IP] for packet in capture]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()

    # Server side - verify ICMP type 11 packets
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out_with_icmp_errors(capture)
def test_ping_out_interface_from_outside(self):
    """ Ping NAT44 out interface from outside network """
    # Sends one ICMP echo request from the pg1 peer to pg1's own address
    # and expects exactly one echo reply back on pg1.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The is_inside=0 argument, the pkts/packet
    # bindings, pg_start() and the try/except/raise scaffold were restored
    # from those gaps - confirm against upstream.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
         ICMP(id=self.icmp_id_out, type='echo-request'))
    pkts = [p]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.assertEqual(1, len(capture))
    packet = capture[0]
    try:
        self.assertEqual(packet[IP].src, self.pg1.local_ip4)
        self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
        self.assertEqual(packet[ICMP].id, self.icmp_id_in)
        self.assertEqual(packet[ICMP].type, 0)  # echo reply
    except Exception:
        # Dump the offending packet before letting the failure propagate.
        self.logger.error(ppp("Unexpected or invalid packet "
                              "(outside network):", packet))
        raise
def test_ping_internal_host_from_outside(self):
    """ Ping internal host from outside network """
    # With a static mapping pg0 host <-> nat_addr in place, sends an echo
    # request from the outside to nat_addr, then the echo reply from the
    # inside host, and checks each is delivered as ICMP on the other side.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The is_inside=0 argument, the pg_start() calls
    # and the direction comments were restored from those gaps - confirm
    # against upstream.

    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in
    pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
           IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
           ICMP(id=self.icmp_id_out, type='echo-request'))
    self.pg1.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    self.verify_capture_in(capture, self.pg0, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)

    # in2out
    pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
           IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
           ICMP(id=self.icmp_id_in, type='echo-reply'))
    self.pg0.add_stream(pkt)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    self.verify_capture_out(capture, same_port=True, packet_num=1)
    self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
def test_forwarding(self):
    """ NAT44 forwarding test """
    # Enables NAT44 forwarding and one static mapping, then passes traffic
    # in both directions twice: once for the mapped host and once after
    # remote_hosts[0] has been swapped for remote_hosts[1] (no mapping).
    # Forwarding and the mapping are removed again in the outer finally.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  is_inside=0, same_port=True, is_add=0, the
    # pg_start() calls and the nested try/finally scaffold were restored
    # from those gaps - confirm against upstream.

    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_forwarding_enable_disable(1)

    real_ip = self.pg0.remote_ip4n
    alias_ip = self.nat_addr_n
    self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                           external_ip=alias_ip)

    try:
        # static mapping match (out2in first, then in2out)

        pkts = self.create_stream_out(self.pg1)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, same_port=True)

        # no static mapping match (out2in first, then in2out)

        # Temporarily put the second host in slot 0; the original entry
        # is restored in the inner finally.
        host0 = self.pg0.remote_hosts[0]
        self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
        try:
            pkts = self.create_stream_out(self.pg1,
                                          dst_ip=self.pg0.remote_ip4,
                                          use_inside_ports=True)
            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg0.get_capture(len(pkts))
            self.verify_capture_in(capture, self.pg0)

            pkts = self.create_stream_in(self.pg0, self.pg1)
            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            capture = self.pg1.get_capture(len(pkts))
            self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
                                    same_port=True)
        finally:
            self.pg0.remote_hosts[0] = host0

    finally:
        self.vapi.nat44_forwarding_enable_disable(0)
        self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
                                               external_ip=alias_ip,
                                               is_add=0)
def test_static_in(self):
    """ 1:1 NAT initialized from inside network """
    # Adds an address-only static mapping, checks its dump entry (empty
    # tag, zero protocol and ports), then passes traffic in2out followed
    # by out2in.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The is_inside=0 argument, the pg_start() calls
    # and the direction comments were restored from those gaps - confirm
    # against upstream.

    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    # The tag is compared up to its first NUL byte.
    self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
    self.assertEqual(sm[0].protocol, 0)
    self.assertEqual(sm[0].local_port, 0)
    self.assertEqual(sm[0].external_port, 0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_out(self):
    """ 1:1 NAT initialized from outside network """
    # Adds a tagged address-only static mapping, checks the tag in the
    # dump, then passes traffic out2in followed by in2out.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The `tag` assignment (its value is not visible
    # in the listing), is_inside=0, the pg_start() calls and the direction
    # comments were restored from those gaps - confirm against upstream.

    nat_ip = "10.0.0.20"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305
    tag = "testTAG"

    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    sm = self.vapi.nat44_static_mapping_dump()
    self.assertEqual(len(sm), 1)
    # The tag is compared up to its first NUL byte.
    self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)

    # out2in
    pkts = self.create_stream_out(self.pg1, nat_ip)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_static_with_port_in(self):
    """ 1:1 NAPT initialized from inside network """
    # Adds one port-specific static mapping per protocol (TCP, UDP, ICMP)
    # on nat_addr, then passes traffic in2out followed by out2in.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The is_inside=0 argument, the pg_start() calls
    # and the direction comments were restored from those gaps - confirm
    # against upstream.

    self.tcp_port_out = 3606
    self.udp_port_out = 3607
    self.icmp_id_out = 3608

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)
def test_static_with_port_out(self):
    """ 1:1 NAPT initialized from outside network """
    # Adds one port-specific static mapping per protocol (TCP, UDP, ICMP)
    # on nat_addr, then passes traffic out2in followed by in2out.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The is_inside=0 argument, the pg_start() calls
    # and the direction comments were restored from those gaps - confirm
    # against upstream.

    self.tcp_port_out = 30606
    self.udp_port_out = 30607
    self.icmp_id_out = 30608

    self.nat44_add_address(self.nat_addr)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.tcp_port_in, self.tcp_port_out,
                                  proto=IP_PROTOS.tcp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.udp_port_in, self.udp_port_out,
                                  proto=IP_PROTOS.udp)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  self.icmp_id_in, self.icmp_id_out,
                                  proto=IP_PROTOS.icmp)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # out2in
    pkts = self.create_stream_out(self.pg1)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)
def test_static_with_port_out2(self):
    """ 1:1 NAPT symmetrical rule """
    # With forwarding enabled and an out2in-only TCP static mapping, checks
    # six single-packet exchanges: client -> service, an ICMP error for
    # that packet, service -> client, an ICMP error for that packet, and
    # an untranslated client <-> server exchange on a different client
    # port.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The two port constants (values not visible in
    # the listing), is_inside=0, the pg_start() calls, the p/ip/tcp/inner
    # bindings and the try/except/raise scaffolds were restored from those
    # gaps - confirm against upstream.

    external_port = 80
    local_port = 8080

    self.vapi.nat44_forwarding_enable_disable(1)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
                                  local_port, external_port,
                                  proto=IP_PROTOS.tcp, out2in_only=1)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # ICMP error
    p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         ICMP(type=11) / capture[0][IP])
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        self.assertEqual(p[IP].src, self.nat_addr)
        inner = p[IPerror]
        self.assertEqual(inner.dst, self.nat_addr)
        self.assertEqual(inner[TCPerror].dport, external_port)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # ICMP error
    p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         ICMP(type=11) / capture[0][IP])
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
        inner = p[IPerror]
        self.assertEqual(inner.src, self.pg0.remote_ip4)
        self.assertEqual(inner[TCPerror].sport, local_port)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from client to server (no translation)
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12346, dport=local_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client (no translation)
    p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
         IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12346))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.pg0.remote_ip4)
        self.assertEqual(tcp.sport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_static_vrf_aware(self):
    """ 1:1 NAT VRF awareness """
    # Adds static mappings for the pg4 and pg0 hosts, then checks that pg4
    # traffic is translated to nat_ip1 on pg3 while pg0 traffic produces
    # nothing on pg3.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The trailing vrf_id=10 arguments, is_inside=0,
    # the pg_start() calls and the end of the second comment were restored
    # from those gaps - confirm against upstream.

    nat_ip1 = "10.0.0.30"
    nat_ip2 = "10.0.0.40"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
                                  vrf_id=10)
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
                                  vrf_id=10)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)

    # inside interface VRF matches the NAT44 static mapping VRF
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip1, True)

    # inside interface VRF doesn't match the NAT44 static mapping VRF
    # (packets are dropped)
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg3.assert_nothing_captured()
def test_dynamic_to_static(self):
    """ Switch from dynamic translation to 1:1NAT """
    # Passes traffic through dynamic NAT first, then adds a static mapping
    # for the same host, checks that the host has no sessions left, and
    # confirms new traffic is translated to the static address.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The is_inside=0 argument, the pg_start() calls
    # and the two phase comments were restored from those gaps - confirm
    # against upstream.
    nat_ip = "10.0.0.10"
    self.tcp_port_out = 6303
    self.udp_port_out = 6304
    self.icmp_id_out = 6305

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # dynamic
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # 1:1NAT
    self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
    sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
    self.assertEqual(len(sessions), 0)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip, True)
def test_identity_nat(self):
    """ Identity NAT """
    # Adds an identity mapping for the pg0 host and checks that a TCP
    # packet sent to it from the outside arrives on pg0 with addresses and
    # ports unchanged and with valid checksums.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The is_inside=0 argument, pg_start(), the
    # p/ip/tcp bindings and the try/except/raise scaffold were restored
    # from those gaps - confirm against upstream.

    self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
         TCP(sport=12345, dport=56789))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, self.pg0.remote_ip4)
        self.assertEqual(ip.src, self.pg1.remote_ip4)
        self.assertEqual(tcp.dport, 56789)
        self.assertEqual(tcp.sport, 12345)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_static_lb(self):
    """ NAT44 local service load balancing """
    # Maps one external TCP address/port onto two pg0 hosts, sends a client
    # packet to the external address, notes which host received it, and
    # checks that the host's reply leaves pg1 from the external
    # address/port.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The port constants, the 'port'/'probability'
    # entries (values not visible in the listing), the positional
    # external_port/protocol arguments, locals=, is_inside=0, pg_start()
    # and the try/except/raise scaffolds were restored from those gaps -
    # confirm against upstream.
    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # Named local_servers so the builtin locals() is not shadowed.
    local_servers = [{'addr': server1.ip4n,
                      'port': local_port,
                      'probability': 70},
                     {'addr': server2.ip4n,
                      'port': local_port,
                      'probability': 30}]

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              local_num=len(local_servers),
                                              locals=local_servers)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertIn(ip.dst, [server1.ip4, server2.ip4])
        # Remember which host was picked so the reply comes from it.
        server = server1 if ip.dst == server1.ip4 else server2
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_static_lb_multi_clients(self):
    """ NAT44 local service load balancing - multiple clients"""
    # Sends one TCP packet per client address to the load-balanced
    # external address and checks that the first host received more of
    # them than the second.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The port constants, the 'port'/'probability'
    # entries (values not visible in the listing), the positional
    # external_port/protocol arguments, locals=, is_inside=0, pg_start()
    # and the packet/counter bookkeeping were restored from those gaps -
    # confirm against upstream.

    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # Named local_servers so the builtin locals() is not shadowed.
    local_servers = [{'addr': server1.ip4n,
                      'port': local_port,
                      'probability': 90},
                     {'addr': server2.ip4n,
                      'port': local_port,
                      'probability': 10}]

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              local_num=len(local_servers),
                                              locals=local_servers)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    clients = ip4_range(self.pg1.remote_ip4, 10, 50)
    pkts = [(Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=client, dst=self.nat_addr) /
             TCP(sport=12345, dport=external_port))
            for client in clients]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))

    # Every captured packet counts for server1 or, otherwise, server2.
    server1_n = sum(1 for p in capture if p[IP].dst == server1.ip4)
    server2_n = len(capture) - server1_n
    # assertGreater reports both counts on failure.
    self.assertGreater(server1_n, server2_n)
def test_static_lb_2(self):
    """ NAT44 local service load balancing (asymmetrical rule) """
    # With forwarding enabled and a load-balanced TCP mapping onto two pg0
    # hosts, checks client -> service and service -> client, then an
    # untranslated client <-> server1 exchange on a different client port.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The port constants, the 'port'/'probability'
    # entries (values not visible in the listing), the positional
    # external_port/protocol arguments, out2in_only=1, locals=,
    # is_inside=0, pg_start() and the try/except/raise scaffolds were
    # restored from those gaps - confirm against upstream.
    external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
    external_port = 80
    local_port = 8080
    server1 = self.pg0.remote_hosts[0]
    server2 = self.pg0.remote_hosts[1]

    # Named local_servers so the builtin locals() is not shadowed.
    local_servers = [{'addr': server1.ip4n,
                      'port': local_port,
                      'probability': 70},
                     {'addr': server2.ip4n,
                      'port': local_port,
                      'probability': 30}]

    self.vapi.nat44_forwarding_enable_disable(1)
    self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
                                              external_port,
                                              IP_PROTOS.tcp,
                                              out2in_only=1,
                                              local_num=len(local_servers),
                                              locals=local_servers)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)

    # from client to service
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
         TCP(sport=12345, dport=external_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertIn(ip.dst, [server1.ip4, server2.ip4])
        # Remember which host was picked so the reply comes from it.
        server = server1 if ip.dst == server1.ip4 else server2
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12345))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(tcp.sport, external_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from client to server (no translation)
    p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
         IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
         TCP(sport=12346, dport=local_port))
    self.pg1.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.dst, server1.ip4)
        self.assertEqual(tcp.dport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # from service back to client (no translation)
    p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
         IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
         TCP(sport=local_port, dport=12346))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, server1.ip4)
        self.assertEqual(tcp.sport, local_port)
        self.check_tcp_checksum(p)
        self.check_ip_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
def test_multiple_inside_interfaces(self):
    """ NAT44 multiple non-overlapping address space inside interfaces """
    # pg0 and pg1 are NAT44 inside interfaces, pg3 is the outside one and
    # pg2 has no NAT44 feature.  Checks untranslated inside-to-inside and
    # inside-to-plain traffic, then in2out/out2in for each inside
    # interface.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  The is_inside=0 argument and the pg_start()
    # calls were restored from those gaps - confirm against upstream.

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)

    # between two NAT44 inside interfaces (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg1)

    # from NAT44 inside to interface without NAT44 feature (no translation)
    pkts = self.create_stream_in(self.pg0, self.pg2)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg0, self.pg2)

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg0, self.pg3)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg0)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg1, self.pg3)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg1)
def test_inside_overlapping_interfaces(self):
    """ NAT44 multiple inside interfaces with overlapping address space """
    # pg4, pg5 and pg6 are inside interfaces and pg3 is the outside one;
    # the pg6 host has a static mapping.  Checks untranslated
    # inside-to-inside traffic, hairpinning from pg4 to the pg6 mapping,
    # in2out/out2in for each inside interface, and the user/session dumps.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  is_inside=0, the static mapping's vrf_id=20,
    # the pg_start() calls, the try/except/raise scaffold, the
    # `for user in users` loop with its user.vrf_id argument, and the
    # closing items of the two membership lists were restored from those
    # gaps - confirm against upstream.

    static_nat_ip = "10.0.0.10"
    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
                                              is_inside=0)
    self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
    self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                  vrf_id=20)

    # between NAT44 inside interfaces with same VRF (no translation)
    pkts = self.create_stream_in(self.pg4, self.pg5)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_no_translation(capture, self.pg4, self.pg5)

    # between NAT44 inside interfaces with different VRF (hairpinning)
    p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
         IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
         TCP(sport=1234, dport=5678))
    self.pg4.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, self.pg6.remote_ip4)
        self.assertNotEqual(tcp.sport, 1234)
        self.assertEqual(tcp.dport, 5678)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # in2out 1st interface
    pkts = self.create_stream_in(self.pg4, self.pg3)
    self.pg4.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 1st interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg4.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg4)

    # in2out 2nd interface
    pkts = self.create_stream_in(self.pg5, self.pg3)
    self.pg5.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture)

    # out2in 2nd interface
    pkts = self.create_stream_out(self.pg3)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg5.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg5)

    # pg5 session dump
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
    self.assertEqual(len(sessions), 3)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg5.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)
    self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
    self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
    self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
    self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
    self.assertEqual(sessions[1].inside_port, self.udp_port_in)
    self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
    self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
    self.assertEqual(sessions[1].outside_port, self.udp_port_out)
    self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

    # in2out 3rd interface
    pkts = self.create_stream_in(self.pg6, self.pg3)
    self.pg6.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg3.get_capture(len(pkts))
    self.verify_capture_out(capture, static_nat_ip, True)

    # out2in 3rd interface
    pkts = self.create_stream_out(self.pg3, static_nat_ip)
    self.pg3.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg6.get_capture(len(pkts))
    self.verify_capture_in(capture, self.pg6)

    # general user and session dump verifications
    users = self.vapi.nat44_user_dump()
    self.assertGreaterEqual(len(users), 3)
    addresses = self.vapi.nat44_address_dump()
    self.assertEqual(len(addresses), 1)
    for user in users:
        sessions = self.vapi.nat44_user_session_dump(user.ip_address,
                                                     user.vrf_id)
        for session in sessions:
            self.assertEqual(user.ip_address, session.inside_ip_address)
            self.assertTrue(session.total_bytes > session.total_pkts > 0)
            self.assertIn(session.protocol,
                          [IP_PROTOS.tcp, IP_PROTOS.udp,
                           IP_PROTOS.icmp])

    # pg4 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
    self.assertGreaterEqual(len(sessions), 4)
    for session in sessions:
        self.assertFalse(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg4.remote_ip4n)
        self.assertEqual(session.outside_ip_address,
                         addresses[0].ip_address)

    # pg6 session dump
    sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
    self.assertGreaterEqual(len(sessions), 3)
    # Packed once and compared as raw bytes, the same way the inside
    # address is compared; this avoids the map(ord, ...) list comparison,
    # which only works on Python 2.
    static_nat_ip_n = socket.inet_pton(socket.AF_INET, static_nat_ip)
    for session in sessions:
        self.assertTrue(session.is_static)
        self.assertEqual(session.inside_ip_address[0:4],
                         self.pg6.remote_ip4n)
        self.assertEqual(session.outside_ip_address[0:4],
                         static_nat_ip_n)
        self.assertIn(session.inside_port,
                      [self.tcp_port_in, self.udp_port_in,
                       self.icmp_id_in])
def test_hairpinning(self):
    """ NAT44 hairpinning - 1:1 NAPT """
    # Host and server both sit behind pg0.  The host reaches the server
    # through the NAT address; the test checks the packet is rewritten
    # towards the server, then checks the server's reply is rewritten
    # back to the host's original port.
    #
    # NOTE(review): the listing drops short source lines (gaps in its
    # embedded numbering).  host_in_port (value not visible in the
    # listing), is_inside=0, the pg_start() calls, the p/ip/tcp bindings
    # and the try/except/raise scaffolds were restored from those gaps -
    # confirm against upstream.

    host = self.pg0.remote_hosts[0]
    server = self.pg0.remote_hosts[1]
    host_in_port = 1234
    server_in_port = 5678
    server_out_port = 8765

    self.nat44_add_address(self.nat_addr)
    self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
    self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                              is_inside=0)
    # add static mapping for server
    self.nat44_add_static_mapping(server.ip4, self.nat_addr,
                                  server_in_port, server_out_port,
                                  proto=IP_PROTOS.tcp)

    # send packet from host to server
    p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
         IP(src=host.ip4, dst=self.nat_addr) /
         TCP(sport=host_in_port, dport=server_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, server.ip4)
        self.assertNotEqual(tcp.sport, host_in_port)
        self.assertEqual(tcp.dport, server_in_port)
        self.check_tcp_checksum(p)
        # The translated source port is where the reply must be sent.
        host_out_port = tcp.sport
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise

    # send reply from server to host
    p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
         IP(src=server.ip4, dst=self.nat_addr) /
         TCP(sport=server_in_port, dport=host_out_port))
    self.pg0.add_stream(p)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(1)
    p = capture[0]
    try:
        ip = p[IP]
        tcp = p[TCP]
        self.assertEqual(ip.src, self.nat_addr)
        self.assertEqual(ip.dst, host.ip4)
        self.assertEqual(tcp.sport, server_out_port)
        self.assertEqual(tcp.dport, host_in_port)
        self.check_tcp_checksum(p)
    except Exception:
        self.logger.error(ppp("Unexpected or invalid packet:", p))
        raise
2236 def test_hairpinning2(self):
2237 """ NAT44 hairpinning - 1:1 NAT"""
# NOTE(review): the embedded line numbers of this listing have gaps and some
# statements are visibly cut (the pg1 feature call below is never closed,
# `pkts` is used but not assigned in the visible lines) - lines appear to be
# missing from this view, so read the comments below as describing only what
# is shown.
# Host and both servers are remote hosts of pg0: every stream in this test is
# injected on pg0 and captured back on pg0.
2239 server1_nat_ip = "10.0.0.10"
2240 server2_nat_ip = "10.0.0.11"
2241 host = self.pg0.remote_hosts[0]
2242 server1 = self.pg0.remote_hosts[1]
2243 server2 = self.pg0.remote_hosts[2]
2244 server_tcp_port = 22
2245 server_udp_port = 20
2247 self.nat44_add_address(self.nat_addr)
2248 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2249 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2252 # add static mapping for servers
2253 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2254 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# phase 1: host -> server1, addressed to server1's NAT address
# (one TCP, one UDP and one ICMP echo-request packet)
2258 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2259 IP(src=host.ip4, dst=server1_nat_ip) /
2260 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2262 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2263 IP(src=host.ip4, dst=server1_nat_ip) /
2264 UDP(sport=self.udp_port_in, dport=server_udp_port))
2266 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2267 IP(src=host.ip4, dst=server1_nat_ip) /
2268 ICMP(id=self.icmp_id_in, type='echo-request'))
2270 self.pg0.add_stream(pkts)
2271 self.pg_enable_capture(self.pg_interfaces)
2273 capture = self.pg0.get_capture(len(pkts))
# expected: source rewritten to the pool address, destination to server1's
# real address; the translated source port / ICMP id must differ from the
# inside value and is remembered for building the reply
2274 for packet in capture:
2276 self.assertEqual(packet[IP].src, self.nat_addr)
2277 self.assertEqual(packet[IP].dst, server1.ip4)
2278 if packet.haslayer(TCP):
2279 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2280 self.assertEqual(packet[TCP].dport, server_tcp_port)
2281 self.tcp_port_out = packet[TCP].sport
2282 self.check_tcp_checksum(packet)
2283 elif packet.haslayer(UDP):
2284 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2285 self.assertEqual(packet[UDP].dport, server_udp_port)
2286 self.udp_port_out = packet[UDP].sport
2288 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2289 self.icmp_id_out = packet[ICMP].id
2291 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# phase 2: server1 -> host replies, addressed to the pool address and the
# outside ports / ICMP id recorded in phase 1
2296 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2297 IP(src=server1.ip4, dst=self.nat_addr) /
2298 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2300 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2301 IP(src=server1.ip4, dst=self.nat_addr) /
2302 UDP(sport=server_udp_port, dport=self.udp_port_out))
2304 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2305 IP(src=server1.ip4, dst=self.nat_addr) /
2306 ICMP(id=self.icmp_id_out, type='echo-reply'))
2308 self.pg0.add_stream(pkts)
2309 self.pg_enable_capture(self.pg_interfaces)
2311 capture = self.pg0.get_capture(len(pkts))
# expected: source is server1's NAT address, destination the host, and the
# host's original inside ports / ICMP id are restored
2312 for packet in capture:
2314 self.assertEqual(packet[IP].src, server1_nat_ip)
2315 self.assertEqual(packet[IP].dst, host.ip4)
2316 if packet.haslayer(TCP):
2317 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2318 self.assertEqual(packet[TCP].sport, server_tcp_port)
2319 self.check_tcp_checksum(packet)
2320 elif packet.haslayer(UDP):
2321 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2322 self.assertEqual(packet[UDP].sport, server_udp_port)
2324 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2326 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2329 # server2 to server1
2331 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2332 IP(src=server2.ip4, dst=server1_nat_ip) /
2333 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2335 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2336 IP(src=server2.ip4, dst=server1_nat_ip) /
2337 UDP(sport=self.udp_port_in, dport=server_udp_port))
2339 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2340 IP(src=server2.ip4, dst=server1_nat_ip) /
2341 ICMP(id=self.icmp_id_in, type='echo-request'))
2343 self.pg0.add_stream(pkts)
2344 self.pg_enable_capture(self.pg_interfaces)
2346 capture = self.pg0.get_capture(len(pkts))
# server2 has its own static mapping, so here the source is expected to become
# server2's NAT address while source ports / ICMP id stay unchanged
2347 for packet in capture:
2349 self.assertEqual(packet[IP].src, server2_nat_ip)
2350 self.assertEqual(packet[IP].dst, server1.ip4)
2351 if packet.haslayer(TCP):
2352 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2353 self.assertEqual(packet[TCP].dport, server_tcp_port)
2354 self.tcp_port_out = packet[TCP].sport
2355 self.check_tcp_checksum(packet)
2356 elif packet.haslayer(UDP):
2357 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2358 self.assertEqual(packet[UDP].dport, server_udp_port)
2359 self.udp_port_out = packet[UDP].sport
2361 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2362 self.icmp_id_out = packet[ICMP].id
2364 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2367 # server1 to server2
2369 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2370 IP(src=server1.ip4, dst=server2_nat_ip) /
2371 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2373 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2374 IP(src=server1.ip4, dst=server2_nat_ip) /
2375 UDP(sport=server_udp_port, dport=self.udp_port_out))
2377 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2378 IP(src=server1.ip4, dst=server2_nat_ip) /
2379 ICMP(id=self.icmp_id_out, type='echo-reply'))
2381 self.pg0.add_stream(pkts)
2382 self.pg_enable_capture(self.pg_interfaces)
2384 capture = self.pg0.get_capture(len(pkts))
# expected: source is server1's NAT address, destination server2's real
# address, ports / ICMP id as sent by server2 in the previous phase
2385 for packet in capture:
2387 self.assertEqual(packet[IP].src, server1_nat_ip)
2388 self.assertEqual(packet[IP].dst, server2.ip4)
2389 if packet.haslayer(TCP):
2390 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2391 self.assertEqual(packet[TCP].sport, server_tcp_port)
2392 self.check_tcp_checksum(packet)
2393 elif packet.haslayer(UDP):
2394 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2395 self.assertEqual(packet[UDP].sport, server_udp_port)
2397 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2399 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2402 def test_max_translations_per_user(self):
2403 """ MAX translations per user - recycle the least recently used """
# NOTE(review): lines appear to be missing from this listing (the pg1 feature
# call is left unclosed and `pkts` is never assigned in the visible lines).
2405 self.nat44_add_address(self.nat_addr)
2406 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2407 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2410 # get maximum number of translations per user
2411 nat44_config = self.vapi.nat_show_config()
2413 # send more than maximum number of translations per user packets
2414 pkts_num = nat44_config.max_translations_per_user + 5
# every packet comes from the same inside address (pg0.remote_ip4) and only
# the TCP source port changes: 1025, 1026, ...
2416 for port in range(0, pkts_num):
2417 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2418 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2419 TCP(sport=1025 + port))
2421 self.pg0.add_stream(pkts)
2422 self.pg_enable_capture(self.pg_interfaces)
# all pkts_num packets are expected on pg1 although that is 5 more than the
# configured per-user maximum
2425 # verify number of translated packet
2426 self.pg1.get_capture(pkts_num)
def test_interface_addr(self):
    """ Acquire NAT44 addresses from interface """
    # The NAT44 pool is tied to pg7; the pool dump is inspected before pg7
    # has an IPv4 address, after one is configured, and after it is removed.
    iface = self.pg7
    self.vapi.nat44_add_interface_addr(iface.sw_if_index)

    # nothing is configured on the interface yet, so the pool must be empty
    self.assertEqual(0, len(self.vapi.nat44_address_dump()))

    # once the interface has its address, that address is the only pool entry
    iface.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    self.assertEqual(pool[0].ip_address[0:4], iface.local_ip4n)

    # dropping the interface address must empty the pool again
    iface.unconfig_ip4()
    self.assertEqual(0, len(self.vapi.nat44_address_dump()))
2447 def test_interface_addr_static_mapping(self):
2448 """ Static mapping with addresses from interface """
# NOTE(review): lines appear to be missing from this listing - `tag` and
# `resolved` are read but never assigned in the visible lines, and both
# nat44_add_static_mapping(...) calls are cut before their closing paren.
# The mapping is bound to pg7 by sw_if_index instead of a literal external
# address; the checks below follow it as pg7 gains and loses its address.
2451 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2452 self.nat44_add_static_mapping(
2454 external_sw_if_index=self.pg7.sw_if_index,
2457 # static mappings with external interface
2458 static_mappings = self.vapi.nat44_static_mapping_dump()
2459 self.assertEqual(1, len(static_mappings))
2460 self.assertEqual(self.pg7.sw_if_index,
2461 static_mappings[0].external_sw_if_index)
# the dumped tag is cut at the first NUL before comparing
# (presumably a fixed-size, NUL-padded field - confirm against the API)
2462 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2464 # configure interface address and check static mappings
2465 self.pg7.config_ip4()
2466 static_mappings = self.vapi.nat44_static_mapping_dump()
# two entries are expected now; the one whose external_sw_if_index is
# 0xFFFFFFFF must carry pg7's address (presumably the resolved copy of the
# interface-bound mapping - confirm)
2467 self.assertEqual(2, len(static_mappings))
2469 for sm in static_mappings:
2470 if sm.external_sw_if_index == 0xFFFFFFFF:
2471 self.assertEqual(sm.external_ip_address[0:4],
2472 self.pg7.local_ip4n)
2473 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2475 self.assertTrue(resolved)
2477 # remove interface address and check static mappings
2478 self.pg7.unconfig_ip4()
2479 static_mappings = self.vapi.nat44_static_mapping_dump()
2480 self.assertEqual(1, len(static_mappings))
2481 self.assertEqual(self.pg7.sw_if_index,
2482 static_mappings[0].external_sw_if_index)
2483 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2485 # configure interface address again and check static mappings
2486 self.pg7.config_ip4()
2487 static_mappings = self.vapi.nat44_static_mapping_dump()
2488 self.assertEqual(2, len(static_mappings))
2490 for sm in static_mappings:
2491 if sm.external_sw_if_index == 0xFFFFFFFF:
2492 self.assertEqual(sm.external_ip_address[0:4],
2493 self.pg7.local_ip4n)
2494 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2496 self.assertTrue(resolved)
2498 # remove static mapping
2499 self.nat44_add_static_mapping(
2501 external_sw_if_index=self.pg7.sw_if_index,
# after the removal no entry at all may remain in the dump
2504 static_mappings = self.vapi.nat44_static_mapping_dump()
2505 self.assertEqual(0, len(static_mappings))
2507 def test_interface_addr_identity_nat(self):
2508 """ Identity NAT with addresses from interface """
# NOTE(review): lines appear to be missing from this listing - `port` and
# `resolved` are read but never assigned in the visible lines, and the
# nat44_add_del_identity_mapping(...) call is cut before its closing paren.
2511 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
# identity mapping for TCP, bound to pg7 by sw_if_index
2512 self.vapi.nat44_add_del_identity_mapping(
2513 sw_if_index=self.pg7.sw_if_index,
2515 protocol=IP_PROTOS.tcp,
2518 # identity mappings with external interface
2519 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2520 self.assertEqual(1, len(identity_mappings))
2521 self.assertEqual(self.pg7.sw_if_index,
2522 identity_mappings[0].sw_if_index)
2524 # configure interface address and check identity mappings
2525 self.pg7.config_ip4()
2526 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2528 self.assertEqual(2, len(identity_mappings))
# NOTE(review): the loop selects the entry with sw_if_index == 0xFFFFFFFF but
# the three assertions below inspect identity_mappings[0] rather than `sm`;
# they only check the selected entry if it happens to be first in the dump.
# Looks unintended - confirm and consider using `sm`.
2529 for sm in identity_mappings:
2530 if sm.sw_if_index == 0xFFFFFFFF:
2531 self.assertEqual(identity_mappings[0].ip_address,
2532 self.pg7.local_ip4n)
2533 self.assertEqual(port, identity_mappings[0].port)
2534 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2536 self.assertTrue(resolved)
2538 # remove interface address and check identity mappings
2539 self.pg7.unconfig_ip4()
2540 identity_mappings = self.vapi.nat44_identity_mapping_dump()
# only the interface-bound entry is expected to remain
2541 self.assertEqual(1, len(identity_mappings))
2542 self.assertEqual(self.pg7.sw_if_index,
2543 identity_mappings[0].sw_if_index)
2545 def test_ipfix_nat44_sess(self):
2546 """ IPFIX logging NAT44 session created/delted """
# NOTE(review): lines appear to be missing from this listing - the pg1 feature
# call and set_ipfix_exporter(...) have gaps, and the two verification
# sections below use `p` as if inside a loop over `capture` that is not shown.
2547 self.ipfix_domain_id = 10
2548 self.ipfix_src_port = 20202
2549 colector_port = 30303
# teach scapy to dissect UDP traffic to the collector port as IPFIX; the
# literal 30303 duplicates colector_port above
2550 bind_layers(UDP, IPFIX, dport=30303)
2551 self.nat44_add_address(self.nat_addr)
2552 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2553 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# export towards the remote host of pg3, then enable NAT IPFIX logging with
# the domain id / source port chosen above
2555 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2556 src_address=self.pg3.local_ip4n,
2558 template_interval=10,
2559 collector_port=colector_port)
2560 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2561 src_port=self.ipfix_src_port)
# create sessions with ordinary in2out traffic pg0 -> pg1
2563 pkts = self.create_stream_in(self.pg0, self.pg1)
2564 self.pg0.add_stream(pkts)
2565 self.pg_enable_capture(self.pg_interfaces)
2567 capture = self.pg1.get_capture(len(pkts))
2568 self.verify_capture_out(capture)
# the pool address is removed before flushing
# (presumably so that session-delete events are logged as well - confirm)
2569 self.nat44_add_address(self.nat_addr, is_add=0)
2570 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# exactly 9 export packets are expected on pg3
2571 capture = self.pg3.get_capture(9)
2572 ipfix = IPFIXDecoder()
2573 # first load template
2575 self.assertTrue(p.haslayer(IPFIX))
2576 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2577 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2578 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2579 self.assertEqual(p[UDP].dport, colector_port)
2580 self.assertEqual(p[IPFIX].observationDomainID,
2581 self.ipfix_domain_id)
2582 if p.haslayer(Template):
2583 ipfix.add_template(p.getlayer(Template))
2584 # verify events in data set
2586 if p.haslayer(Data):
2587 data = ipfix.decode_data_set(p.getlayer(Set))
2588 self.verify_ipfix_nat44_ses(data)
2590 def test_ipfix_addr_exhausted(self):
2591 """ IPFIX logging NAT addresses exhausted """
# NOTE(review): lines appear to be missing from this listing - several calls
# are cut before their closing paren (including the test packet itself) and
# the verification sections use `p` as if inside a loop over `capture` that is
# not shown.
# No nat44_add_address(...) call is visible in this method, i.e. NAT is
# enabled on the interfaces without a pool address being added here.
2592 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2593 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2595 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2596 src_address=self.pg3.local_ip4n,
2598 template_interval=10)
# ipfix_domain_id / ipfix_src_port are read from self and not set in this
# method (presumably initialised elsewhere in the class - confirm)
2599 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2600 src_port=self.ipfix_src_port)
2602 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2603 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2605 self.pg0.add_stream(p)
2606 self.pg_enable_capture(self.pg_interfaces)
# the packet must not be forwarded: nothing is expected on pg1
2608 capture = self.pg1.get_capture(0)
2609 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# exactly 9 export packets are expected on pg3
2610 capture = self.pg3.get_capture(9)
2611 ipfix = IPFIXDecoder()
2612 # first load template
2614 self.assertTrue(p.haslayer(IPFIX))
2615 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2616 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2617 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# no collector_port is passed to set_ipfix_exporter in the visible lines, so
# 4739 is presumably the exporter's default port - confirm
2618 self.assertEqual(p[UDP].dport, 4739)
2619 self.assertEqual(p[IPFIX].observationDomainID,
2620 self.ipfix_domain_id)
2621 if p.haslayer(Template):
2622 ipfix.add_template(p.getlayer(Template))
2623 # verify events in data set
2625 if p.haslayer(Data):
2626 data = ipfix.decode_data_set(p.getlayer(Set))
2627 self.verify_ipfix_addr_exhausted(data)
# only run when the extended test set is selected
2629 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2630 def test_ipfix_max_sessions(self):
2631 """ IPFIX logging maximum session entries exceeded """
# NOTE(review): lines appear to be missing from this listing - several calls
# and both packet definitions are cut before their closing paren, `pkts` is
# never assigned in the visible lines, and the verification sections use `p`
# as if inside a loop over `capture` that is not shown.
2632 self.nat44_add_address(self.nat_addr)
2633 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2634 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# session limit used by this test: ten times the configured bucket count
2637 nat44_config = self.vapi.nat_show_config()
2638 max_sessions = 10 * nat44_config.translation_buckets
# one packet per session, each from a distinct source 10.10.<hi>.<lo> where
# hi/lo are bits 8-15 and 0-7 of the loop counter
2641 for i in range(0, max_sessions):
2642 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2643 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2644 IP(src=src, dst=self.pg1.remote_ip4) /
2647 self.pg0.add_stream(pkts)
2648 self.pg_enable_capture(self.pg_interfaces)
# all max_sessions packets are expected to be translated and forwarded
2651 self.pg1.get_capture(max_sessions)
# IPFIX export is configured only now, after the sessions have been created
2652 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2653 src_address=self.pg3.local_ip4n,
2655 template_interval=10)
2656 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2657 src_port=self.ipfix_src_port)
# one more packet, this time from pg0's own remote address
2659 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2660 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2662 self.pg0.add_stream(p)
2663 self.pg_enable_capture(self.pg_interfaces)
# the extra packet must not be forwarded
2665 self.pg1.get_capture(0)
2666 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# exactly 9 export packets are expected on pg3
2667 capture = self.pg3.get_capture(9)
2668 ipfix = IPFIXDecoder()
2669 # first load template
2671 self.assertTrue(p.haslayer(IPFIX))
2672 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2673 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2674 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# no collector_port is passed to set_ipfix_exporter in the visible lines, so
# 4739 is presumably the exporter's default port - confirm
2675 self.assertEqual(p[UDP].dport, 4739)
2676 self.assertEqual(p[IPFIX].observationDomainID,
2677 self.ipfix_domain_id)
2678 if p.haslayer(Template):
2679 ipfix.add_template(p.getlayer(Template))
2680 # verify events in data set
2682 if p.haslayer(Data):
2683 data = ipfix.decode_data_set(p.getlayer(Set))
2684 self.verify_ipfix_max_sessions(data, max_sessions)
2686 def test_pool_addr_fib(self):
2687 """ NAT44 add pool addresses to FIB """
# NOTE(review): lines appear to be missing from this listing (the pg1 feature
# call and the static-mapping removal near the end are cut before their
# closing paren).
# The test probes with ARP who-has requests whether NAT addresses are
# answered for: first for the pool address, then for the static mapping's
# external address, both on pg1.
2688 static_addr = '10.0.0.10'
2689 self.nat44_add_address(self.nat_addr)
2690 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2691 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2693 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
# broadcast who-has for the pool address, sent on pg1
2696 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2697 ARP(op=ARP.who_has, pdst=self.nat_addr,
2698 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2699 self.pg1.add_stream(p)
2700 self.pg_enable_capture(self.pg_interfaces)
2702 capture = self.pg1.get_capture(1)
2703 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): assertTrue takes (expr, msg); ARP.is_at is passed as the
# failure message here, so this only checks that op is truthy, not that the
# packet is an is-at reply. assertEqual was probably intended - confirm.
2704 self.assertTrue(capture[0][ARP].op, ARP.is_at)
# same probe for the static mapping's external address
2707 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2708 ARP(op=ARP.who_has, pdst=static_addr,
2709 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2710 self.pg1.add_stream(p)
2711 self.pg_enable_capture(self.pg_interfaces)
2713 capture = self.pg1.get_capture(1)
2714 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): same assertTrue(expr, msg) issue as above
2715 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2717 # send ARP to non-NAT44 interface
2718 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2719 ARP(op=ARP.who_has, pdst=self.nat_addr,
2720 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2721 self.pg2.add_stream(p)
2722 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the request is injected on pg2 but the empty capture is
# checked on pg1 - confirm this is the interface that was meant
2724 capture = self.pg1.get_capture(0)
2726 # remove addresses and verify
2727 self.nat44_add_address(self.nat_addr, is_add=0)
2728 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
# after removal neither address may be answered for any more
2731 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2732 ARP(op=ARP.who_has, pdst=self.nat_addr,
2733 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2734 self.pg1.add_stream(p)
2735 self.pg_enable_capture(self.pg_interfaces)
2737 capture = self.pg1.get_capture(0)
2739 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2740 ARP(op=ARP.who_has, pdst=static_addr,
2741 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2742 self.pg1.add_stream(p)
2743 self.pg_enable_capture(self.pg_interfaces)
2745 capture = self.pg1.get_capture(0)
2747 def test_vrf_mode(self):
2748 """ NAT44 tenant VRF aware address pool mode """
# NOTE(review): lines appear to be missing from this listing - `vrf_id1` and
# `vrf_id2` are read but never assigned in the visible lines and the pg2
# feature call is cut before its closing paren.
2752 nat_ip1 = "10.0.0.10"
2753 nat_ip2 = "10.0.0.11"
# move pg0 / pg1 into their own IPv4 tables: addresses are removed first and
# configured again after the table has been set
2755 self.pg0.unconfig_ip4()
2756 self.pg1.unconfig_ip4()
2757 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2758 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2759 self.pg0.set_table_ip4(vrf_id1)
2760 self.pg1.set_table_ip4(vrf_id2)
2761 self.pg0.config_ip4()
2762 self.pg1.config_ip4()
# one pool address per VRF
2764 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2765 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2766 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2767 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2768 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# traffic entering on pg0 (table vrf_id1) must leave pg2 with nat_ip1
2772 pkts = self.create_stream_in(self.pg0, self.pg2)
2773 self.pg0.add_stream(pkts)
2774 self.pg_enable_capture(self.pg_interfaces)
2776 capture = self.pg2.get_capture(len(pkts))
2777 self.verify_capture_out(capture, nat_ip1)
# traffic entering on pg1 (table vrf_id2) must leave pg2 with nat_ip2
2780 pkts = self.create_stream_in(self.pg1, self.pg2)
2781 self.pg1.add_stream(pkts)
2782 self.pg_enable_capture(self.pg_interfaces)
2784 capture = self.pg2.get_capture(len(pkts))
2785 self.verify_capture_out(capture, nat_ip2)
# cleanup: put both interfaces back into table 0 and delete the test tables
# NOTE(review): no config_ip4() follows in the visible lines, so pg0 / pg1 are
# left without IPv4 addresses here - confirm that is handled elsewhere
2787 self.pg0.unconfig_ip4()
2788 self.pg1.unconfig_ip4()
2789 self.pg0.set_table_ip4(0)
2790 self.pg1.set_table_ip4(0)
2791 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2792 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2794 def test_vrf_feature_independent(self):
2795 """ NAT44 tenant VRF independent address pool mode """
# NOTE(review): lines appear to be missing from this listing (the pg2 feature
# call is cut before its closing paren).
2797 nat_ip1 = "10.0.0.10"
2798 nat_ip2 = "10.0.0.11"
# nat_ip1 is added without a vrf_id, nat_ip2 is tied to VRF 99; no interface
# is moved into VRF 99 in the visible lines
2800 self.nat44_add_address(nat_ip1)
2801 self.nat44_add_address(nat_ip2, vrf_id=99)
2802 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2803 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2804 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# traffic from pg0 is expected to be translated to nat_ip1 ...
2808 pkts = self.create_stream_in(self.pg0, self.pg2)
2809 self.pg0.add_stream(pkts)
2810 self.pg_enable_capture(self.pg_interfaces)
2812 capture = self.pg2.get_capture(len(pkts))
2813 self.verify_capture_out(capture, nat_ip1)
# ... and so is traffic from pg1: nat_ip2 must not be picked for either
2816 pkts = self.create_stream_in(self.pg1, self.pg2)
2817 self.pg1.add_stream(pkts)
2818 self.pg_enable_capture(self.pg_interfaces)
2820 capture = self.pg2.get_capture(len(pkts))
2821 self.verify_capture_out(capture, nat_ip1)
2823 def test_dynamic_ipless_interfaces(self):
2824 """ NAT44 interfaces without configured IP address """
# NOTE(review): lines appear to be missing from this listing (both
# ip_neighbor_add_del(...) calls and the pg8 feature call are cut before
# their closing paren).
# No config_ip4() is called on pg7 / pg8 here; reachability of the remote
# hosts is set up by hand with a neighbor entry and a /32 route per interface.
2826 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2827 mactobinary(self.pg7.remote_mac),
2828 self.pg7.remote_ip4n,
2830 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2831 mactobinary(self.pg8.remote_mac),
2832 self.pg8.remote_ip4n,
2835 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2836 dst_address_length=32,
2837 next_hop_address=self.pg7.remote_ip4n,
2838 next_hop_sw_if_index=self.pg7.sw_if_index)
2839 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2840 dst_address_length=32,
2841 next_hop_address=self.pg8.remote_ip4n,
2842 next_hop_sw_if_index=self.pg8.sw_if_index)
# dynamic NAT: a pool address plus the NAT feature on both interfaces
2844 self.nat44_add_address(self.nat_addr)
2845 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2846 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8 first, so that the return traffic has sessions to match
2850 pkts = self.create_stream_in(self.pg7, self.pg8)
2851 self.pg7.add_stream(pkts)
2852 self.pg_enable_capture(self.pg_interfaces)
2854 capture = self.pg8.get_capture(len(pkts))
2855 self.verify_capture_out(capture)
# pg8 -> pool address, expected back on pg7
2858 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2859 self.pg8.add_stream(pkts)
2860 self.pg_enable_capture(self.pg_interfaces)
2862 capture = self.pg7.get_capture(len(pkts))
2863 self.verify_capture_in(capture, self.pg7)
2865 def test_static_ipless_interfaces(self):
2866 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# NOTE(review): lines appear to be missing from this listing (both
# ip_neighbor_add_del(...) calls and the pg8 feature call are cut before
# their closing paren).
# No config_ip4() is called on pg7 / pg8 here; reachability of the remote
# hosts is set up by hand with a neighbor entry and a /32 route per interface.
2868 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2869 mactobinary(self.pg7.remote_mac),
2870 self.pg7.remote_ip4n,
2872 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2873 mactobinary(self.pg8.remote_mac),
2874 self.pg8.remote_ip4n,
2877 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2878 dst_address_length=32,
2879 next_hop_address=self.pg7.remote_ip4n,
2880 next_hop_sw_if_index=self.pg7.sw_if_index)
2881 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2882 dst_address_length=32,
2883 next_hop_address=self.pg8.remote_ip4n,
2884 next_hop_sw_if_index=self.pg8.sw_if_index)
# address-only static mapping pg7 remote host <-> nat_addr; no pool address
# is added in this test
2886 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2887 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2888 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# here the pg8 -> pg7 direction is sent first, before any pg7 -> pg8 traffic
2892 pkts = self.create_stream_out(self.pg8)
2893 self.pg8.add_stream(pkts)
2894 self.pg_enable_capture(self.pg_interfaces)
2896 capture = self.pg7.get_capture(len(pkts))
2897 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8; the third argument (True) is passed positionally to
# verify_capture_out - TODO confirm its meaning against the helper's signature
2900 pkts = self.create_stream_in(self.pg7, self.pg8)
2901 self.pg7.add_stream(pkts)
2902 self.pg_enable_capture(self.pg_interfaces)
2904 capture = self.pg8.get_capture(len(pkts))
2905 self.verify_capture_out(capture, self.nat_addr, True)
2907 def test_static_with_port_ipless_interfaces(self):
2908 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# NOTE(review): lines appear to be missing from this listing (both
# ip_neighbor_add_del(...) calls and the pg8 feature call are cut before
# their closing paren).
# fixed outside ports / ICMP id used by the three static mappings below
2910 self.tcp_port_out = 30606
2911 self.udp_port_out = 30607
2912 self.icmp_id_out = 30608
# No config_ip4() is called on pg7 / pg8 here; reachability of the remote
# hosts is set up by hand with a neighbor entry and a /32 route per interface.
2914 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2915 mactobinary(self.pg7.remote_mac),
2916 self.pg7.remote_ip4n,
2918 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2919 mactobinary(self.pg8.remote_mac),
2920 self.pg8.remote_ip4n,
2923 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2924 dst_address_length=32,
2925 next_hop_address=self.pg7.remote_ip4n,
2926 next_hop_sw_if_index=self.pg7.sw_if_index)
2927 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2928 dst_address_length=32,
2929 next_hop_address=self.pg8.remote_ip4n,
2930 next_hop_sw_if_index=self.pg8.sw_if_index)
# pool address plus one port-level static mapping per protocol
# (TCP, UDP, ICMP), all from pg7's remote host to nat_addr
2932 self.nat44_add_address(self.nat_addr)
2933 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2934 self.tcp_port_in, self.tcp_port_out,
2935 proto=IP_PROTOS.tcp)
2936 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2937 self.udp_port_in, self.udp_port_out,
2938 proto=IP_PROTOS.udp)
2939 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2940 self.icmp_id_in, self.icmp_id_out,
2941 proto=IP_PROTOS.icmp)
2942 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2943 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 is sent first, before any pg7 -> pg8 traffic
2947 pkts = self.create_stream_out(self.pg8)
2948 self.pg8.add_stream(pkts)
2949 self.pg_enable_capture(self.pg_interfaces)
2951 capture = self.pg7.get_capture(len(pkts))
2952 self.verify_capture_in(capture, self.pg7)
# then pg7 -> pg8
2955 pkts = self.create_stream_in(self.pg7, self.pg8)
2956 self.pg7.add_stream(pkts)
2957 self.pg_enable_capture(self.pg_interfaces)
2959 capture = self.pg8.get_capture(len(pkts))
2960 self.verify_capture_out(capture)
2962 def test_static_unknown_proto(self):
2963 """ 1:1 NAT translate packet with unknown protocol """
# NOTE(review): lines appear to be missing from this listing - the pg1 feature
# call is unclosed, each test packet skips a numbered line between its outer
# and inner IP header (presumably the GRE layer the assertions look for), and
# `packet` is read without a visible assignment.
2964 nat_ip = "10.0.0.10"
# address-only static mapping for pg0's remote host; no pool address is added
2965 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2966 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2967 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# pg0 -> pg1: tunnelled packet whose inner header carries pg2 / pg3 addresses
2971 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2972 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2974 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2975 TCP(sport=1234, dport=1234))
2976 self.pg0.add_stream(p)
2977 self.pg_enable_capture(self.pg_interfaces)
2979 p = self.pg1.get_capture(1)
# expected: only the outer source is rewritten (to nat_ip); the packet still
# contains GRE and the IP checksum is valid
2982 self.assertEqual(packet[IP].src, nat_ip)
2983 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2984 self.assertTrue(packet.haslayer(GRE))
2985 self.check_ip_checksum(packet)
2987 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# reverse direction: pg1 -> nat_ip, expected on pg0
2991 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2992 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2994 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2995 TCP(sport=1234, dport=1234))
2996 self.pg1.add_stream(p)
2997 self.pg_enable_capture(self.pg_interfaces)
2999 p = self.pg0.get_capture(1)
# expected: outer destination rewritten back to pg0's remote host
3002 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3003 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3004 self.assertTrue(packet.haslayer(GRE))
3005 self.check_ip_checksum(packet)
3007 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3010 def test_hairpinning_static_unknown_proto(self):
3011 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# NOTE(review): lines appear to be missing from this listing - the pg1 feature
# call is unclosed, each test packet skips a numbered line between its outer
# and inner IP header (presumably the GRE layer the assertions look for), and
# `packet` is read without a visible assignment.
# host and server are both remote hosts of pg0: traffic enters and leaves pg0
3013 host = self.pg0.remote_hosts[0]
3014 server = self.pg0.remote_hosts[1]
3016 host_nat_ip = "10.0.0.10"
3017 server_nat_ip = "10.0.0.11"
# address-only static mappings for both ends; no pool address is added
3019 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
3020 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3021 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3022 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server's NAT address
3026 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3027 IP(src=host.ip4, dst=server_nat_ip) /
3029 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3030 TCP(sport=1234, dport=1234))
3031 self.pg0.add_stream(p)
3032 self.pg_enable_capture(self.pg_interfaces)
3034 p = self.pg0.get_capture(1)
# expected: both outer addresses translated - source to the host's NAT
# address, destination to the server's real address
3037 self.assertEqual(packet[IP].src, host_nat_ip)
3038 self.assertEqual(packet[IP].dst, server.ip4)
3039 self.assertTrue(packet.haslayer(GRE))
3040 self.check_ip_checksum(packet)
3042 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host's NAT address
3046 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3047 IP(src=server.ip4, dst=host_nat_ip) /
3049 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3050 TCP(sport=1234, dport=1234))
3051 self.pg0.add_stream(p)
3052 self.pg_enable_capture(self.pg_interfaces)
3054 p = self.pg0.get_capture(1)
# expected: source becomes the server's NAT address, destination the host
3057 self.assertEqual(packet[IP].src, server_nat_ip)
3058 self.assertEqual(packet[IP].dst, host.ip4)
3059 self.assertTrue(packet.haslayer(GRE))
3060 self.check_ip_checksum(packet)
3062 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3065 def test_unknown_proto(self):
3066 """ NAT44 translate packet with unknown protocol """
# NOTE(review): lines appear to be missing from this listing - the pg1 feature
# call is unclosed, the tunnelled test packets skip a numbered line between
# their outer and inner IP header (presumably the GRE layer the assertions
# look for), and `packet` is read without a visible assignment.
2967 self.nat44_add_address(self.nat_addr)
3068 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3069 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# a plain TCP packet pg0 -> pg1 is sent first and only its arrival is checked
# (presumably to have an existing session for this host before the
# unknown-protocol packet - confirm)
3073 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3074 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3075 TCP(sport=self.tcp_port_in, dport=20))
3076 self.pg0.add_stream(p)
3077 self.pg_enable_capture(self.pg_interfaces)
3079 p = self.pg1.get_capture(1)
# pg0 -> pg1: tunnelled packet whose inner header carries pg2 / pg3 addresses
3081 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3082 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3084 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3085 TCP(sport=1234, dport=1234))
3086 self.pg0.add_stream(p)
3087 self.pg_enable_capture(self.pg_interfaces)
3089 p = self.pg1.get_capture(1)
# expected: outer source rewritten to the pool address, GRE still present,
# valid IP checksum
3092 self.assertEqual(packet[IP].src, self.nat_addr)
3093 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3094 self.assertTrue(packet.haslayer(GRE))
3095 self.check_ip_checksum(packet)
3097 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# reverse direction: pg1 -> pool address, expected on pg0
3101 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3102 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3104 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3105 TCP(sport=1234, dport=1234))
3106 self.pg1.add_stream(p)
3107 self.pg_enable_capture(self.pg_interfaces)
3109 p = self.pg0.get_capture(1)
# expected: outer destination rewritten back to pg0's remote host
3112 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3113 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3114 self.assertTrue(packet.haslayer(GRE))
3115 self.check_ip_checksum(packet)
3117 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3120 def test_hairpinning_unknown_proto(self):
3121 """ NAT44 translate packet with unknown protocol - hairpinning """
# NOTE(review): lines appear to be missing from this listing - `host_in_port`
# is read but never assigned in the visible lines, the pg1 feature call is
# unclosed, the tunnelled test packets skip a numbered line between their
# outer and inner IP header (presumably the GRE layer the assertions look
# for), and `packet` is read without a visible assignment.
# host and server are both remote hosts of pg0: traffic enters and leaves pg0
3122 host = self.pg0.remote_hosts[0]
3123 server = self.pg0.remote_hosts[1]
# NOTE(review): server_in_port is not used in any visible line of this method
3126 server_in_port = 5678
3127 server_out_port = 8765
3128 server_nat_ip = "10.0.0.11"
3130 self.nat44_add_address(self.nat_addr)
3131 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3132 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3135 # add static mapping for server
3136 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# a plain TCP packet host -> server's NAT address is sent first and only its
# arrival is checked (presumably to have an existing session for the host
# before the unknown-protocol packet - confirm)
3139 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3140 IP(src=host.ip4, dst=server_nat_ip) /
3141 TCP(sport=host_in_port, dport=server_out_port))
3142 self.pg0.add_stream(p)
3143 self.pg_enable_capture(self.pg_interfaces)
3145 capture = self.pg0.get_capture(1)
# host -> server's NAT address, tunnelled packet
3147 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3148 IP(src=host.ip4, dst=server_nat_ip) /
3150 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3151 TCP(sport=1234, dport=1234))
3152 self.pg0.add_stream(p)
3153 self.pg_enable_capture(self.pg_interfaces)
3155 p = self.pg0.get_capture(1)
# expected: source rewritten to the pool address (the host has no static
# mapping), destination to the server's real address
3158 self.assertEqual(packet[IP].src, self.nat_addr)
3159 self.assertEqual(packet[IP].dst, server.ip4)
3160 self.assertTrue(packet.haslayer(GRE))
3161 self.check_ip_checksum(packet)
3163 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> pool address, tunnelled packet
3167 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3168 IP(src=server.ip4, dst=self.nat_addr) /
3170 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3171 TCP(sport=1234, dport=1234))
3172 self.pg0.add_stream(p)
3173 self.pg_enable_capture(self.pg_interfaces)
3175 p = self.pg0.get_capture(1)
# expected: source becomes the server's NAT address, destination the host
3178 self.assertEqual(packet[IP].src, server_nat_ip)
3179 self.assertEqual(packet[IP].dst, host.ip4)
3180 self.assertTrue(packet.haslayer(GRE))
3181 self.check_ip_checksum(packet)
3183 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3186 def test_output_feature(self):
3187 """ NAT44 interface output feature (in2out postrouting) """
# NOTE(review): lines appear to be missing from this listing (the pg3
# output-feature call is cut before its closing paren).
# The output-feature variant of the API is enabled on pg0, pg1 and pg3; pg2
# is left without any NAT feature in the visible lines.
3188 self.nat44_add_address(self.nat_addr)
3189 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3190 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3191 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3: packets must leave pg3 translated
3195 pkts = self.create_stream_in(self.pg0, self.pg3)
3196 self.pg0.add_stream(pkts)
3197 self.pg_enable_capture(self.pg_interfaces)
3199 capture = self.pg3.get_capture(len(pkts))
3200 self.verify_capture_out(capture)
# pg3 -> pg0: return traffic must arrive on pg0
3203 pkts = self.create_stream_out(self.pg3)
3204 self.pg3.add_stream(pkts)
3205 self.pg_enable_capture(self.pg_interfaces)
3207 capture = self.pg0.get_capture(len(pkts))
3208 self.verify_capture_in(capture, self.pg0)
3210 # from non-NAT interface to NAT inside interface
3211 pkts = self.create_stream_in(self.pg2, self.pg0)
3212 self.pg2.add_stream(pkts)
3213 self.pg_enable_capture(self.pg_interfaces)
3215 capture = self.pg0.get_capture(len(pkts))
# these packets must arrive on pg0 untranslated
3216 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3218 def test_output_feature_vrf_aware(self):
3219 """ NAT44 interface output feature VRF aware (in2out postrouting) """
3220 nat_ip_vrf10 = "10.0.0.10"
3221 nat_ip_vrf20 = "10.0.0.20"
3223 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3224 dst_address_length=32,
3225 next_hop_address=self.pg3.remote_ip4n,
3226 next_hop_sw_if_index=self.pg3.sw_if_index,
3228 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3229 dst_address_length=32,
3230 next_hop_address=self.pg3.remote_ip4n,
3231 next_hop_sw_if_index=self.pg3.sw_if_index,
3234 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3235 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3236 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3237 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3238 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3242 pkts = self.create_stream_in(self.pg4, self.pg3)
3243 self.pg4.add_stream(pkts)
3244 self.pg_enable_capture(self.pg_interfaces)
3246 capture = self.pg3.get_capture(len(pkts))
3247 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3250 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3251 self.pg3.add_stream(pkts)
3252 self.pg_enable_capture(self.pg_interfaces)
3254 capture = self.pg4.get_capture(len(pkts))
3255 self.verify_capture_in(capture, self.pg4)
3258 pkts = self.create_stream_in(self.pg6, self.pg3)
3259 self.pg6.add_stream(pkts)
3260 self.pg_enable_capture(self.pg_interfaces)
3262 capture = self.pg3.get_capture(len(pkts))
3263 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3266 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3267 self.pg3.add_stream(pkts)
3268 self.pg_enable_capture(self.pg_interfaces)
3270 capture = self.pg6.get_capture(len(pkts))
3271 self.verify_capture_in(capture, self.pg6)
3273 def test_output_feature_hairpinning(self):
3274 """ NAT44 interface output feature hairpinning (in2out postrouting) """
3275 host = self.pg0.remote_hosts[0]
3276 server = self.pg0.remote_hosts[1]
3279 server_in_port = 5678
3280 server_out_port = 8765
3282 self.nat44_add_address(self.nat_addr)
3283 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3284 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3287 # add static mapping for server
3288 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3289 server_in_port, server_out_port,
3290 proto=IP_PROTOS.tcp)
3292 # send packet from host to server
3293 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3294 IP(src=host.ip4, dst=self.nat_addr) /
3295 TCP(sport=host_in_port, dport=server_out_port))
3296 self.pg0.add_stream(p)
3297 self.pg_enable_capture(self.pg_interfaces)
3299 capture = self.pg0.get_capture(1)
3304 self.assertEqual(ip.src, self.nat_addr)
3305 self.assertEqual(ip.dst, server.ip4)
3306 self.assertNotEqual(tcp.sport, host_in_port)
3307 self.assertEqual(tcp.dport, server_in_port)
3308 self.check_tcp_checksum(p)
3309 host_out_port = tcp.sport
3311 self.logger.error(ppp("Unexpected or invalid packet:", p))
3314 # send reply from server to host
3315 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3316 IP(src=server.ip4, dst=self.nat_addr) /
3317 TCP(sport=server_in_port, dport=host_out_port))
3318 self.pg0.add_stream(p)
3319 self.pg_enable_capture(self.pg_interfaces)
3321 capture = self.pg0.get_capture(1)
3326 self.assertEqual(ip.src, self.nat_addr)
3327 self.assertEqual(ip.dst, host.ip4)
3328 self.assertEqual(tcp.sport, server_out_port)
3329 self.assertEqual(tcp.dport, host_in_port)
3330 self.check_tcp_checksum(p)
3332 self.logger.error(ppp("Unexpected or invalid packet:", p))
3335 def test_output_feature_and_service(self):
3336 """ NAT44 interface output feature and services """
3337 external_addr = '1.2.3.4'
3341 self.vapi.nat44_forwarding_enable_disable(1)
3342 self.nat44_add_address(self.nat_addr)
3343 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3344 local_port, external_port,
3345 proto=IP_PROTOS.tcp, out2in_only=1)
3346 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3347 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3350 # from client to service
3351 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3352 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3353 TCP(sport=12345, dport=external_port))
3354 self.pg1.add_stream(p)
3355 self.pg_enable_capture(self.pg_interfaces)
3357 capture = self.pg0.get_capture(1)
3363 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3364 self.assertEqual(tcp.dport, local_port)
3365 self.check_tcp_checksum(p)
3366 self.check_ip_checksum(p)
3368 self.logger.error(ppp("Unexpected or invalid packet:", p))
3371 # from service back to client
3372 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3373 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3374 TCP(sport=local_port, dport=12345))
3375 self.pg0.add_stream(p)
3376 self.pg_enable_capture(self.pg_interfaces)
3378 capture = self.pg1.get_capture(1)
3383 self.assertEqual(ip.src, external_addr)
3384 self.assertEqual(tcp.sport, external_port)
3385 self.check_tcp_checksum(p)
3386 self.check_ip_checksum(p)
3388 self.logger.error(ppp("Unexpected or invalid packet:", p))
3391 # from local network host to external network
3392 pkts = self.create_stream_in(self.pg0, self.pg1)
3393 self.pg0.add_stream(pkts)
3394 self.pg_enable_capture(self.pg_interfaces)
3396 capture = self.pg1.get_capture(len(pkts))
3397 self.verify_capture_out(capture)
3398 pkts = self.create_stream_in(self.pg0, self.pg1)
3399 self.pg0.add_stream(pkts)
3400 self.pg_enable_capture(self.pg_interfaces)
3402 capture = self.pg1.get_capture(len(pkts))
3403 self.verify_capture_out(capture)
3405 # from external network back to local network host
3406 pkts = self.create_stream_out(self.pg1)
3407 self.pg1.add_stream(pkts)
3408 self.pg_enable_capture(self.pg_interfaces)
3410 capture = self.pg0.get_capture(len(pkts))
3411 self.verify_capture_in(capture, self.pg0)
3413 def test_output_feature_and_service2(self):
3414 """ NAT44 interface output feature and service host direct access """
3415 self.vapi.nat44_forwarding_enable_disable(1)
3416 self.nat44_add_address(self.nat_addr)
3417 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3420 # session initiated from service host - translate
3421 pkts = self.create_stream_in(self.pg0, self.pg1)
3422 self.pg0.add_stream(pkts)
3423 self.pg_enable_capture(self.pg_interfaces)
3425 capture = self.pg1.get_capture(len(pkts))
3426 self.verify_capture_out(capture)
3428 pkts = self.create_stream_out(self.pg1)
3429 self.pg1.add_stream(pkts)
3430 self.pg_enable_capture(self.pg_interfaces)
3432 capture = self.pg0.get_capture(len(pkts))
3433 self.verify_capture_in(capture, self.pg0)
3435 tcp_port_out = self.tcp_port_out
3436 udp_port_out = self.udp_port_out
3437 icmp_id_out = self.icmp_id_out
3439 # session initiated from remote host - do not translate
3440 pkts = self.create_stream_out(self.pg1,
3441 self.pg0.remote_ip4,
3442 use_inside_ports=True)
3443 self.pg1.add_stream(pkts)
3444 self.pg_enable_capture(self.pg_interfaces)
3446 capture = self.pg0.get_capture(len(pkts))
3447 self.verify_capture_in(capture, self.pg0)
3449 pkts = self.create_stream_in(self.pg0, self.pg1)
3450 self.pg0.add_stream(pkts)
3451 self.pg_enable_capture(self.pg_interfaces)
3453 capture = self.pg1.get_capture(len(pkts))
3454 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3457 def test_output_feature_and_service3(self):
3458 """ NAT44 interface output feature and DST NAT """
3459 external_addr = '1.2.3.4'
3463 self.vapi.nat44_forwarding_enable_disable(1)
3464 self.nat44_add_address(self.nat_addr)
3465 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3466 local_port, external_port,
3467 proto=IP_PROTOS.tcp, out2in_only=1)
3468 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3469 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3471 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3474 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3475 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3476 TCP(sport=12345, dport=external_port))
3477 self.pg0.add_stream(p)
3478 self.pg_enable_capture(self.pg_interfaces)
3480 capture = self.pg1.get_capture(1)
3485 self.assertEqual(ip.src, self.pg0.remote_ip4)
3486 self.assertEqual(tcp.sport, 12345)
3487 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3488 self.assertEqual(tcp.dport, local_port)
3489 self.check_tcp_checksum(p)
3490 self.check_ip_checksum(p)
3492 self.logger.error(ppp("Unexpected or invalid packet:", p))
3495 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3496 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3497 TCP(sport=local_port, dport=12345))
3498 self.pg1.add_stream(p)
3499 self.pg_enable_capture(self.pg_interfaces)
3501 capture = self.pg0.get_capture(1)
3506 self.assertEqual(ip.src, external_addr)
3507 self.assertEqual(tcp.sport, external_port)
3508 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3509 self.assertEqual(tcp.dport, 12345)
3510 self.check_tcp_checksum(p)
3511 self.check_ip_checksum(p)
3513 self.logger.error(ppp("Unexpected or invalid packet:", p))
3516 def test_one_armed_nat44(self):
3517 """ One armed NAT44 """
3518 remote_host = self.pg9.remote_hosts[0]
3519 local_host = self.pg9.remote_hosts[1]
3522 self.nat44_add_address(self.nat_addr)
3523 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3524 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3528 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3529 IP(src=local_host.ip4, dst=remote_host.ip4) /
3530 TCP(sport=12345, dport=80))
3531 self.pg9.add_stream(p)
3532 self.pg_enable_capture(self.pg_interfaces)
3534 capture = self.pg9.get_capture(1)
3539 self.assertEqual(ip.src, self.nat_addr)
3540 self.assertEqual(ip.dst, remote_host.ip4)
3541 self.assertNotEqual(tcp.sport, 12345)
3542 external_port = tcp.sport
3543 self.assertEqual(tcp.dport, 80)
3544 self.check_tcp_checksum(p)
3545 self.check_ip_checksum(p)
3547 self.logger.error(ppp("Unexpected or invalid packet:", p))
3551 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3552 IP(src=remote_host.ip4, dst=self.nat_addr) /
3553 TCP(sport=80, dport=external_port))
3554 self.pg9.add_stream(p)
3555 self.pg_enable_capture(self.pg_interfaces)
3557 capture = self.pg9.get_capture(1)
3562 self.assertEqual(ip.src, remote_host.ip4)
3563 self.assertEqual(ip.dst, local_host.ip4)
3564 self.assertEqual(tcp.sport, 80)
3565 self.assertEqual(tcp.dport, 12345)
3566 self.check_tcp_checksum(p)
3567 self.check_ip_checksum(p)
3569 self.logger.error(ppp("Unexpected or invalid packet:", p))
3572 def test_one_armed_nat44_static(self):
3573 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3574 remote_host = self.pg9.remote_hosts[0]
3575 local_host = self.pg9.remote_hosts[1]
3580 self.vapi.nat44_forwarding_enable_disable(1)
3581 self.nat44_add_address(self.nat_addr, twice_nat=1)
3582 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3583 local_port, external_port,
3584 proto=IP_PROTOS.tcp, out2in_only=1,
3586 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3587 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3590 # from client to service
3591 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3592 IP(src=remote_host.ip4, dst=self.nat_addr) /
3593 TCP(sport=12345, dport=external_port))
3594 self.pg9.add_stream(p)
3595 self.pg_enable_capture(self.pg_interfaces)
3597 capture = self.pg9.get_capture(1)
3603 self.assertEqual(ip.dst, local_host.ip4)
3604 self.assertEqual(ip.src, self.nat_addr)
3605 self.assertEqual(tcp.dport, local_port)
3606 self.assertNotEqual(tcp.sport, 12345)
3607 eh_port_in = tcp.sport
3608 self.check_tcp_checksum(p)
3609 self.check_ip_checksum(p)
3611 self.logger.error(ppp("Unexpected or invalid packet:", p))
3614 # from service back to client
3615 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3616 IP(src=local_host.ip4, dst=self.nat_addr) /
3617 TCP(sport=local_port, dport=eh_port_in))
3618 self.pg9.add_stream(p)
3619 self.pg_enable_capture(self.pg_interfaces)
3621 capture = self.pg9.get_capture(1)
3626 self.assertEqual(ip.src, self.nat_addr)
3627 self.assertEqual(ip.dst, remote_host.ip4)
3628 self.assertEqual(tcp.sport, external_port)
3629 self.assertEqual(tcp.dport, 12345)
3630 self.check_tcp_checksum(p)
3631 self.check_ip_checksum(p)
3633 self.logger.error(ppp("Unexpected or invalid packet:", p))
3636 def test_del_session(self):
3637 """ Delete NAT44 session """
3638 self.nat44_add_address(self.nat_addr)
3639 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3640 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3643 pkts = self.create_stream_in(self.pg0, self.pg1)
3644 self.pg0.add_stream(pkts)
3645 self.pg_enable_capture(self.pg_interfaces)
3647 capture = self.pg1.get_capture(len(pkts))
3649 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3650 nsessions = len(sessions)
3652 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3653 sessions[0].inside_port,
3654 sessions[0].protocol)
3655 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3656 sessions[1].outside_port,
3657 sessions[1].protocol,
3660 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3661 self.assertEqual(nsessions - len(sessions), 2)
def test_set_get_reass(self):
    """ NAT44 set/get virtual fragmentation reassembly """
    # Snapshot the reassembly settings reported before anything is changed.
    before = self.vapi.nat_get_reass()

    # Derive the new settings from the snapshot so the test does not
    # depend on whatever values are currently configured.
    want_timeout = before.ip4_timeout + 5
    want_max_reass = before.ip4_max_reass * 2
    want_max_frag = before.ip4_max_frag * 2

    self.vapi.nat_set_reass(timeout=want_timeout,
                            max_reass=want_max_reass,
                            max_frag=want_max_frag)

    # Read the settings back and compare them field by field.
    after = self.vapi.nat_get_reass()
    self.assertEqual(want_timeout, after.ip4_timeout)
    self.assertEqual(want_max_reass, after.ip4_max_reass)
    self.assertEqual(want_max_frag, after.ip4_max_frag)

    # Set drop_frag and check that ip4_drop_frag reads back truthy.
    self.vapi.nat_set_reass(drop_frag=1)
    drop_cfg = self.vapi.nat_get_reass()
    self.assertTrue(drop_cfg.ip4_drop_frag)
3680 def test_frag_in_order(self):
3681 """ NAT44 translate fragments arriving in order """
3682 self.nat44_add_address(self.nat_addr)
3683 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3684 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3687 data = "A" * 4 + "B" * 16 + "C" * 3
3688 self.tcp_port_in = random.randint(1025, 65535)
3690 reass = self.vapi.nat_reass_dump()
3691 reass_n_start = len(reass)
3694 pkts = self.create_stream_frag(self.pg0,
3695 self.pg1.remote_ip4,
3699 self.pg0.add_stream(pkts)
3700 self.pg_enable_capture(self.pg_interfaces)
3702 frags = self.pg1.get_capture(len(pkts))
3703 p = self.reass_frags_and_verify(frags,
3705 self.pg1.remote_ip4)
3706 self.assertEqual(p[TCP].dport, 20)
3707 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3708 self.tcp_port_out = p[TCP].sport
3709 self.assertEqual(data, p[Raw].load)
3712 pkts = self.create_stream_frag(self.pg1,
3717 self.pg1.add_stream(pkts)
3718 self.pg_enable_capture(self.pg_interfaces)
3720 frags = self.pg0.get_capture(len(pkts))
3721 p = self.reass_frags_and_verify(frags,
3722 self.pg1.remote_ip4,
3723 self.pg0.remote_ip4)
3724 self.assertEqual(p[TCP].sport, 20)
3725 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3726 self.assertEqual(data, p[Raw].load)
3728 reass = self.vapi.nat_reass_dump()
3729 reass_n_end = len(reass)
3731 self.assertEqual(reass_n_end - reass_n_start, 2)
3733 def test_reass_hairpinning(self):
3734 """ NAT44 fragments hairpinning """
3735 host = self.pg0.remote_hosts[0]
3736 server = self.pg0.remote_hosts[1]
3737 host_in_port = random.randint(1025, 65535)
3739 server_in_port = random.randint(1025, 65535)
3740 server_out_port = random.randint(1025, 65535)
3741 data = "A" * 4 + "B" * 16 + "C" * 3
3743 self.nat44_add_address(self.nat_addr)
3744 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3745 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3747 # add static mapping for server
3748 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3749 server_in_port, server_out_port,
3750 proto=IP_PROTOS.tcp)
3752 # send packet from host to server
3753 pkts = self.create_stream_frag(self.pg0,
3758 self.pg0.add_stream(pkts)
3759 self.pg_enable_capture(self.pg_interfaces)
3761 frags = self.pg0.get_capture(len(pkts))
3762 p = self.reass_frags_and_verify(frags,
3765 self.assertNotEqual(p[TCP].sport, host_in_port)
3766 self.assertEqual(p[TCP].dport, server_in_port)
3767 self.assertEqual(data, p[Raw].load)
3769 def test_frag_out_of_order(self):
3770 """ NAT44 translate fragments arriving out of order """
3771 self.nat44_add_address(self.nat_addr)
3772 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3773 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3776 data = "A" * 4 + "B" * 16 + "C" * 3
3777 random.randint(1025, 65535)
3780 pkts = self.create_stream_frag(self.pg0,
3781 self.pg1.remote_ip4,
3786 self.pg0.add_stream(pkts)
3787 self.pg_enable_capture(self.pg_interfaces)
3789 frags = self.pg1.get_capture(len(pkts))
3790 p = self.reass_frags_and_verify(frags,
3792 self.pg1.remote_ip4)
3793 self.assertEqual(p[TCP].dport, 20)
3794 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3795 self.tcp_port_out = p[TCP].sport
3796 self.assertEqual(data, p[Raw].load)
3799 pkts = self.create_stream_frag(self.pg1,
3805 self.pg1.add_stream(pkts)
3806 self.pg_enable_capture(self.pg_interfaces)
3808 frags = self.pg0.get_capture(len(pkts))
3809 p = self.reass_frags_and_verify(frags,
3810 self.pg1.remote_ip4,
3811 self.pg0.remote_ip4)
3812 self.assertEqual(p[TCP].sport, 20)
3813 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3814 self.assertEqual(data, p[Raw].load)
3816 def test_port_restricted(self):
3817 """ Port restricted NAT44 (MAP-E CE) """
3818 self.nat44_add_address(self.nat_addr)
3819 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3820 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3822 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3823 "psid-offset 6 psid-len 6")
3825 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3826 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3827 TCP(sport=4567, dport=22))
3828 self.pg0.add_stream(p)
3829 self.pg_enable_capture(self.pg_interfaces)
3831 capture = self.pg1.get_capture(1)
3836 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3837 self.assertEqual(ip.src, self.nat_addr)
3838 self.assertEqual(tcp.dport, 22)
3839 self.assertNotEqual(tcp.sport, 4567)
3840 self.assertEqual((tcp.sport >> 6) & 63, 10)
3841 self.check_tcp_checksum(p)
3842 self.check_ip_checksum(p)
3844 self.logger.error(ppp("Unexpected or invalid packet:", p))
3847 def test_twice_nat(self):
3849 twice_nat_addr = '10.0.1.3'
3854 self.nat44_add_address(self.nat_addr)
3855 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3856 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3857 port_in, port_out, proto=IP_PROTOS.tcp,
3859 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3860 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3863 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3864 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3865 TCP(sport=eh_port_out, dport=port_out))
3866 self.pg1.add_stream(p)
3867 self.pg_enable_capture(self.pg_interfaces)
3869 capture = self.pg0.get_capture(1)
3874 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3875 self.assertEqual(ip.src, twice_nat_addr)
3876 self.assertEqual(tcp.dport, port_in)
3877 self.assertNotEqual(tcp.sport, eh_port_out)
3878 eh_port_in = tcp.sport
3879 self.check_tcp_checksum(p)
3880 self.check_ip_checksum(p)
3882 self.logger.error(ppp("Unexpected or invalid packet:", p))
3885 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3886 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3887 TCP(sport=port_in, dport=eh_port_in))
3888 self.pg0.add_stream(p)
3889 self.pg_enable_capture(self.pg_interfaces)
3891 capture = self.pg1.get_capture(1)
3896 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3897 self.assertEqual(ip.src, self.nat_addr)
3898 self.assertEqual(tcp.dport, eh_port_out)
3899 self.assertEqual(tcp.sport, port_out)
3900 self.check_tcp_checksum(p)
3901 self.check_ip_checksum(p)
3903 self.logger.error(ppp("Unexpected or invalid packet:", p))
3906 def test_twice_nat_lb(self):
3907 """ Twice NAT44 local service load balancing """
3908 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3909 twice_nat_addr = '10.0.1.3'
3914 server1 = self.pg0.remote_hosts[0]
3915 server2 = self.pg0.remote_hosts[1]
3917 locals = [{'addr': server1.ip4n,
3920 {'addr': server2.ip4n,
3924 self.nat44_add_address(self.nat_addr)
3925 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3927 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3931 local_num=len(locals),
3933 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3934 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3937 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3938 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3939 TCP(sport=eh_port_out, dport=external_port))
3940 self.pg1.add_stream(p)
3941 self.pg_enable_capture(self.pg_interfaces)
3943 capture = self.pg0.get_capture(1)
3949 self.assertEqual(ip.src, twice_nat_addr)
3950 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3951 if ip.dst == server1.ip4:
3955 self.assertNotEqual(tcp.sport, eh_port_out)
3956 eh_port_in = tcp.sport
3957 self.assertEqual(tcp.dport, local_port)
3958 self.check_tcp_checksum(p)
3959 self.check_ip_checksum(p)
3961 self.logger.error(ppp("Unexpected or invalid packet:", p))
3964 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3965 IP(src=server.ip4, dst=twice_nat_addr) /
3966 TCP(sport=local_port, dport=eh_port_in))
3967 self.pg0.add_stream(p)
3968 self.pg_enable_capture(self.pg_interfaces)
3970 capture = self.pg1.get_capture(1)
3975 self.assertEqual(ip.src, self.nat_addr)
3976 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3977 self.assertEqual(tcp.sport, external_port)
3978 self.assertEqual(tcp.dport, eh_port_out)
3979 self.check_tcp_checksum(p)
3980 self.check_ip_checksum(p)
3982 self.logger.error(ppp("Unexpected or invalid packet:", p))
def test_twice_nat_interface_addr(self):
    """ Acquire twice NAT44 addresses from interface """
    iface = self.pg7
    self.vapi.nat44_add_interface_addr(iface.sw_if_index, twice_nat=1)

    # The interface has no IPv4 address yet, so the dump must be empty.
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))

    # Once the interface is given an address, exactly one entry must
    # show up, carrying that address and the twice_nat flag.
    iface.config_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(1, len(pool))
    entry = pool[0]
    self.assertEqual(entry.ip_address[0:4], iface.local_ip4n)
    self.assertEqual(entry.twice_nat, 1)

    # Removing the interface address must empty the dump again.
    iface.unconfig_ip4()
    pool = self.vapi.nat44_address_dump()
    self.assertEqual(0, len(pool))
4005 def test_ipfix_max_frags(self):
4006 """ IPFIX logging maximum fragments pending reassembly exceeded """
4007 self.nat44_add_address(self.nat_addr)
4008 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4009 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4011 self.vapi.nat_set_reass(max_frag=0)
4012 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
4013 src_address=self.pg3.local_ip4n,
4015 template_interval=10)
4016 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
4017 src_port=self.ipfix_src_port)
4019 data = "A" * 4 + "B" * 16 + "C" * 3
4020 self.tcp_port_in = random.randint(1025, 65535)
4021 pkts = self.create_stream_frag(self.pg0,
4022 self.pg1.remote_ip4,
4026 self.pg0.add_stream(pkts[-1])
4027 self.pg_enable_capture(self.pg_interfaces)
4029 frags = self.pg1.get_capture(0)
4030 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4031 capture = self.pg3.get_capture(9)
4032 ipfix = IPFIXDecoder()
4033 # first load template
4035 self.assertTrue(p.haslayer(IPFIX))
4036 self.assertEqual(p[IP].src, self.pg3.local_ip4)
4037 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
4038 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
4039 self.assertEqual(p[UDP].dport, 4739)
4040 self.assertEqual(p[IPFIX].observationDomainID,
4041 self.ipfix_domain_id)
4042 if p.haslayer(Template):
4043 ipfix.add_template(p.getlayer(Template))
4044 # verify events in data set
4046 if p.haslayer(Data):
4047 data = ipfix.decode_data_set(p.getlayer(Set))
4048 self.verify_ipfix_max_fragments_ip4(data, 0,
4049 self.pg0.remote_ip4n)
4052 super(TestNAT44, self).tearDown()
4053 if not self.vpp_dead:
4054 self.logger.info(self.vapi.cli("show nat44 addresses"))
4055 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4056 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4057 self.logger.info(self.vapi.cli("show nat44 interface address"))
4058 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4059 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4060 self.vapi.cli("nat addr-port-assignment-alg default")
4064 class TestNAT44Out2InDPO(MethodHolder):
4065 """ NAT44 Test Cases using out2in DPO """
4068 def setUpConstants(cls):
4069 super(TestNAT44Out2InDPO, cls).setUpConstants()
4070 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
4073 def setUpClass(cls):
4074 super(TestNAT44Out2InDPO, cls).setUpClass()
4077 cls.tcp_port_in = 6303
4078 cls.tcp_port_out = 6303
4079 cls.udp_port_in = 6304
4080 cls.udp_port_out = 6304
4081 cls.icmp_id_in = 6305
4082 cls.icmp_id_out = 6305
4083 cls.nat_addr = '10.0.0.3'
4084 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4085 cls.dst_ip4 = '192.168.70.1'
4087 cls.create_pg_interfaces(range(2))
4090 cls.pg0.config_ip4()
4091 cls.pg0.resolve_arp()
4094 cls.pg1.config_ip6()
4095 cls.pg1.resolve_ndp()
4097 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4098 dst_address_length=0,
4099 next_hop_address=cls.pg1.remote_ip6n,
4100 next_hop_sw_if_index=cls.pg1.sw_if_index)
4103 super(TestNAT44Out2InDPO, cls).tearDownClass()
4106 def configure_xlat(self):
4107 self.dst_ip6_pfx = '1:2:3::'
4108 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4110 self.dst_ip6_pfx_len = 96
4111 self.src_ip6_pfx = '4:5:6::'
4112 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4114 self.src_ip6_pfx_len = 96
4115 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4116 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4117 '\x00\x00\x00\x00', 0, is_translation=1,
4120 def test_464xlat_ce(self):
4121 """ Test 464XLAT CE with NAT44 """
4123 self.configure_xlat()
4125 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4126 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4128 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4129 self.dst_ip6_pfx_len)
4130 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4131 self.src_ip6_pfx_len)
4134 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4135 self.pg0.add_stream(pkts)
4136 self.pg_enable_capture(self.pg_interfaces)
4138 capture = self.pg1.get_capture(len(pkts))
4139 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4142 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4144 self.pg1.add_stream(pkts)
4145 self.pg_enable_capture(self.pg_interfaces)
4147 capture = self.pg0.get_capture(len(pkts))
4148 self.verify_capture_in(capture, self.pg0)
4150 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4152 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4153 self.nat_addr_n, is_add=0)
4155 def test_464xlat_ce_no_nat(self):
4156 """ Test 464XLAT CE without NAT44 """
4158 self.configure_xlat()
4160 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4161 self.dst_ip6_pfx_len)
4162 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4163 self.src_ip6_pfx_len)
4165 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4166 self.pg0.add_stream(pkts)
4167 self.pg_enable_capture(self.pg_interfaces)
4169 capture = self.pg1.get_capture(len(pkts))
4170 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4171 nat_ip=out_dst_ip6, same_port=True)
4173 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4174 self.pg1.add_stream(pkts)
4175 self.pg_enable_capture(self.pg_interfaces)
4177 capture = self.pg0.get_capture(len(pkts))
4178 self.verify_capture_in(capture, self.pg0)
4181 class TestDeterministicNAT(MethodHolder):
4182 """ Deterministic NAT Test Cases """
4185 def setUpConstants(cls):
4186 super(TestDeterministicNAT, cls).setUpConstants()
4187 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4190 def setUpClass(cls):
4191 super(TestDeterministicNAT, cls).setUpClass()
4194 cls.tcp_port_in = 6303
4195 cls.tcp_external_port = 6303
4196 cls.udp_port_in = 6304
4197 cls.udp_external_port = 6304
4198 cls.icmp_id_in = 6305
4199 cls.nat_addr = '10.0.0.3'
4201 cls.create_pg_interfaces(range(3))
4202 cls.interfaces = list(cls.pg_interfaces)
4204 for i in cls.interfaces:
4209 cls.pg0.generate_remote_hosts(2)
4210 cls.pg0.configure_ipv4_neighbors()
4213 super(TestDeterministicNAT, cls).tearDownClass()
4216 def create_stream_in(self, in_if, out_if, ttl=64):
4218 Create packet stream for inside network
4220 :param in_if: Inside interface
4221 :param out_if: Outside interface
4222 :param ttl: TTL of generated packets
4226 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4227 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4228 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
4232 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4233 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4234 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
4238 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4239 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4240 ICMP(id=self.icmp_id_in, type='echo-request'))
4245 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4247 Create packet stream for outside network
4249 :param out_if: Outside interface
4250 :param dst_ip: Destination IP address (Default use global NAT address)
4251 :param ttl: TTL of generated packets
4254 dst_ip = self.nat_addr
4257 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4258 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4259 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4263 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4264 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4265 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4269 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4270 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4271 ICMP(id=self.icmp_external_id, type='echo-reply'))
4276 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4278 Verify captured packets on outside network
4280 :param capture: Captured packets
4281 :param nat_ip: Translated IP address (Default use global NAT address)
4282 :param same_port: Source port number is not translated (Default False)
4283 :param packet_num: Expected number of packets (Default 3)
4286 nat_ip = self.nat_addr
4287 self.assertEqual(packet_num, len(capture))
4288 for packet in capture:
4290 self.assertEqual(packet[IP].src, nat_ip)
4291 if packet.haslayer(TCP):
4292 self.tcp_port_out = packet[TCP].sport
4293 elif packet.haslayer(UDP):
4294 self.udp_port_out = packet[UDP].sport
4296 self.icmp_external_id = packet[ICMP].id
4298 self.logger.error(ppp("Unexpected or invalid packet "
4299 "(outside network):", packet))
# initiate_tcp_session: drives a TCP handshake through the NAT in the three
# steps marked by the comments below (SYN in->out, SYN+ACK out->in, ACK
# in->out), expecting exactly one packet on the opposite interface after
# each step.
# self.tcp_port_out is set from p[TCP].sport after the first capture, and
# the SYN+ACK is then addressed to self.nat_addr on that port.
# NOTE(review): the embedded numbering shows absent lines: each TCP(...)
# call is cut off before its closing arguments (presumably the flags), the
# statement at 4319 that would rebind `p` before the sport is read is
# missing (presumably p = capture[0]), and the construct leading to the
# final logger.error is not visible -- confirm against the full file.
4302 def initiate_tcp_session(self, in_if, out_if):
4304 Initiates TCP session
4306 :param in_if: Inside interface
4307 :param out_if: Outside interface
4310 # SYN packet in->out
4311 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4312 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4313 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4316 self.pg_enable_capture(self.pg_interfaces)
4318 capture = out_if.get_capture(1)
4320 self.tcp_port_out = p[TCP].sport
4322 # SYN + ACK packet out->in
4323 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4324 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4325 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4327 out_if.add_stream(p)
4328 self.pg_enable_capture(self.pg_interfaces)
4330 in_if.get_capture(1)
4332 # ACK packet in->out
4333 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4334 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4335 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4338 self.pg_enable_capture(self.pg_interfaces)
4340 out_if.get_capture(1)
4343 self.logger.error("TCP 3 way handshake failed")
# verify_ipfix_max_entries_per_user: expects exactly one decoded record and
# compares four of its fields (keys 230, 466, 473 and 8) against fixed
# values and against pg0's remote IPv4 address.
# The two byte strings equal 3 and 1000 when read as little-endian 32-bit
# integers.
# NOTE(review): the binding of `record` and the comments naming fields 230,
# 473 and 8 are not visible in this listing (embedded lines 4353-4354, 4358
# and 4360 are absent); presumably record = data[0] -- confirm.
4346 def verify_ipfix_max_entries_per_user(self, data):
4348 Verify IPFIX maximum entries per user exceeded event
4350 :param data: Decoded IPFIX data records
4352 self.assertEqual(1, len(data))
4355 self.assertEqual(ord(record[230]), 13)
4356 # natQuotaExceededEvent
4357 self.assertEqual('\x03\x00\x00\x00', record[466])
4359 self.assertEqual('\xe8\x03\x00\x00', record[473])
4361 self.assertEqual(self.pg0.remote_ip4n, record[8])
# test_deterministic_mode: checks that VPP reports deterministic mode, adds
# one in_addr -> out_addr mapping, and verifies the forward lookup of
# in_addr_t, the reverse lookup of the returned outside address and port,
# and the contents of the mapping dump. Finally clear_nat_det() must leave
# the mapping dump empty.
# NOTE(review): in_plen and out_plen are used below but their assignments
# are not visible in this listing (embedded lines 4371-4373 are absent).
4363 def test_deterministic_mode(self):
4364 """ NAT plugin run deterministic mode """
4365 in_addr = '172.16.255.0'
4366 out_addr = '172.17.255.50'
4367 in_addr_t = '172.16.255.20'
4368 in_addr_n = socket.inet_aton(in_addr)
4369 out_addr_n = socket.inet_aton(out_addr)
4370 in_addr_t_n = socket.inet_aton(in_addr_t)
4374 nat_config = self.vapi.nat_show_config()
4375 self.assertEqual(1, nat_config.deterministic)
4377 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
# Only the first four bytes of the returned address fields are compared
# with the packed IPv4 addresses.
4379 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4380 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4381 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4382 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4384 deterministic_mappings = self.vapi.nat_det_map_dump()
4385 self.assertEqual(len(deterministic_mappings), 1)
4386 dsm = deterministic_mappings[0]
4387 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4388 self.assertEqual(in_plen, dsm.in_plen)
4389 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4390 self.assertEqual(out_plen, dsm.out_plen)
4392 self.clear_nat_det()
4393 deterministic_mappings = self.vapi.nat_det_map_dump()
4394 self.assertEqual(len(deterministic_mappings), 0)
# test_set_timeouts: reads the current deterministic NAT timeouts, sets
# each of the four values (udp, tcp_established, tcp_transitory, icmp) to
# its previous value plus 10, reads them back and asserts that every one of
# them differs from the value seen before.
4396 def test_set_timeouts(self):
4397 """ Set deterministic NAT timeouts """
4398 timeouts_before = self.vapi.nat_det_get_timeouts()
4400 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4401 timeouts_before.tcp_established + 10,
4402 timeouts_before.tcp_transitory + 10,
4403 timeouts_before.icmp + 10)
4405 timeouts_after = self.vapi.nat_det_get_timeouts()
4407 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4408 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4409 self.assertNotEqual(timeouts_before.tcp_established,
4410 timeouts_after.tcp_established)
4411 self.assertNotEqual(timeouts_before.tcp_transitory,
4412 timeouts_after.tcp_transitory)
# test_det_in: creates a deterministic mapping from pg0's remote address to
# nat_ip and calls nat44_interface_add_del_feature for pg0 and pg1. It then
# sends the in->out stream (expected on pg1 with source nat_ip) and the
# out->in stream (expected back on pg0).
# Afterwards the session dump for pg0's remote address must hold three
# sessions; for each, the external address and the in/out ports are
# compared with the values used by the streams (ICMP ids for the third,
# which has no external-port check).
# NOTE(review): the prefix-length arguments of the mapping call, the rest
# of the pg1 feature call and the statements binding `s` before each group
# of assertions are absent from this listing.
4414 def test_det_in(self):
4415 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4417 nat_ip = "10.0.0.10"
4419 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4421 socket.inet_aton(nat_ip),
4423 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4424 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4428 pkts = self.create_stream_in(self.pg0, self.pg1)
4429 self.pg0.add_stream(pkts)
4430 self.pg_enable_capture(self.pg_interfaces)
4432 capture = self.pg1.get_capture(len(pkts))
4433 self.verify_capture_out(capture, nat_ip)
4436 pkts = self.create_stream_out(self.pg1, nat_ip)
4437 self.pg1.add_stream(pkts)
4438 self.pg_enable_capture(self.pg_interfaces)
4440 capture = self.pg0.get_capture(len(pkts))
4441 self.verify_capture_in(capture, self.pg0)
4444 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4445 self.assertEqual(len(sessions), 3)
4449 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4450 self.assertEqual(s.in_port, self.tcp_port_in)
4451 self.assertEqual(s.out_port, self.tcp_port_out)
4452 self.assertEqual(s.ext_port, self.tcp_external_port)
4456 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4457 self.assertEqual(s.in_port, self.udp_port_in)
4458 self.assertEqual(s.out_port, self.udp_port_out)
4459 self.assertEqual(s.ext_port, self.udp_external_port)
4463 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4464 self.assertEqual(s.in_port, self.icmp_id_in)
4465 self.assertEqual(s.out_port, self.icmp_external_id)
# test_multiple_users: two inside hosts (host0, host1) behind pg0 use one
# deterministic mapping to nat_ip. Each host sends a TCP packet from the
# same inside port to the same external port; the test checks the
# translated packet on pg1 and records its source port (port_out0 /
# port_out1). The single mapping must then report two sessions.
# Return traffic addressed to nat_ip on each recorded port must reach host0
# and host1 respectively, with the original inside port as destination.
# The last part exercises the session-close API: after
# nat_det_close_session_out the mapping must report one session, and after
# nat_det_close_session_in it must report none.
# NOTE(review): port_in, ip and tcp are used without a visible binding, and
# several calls are cut short -- the embedded numbering shows that the
# corresponding lines are absent from this listing. The final error message
# at 4566 lacks the trailing colon used by the other three.
4467 def test_multiple_users(self):
4468 """ Deterministic NAT multiple users """
4470 nat_ip = "10.0.0.10"
4472 external_port = 6303
4474 host0 = self.pg0.remote_hosts[0]
4475 host1 = self.pg0.remote_hosts[1]
4477 self.vapi.nat_det_add_del_map(host0.ip4n,
4479 socket.inet_aton(nat_ip),
4481 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4482 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4486 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4487 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4488 TCP(sport=port_in, dport=external_port))
4489 self.pg0.add_stream(p)
4490 self.pg_enable_capture(self.pg_interfaces)
4492 capture = self.pg1.get_capture(1)
4497 self.assertEqual(ip.src, nat_ip)
4498 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4499 self.assertEqual(tcp.dport, external_port)
4500 port_out0 = tcp.sport
4502 self.logger.error(ppp("Unexpected or invalid packet:", p))
4506 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4507 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4508 TCP(sport=port_in, dport=external_port))
4509 self.pg0.add_stream(p)
4510 self.pg_enable_capture(self.pg_interfaces)
4512 capture = self.pg1.get_capture(1)
4517 self.assertEqual(ip.src, nat_ip)
4518 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4519 self.assertEqual(tcp.dport, external_port)
4520 port_out1 = tcp.sport
4522 self.logger.error(ppp("Unexpected or invalid packet:", p))
4525 dms = self.vapi.nat_det_map_dump()
4526 self.assertEqual(1, len(dms))
4527 self.assertEqual(2, dms[0].ses_num)
4530 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4531 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4532 TCP(sport=external_port, dport=port_out0))
4533 self.pg1.add_stream(p)
4534 self.pg_enable_capture(self.pg_interfaces)
4536 capture = self.pg0.get_capture(1)
4541 self.assertEqual(ip.src, self.pg1.remote_ip4)
4542 self.assertEqual(ip.dst, host0.ip4)
4543 self.assertEqual(tcp.dport, port_in)
4544 self.assertEqual(tcp.sport, external_port)
4546 self.logger.error(ppp("Unexpected or invalid packet:", p))
4550 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4551 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4552 TCP(sport=external_port, dport=port_out1))
4553 self.pg1.add_stream(p)
4554 self.pg_enable_capture(self.pg_interfaces)
4556 capture = self.pg0.get_capture(1)
4561 self.assertEqual(ip.src, self.pg1.remote_ip4)
4562 self.assertEqual(ip.dst, host1.ip4)
4563 self.assertEqual(tcp.dport, port_in)
4564 self.assertEqual(tcp.sport, external_port)
4566 self.logger.error(ppp("Unexpected or invalid packet", p))
4569 # session close api test
4570 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4572 self.pg1.remote_ip4n,
4574 dms = self.vapi.nat_det_map_dump()
4575 self.assertEqual(dms[0].ses_num, 1)
4577 self.vapi.nat_det_close_session_in(host0.ip4n,
4579 self.pg1.remote_ip4n,
4581 dms = self.vapi.nat_det_map_dump()
4582 self.assertEqual(dms[0].ses_num, 0)
# test_tcp_session_close_detection_in: after initiate_tcp_session, the
# inside host starts the TCP teardown. Sequence, per the step comments
# below: FIN in->out (one packet expected on pg1), then ACK and FIN out->in
# injected as `pkts` on pg1 (two packets expected on pg0), then the final
# ACK in->out (one packet expected on pg1). The mapping must then report
# zero sessions.
# NOTE(review): the TCP(...) calls are cut off before their closing
# arguments (presumably the flags), the construction of `pkts` is not
# visible, and the construct leading to the final logger.error is absent
# from this listing -- confirm against the full file.
4584 def test_tcp_session_close_detection_in(self):
4585 """ Deterministic NAT TCP session close from inside network """
4586 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4588 socket.inet_aton(self.nat_addr),
4590 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4591 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4594 self.initiate_tcp_session(self.pg0, self.pg1)
4596 # close the session from inside
4598 # FIN packet in -> out
4599 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4600 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4601 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4603 self.pg0.add_stream(p)
4604 self.pg_enable_capture(self.pg_interfaces)
4606 self.pg1.get_capture(1)
4610 # ACK packet out -> in
4611 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4612 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4613 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4617 # FIN packet out -> in
4618 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4619 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4620 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4624 self.pg1.add_stream(pkts)
4625 self.pg_enable_capture(self.pg_interfaces)
4627 self.pg0.get_capture(2)
4629 # ACK packet in -> out
4630 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4631 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4632 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4634 self.pg0.add_stream(p)
4635 self.pg_enable_capture(self.pg_interfaces)
4637 self.pg1.get_capture(1)
4639 # Check if deterministic NAT44 closed the session
4640 dms = self.vapi.nat_det_map_dump()
4641 self.assertEqual(0, dms[0].ses_num)
4643 self.logger.error("TCP session termination failed")
# test_tcp_session_close_detection_out: mirror image of the inside-close
# test. After initiate_tcp_session the outside host sends the first FIN
# (one packet expected on pg0), the inside host answers with two packets
# injected as `pkts` on pg0 (two packets expected on pg1), and the outside
# host sends the final ACK (one packet expected on pg0). The mapping must
# then report zero sessions.
# NOTE(review): both inside packets are labelled "ACK packet in -> out"; by
# symmetry with the inside-close test the second is presumably the FIN, but
# the flags arguments are not visible in this listing -- confirm.
# NOTE(review): the TCP(...) calls are cut off, the construction of `pkts`
# is not visible, and the construct leading to the final logger.error is
# absent from this listing.
4646 def test_tcp_session_close_detection_out(self):
4647 """ Deterministic NAT TCP session close from outside network """
4648 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4650 socket.inet_aton(self.nat_addr),
4652 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4653 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4656 self.initiate_tcp_session(self.pg0, self.pg1)
4658 # close the session from outside
4660 # FIN packet out -> in
4661 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4662 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4663 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4665 self.pg1.add_stream(p)
4666 self.pg_enable_capture(self.pg_interfaces)
4668 self.pg0.get_capture(1)
4672 # ACK packet in -> out
4673 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4674 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4675 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4679 # ACK packet in -> out
4680 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4681 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4682 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4686 self.pg0.add_stream(pkts)
4687 self.pg_enable_capture(self.pg_interfaces)
4689 self.pg1.get_capture(2)
4691 # ACK packet out -> in
4692 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4693 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4694 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4696 self.pg1.add_stream(p)
4697 self.pg_enable_capture(self.pg_interfaces)
4699 self.pg0.get_capture(1)
4701 # Check if deterministic NAT44 closed the session
4702 dms = self.vapi.nat_det_map_dump()
4703 self.assertEqual(0, dms[0].ses_num)
4705 self.logger.error("TCP session termination failed")
# test_session_timeout (skipped unless running_extended_tests() is true):
# opens a TCP session, sets all four deterministic NAT timeouts to 5, pushes
# the in->out stream through the NAT and finally asserts that the mapping
# reports zero sessions.
# NOTE(review): nothing between the capture and the final check is visible
# in this listing (embedded lines 4726-4727 are absent); presumably the test
# waits there for the timeouts to expire -- confirm. `capture` is assigned
# but not used in the visible lines.
4708 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4709 def test_session_timeout(self):
4710 """ Deterministic NAT session timeouts """
4711 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4713 socket.inet_aton(self.nat_addr),
4715 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4716 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4719 self.initiate_tcp_session(self.pg0, self.pg1)
4720 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4721 pkts = self.create_stream_in(self.pg0, self.pg1)
4722 self.pg0.add_stream(pkts)
4723 self.pg_enable_capture(self.pg_interfaces)
4725 capture = self.pg1.get_capture(len(pkts))
4728 dms = self.vapi.nat_det_map_dump()
4729 self.assertEqual(0, dms[0].ses_num)
# test_session_limit_per_user (skipped unless running_extended_tests() is
# true): configures the mapping and an IPFIX exporter towards pg2, then
# builds one UDP packet per port in 1025..2024 (1000 ports) and expects the
# whole stream on pg1.
# One further UDP packet (3001 -> 3002) must not appear on pg1; instead an
# ICMP type 3 / code 1 error embedding that packet is expected on pg0, and
# the mapping must report exactly 1000 sessions.
# Finally the IPFIX export is flushed and the two packets captured on pg2
# are fed to an IPFIXDecoder -- templates first, then the data set, which is
# checked by verify_ipfix_max_entries_per_user.
# NOTE(review): `pkts`, `icmp` and the loops over `capture` are not visibly
# bound or opened in this listing, and `p` is reused for the captured packet
# without a visible rebinding. `capture` at 4765 receives the result of
# assert_nothing_captured() and is overwritten before any use.
4731 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4732 def test_session_limit_per_user(self):
4733 """ Deterministic NAT maximum sessions per user limit """
4734 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4736 socket.inet_aton(self.nat_addr),
4738 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4739 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4741 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4742 src_address=self.pg2.local_ip4n,
4744 template_interval=10)
4745 self.vapi.nat_ipfix()
4748 for port in range(1025, 2025):
4749 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4750 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4751 UDP(sport=port, dport=port))
4754 self.pg0.add_stream(pkts)
4755 self.pg_enable_capture(self.pg_interfaces)
4757 capture = self.pg1.get_capture(len(pkts))
4759 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4760 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4761 UDP(sport=3001, dport=3002))
4762 self.pg0.add_stream(p)
4763 self.pg_enable_capture(self.pg_interfaces)
4765 capture = self.pg1.assert_nothing_captured()
4767 # verify ICMP error packet
4768 capture = self.pg0.get_capture(1)
4770 self.assertTrue(p.haslayer(ICMP))
4772 self.assertEqual(icmp.type, 3)
4773 self.assertEqual(icmp.code, 1)
4774 self.assertTrue(icmp.haslayer(IPerror))
4775 inner_ip = icmp[IPerror]
4776 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4777 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4779 dms = self.vapi.nat_det_map_dump()
4781 self.assertEqual(1000, dms[0].ses_num)
4783 # verify IPFIX logging
4784 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4786 capture = self.pg2.get_capture(2)
4787 ipfix = IPFIXDecoder()
4788 # first load template
4790 self.assertTrue(p.haslayer(IPFIX))
4791 if p.haslayer(Template):
4792 ipfix.add_template(p.getlayer(Template))
4793 # verify events in data set
4795 if p.haslayer(Data):
4796 data = ipfix.decode_data_set(p.getlayer(Set))
4797 self.verify_ipfix_max_entries_per_user(data)
# clear_nat_det: resets the deterministic NAT state used by these tests. It
# calls nat_ipfix(enable=0) and nat_det_set_timeouts() with no arguments,
# then walks the deterministic mapping dump and the NAT44 interface dump,
# issuing one add/del call per entry.
# NOTE(review): the argument lists of both add/del calls are cut off in
# this listing (embedded lines 4808-4812 and 4816-4817 are absent);
# presumably they pass is_add=0 so that each entry is removed -- confirm.
4799 def clear_nat_det(self):
4801 Clear deterministic NAT configuration.
4803 self.vapi.nat_ipfix(enable=0)
4804 self.vapi.nat_det_set_timeouts()
4805 deterministic_mappings = self.vapi.nat_det_map_dump()
4806 for dsm in deterministic_mappings:
4807 self.vapi.nat_det_add_del_map(dsm.in_addr,
4813 interfaces = self.vapi.nat44_interface_dump()
4814 for intf in interfaces:
4815 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
# NOTE(review): the `def` line for these statements is absent from this
# listing (embedded lines 4816-4819 are missing); judging by the super()
# call this is presumably the body of TestDeterministicNAT.tearDown.
# Visible behavior: call the parent tearDown, test self.vpp_dead, pass the
# output of four "show nat44 ..." CLI commands to the logger, and call
# clear_nat_det(). Which statements the vpp_dead check covers cannot be
# determined here because the indentation is lost; the logger calls for the
# last three commands are also cut off.
4820 super(TestDeterministicNAT, self).tearDown()
4821 if not self.vpp_dead:
4822 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4824 self.vapi.cli("show nat44 deterministic mappings"))
4826 self.vapi.cli("show nat44 deterministic timeouts"))
4828 self.vapi.cli("show nat44 deterministic sessions"))
4829 self.clear_nat_det()
4832 class TestNAT64(MethodHolder):
4833 """ NAT64 Test Cases """
# setUpConstants: after the inherited setup, appends a `nat { ... }` section
# to cls.vpp_cmdline that sets "nat64 bib hash buckets" to 128 and
# "nat64 st hash buckets" to 256 (`st` is presumably the session table).
# NOTE(review): the method takes `cls`; the decorator line that would make
# it a classmethod is not visible in this listing (embedded line 4835).
4836 def setUpConstants(cls):
4837 super(TestNAT64, cls).setUpConstants()
4838 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4839 "nat64 st hash buckets 256", "}"])
# setUpClass: class-wide fixture for the NAT64 tests. It sets the default
# ports / ICMP ids, the pool addresses nat_addr and vrf1_nat_addr (plus
# their packed forms) and the IPFIX source port and domain id, then creates
# five pg interfaces.
# pg_interfaces[0] and [2] form the IPv6 group and pg_interfaces[1] the IPv4
# group; pg_interfaces[2] is moved into IPv6 table vrf1_id, two remote
# hosts are generated on pg0, and pg3 is configured for both IPv4 and IPv6.
# NOTE(review): the assignment of cls.vrf1_id and the bodies of the two
# per-interface loops are only partly visible in this listing.
# NOTE(review): the trailing tearDownClass() call at 4889 follows a gap of
# only two lines, so it is presumably the cleanup path of a try/except
# inside this method whose keywords are absent here -- confirm.
4842 def setUpClass(cls):
4843 super(TestNAT64, cls).setUpClass()
4846 cls.tcp_port_in = 6303
4847 cls.tcp_port_out = 6303
4848 cls.udp_port_in = 6304
4849 cls.udp_port_out = 6304
4850 cls.icmp_id_in = 6305
4851 cls.icmp_id_out = 6305
4852 cls.nat_addr = '10.0.0.3'
4853 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4855 cls.vrf1_nat_addr = '10.0.10.3'
4856 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4858 cls.ipfix_src_port = 4739
4859 cls.ipfix_domain_id = 1
4861 cls.create_pg_interfaces(range(5))
4862 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4863 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4864 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4866 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4868 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4870 cls.pg0.generate_remote_hosts(2)
4872 for i in cls.ip6_interfaces:
4875 i.configure_ipv6_neighbors()
4877 for i in cls.ip4_interfaces:
4883 cls.pg3.config_ip4()
4884 cls.pg3.resolve_arp()
4885 cls.pg3.config_ip6()
4886 cls.pg3.configure_ipv6_neighbors()
4889 super(TestNAT64, cls).tearDownClass()
# test_pool: adds the single-address range 1.2.3.4 - 1.2.3.4 to the NAT64
# pool, checks that the pool dump holds exactly that address, removes the
# range again (is_add=0) and checks that the dump is empty.
4892 def test_pool(self):
4893 """ Add/delete address to NAT64 pool """
4894 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4896 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4898 addresses = self.vapi.nat64_pool_addr_dump()
4899 self.assertEqual(len(addresses), 1)
4900 self.assertEqual(addresses[0].address, nat_addr)
4902 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4904 addresses = self.vapi.nat64_pool_addr_dump()
4905 self.assertEqual(len(addresses), 0)
# test_interface: registers pg0 (default arguments) and pg1 (is_inside=0)
# with NAT64 and checks that the interface dump holds two entries with
# is_inside 1 for pg0 and 0 for pg1.
# It also checks that "show interface features" lists nat64-in2out for pg0
# and nat64-out2in for pg1, then removes both registrations (is_add=0) and
# expects an empty dump.
# NOTE(review): the statements that initialise and set pg0_found /
# pg1_found are not visible in this listing. The removal call for pg1 does
# not repeat is_inside=0 -- confirm that the API default is acceptable here.
4907 def test_interface(self):
4908 """ Enable/disable NAT64 feature on the interface """
4909 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4910 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4912 interfaces = self.vapi.nat64_interface_dump()
4913 self.assertEqual(len(interfaces), 2)
4916 for intf in interfaces:
4917 if intf.sw_if_index == self.pg0.sw_if_index:
4918 self.assertEqual(intf.is_inside, 1)
4920 elif intf.sw_if_index == self.pg1.sw_if_index:
4921 self.assertEqual(intf.is_inside, 0)
4923 self.assertTrue(pg0_found)
4924 self.assertTrue(pg1_found)
4926 features = self.vapi.cli("show interface features pg0")
4927 self.assertNotEqual(features.find('nat64-in2out'), -1)
4928 features = self.vapi.cli("show interface features pg1")
4929 self.assertNotEqual(features.find('nat64-out2in'), -1)
4931 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4932 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4934 interfaces = self.vapi.nat64_interface_dump()
4935 self.assertEqual(len(interfaces), 0)
# test_static_bib: issues a static BIB call for an IPv6 inside address and
# an IPv4 outside address, dumps the TCP BIB, compares an entry's inside /
# outside address and port with the configured values and expects
# static_bib_num to be 1. After a second static BIB call the TCP BIB is
# dumped again and static_bib_num must be 0.
# NOTE(review): in_port, out_port, bibe and static_bib_num are used without
# a visible binding, and both nat64_add_del_static_bib calls are cut off
# after their first argument; presumably the second call passes is_add=0 --
# confirm. `proto` is assigned but the visible dumps use IP_PROTOS.tcp.
4937 def test_static_bib(self):
4938 """ Add/delete static BIB entry """
4939 in_addr = socket.inet_pton(socket.AF_INET6,
4940 '2001:db8:85a3::8a2e:370:7334')
4941 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4944 proto = IP_PROTOS.tcp
4946 self.vapi.nat64_add_del_static_bib(in_addr,
4951 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4956 self.assertEqual(bibe.i_addr, in_addr)
4957 self.assertEqual(bibe.o_addr, out_addr)
4958 self.assertEqual(bibe.i_port, in_port)
4959 self.assertEqual(bibe.o_port, out_port)
4960 self.assertEqual(static_bib_num, 1)
4962 self.vapi.nat64_add_del_static_bib(in_addr,
4968 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4973 self.assertEqual(static_bib_num, 0)
# test_set_timeouts: first asserts the values reported before any change
# (udp 300, icmp 60, tcp_trans 240, tcp_est 7440, tcp_incoming_syn 6), then
# sets all five timeouts to custom values and asserts that a second read
# returns exactly those values.
4975 def test_set_timeouts(self):
4976 """ Set NAT64 timeouts """
4977 # verify default values
4978 timeouts = self.vapi.nat64_get_timeouts()
4979 self.assertEqual(timeouts.udp, 300)
4980 self.assertEqual(timeouts.icmp, 60)
4981 self.assertEqual(timeouts.tcp_trans, 240)
4982 self.assertEqual(timeouts.tcp_est, 7440)
4983 self.assertEqual(timeouts.tcp_incoming_syn, 6)
4985 # set and verify custom values
4986 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4987 tcp_est=7450, tcp_incoming_syn=10)
4988 timeouts = self.vapi.nat64_get_timeouts()
4989 self.assertEqual(timeouts.udp, 200)
4990 self.assertEqual(timeouts.icmp, 30)
4991 self.assertEqual(timeouts.tcp_trans, 250)
4992 self.assertEqual(timeouts.tcp_est, 7450)
4993 self.assertEqual(timeouts.tcp_incoming_syn, 10)
# test_dynamic: NAT64 dynamic translation. With nat_addr in the pool, pg0
# registered with default arguments and pg1 with is_inside=0, an IPv6 stream
# from pg0 is expected on pg1 and checked with nat_ip=nat_addr and
# dst_ip=pg1.remote_ip4. The IPv4 replies are expected on pg0 and checked
# against the address 64:ff9b::<pg1 remote IPv4> and pg0's remote IPv6
# address (presumably the expected source and destination).
# The round trip is run twice and the session count returned by
# nat64_get_ses_num() must have grown by exactly three.
# The same exchange is then run once for pg2 using the pool address
# vrf1_nat_addr, which is added with vrf_id=vrf1_id.
# NOTE(review): the second argument of the first pool call is cut off in
# this listing.
4995 def test_dynamic(self):
4996 """ NAT64 dynamic translation test """
4997 self.tcp_port_in = 6303
4998 self.udp_port_in = 6304
4999 self.icmp_id_in = 6305
5001 ses_num_start = self.nat64_get_ses_num()
5003 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5005 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5006 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5009 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5010 self.pg0.add_stream(pkts)
5011 self.pg_enable_capture(self.pg_interfaces)
5013 capture = self.pg1.get_capture(len(pkts))
5014 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5015 dst_ip=self.pg1.remote_ip4)
5018 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5019 self.pg1.add_stream(pkts)
5020 self.pg_enable_capture(self.pg_interfaces)
5022 capture = self.pg0.get_capture(len(pkts))
# Building a scapy IPv6 layer is used here to obtain the textual form of
# the 64:ff9b:: address; only ip[IPv6].src is read afterwards.
5023 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5024 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5027 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5028 self.pg0.add_stream(pkts)
5029 self.pg_enable_capture(self.pg_interfaces)
5031 capture = self.pg1.get_capture(len(pkts))
5032 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5033 dst_ip=self.pg1.remote_ip4)
5036 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5037 self.pg1.add_stream(pkts)
5038 self.pg_enable_capture(self.pg_interfaces)
5040 capture = self.pg0.get_capture(len(pkts))
5041 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5043 ses_num_end = self.nat64_get_ses_num()
5045 self.assertEqual(ses_num_end - ses_num_start, 3)
5047 # tenant with specific VRF
5048 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5049 self.vrf1_nat_addr_n,
5050 vrf_id=self.vrf1_id)
5051 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5053 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5054 self.pg2.add_stream(pkts)
5055 self.pg_enable_capture(self.pg_interfaces)
5057 capture = self.pg1.get_capture(len(pkts))
5058 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5059 dst_ip=self.pg1.remote_ip4)
5061 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5062 self.pg1.add_stream(pkts)
5063 self.pg_enable_capture(self.pg_interfaces)
5065 capture = self.pg2.get_capture(len(pkts))
5066 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
# test_static: NAT64 translation through static BIB entries. The in and out
# ports / ICMP ids are set to the same values (60303, 60304, 60305), three
# static BIB calls are issued for pg0's remote IPv6 address, and the
# in->out stream is verified with same_port=True.
# The out->in stream is then checked on pg0 against the address
# 64:ff9b::<pg1 remote IPv4>, and the session count returned by
# nat64_get_ses_num() must have grown by exactly three.
# NOTE(review): the remaining arguments of the pool call and of the three
# static BIB calls are cut off in this listing; presumably there is one
# entry each for TCP, UDP and ICMP -- confirm.
5068 def test_static(self):
5069 """ NAT64 static translation test """
5070 self.tcp_port_in = 60303
5071 self.udp_port_in = 60304
5072 self.icmp_id_in = 60305
5073 self.tcp_port_out = 60303
5074 self.udp_port_out = 60304
5075 self.icmp_id_out = 60305
5077 ses_num_start = self.nat64_get_ses_num()
5079 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5081 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5082 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5084 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5089 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5094 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5101 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5102 self.pg0.add_stream(pkts)
5103 self.pg_enable_capture(self.pg_interfaces)
5105 capture = self.pg1.get_capture(len(pkts))
5106 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5107 dst_ip=self.pg1.remote_ip4, same_port=True)
5110 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5111 self.pg1.add_stream(pkts)
5112 self.pg_enable_capture(self.pg_interfaces)
5114 capture = self.pg0.get_capture(len(pkts))
5115 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5116 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5118 ses_num_end = self.nat64_get_ses_num()
5120 self.assertEqual(ses_num_end - ses_num_start, 3)
# test_session_timeout (skipped unless running_extended_tests() is true):
# sets the NAT64 ICMP timeout to 5, sends the in->out IPv6 stream, records
# the session count, and later asserts that the count read again differs
# from the recorded one.
# NOTE(review): nothing between the two session-count reads is visible in
# this listing (embedded lines 5139-5141 are absent); presumably the test
# waits there for the ICMP session to expire -- confirm. `capture` is
# assigned but not used in the visible lines.
5122 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5123 def test_session_timeout(self):
5124 """ NAT64 session timeout """
5125 self.icmp_id_in = 1234
5126 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5128 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5129 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5130 self.vapi.nat64_set_timeouts(icmp=5)
5132 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5133 self.pg0.add_stream(pkts)
5134 self.pg_enable_capture(self.pg_interfaces)
5136 capture = self.pg1.get_capture(len(pkts))
5138 ses_num_before_timeout = self.nat64_get_ses_num()
5142 # ICMP session after timeout
5143 ses_num_after_timeout = self.nat64_get_ses_num()
5144 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
# test_icmp_error: checks translation of ICMP error messages in both
# directions. An IPv6->IPv4 stream and an IPv4->IPv6 stream first create
# sessions and provide the captured packets (capture_ip4 / capture_ip6)
# that are embedded in the errors.
# in->out: the IPv6 layer of each packet in capture_ip6 is wrapped in an
# ICMPv6 Destination Unreachable (code 1) sent on pg0. On pg1 the test
# expects ICMP type 3 / code 13 from nat_addr to pg1.remote_ip4 whose
# embedded IPv4 header has src pg1.remote_ip4 and dst nat_addr, and whose
# embedded destination port (or ICMP id) equals the *_out value.
# out->in: the IP layer of each packet in capture_ip4 is wrapped in ICMP
# type 3 / code 13 sent on pg1. On pg0 the test expects ICMPv6 Destination
# Unreachable code 1 whose embedded IPv6 header has src pg0.remote_ip6 and
# dst 64:ff9b::<pg1 remote IPv4>, and whose embedded source port equals the
# *_in value.
# NOTE(review): ses_num_start is assigned but not used in the visible
# lines. The else branches, the construct leading to the logger.error calls
# and the tail of the last assertEqual are absent from this listing.
5146 def test_icmp_error(self):
5147 """ NAT64 ICMP Error message translation """
5148 self.tcp_port_in = 6303
5149 self.udp_port_in = 6304
5150 self.icmp_id_in = 6305
5152 ses_num_start = self.nat64_get_ses_num()
5154 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5156 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5157 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5159 # send some packets to create sessions
5160 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5161 self.pg0.add_stream(pkts)
5162 self.pg_enable_capture(self.pg_interfaces)
5164 capture_ip4 = self.pg1.get_capture(len(pkts))
5165 self.verify_capture_out(capture_ip4,
5166 nat_ip=self.nat_addr,
5167 dst_ip=self.pg1.remote_ip4)
5169 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5170 self.pg1.add_stream(pkts)
5171 self.pg_enable_capture(self.pg_interfaces)
5173 capture_ip6 = self.pg0.get_capture(len(pkts))
5174 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5175 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5176 self.pg0.remote_ip6)
5179 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5180 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5181 ICMPv6DestUnreach(code=1) /
5182 packet[IPv6] for packet in capture_ip6]
5183 self.pg0.add_stream(pkts)
5184 self.pg_enable_capture(self.pg_interfaces)
5186 capture = self.pg1.get_capture(len(pkts))
5187 for packet in capture:
5189 self.assertEqual(packet[IP].src, self.nat_addr)
5190 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5191 self.assertEqual(packet[ICMP].type, 3)
5192 self.assertEqual(packet[ICMP].code, 13)
5193 inner = packet[IPerror]
5194 self.assertEqual(inner.src, self.pg1.remote_ip4)
5195 self.assertEqual(inner.dst, self.nat_addr)
5196 self.check_icmp_checksum(packet)
5197 if inner.haslayer(TCPerror):
5198 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5199 elif inner.haslayer(UDPerror):
5200 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5202 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5204 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5208 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5209 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5210 ICMP(type=3, code=13) /
5211 packet[IP] for packet in capture_ip4]
5212 self.pg1.add_stream(pkts)
5213 self.pg_enable_capture(self.pg_interfaces)
5215 capture = self.pg0.get_capture(len(pkts))
5216 for packet in capture:
5218 self.assertEqual(packet[IPv6].src, ip.src)
5219 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5220 icmp = packet[ICMPv6DestUnreach]
5221 self.assertEqual(icmp.code, 1)
5222 inner = icmp[IPerror6]
5223 self.assertEqual(inner.src, self.pg0.remote_ip6)
5224 self.assertEqual(inner.dst, ip.src)
5225 self.check_icmpv6_checksum(packet)
5226 if inner.haslayer(TCPerror):
5227 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5228 elif inner.haslayer(UDPerror):
5229 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5231 self.assertEqual(inner[ICMPv6EchoRequest].id,
5234 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# test_hairpinning: client and server are both remote hosts of pg0, and all
# traffic enters and leaves on pg0. Two static BIB calls are issued for the
# server (presumably mapping its TCP port 22 / UDP port 23 to 4022 / 4023 on
# nat_addr -- the argument lists are cut off in this listing).
# client -> server: the client sends TCP and UDP to 64:ff9b::<nat_addr> on
# the server's outside ports. The packets captured on pg0 must come from
# that NAT IPv6 address, be addressed to the server's inside ports, and
# carry a source port different from the client's; that port is recorded as
# client_*_out_port.
# server -> client: the server replies to the recorded ports; the captured
# packets must be addressed to the client with the server's outside port as
# source and the client's original port as destination.
# ICMP error: each captured reply is wrapped in an ICMPv6 Destination
# Unreachable (code 1) from the client; the result must reach the server,
# with the embedded header showing server.ip6 -> NAT IPv6 address and the
# server inside port / recorded client outside port.
# NOTE(review): the construction of `pkts`, the else branches and the
# construct leading to the logger.error calls are absent from this listing.
5237 def test_hairpinning(self):
5238 """ NAT64 hairpinning """
5240 client = self.pg0.remote_hosts[0]
5241 server = self.pg0.remote_hosts[1]
5242 server_tcp_in_port = 22
5243 server_tcp_out_port = 4022
5244 server_udp_in_port = 23
5245 server_udp_out_port = 4023
5246 client_tcp_in_port = 1234
5247 client_udp_in_port = 1235
5248 client_tcp_out_port = 0
5249 client_udp_out_port = 0
5250 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5251 nat_addr_ip6 = ip.src
5253 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5255 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5256 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5258 self.vapi.nat64_add_del_static_bib(server.ip6n,
5261 server_tcp_out_port,
5263 self.vapi.nat64_add_del_static_bib(server.ip6n,
5266 server_udp_out_port,
5271 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5272 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5273 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5275 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5276 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5277 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5279 self.pg0.add_stream(pkts)
5280 self.pg_enable_capture(self.pg_interfaces)
5282 capture = self.pg0.get_capture(len(pkts))
5283 for packet in capture:
5285 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5286 self.assertEqual(packet[IPv6].dst, server.ip6)
5287 if packet.haslayer(TCP):
5288 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5289 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5290 self.check_tcp_checksum(packet)
5291 client_tcp_out_port = packet[TCP].sport
5293 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5294 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5295 self.check_udp_checksum(packet)
5296 client_udp_out_port = packet[UDP].sport
5298 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5303 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5304 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5305 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5307 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5308 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5309 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5311 self.pg0.add_stream(pkts)
5312 self.pg_enable_capture(self.pg_interfaces)
5314 capture = self.pg0.get_capture(len(pkts))
5315 for packet in capture:
5317 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5318 self.assertEqual(packet[IPv6].dst, client.ip6)
5319 if packet.haslayer(TCP):
5320 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5321 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5322 self.check_tcp_checksum(packet)
5324 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5325 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5326 self.check_udp_checksum(packet)
5328 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5333 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5334 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5335 ICMPv6DestUnreach(code=1) /
5336 packet[IPv6] for packet in capture]
5337 self.pg0.add_stream(pkts)
5338 self.pg_enable_capture(self.pg_interfaces)
5340 capture = self.pg0.get_capture(len(pkts))
5341 for packet in capture:
5343 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5344 self.assertEqual(packet[IPv6].dst, server.ip6)
5345 icmp = packet[ICMPv6DestUnreach]
5346 self.assertEqual(icmp.code, 1)
5347 inner = icmp[IPerror6]
5348 self.assertEqual(inner.src, server.ip6)
5349 self.assertEqual(inner.dst, nat_addr_ip6)
5350 self.check_icmpv6_checksum(packet)
5351 if inner.haslayer(TCPerror):
5352 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5353 self.assertEqual(inner[TCPerror].dport,
5354 client_tcp_out_port)
5356 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5357 self.assertEqual(inner[UDPerror].dport,
5358 client_udp_out_port)
5360 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Exercises NAT64 with two network-specific prefixes: a global /32 prefix
# (2001:db8::) and a tenant /56 prefix (2001:db8:122:300::) added with
# vrf_id=self.vrf1_id. After checking the prefix dump, traffic is sent in
# both directions: pg0 <-> pg1 for the global prefix, pg2 <-> pg1 for the
# tenant prefix.
# NOTE(review): the embedded listing numbers skip values inside several calls
# (e.g. 5366 -> 5368, 5398 -> 5401, 5414 -> 5417), so some continuation
# arguments are not visible here - confirm against the upstream file.
5363 def test_prefix(self):
5364 """ NAT64 Network-Specific Prefix """
5366 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5368 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5369 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5370 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5371 self.vrf1_nat_addr_n,
5372 vrf_id=self.vrf1_id)
5373 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Global prefix: after adding it the dump must hold exactly one entry, in VRF 0.
5376 global_pref64 = "2001:db8::"
5377 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5378 global_pref64_len = 32
5379 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5381 prefix = self.vapi.nat64_prefix_dump()
5382 self.assertEqual(len(prefix), 1)
5383 self.assertEqual(prefix[0].prefix, global_pref64_n)
5384 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5385 self.assertEqual(prefix[0].vrf_id, 0)
5387 # Add tenant specific prefix
5388 vrf1_pref64 = "2001:db8:122:300::"
5389 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5390 vrf1_pref64_len = 56
5391 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5393 vrf_id=self.vrf1_id)
5394 prefix = self.vapi.nat64_prefix_dump()
5395 self.assertEqual(len(prefix), 2)
# Traffic check for the global prefix: IPv6 in on pg0, IPv4 out on pg1, and back.
5398 pkts = self.create_stream_in_ip6(self.pg0,
5401 plen=global_pref64_len)
5402 self.pg0.add_stream(pkts)
5403 self.pg_enable_capture(self.pg_interfaces)
5405 capture = self.pg1.get_capture(len(pkts))
5406 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5407 dst_ip=self.pg1.remote_ip4)
5409 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5410 self.pg1.add_stream(pkts)
5411 self.pg_enable_capture(self.pg_interfaces)
5413 capture = self.pg0.get_capture(len(pkts))
5414 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5417 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5419 # Tenant specific prefix
5420 pkts = self.create_stream_in_ip6(self.pg2,
5423 plen=vrf1_pref64_len)
5424 self.pg2.add_stream(pkts)
5425 self.pg_enable_capture(self.pg_interfaces)
5427 capture = self.pg1.get_capture(len(pkts))
5428 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5429 dst_ip=self.pg1.remote_ip4)
5431 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5432 self.pg1.add_stream(pkts)
5433 self.pg_enable_capture(self.pg_interfaces)
5435 capture = self.pg2.get_capture(len(pkts))
5436 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5439 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
# Checks NAT64 handling of a protocol other than TCP/UDP/ICMP, identified here
# by IPv6 next-header 47 (the capture is asserted to contain a GRE layer).
# A plain TCP packet is sent first from pg0 and captured on pg1; then the
# nh=47 packet is sent in2out and a matching IPv4 packet is sent out2in.
# NOTE(review): `packet` is asserted on below but its assignment is not visible
# in this listing (numbering skips 5468-5469 and 5488-5489), and the
# logger.error lines suggest a dropped try/except - confirm upstream.
5441 def test_unknown_proto(self):
5442 """ NAT64 translate packet with unknown protocol """
5444 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5446 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5447 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Destination is pg1's IPv4 address embedded under the 64:ff9b::/96 prefix.
5448 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5451 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5452 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5453 TCP(sport=self.tcp_port_in, dport=20))
5454 self.pg0.add_stream(p)
5455 self.pg_enable_capture(self.pg_interfaces)
5457 p = self.pg1.get_capture(1)
# in2out direction with next-header 47
5459 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5460 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5462 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5463 TCP(sport=1234, dport=1234))
5464 self.pg0.add_stream(p)
5465 self.pg_enable_capture(self.pg_interfaces)
5467 p = self.pg1.get_capture(1)
5470 self.assertEqual(packet[IP].src, self.nat_addr)
5471 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5472 self.assertTrue(packet.haslayer(GRE))
5473 self.check_ip_checksum(packet)
5475 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# out2in direction: IPv4 towards the NAT address, expected back on pg0 as IPv6
5479 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5480 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5482 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5483 TCP(sport=1234, dport=1234))
5484 self.pg1.add_stream(p)
5485 self.pg_enable_capture(self.pg_interfaces)
5487 p = self.pg0.get_capture(1)
5490 self.assertEqual(packet[IPv6].src, remote_ip6)
5491 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5492 self.assertEqual(packet[IPv6].nh, 47)
5494 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Hairpinning variant of the unknown-protocol test: client and server are both
# remote hosts on pg0, so every packet is sent on pg0 and captured on pg0.
# Three static BIB entries are installed (two for the server, one for the
# client), then packets with next-header IP_PROTOS.gre are checked in both
# directions.
# NOTE(review): most static BIB arguments and the `packet` assignments are not
# visible in this listing (numbering skips e.g. 5519-5520, 5525-5529,
# 5554-5555) - confirm against the upstream file.
5497 def test_hairpinning_unknown_proto(self):
5498 """ NAT64 translate packet with unknown protocol - hairpinning """
5500 client = self.pg0.remote_hosts[0]
5501 server = self.pg0.remote_hosts[1]
5502 server_tcp_in_port = 22
5503 server_tcp_out_port = 4022
5504 client_tcp_in_port = 1234
5505 client_tcp_out_port = 1235
5506 server_nat_ip = "10.0.0.100"
5507 client_nat_ip = "10.0.0.110"
5508 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5509 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
# IPv6 forms of the NAT addresses under the 64:ff9b::/96 prefix
5510 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5511 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5513 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5515 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5516 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5518 self.vapi.nat64_add_del_static_bib(server.ip6n,
5521 server_tcp_out_port,
5524 self.vapi.nat64_add_del_static_bib(server.ip6n,
5530 self.vapi.nat64_add_del_static_bib(client.ip6n,
5533 client_tcp_out_port,
# Plain TCP from client to the server's NAT address; only its arrival is checked.
5537 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5538 IPv6(src=client.ip6, dst=server_nat_ip6) /
5539 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5540 self.pg0.add_stream(p)
5541 self.pg_enable_capture(self.pg_interfaces)
5543 p = self.pg0.get_capture(1)
# client -> server with next-header GRE
5545 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5546 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5548 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5549 TCP(sport=1234, dport=1234))
5550 self.pg0.add_stream(p)
5551 self.pg_enable_capture(self.pg_interfaces)
5553 p = self.pg0.get_capture(1)
5556 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5557 self.assertEqual(packet[IPv6].dst, server.ip6)
5558 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5560 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> client with next-header GRE
5564 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5565 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5567 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5568 TCP(sport=1234, dport=1234))
5569 self.pg0.add_stream(p)
5570 self.pg_enable_capture(self.pg_interfaces)
5572 p = self.pg0.get_capture(1)
5575 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5576 self.assertEqual(packet[IPv6].dst, client.ip6)
5577 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5579 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# One-armed setup: pg3 is registered both as an inside interface and, with
# is_inside=0, as an outside interface, so packets enter and leave on pg3.
# The in2out check records the translated source port (`external_port`) and
# the out2in packet is addressed to that port.
# NOTE(review): `ip` and `tcp` are asserted on but their assignments are not
# visible in this listing (numbering skips 5602-5605 and 5625-5628) - confirm
# against the upstream file.
5582 def test_one_armed_nat64(self):
5583 """ One armed NAT64 """
5585 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5589 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5591 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5592 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# in2out: IPv6 TCP from pg3, expected back on pg3 as IPv4 from the NAT address
5595 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5596 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5597 TCP(sport=12345, dport=80))
5598 self.pg3.add_stream(p)
5599 self.pg_enable_capture(self.pg_interfaces)
5601 capture = self.pg3.get_capture(1)
5606 self.assertEqual(ip.src, self.nat_addr)
5607 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5608 self.assertNotEqual(tcp.sport, 12345)
5609 external_port = tcp.sport
5610 self.assertEqual(tcp.dport, 80)
5611 self.check_tcp_checksum(p)
5612 self.check_ip_checksum(p)
5614 self.logger.error(ppp("Unexpected or invalid packet:", p))
# out2in: IPv4 reply to the translated port, expected back as IPv6 to port 12345
5618 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5619 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5620 TCP(sport=80, dport=external_port))
5621 self.pg3.add_stream(p)
5622 self.pg_enable_capture(self.pg_interfaces)
5624 capture = self.pg3.get_capture(1)
5629 self.assertEqual(ip.src, remote_host_ip6)
5630 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5631 self.assertEqual(tcp.sport, 80)
5632 self.assertEqual(tcp.dport, 12345)
5633 self.check_tcp_checksum(p)
5635 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Sends fragmented TCP through NAT64 in both directions (IPv6 in on pg0, IPv4
# back in on pg1), reassembles the captured fragments and compares ports and
# payload. The number of nat_reass_dump() entries is sampled before and after;
# the test expects it to grow by exactly 2.
# NOTE(review): the first `data` assignment and several call arguments are not
# visible in this listing (numbering skips 5649-5651, 5659, 5669-5672) -
# confirm against the upstream file.
5638 def test_frag_in_order(self):
5639 """ NAT64 translate fragments arriving in order """
# A random inside port is chosen on every run.
5640 self.tcp_port_in = random.randint(1025, 65535)
5642 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5644 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5645 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5647 reass = self.vapi.nat_reass_dump()
5648 reass_n_start = len(reass)
5652 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5653 self.tcp_port_in, 20, data)
5654 self.pg0.add_stream(pkts)
5655 self.pg_enable_capture(self.pg_interfaces)
5657 frags = self.pg1.get_capture(len(pkts))
5658 p = self.reass_frags_and_verify(frags,
5660 self.pg1.remote_ip4)
5661 self.assertEqual(p[TCP].dport, 20)
5662 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port on the instance.
5663 self.tcp_port_out = p[TCP].sport
5664 self.assertEqual(data, p[Raw].load)
5667 data = "A" * 4 + "b" * 16 + "C" * 3
5668 pkts = self.create_stream_frag(self.pg1,
5673 self.pg1.add_stream(pkts)
5674 self.pg_enable_capture(self.pg_interfaces)
5676 frags = self.pg0.get_capture(len(pkts))
5677 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5678 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5679 self.assertEqual(p[TCP].sport, 20)
5680 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5681 self.assertEqual(data, p[Raw].load)
5683 reass = self.vapi.nat_reass_dump()
5684 reass_n_end = len(reass)
5686 self.assertEqual(reass_n_end - reass_n_start, 2)
# Fragmented traffic between two hosts on the same inside interface (pg0):
# fragments are sent on pg0 and captured on pg0, reassembled, and expected to
# carry the NAT address (in IPv6 form) as source and the server as destination.
# NOTE(review): the arguments of nat64_add_del_static_bib and
# create_stream_frag_ip6, and the `data` assignment, are not visible in this
# listing (numbering skips 5706-5710, 5713-5716) - confirm upstream.
5688 def test_reass_hairpinning(self):
5689 """ NAT64 fragments hairpinning """
5691 client = self.pg0.remote_hosts[0]
5692 server = self.pg0.remote_hosts[1]
5693 server_in_port = random.randint(1025, 65535)
5694 server_out_port = random.randint(1025, 65535)
5695 client_in_port = random.randint(1025, 65535)
# Building an IPv6 layer from '64:ff9b::' + the dotted IPv4 NAT address and
# reading .src back yields the address string used for comparison below.
5696 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5697 nat_addr_ip6 = ip.src
5699 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5701 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5702 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5704 # add static BIB entry for server
5705 self.vapi.nat64_add_del_static_bib(server.ip6n,
5711 # send packet from host to server
5712 pkts = self.create_stream_frag_ip6(self.pg0,
5717 self.pg0.add_stream(pkts)
5718 self.pg_enable_capture(self.pg_interfaces)
5720 frags = self.pg0.get_capture(len(pkts))
5721 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5722 self.assertNotEqual(p[TCP].sport, client_in_port)
5723 self.assertEqual(p[TCP].dport, server_in_port)
5724 self.assertEqual(data, p[Raw].load)
# Same flow as the in-order fragment test (IPv6 fragments in on pg0, IPv4
# fragments back in on pg1, reassemble and compare ports/payload), without the
# reassembly-count check.
# NOTE(review): the statements that reorder the fragments are not visible in
# this listing (numbering skips 5739 and 5755-5759), nor is the first `data`
# assignment - confirm against the upstream file.
5726 def test_frag_out_of_order(self):
5727 """ NAT64 translate fragments arriving out of order """
5728 self.tcp_port_in = random.randint(1025, 65535)
5730 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5732 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5733 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5737 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5738 self.tcp_port_in, 20, data)
5740 self.pg0.add_stream(pkts)
5741 self.pg_enable_capture(self.pg_interfaces)
5743 frags = self.pg1.get_capture(len(pkts))
5744 p = self.reass_frags_and_verify(frags,
5746 self.pg1.remote_ip4)
5747 self.assertEqual(p[TCP].dport, 20)
5748 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
# Remember the translated source port on the instance.
5749 self.tcp_port_out = p[TCP].sport
5750 self.assertEqual(data, p[Raw].load)
5753 data = "A" * 4 + "B" * 16 + "C" * 3
5754 pkts = self.create_stream_frag(self.pg1,
5760 self.pg1.add_stream(pkts)
5761 self.pg_enable_capture(self.pg_interfaces)
5763 frags = self.pg0.get_capture(len(pkts))
5764 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5765 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5766 self.assertEqual(p[TCP].sport, 20)
5767 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5768 self.assertEqual(data, p[Raw].load)
def test_interface_addr(self):
    """ Acquire NAT64 pool addresses from interface """
    self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)

    # no address in NAT64 pool
    # (dump the NAT64 pool itself; the NAT44 address list says nothing
    # about the feature under test)
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))

    # configure interface address and check NAT64 address pool
    self.pg4.config_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(addresses), 1)
    self.assertEqual(addresses[0].address, self.pg4.local_ip4n)

    # remove interface address and check NAT64 address pool
    # (assert on the fresh dump; previously a misspelled variable holding
    # the very first dump was checked, so this could never fail)
    self.pg4.unconfig_ip4()
    addresses = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(0, len(addresses))
# Runs only with extended tests enabled. Fills NAT64 with sessions built from
# many distinct IPv6 sources, then enables IPFIX export towards pg3 and sends
# one more packet that must not be forwarded (pg1 capture of 0). The exported
# IPFIX packets are decoded and passed to verify_ipfix_max_sessions; a second
# packet/flush round is passed to verify_ipfix_max_bibs.
# NOTE(review): `max_bibs`, `max_sessions` and `pkts` are used but their
# assignments, and the `for p in capture:` loop headers, are not visible in
# this listing (numbering skips 5802-5804, 5810, 5814, 5838, 5849, 5864) -
# confirm against the upstream file.
5789 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5790 def test_ipfix_max_bibs_sessions(self):
5791 """ IPFIX logging maximum session and BIB entries exceeded """
5794 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5798 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5800 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5801 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# One distinct source address (fd01:aa::<i>) per iteration; two TCP packets
# (dport 80 and dport 22) are built for each source.
5805 for i in range(0, max_bibs):
5806 src = "fd01:aa::%x" % (i)
5807 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5808 IPv6(src=src, dst=remote_host_ip6) /
5809 TCP(sport=12345, dport=80))
5811 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5812 IPv6(src=src, dst=remote_host_ip6) /
5813 TCP(sport=12345, dport=22))
5815 self.pg0.add_stream(pkts)
5816 self.pg_enable_capture(self.pg_interfaces)
5818 self.pg1.get_capture(max_sessions)
5820 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5821 src_address=self.pg3.local_ip4n,
5823 template_interval=10)
5824 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5825 src_port=self.ipfix_src_port)
# `src` still holds the last address from the loop above; this packet uses a
# new destination port (25) and is expected to be dropped.
5827 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5828 IPv6(src=src, dst=remote_host_ip6) /
5829 TCP(sport=12345, dport=25))
5830 self.pg0.add_stream(p)
5831 self.pg_enable_capture(self.pg_interfaces)
5833 self.pg1.get_capture(0)
5834 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5835 capture = self.pg3.get_capture(9)
5836 ipfix = IPFIXDecoder()
5837 # first load template
5839 self.assertTrue(p.haslayer(IPFIX))
5840 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5841 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5842 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5843 self.assertEqual(p[UDP].dport, 4739)
5844 self.assertEqual(p[IPFIX].observationDomainID,
5845 self.ipfix_domain_id)
5846 if p.haslayer(Template):
5847 ipfix.add_template(p.getlayer(Template))
5848 # verify events in data set
5850 if p.haslayer(Data):
5851 data = ipfix.decode_data_set(p.getlayer(Set))
5852 self.verify_ipfix_max_sessions(data, max_sessions)
# Second round: a packet from pg0's own address (a source not used in the loop)
# must also be dropped, and exactly one IPFIX packet is expected on pg3.
5854 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5855 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5856 TCP(sport=12345, dport=80))
5857 self.pg0.add_stream(p)
5858 self.pg_enable_capture(self.pg_interfaces)
5860 self.pg1.get_capture(0)
5861 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5862 capture = self.pg3.get_capture(1)
5863 # verify events in data set
5865 self.assertTrue(p.haslayer(IPFIX))
5866 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5867 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5868 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5869 self.assertEqual(p[UDP].dport, 4739)
5870 self.assertEqual(p[IPFIX].observationDomainID,
5871 self.ipfix_domain_id)
5872 if p.haslayer(Data):
5873 data = ipfix.decode_data_set(p.getlayer(Set))
5874 self.verify_ipfix_max_bibs(data, max_bibs)
# Sets the IPv6 reassembly limit to max_frag=0, enables IPFIX export towards
# pg3, then sends only the last fragment of a fragmented stream. Nothing may
# be forwarded to pg1; the IPFIX packets captured on pg3 are decoded and
# passed to verify_ipfix_max_fragments_ip6 with a limit of 0.
# NOTE(review): the `data` assignment and the `for p in capture:` loop headers
# are not visible in this listing (numbering skips 5889-5890, 5901, 5912) -
# confirm against the upstream file.
5876 def test_ipfix_max_frags(self):
5877 """ IPFIX logging maximum fragments pending reassembly exceeded """
5878 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5880 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5881 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5882 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5883 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5884 src_address=self.pg3.local_ip4n,
5886 template_interval=10)
5887 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5888 src_port=self.ipfix_src_port)
5891 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5892 self.tcp_port_in, 20, data)
# Only the final fragment is injected.
5893 self.pg0.add_stream(pkts[-1])
5894 self.pg_enable_capture(self.pg_interfaces)
5896 self.pg1.get_capture(0)
5897 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5898 capture = self.pg3.get_capture(9)
5899 ipfix = IPFIXDecoder()
5900 # first load template
5902 self.assertTrue(p.haslayer(IPFIX))
5903 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5904 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5905 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5906 self.assertEqual(p[UDP].dport, 4739)
5907 self.assertEqual(p[IPFIX].observationDomainID,
5908 self.ipfix_domain_id)
5909 if p.haslayer(Template):
5910 ipfix.add_template(p.getlayer(Template))
5911 # verify events in data set
5913 if p.haslayer(Data):
5914 data = ipfix.decode_data_set(p.getlayer(Set))
5915 self.verify_ipfix_max_fragments_ip6(data, 0,
5916 self.pg0.remote_ip6n)
# Two phases. Phase 1: with IPFIX export enabled, one TCP packet is sent
# through NAT64 and the 10 IPFIX packets captured on pg3 are decoded; records
# are dispatched on byte value data[0][230]: 10 -> verify_ipfix_bib(data, 1,
# ...), 6 -> verify_ipfix_nat64_ses(...). Phase 2: after another
# nat64_add_del_pool_addr_range call and flush, 2 IPFIX packets are expected;
# value 11 -> verify_ipfix_bib(data, 0, ...), 7 -> verify_ipfix_nat64_ses(...).
# NOTE(review): field 230 is presumably the NAT event type (create/delete
# codes) - confirm against the IPFIX template. The loop headers, some call
# arguments and the `else:` branches before logger.error are not visible in
# this listing (numbering skips e.g. 5949, 5960, 5967, 5970-5971, 5977-5978).
5918 def test_ipfix_bib_ses(self):
5919 """ IPFIX logging NAT64 BIB/session create and delete events """
5920 self.tcp_port_in = random.randint(1025, 65535)
5921 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5925 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5927 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5928 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5929 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5930 src_address=self.pg3.local_ip4n,
5932 template_interval=10)
5933 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5934 src_port=self.ipfix_src_port)
5937 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5938 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5939 TCP(sport=self.tcp_port_in, dport=25))
5940 self.pg0.add_stream(p)
5941 self.pg_enable_capture(self.pg_interfaces)
5943 p = self.pg1.get_capture(1)
# Translated source port of the forwarded packet, kept on the instance.
5944 self.tcp_port_out = p[0][TCP].sport
5945 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5946 capture = self.pg3.get_capture(10)
5947 ipfix = IPFIXDecoder()
5948 # first load template
5950 self.assertTrue(p.haslayer(IPFIX))
5951 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5952 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5953 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5954 self.assertEqual(p[UDP].dport, 4739)
5955 self.assertEqual(p[IPFIX].observationDomainID,
5956 self.ipfix_domain_id)
5957 if p.haslayer(Template):
5958 ipfix.add_template(p.getlayer(Template))
5959 # verify events in data set
5961 if p.haslayer(Data):
5962 data = ipfix.decode_data_set(p.getlayer(Set))
5963 if ord(data[0][230]) == 10:
5964 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5965 elif ord(data[0][230]) == 6:
5966 self.verify_ipfix_nat64_ses(data,
5968 self.pg0.remote_ip6n,
5969 self.pg1.remote_ip4,
5972 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Phase 2: pool address range call (arguments not visible here), flush, re-check.
5975 self.pg_enable_capture(self.pg_interfaces)
5976 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5979 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5980 capture = self.pg3.get_capture(2)
5981 # verify events in data set
5983 self.assertTrue(p.haslayer(IPFIX))
5984 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5985 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5986 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5987 self.assertEqual(p[UDP].dport, 4739)
5988 self.assertEqual(p[IPFIX].observationDomainID,
5989 self.ipfix_domain_id)
5990 if p.haslayer(Data):
5991 data = ipfix.decode_data_set(p.getlayer(Set))
5992 if ord(data[0][230]) == 11:
5993 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5994 elif ord(data[0][230]) == 7:
5995 self.verify_ipfix_nat64_ses(data,
5997 self.pg0.remote_ip6n,
5998 self.pg1.remote_ip4,
6001 self.logger.error(ppp("Unexpected or invalid packet: ", p))
def nat64_get_ses_num(self):
    """
    Return number of active NAT64 sessions.

    :returns: Number of entries returned by the NAT64 session table dump
    """
    # The count is the length of the session table dump; without this
    # return the method would hand back None despite its documented contract.
    return len(self.vapi.nat64_st_dump())
# Resets NAT64 state between tests: disables NAT IPFIX logging and restores
# self.ipfix_src_port (4739) and self.ipfix_domain_id (1), calls
# nat64_set_timeouts() with no arguments, then walks the dumped interfaces,
# BIB entries (dump argument 255), pool addresses and prefixes and issues one
# add/del call per entry.
# NOTE(review): nat64_set_timeouts() presumably restores default timeouts, and
# the add/del calls are presumably deletions (is_add=0) - their trailing
# arguments are not visible in this listing (numbering skips 6025-6026,
# 6028-6030, 6032-6033, 6035-6041, 6045-6048, 6052, 6054-6055); confirm
# against the upstream file.
6010 def clear_nat64(self):
6012 Clear NAT64 configuration.
6014 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6015 domain_id=self.ipfix_domain_id)
6016 self.ipfix_src_port = 4739
6017 self.ipfix_domain_id = 1
6019 self.vapi.nat64_set_timeouts()
6021 interfaces = self.vapi.nat64_interface_dump()
6022 for intf in interfaces:
# is_inside > 1 gets an extra add/del call before the regular one.
# NOTE(review): presumably marks an interface registered as both inside and
# outside - confirm against the NAT64 API.
6023 if intf.is_inside > 1:
6024 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6027 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6031 bib = self.vapi.nat64_bib_dump(255)
6034 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6042 adresses = self.vapi.nat64_pool_addr_dump()
6043 for addr in adresses:
6044 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6049 prefixes = self.vapi.nat64_prefix_dump()
6050 for prefix in prefixes:
6051 self.vapi.nat64_add_del_prefix(prefix.prefix,
6053 vrf_id=prefix.vrf_id,
6057 super(TestNAT64, self).tearDown()
6058 if not self.vpp_dead:
6059 self.logger.info(self.vapi.cli("show nat64 pool"))
6060 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6061 self.logger.info(self.vapi.cli("show nat64 prefix"))
6062 self.logger.info(self.vapi.cli("show nat64 bib all"))
6063 self.logger.info(self.vapi.cli("show nat64 session table all"))
6064 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite AFTR test class. Class setup creates two pg interfaces: pg0 is the
# IPv4 side (config_ip4 + ARP), pg1 is the IPv6 side with two remote hosts.
# test_dslite sends IPv4-in-IPv6 packets from pg1 to the AFTR address and
# checks the decapsulated, NATed IPv4 packet on pg0, then sends the IPv4 reply
# and checks it is re-encapsulated towards the originating IPv6 host. This is
# done for UDP (remote host 0), TCP and ICMP echo (remote host 1), followed by
# an ICMPv6 echo to the AFTR address itself.
# NOTE(review): decorators, try/except lines, `def tearDown(self):` and the
# packet-generator start calls are not visible in this listing (numbering
# skips e.g. 6071, 6075, 6088, 6109, 6220) - confirm against the upstream file.
6068 class TestDSlite(MethodHolder):
6069 """ DS-Lite Test Cases """
6072 def setUpClass(cls):
6073 super(TestDSlite, cls).setUpClass()
6076 cls.nat_addr = '10.0.0.3'
6077 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6079 cls.create_pg_interfaces(range(2))
6081 cls.pg0.config_ip4()
6082 cls.pg0.resolve_arp()
6084 cls.pg1.config_ip6()
6085 cls.pg1.generate_remote_hosts(2)
6086 cls.pg1.configure_ipv6_neighbors()
6089 super(TestDSlite, cls).tearDownClass()
6092 def test_dslite(self):
6093 """ Test DS-Lite """
6094 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6096 aftr_ip4 = '192.0.0.1'
6097 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6098 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6099 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6100 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, tunnelled from remote host 0: expect plain IPv4 on pg0 with a new sport
6103 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6104 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6105 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6106 UDP(sport=20000, dport=10000))
6107 self.pg1.add_stream(p)
6108 self.pg_enable_capture(self.pg_interfaces)
6110 capture = self.pg0.get_capture(1)
6111 capture = capture[0]
6112 self.assertFalse(capture.haslayer(IPv6))
6113 self.assertEqual(capture[IP].src, self.nat_addr)
6114 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6115 self.assertNotEqual(capture[UDP].sport, 20000)
6116 self.assertEqual(capture[UDP].dport, 10000)
6117 self.check_ip_checksum(capture)
6118 out_port = capture[UDP].sport
# UDP reply to the translated port: expect IPv4-in-IPv6 back to remote host 0
6120 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6121 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6122 UDP(sport=10000, dport=out_port))
6123 self.pg0.add_stream(p)
6124 self.pg_enable_capture(self.pg_interfaces)
6126 capture = self.pg1.get_capture(1)
6127 capture = capture[0]
6128 self.assertEqual(capture[IPv6].src, aftr_ip6)
6129 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6130 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6131 self.assertEqual(capture[IP].dst, '192.168.1.1')
6132 self.assertEqual(capture[UDP].sport, 10000)
6133 self.assertEqual(capture[UDP].dport, 20000)
6134 self.check_ip_checksum(capture)
# TCP, tunnelled from remote host 1 using the same inner source 192.168.1.1
6137 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6138 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6139 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6140 TCP(sport=20001, dport=10001))
6141 self.pg1.add_stream(p)
6142 self.pg_enable_capture(self.pg_interfaces)
6144 capture = self.pg0.get_capture(1)
6145 capture = capture[0]
6146 self.assertFalse(capture.haslayer(IPv6))
6147 self.assertEqual(capture[IP].src, self.nat_addr)
6148 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6149 self.assertNotEqual(capture[TCP].sport, 20001)
6150 self.assertEqual(capture[TCP].dport, 10001)
6151 self.check_ip_checksum(capture)
6152 self.check_tcp_checksum(capture)
6153 out_port = capture[TCP].sport
6155 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6156 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6157 TCP(sport=10001, dport=out_port))
6158 self.pg0.add_stream(p)
6159 self.pg_enable_capture(self.pg_interfaces)
6161 capture = self.pg1.get_capture(1)
6162 capture = capture[0]
6163 self.assertEqual(capture[IPv6].src, aftr_ip6)
6164 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6165 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6166 self.assertEqual(capture[IP].dst, '192.168.1.1')
6167 self.assertEqual(capture[TCP].sport, 10001)
6168 self.assertEqual(capture[TCP].dport, 20001)
6169 self.check_ip_checksum(capture)
6170 self.check_tcp_checksum(capture)
# ICMP echo, tunnelled from remote host 1: the ICMP id is expected to change
6173 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6174 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6175 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6176 ICMP(id=4000, type='echo-request'))
6177 self.pg1.add_stream(p)
6178 self.pg_enable_capture(self.pg_interfaces)
6180 capture = self.pg0.get_capture(1)
6181 capture = capture[0]
6182 self.assertFalse(capture.haslayer(IPv6))
6183 self.assertEqual(capture[IP].src, self.nat_addr)
6184 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6185 self.assertNotEqual(capture[ICMP].id, 4000)
6186 self.check_ip_checksum(capture)
6187 self.check_icmp_checksum(capture)
6188 out_id = capture[ICMP].id
# Echo reply with the translated id: the original id 4000 must be restored
6190 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6191 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6192 ICMP(id=out_id, type='echo-reply'))
6193 self.pg0.add_stream(p)
6194 self.pg_enable_capture(self.pg_interfaces)
6196 capture = self.pg1.get_capture(1)
6197 capture = capture[0]
6198 self.assertEqual(capture[IPv6].src, aftr_ip6)
6199 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6200 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6201 self.assertEqual(capture[IP].dst, '192.168.1.1')
6202 self.assertEqual(capture[ICMP].id, 4000)
6203 self.check_ip_checksum(capture)
6204 self.check_icmp_checksum(capture)
6206 # ping DS-Lite AFTR tunnel endpoint address
6207 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6208 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6209 ICMPv6EchoRequest())
6210 self.pg1.add_stream(p)
6211 self.pg_enable_capture(self.pg_interfaces)
6213 capture = self.pg1.get_capture(1)
6214 self.assertEqual(1, len(capture))
6215 capture = capture[0]
6216 self.assertEqual(capture[IPv6].src, aftr_ip6)
6217 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6218 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Teardown: logs DS-Lite CLI state when VPP is still alive.
6221 super(TestDSlite, self).tearDown()
6222 if not self.vpp_dead:
6223 self.logger.info(self.vapi.cli("show dslite pool"))
6225 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6226 self.logger.info(self.vapi.cli("show dslite sessions"))
# DS-Lite CE (B4 side) test class. setUpConstants appends "nat { dslite ce }"
# to the VPP command line. Class setup creates two pg interfaces: pg0 is the
# IPv4 side, pg1 the IPv6 side with one remote host. test_dslite_ce sets the
# B4 and AFTR addresses, adds a /128 route to the AFTR via pg1, then checks
# that IPv4 UDP from pg0 is encapsulated in IPv6 (B4 -> AFTR) on pg1, that
# IPv4-in-IPv6 from the AFTR is decapsulated onto pg0, and that the B4 address
# answers an ICMPv6 echo.
# NOTE(review): decorators, try/except lines, `def tearDown(self):`, the last
# argument(s) of ip_add_del_route and the packet-generator start calls are not
# visible in this listing (numbering skips e.g. 6232, 6237, 6241, 6274-6276,
# 6282, 6324) - confirm against the upstream file.
6229 class TestDSliteCE(MethodHolder):
6230 """ DS-Lite CE Test Cases """
6233 def setUpConstants(cls):
6234 super(TestDSliteCE, cls).setUpConstants()
6235 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
6238 def setUpClass(cls):
6239 super(TestDSliteCE, cls).setUpClass()
6242 cls.create_pg_interfaces(range(2))
6244 cls.pg0.config_ip4()
6245 cls.pg0.resolve_arp()
6247 cls.pg1.config_ip6()
6248 cls.pg1.generate_remote_hosts(1)
6249 cls.pg1.configure_ipv6_neighbors()
6252 super(TestDSliteCE, cls).tearDownClass()
6255 def test_dslite_ce(self):
6256 """ Test DS-Lite CE """
6258 b4_ip4 = '192.0.0.2'
6259 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6260 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6261 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6262 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6264 aftr_ip4 = '192.0.0.1'
6265 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6266 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6267 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6268 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# Host route so traffic for the AFTR leaves via pg1.
6270 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6271 dst_address_length=128,
6272 next_hop_address=self.pg1.remote_ip6n,
6273 next_hop_sw_if_index=self.pg1.sw_if_index,
# Encapsulation: IPv4 UDP in on pg0, IPv4-in-IPv6 out on pg1 with ports unchanged
6277 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6278 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6279 UDP(sport=10000, dport=20000))
6280 self.pg0.add_stream(p)
6281 self.pg_enable_capture(self.pg_interfaces)
6283 capture = self.pg1.get_capture(1)
6284 capture = capture[0]
6285 self.assertEqual(capture[IPv6].src, b4_ip6)
6286 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6287 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6288 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6289 self.assertEqual(capture[UDP].sport, 10000)
6290 self.assertEqual(capture[UDP].dport, 20000)
6291 self.check_ip_checksum(capture)
# Decapsulation: IPv4-in-IPv6 from the AFTR in on pg1, plain IPv4 out on pg0
6294 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6295 IPv6(dst=b4_ip6, src=aftr_ip6) /
6296 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6297 UDP(sport=20000, dport=10000))
6298 self.pg1.add_stream(p)
6299 self.pg_enable_capture(self.pg_interfaces)
6301 capture = self.pg0.get_capture(1)
6302 capture = capture[0]
6303 self.assertFalse(capture.haslayer(IPv6))
6304 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6305 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6306 self.assertEqual(capture[UDP].sport, 20000)
6307 self.assertEqual(capture[UDP].dport, 10000)
6308 self.check_ip_checksum(capture)
6310 # ping DS-Lite B4 tunnel endpoint address
6311 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6312 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6313 ICMPv6EchoRequest())
6314 self.pg1.add_stream(p)
6315 self.pg_enable_capture(self.pg_interfaces)
6317 capture = self.pg1.get_capture(1)
6318 self.assertEqual(1, len(capture))
6319 capture = capture[0]
6320 self.assertEqual(capture[IPv6].src, b4_ip6)
6321 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6322 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Teardown: logs the configured tunnel endpoint addresses when VPP is alive.
6325 super(TestDSliteCE, self).tearDown()
6326 if not self.vpp_dead:
6328 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6330 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
6333 class TestNAT66(MethodHolder):
6334 """ NAT66 Test Cases """
# Class-level fixture: fixes the NAT66 address used by the tests and
# creates two pg interfaces (range(2)).
6337 def setUpClass(cls):
6338 super(TestNAT66, cls).setUpClass()
# Textual NAT address and its packed form as produced by
# socket.inet_pton(AF_INET6, ...).
6341 cls.nat_addr = 'fd01:ff::2'
6342 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
# cls.interfaces is a separate list copy of the created pg interfaces.
6344 cls.create_pg_interfaces(range(2))
6345 cls.interfaces = list(cls.pg_interfaces)
# Configure IPv6 neighbors on every test interface.
6347 for i in cls.interfaces:
6350 i.configure_ipv6_neighbors()
# NOTE(review): tearDownClass() is invoked from within setUpClass --
# presumably the cleanup path for a failed setup; confirm.
6353 super(TestNAT66, cls).tearDownClass()
# Exercises a static NAT66 mapping in both directions between pg0 and pg1.
6356 def test_static(self):
6357 """ 1:1 NAT66 test """
# Register both interfaces with NAT66: pg1 is passed is_inside=0, pg0
# relies on the call's default (presumably "inside" -- confirm).
6358 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6359 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Static mapping for pg0's remote IPv6 address.
6360 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
# First direction: frames from pg0's remote host addressed to pg1's
# remote host.
6365 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6366 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6369 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6370 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
# ICMPv6 echo request variant.
6373 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6374 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6375 ICMPv6EchoRequest())
# GRE carrying an IPv4/TCP packet.
6377 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6378 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6379 GRE() / IP() / TCP())
# Inject the stream on pg0 and fetch len(pkts) packets from pg1.
6381 self.pg0.add_stream(pkts)
6382 self.pg_enable_capture(self.pg_interfaces)
6384 capture = self.pg1.get_capture(len(pkts))
# Every captured packet must have the NAT address as source and pg1's
# remote address as destination; the checksum helper matching the
# packet's upper layer (TCP / UDP / ICMPv6 echo request) is then run.
6385 for packet in capture:
6387 self.assertEqual(packet[IPv6].src, self.nat_addr)
6388 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6389 if packet.haslayer(TCP):
6390 self.check_tcp_checksum(packet)
6391 elif packet.haslayer(UDP):
6392 self.check_udp_checksum(packet)
6393 elif packet.haslayer(ICMPv6EchoRequest):
6394 self.check_icmpv6_checksum(packet)
# Log the offending packet for diagnosis.
6396 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Reverse direction: frames from pg1's remote host addressed to the
# NAT address.
6401 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6402 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6405 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6406 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6409 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6410 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
# GRE carrying an IPv4/TCP packet.
6413 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6414 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6415 GRE() / IP() / TCP())
# Inject the stream on pg1 and fetch len(pkts) packets from pg0.
6417 self.pg1.add_stream(pkts)
6418 self.pg_enable_capture(self.pg_interfaces)
6420 capture = self.pg0.get_capture(len(pkts))
# Here the source must stay pg1's remote address while the destination
# must be pg0's remote address; ICMPv6 is matched on the echo reply layer.
6421 for packet in capture:
6423 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6424 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6425 if packet.haslayer(TCP):
6426 self.check_tcp_checksum(packet)
6427 elif packet.haslayer(UDP):
6428 self.check_udp_checksum(packet)
6429 elif packet.haslayer(ICMPv6EchoReply):
6430 self.check_icmpv6_checksum(packet)
# Log the offending packet for diagnosis.
6432 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# The dump must report exactly one static mapping whose total_pkts is 8
# (presumably the four packets sent in each direction -- confirm).
6435 sm = self.vapi.nat66_static_mapping_dump()
6436 self.assertEqual(len(sm), 1)
6437 self.assertEqual(sm[0].total_pkts, 8)
# Helper that walks the NAT66 interface and static-mapping dumps and
# issues the corresponding add/del API call for each entry.
6439 def clear_nat66(self):
6441 Clear NAT66 configuration.
6443 interfaces = self.vapi.nat66_interface_dump()
6444 for intf in interfaces:
6445 self.vapi.nat66_add_del_interface(intf.sw_if_index,
# Same walk for every static mapping currently reported by the dump.
6449 static_mappings = self.vapi.nat66_static_mapping_dump()
6450 for sm in static_mappings:
6451 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6452 sm.external_ip_address,
# After the base-class tearDown, log the NAT66 interface and static
# mapping state, but only while VPP is not flagged dead (self.vpp_dead).
6457 super(TestNAT66, self).tearDown()
6458 if not self.vpp_dead:
6459 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6460 self.logger.info(self.vapi.cli("show nat66 static mappings"))
6463 if __name__ == '__main__':
# When executed as a script, run this module's tests through unittest
# using VppTestRunner as the test runner.
6464 unittest.main(testRunner=VppTestRunner)