9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
11 from scapy.layers.inet import IP, TCP, UDP, ICMP
12 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
13 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
14 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
15 from scapy.layers.l2 import Ether, ARP, GRE
16 from scapy.data import IP_PROTOS
17 from scapy.packet import bind_layers, Raw
18 from scapy.all import fragment6
20 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
21 from time import sleep
22 from util import ip4_range
23 from util import mactobinary
26 class MethodHolder(VppTestCase):
27 """ NAT create capture and verify method holder """
31 super(MethodHolder, cls).setUpClass()
34 super(MethodHolder, self).tearDown()
36 def check_ip_checksum(self, pkt):
38 Check IP checksum of the packet
40 :param pkt: Packet to check IP checksum
42 new = pkt.__class__(str(pkt))
44 new = new.__class__(str(new))
45 self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
47 def check_tcp_checksum(self, pkt):
49 Check TCP checksum in IP packet
51 :param pkt: Packet to check TCP checksum
53 new = pkt.__class__(str(pkt))
55 new = new.__class__(str(new))
56 self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
58 def check_udp_checksum(self, pkt):
60 Check UDP checksum in IP packet
62 :param pkt: Packet to check UDP checksum
64 new = pkt.__class__(str(pkt))
66 new = new.__class__(str(new))
67 self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
69 def check_icmp_errror_embedded(self, pkt):
71 Check ICMP error embeded packet checksum
73 :param pkt: Packet to check ICMP error embeded packet checksum
75 if pkt.haslayer(IPerror):
76 new = pkt.__class__(str(pkt))
77 del new['IPerror'].chksum
78 new = new.__class__(str(new))
79 self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
81 if pkt.haslayer(TCPerror):
82 new = pkt.__class__(str(pkt))
83 del new['TCPerror'].chksum
84 new = new.__class__(str(new))
85 self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
87 if pkt.haslayer(UDPerror):
88 if pkt['UDPerror'].chksum != 0:
89 new = pkt.__class__(str(pkt))
90 del new['UDPerror'].chksum
91 new = new.__class__(str(new))
92 self.assertEqual(new['UDPerror'].chksum,
93 pkt['UDPerror'].chksum)
95 if pkt.haslayer(ICMPerror):
96 del new['ICMPerror'].chksum
97 new = new.__class__(str(new))
98 self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
100 def check_icmp_checksum(self, pkt):
102 Check ICMP checksum in IPv4 packet
104 :param pkt: Packet to check ICMP checksum
106 new = pkt.__class__(str(pkt))
107 del new['ICMP'].chksum
108 new = new.__class__(str(new))
109 self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
110 if pkt.haslayer(IPerror):
111 self.check_icmp_errror_embedded(pkt)
113 def check_icmpv6_checksum(self, pkt):
115 Check ICMPv6 checksum in IPv4 packet
117 :param pkt: Packet to check ICMPv6 checksum
119 new = pkt.__class__(str(pkt))
120 if pkt.haslayer(ICMPv6DestUnreach):
121 del new['ICMPv6DestUnreach'].cksum
122 new = new.__class__(str(new))
123 self.assertEqual(new['ICMPv6DestUnreach'].cksum,
124 pkt['ICMPv6DestUnreach'].cksum)
125 self.check_icmp_errror_embedded(pkt)
126 if pkt.haslayer(ICMPv6EchoRequest):
127 del new['ICMPv6EchoRequest'].cksum
128 new = new.__class__(str(new))
129 self.assertEqual(new['ICMPv6EchoRequest'].cksum,
130 pkt['ICMPv6EchoRequest'].cksum)
131 if pkt.haslayer(ICMPv6EchoReply):
132 del new['ICMPv6EchoReply'].cksum
133 new = new.__class__(str(new))
134 self.assertEqual(new['ICMPv6EchoReply'].cksum,
135 pkt['ICMPv6EchoReply'].cksum)
137 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
139 Create packet stream for inside network
141 :param in_if: Inside interface
142 :param out_if: Outside interface
143 :param dst_ip: Destination address
144 :param ttl: TTL of generated packets
147 dst_ip = out_if.remote_ip4
151 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
152 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
153 TCP(sport=self.tcp_port_in, dport=20))
157 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
158 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
159 UDP(sport=self.udp_port_in, dport=20))
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
165 ICMP(id=self.icmp_id_in, type='echo-request'))
170 def compose_ip6(self, ip4, pref, plen):
172 Compose IPv4-embedded IPv6 addresses
174 :param ip4: IPv4 address
175 :param pref: IPv6 prefix
176 :param plen: IPv6 prefix length
177 :returns: IPv4-embedded IPv6 addresses
179 pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
180 ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
195 pref_n[10] = ip4_n[3]
199 pref_n[10] = ip4_n[2]
200 pref_n[11] = ip4_n[3]
203 pref_n[10] = ip4_n[1]
204 pref_n[11] = ip4_n[2]
205 pref_n[12] = ip4_n[3]
207 pref_n[12] = ip4_n[0]
208 pref_n[13] = ip4_n[1]
209 pref_n[14] = ip4_n[2]
210 pref_n[15] = ip4_n[3]
211 return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
213 def extract_ip4(self, ip6, plen):
215 Extract IPv4 address embedded in IPv6 addresses
217 :param ip6: IPv6 address
218 :param plen: IPv6 prefix length
219 :returns: extracted IPv4 address
221 ip6_n = list(socket.inet_pton(socket.AF_INET6, ip6))
253 return socket.inet_ntop(socket.AF_INET, ''.join(ip4_n))
255 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
257 Create IPv6 packet stream for inside network
259 :param in_if: Inside interface
260 :param out_if: Outside interface
261 :param ttl: Hop Limit of generated packets
262 :param pref: NAT64 prefix
263 :param plen: NAT64 prefix length
267 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
269 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
272 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
273 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
274 TCP(sport=self.tcp_port_in, dport=20))
278 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
279 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
280 UDP(sport=self.udp_port_in, dport=20))
284 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
285 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
286 ICMPv6EchoRequest(id=self.icmp_id_in))
291 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
292 use_inside_ports=False):
294 Create packet stream for outside network
296 :param out_if: Outside interface
297 :param dst_ip: Destination IP address (Default use global NAT address)
298 :param ttl: TTL of generated packets
299 :param use_inside_ports: Use inside NAT ports as destination ports
300 instead of outside ports
303 dst_ip = self.nat_addr
304 if not use_inside_ports:
305 tcp_port = self.tcp_port_out
306 udp_port = self.udp_port_out
307 icmp_id = self.icmp_id_out
309 tcp_port = self.tcp_port_in
310 udp_port = self.udp_port_in
311 icmp_id = self.icmp_id_in
314 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
315 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
316 TCP(dport=tcp_port, sport=20))
320 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
321 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
322 UDP(dport=udp_port, sport=20))
326 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
327 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
328 ICMP(id=icmp_id, type='echo-reply'))
333 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
335 Create packet stream for outside network
337 :param out_if: Outside interface
338 :param dst_ip: Destination IP address (Default use global NAT address)
339 :param hl: HL of generated packets
343 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
344 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
345 TCP(dport=self.tcp_port_out, sport=20))
349 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
350 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
351 UDP(dport=self.udp_port_out, sport=20))
355 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
356 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
357 ICMPv6EchoReply(id=self.icmp_id_out))
362 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
363 packet_num=3, dst_ip=None, is_ip6=False):
365 Verify captured packets on outside network
367 :param capture: Captured packets
368 :param nat_ip: Translated IP address (Default use global NAT address)
369 :param same_port: Sorce port number is not translated (Default False)
370 :param packet_num: Expected number of packets (Default 3)
371 :param dst_ip: Destination IP address (Default do not verify)
372 :param is_ip6: If L3 protocol is IPv6 (Default False)
376 ICMP46 = ICMPv6EchoRequest
381 nat_ip = self.nat_addr
382 self.assertEqual(packet_num, len(capture))
383 for packet in capture:
386 self.check_ip_checksum(packet)
387 self.assertEqual(packet[IP46].src, nat_ip)
388 if dst_ip is not None:
389 self.assertEqual(packet[IP46].dst, dst_ip)
390 if packet.haslayer(TCP):
392 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
395 packet[TCP].sport, self.tcp_port_in)
396 self.tcp_port_out = packet[TCP].sport
397 self.check_tcp_checksum(packet)
398 elif packet.haslayer(UDP):
400 self.assertEqual(packet[UDP].sport, self.udp_port_in)
403 packet[UDP].sport, self.udp_port_in)
404 self.udp_port_out = packet[UDP].sport
407 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
409 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
410 self.icmp_id_out = packet[ICMP46].id
412 self.check_icmpv6_checksum(packet)
414 self.check_icmp_checksum(packet)
416 self.logger.error(ppp("Unexpected or invalid packet "
417 "(outside network):", packet))
420 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
421 packet_num=3, dst_ip=None):
423 Verify captured packets on outside network
425 :param capture: Captured packets
426 :param nat_ip: Translated IP address
427 :param same_port: Sorce port number is not translated (Default False)
428 :param packet_num: Expected number of packets (Default 3)
429 :param dst_ip: Destination IP address (Default do not verify)
431 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
434 def verify_capture_in(self, capture, in_if, packet_num=3):
436 Verify captured packets on inside network
438 :param capture: Captured packets
439 :param in_if: Inside interface
440 :param packet_num: Expected number of packets (Default 3)
442 self.assertEqual(packet_num, len(capture))
443 for packet in capture:
445 self.check_ip_checksum(packet)
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 if packet.haslayer(TCP):
448 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
449 self.check_tcp_checksum(packet)
450 elif packet.haslayer(UDP):
451 self.assertEqual(packet[UDP].dport, self.udp_port_in)
453 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
454 self.check_icmp_checksum(packet)
456 self.logger.error(ppp("Unexpected or invalid packet "
457 "(inside network):", packet))
460 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
462 Verify captured IPv6 packets on inside network
464 :param capture: Captured packets
465 :param src_ip: Source IP
466 :param dst_ip: Destination IP address
467 :param packet_num: Expected number of packets (Default 3)
469 self.assertEqual(packet_num, len(capture))
470 for packet in capture:
472 self.assertEqual(packet[IPv6].src, src_ip)
473 self.assertEqual(packet[IPv6].dst, dst_ip)
474 if packet.haslayer(TCP):
475 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
476 self.check_tcp_checksum(packet)
477 elif packet.haslayer(UDP):
478 self.assertEqual(packet[UDP].dport, self.udp_port_in)
479 self.check_udp_checksum(packet)
481 self.assertEqual(packet[ICMPv6EchoReply].id,
483 self.check_icmpv6_checksum(packet)
485 self.logger.error(ppp("Unexpected or invalid packet "
486 "(inside network):", packet))
489 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
491 Verify captured packet that don't have to be translated
493 :param capture: Captured packets
494 :param ingress_if: Ingress interface
495 :param egress_if: Egress interface
497 for packet in capture:
499 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
500 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
501 if packet.haslayer(TCP):
502 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
503 elif packet.haslayer(UDP):
504 self.assertEqual(packet[UDP].sport, self.udp_port_in)
506 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
508 self.logger.error(ppp("Unexpected or invalid packet "
509 "(inside network):", packet))
512 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
513 packet_num=3, icmp_type=11):
515 Verify captured packets with ICMP errors on outside network
517 :param capture: Captured packets
518 :param src_ip: Translated IP address or IP address of VPP
519 (Default use global NAT address)
520 :param packet_num: Expected number of packets (Default 3)
521 :param icmp_type: Type of error ICMP packet
522 we are expecting (Default 11)
525 src_ip = self.nat_addr
526 self.assertEqual(packet_num, len(capture))
527 for packet in capture:
529 self.assertEqual(packet[IP].src, src_ip)
530 self.assertTrue(packet.haslayer(ICMP))
532 self.assertEqual(icmp.type, icmp_type)
533 self.assertTrue(icmp.haslayer(IPerror))
534 inner_ip = icmp[IPerror]
535 if inner_ip.haslayer(TCPerror):
536 self.assertEqual(inner_ip[TCPerror].dport,
538 elif inner_ip.haslayer(UDPerror):
539 self.assertEqual(inner_ip[UDPerror].dport,
542 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
544 self.logger.error(ppp("Unexpected or invalid packet "
545 "(outside network):", packet))
548 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
551 Verify captured packets with ICMP errors on inside network
553 :param capture: Captured packets
554 :param in_if: Inside interface
555 :param packet_num: Expected number of packets (Default 3)
556 :param icmp_type: Type of error ICMP packet
557 we are expecting (Default 11)
559 self.assertEqual(packet_num, len(capture))
560 for packet in capture:
562 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
563 self.assertTrue(packet.haslayer(ICMP))
565 self.assertEqual(icmp.type, icmp_type)
566 self.assertTrue(icmp.haslayer(IPerror))
567 inner_ip = icmp[IPerror]
568 if inner_ip.haslayer(TCPerror):
569 self.assertEqual(inner_ip[TCPerror].sport,
571 elif inner_ip.haslayer(UDPerror):
572 self.assertEqual(inner_ip[UDPerror].sport,
575 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
577 self.logger.error(ppp("Unexpected or invalid packet "
578 "(inside network):", packet))
581 def create_stream_frag(self, src_if, dst, sport, dport, data):
583 Create fragmented packet stream
585 :param src_if: Source interface
586 :param dst: Destination IPv4 address
587 :param sport: Source TCP port
588 :param dport: Destination TCP port
589 :param data: Payload data
592 id = random.randint(0, 65535)
593 p = (IP(src=src_if.remote_ip4, dst=dst) /
594 TCP(sport=sport, dport=dport) /
596 p = p.__class__(str(p))
597 chksum = p['TCP'].chksum
599 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
600 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
601 TCP(sport=sport, dport=dport, chksum=chksum) /
604 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
605 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
606 proto=IP_PROTOS.tcp) /
609 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
610 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
616 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
617 pref=None, plen=0, frag_size=128):
619 Create fragmented packet stream
621 :param src_if: Source interface
622 :param dst: Destination IPv4 address
623 :param sport: Source TCP port
624 :param dport: Destination TCP port
625 :param data: Payload data
626 :param pref: NAT64 prefix
627 :param plen: NAT64 prefix length
628 :param fragsize: size of fragments
632 dst_ip6 = ''.join(['64:ff9b::', dst])
634 dst_ip6 = self.compose_ip6(dst, pref, plen)
636 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
637 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
638 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
639 TCP(sport=sport, dport=dport) /
642 return fragment6(p, frag_size)
644 def reass_frags_and_verify(self, frags, src, dst):
646 Reassemble and verify fragmented packet
648 :param frags: Captured fragments
649 :param src: Source IPv4 address to verify
650 :param dst: Destination IPv4 address to verify
652 :returns: Reassembled IPv4 packet
654 buffer = StringIO.StringIO()
656 self.assertEqual(p[IP].src, src)
657 self.assertEqual(p[IP].dst, dst)
658 self.check_ip_checksum(p)
659 buffer.seek(p[IP].frag * 8)
660 buffer.write(p[IP].payload)
661 ip = frags[0].getlayer(IP)
662 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
663 proto=frags[0][IP].proto)
664 if ip.proto == IP_PROTOS.tcp:
665 p = (ip / TCP(buffer.getvalue()))
666 self.check_tcp_checksum(p)
667 elif ip.proto == IP_PROTOS.udp:
668 p = (ip / UDP(buffer.getvalue()))
671 def reass_frags_and_verify_ip6(self, frags, src, dst):
673 Reassemble and verify fragmented packet
675 :param frags: Captured fragments
676 :param src: Source IPv6 address to verify
677 :param dst: Destination IPv6 address to verify
679 :returns: Reassembled IPv6 packet
681 buffer = StringIO.StringIO()
683 self.assertEqual(p[IPv6].src, src)
684 self.assertEqual(p[IPv6].dst, dst)
685 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
686 buffer.write(p[IPv6ExtHdrFragment].payload)
687 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
688 nh=frags[0][IPv6ExtHdrFragment].nh)
689 if ip.nh == IP_PROTOS.tcp:
690 p = (ip / TCP(buffer.getvalue()))
691 self.check_tcp_checksum(p)
692 elif ip.nh == IP_PROTOS.udp:
693 p = (ip / UDP(buffer.getvalue()))
696 def verify_ipfix_nat44_ses(self, data):
698 Verify IPFIX NAT44 session create/delete event
700 :param data: Decoded IPFIX data records
702 nat44_ses_create_num = 0
703 nat44_ses_delete_num = 0
704 self.assertEqual(6, len(data))
707 self.assertIn(ord(record[230]), [4, 5])
708 if ord(record[230]) == 4:
709 nat44_ses_create_num += 1
711 nat44_ses_delete_num += 1
713 self.assertEqual(self.pg0.remote_ip4n, record[8])
714 # postNATSourceIPv4Address
715 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
718 self.assertEqual(struct.pack("!I", 0), record[234])
719 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
720 if IP_PROTOS.icmp == ord(record[4]):
721 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
722 self.assertEqual(struct.pack("!H", self.icmp_id_out),
724 elif IP_PROTOS.tcp == ord(record[4]):
725 self.assertEqual(struct.pack("!H", self.tcp_port_in),
727 self.assertEqual(struct.pack("!H", self.tcp_port_out),
729 elif IP_PROTOS.udp == ord(record[4]):
730 self.assertEqual(struct.pack("!H", self.udp_port_in),
732 self.assertEqual(struct.pack("!H", self.udp_port_out),
735 self.fail("Invalid protocol")
736 self.assertEqual(3, nat44_ses_create_num)
737 self.assertEqual(3, nat44_ses_delete_num)
739 def verify_ipfix_addr_exhausted(self, data):
741 Verify IPFIX NAT addresses event
743 :param data: Decoded IPFIX data records
745 self.assertEqual(1, len(data))
748 self.assertEqual(ord(record[230]), 3)
750 self.assertEqual(struct.pack("!I", 0), record[283])
752 def verify_ipfix_max_sessions(self, data, limit):
754 Verify IPFIX maximum session entries exceeded event
756 :param data: Decoded IPFIX data records
757 :param limit: Number of maximum session entries that can be created.
759 self.assertEqual(1, len(data))
762 self.assertEqual(ord(record[230]), 13)
763 # natQuotaExceededEvent
764 self.assertEqual(struct.pack("I", 1), record[466])
766 self.assertEqual(struct.pack("I", limit), record[471])
768 def verify_ipfix_max_bibs(self, data, limit):
770 Verify IPFIX maximum BIB entries exceeded event
772 :param data: Decoded IPFIX data records
773 :param limit: Number of maximum BIB entries that can be created.
775 self.assertEqual(1, len(data))
778 self.assertEqual(ord(record[230]), 13)
779 # natQuotaExceededEvent
780 self.assertEqual(struct.pack("I", 2), record[466])
782 self.assertEqual(struct.pack("I", limit), record[472])
784 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
786 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
788 :param data: Decoded IPFIX data records
789 :param limit: Number of maximum fragments pending reassembly
790 :param src_addr: IPv6 source address
792 self.assertEqual(1, len(data))
795 self.assertEqual(ord(record[230]), 13)
796 # natQuotaExceededEvent
797 self.assertEqual(struct.pack("I", 5), record[466])
798 # maxFragmentsPendingReassembly
799 self.assertEqual(struct.pack("I", limit), record[475])
801 self.assertEqual(src_addr, record[27])
803 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
805 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
807 :param data: Decoded IPFIX data records
808 :param limit: Number of maximum fragments pending reassembly
809 :param src_addr: IPv4 source address
811 self.assertEqual(1, len(data))
814 self.assertEqual(ord(record[230]), 13)
815 # natQuotaExceededEvent
816 self.assertEqual(struct.pack("I", 5), record[466])
817 # maxFragmentsPendingReassembly
818 self.assertEqual(struct.pack("I", limit), record[475])
820 self.assertEqual(src_addr, record[8])
822 def verify_ipfix_bib(self, data, is_create, src_addr):
824 Verify IPFIX NAT64 BIB create and delete events
826 :param data: Decoded IPFIX data records
827 :param is_create: Create event if nonzero value otherwise delete event
828 :param src_addr: IPv6 source address
830 self.assertEqual(1, len(data))
834 self.assertEqual(ord(record[230]), 10)
836 self.assertEqual(ord(record[230]), 11)
838 self.assertEqual(src_addr, record[27])
839 # postNATSourceIPv4Address
840 self.assertEqual(self.nat_addr_n, record[225])
842 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
844 self.assertEqual(struct.pack("!I", 0), record[234])
845 # sourceTransportPort
846 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
847 # postNAPTSourceTransportPort
848 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
850 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
853 Verify IPFIX NAT64 session create and delete events
855 :param data: Decoded IPFIX data records
856 :param is_create: Create event if nonzero value otherwise delete event
857 :param src_addr: IPv6 source address
858 :param dst_addr: IPv4 destination address
859 :param dst_port: destination TCP port
861 self.assertEqual(1, len(data))
865 self.assertEqual(ord(record[230]), 6)
867 self.assertEqual(ord(record[230]), 7)
869 self.assertEqual(src_addr, record[27])
870 # destinationIPv6Address
871 self.assertEqual(socket.inet_pton(socket.AF_INET6,
872 self.compose_ip6(dst_addr,
876 # postNATSourceIPv4Address
877 self.assertEqual(self.nat_addr_n, record[225])
878 # postNATDestinationIPv4Address
879 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
882 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
884 self.assertEqual(struct.pack("!I", 0), record[234])
885 # sourceTransportPort
886 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
887 # postNAPTSourceTransportPort
888 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
889 # destinationTransportPort
890 self.assertEqual(struct.pack("!H", dst_port), record[11])
891 # postNAPTDestinationTransportPort
892 self.assertEqual(struct.pack("!H", dst_port), record[228])
895 class TestNAT44(MethodHolder):
896 """ NAT44 Test Cases """
900 super(TestNAT44, cls).setUpClass()
903 cls.tcp_port_in = 6303
904 cls.tcp_port_out = 6303
905 cls.udp_port_in = 6304
906 cls.udp_port_out = 6304
907 cls.icmp_id_in = 6305
908 cls.icmp_id_out = 6305
909 cls.nat_addr = '10.0.0.3'
910 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
911 cls.ipfix_src_port = 4739
912 cls.ipfix_domain_id = 1
914 cls.create_pg_interfaces(range(10))
915 cls.interfaces = list(cls.pg_interfaces[0:4])
917 for i in cls.interfaces:
922 cls.pg0.generate_remote_hosts(3)
923 cls.pg0.configure_ipv4_neighbors()
925 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
926 cls.vapi.ip_table_add_del(10, is_add=1)
927 cls.vapi.ip_table_add_del(20, is_add=1)
929 cls.pg4._local_ip4 = "172.16.255.1"
930 cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
931 cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
932 cls.pg4.set_table_ip4(10)
933 cls.pg5._local_ip4 = "172.17.255.3"
934 cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
935 cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
936 cls.pg5.set_table_ip4(10)
937 cls.pg6._local_ip4 = "172.16.255.1"
938 cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
939 cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
940 cls.pg6.set_table_ip4(20)
941 for i in cls.overlapping_interfaces:
949 cls.pg9.generate_remote_hosts(2)
951 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
952 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
956 cls.pg9.resolve_arp()
957 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
958 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
959 cls.pg9.resolve_arp()
962 super(TestNAT44, cls).tearDownClass()
965 def clear_nat44(self):
967 Clear NAT44 configuration.
969 # I found no elegant way to do this
970 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
971 dst_address_length=32,
972 next_hop_address=self.pg7.remote_ip4n,
973 next_hop_sw_if_index=self.pg7.sw_if_index,
975 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
976 dst_address_length=32,
977 next_hop_address=self.pg8.remote_ip4n,
978 next_hop_sw_if_index=self.pg8.sw_if_index,
981 for intf in [self.pg7, self.pg8]:
982 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
984 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
989 if self.pg7.has_ip4_config:
990 self.pg7.unconfig_ip4()
992 self.vapi.nat44_forwarding_enable_disable(0)
994 interfaces = self.vapi.nat44_interface_addr_dump()
995 for intf in interfaces:
996 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
997 twice_nat=intf.twice_nat,
1000 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
1001 domain_id=self.ipfix_domain_id)
1002 self.ipfix_src_port = 4739
1003 self.ipfix_domain_id = 1
1005 interfaces = self.vapi.nat44_interface_dump()
1006 for intf in interfaces:
1007 if intf.is_inside > 1:
1008 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1011 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
1015 interfaces = self.vapi.nat44_interface_output_feature_dump()
1016 for intf in interfaces:
1017 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
1021 static_mappings = self.vapi.nat44_static_mapping_dump()
1022 for sm in static_mappings:
1023 self.vapi.nat44_add_del_static_mapping(
1024 sm.local_ip_address,
1025 sm.external_ip_address,
1026 local_port=sm.local_port,
1027 external_port=sm.external_port,
1028 addr_only=sm.addr_only,
1030 protocol=sm.protocol,
1031 twice_nat=sm.twice_nat,
1032 out2in_only=sm.out2in_only,
1036 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
1037 for lb_sm in lb_static_mappings:
1038 self.vapi.nat44_add_del_lb_static_mapping(
1039 lb_sm.external_addr,
1040 lb_sm.external_port,
1042 vrf_id=lb_sm.vrf_id,
1043 twice_nat=lb_sm.twice_nat,
1044 out2in_only=lb_sm.out2in_only,
1050 identity_mappings = self.vapi.nat44_identity_mapping_dump()
1051 for id_m in identity_mappings:
1052 self.vapi.nat44_add_del_identity_mapping(
1053 addr_only=id_m.addr_only,
1056 sw_if_index=id_m.sw_if_index,
1058 protocol=id_m.protocol,
1061 adresses = self.vapi.nat44_address_dump()
1062 for addr in adresses:
1063 self.vapi.nat44_add_del_address_range(addr.ip_address,
1065 twice_nat=addr.twice_nat,
1068 self.vapi.nat_set_reass()
1069 self.vapi.nat_set_reass(is_ip6=1)
1071 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1072 local_port=0, external_port=0, vrf_id=0,
1073 is_add=1, external_sw_if_index=0xFFFFFFFF,
1074 proto=0, twice_nat=0, out2in_only=0, tag=""):
1076 Add/delete NAT44 static mapping
1078 :param local_ip: Local IP address
1079 :param external_ip: External IP address
1080 :param local_port: Local port number (Optional)
1081 :param external_port: External port number (Optional)
1082 :param vrf_id: VRF ID (Default 0)
1083 :param is_add: 1 if add, 0 if delete (Default add)
1084 :param external_sw_if_index: External interface instead of IP address
1085 :param proto: IP protocol (Mandatory if port specified)
1086 :param twice_nat: 1 if translate external host address and port
1087 :param out2in_only: if 1 rule is matching only out2in direction
1088 :param tag: Opaque string tag
1091 if local_port and external_port:
1093 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1094 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1095 self.vapi.nat44_add_del_static_mapping(
1098 external_sw_if_index,
1109 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1111 Add/delete NAT44 address
1113 :param ip: IP address
1114 :param is_add: 1 if add, 0 if delete (Default add)
1115 :param twice_nat: twice NAT address for extenal hosts
1117 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1118 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1120 twice_nat=twice_nat)
1122 def test_dynamic(self):
1123 """ NAT44 dynamic translation test """
1125 self.nat44_add_address(self.nat_addr)
1126 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1127 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1131 pkts = self.create_stream_in(self.pg0, self.pg1)
1132 self.pg0.add_stream(pkts)
1133 self.pg_enable_capture(self.pg_interfaces)
1135 capture = self.pg1.get_capture(len(pkts))
1136 self.verify_capture_out(capture)
1139 pkts = self.create_stream_out(self.pg1)
1140 self.pg1.add_stream(pkts)
1141 self.pg_enable_capture(self.pg_interfaces)
1143 capture = self.pg0.get_capture(len(pkts))
1144 self.verify_capture_in(capture, self.pg0)
1146 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1147 """ NAT44 handling of client packets with TTL=1 """
1149 self.nat44_add_address(self.nat_addr)
1150 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1151 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1154 # Client side - generate traffic
1155 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1156 self.pg0.add_stream(pkts)
1157 self.pg_enable_capture(self.pg_interfaces)
1160 # Client side - verify ICMP type 11 packets
1161 capture = self.pg0.get_capture(len(pkts))
1162 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1164 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1165 """ NAT44 handling of server packets with TTL=1 """
1167 self.nat44_add_address(self.nat_addr)
1168 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1169 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1172 # Client side - create sessions
1173 pkts = self.create_stream_in(self.pg0, self.pg1)
1174 self.pg0.add_stream(pkts)
1175 self.pg_enable_capture(self.pg_interfaces)
1178 # Server side - generate traffic
1179 capture = self.pg1.get_capture(len(pkts))
1180 self.verify_capture_out(capture)
1181 pkts = self.create_stream_out(self.pg1, ttl=1)
1182 self.pg1.add_stream(pkts)
1183 self.pg_enable_capture(self.pg_interfaces)
1186 # Server side - verify ICMP type 11 packets
1187 capture = self.pg1.get_capture(len(pkts))
1188 self.verify_capture_out_with_icmp_errors(capture,
1189 src_ip=self.pg1.local_ip4)
1191 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1192 """ NAT44 handling of error responses to client packets with TTL=2 """
1194 self.nat44_add_address(self.nat_addr)
1195 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1196 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1199 # Client side - generate traffic
1200 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1201 self.pg0.add_stream(pkts)
1202 self.pg_enable_capture(self.pg_interfaces)
1205 # Server side - simulate ICMP type 11 response
1206 capture = self.pg1.get_capture(len(pkts))
1207 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1208 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1209 ICMP(type=11) / packet[IP] for packet in capture]
1210 self.pg1.add_stream(pkts)
1211 self.pg_enable_capture(self.pg_interfaces)
1214 # Client side - verify ICMP type 11 packets
1215 capture = self.pg0.get_capture(len(pkts))
1216 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1218 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1219 """ NAT44 handling of error responses to server packets with TTL=2 """
1221 self.nat44_add_address(self.nat_addr)
1222 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1223 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1226 # Client side - create sessions
1227 pkts = self.create_stream_in(self.pg0, self.pg1)
1228 self.pg0.add_stream(pkts)
1229 self.pg_enable_capture(self.pg_interfaces)
1232 # Server side - generate traffic
1233 capture = self.pg1.get_capture(len(pkts))
1234 self.verify_capture_out(capture)
1235 pkts = self.create_stream_out(self.pg1, ttl=2)
1236 self.pg1.add_stream(pkts)
1237 self.pg_enable_capture(self.pg_interfaces)
1240 # Client side - simulate ICMP type 11 response
1241 capture = self.pg0.get_capture(len(pkts))
1242 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1243 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1244 ICMP(type=11) / packet[IP] for packet in capture]
1245 self.pg0.add_stream(pkts)
1246 self.pg_enable_capture(self.pg_interfaces)
1249 # Server side - verify ICMP type 11 packets
1250 capture = self.pg1.get_capture(len(pkts))
1251 self.verify_capture_out_with_icmp_errors(capture)
1253 def test_ping_out_interface_from_outside(self):
1254 """ Ping NAT44 out interface from outside network """
1256 self.nat44_add_address(self.nat_addr)
1257 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1258 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1261 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1262 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1263 ICMP(id=self.icmp_id_out, type='echo-request'))
1265 self.pg1.add_stream(pkts)
1266 self.pg_enable_capture(self.pg_interfaces)
1268 capture = self.pg1.get_capture(len(pkts))
1269 self.assertEqual(1, len(capture))
1272 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1273 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1274 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1275 self.assertEqual(packet[ICMP].type, 0) # echo reply
1277 self.logger.error(ppp("Unexpected or invalid packet "
1278 "(outside network):", packet))
1281 def test_ping_internal_host_from_outside(self):
1282 """ Ping internal host from outside network """
1284 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1285 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1286 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1290 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1291 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1292 ICMP(id=self.icmp_id_out, type='echo-request'))
1293 self.pg1.add_stream(pkt)
1294 self.pg_enable_capture(self.pg_interfaces)
1296 capture = self.pg0.get_capture(1)
1297 self.verify_capture_in(capture, self.pg0, packet_num=1)
1298 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1301 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1302 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1303 ICMP(id=self.icmp_id_in, type='echo-reply'))
1304 self.pg0.add_stream(pkt)
1305 self.pg_enable_capture(self.pg_interfaces)
1307 capture = self.pg1.get_capture(1)
1308 self.verify_capture_out(capture, same_port=True, packet_num=1)
1309 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1311 def test_forwarding(self):
1312 """ NAT44 forwarding test """
1314 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1315 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1317 self.vapi.nat44_forwarding_enable_disable(1)
1319 real_ip = self.pg0.remote_ip4n
1320 alias_ip = self.nat_addr_n
1321 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1322 external_ip=alias_ip)
1325 # in2out - static mapping match
1327 pkts = self.create_stream_out(self.pg1)
1328 self.pg1.add_stream(pkts)
1329 self.pg_enable_capture(self.pg_interfaces)
1331 capture = self.pg0.get_capture(len(pkts))
1332 self.verify_capture_in(capture, self.pg0)
1334 pkts = self.create_stream_in(self.pg0, self.pg1)
1335 self.pg0.add_stream(pkts)
1336 self.pg_enable_capture(self.pg_interfaces)
1338 capture = self.pg1.get_capture(len(pkts))
1339 self.verify_capture_out(capture, same_port=True)
1341 # in2out - no static mapping match
1343 host0 = self.pg0.remote_hosts[0]
1344 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1346 pkts = self.create_stream_out(self.pg1,
1347 dst_ip=self.pg0.remote_ip4,
1348 use_inside_ports=True)
1349 self.pg1.add_stream(pkts)
1350 self.pg_enable_capture(self.pg_interfaces)
1352 capture = self.pg0.get_capture(len(pkts))
1353 self.verify_capture_in(capture, self.pg0)
1355 pkts = self.create_stream_in(self.pg0, self.pg1)
1356 self.pg0.add_stream(pkts)
1357 self.pg_enable_capture(self.pg_interfaces)
1359 capture = self.pg1.get_capture(len(pkts))
1360 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1363 self.pg0.remote_hosts[0] = host0
1366 self.vapi.nat44_forwarding_enable_disable(0)
1367 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1368 external_ip=alias_ip,
1371 def test_static_in(self):
1372 """ 1:1 NAT initialized from inside network """
1374 nat_ip = "10.0.0.10"
1375 self.tcp_port_out = 6303
1376 self.udp_port_out = 6304
1377 self.icmp_id_out = 6305
1379 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1380 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1381 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1383 sm = self.vapi.nat44_static_mapping_dump()
1384 self.assertEqual(len(sm), 1)
1385 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1386 self.assertEqual(sm[0].protocol, 0)
1387 self.assertEqual(sm[0].local_port, 0)
1388 self.assertEqual(sm[0].external_port, 0)
1391 pkts = self.create_stream_in(self.pg0, self.pg1)
1392 self.pg0.add_stream(pkts)
1393 self.pg_enable_capture(self.pg_interfaces)
1395 capture = self.pg1.get_capture(len(pkts))
1396 self.verify_capture_out(capture, nat_ip, True)
1399 pkts = self.create_stream_out(self.pg1, nat_ip)
1400 self.pg1.add_stream(pkts)
1401 self.pg_enable_capture(self.pg_interfaces)
1403 capture = self.pg0.get_capture(len(pkts))
1404 self.verify_capture_in(capture, self.pg0)
1406 def test_static_out(self):
1407 """ 1:1 NAT initialized from outside network """
1409 nat_ip = "10.0.0.20"
1410 self.tcp_port_out = 6303
1411 self.udp_port_out = 6304
1412 self.icmp_id_out = 6305
1415 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1416 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1417 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1419 sm = self.vapi.nat44_static_mapping_dump()
1420 self.assertEqual(len(sm), 1)
1421 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
1424 pkts = self.create_stream_out(self.pg1, nat_ip)
1425 self.pg1.add_stream(pkts)
1426 self.pg_enable_capture(self.pg_interfaces)
1428 capture = self.pg0.get_capture(len(pkts))
1429 self.verify_capture_in(capture, self.pg0)
1432 pkts = self.create_stream_in(self.pg0, self.pg1)
1433 self.pg0.add_stream(pkts)
1434 self.pg_enable_capture(self.pg_interfaces)
1436 capture = self.pg1.get_capture(len(pkts))
1437 self.verify_capture_out(capture, nat_ip, True)
1439 def test_static_with_port_in(self):
1440 """ 1:1 NAPT initialized from inside network """
1442 self.tcp_port_out = 3606
1443 self.udp_port_out = 3607
1444 self.icmp_id_out = 3608
1446 self.nat44_add_address(self.nat_addr)
1447 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1448 self.tcp_port_in, self.tcp_port_out,
1449 proto=IP_PROTOS.tcp)
1450 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1451 self.udp_port_in, self.udp_port_out,
1452 proto=IP_PROTOS.udp)
1453 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1454 self.icmp_id_in, self.icmp_id_out,
1455 proto=IP_PROTOS.icmp)
1456 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1457 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1461 pkts = self.create_stream_in(self.pg0, self.pg1)
1462 self.pg0.add_stream(pkts)
1463 self.pg_enable_capture(self.pg_interfaces)
1465 capture = self.pg1.get_capture(len(pkts))
1466 self.verify_capture_out(capture)
1469 pkts = self.create_stream_out(self.pg1)
1470 self.pg1.add_stream(pkts)
1471 self.pg_enable_capture(self.pg_interfaces)
1473 capture = self.pg0.get_capture(len(pkts))
1474 self.verify_capture_in(capture, self.pg0)
1476 def test_static_with_port_out(self):
1477 """ 1:1 NAPT initialized from outside network """
1479 self.tcp_port_out = 30606
1480 self.udp_port_out = 30607
1481 self.icmp_id_out = 30608
1483 self.nat44_add_address(self.nat_addr)
1484 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1485 self.tcp_port_in, self.tcp_port_out,
1486 proto=IP_PROTOS.tcp)
1487 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1488 self.udp_port_in, self.udp_port_out,
1489 proto=IP_PROTOS.udp)
1490 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1491 self.icmp_id_in, self.icmp_id_out,
1492 proto=IP_PROTOS.icmp)
1493 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1494 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1498 pkts = self.create_stream_out(self.pg1)
1499 self.pg1.add_stream(pkts)
1500 self.pg_enable_capture(self.pg_interfaces)
1502 capture = self.pg0.get_capture(len(pkts))
1503 self.verify_capture_in(capture, self.pg0)
1506 pkts = self.create_stream_in(self.pg0, self.pg1)
1507 self.pg0.add_stream(pkts)
1508 self.pg_enable_capture(self.pg_interfaces)
1510 capture = self.pg1.get_capture(len(pkts))
1511 self.verify_capture_out(capture)
1513 def test_static_with_port_out2(self):
1514 """ 1:1 NAPT symmetrical rule """
1519 self.vapi.nat44_forwarding_enable_disable(1)
1520 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1521 local_port, external_port,
1522 proto=IP_PROTOS.tcp, out2in_only=1)
1523 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1524 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1527 # from client to service
1528 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1529 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1530 TCP(sport=12345, dport=external_port))
1531 self.pg1.add_stream(p)
1532 self.pg_enable_capture(self.pg_interfaces)
1534 capture = self.pg0.get_capture(1)
1540 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1541 self.assertEqual(tcp.dport, local_port)
1542 self.check_tcp_checksum(p)
1543 self.check_ip_checksum(p)
1545 self.logger.error(ppp("Unexpected or invalid packet:", p))
1549 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1550 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1551 ICMP(type=11) / capture[0][IP])
1552 self.pg0.add_stream(p)
1553 self.pg_enable_capture(self.pg_interfaces)
1555 capture = self.pg1.get_capture(1)
1558 self.assertEqual(p[IP].src, self.nat_addr)
1560 self.assertEqual(inner.dst, self.nat_addr)
1561 self.assertEqual(inner[TCPerror].dport, external_port)
1563 self.logger.error(ppp("Unexpected or invalid packet:", p))
1566 # from service back to client
1567 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1568 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1569 TCP(sport=local_port, dport=12345))
1570 self.pg0.add_stream(p)
1571 self.pg_enable_capture(self.pg_interfaces)
1573 capture = self.pg1.get_capture(1)
1578 self.assertEqual(ip.src, self.nat_addr)
1579 self.assertEqual(tcp.sport, external_port)
1580 self.check_tcp_checksum(p)
1581 self.check_ip_checksum(p)
1583 self.logger.error(ppp("Unexpected or invalid packet:", p))
1587 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1588 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1589 ICMP(type=11) / capture[0][IP])
1590 self.pg1.add_stream(p)
1591 self.pg_enable_capture(self.pg_interfaces)
1593 capture = self.pg0.get_capture(1)
1596 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1598 self.assertEqual(inner.src, self.pg0.remote_ip4)
1599 self.assertEqual(inner[TCPerror].sport, local_port)
1601 self.logger.error(ppp("Unexpected or invalid packet:", p))
1604 # from client to server (no translation)
1605 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1606 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1607 TCP(sport=12346, dport=local_port))
1608 self.pg1.add_stream(p)
1609 self.pg_enable_capture(self.pg_interfaces)
1611 capture = self.pg0.get_capture(1)
1617 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1618 self.assertEqual(tcp.dport, local_port)
1619 self.check_tcp_checksum(p)
1620 self.check_ip_checksum(p)
1622 self.logger.error(ppp("Unexpected or invalid packet:", p))
1625 # from service back to client (no translation)
1626 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1627 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1628 TCP(sport=local_port, dport=12346))
1629 self.pg0.add_stream(p)
1630 self.pg_enable_capture(self.pg_interfaces)
1632 capture = self.pg1.get_capture(1)
1637 self.assertEqual(ip.src, self.pg0.remote_ip4)
1638 self.assertEqual(tcp.sport, local_port)
1639 self.check_tcp_checksum(p)
1640 self.check_ip_checksum(p)
1642 self.logger.error(ppp("Unexpected or invalid packet:", p))
1645 def test_static_vrf_aware(self):
1646 """ 1:1 NAT VRF awareness """
1648 nat_ip1 = "10.0.0.30"
1649 nat_ip2 = "10.0.0.40"
1650 self.tcp_port_out = 6303
1651 self.udp_port_out = 6304
1652 self.icmp_id_out = 6305
1654 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1656 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1658 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1660 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1661 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1663 # inside interface VRF match NAT44 static mapping VRF
1664 pkts = self.create_stream_in(self.pg4, self.pg3)
1665 self.pg4.add_stream(pkts)
1666 self.pg_enable_capture(self.pg_interfaces)
1668 capture = self.pg3.get_capture(len(pkts))
1669 self.verify_capture_out(capture, nat_ip1, True)
1671 # inside interface VRF don't match NAT44 static mapping VRF (packets
1673 pkts = self.create_stream_in(self.pg0, self.pg3)
1674 self.pg0.add_stream(pkts)
1675 self.pg_enable_capture(self.pg_interfaces)
1677 self.pg3.assert_nothing_captured()
1679 def test_dynamic_to_static(self):
1680 """ Switch from dynamic translation to 1:1NAT """
1681 nat_ip = "10.0.0.10"
1682 self.tcp_port_out = 6303
1683 self.udp_port_out = 6304
1684 self.icmp_id_out = 6305
1686 self.nat44_add_address(self.nat_addr)
1687 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1688 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1692 pkts = self.create_stream_in(self.pg0, self.pg1)
1693 self.pg0.add_stream(pkts)
1694 self.pg_enable_capture(self.pg_interfaces)
1696 capture = self.pg1.get_capture(len(pkts))
1697 self.verify_capture_out(capture)
1700 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1701 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1702 self.assertEqual(len(sessions), 0)
1703 pkts = self.create_stream_in(self.pg0, self.pg1)
1704 self.pg0.add_stream(pkts)
1705 self.pg_enable_capture(self.pg_interfaces)
1707 capture = self.pg1.get_capture(len(pkts))
1708 self.verify_capture_out(capture, nat_ip, True)
1710 def test_identity_nat(self):
1711 """ Identity NAT """
1713 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1714 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1715 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1718 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1719 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1720 TCP(sport=12345, dport=56789))
1721 self.pg1.add_stream(p)
1722 self.pg_enable_capture(self.pg_interfaces)
1724 capture = self.pg0.get_capture(1)
1729 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1730 self.assertEqual(ip.src, self.pg1.remote_ip4)
1731 self.assertEqual(tcp.dport, 56789)
1732 self.assertEqual(tcp.sport, 12345)
1733 self.check_tcp_checksum(p)
1734 self.check_ip_checksum(p)
1736 self.logger.error(ppp("Unexpected or invalid packet:", p))
1739 def test_static_lb(self):
1740 """ NAT44 local service load balancing """
1741 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1744 server1 = self.pg0.remote_hosts[0]
1745 server2 = self.pg0.remote_hosts[1]
1747 locals = [{'addr': server1.ip4n,
1750 {'addr': server2.ip4n,
1754 self.nat44_add_address(self.nat_addr)
1755 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1758 local_num=len(locals),
1760 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1761 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1764 # from client to service
1765 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1766 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1767 TCP(sport=12345, dport=external_port))
1768 self.pg1.add_stream(p)
1769 self.pg_enable_capture(self.pg_interfaces)
1771 capture = self.pg0.get_capture(1)
1777 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1778 if ip.dst == server1.ip4:
1782 self.assertEqual(tcp.dport, local_port)
1783 self.check_tcp_checksum(p)
1784 self.check_ip_checksum(p)
1786 self.logger.error(ppp("Unexpected or invalid packet:", p))
1789 # from service back to client
1790 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1791 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1792 TCP(sport=local_port, dport=12345))
1793 self.pg0.add_stream(p)
1794 self.pg_enable_capture(self.pg_interfaces)
1796 capture = self.pg1.get_capture(1)
1801 self.assertEqual(ip.src, self.nat_addr)
1802 self.assertEqual(tcp.sport, external_port)
1803 self.check_tcp_checksum(p)
1804 self.check_ip_checksum(p)
1806 self.logger.error(ppp("Unexpected or invalid packet:", p))
1809 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1810 def test_static_lb_multi_clients(self):
1811 """ NAT44 local service load balancing - multiple clients"""
1813 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1816 server1 = self.pg0.remote_hosts[0]
1817 server2 = self.pg0.remote_hosts[1]
1819 locals = [{'addr': server1.ip4n,
1822 {'addr': server2.ip4n,
1826 self.nat44_add_address(self.nat_addr)
1827 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1830 local_num=len(locals),
1832 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1833 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1838 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
1840 for client in clients:
1841 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1842 IP(src=client, dst=self.nat_addr) /
1843 TCP(sport=12345, dport=external_port))
1845 self.pg1.add_stream(pkts)
1846 self.pg_enable_capture(self.pg_interfaces)
1848 capture = self.pg0.get_capture(len(pkts))
1850 if p[IP].dst == server1.ip4:
1854 self.assertTrue(server1_n > server2_n)
1856 def test_static_lb_2(self):
1857 """ NAT44 local service load balancing (asymmetrical rule) """
1858 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1861 server1 = self.pg0.remote_hosts[0]
1862 server2 = self.pg0.remote_hosts[1]
1864 locals = [{'addr': server1.ip4n,
1867 {'addr': server2.ip4n,
1871 self.vapi.nat44_forwarding_enable_disable(1)
1872 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1876 local_num=len(locals),
1878 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1879 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1882 # from client to service
1883 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1884 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1885 TCP(sport=12345, dport=external_port))
1886 self.pg1.add_stream(p)
1887 self.pg_enable_capture(self.pg_interfaces)
1889 capture = self.pg0.get_capture(1)
1895 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1896 if ip.dst == server1.ip4:
1900 self.assertEqual(tcp.dport, local_port)
1901 self.check_tcp_checksum(p)
1902 self.check_ip_checksum(p)
1904 self.logger.error(ppp("Unexpected or invalid packet:", p))
1907 # from service back to client
1908 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1909 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1910 TCP(sport=local_port, dport=12345))
1911 self.pg0.add_stream(p)
1912 self.pg_enable_capture(self.pg_interfaces)
1914 capture = self.pg1.get_capture(1)
1919 self.assertEqual(ip.src, self.nat_addr)
1920 self.assertEqual(tcp.sport, external_port)
1921 self.check_tcp_checksum(p)
1922 self.check_ip_checksum(p)
1924 self.logger.error(ppp("Unexpected or invalid packet:", p))
1927 # from client to server (no translation)
1928 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1929 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1930 TCP(sport=12346, dport=local_port))
1931 self.pg1.add_stream(p)
1932 self.pg_enable_capture(self.pg_interfaces)
1934 capture = self.pg0.get_capture(1)
1940 self.assertEqual(ip.dst, server1.ip4)
1941 self.assertEqual(tcp.dport, local_port)
1942 self.check_tcp_checksum(p)
1943 self.check_ip_checksum(p)
1945 self.logger.error(ppp("Unexpected or invalid packet:", p))
1948 # from service back to client (no translation)
1949 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1950 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1951 TCP(sport=local_port, dport=12346))
1952 self.pg0.add_stream(p)
1953 self.pg_enable_capture(self.pg_interfaces)
1955 capture = self.pg1.get_capture(1)
1960 self.assertEqual(ip.src, server1.ip4)
1961 self.assertEqual(tcp.sport, local_port)
1962 self.check_tcp_checksum(p)
1963 self.check_ip_checksum(p)
1965 self.logger.error(ppp("Unexpected or invalid packet:", p))
1968 def test_multiple_inside_interfaces(self):
1969 """ NAT44 multiple non-overlapping address space inside interfaces """
1971 self.nat44_add_address(self.nat_addr)
1972 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1973 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1974 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1977 # between two NAT44 inside interfaces (no translation)
1978 pkts = self.create_stream_in(self.pg0, self.pg1)
1979 self.pg0.add_stream(pkts)
1980 self.pg_enable_capture(self.pg_interfaces)
1982 capture = self.pg1.get_capture(len(pkts))
1983 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1985 # from NAT44 inside to interface without NAT44 feature (no translation)
1986 pkts = self.create_stream_in(self.pg0, self.pg2)
1987 self.pg0.add_stream(pkts)
1988 self.pg_enable_capture(self.pg_interfaces)
1990 capture = self.pg2.get_capture(len(pkts))
1991 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1993 # in2out 1st interface
1994 pkts = self.create_stream_in(self.pg0, self.pg3)
1995 self.pg0.add_stream(pkts)
1996 self.pg_enable_capture(self.pg_interfaces)
1998 capture = self.pg3.get_capture(len(pkts))
1999 self.verify_capture_out(capture)
2001 # out2in 1st interface
2002 pkts = self.create_stream_out(self.pg3)
2003 self.pg3.add_stream(pkts)
2004 self.pg_enable_capture(self.pg_interfaces)
2006 capture = self.pg0.get_capture(len(pkts))
2007 self.verify_capture_in(capture, self.pg0)
2009 # in2out 2nd interface
2010 pkts = self.create_stream_in(self.pg1, self.pg3)
2011 self.pg1.add_stream(pkts)
2012 self.pg_enable_capture(self.pg_interfaces)
2014 capture = self.pg3.get_capture(len(pkts))
2015 self.verify_capture_out(capture)
2017 # out2in 2nd interface
2018 pkts = self.create_stream_out(self.pg3)
2019 self.pg3.add_stream(pkts)
2020 self.pg_enable_capture(self.pg_interfaces)
2022 capture = self.pg1.get_capture(len(pkts))
2023 self.verify_capture_in(capture, self.pg1)
2025 def test_inside_overlapping_interfaces(self):
2026 """ NAT44 multiple inside interfaces with overlapping address space """
2028 static_nat_ip = "10.0.0.10"
2029 self.nat44_add_address(self.nat_addr)
2030 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
2032 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
2033 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
2034 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
2035 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
2038 # between NAT44 inside interfaces with same VRF (no translation)
2039 pkts = self.create_stream_in(self.pg4, self.pg5)
2040 self.pg4.add_stream(pkts)
2041 self.pg_enable_capture(self.pg_interfaces)
2043 capture = self.pg5.get_capture(len(pkts))
2044 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
2046 # between NAT44 inside interfaces with different VRF (hairpinning)
2047 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2048 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2049 TCP(sport=1234, dport=5678))
2050 self.pg4.add_stream(p)
2051 self.pg_enable_capture(self.pg_interfaces)
2053 capture = self.pg6.get_capture(1)
2058 self.assertEqual(ip.src, self.nat_addr)
2059 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2060 self.assertNotEqual(tcp.sport, 1234)
2061 self.assertEqual(tcp.dport, 5678)
2063 self.logger.error(ppp("Unexpected or invalid packet:", p))
2066 # in2out 1st interface
2067 pkts = self.create_stream_in(self.pg4, self.pg3)
2068 self.pg4.add_stream(pkts)
2069 self.pg_enable_capture(self.pg_interfaces)
2071 capture = self.pg3.get_capture(len(pkts))
2072 self.verify_capture_out(capture)
2074 # out2in 1st interface
2075 pkts = self.create_stream_out(self.pg3)
2076 self.pg3.add_stream(pkts)
2077 self.pg_enable_capture(self.pg_interfaces)
2079 capture = self.pg4.get_capture(len(pkts))
2080 self.verify_capture_in(capture, self.pg4)
2082 # in2out 2nd interface
2083 pkts = self.create_stream_in(self.pg5, self.pg3)
2084 self.pg5.add_stream(pkts)
2085 self.pg_enable_capture(self.pg_interfaces)
2087 capture = self.pg3.get_capture(len(pkts))
2088 self.verify_capture_out(capture)
2090 # out2in 2nd interface
2091 pkts = self.create_stream_out(self.pg3)
2092 self.pg3.add_stream(pkts)
2093 self.pg_enable_capture(self.pg_interfaces)
2095 capture = self.pg5.get_capture(len(pkts))
2096 self.verify_capture_in(capture, self.pg5)
2099 addresses = self.vapi.nat44_address_dump()
2100 self.assertEqual(len(addresses), 1)
2101 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2102 self.assertEqual(len(sessions), 3)
2103 for session in sessions:
2104 self.assertFalse(session.is_static)
2105 self.assertEqual(session.inside_ip_address[0:4],
2106 self.pg5.remote_ip4n)
2107 self.assertEqual(session.outside_ip_address,
2108 addresses[0].ip_address)
2109 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2110 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2111 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2112 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2113 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2114 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2115 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2116 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2117 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2119 # in2out 3rd interface
2120 pkts = self.create_stream_in(self.pg6, self.pg3)
2121 self.pg6.add_stream(pkts)
2122 self.pg_enable_capture(self.pg_interfaces)
2124 capture = self.pg3.get_capture(len(pkts))
2125 self.verify_capture_out(capture, static_nat_ip, True)
2127 # out2in 3rd interface
2128 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2129 self.pg3.add_stream(pkts)
2130 self.pg_enable_capture(self.pg_interfaces)
2132 capture = self.pg6.get_capture(len(pkts))
2133 self.verify_capture_in(capture, self.pg6)
2135 # general user and session dump verifications
2136 users = self.vapi.nat44_user_dump()
2137 self.assertTrue(len(users) >= 3)
2138 addresses = self.vapi.nat44_address_dump()
2139 self.assertEqual(len(addresses), 1)
2141 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2143 for session in sessions:
2144 self.assertEqual(user.ip_address, session.inside_ip_address)
2145 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2146 self.assertTrue(session.protocol in
2147 [IP_PROTOS.tcp, IP_PROTOS.udp,
2151 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2152 self.assertTrue(len(sessions) >= 4)
2153 for session in sessions:
2154 self.assertFalse(session.is_static)
2155 self.assertEqual(session.inside_ip_address[0:4],
2156 self.pg4.remote_ip4n)
2157 self.assertEqual(session.outside_ip_address,
2158 addresses[0].ip_address)
2161 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2162 self.assertTrue(len(sessions) >= 3)
2163 for session in sessions:
2164 self.assertTrue(session.is_static)
2165 self.assertEqual(session.inside_ip_address[0:4],
2166 self.pg6.remote_ip4n)
2167 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2168 map(int, static_nat_ip.split('.')))
2169 self.assertTrue(session.inside_port in
2170 [self.tcp_port_in, self.udp_port_in,
2173 def test_hairpinning(self):
2174 """ NAT44 hairpinning - 1:1 NAPT """
2176 host = self.pg0.remote_hosts[0]
2177 server = self.pg0.remote_hosts[1]
2180 server_in_port = 5678
2181 server_out_port = 8765
2183 self.nat44_add_address(self.nat_addr)
2184 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2185 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2187 # add static mapping for server
2188 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2189 server_in_port, server_out_port,
2190 proto=IP_PROTOS.tcp)
2192 # send packet from host to server
2193 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2194 IP(src=host.ip4, dst=self.nat_addr) /
2195 TCP(sport=host_in_port, dport=server_out_port))
2196 self.pg0.add_stream(p)
2197 self.pg_enable_capture(self.pg_interfaces)
2199 capture = self.pg0.get_capture(1)
2204 self.assertEqual(ip.src, self.nat_addr)
2205 self.assertEqual(ip.dst, server.ip4)
2206 self.assertNotEqual(tcp.sport, host_in_port)
2207 self.assertEqual(tcp.dport, server_in_port)
2208 self.check_tcp_checksum(p)
2209 host_out_port = tcp.sport
2211 self.logger.error(ppp("Unexpected or invalid packet:", p))
2214 # send reply from server to host
2215 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2216 IP(src=server.ip4, dst=self.nat_addr) /
2217 TCP(sport=server_in_port, dport=host_out_port))
2218 self.pg0.add_stream(p)
2219 self.pg_enable_capture(self.pg_interfaces)
2221 capture = self.pg0.get_capture(1)
2226 self.assertEqual(ip.src, self.nat_addr)
2227 self.assertEqual(ip.dst, host.ip4)
2228 self.assertEqual(tcp.sport, server_out_port)
2229 self.assertEqual(tcp.dport, host_in_port)
2230 self.check_tcp_checksum(p)
2232 self.logger.error(ppp("Unexpected or invalid packet:", p))
2235 def test_hairpinning2(self):
2236 """ NAT44 hairpinning - 1:1 NAT"""
2238 server1_nat_ip = "10.0.0.10"
2239 server2_nat_ip = "10.0.0.11"
2240 host = self.pg0.remote_hosts[0]
2241 server1 = self.pg0.remote_hosts[1]
2242 server2 = self.pg0.remote_hosts[2]
2243 server_tcp_port = 22
2244 server_udp_port = 20
2246 self.nat44_add_address(self.nat_addr)
2247 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2248 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2251 # add static mapping for servers
2252 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2253 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
2257 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2258 IP(src=host.ip4, dst=server1_nat_ip) /
2259 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2261 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2262 IP(src=host.ip4, dst=server1_nat_ip) /
2263 UDP(sport=self.udp_port_in, dport=server_udp_port))
2265 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2266 IP(src=host.ip4, dst=server1_nat_ip) /
2267 ICMP(id=self.icmp_id_in, type='echo-request'))
2269 self.pg0.add_stream(pkts)
2270 self.pg_enable_capture(self.pg_interfaces)
2272 capture = self.pg0.get_capture(len(pkts))
2273 for packet in capture:
2275 self.assertEqual(packet[IP].src, self.nat_addr)
2276 self.assertEqual(packet[IP].dst, server1.ip4)
2277 if packet.haslayer(TCP):
2278 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2279 self.assertEqual(packet[TCP].dport, server_tcp_port)
2280 self.tcp_port_out = packet[TCP].sport
2281 self.check_tcp_checksum(packet)
2282 elif packet.haslayer(UDP):
2283 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2284 self.assertEqual(packet[UDP].dport, server_udp_port)
2285 self.udp_port_out = packet[UDP].sport
2287 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2288 self.icmp_id_out = packet[ICMP].id
2290 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2295 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2296 IP(src=server1.ip4, dst=self.nat_addr) /
2297 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2299 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2300 IP(src=server1.ip4, dst=self.nat_addr) /
2301 UDP(sport=server_udp_port, dport=self.udp_port_out))
2303 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2304 IP(src=server1.ip4, dst=self.nat_addr) /
2305 ICMP(id=self.icmp_id_out, type='echo-reply'))
2307 self.pg0.add_stream(pkts)
2308 self.pg_enable_capture(self.pg_interfaces)
2310 capture = self.pg0.get_capture(len(pkts))
2311 for packet in capture:
2313 self.assertEqual(packet[IP].src, server1_nat_ip)
2314 self.assertEqual(packet[IP].dst, host.ip4)
2315 if packet.haslayer(TCP):
2316 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2317 self.assertEqual(packet[TCP].sport, server_tcp_port)
2318 self.check_tcp_checksum(packet)
2319 elif packet.haslayer(UDP):
2320 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2321 self.assertEqual(packet[UDP].sport, server_udp_port)
2323 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2325 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2328 # server2 to server1
2330 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2331 IP(src=server2.ip4, dst=server1_nat_ip) /
2332 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2334 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2335 IP(src=server2.ip4, dst=server1_nat_ip) /
2336 UDP(sport=self.udp_port_in, dport=server_udp_port))
2338 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2339 IP(src=server2.ip4, dst=server1_nat_ip) /
2340 ICMP(id=self.icmp_id_in, type='echo-request'))
2342 self.pg0.add_stream(pkts)
2343 self.pg_enable_capture(self.pg_interfaces)
2345 capture = self.pg0.get_capture(len(pkts))
2346 for packet in capture:
2348 self.assertEqual(packet[IP].src, server2_nat_ip)
2349 self.assertEqual(packet[IP].dst, server1.ip4)
2350 if packet.haslayer(TCP):
2351 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2352 self.assertEqual(packet[TCP].dport, server_tcp_port)
2353 self.tcp_port_out = packet[TCP].sport
2354 self.check_tcp_checksum(packet)
2355 elif packet.haslayer(UDP):
2356 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2357 self.assertEqual(packet[UDP].dport, server_udp_port)
2358 self.udp_port_out = packet[UDP].sport
2360 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2361 self.icmp_id_out = packet[ICMP].id
2363 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2366 # server1 to server2
2368 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2369 IP(src=server1.ip4, dst=server2_nat_ip) /
2370 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2372 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2373 IP(src=server1.ip4, dst=server2_nat_ip) /
2374 UDP(sport=server_udp_port, dport=self.udp_port_out))
2376 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2377 IP(src=server1.ip4, dst=server2_nat_ip) /
2378 ICMP(id=self.icmp_id_out, type='echo-reply'))
2380 self.pg0.add_stream(pkts)
2381 self.pg_enable_capture(self.pg_interfaces)
2383 capture = self.pg0.get_capture(len(pkts))
2384 for packet in capture:
2386 self.assertEqual(packet[IP].src, server1_nat_ip)
2387 self.assertEqual(packet[IP].dst, server2.ip4)
2388 if packet.haslayer(TCP):
2389 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2390 self.assertEqual(packet[TCP].sport, server_tcp_port)
2391 self.check_tcp_checksum(packet)
2392 elif packet.haslayer(UDP):
2393 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2394 self.assertEqual(packet[UDP].sport, server_udp_port)
2396 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2398 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2401 def test_max_translations_per_user(self):
2402 """ MAX translations per user - recycle the least recently used """
2404 self.nat44_add_address(self.nat_addr)
2405 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2406 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2409 # get maximum number of translations per user
2410 nat44_config = self.vapi.nat_show_config()
2412 # send more than maximum number of translations per user packets
2413 pkts_num = nat44_config.max_translations_per_user + 5
2415 for port in range(0, pkts_num):
2416 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2417 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2418 TCP(sport=1025 + port))
2420 self.pg0.add_stream(pkts)
2421 self.pg_enable_capture(self.pg_interfaces)
2424 # verify number of translated packet
2425 self.pg1.get_capture(pkts_num)
2427 def test_interface_addr(self):
2428 """ Acquire NAT44 addresses from interface """
2429 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2431 # no address in NAT pool
2432 adresses = self.vapi.nat44_address_dump()
2433 self.assertEqual(0, len(adresses))
2435 # configure interface address and check NAT address pool
2436 self.pg7.config_ip4()
2437 adresses = self.vapi.nat44_address_dump()
2438 self.assertEqual(1, len(adresses))
2439 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2441 # remove interface address and check NAT address pool
2442 self.pg7.unconfig_ip4()
2443 adresses = self.vapi.nat44_address_dump()
2444 self.assertEqual(0, len(adresses))
2446 def test_interface_addr_static_mapping(self):
2447 """ Static mapping with addresses from interface """
2450 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2451 self.nat44_add_static_mapping(
2453 external_sw_if_index=self.pg7.sw_if_index,
2456 # static mappings with external interface
2457 static_mappings = self.vapi.nat44_static_mapping_dump()
2458 self.assertEqual(1, len(static_mappings))
2459 self.assertEqual(self.pg7.sw_if_index,
2460 static_mappings[0].external_sw_if_index)
2461 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2463 # configure interface address and check static mappings
2464 self.pg7.config_ip4()
2465 static_mappings = self.vapi.nat44_static_mapping_dump()
2466 self.assertEqual(1, len(static_mappings))
2467 self.assertEqual(static_mappings[0].external_ip_address[0:4],
2468 self.pg7.local_ip4n)
2469 self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
2470 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2472 # remove interface address and check static mappings
2473 self.pg7.unconfig_ip4()
2474 static_mappings = self.vapi.nat44_static_mapping_dump()
2475 self.assertEqual(0, len(static_mappings))
2477 def test_interface_addr_identity_nat(self):
2478 """ Identity NAT with addresses from interface """
2481 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2482 self.vapi.nat44_add_del_identity_mapping(
2483 sw_if_index=self.pg7.sw_if_index,
2485 protocol=IP_PROTOS.tcp,
2488 # identity mappings with external interface
2489 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2490 self.assertEqual(1, len(identity_mappings))
2491 self.assertEqual(self.pg7.sw_if_index,
2492 identity_mappings[0].sw_if_index)
2494 # configure interface address and check identity mappings
2495 self.pg7.config_ip4()
2496 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2497 self.assertEqual(1, len(identity_mappings))
2498 self.assertEqual(identity_mappings[0].ip_address,
2499 self.pg7.local_ip4n)
2500 self.assertEqual(0xFFFFFFFF, identity_mappings[0].sw_if_index)
2501 self.assertEqual(port, identity_mappings[0].port)
2502 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2504 # remove interface address and check identity mappings
2505 self.pg7.unconfig_ip4()
2506 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2507 self.assertEqual(0, len(identity_mappings))
2509 def test_ipfix_nat44_sess(self):
2510 """ IPFIX logging NAT44 session created/delted """
2511 self.ipfix_domain_id = 10
2512 self.ipfix_src_port = 20202
2513 colector_port = 30303
2514 bind_layers(UDP, IPFIX, dport=30303)
2515 self.nat44_add_address(self.nat_addr)
2516 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2517 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2519 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2520 src_address=self.pg3.local_ip4n,
2522 template_interval=10,
2523 collector_port=colector_port)
2524 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2525 src_port=self.ipfix_src_port)
2527 pkts = self.create_stream_in(self.pg0, self.pg1)
2528 self.pg0.add_stream(pkts)
2529 self.pg_enable_capture(self.pg_interfaces)
2531 capture = self.pg1.get_capture(len(pkts))
2532 self.verify_capture_out(capture)
2533 self.nat44_add_address(self.nat_addr, is_add=0)
2534 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2535 capture = self.pg3.get_capture(9)
2536 ipfix = IPFIXDecoder()
2537 # first load template
2539 self.assertTrue(p.haslayer(IPFIX))
2540 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2541 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2542 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2543 self.assertEqual(p[UDP].dport, colector_port)
2544 self.assertEqual(p[IPFIX].observationDomainID,
2545 self.ipfix_domain_id)
2546 if p.haslayer(Template):
2547 ipfix.add_template(p.getlayer(Template))
2548 # verify events in data set
2550 if p.haslayer(Data):
2551 data = ipfix.decode_data_set(p.getlayer(Set))
2552 self.verify_ipfix_nat44_ses(data)
2554 def test_ipfix_addr_exhausted(self):
2555 """ IPFIX logging NAT addresses exhausted """
2556 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2557 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2559 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2560 src_address=self.pg3.local_ip4n,
2562 template_interval=10)
2563 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2564 src_port=self.ipfix_src_port)
2566 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2567 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2569 self.pg0.add_stream(p)
2570 self.pg_enable_capture(self.pg_interfaces)
2572 capture = self.pg1.get_capture(0)
2573 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2574 capture = self.pg3.get_capture(9)
2575 ipfix = IPFIXDecoder()
2576 # first load template
2578 self.assertTrue(p.haslayer(IPFIX))
2579 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2580 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2581 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2582 self.assertEqual(p[UDP].dport, 4739)
2583 self.assertEqual(p[IPFIX].observationDomainID,
2584 self.ipfix_domain_id)
2585 if p.haslayer(Template):
2586 ipfix.add_template(p.getlayer(Template))
2587 # verify events in data set
2589 if p.haslayer(Data):
2590 data = ipfix.decode_data_set(p.getlayer(Set))
2591 self.verify_ipfix_addr_exhausted(data)
2593 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2594 def test_ipfix_max_sessions(self):
2595 """ IPFIX logging maximum session entries exceeded """
2596 self.nat44_add_address(self.nat_addr)
2597 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2598 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2601 nat44_config = self.vapi.nat_show_config()
2602 max_sessions = 10 * nat44_config.translation_buckets
2605 for i in range(0, max_sessions):
2606 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2607 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2608 IP(src=src, dst=self.pg1.remote_ip4) /
2611 self.pg0.add_stream(pkts)
2612 self.pg_enable_capture(self.pg_interfaces)
2615 self.pg1.get_capture(max_sessions)
2616 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2617 src_address=self.pg3.local_ip4n,
2619 template_interval=10)
2620 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2621 src_port=self.ipfix_src_port)
2623 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2624 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2626 self.pg0.add_stream(p)
2627 self.pg_enable_capture(self.pg_interfaces)
2629 self.pg1.get_capture(0)
2630 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2631 capture = self.pg3.get_capture(9)
2632 ipfix = IPFIXDecoder()
2633 # first load template
2635 self.assertTrue(p.haslayer(IPFIX))
2636 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2637 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2638 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2639 self.assertEqual(p[UDP].dport, 4739)
2640 self.assertEqual(p[IPFIX].observationDomainID,
2641 self.ipfix_domain_id)
2642 if p.haslayer(Template):
2643 ipfix.add_template(p.getlayer(Template))
2644 # verify events in data set
2646 if p.haslayer(Data):
2647 data = ipfix.decode_data_set(p.getlayer(Set))
2648 self.verify_ipfix_max_sessions(data, max_sessions)
2650 def test_pool_addr_fib(self):
2651 """ NAT44 add pool addresses to FIB """
2652 static_addr = '10.0.0.10'
2653 self.nat44_add_address(self.nat_addr)
2654 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2655 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2657 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2660 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2661 ARP(op=ARP.who_has, pdst=self.nat_addr,
2662 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2663 self.pg1.add_stream(p)
2664 self.pg_enable_capture(self.pg_interfaces)
2666 capture = self.pg1.get_capture(1)
2667 self.assertTrue(capture[0].haslayer(ARP))
2668 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2671 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2672 ARP(op=ARP.who_has, pdst=static_addr,
2673 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2674 self.pg1.add_stream(p)
2675 self.pg_enable_capture(self.pg_interfaces)
2677 capture = self.pg1.get_capture(1)
2678 self.assertTrue(capture[0].haslayer(ARP))
2679 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2681 # send ARP to non-NAT44 interface
2682 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2683 ARP(op=ARP.who_has, pdst=self.nat_addr,
2684 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2685 self.pg2.add_stream(p)
2686 self.pg_enable_capture(self.pg_interfaces)
2688 capture = self.pg1.get_capture(0)
2690 # remove addresses and verify
2691 self.nat44_add_address(self.nat_addr, is_add=0)
2692 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2695 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2696 ARP(op=ARP.who_has, pdst=self.nat_addr,
2697 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2698 self.pg1.add_stream(p)
2699 self.pg_enable_capture(self.pg_interfaces)
2701 capture = self.pg1.get_capture(0)
2703 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2704 ARP(op=ARP.who_has, pdst=static_addr,
2705 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2706 self.pg1.add_stream(p)
2707 self.pg_enable_capture(self.pg_interfaces)
2709 capture = self.pg1.get_capture(0)
2711 def test_vrf_mode(self):
2712 """ NAT44 tenant VRF aware address pool mode """
2716 nat_ip1 = "10.0.0.10"
2717 nat_ip2 = "10.0.0.11"
2719 self.pg0.unconfig_ip4()
2720 self.pg1.unconfig_ip4()
2721 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2722 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2723 self.pg0.set_table_ip4(vrf_id1)
2724 self.pg1.set_table_ip4(vrf_id2)
2725 self.pg0.config_ip4()
2726 self.pg1.config_ip4()
2728 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2729 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2730 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2731 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2732 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2736 pkts = self.create_stream_in(self.pg0, self.pg2)
2737 self.pg0.add_stream(pkts)
2738 self.pg_enable_capture(self.pg_interfaces)
2740 capture = self.pg2.get_capture(len(pkts))
2741 self.verify_capture_out(capture, nat_ip1)
2744 pkts = self.create_stream_in(self.pg1, self.pg2)
2745 self.pg1.add_stream(pkts)
2746 self.pg_enable_capture(self.pg_interfaces)
2748 capture = self.pg2.get_capture(len(pkts))
2749 self.verify_capture_out(capture, nat_ip2)
2751 self.pg0.unconfig_ip4()
2752 self.pg1.unconfig_ip4()
2753 self.pg0.set_table_ip4(0)
2754 self.pg1.set_table_ip4(0)
2755 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2756 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2758 def test_vrf_feature_independent(self):
2759 """ NAT44 tenant VRF independent address pool mode """
2761 nat_ip1 = "10.0.0.10"
2762 nat_ip2 = "10.0.0.11"
2764 self.nat44_add_address(nat_ip1)
2765 self.nat44_add_address(nat_ip2, vrf_id=99)
2766 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2767 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2768 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
2772 pkts = self.create_stream_in(self.pg0, self.pg2)
2773 self.pg0.add_stream(pkts)
2774 self.pg_enable_capture(self.pg_interfaces)
2776 capture = self.pg2.get_capture(len(pkts))
2777 self.verify_capture_out(capture, nat_ip1)
2780 pkts = self.create_stream_in(self.pg1, self.pg2)
2781 self.pg1.add_stream(pkts)
2782 self.pg_enable_capture(self.pg_interfaces)
2784 capture = self.pg2.get_capture(len(pkts))
2785 self.verify_capture_out(capture, nat_ip1)
2787 def test_dynamic_ipless_interfaces(self):
2788 """ NAT44 interfaces without configured IP address """
2790 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2791 mactobinary(self.pg7.remote_mac),
2792 self.pg7.remote_ip4n,
2794 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2795 mactobinary(self.pg8.remote_mac),
2796 self.pg8.remote_ip4n,
2799 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2800 dst_address_length=32,
2801 next_hop_address=self.pg7.remote_ip4n,
2802 next_hop_sw_if_index=self.pg7.sw_if_index)
2803 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2804 dst_address_length=32,
2805 next_hop_address=self.pg8.remote_ip4n,
2806 next_hop_sw_if_index=self.pg8.sw_if_index)
2808 self.nat44_add_address(self.nat_addr)
2809 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2810 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2814 pkts = self.create_stream_in(self.pg7, self.pg8)
2815 self.pg7.add_stream(pkts)
2816 self.pg_enable_capture(self.pg_interfaces)
2818 capture = self.pg8.get_capture(len(pkts))
2819 self.verify_capture_out(capture)
2822 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2823 self.pg8.add_stream(pkts)
2824 self.pg_enable_capture(self.pg_interfaces)
2826 capture = self.pg7.get_capture(len(pkts))
2827 self.verify_capture_in(capture, self.pg7)
2829 def test_static_ipless_interfaces(self):
2830 """ NAT44 interfaces without configured IP address - 1:1 NAT """
2832 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2833 mactobinary(self.pg7.remote_mac),
2834 self.pg7.remote_ip4n,
2836 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2837 mactobinary(self.pg8.remote_mac),
2838 self.pg8.remote_ip4n,
2841 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2842 dst_address_length=32,
2843 next_hop_address=self.pg7.remote_ip4n,
2844 next_hop_sw_if_index=self.pg7.sw_if_index)
2845 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2846 dst_address_length=32,
2847 next_hop_address=self.pg8.remote_ip4n,
2848 next_hop_sw_if_index=self.pg8.sw_if_index)
2850 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2851 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2852 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2856 pkts = self.create_stream_out(self.pg8)
2857 self.pg8.add_stream(pkts)
2858 self.pg_enable_capture(self.pg_interfaces)
2860 capture = self.pg7.get_capture(len(pkts))
2861 self.verify_capture_in(capture, self.pg7)
2864 pkts = self.create_stream_in(self.pg7, self.pg8)
2865 self.pg7.add_stream(pkts)
2866 self.pg_enable_capture(self.pg_interfaces)
2868 capture = self.pg8.get_capture(len(pkts))
2869 self.verify_capture_out(capture, self.nat_addr, True)
2871 def test_static_with_port_ipless_interfaces(self):
2872 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
2874 self.tcp_port_out = 30606
2875 self.udp_port_out = 30607
2876 self.icmp_id_out = 30608
2878 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2879 mactobinary(self.pg7.remote_mac),
2880 self.pg7.remote_ip4n,
2882 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2883 mactobinary(self.pg8.remote_mac),
2884 self.pg8.remote_ip4n,
2887 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2888 dst_address_length=32,
2889 next_hop_address=self.pg7.remote_ip4n,
2890 next_hop_sw_if_index=self.pg7.sw_if_index)
2891 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2892 dst_address_length=32,
2893 next_hop_address=self.pg8.remote_ip4n,
2894 next_hop_sw_if_index=self.pg8.sw_if_index)
2896 self.nat44_add_address(self.nat_addr)
2897 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2898 self.tcp_port_in, self.tcp_port_out,
2899 proto=IP_PROTOS.tcp)
2900 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2901 self.udp_port_in, self.udp_port_out,
2902 proto=IP_PROTOS.udp)
2903 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2904 self.icmp_id_in, self.icmp_id_out,
2905 proto=IP_PROTOS.icmp)
2906 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2907 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
2911 pkts = self.create_stream_out(self.pg8)
2912 self.pg8.add_stream(pkts)
2913 self.pg_enable_capture(self.pg_interfaces)
2915 capture = self.pg7.get_capture(len(pkts))
2916 self.verify_capture_in(capture, self.pg7)
2919 pkts = self.create_stream_in(self.pg7, self.pg8)
2920 self.pg7.add_stream(pkts)
2921 self.pg_enable_capture(self.pg_interfaces)
2923 capture = self.pg8.get_capture(len(pkts))
2924 self.verify_capture_out(capture)
2926 def test_static_unknown_proto(self):
2927 """ 1:1 NAT translate packet with unknown protocol """
2928 nat_ip = "10.0.0.10"
2929 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2930 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2931 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2935 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2936 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2938 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2939 TCP(sport=1234, dport=1234))
2940 self.pg0.add_stream(p)
2941 self.pg_enable_capture(self.pg_interfaces)
2943 p = self.pg1.get_capture(1)
2946 self.assertEqual(packet[IP].src, nat_ip)
2947 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2948 self.assertTrue(packet.haslayer(GRE))
2949 self.check_ip_checksum(packet)
2951 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2955 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2956 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2958 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2959 TCP(sport=1234, dport=1234))
2960 self.pg1.add_stream(p)
2961 self.pg_enable_capture(self.pg_interfaces)
2963 p = self.pg0.get_capture(1)
2966 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2967 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2968 self.assertTrue(packet.haslayer(GRE))
2969 self.check_ip_checksum(packet)
2971 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2974 def test_hairpinning_static_unknown_proto(self):
2975 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2977 host = self.pg0.remote_hosts[0]
2978 server = self.pg0.remote_hosts[1]
2980 host_nat_ip = "10.0.0.10"
2981 server_nat_ip = "10.0.0.11"
2983 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2984 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2985 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2986 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2990 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2991 IP(src=host.ip4, dst=server_nat_ip) /
2993 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2994 TCP(sport=1234, dport=1234))
2995 self.pg0.add_stream(p)
2996 self.pg_enable_capture(self.pg_interfaces)
2998 p = self.pg0.get_capture(1)
3001 self.assertEqual(packet[IP].src, host_nat_ip)
3002 self.assertEqual(packet[IP].dst, server.ip4)
3003 self.assertTrue(packet.haslayer(GRE))
3004 self.check_ip_checksum(packet)
3006 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3010 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3011 IP(src=server.ip4, dst=host_nat_ip) /
3013 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3014 TCP(sport=1234, dport=1234))
3015 self.pg0.add_stream(p)
3016 self.pg_enable_capture(self.pg_interfaces)
3018 p = self.pg0.get_capture(1)
3021 self.assertEqual(packet[IP].src, server_nat_ip)
3022 self.assertEqual(packet[IP].dst, host.ip4)
3023 self.assertTrue(packet.haslayer(GRE))
3024 self.check_ip_checksum(packet)
3026 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3029 def test_unknown_proto(self):
3030 """ NAT44 translate packet with unknown protocol """
3031 self.nat44_add_address(self.nat_addr)
3032 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3033 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3037 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3038 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3039 TCP(sport=self.tcp_port_in, dport=20))
3040 self.pg0.add_stream(p)
3041 self.pg_enable_capture(self.pg_interfaces)
3043 p = self.pg1.get_capture(1)
3045 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3046 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3048 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3049 TCP(sport=1234, dport=1234))
3050 self.pg0.add_stream(p)
3051 self.pg_enable_capture(self.pg_interfaces)
3053 p = self.pg1.get_capture(1)
3056 self.assertEqual(packet[IP].src, self.nat_addr)
3057 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3058 self.assertTrue(packet.haslayer(GRE))
3059 self.check_ip_checksum(packet)
3061 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3065 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3066 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3068 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3069 TCP(sport=1234, dport=1234))
3070 self.pg1.add_stream(p)
3071 self.pg_enable_capture(self.pg_interfaces)
3073 p = self.pg0.get_capture(1)
3076 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3077 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3078 self.assertTrue(packet.haslayer(GRE))
3079 self.check_ip_checksum(packet)
3081 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3084 def test_hairpinning_unknown_proto(self):
3085 """ NAT44 translate packet with unknown protocol - hairpinning """
3086 host = self.pg0.remote_hosts[0]
3087 server = self.pg0.remote_hosts[1]
3090 server_in_port = 5678
3091 server_out_port = 8765
3092 server_nat_ip = "10.0.0.11"
3094 self.nat44_add_address(self.nat_addr)
3095 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3096 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3099 # add static mapping for server
3100 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
3103 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3104 IP(src=host.ip4, dst=server_nat_ip) /
3105 TCP(sport=host_in_port, dport=server_out_port))
3106 self.pg0.add_stream(p)
3107 self.pg_enable_capture(self.pg_interfaces)
3109 capture = self.pg0.get_capture(1)
3111 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3112 IP(src=host.ip4, dst=server_nat_ip) /
3114 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3115 TCP(sport=1234, dport=1234))
3116 self.pg0.add_stream(p)
3117 self.pg_enable_capture(self.pg_interfaces)
3119 p = self.pg0.get_capture(1)
3122 self.assertEqual(packet[IP].src, self.nat_addr)
3123 self.assertEqual(packet[IP].dst, server.ip4)
3124 self.assertTrue(packet.haslayer(GRE))
3125 self.check_ip_checksum(packet)
3127 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3131 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3132 IP(src=server.ip4, dst=self.nat_addr) /
3134 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3135 TCP(sport=1234, dport=1234))
3136 self.pg0.add_stream(p)
3137 self.pg_enable_capture(self.pg_interfaces)
3139 p = self.pg0.get_capture(1)
3142 self.assertEqual(packet[IP].src, server_nat_ip)
3143 self.assertEqual(packet[IP].dst, host.ip4)
3144 self.assertTrue(packet.haslayer(GRE))
3145 self.check_ip_checksum(packet)
3147 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3150 def test_output_feature(self):
3151 """ NAT44 interface output feature (in2out postrouting) """
3152 self.nat44_add_address(self.nat_addr)
3153 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3154 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3155 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3159 pkts = self.create_stream_in(self.pg0, self.pg3)
3160 self.pg0.add_stream(pkts)
3161 self.pg_enable_capture(self.pg_interfaces)
3163 capture = self.pg3.get_capture(len(pkts))
3164 self.verify_capture_out(capture)
3167 pkts = self.create_stream_out(self.pg3)
3168 self.pg3.add_stream(pkts)
3169 self.pg_enable_capture(self.pg_interfaces)
3171 capture = self.pg0.get_capture(len(pkts))
3172 self.verify_capture_in(capture, self.pg0)
3174 # from non-NAT interface to NAT inside interface
3175 pkts = self.create_stream_in(self.pg2, self.pg0)
3176 self.pg2.add_stream(pkts)
3177 self.pg_enable_capture(self.pg_interfaces)
3179 capture = self.pg0.get_capture(len(pkts))
3180 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3182 def test_output_feature_vrf_aware(self):
3183 """ NAT44 interface output feature VRF aware (in2out postrouting) """
3184 nat_ip_vrf10 = "10.0.0.10"
3185 nat_ip_vrf20 = "10.0.0.20"
3187 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3188 dst_address_length=32,
3189 next_hop_address=self.pg3.remote_ip4n,
3190 next_hop_sw_if_index=self.pg3.sw_if_index,
3192 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3193 dst_address_length=32,
3194 next_hop_address=self.pg3.remote_ip4n,
3195 next_hop_sw_if_index=self.pg3.sw_if_index,
3198 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3199 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3200 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3201 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3202 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
3206 pkts = self.create_stream_in(self.pg4, self.pg3)
3207 self.pg4.add_stream(pkts)
3208 self.pg_enable_capture(self.pg_interfaces)
3210 capture = self.pg3.get_capture(len(pkts))
3211 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
3214 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3215 self.pg3.add_stream(pkts)
3216 self.pg_enable_capture(self.pg_interfaces)
3218 capture = self.pg4.get_capture(len(pkts))
3219 self.verify_capture_in(capture, self.pg4)
3222 pkts = self.create_stream_in(self.pg6, self.pg3)
3223 self.pg6.add_stream(pkts)
3224 self.pg_enable_capture(self.pg_interfaces)
3226 capture = self.pg3.get_capture(len(pkts))
3227 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
3230 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3231 self.pg3.add_stream(pkts)
3232 self.pg_enable_capture(self.pg_interfaces)
3234 capture = self.pg6.get_capture(len(pkts))
3235 self.verify_capture_in(capture, self.pg6)
3237 def test_output_feature_hairpinning(self):
3238 """ NAT44 interface output feature hairpinning (in2out postrouting) """
3239 host = self.pg0.remote_hosts[0]
3240 server = self.pg0.remote_hosts[1]
3243 server_in_port = 5678
3244 server_out_port = 8765
3246 self.nat44_add_address(self.nat_addr)
3247 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3248 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3251 # add static mapping for server
3252 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3253 server_in_port, server_out_port,
3254 proto=IP_PROTOS.tcp)
3256 # send packet from host to server
3257 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3258 IP(src=host.ip4, dst=self.nat_addr) /
3259 TCP(sport=host_in_port, dport=server_out_port))
3260 self.pg0.add_stream(p)
3261 self.pg_enable_capture(self.pg_interfaces)
3263 capture = self.pg0.get_capture(1)
3268 self.assertEqual(ip.src, self.nat_addr)
3269 self.assertEqual(ip.dst, server.ip4)
3270 self.assertNotEqual(tcp.sport, host_in_port)
3271 self.assertEqual(tcp.dport, server_in_port)
3272 self.check_tcp_checksum(p)
3273 host_out_port = tcp.sport
3275 self.logger.error(ppp("Unexpected or invalid packet:", p))
3278 # send reply from server to host
3279 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3280 IP(src=server.ip4, dst=self.nat_addr) /
3281 TCP(sport=server_in_port, dport=host_out_port))
3282 self.pg0.add_stream(p)
3283 self.pg_enable_capture(self.pg_interfaces)
3285 capture = self.pg0.get_capture(1)
3290 self.assertEqual(ip.src, self.nat_addr)
3291 self.assertEqual(ip.dst, host.ip4)
3292 self.assertEqual(tcp.sport, server_out_port)
3293 self.assertEqual(tcp.dport, host_in_port)
3294 self.check_tcp_checksum(p)
3296 self.logger.error(ppp("Unexpected or invalid packet:", p))
3299 def test_output_feature_and_service(self):
3300 """ NAT44 interface output feature and services """
3301 external_addr = '1.2.3.4'
3305 self.vapi.nat44_forwarding_enable_disable(1)
3306 self.nat44_add_address(self.nat_addr)
3307 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3308 local_port, external_port,
3309 proto=IP_PROTOS.tcp, out2in_only=1)
3310 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3311 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3314 # from client to service
3315 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3316 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3317 TCP(sport=12345, dport=external_port))
3318 self.pg1.add_stream(p)
3319 self.pg_enable_capture(self.pg_interfaces)
3321 capture = self.pg0.get_capture(1)
3327 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3328 self.assertEqual(tcp.dport, local_port)
3329 self.check_tcp_checksum(p)
3330 self.check_ip_checksum(p)
3332 self.logger.error(ppp("Unexpected or invalid packet:", p))
3335 # from service back to client
3336 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3337 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3338 TCP(sport=local_port, dport=12345))
3339 self.pg0.add_stream(p)
3340 self.pg_enable_capture(self.pg_interfaces)
3342 capture = self.pg1.get_capture(1)
3347 self.assertEqual(ip.src, external_addr)
3348 self.assertEqual(tcp.sport, external_port)
3349 self.check_tcp_checksum(p)
3350 self.check_ip_checksum(p)
3352 self.logger.error(ppp("Unexpected or invalid packet:", p))
3355 # from local network host to external network
3356 pkts = self.create_stream_in(self.pg0, self.pg1)
3357 self.pg0.add_stream(pkts)
3358 self.pg_enable_capture(self.pg_interfaces)
3360 capture = self.pg1.get_capture(len(pkts))
3361 self.verify_capture_out(capture)
3362 pkts = self.create_stream_in(self.pg0, self.pg1)
3363 self.pg0.add_stream(pkts)
3364 self.pg_enable_capture(self.pg_interfaces)
3366 capture = self.pg1.get_capture(len(pkts))
3367 self.verify_capture_out(capture)
3369 # from external network back to local network host
3370 pkts = self.create_stream_out(self.pg1)
3371 self.pg1.add_stream(pkts)
3372 self.pg_enable_capture(self.pg_interfaces)
3374 capture = self.pg0.get_capture(len(pkts))
3375 self.verify_capture_in(capture, self.pg0)
3377 def test_output_feature_and_service2(self):
3378 """ NAT44 interface output feature and service host direct access """
3379 self.vapi.nat44_forwarding_enable_disable(1)
3380 self.nat44_add_address(self.nat_addr)
3381 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3384 # session initiaded from service host - translate
3385 pkts = self.create_stream_in(self.pg0, self.pg1)
3386 self.pg0.add_stream(pkts)
3387 self.pg_enable_capture(self.pg_interfaces)
3389 capture = self.pg1.get_capture(len(pkts))
3390 self.verify_capture_out(capture)
3392 pkts = self.create_stream_out(self.pg1)
3393 self.pg1.add_stream(pkts)
3394 self.pg_enable_capture(self.pg_interfaces)
3396 capture = self.pg0.get_capture(len(pkts))
3397 self.verify_capture_in(capture, self.pg0)
3399 tcp_port_out = self.tcp_port_out
3400 udp_port_out = self.udp_port_out
3401 icmp_id_out = self.icmp_id_out
3403 # session initiaded from remote host - do not translate
3404 pkts = self.create_stream_out(self.pg1,
3405 self.pg0.remote_ip4,
3406 use_inside_ports=True)
3407 self.pg1.add_stream(pkts)
3408 self.pg_enable_capture(self.pg_interfaces)
3410 capture = self.pg0.get_capture(len(pkts))
3411 self.verify_capture_in(capture, self.pg0)
3413 pkts = self.create_stream_in(self.pg0, self.pg1)
3414 self.pg0.add_stream(pkts)
3415 self.pg_enable_capture(self.pg_interfaces)
3417 capture = self.pg1.get_capture(len(pkts))
3418 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3421 def test_output_feature_and_service3(self):
3422 """ NAT44 interface output feature and DST NAT """
3423 external_addr = '1.2.3.4'
3427 self.vapi.nat44_forwarding_enable_disable(1)
3428 self.nat44_add_address(self.nat_addr)
3429 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3430 local_port, external_port,
3431 proto=IP_PROTOS.tcp, out2in_only=1)
3432 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3433 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3435 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3438 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3439 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3440 TCP(sport=12345, dport=external_port))
3441 self.pg0.add_stream(p)
3442 self.pg_enable_capture(self.pg_interfaces)
3444 capture = self.pg1.get_capture(1)
3449 self.assertEqual(ip.src, self.pg0.remote_ip4)
3450 self.assertEqual(tcp.sport, 12345)
3451 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3452 self.assertEqual(tcp.dport, local_port)
3453 self.check_tcp_checksum(p)
3454 self.check_ip_checksum(p)
3456 self.logger.error(ppp("Unexpected or invalid packet:", p))
3459 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3460 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3461 TCP(sport=local_port, dport=12345))
3462 self.pg1.add_stream(p)
3463 self.pg_enable_capture(self.pg_interfaces)
3465 capture = self.pg0.get_capture(1)
3470 self.assertEqual(ip.src, external_addr)
3471 self.assertEqual(tcp.sport, external_port)
3472 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3473 self.assertEqual(tcp.dport, 12345)
3474 self.check_tcp_checksum(p)
3475 self.check_ip_checksum(p)
3477 self.logger.error(ppp("Unexpected or invalid packet:", p))
3480 def test_one_armed_nat44(self):
3481 """ One armed NAT44 """
3482 remote_host = self.pg9.remote_hosts[0]
3483 local_host = self.pg9.remote_hosts[1]
3486 self.nat44_add_address(self.nat_addr)
3487 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3488 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3492 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3493 IP(src=local_host.ip4, dst=remote_host.ip4) /
3494 TCP(sport=12345, dport=80))
3495 self.pg9.add_stream(p)
3496 self.pg_enable_capture(self.pg_interfaces)
3498 capture = self.pg9.get_capture(1)
3503 self.assertEqual(ip.src, self.nat_addr)
3504 self.assertEqual(ip.dst, remote_host.ip4)
3505 self.assertNotEqual(tcp.sport, 12345)
3506 external_port = tcp.sport
3507 self.assertEqual(tcp.dport, 80)
3508 self.check_tcp_checksum(p)
3509 self.check_ip_checksum(p)
3511 self.logger.error(ppp("Unexpected or invalid packet:", p))
3515 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3516 IP(src=remote_host.ip4, dst=self.nat_addr) /
3517 TCP(sport=80, dport=external_port))
3518 self.pg9.add_stream(p)
3519 self.pg_enable_capture(self.pg_interfaces)
3521 capture = self.pg9.get_capture(1)
3526 self.assertEqual(ip.src, remote_host.ip4)
3527 self.assertEqual(ip.dst, local_host.ip4)
3528 self.assertEqual(tcp.sport, 80)
3529 self.assertEqual(tcp.dport, 12345)
3530 self.check_tcp_checksum(p)
3531 self.check_ip_checksum(p)
3533 self.logger.error(ppp("Unexpected or invalid packet:", p))
3536 def test_one_armed_nat44_static(self):
3537 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3538 remote_host = self.pg9.remote_hosts[0]
3539 local_host = self.pg9.remote_hosts[1]
3544 self.vapi.nat44_forwarding_enable_disable(1)
3545 self.nat44_add_address(self.nat_addr, twice_nat=1)
3546 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3547 local_port, external_port,
3548 proto=IP_PROTOS.tcp, out2in_only=1,
3550 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3551 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3554 # from client to service
3555 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3556 IP(src=remote_host.ip4, dst=self.nat_addr) /
3557 TCP(sport=12345, dport=external_port))
3558 self.pg9.add_stream(p)
3559 self.pg_enable_capture(self.pg_interfaces)
3561 capture = self.pg9.get_capture(1)
3567 self.assertEqual(ip.dst, local_host.ip4)
3568 self.assertEqual(ip.src, self.nat_addr)
3569 self.assertEqual(tcp.dport, local_port)
3570 self.assertNotEqual(tcp.sport, 12345)
3571 eh_port_in = tcp.sport
3572 self.check_tcp_checksum(p)
3573 self.check_ip_checksum(p)
3575 self.logger.error(ppp("Unexpected or invalid packet:", p))
3578 # from service back to client
3579 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3580 IP(src=local_host.ip4, dst=self.nat_addr) /
3581 TCP(sport=local_port, dport=eh_port_in))
3582 self.pg9.add_stream(p)
3583 self.pg_enable_capture(self.pg_interfaces)
3585 capture = self.pg9.get_capture(1)
3590 self.assertEqual(ip.src, self.nat_addr)
3591 self.assertEqual(ip.dst, remote_host.ip4)
3592 self.assertEqual(tcp.sport, external_port)
3593 self.assertEqual(tcp.dport, 12345)
3594 self.check_tcp_checksum(p)
3595 self.check_ip_checksum(p)
3597 self.logger.error(ppp("Unexpected or invalid packet:", p))
3600 def test_del_session(self):
3601 """ Delete NAT44 session """
3602 self.nat44_add_address(self.nat_addr)
3603 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3604 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3607 pkts = self.create_stream_in(self.pg0, self.pg1)
3608 self.pg0.add_stream(pkts)
3609 self.pg_enable_capture(self.pg_interfaces)
3611 capture = self.pg1.get_capture(len(pkts))
3613 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3614 nsessions = len(sessions)
3616 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3617 sessions[0].inside_port,
3618 sessions[0].protocol)
3619 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3620 sessions[1].outside_port,
3621 sessions[1].protocol,
3624 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3625 self.assertEqual(nsessions - len(sessions), 2)
3627 def test_set_get_reass(self):
3628 """ NAT44 set/get virtual fragmentation reassembly """
3629 reas_cfg1 = self.vapi.nat_get_reass()
3631 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3632 max_reass=reas_cfg1.ip4_max_reass * 2,
3633 max_frag=reas_cfg1.ip4_max_frag * 2)
3635 reas_cfg2 = self.vapi.nat_get_reass()
3637 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3638 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3639 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
3641 self.vapi.nat_set_reass(drop_frag=1)
3642 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3644 def test_frag_in_order(self):
3645 """ NAT44 translate fragments arriving in order """
3646 self.nat44_add_address(self.nat_addr)
3647 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3648 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3651 data = "A" * 4 + "B" * 16 + "C" * 3
3652 self.tcp_port_in = random.randint(1025, 65535)
3654 reass = self.vapi.nat_reass_dump()
3655 reass_n_start = len(reass)
3658 pkts = self.create_stream_frag(self.pg0,
3659 self.pg1.remote_ip4,
3663 self.pg0.add_stream(pkts)
3664 self.pg_enable_capture(self.pg_interfaces)
3666 frags = self.pg1.get_capture(len(pkts))
3667 p = self.reass_frags_and_verify(frags,
3669 self.pg1.remote_ip4)
3670 self.assertEqual(p[TCP].dport, 20)
3671 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3672 self.tcp_port_out = p[TCP].sport
3673 self.assertEqual(data, p[Raw].load)
3676 pkts = self.create_stream_frag(self.pg1,
3681 self.pg1.add_stream(pkts)
3682 self.pg_enable_capture(self.pg_interfaces)
3684 frags = self.pg0.get_capture(len(pkts))
3685 p = self.reass_frags_and_verify(frags,
3686 self.pg1.remote_ip4,
3687 self.pg0.remote_ip4)
3688 self.assertEqual(p[TCP].sport, 20)
3689 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3690 self.assertEqual(data, p[Raw].load)
3692 reass = self.vapi.nat_reass_dump()
3693 reass_n_end = len(reass)
3695 self.assertEqual(reass_n_end - reass_n_start, 2)
3697 def test_reass_hairpinning(self):
3698 """ NAT44 fragments hairpinning """
3699 host = self.pg0.remote_hosts[0]
3700 server = self.pg0.remote_hosts[1]
3701 host_in_port = random.randint(1025, 65535)
3703 server_in_port = random.randint(1025, 65535)
3704 server_out_port = random.randint(1025, 65535)
3705 data = "A" * 4 + "B" * 16 + "C" * 3
3707 self.nat44_add_address(self.nat_addr)
3708 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3709 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3711 # add static mapping for server
3712 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3713 server_in_port, server_out_port,
3714 proto=IP_PROTOS.tcp)
3716 # send packet from host to server
3717 pkts = self.create_stream_frag(self.pg0,
3722 self.pg0.add_stream(pkts)
3723 self.pg_enable_capture(self.pg_interfaces)
3725 frags = self.pg0.get_capture(len(pkts))
3726 p = self.reass_frags_and_verify(frags,
3729 self.assertNotEqual(p[TCP].sport, host_in_port)
3730 self.assertEqual(p[TCP].dport, server_in_port)
3731 self.assertEqual(data, p[Raw].load)
3733 def test_frag_out_of_order(self):
3734 """ NAT44 translate fragments arriving out of order """
3735 self.nat44_add_address(self.nat_addr)
3736 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3737 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3740 data = "A" * 4 + "B" * 16 + "C" * 3
3741 random.randint(1025, 65535)
3744 pkts = self.create_stream_frag(self.pg0,
3745 self.pg1.remote_ip4,
3750 self.pg0.add_stream(pkts)
3751 self.pg_enable_capture(self.pg_interfaces)
3753 frags = self.pg1.get_capture(len(pkts))
3754 p = self.reass_frags_and_verify(frags,
3756 self.pg1.remote_ip4)
3757 self.assertEqual(p[TCP].dport, 20)
3758 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3759 self.tcp_port_out = p[TCP].sport
3760 self.assertEqual(data, p[Raw].load)
3763 pkts = self.create_stream_frag(self.pg1,
3769 self.pg1.add_stream(pkts)
3770 self.pg_enable_capture(self.pg_interfaces)
3772 frags = self.pg0.get_capture(len(pkts))
3773 p = self.reass_frags_and_verify(frags,
3774 self.pg1.remote_ip4,
3775 self.pg0.remote_ip4)
3776 self.assertEqual(p[TCP].sport, 20)
3777 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3778 self.assertEqual(data, p[Raw].load)
3780 def test_port_restricted(self):
3781 """ Port restricted NAT44 (MAP-E CE) """
3782 self.nat44_add_address(self.nat_addr)
3783 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3784 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3786 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3787 "psid-offset 6 psid-len 6")
3789 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3790 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3791 TCP(sport=4567, dport=22))
3792 self.pg0.add_stream(p)
3793 self.pg_enable_capture(self.pg_interfaces)
3795 capture = self.pg1.get_capture(1)
3800 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3801 self.assertEqual(ip.src, self.nat_addr)
3802 self.assertEqual(tcp.dport, 22)
3803 self.assertNotEqual(tcp.sport, 4567)
3804 self.assertEqual((tcp.sport >> 6) & 63, 10)
3805 self.check_tcp_checksum(p)
3806 self.check_ip_checksum(p)
3808 self.logger.error(ppp("Unexpected or invalid packet:", p))
3811 def test_twice_nat(self):
3813 twice_nat_addr = '10.0.1.3'
3818 self.nat44_add_address(self.nat_addr)
3819 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3820 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
3821 port_in, port_out, proto=IP_PROTOS.tcp,
3823 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3824 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3827 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3828 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3829 TCP(sport=eh_port_out, dport=port_out))
3830 self.pg1.add_stream(p)
3831 self.pg_enable_capture(self.pg_interfaces)
3833 capture = self.pg0.get_capture(1)
3838 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3839 self.assertEqual(ip.src, twice_nat_addr)
3840 self.assertEqual(tcp.dport, port_in)
3841 self.assertNotEqual(tcp.sport, eh_port_out)
3842 eh_port_in = tcp.sport
3843 self.check_tcp_checksum(p)
3844 self.check_ip_checksum(p)
3846 self.logger.error(ppp("Unexpected or invalid packet:", p))
3849 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3850 IP(src=self.pg0.remote_ip4, dst=twice_nat_addr) /
3851 TCP(sport=port_in, dport=eh_port_in))
3852 self.pg0.add_stream(p)
3853 self.pg_enable_capture(self.pg_interfaces)
3855 capture = self.pg1.get_capture(1)
3860 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3861 self.assertEqual(ip.src, self.nat_addr)
3862 self.assertEqual(tcp.dport, eh_port_out)
3863 self.assertEqual(tcp.sport, port_out)
3864 self.check_tcp_checksum(p)
3865 self.check_ip_checksum(p)
3867 self.logger.error(ppp("Unexpected or invalid packet:", p))
3870 def test_twice_nat_lb(self):
3871 """ Twice NAT44 local service load balancing """
3872 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3873 twice_nat_addr = '10.0.1.3'
3878 server1 = self.pg0.remote_hosts[0]
3879 server2 = self.pg0.remote_hosts[1]
3881 locals = [{'addr': server1.ip4n,
3884 {'addr': server2.ip4n,
3888 self.nat44_add_address(self.nat_addr)
3889 self.nat44_add_address(twice_nat_addr, twice_nat=1)
3891 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
3895 local_num=len(locals),
3897 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3898 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3901 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3902 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3903 TCP(sport=eh_port_out, dport=external_port))
3904 self.pg1.add_stream(p)
3905 self.pg_enable_capture(self.pg_interfaces)
3907 capture = self.pg0.get_capture(1)
3913 self.assertEqual(ip.src, twice_nat_addr)
3914 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
3915 if ip.dst == server1.ip4:
3919 self.assertNotEqual(tcp.sport, eh_port_out)
3920 eh_port_in = tcp.sport
3921 self.assertEqual(tcp.dport, local_port)
3922 self.check_tcp_checksum(p)
3923 self.check_ip_checksum(p)
3925 self.logger.error(ppp("Unexpected or invalid packet:", p))
3928 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3929 IP(src=server.ip4, dst=twice_nat_addr) /
3930 TCP(sport=local_port, dport=eh_port_in))
3931 self.pg0.add_stream(p)
3932 self.pg_enable_capture(self.pg_interfaces)
3934 capture = self.pg1.get_capture(1)
3939 self.assertEqual(ip.src, self.nat_addr)
3940 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3941 self.assertEqual(tcp.sport, external_port)
3942 self.assertEqual(tcp.dport, eh_port_out)
3943 self.check_tcp_checksum(p)
3944 self.check_ip_checksum(p)
3946 self.logger.error(ppp("Unexpected or invalid packet:", p))
3949 def test_twice_nat_interface_addr(self):
3950 """ Acquire twice NAT44 addresses from interface """
3951 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3953 # no address in NAT pool
3954 adresses = self.vapi.nat44_address_dump()
3955 self.assertEqual(0, len(adresses))
3957 # configure interface address and check NAT address pool
3958 self.pg7.config_ip4()
3959 adresses = self.vapi.nat44_address_dump()
3960 self.assertEqual(1, len(adresses))
3961 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3962 self.assertEqual(adresses[0].twice_nat, 1)
3964 # remove interface address and check NAT address pool
3965 self.pg7.unconfig_ip4()
3966 adresses = self.vapi.nat44_address_dump()
3967 self.assertEqual(0, len(adresses))
3969 def test_ipfix_max_frags(self):
3970 """ IPFIX logging maximum fragments pending reassembly exceeded """
3971 self.nat44_add_address(self.nat_addr)
3972 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3973 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3975 self.vapi.nat_set_reass(max_frag=0)
3976 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
3977 src_address=self.pg3.local_ip4n,
3979 template_interval=10)
3980 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
3981 src_port=self.ipfix_src_port)
3983 data = "A" * 4 + "B" * 16 + "C" * 3
3984 self.tcp_port_in = random.randint(1025, 65535)
3985 pkts = self.create_stream_frag(self.pg0,
3986 self.pg1.remote_ip4,
3990 self.pg0.add_stream(pkts[-1])
3991 self.pg_enable_capture(self.pg_interfaces)
3993 frags = self.pg1.get_capture(0)
3994 self.vapi.cli("ipfix flush") # FIXME this should be an API call
3995 capture = self.pg3.get_capture(9)
3996 ipfix = IPFIXDecoder()
3997 # first load template
3999 self.assertTrue(p.haslayer(IPFIX))
4000 self.assertEqual(p[IP].src, self.pg3.local_ip4)
4001 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
4002 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
4003 self.assertEqual(p[UDP].dport, 4739)
4004 self.assertEqual(p[IPFIX].observationDomainID,
4005 self.ipfix_domain_id)
4006 if p.haslayer(Template):
4007 ipfix.add_template(p.getlayer(Template))
4008 # verify events in data set
4010 if p.haslayer(Data):
4011 data = ipfix.decode_data_set(p.getlayer(Set))
4012 self.verify_ipfix_max_fragments_ip4(data, 0,
4013 self.pg0.remote_ip4n)
4016 super(TestNAT44, self).tearDown()
4017 if not self.vpp_dead:
4018 self.logger.info(self.vapi.cli("show nat44 addresses"))
4019 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4020 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4021 self.logger.info(self.vapi.cli("show nat44 interface address"))
4022 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4023 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
4024 self.vapi.cli("nat addr-port-assignment-alg default")
4028 class TestNAT44Out2InDPO(MethodHolder):
4029 """ NAT44 Test Cases using out2in DPO """
4032 def setUpConstants(cls):
4033 super(TestNAT44Out2InDPO, cls).setUpConstants()
4034 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
4037 def setUpClass(cls):
4038 super(TestNAT44Out2InDPO, cls).setUpClass()
4041 cls.tcp_port_in = 6303
4042 cls.tcp_port_out = 6303
4043 cls.udp_port_in = 6304
4044 cls.udp_port_out = 6304
4045 cls.icmp_id_in = 6305
4046 cls.icmp_id_out = 6305
4047 cls.nat_addr = '10.0.0.3'
4048 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4049 cls.dst_ip4 = '192.168.70.1'
4051 cls.create_pg_interfaces(range(2))
4054 cls.pg0.config_ip4()
4055 cls.pg0.resolve_arp()
4058 cls.pg1.config_ip6()
4059 cls.pg1.resolve_ndp()
4061 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4062 dst_address_length=0,
4063 next_hop_address=cls.pg1.remote_ip6n,
4064 next_hop_sw_if_index=cls.pg1.sw_if_index)
4067 super(TestNAT44Out2InDPO, cls).tearDownClass()
4070 def configure_xlat(self):
4071 self.dst_ip6_pfx = '1:2:3::'
4072 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4074 self.dst_ip6_pfx_len = 96
4075 self.src_ip6_pfx = '4:5:6::'
4076 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4078 self.src_ip6_pfx_len = 96
4079 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4080 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4081 '\x00\x00\x00\x00', 0, is_translation=1,
4084 def test_464xlat_ce(self):
4085 """ Test 464XLAT CE with NAT44 """
4087 self.configure_xlat()
4089 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4090 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4092 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4093 self.dst_ip6_pfx_len)
4094 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4095 self.src_ip6_pfx_len)
4098 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4099 self.pg0.add_stream(pkts)
4100 self.pg_enable_capture(self.pg_interfaces)
4102 capture = self.pg1.get_capture(len(pkts))
4103 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4106 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4108 self.pg1.add_stream(pkts)
4109 self.pg_enable_capture(self.pg_interfaces)
4111 capture = self.pg0.get_capture(len(pkts))
4112 self.verify_capture_in(capture, self.pg0)
4114 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4116 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4117 self.nat_addr_n, is_add=0)
4119 def test_464xlat_ce_no_nat(self):
4120 """ Test 464XLAT CE without NAT44 """
4122 self.configure_xlat()
4124 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4125 self.dst_ip6_pfx_len)
4126 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4127 self.src_ip6_pfx_len)
4129 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4130 self.pg0.add_stream(pkts)
4131 self.pg_enable_capture(self.pg_interfaces)
4133 capture = self.pg1.get_capture(len(pkts))
4134 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4135 nat_ip=out_dst_ip6, same_port=True)
4137 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4138 self.pg1.add_stream(pkts)
4139 self.pg_enable_capture(self.pg_interfaces)
4141 capture = self.pg0.get_capture(len(pkts))
4142 self.verify_capture_in(capture, self.pg0)
4145 class TestDeterministicNAT(MethodHolder):
4146 """ Deterministic NAT Test Cases """
4149 def setUpConstants(cls):
4150 super(TestDeterministicNAT, cls).setUpConstants()
4151 cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
4154 def setUpClass(cls):
4155 super(TestDeterministicNAT, cls).setUpClass()
4158 cls.tcp_port_in = 6303
4159 cls.tcp_external_port = 6303
4160 cls.udp_port_in = 6304
4161 cls.udp_external_port = 6304
4162 cls.icmp_id_in = 6305
4163 cls.nat_addr = '10.0.0.3'
4165 cls.create_pg_interfaces(range(3))
4166 cls.interfaces = list(cls.pg_interfaces)
4168 for i in cls.interfaces:
4173 cls.pg0.generate_remote_hosts(2)
4174 cls.pg0.configure_ipv4_neighbors()
4177 super(TestDeterministicNAT, cls).tearDownClass()
4180 def create_stream_in(self, in_if, out_if, ttl=64):
4182 Create packet stream for inside network
4184 :param in_if: Inside interface
4185 :param out_if: Outside interface
4186 :param ttl: TTL of generated packets
4190 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4191 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4192 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
4196 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4197 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4198 UDP(sport=self.udp_port_in, dport=self.udp_external_port))
4202 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
4203 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
4204 ICMP(id=self.icmp_id_in, type='echo-request'))
4209 def create_stream_out(self, out_if, dst_ip=None, ttl=64):
4211 Create packet stream for outside network
4213 :param out_if: Outside interface
4214 :param dst_ip: Destination IP address (Default use global NAT address)
4215 :param ttl: TTL of generated packets
4218 dst_ip = self.nat_addr
4221 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4222 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4223 TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
4227 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4228 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4229 UDP(dport=self.udp_port_out, sport=self.udp_external_port))
4233 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
4234 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
4235 ICMP(id=self.icmp_external_id, type='echo-reply'))
4240 def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
4242 Verify captured packets on outside network
4244 :param capture: Captured packets
4245 :param nat_ip: Translated IP address (Default use global NAT address)
4246 :param same_port: Sorce port number is not translated (Default False)
4247 :param packet_num: Expected number of packets (Default 3)
4250 nat_ip = self.nat_addr
4251 self.assertEqual(packet_num, len(capture))
4252 for packet in capture:
4254 self.assertEqual(packet[IP].src, nat_ip)
4255 if packet.haslayer(TCP):
4256 self.tcp_port_out = packet[TCP].sport
4257 elif packet.haslayer(UDP):
4258 self.udp_port_out = packet[UDP].sport
4260 self.icmp_external_id = packet[ICMP].id
4262 self.logger.error(ppp("Unexpected or invalid packet "
4263 "(outside network):", packet))
4266 def initiate_tcp_session(self, in_if, out_if):
4268 Initiates TCP session
4270 :param in_if: Inside interface
4271 :param out_if: Outside interface
4274 # SYN packet in->out
4275 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4276 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4277 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4280 self.pg_enable_capture(self.pg_interfaces)
4282 capture = out_if.get_capture(1)
4284 self.tcp_port_out = p[TCP].sport
4286 # SYN + ACK packet out->in
4287 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
4288 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
4289 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4291 out_if.add_stream(p)
4292 self.pg_enable_capture(self.pg_interfaces)
4294 in_if.get_capture(1)
4296 # ACK packet in->out
4297 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
4298 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
4299 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4302 self.pg_enable_capture(self.pg_interfaces)
4304 out_if.get_capture(1)
4307 self.logger.error("TCP 3 way handshake failed")
4310 def verify_ipfix_max_entries_per_user(self, data):
4312 Verify IPFIX maximum entries per user exceeded event
4314 :param data: Decoded IPFIX data records
4316 self.assertEqual(1, len(data))
4319 self.assertEqual(ord(record[230]), 13)
4320 # natQuotaExceededEvent
4321 self.assertEqual('\x03\x00\x00\x00', record[466])
4323 self.assertEqual('\xe8\x03\x00\x00', record[473])
4325 self.assertEqual(self.pg0.remote_ip4n, record[8])
4327 def test_deterministic_mode(self):
4328 """ NAT plugin run deterministic mode """
4329 in_addr = '172.16.255.0'
4330 out_addr = '172.17.255.50'
4331 in_addr_t = '172.16.255.20'
4332 in_addr_n = socket.inet_aton(in_addr)
4333 out_addr_n = socket.inet_aton(out_addr)
4334 in_addr_t_n = socket.inet_aton(in_addr_t)
4338 nat_config = self.vapi.nat_show_config()
4339 self.assertEqual(1, nat_config.deterministic)
4341 self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
4343 rep1 = self.vapi.nat_det_forward(in_addr_t_n)
4344 self.assertEqual(rep1.out_addr[:4], out_addr_n)
4345 rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
4346 self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
4348 deterministic_mappings = self.vapi.nat_det_map_dump()
4349 self.assertEqual(len(deterministic_mappings), 1)
4350 dsm = deterministic_mappings[0]
4351 self.assertEqual(in_addr_n, dsm.in_addr[:4])
4352 self.assertEqual(in_plen, dsm.in_plen)
4353 self.assertEqual(out_addr_n, dsm.out_addr[:4])
4354 self.assertEqual(out_plen, dsm.out_plen)
4356 self.clear_nat_det()
4357 deterministic_mappings = self.vapi.nat_det_map_dump()
4358 self.assertEqual(len(deterministic_mappings), 0)
4360 def test_set_timeouts(self):
4361 """ Set deterministic NAT timeouts """
4362 timeouts_before = self.vapi.nat_det_get_timeouts()
4364 self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
4365 timeouts_before.tcp_established + 10,
4366 timeouts_before.tcp_transitory + 10,
4367 timeouts_before.icmp + 10)
4369 timeouts_after = self.vapi.nat_det_get_timeouts()
4371 self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
4372 self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
4373 self.assertNotEqual(timeouts_before.tcp_established,
4374 timeouts_after.tcp_established)
4375 self.assertNotEqual(timeouts_before.tcp_transitory,
4376 timeouts_after.tcp_transitory)
4378 def test_det_in(self):
4379 """ Deterministic NAT translation test (TCP, UDP, ICMP) """
4381 nat_ip = "10.0.0.10"
4383 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4385 socket.inet_aton(nat_ip),
4387 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4388 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4392 pkts = self.create_stream_in(self.pg0, self.pg1)
4393 self.pg0.add_stream(pkts)
4394 self.pg_enable_capture(self.pg_interfaces)
4396 capture = self.pg1.get_capture(len(pkts))
4397 self.verify_capture_out(capture, nat_ip)
4400 pkts = self.create_stream_out(self.pg1, nat_ip)
4401 self.pg1.add_stream(pkts)
4402 self.pg_enable_capture(self.pg_interfaces)
4404 capture = self.pg0.get_capture(len(pkts))
4405 self.verify_capture_in(capture, self.pg0)
4408 sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
4409 self.assertEqual(len(sessions), 3)
4413 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4414 self.assertEqual(s.in_port, self.tcp_port_in)
4415 self.assertEqual(s.out_port, self.tcp_port_out)
4416 self.assertEqual(s.ext_port, self.tcp_external_port)
4420 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4421 self.assertEqual(s.in_port, self.udp_port_in)
4422 self.assertEqual(s.out_port, self.udp_port_out)
4423 self.assertEqual(s.ext_port, self.udp_external_port)
4427 self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
4428 self.assertEqual(s.in_port, self.icmp_id_in)
4429 self.assertEqual(s.out_port, self.icmp_external_id)
4431 def test_multiple_users(self):
4432 """ Deterministic NAT multiple users """
4434 nat_ip = "10.0.0.10"
4436 external_port = 6303
4438 host0 = self.pg0.remote_hosts[0]
4439 host1 = self.pg0.remote_hosts[1]
4441 self.vapi.nat_det_add_del_map(host0.ip4n,
4443 socket.inet_aton(nat_ip),
4445 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4446 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4450 p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
4451 IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
4452 TCP(sport=port_in, dport=external_port))
4453 self.pg0.add_stream(p)
4454 self.pg_enable_capture(self.pg_interfaces)
4456 capture = self.pg1.get_capture(1)
4461 self.assertEqual(ip.src, nat_ip)
4462 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4463 self.assertEqual(tcp.dport, external_port)
4464 port_out0 = tcp.sport
4466 self.logger.error(ppp("Unexpected or invalid packet:", p))
4470 p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
4471 IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
4472 TCP(sport=port_in, dport=external_port))
4473 self.pg0.add_stream(p)
4474 self.pg_enable_capture(self.pg_interfaces)
4476 capture = self.pg1.get_capture(1)
4481 self.assertEqual(ip.src, nat_ip)
4482 self.assertEqual(ip.dst, self.pg1.remote_ip4)
4483 self.assertEqual(tcp.dport, external_port)
4484 port_out1 = tcp.sport
4486 self.logger.error(ppp("Unexpected or invalid packet:", p))
4489 dms = self.vapi.nat_det_map_dump()
4490 self.assertEqual(1, len(dms))
4491 self.assertEqual(2, dms[0].ses_num)
4494 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4495 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4496 TCP(sport=external_port, dport=port_out0))
4497 self.pg1.add_stream(p)
4498 self.pg_enable_capture(self.pg_interfaces)
4500 capture = self.pg0.get_capture(1)
4505 self.assertEqual(ip.src, self.pg1.remote_ip4)
4506 self.assertEqual(ip.dst, host0.ip4)
4507 self.assertEqual(tcp.dport, port_in)
4508 self.assertEqual(tcp.sport, external_port)
4510 self.logger.error(ppp("Unexpected or invalid packet:", p))
4514 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4515 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
4516 TCP(sport=external_port, dport=port_out1))
4517 self.pg1.add_stream(p)
4518 self.pg_enable_capture(self.pg_interfaces)
4520 capture = self.pg0.get_capture(1)
4525 self.assertEqual(ip.src, self.pg1.remote_ip4)
4526 self.assertEqual(ip.dst, host1.ip4)
4527 self.assertEqual(tcp.dport, port_in)
4528 self.assertEqual(tcp.sport, external_port)
4530 self.logger.error(ppp("Unexpected or invalid packet", p))
4533 # session close api test
4534 self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
4536 self.pg1.remote_ip4n,
4538 dms = self.vapi.nat_det_map_dump()
4539 self.assertEqual(dms[0].ses_num, 1)
4541 self.vapi.nat_det_close_session_in(host0.ip4n,
4543 self.pg1.remote_ip4n,
4545 dms = self.vapi.nat_det_map_dump()
4546 self.assertEqual(dms[0].ses_num, 0)
4548 def test_tcp_session_close_detection_in(self):
4549 """ Deterministic NAT TCP session close from inside network """
4550 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4552 socket.inet_aton(self.nat_addr),
4554 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4555 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4558 self.initiate_tcp_session(self.pg0, self.pg1)
4560 # close the session from inside
4562 # FIN packet in -> out
4563 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4564 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4565 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4567 self.pg0.add_stream(p)
4568 self.pg_enable_capture(self.pg_interfaces)
4570 self.pg1.get_capture(1)
4574 # ACK packet out -> in
4575 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4576 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4577 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4581 # FIN packet out -> in
4582 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4583 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4584 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4588 self.pg1.add_stream(pkts)
4589 self.pg_enable_capture(self.pg_interfaces)
4591 self.pg0.get_capture(2)
4593 # ACK packet in -> out
4594 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4595 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4596 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4598 self.pg0.add_stream(p)
4599 self.pg_enable_capture(self.pg_interfaces)
4601 self.pg1.get_capture(1)
4603 # Check if deterministic NAT44 closed the session
4604 dms = self.vapi.nat_det_map_dump()
4605 self.assertEqual(0, dms[0].ses_num)
4607 self.logger.error("TCP session termination failed")
4610 def test_tcp_session_close_detection_out(self):
4611 """ Deterministic NAT TCP session close from outside network """
4612 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4614 socket.inet_aton(self.nat_addr),
4616 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4617 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4620 self.initiate_tcp_session(self.pg0, self.pg1)
4622 # close the session from outside
4624 # FIN packet out -> in
4625 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4626 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4627 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4629 self.pg1.add_stream(p)
4630 self.pg_enable_capture(self.pg_interfaces)
4632 self.pg0.get_capture(1)
4636 # ACK packet in -> out
4637 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4638 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4639 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4643 # ACK packet in -> out
4644 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4645 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4646 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4650 self.pg0.add_stream(pkts)
4651 self.pg_enable_capture(self.pg_interfaces)
4653 self.pg1.get_capture(2)
4655 # ACK packet out -> in
4656 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4657 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4658 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4660 self.pg1.add_stream(p)
4661 self.pg_enable_capture(self.pg_interfaces)
4663 self.pg0.get_capture(1)
4665 # Check if deterministic NAT44 closed the session
4666 dms = self.vapi.nat_det_map_dump()
4667 self.assertEqual(0, dms[0].ses_num)
4669 self.logger.error("TCP session termination failed")
4672 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4673 def test_session_timeout(self):
4674 """ Deterministic NAT session timeouts """
4675 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4677 socket.inet_aton(self.nat_addr),
4679 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4680 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4683 self.initiate_tcp_session(self.pg0, self.pg1)
4684 self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
4685 pkts = self.create_stream_in(self.pg0, self.pg1)
4686 self.pg0.add_stream(pkts)
4687 self.pg_enable_capture(self.pg_interfaces)
4689 capture = self.pg1.get_capture(len(pkts))
4692 dms = self.vapi.nat_det_map_dump()
4693 self.assertEqual(0, dms[0].ses_num)
4695 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
4696 def test_session_limit_per_user(self):
4697 """ Deterministic NAT maximum sessions per user limit """
4698 self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
4700 socket.inet_aton(self.nat_addr),
4702 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4703 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
4705 self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
4706 src_address=self.pg2.local_ip4n,
4708 template_interval=10)
4709 self.vapi.nat_ipfix()
4712 for port in range(1025, 2025):
4713 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4714 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4715 UDP(sport=port, dport=port))
4718 self.pg0.add_stream(pkts)
4719 self.pg_enable_capture(self.pg_interfaces)
4721 capture = self.pg1.get_capture(len(pkts))
4723 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4724 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4725 UDP(sport=3001, dport=3002))
4726 self.pg0.add_stream(p)
4727 self.pg_enable_capture(self.pg_interfaces)
4729 capture = self.pg1.assert_nothing_captured()
4731 # verify ICMP error packet
4732 capture = self.pg0.get_capture(1)
4734 self.assertTrue(p.haslayer(ICMP))
4736 self.assertEqual(icmp.type, 3)
4737 self.assertEqual(icmp.code, 1)
4738 self.assertTrue(icmp.haslayer(IPerror))
4739 inner_ip = icmp[IPerror]
4740 self.assertEqual(inner_ip[UDPerror].sport, 3001)
4741 self.assertEqual(inner_ip[UDPerror].dport, 3002)
4743 dms = self.vapi.nat_det_map_dump()
4745 self.assertEqual(1000, dms[0].ses_num)
4747 # verify IPFIX logging
4748 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4750 capture = self.pg2.get_capture(2)
4751 ipfix = IPFIXDecoder()
4752 # first load template
4754 self.assertTrue(p.haslayer(IPFIX))
4755 if p.haslayer(Template):
4756 ipfix.add_template(p.getlayer(Template))
4757 # verify events in data set
4759 if p.haslayer(Data):
4760 data = ipfix.decode_data_set(p.getlayer(Set))
4761 self.verify_ipfix_max_entries_per_user(data)
4763 def clear_nat_det(self):
4765 Clear deterministic NAT configuration.
4767 self.vapi.nat_ipfix(enable=0)
4768 self.vapi.nat_det_set_timeouts()
4769 deterministic_mappings = self.vapi.nat_det_map_dump()
4770 for dsm in deterministic_mappings:
4771 self.vapi.nat_det_add_del_map(dsm.in_addr,
4777 interfaces = self.vapi.nat44_interface_dump()
4778 for intf in interfaces:
4779 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
4784 super(TestDeterministicNAT, self).tearDown()
4785 if not self.vpp_dead:
4786 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4788 self.vapi.cli("show nat44 deterministic mappings"))
4790 self.vapi.cli("show nat44 deterministic timeouts"))
4792 self.vapi.cli("show nat44 deterministic sessions"))
4793 self.clear_nat_det()
4796 class TestNAT64(MethodHolder):
4797 """ NAT64 Test Cases """
4800 def setUpConstants(cls):
4801 super(TestNAT64, cls).setUpConstants()
4802 cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
4803 "nat64 st hash buckets 256", "}"])
4806 def setUpClass(cls):
4807 super(TestNAT64, cls).setUpClass()
4810 cls.tcp_port_in = 6303
4811 cls.tcp_port_out = 6303
4812 cls.udp_port_in = 6304
4813 cls.udp_port_out = 6304
4814 cls.icmp_id_in = 6305
4815 cls.icmp_id_out = 6305
4816 cls.nat_addr = '10.0.0.3'
4817 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4819 cls.vrf1_nat_addr = '10.0.10.3'
4820 cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
4822 cls.ipfix_src_port = 4739
4823 cls.ipfix_domain_id = 1
4825 cls.create_pg_interfaces(range(5))
4826 cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
4827 cls.ip6_interfaces.append(cls.pg_interfaces[2])
4828 cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
4830 cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
4832 cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
4834 cls.pg0.generate_remote_hosts(2)
4836 for i in cls.ip6_interfaces:
4839 i.configure_ipv6_neighbors()
4841 for i in cls.ip4_interfaces:
4847 cls.pg3.config_ip4()
4848 cls.pg3.resolve_arp()
4849 cls.pg3.config_ip6()
4850 cls.pg3.configure_ipv6_neighbors()
4853 super(TestNAT64, cls).tearDownClass()
4856 def test_pool(self):
4857 """ Add/delete address to NAT64 pool """
4858 nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
4860 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
4862 addresses = self.vapi.nat64_pool_addr_dump()
4863 self.assertEqual(len(addresses), 1)
4864 self.assertEqual(addresses[0].address, nat_addr)
4866 self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
4868 addresses = self.vapi.nat64_pool_addr_dump()
4869 self.assertEqual(len(addresses), 0)
4871 def test_interface(self):
4872 """ Enable/disable NAT64 feature on the interface """
4873 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4874 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4876 interfaces = self.vapi.nat64_interface_dump()
4877 self.assertEqual(len(interfaces), 2)
4880 for intf in interfaces:
4881 if intf.sw_if_index == self.pg0.sw_if_index:
4882 self.assertEqual(intf.is_inside, 1)
4884 elif intf.sw_if_index == self.pg1.sw_if_index:
4885 self.assertEqual(intf.is_inside, 0)
4887 self.assertTrue(pg0_found)
4888 self.assertTrue(pg1_found)
4890 features = self.vapi.cli("show interface features pg0")
4891 self.assertNotEqual(features.find('nat64-in2out'), -1)
4892 features = self.vapi.cli("show interface features pg1")
4893 self.assertNotEqual(features.find('nat64-out2in'), -1)
4895 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
4896 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
4898 interfaces = self.vapi.nat64_interface_dump()
4899 self.assertEqual(len(interfaces), 0)
4901 def test_static_bib(self):
4902 """ Add/delete static BIB entry """
4903 in_addr = socket.inet_pton(socket.AF_INET6,
4904 '2001:db8:85a3::8a2e:370:7334')
4905 out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
4908 proto = IP_PROTOS.tcp
4910 self.vapi.nat64_add_del_static_bib(in_addr,
4915 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4920 self.assertEqual(bibe.i_addr, in_addr)
4921 self.assertEqual(bibe.o_addr, out_addr)
4922 self.assertEqual(bibe.i_port, in_port)
4923 self.assertEqual(bibe.o_port, out_port)
4924 self.assertEqual(static_bib_num, 1)
4926 self.vapi.nat64_add_del_static_bib(in_addr,
4932 bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
4937 self.assertEqual(static_bib_num, 0)
4939 def test_set_timeouts(self):
4940 """ Set NAT64 timeouts """
4941 # verify default values
4942 timeouts = self.vapi.nat64_get_timeouts()
4943 self.assertEqual(timeouts.udp, 300)
4944 self.assertEqual(timeouts.icmp, 60)
4945 self.assertEqual(timeouts.tcp_trans, 240)
4946 self.assertEqual(timeouts.tcp_est, 7440)
4947 self.assertEqual(timeouts.tcp_incoming_syn, 6)
4949 # set and verify custom values
4950 self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
4951 tcp_est=7450, tcp_incoming_syn=10)
4952 timeouts = self.vapi.nat64_get_timeouts()
4953 self.assertEqual(timeouts.udp, 200)
4954 self.assertEqual(timeouts.icmp, 30)
4955 self.assertEqual(timeouts.tcp_trans, 250)
4956 self.assertEqual(timeouts.tcp_est, 7450)
4957 self.assertEqual(timeouts.tcp_incoming_syn, 10)
4959 def test_dynamic(self):
4960 """ NAT64 dynamic translation test """
4961 self.tcp_port_in = 6303
4962 self.udp_port_in = 6304
4963 self.icmp_id_in = 6305
4965 ses_num_start = self.nat64_get_ses_num()
4967 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
4969 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
4970 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
4973 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4974 self.pg0.add_stream(pkts)
4975 self.pg_enable_capture(self.pg_interfaces)
4977 capture = self.pg1.get_capture(len(pkts))
4978 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4979 dst_ip=self.pg1.remote_ip4)
4982 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
4983 self.pg1.add_stream(pkts)
4984 self.pg_enable_capture(self.pg_interfaces)
4986 capture = self.pg0.get_capture(len(pkts))
4987 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
4988 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
4991 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
4992 self.pg0.add_stream(pkts)
4993 self.pg_enable_capture(self.pg_interfaces)
4995 capture = self.pg1.get_capture(len(pkts))
4996 self.verify_capture_out(capture, nat_ip=self.nat_addr,
4997 dst_ip=self.pg1.remote_ip4)
5000 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5001 self.pg1.add_stream(pkts)
5002 self.pg_enable_capture(self.pg_interfaces)
5004 capture = self.pg0.get_capture(len(pkts))
5005 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5007 ses_num_end = self.nat64_get_ses_num()
5009 self.assertEqual(ses_num_end - ses_num_start, 3)
5011 # tenant with specific VRF
5012 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5013 self.vrf1_nat_addr_n,
5014 vrf_id=self.vrf1_id)
5015 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5017 pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
5018 self.pg2.add_stream(pkts)
5019 self.pg_enable_capture(self.pg_interfaces)
5021 capture = self.pg1.get_capture(len(pkts))
5022 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5023 dst_ip=self.pg1.remote_ip4)
5025 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5026 self.pg1.add_stream(pkts)
5027 self.pg_enable_capture(self.pg_interfaces)
5029 capture = self.pg2.get_capture(len(pkts))
5030 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
5032 def test_static(self):
5033 """ NAT64 static translation test """
5034 self.tcp_port_in = 60303
5035 self.udp_port_in = 60304
5036 self.icmp_id_in = 60305
5037 self.tcp_port_out = 60303
5038 self.udp_port_out = 60304
5039 self.icmp_id_out = 60305
5041 ses_num_start = self.nat64_get_ses_num()
5043 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5045 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5046 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5048 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5053 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5058 self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
5065 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5066 self.pg0.add_stream(pkts)
5067 self.pg_enable_capture(self.pg_interfaces)
5069 capture = self.pg1.get_capture(len(pkts))
5070 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5071 dst_ip=self.pg1.remote_ip4, same_port=True)
5074 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5075 self.pg1.add_stream(pkts)
5076 self.pg_enable_capture(self.pg_interfaces)
5078 capture = self.pg0.get_capture(len(pkts))
5079 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5080 self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
5082 ses_num_end = self.nat64_get_ses_num()
5084 self.assertEqual(ses_num_end - ses_num_start, 3)
5086 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5087 def test_session_timeout(self):
5088 """ NAT64 session timeout """
5089 self.icmp_id_in = 1234
5090 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5092 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5093 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5094 self.vapi.nat64_set_timeouts(icmp=5)
5096 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5097 self.pg0.add_stream(pkts)
5098 self.pg_enable_capture(self.pg_interfaces)
5100 capture = self.pg1.get_capture(len(pkts))
5102 ses_num_before_timeout = self.nat64_get_ses_num()
5106 # ICMP session after timeout
5107 ses_num_after_timeout = self.nat64_get_ses_num()
5108 self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
5110 def test_icmp_error(self):
5111 """ NAT64 ICMP Error message translation """
5112 self.tcp_port_in = 6303
5113 self.udp_port_in = 6304
5114 self.icmp_id_in = 6305
5116 ses_num_start = self.nat64_get_ses_num()
5118 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5120 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5121 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5123 # send some packets to create sessions
5124 pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
5125 self.pg0.add_stream(pkts)
5126 self.pg_enable_capture(self.pg_interfaces)
5128 capture_ip4 = self.pg1.get_capture(len(pkts))
5129 self.verify_capture_out(capture_ip4,
5130 nat_ip=self.nat_addr,
5131 dst_ip=self.pg1.remote_ip4)
5133 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5134 self.pg1.add_stream(pkts)
5135 self.pg_enable_capture(self.pg_interfaces)
5137 capture_ip6 = self.pg0.get_capture(len(pkts))
5138 ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
5139 self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
5140 self.pg0.remote_ip6)
5143 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5144 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
5145 ICMPv6DestUnreach(code=1) /
5146 packet[IPv6] for packet in capture_ip6]
5147 self.pg0.add_stream(pkts)
5148 self.pg_enable_capture(self.pg_interfaces)
5150 capture = self.pg1.get_capture(len(pkts))
5151 for packet in capture:
5153 self.assertEqual(packet[IP].src, self.nat_addr)
5154 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5155 self.assertEqual(packet[ICMP].type, 3)
5156 self.assertEqual(packet[ICMP].code, 13)
5157 inner = packet[IPerror]
5158 self.assertEqual(inner.src, self.pg1.remote_ip4)
5159 self.assertEqual(inner.dst, self.nat_addr)
5160 self.check_icmp_checksum(packet)
5161 if inner.haslayer(TCPerror):
5162 self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
5163 elif inner.haslayer(UDPerror):
5164 self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
5166 self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
5168 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5172 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5173 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5174 ICMP(type=3, code=13) /
5175 packet[IP] for packet in capture_ip4]
5176 self.pg1.add_stream(pkts)
5177 self.pg_enable_capture(self.pg_interfaces)
5179 capture = self.pg0.get_capture(len(pkts))
5180 for packet in capture:
5182 self.assertEqual(packet[IPv6].src, ip.src)
5183 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5184 icmp = packet[ICMPv6DestUnreach]
5185 self.assertEqual(icmp.code, 1)
5186 inner = icmp[IPerror6]
5187 self.assertEqual(inner.src, self.pg0.remote_ip6)
5188 self.assertEqual(inner.dst, ip.src)
5189 self.check_icmpv6_checksum(packet)
5190 if inner.haslayer(TCPerror):
5191 self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
5192 elif inner.haslayer(UDPerror):
5193 self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
5195 self.assertEqual(inner[ICMPv6EchoRequest].id,
5198 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5201 def test_hairpinning(self):
5202 """ NAT64 hairpinning """
5204 client = self.pg0.remote_hosts[0]
5205 server = self.pg0.remote_hosts[1]
5206 server_tcp_in_port = 22
5207 server_tcp_out_port = 4022
5208 server_udp_in_port = 23
5209 server_udp_out_port = 4023
5210 client_tcp_in_port = 1234
5211 client_udp_in_port = 1235
5212 client_tcp_out_port = 0
5213 client_udp_out_port = 0
5214 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5215 nat_addr_ip6 = ip.src
5217 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5219 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5220 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5222 self.vapi.nat64_add_del_static_bib(server.ip6n,
5225 server_tcp_out_port,
5227 self.vapi.nat64_add_del_static_bib(server.ip6n,
5230 server_udp_out_port,
5235 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5236 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5237 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5239 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5240 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5241 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5243 self.pg0.add_stream(pkts)
5244 self.pg_enable_capture(self.pg_interfaces)
5246 capture = self.pg0.get_capture(len(pkts))
5247 for packet in capture:
5249 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5250 self.assertEqual(packet[IPv6].dst, server.ip6)
5251 if packet.haslayer(TCP):
5252 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5253 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5254 self.check_tcp_checksum(packet)
5255 client_tcp_out_port = packet[TCP].sport
5257 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5258 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5259 self.check_udp_checksum(packet)
5260 client_udp_out_port = packet[UDP].sport
5262 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5267 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5268 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5269 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5271 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5272 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5273 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5275 self.pg0.add_stream(pkts)
5276 self.pg_enable_capture(self.pg_interfaces)
5278 capture = self.pg0.get_capture(len(pkts))
5279 for packet in capture:
5281 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5282 self.assertEqual(packet[IPv6].dst, client.ip6)
5283 if packet.haslayer(TCP):
5284 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5285 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5286 self.check_tcp_checksum(packet)
5288 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5289 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5290 self.check_udp_checksum(packet)
5292 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5297 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5298 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5299 ICMPv6DestUnreach(code=1) /
5300 packet[IPv6] for packet in capture]
5301 self.pg0.add_stream(pkts)
5302 self.pg_enable_capture(self.pg_interfaces)
5304 capture = self.pg0.get_capture(len(pkts))
5305 for packet in capture:
5307 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5308 self.assertEqual(packet[IPv6].dst, server.ip6)
5309 icmp = packet[ICMPv6DestUnreach]
5310 self.assertEqual(icmp.code, 1)
5311 inner = icmp[IPerror6]
5312 self.assertEqual(inner.src, server.ip6)
5313 self.assertEqual(inner.dst, nat_addr_ip6)
5314 self.check_icmpv6_checksum(packet)
5315 if inner.haslayer(TCPerror):
5316 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5317 self.assertEqual(inner[TCPerror].dport,
5318 client_tcp_out_port)
5320 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5321 self.assertEqual(inner[UDPerror].dport,
5322 client_udp_out_port)
5324 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5327 def test_prefix(self):
5328 """ NAT64 Network-Specific Prefix """
5330 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5332 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5333 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5334 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5335 self.vrf1_nat_addr_n,
5336 vrf_id=self.vrf1_id)
5337 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
5340 global_pref64 = "2001:db8::"
5341 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5342 global_pref64_len = 32
5343 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5345 prefix = self.vapi.nat64_prefix_dump()
5346 self.assertEqual(len(prefix), 1)
5347 self.assertEqual(prefix[0].prefix, global_pref64_n)
5348 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5349 self.assertEqual(prefix[0].vrf_id, 0)
5351 # Add tenant specific prefix
5352 vrf1_pref64 = "2001:db8:122:300::"
5353 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5354 vrf1_pref64_len = 56
5355 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5357 vrf_id=self.vrf1_id)
5358 prefix = self.vapi.nat64_prefix_dump()
5359 self.assertEqual(len(prefix), 2)
5362 pkts = self.create_stream_in_ip6(self.pg0,
5365 plen=global_pref64_len)
5366 self.pg0.add_stream(pkts)
5367 self.pg_enable_capture(self.pg_interfaces)
5369 capture = self.pg1.get_capture(len(pkts))
5370 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5371 dst_ip=self.pg1.remote_ip4)
5373 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5374 self.pg1.add_stream(pkts)
5375 self.pg_enable_capture(self.pg_interfaces)
5377 capture = self.pg0.get_capture(len(pkts))
5378 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5381 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5383 # Tenant specific prefix
5384 pkts = self.create_stream_in_ip6(self.pg2,
5387 plen=vrf1_pref64_len)
5388 self.pg2.add_stream(pkts)
5389 self.pg_enable_capture(self.pg_interfaces)
5391 capture = self.pg1.get_capture(len(pkts))
5392 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5393 dst_ip=self.pg1.remote_ip4)
5395 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5396 self.pg1.add_stream(pkts)
5397 self.pg_enable_capture(self.pg_interfaces)
5399 capture = self.pg2.get_capture(len(pkts))
5400 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5403 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5405 def test_unknown_proto(self):
5406 """ NAT64 translate packet with unknown protocol """
5408 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5410 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5411 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5412 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5415 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5416 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5417 TCP(sport=self.tcp_port_in, dport=20))
5418 self.pg0.add_stream(p)
5419 self.pg_enable_capture(self.pg_interfaces)
5421 p = self.pg1.get_capture(1)
5423 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5424 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5426 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5427 TCP(sport=1234, dport=1234))
5428 self.pg0.add_stream(p)
5429 self.pg_enable_capture(self.pg_interfaces)
5431 p = self.pg1.get_capture(1)
5434 self.assertEqual(packet[IP].src, self.nat_addr)
5435 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5436 self.assertTrue(packet.haslayer(GRE))
5437 self.check_ip_checksum(packet)
5439 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5443 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5444 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5446 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5447 TCP(sport=1234, dport=1234))
5448 self.pg1.add_stream(p)
5449 self.pg_enable_capture(self.pg_interfaces)
5451 p = self.pg0.get_capture(1)
5454 self.assertEqual(packet[IPv6].src, remote_ip6)
5455 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5456 self.assertEqual(packet[IPv6].nh, 47)
5458 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5461 def test_hairpinning_unknown_proto(self):
5462 """ NAT64 translate packet with unknown protocol - hairpinning """
5464 client = self.pg0.remote_hosts[0]
5465 server = self.pg0.remote_hosts[1]
5466 server_tcp_in_port = 22
5467 server_tcp_out_port = 4022
5468 client_tcp_in_port = 1234
5469 client_tcp_out_port = 1235
5470 server_nat_ip = "10.0.0.100"
5471 client_nat_ip = "10.0.0.110"
5472 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5473 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5474 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5475 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
5477 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5479 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5480 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5482 self.vapi.nat64_add_del_static_bib(server.ip6n,
5485 server_tcp_out_port,
5488 self.vapi.nat64_add_del_static_bib(server.ip6n,
5494 self.vapi.nat64_add_del_static_bib(client.ip6n,
5497 client_tcp_out_port,
5501 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5502 IPv6(src=client.ip6, dst=server_nat_ip6) /
5503 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5504 self.pg0.add_stream(p)
5505 self.pg_enable_capture(self.pg_interfaces)
5507 p = self.pg0.get_capture(1)
5509 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5510 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5512 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5513 TCP(sport=1234, dport=1234))
5514 self.pg0.add_stream(p)
5515 self.pg_enable_capture(self.pg_interfaces)
5517 p = self.pg0.get_capture(1)
5520 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5521 self.assertEqual(packet[IPv6].dst, server.ip6)
5522 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5524 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5528 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5529 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5531 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5532 TCP(sport=1234, dport=1234))
5533 self.pg0.add_stream(p)
5534 self.pg_enable_capture(self.pg_interfaces)
5536 p = self.pg0.get_capture(1)
5539 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5540 self.assertEqual(packet[IPv6].dst, client.ip6)
5541 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5543 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5546 def test_one_armed_nat64(self):
5547 """ One armed NAT64 """
5549 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
5553 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5555 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5556 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
5559 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5560 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5561 TCP(sport=12345, dport=80))
5562 self.pg3.add_stream(p)
5563 self.pg_enable_capture(self.pg_interfaces)
5565 capture = self.pg3.get_capture(1)
5570 self.assertEqual(ip.src, self.nat_addr)
5571 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5572 self.assertNotEqual(tcp.sport, 12345)
5573 external_port = tcp.sport
5574 self.assertEqual(tcp.dport, 80)
5575 self.check_tcp_checksum(p)
5576 self.check_ip_checksum(p)
5578 self.logger.error(ppp("Unexpected or invalid packet:", p))
5582 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5583 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5584 TCP(sport=80, dport=external_port))
5585 self.pg3.add_stream(p)
5586 self.pg_enable_capture(self.pg_interfaces)
5588 capture = self.pg3.get_capture(1)
5593 self.assertEqual(ip.src, remote_host_ip6)
5594 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5595 self.assertEqual(tcp.sport, 80)
5596 self.assertEqual(tcp.dport, 12345)
5597 self.check_tcp_checksum(p)
5599 self.logger.error(ppp("Unexpected or invalid packet:", p))
5602 def test_frag_in_order(self):
5603 """ NAT64 translate fragments arriving in order """
5604 self.tcp_port_in = random.randint(1025, 65535)
5606 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5608 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5609 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5611 reass = self.vapi.nat_reass_dump()
5612 reass_n_start = len(reass)
5616 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5617 self.tcp_port_in, 20, data)
5618 self.pg0.add_stream(pkts)
5619 self.pg_enable_capture(self.pg_interfaces)
5621 frags = self.pg1.get_capture(len(pkts))
5622 p = self.reass_frags_and_verify(frags,
5624 self.pg1.remote_ip4)
5625 self.assertEqual(p[TCP].dport, 20)
5626 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5627 self.tcp_port_out = p[TCP].sport
5628 self.assertEqual(data, p[Raw].load)
5631 data = "A" * 4 + "b" * 16 + "C" * 3
5632 pkts = self.create_stream_frag(self.pg1,
5637 self.pg1.add_stream(pkts)
5638 self.pg_enable_capture(self.pg_interfaces)
5640 frags = self.pg0.get_capture(len(pkts))
5641 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5642 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5643 self.assertEqual(p[TCP].sport, 20)
5644 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5645 self.assertEqual(data, p[Raw].load)
5647 reass = self.vapi.nat_reass_dump()
5648 reass_n_end = len(reass)
5650 self.assertEqual(reass_n_end - reass_n_start, 2)
5652 def test_reass_hairpinning(self):
5653 """ NAT64 fragments hairpinning """
5655 client = self.pg0.remote_hosts[0]
5656 server = self.pg0.remote_hosts[1]
5657 server_in_port = random.randint(1025, 65535)
5658 server_out_port = random.randint(1025, 65535)
5659 client_in_port = random.randint(1025, 65535)
5660 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5661 nat_addr_ip6 = ip.src
5663 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5665 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5666 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5668 # add static BIB entry for server
5669 self.vapi.nat64_add_del_static_bib(server.ip6n,
5675 # send packet from host to server
5676 pkts = self.create_stream_frag_ip6(self.pg0,
5681 self.pg0.add_stream(pkts)
5682 self.pg_enable_capture(self.pg_interfaces)
5684 frags = self.pg0.get_capture(len(pkts))
5685 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5686 self.assertNotEqual(p[TCP].sport, client_in_port)
5687 self.assertEqual(p[TCP].dport, server_in_port)
5688 self.assertEqual(data, p[Raw].load)
5690 def test_frag_out_of_order(self):
5691 """ NAT64 translate fragments arriving out of order """
5692 self.tcp_port_in = random.randint(1025, 65535)
5694 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5696 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5697 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5701 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5702 self.tcp_port_in, 20, data)
5704 self.pg0.add_stream(pkts)
5705 self.pg_enable_capture(self.pg_interfaces)
5707 frags = self.pg1.get_capture(len(pkts))
5708 p = self.reass_frags_and_verify(frags,
5710 self.pg1.remote_ip4)
5711 self.assertEqual(p[TCP].dport, 20)
5712 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5713 self.tcp_port_out = p[TCP].sport
5714 self.assertEqual(data, p[Raw].load)
5717 data = "A" * 4 + "B" * 16 + "C" * 3
5718 pkts = self.create_stream_frag(self.pg1,
5724 self.pg1.add_stream(pkts)
5725 self.pg_enable_capture(self.pg_interfaces)
5727 frags = self.pg0.get_capture(len(pkts))
5728 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5729 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5730 self.assertEqual(p[TCP].sport, 20)
5731 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5732 self.assertEqual(data, p[Raw].load)
5734 def test_interface_addr(self):
5735 """ Acquire NAT64 pool addresses from interface """
5736 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5738 # no address in NAT64 pool
5739 adresses = self.vapi.nat44_address_dump()
5740 self.assertEqual(0, len(adresses))
5742 # configure interface address and check NAT64 address pool
5743 self.pg4.config_ip4()
5744 addresses = self.vapi.nat64_pool_addr_dump()
5745 self.assertEqual(len(addresses), 1)
5746 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5748 # remove interface address and check NAT64 address pool
5749 self.pg4.unconfig_ip4()
5750 addresses = self.vapi.nat64_pool_addr_dump()
5751 self.assertEqual(0, len(adresses))
5753 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5754 def test_ipfix_max_bibs_sessions(self):
5755 """ IPFIX logging maximum session and BIB entries exceeded """
5758 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5762 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5764 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5765 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5769 for i in range(0, max_bibs):
5770 src = "fd01:aa::%x" % (i)
5771 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5772 IPv6(src=src, dst=remote_host_ip6) /
5773 TCP(sport=12345, dport=80))
5775 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5776 IPv6(src=src, dst=remote_host_ip6) /
5777 TCP(sport=12345, dport=22))
5779 self.pg0.add_stream(pkts)
5780 self.pg_enable_capture(self.pg_interfaces)
5782 self.pg1.get_capture(max_sessions)
5784 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5785 src_address=self.pg3.local_ip4n,
5787 template_interval=10)
5788 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5789 src_port=self.ipfix_src_port)
5791 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5792 IPv6(src=src, dst=remote_host_ip6) /
5793 TCP(sport=12345, dport=25))
5794 self.pg0.add_stream(p)
5795 self.pg_enable_capture(self.pg_interfaces)
5797 self.pg1.get_capture(0)
5798 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5799 capture = self.pg3.get_capture(9)
5800 ipfix = IPFIXDecoder()
5801 # first load template
5803 self.assertTrue(p.haslayer(IPFIX))
5804 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5805 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5806 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5807 self.assertEqual(p[UDP].dport, 4739)
5808 self.assertEqual(p[IPFIX].observationDomainID,
5809 self.ipfix_domain_id)
5810 if p.haslayer(Template):
5811 ipfix.add_template(p.getlayer(Template))
5812 # verify events in data set
5814 if p.haslayer(Data):
5815 data = ipfix.decode_data_set(p.getlayer(Set))
5816 self.verify_ipfix_max_sessions(data, max_sessions)
5818 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5819 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5820 TCP(sport=12345, dport=80))
5821 self.pg0.add_stream(p)
5822 self.pg_enable_capture(self.pg_interfaces)
5824 self.pg1.get_capture(0)
5825 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5826 capture = self.pg3.get_capture(1)
5827 # verify events in data set
5829 self.assertTrue(p.haslayer(IPFIX))
5830 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5831 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5832 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5833 self.assertEqual(p[UDP].dport, 4739)
5834 self.assertEqual(p[IPFIX].observationDomainID,
5835 self.ipfix_domain_id)
5836 if p.haslayer(Data):
5837 data = ipfix.decode_data_set(p.getlayer(Set))
5838 self.verify_ipfix_max_bibs(data, max_bibs)
5840 def test_ipfix_max_frags(self):
5841 """ IPFIX logging maximum fragments pending reassembly exceeded """
5842 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5844 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5845 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5846 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
5847 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5848 src_address=self.pg3.local_ip4n,
5850 template_interval=10)
5851 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5852 src_port=self.ipfix_src_port)
5855 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5856 self.tcp_port_in, 20, data)
5857 self.pg0.add_stream(pkts[-1])
5858 self.pg_enable_capture(self.pg_interfaces)
5860 self.pg1.get_capture(0)
5861 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5862 capture = self.pg3.get_capture(9)
5863 ipfix = IPFIXDecoder()
5864 # first load template
5866 self.assertTrue(p.haslayer(IPFIX))
5867 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5868 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5869 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5870 self.assertEqual(p[UDP].dport, 4739)
5871 self.assertEqual(p[IPFIX].observationDomainID,
5872 self.ipfix_domain_id)
5873 if p.haslayer(Template):
5874 ipfix.add_template(p.getlayer(Template))
5875 # verify events in data set
5877 if p.haslayer(Data):
5878 data = ipfix.decode_data_set(p.getlayer(Set))
5879 self.verify_ipfix_max_fragments_ip6(data, 0,
5880 self.pg0.remote_ip6n)
5882 def test_ipfix_bib_ses(self):
5883 """ IPFIX logging NAT64 BIB/session create and delete events """
5884 self.tcp_port_in = random.randint(1025, 65535)
5885 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5889 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5891 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5892 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5893 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5894 src_address=self.pg3.local_ip4n,
5896 template_interval=10)
5897 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5898 src_port=self.ipfix_src_port)
5901 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5902 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5903 TCP(sport=self.tcp_port_in, dport=25))
5904 self.pg0.add_stream(p)
5905 self.pg_enable_capture(self.pg_interfaces)
5907 p = self.pg1.get_capture(1)
5908 self.tcp_port_out = p[0][TCP].sport
5909 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5910 capture = self.pg3.get_capture(10)
5911 ipfix = IPFIXDecoder()
5912 # first load template
5914 self.assertTrue(p.haslayer(IPFIX))
5915 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5916 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5917 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5918 self.assertEqual(p[UDP].dport, 4739)
5919 self.assertEqual(p[IPFIX].observationDomainID,
5920 self.ipfix_domain_id)
5921 if p.haslayer(Template):
5922 ipfix.add_template(p.getlayer(Template))
5923 # verify events in data set
5925 if p.haslayer(Data):
5926 data = ipfix.decode_data_set(p.getlayer(Set))
5927 if ord(data[0][230]) == 10:
5928 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
5929 elif ord(data[0][230]) == 6:
5930 self.verify_ipfix_nat64_ses(data,
5932 self.pg0.remote_ip6n,
5933 self.pg1.remote_ip4,
5936 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5939 self.pg_enable_capture(self.pg_interfaces)
5940 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5943 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5944 capture = self.pg3.get_capture(2)
5945 # verify events in data set
5947 self.assertTrue(p.haslayer(IPFIX))
5948 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5949 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5950 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5951 self.assertEqual(p[UDP].dport, 4739)
5952 self.assertEqual(p[IPFIX].observationDomainID,
5953 self.ipfix_domain_id)
5954 if p.haslayer(Data):
5955 data = ipfix.decode_data_set(p.getlayer(Set))
5956 if ord(data[0][230]) == 11:
5957 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
5958 elif ord(data[0][230]) == 7:
5959 self.verify_ipfix_nat64_ses(data,
5961 self.pg0.remote_ip6n,
5962 self.pg1.remote_ip4,
5965 self.logger.error(ppp("Unexpected or invalid packet: ", p))
5967 def nat64_get_ses_num(self):
5969 Return number of active NAT64 sessions.
5971 st = self.vapi.nat64_st_dump()
5974 def clear_nat64(self):
5976 Clear NAT64 configuration.
5978 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
5979 domain_id=self.ipfix_domain_id)
5980 self.ipfix_src_port = 4739
5981 self.ipfix_domain_id = 1
5983 self.vapi.nat64_set_timeouts()
5985 interfaces = self.vapi.nat64_interface_dump()
5986 for intf in interfaces:
5987 if intf.is_inside > 1:
5988 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5991 self.vapi.nat64_add_del_interface(intf.sw_if_index,
5995 bib = self.vapi.nat64_bib_dump(255)
5998 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
6006 adresses = self.vapi.nat64_pool_addr_dump()
6007 for addr in adresses:
6008 self.vapi.nat64_add_del_pool_addr_range(addr.address,
6013 prefixes = self.vapi.nat64_prefix_dump()
6014 for prefix in prefixes:
6015 self.vapi.nat64_add_del_prefix(prefix.prefix,
6017 vrf_id=prefix.vrf_id,
6021 super(TestNAT64, self).tearDown()
6022 if not self.vpp_dead:
6023 self.logger.info(self.vapi.cli("show nat64 pool"))
6024 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6025 self.logger.info(self.vapi.cli("show nat64 prefix"))
6026 self.logger.info(self.vapi.cli("show nat64 bib all"))
6027 self.logger.info(self.vapi.cli("show nat64 session table all"))
6028 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
6032 class TestDSlite(MethodHolder):
6033 """ DS-Lite Test Cases """
6036 def setUpClass(cls):
6037 super(TestDSlite, cls).setUpClass()
6040 cls.nat_addr = '10.0.0.3'
6041 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
6043 cls.create_pg_interfaces(range(2))
6045 cls.pg0.config_ip4()
6046 cls.pg0.resolve_arp()
6048 cls.pg1.config_ip6()
6049 cls.pg1.generate_remote_hosts(2)
6050 cls.pg1.configure_ipv6_neighbors()
6053 super(TestDSlite, cls).tearDownClass()
6056 def test_dslite(self):
6057 """ Test DS-Lite """
6058 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6060 aftr_ip4 = '192.0.0.1'
6061 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6062 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6063 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6064 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6067 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6068 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6069 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6070 UDP(sport=20000, dport=10000))
6071 self.pg1.add_stream(p)
6072 self.pg_enable_capture(self.pg_interfaces)
6074 capture = self.pg0.get_capture(1)
6075 capture = capture[0]
6076 self.assertFalse(capture.haslayer(IPv6))
6077 self.assertEqual(capture[IP].src, self.nat_addr)
6078 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6079 self.assertNotEqual(capture[UDP].sport, 20000)
6080 self.assertEqual(capture[UDP].dport, 10000)
6081 self.check_ip_checksum(capture)
6082 out_port = capture[UDP].sport
6084 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6085 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6086 UDP(sport=10000, dport=out_port))
6087 self.pg0.add_stream(p)
6088 self.pg_enable_capture(self.pg_interfaces)
6090 capture = self.pg1.get_capture(1)
6091 capture = capture[0]
6092 self.assertEqual(capture[IPv6].src, aftr_ip6)
6093 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6094 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6095 self.assertEqual(capture[IP].dst, '192.168.1.1')
6096 self.assertEqual(capture[UDP].sport, 10000)
6097 self.assertEqual(capture[UDP].dport, 20000)
6098 self.check_ip_checksum(capture)
6101 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6102 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6103 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6104 TCP(sport=20001, dport=10001))
6105 self.pg1.add_stream(p)
6106 self.pg_enable_capture(self.pg_interfaces)
6108 capture = self.pg0.get_capture(1)
6109 capture = capture[0]
6110 self.assertFalse(capture.haslayer(IPv6))
6111 self.assertEqual(capture[IP].src, self.nat_addr)
6112 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6113 self.assertNotEqual(capture[TCP].sport, 20001)
6114 self.assertEqual(capture[TCP].dport, 10001)
6115 self.check_ip_checksum(capture)
6116 self.check_tcp_checksum(capture)
6117 out_port = capture[TCP].sport
6119 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6120 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6121 TCP(sport=10001, dport=out_port))
6122 self.pg0.add_stream(p)
6123 self.pg_enable_capture(self.pg_interfaces)
6125 capture = self.pg1.get_capture(1)
6126 capture = capture[0]
6127 self.assertEqual(capture[IPv6].src, aftr_ip6)
6128 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6129 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6130 self.assertEqual(capture[IP].dst, '192.168.1.1')
6131 self.assertEqual(capture[TCP].sport, 10001)
6132 self.assertEqual(capture[TCP].dport, 20001)
6133 self.check_ip_checksum(capture)
6134 self.check_tcp_checksum(capture)
6137 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6138 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6139 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6140 ICMP(id=4000, type='echo-request'))
6141 self.pg1.add_stream(p)
6142 self.pg_enable_capture(self.pg_interfaces)
6144 capture = self.pg0.get_capture(1)
6145 capture = capture[0]
6146 self.assertFalse(capture.haslayer(IPv6))
6147 self.assertEqual(capture[IP].src, self.nat_addr)
6148 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6149 self.assertNotEqual(capture[ICMP].id, 4000)
6150 self.check_ip_checksum(capture)
6151 self.check_icmp_checksum(capture)
6152 out_id = capture[ICMP].id
6154 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6155 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6156 ICMP(id=out_id, type='echo-reply'))
6157 self.pg0.add_stream(p)
6158 self.pg_enable_capture(self.pg_interfaces)
6160 capture = self.pg1.get_capture(1)
6161 capture = capture[0]
6162 self.assertEqual(capture[IPv6].src, aftr_ip6)
6163 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6164 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6165 self.assertEqual(capture[IP].dst, '192.168.1.1')
6166 self.assertEqual(capture[ICMP].id, 4000)
6167 self.check_ip_checksum(capture)
6168 self.check_icmp_checksum(capture)
6170 # ping DS-Lite AFTR tunnel endpoint address
6171 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6172 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6173 ICMPv6EchoRequest())
6174 self.pg1.add_stream(p)
6175 self.pg_enable_capture(self.pg_interfaces)
6177 capture = self.pg1.get_capture(1)
6178 self.assertEqual(1, len(capture))
6179 capture = capture[0]
6180 self.assertEqual(capture[IPv6].src, aftr_ip6)
6181 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6182 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6185 super(TestDSlite, self).tearDown()
6186 if not self.vpp_dead:
6187 self.logger.info(self.vapi.cli("show dslite pool"))
6189 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6190 self.logger.info(self.vapi.cli("show dslite sessions"))
6193 class TestDSliteCE(MethodHolder):
6194 """ DS-Lite CE Test Cases """
6197 def setUpConstants(cls):
6198 super(TestDSliteCE, cls).setUpConstants()
6199 cls.vpp_cmdline.extend(["nat", "{", "dslite ce", "}"])
6202 def setUpClass(cls):
6203 super(TestDSliteCE, cls).setUpClass()
6206 cls.create_pg_interfaces(range(2))
6208 cls.pg0.config_ip4()
6209 cls.pg0.resolve_arp()
6211 cls.pg1.config_ip6()
6212 cls.pg1.generate_remote_hosts(1)
6213 cls.pg1.configure_ipv6_neighbors()
6216 super(TestDSliteCE, cls).tearDownClass()
6219 def test_dslite_ce(self):
6220 """ Test DS-Lite CE """
6222 b4_ip4 = '192.0.0.2'
6223 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6224 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6225 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6226 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6228 aftr_ip4 = '192.0.0.1'
6229 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6230 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6231 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6232 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6234 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6235 dst_address_length=128,
6236 next_hop_address=self.pg1.remote_ip6n,
6237 next_hop_sw_if_index=self.pg1.sw_if_index,
6241 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6242 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6243 UDP(sport=10000, dport=20000))
6244 self.pg0.add_stream(p)
6245 self.pg_enable_capture(self.pg_interfaces)
6247 capture = self.pg1.get_capture(1)
6248 capture = capture[0]
6249 self.assertEqual(capture[IPv6].src, b4_ip6)
6250 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6251 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6252 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6253 self.assertEqual(capture[UDP].sport, 10000)
6254 self.assertEqual(capture[UDP].dport, 20000)
6255 self.check_ip_checksum(capture)
6258 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6259 IPv6(dst=b4_ip6, src=aftr_ip6) /
6260 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6261 UDP(sport=20000, dport=10000))
6262 self.pg1.add_stream(p)
6263 self.pg_enable_capture(self.pg_interfaces)
6265 capture = self.pg0.get_capture(1)
6266 capture = capture[0]
6267 self.assertFalse(capture.haslayer(IPv6))
6268 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6269 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6270 self.assertEqual(capture[UDP].sport, 20000)
6271 self.assertEqual(capture[UDP].dport, 10000)
6272 self.check_ip_checksum(capture)
6274 # ping DS-Lite B4 tunnel endpoint address
6275 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6276 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6277 ICMPv6EchoRequest())
6278 self.pg1.add_stream(p)
6279 self.pg_enable_capture(self.pg_interfaces)
6281 capture = self.pg1.get_capture(1)
6282 self.assertEqual(1, len(capture))
6283 capture = capture[0]
6284 self.assertEqual(capture[IPv6].src, b4_ip6)
6285 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6286 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6289 super(TestDSliteCE, self).tearDown()
6290 if not self.vpp_dead:
6292 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6294 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
6297 class TestNAT66(MethodHolder):
6298 """ NAT66 Test Cases """
6301 def setUpClass(cls):
6302 super(TestNAT66, cls).setUpClass()
6305 cls.nat_addr = 'fd01:ff::2'
6306 cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)
6308 cls.create_pg_interfaces(range(2))
6309 cls.interfaces = list(cls.pg_interfaces)
6311 for i in cls.interfaces:
6314 i.configure_ipv6_neighbors()
6317 super(TestNAT66, cls).tearDownClass()
6320 def test_static(self):
6321 """ 1:1 NAT66 test """
6322 self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
6323 self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6324 self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
6329 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6330 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6333 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6334 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6337 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6338 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6339 ICMPv6EchoRequest())
6341 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6342 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
6343 GRE() / IP() / TCP())
6345 self.pg0.add_stream(pkts)
6346 self.pg_enable_capture(self.pg_interfaces)
6348 capture = self.pg1.get_capture(len(pkts))
6349 for packet in capture:
6351 self.assertEqual(packet[IPv6].src, self.nat_addr)
6352 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
6353 if packet.haslayer(TCP):
6354 self.check_tcp_checksum(packet)
6355 elif packet.haslayer(UDP):
6356 self.check_udp_checksum(packet)
6357 elif packet.haslayer(ICMPv6EchoRequest):
6358 self.check_icmpv6_checksum(packet)
6360 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6365 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6366 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6369 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6370 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6373 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6374 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6377 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6378 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
6379 GRE() / IP() / TCP())
6381 self.pg1.add_stream(pkts)
6382 self.pg_enable_capture(self.pg_interfaces)
6384 capture = self.pg0.get_capture(len(pkts))
6385 for packet in capture:
6387 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
6388 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
6389 if packet.haslayer(TCP):
6390 self.check_tcp_checksum(packet)
6391 elif packet.haslayer(UDP):
6392 self.check_udp_checksum(packet)
6393 elif packet.haslayer(ICMPv6EchoReply):
6394 self.check_icmpv6_checksum(packet)
6396 self.logger.error(ppp("Unexpected or invalid packet:", packet))
6399 sm = self.vapi.nat66_static_mapping_dump()
6400 self.assertEqual(len(sm), 1)
6401 self.assertEqual(sm[0].total_pkts, 8)
6403 def clear_nat66(self):
6405 Clear NAT66 configuration.
6407 interfaces = self.vapi.nat66_interface_dump()
6408 for intf in interfaces:
6409 self.vapi.nat66_add_del_interface(intf.sw_if_index,
6413 static_mappings = self.vapi.nat66_static_mapping_dump()
6414 for sm in static_mappings:
6415 self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
6416 sm.external_ip_address,
6421 super(TestNAT66, self).tearDown()
6422 if not self.vpp_dead:
6423 self.logger.info(self.vapi.cli("show nat66 interfaces"))
6424 self.logger.info(self.vapi.cli("show nat66 static mappings"))
6427 if __name__ == '__main__':
6428 unittest.main(testRunner=VppTestRunner)