9 from framework import VppTestCase, VppTestRunner, running_extended_tests
10 from scapy.layers.inet import IP, TCP, UDP, ICMP
11 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
12 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
13 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
14 from scapy.layers.l2 import Ether, ARP, GRE
15 from scapy.data import IP_PROTOS
16 from scapy.packet import bind_layers, Raw
17 from scapy.all import fragment6
19 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
20 from time import sleep
21 from util import ip4_range
22 from util import mactobinary
25 class MethodHolder(VppTestCase):
26 """ NAT create capture and verify method holder """
28 def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
30 Create packet stream for inside network
32 :param in_if: Inside interface
33 :param out_if: Outside interface
34 :param dst_ip: Destination address
35 :param ttl: TTL of generated packets
38 dst_ip = out_if.remote_ip4
42 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
43 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
44 TCP(sport=self.tcp_port_in, dport=20))
48 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
49 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
50 UDP(sport=self.udp_port_in, dport=20))
54 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
55 IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
56 ICMP(id=self.icmp_id_in, type='echo-request'))
def compose_ip6(self, ip4, pref, plen):
    """
    Compose IPv4-embedded IPv6 addresses

    The four IPv4 octets are written into the IPv6 prefix at the byte
    positions defined by RFC 6052 section 2.2 for the given prefix length.
    A prefix length that is not one of 32, 40, 48, 56, 64 or 96 leaves the
    prefix untouched (it is returned in normalized text form).

    :param ip4: IPv4 address (text form)
    :param pref: IPv6 prefix (text form)
    :param plen: IPv6 prefix length
    :returns: IPv4-embedded IPv6 addresses (text form)
    """
    # Destination byte index inside the 16-byte IPv6 address for each of
    # the four IPv4 octets. Byte 8 (the reserved "u" octet) is never used,
    # which is why the positions jump from 7 to 9.
    offsets = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    # bytearray is mutable and holds ints on both Python 2 and Python 3,
    # unlike list(<packed address>) which yields 1-char strings on Python 2
    # and ints on Python 3 (breaking ''.join()).
    pref_n = bytearray(socket.inet_pton(socket.AF_INET6, pref))
    ip4_n = bytearray(socket.inet_pton(socket.AF_INET, ip4))
    for position, octet in zip(offsets.get(plen, ()), ip4_n):
        pref_n[position] = octet
    return socket.inet_ntop(socket.AF_INET6, bytes(pref_n))
def extract_ip4(self, ip6, plen):
    """
    Extract IPv4 address embedded in IPv6 addresses

    The IPv4 octets are read from the byte positions defined by RFC 6052
    section 2.2 for the given prefix length.

    :param ip6: IPv6 address (text form)
    :param plen: IPv6 prefix length (32, 40, 48, 56, 64 or 96)
    :returns: extracted IPv4 address (text form)
    :raises ValueError: if plen is not a supported prefix length
    """
    # Source byte index inside the 16-byte IPv6 address for each of the
    # four IPv4 octets. Byte 8 (the reserved "u" octet) is never used.
    offsets = {
        32: (4, 5, 6, 7),
        40: (5, 6, 7, 9),
        48: (6, 7, 9, 10),
        56: (7, 9, 10, 11),
        64: (9, 10, 11, 12),
        96: (12, 13, 14, 15),
    }
    if plen not in offsets:
        raise ValueError("unsupported IPv6 prefix length: %r" % (plen,))
    # bytearray indexes to ints on both Python 2 and Python 3, so the
    # result can be packed back into bytes without ''.join().
    ip6_n = bytearray(socket.inet_pton(socket.AF_INET6, ip6))
    ip4_n = bytearray(ip6_n[position] for position in offsets[plen])
    return socket.inet_ntop(socket.AF_INET, bytes(ip4_n))
146 def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
148 Create IPv6 packet stream for inside network
150 :param in_if: Inside interface
151 :param out_if: Outside interface
152 :param hlim: Hop Limit of generated packets
153 :param pref: NAT64 prefix
154 :param plen: NAT64 prefix length
158 dst = ''.join(['64:ff9b::', out_if.remote_ip4])
160 dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
163 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
164 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
165 TCP(sport=self.tcp_port_in, dport=20))
169 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
170 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
171 UDP(sport=self.udp_port_in, dport=20))
175 p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
176 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
177 ICMPv6EchoRequest(id=self.icmp_id_in))
182 def create_stream_out(self, out_if, dst_ip=None, ttl=64,
183 use_inside_ports=False):
185 Create packet stream for outside network
187 :param out_if: Outside interface
188 :param dst_ip: Destination IP address (Default use global NAT address)
189 :param ttl: TTL of generated packets
190 :param use_inside_ports: Use inside NAT ports as destination ports
191 instead of outside ports
194 dst_ip = self.nat_addr
195 if not use_inside_ports:
196 tcp_port = self.tcp_port_out
197 udp_port = self.udp_port_out
198 icmp_id = self.icmp_id_out
200 tcp_port = self.tcp_port_in
201 udp_port = self.udp_port_in
202 icmp_id = self.icmp_id_in
205 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
206 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
207 TCP(dport=tcp_port, sport=20))
211 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
212 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
213 UDP(dport=udp_port, sport=20))
217 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
218 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
219 ICMP(id=icmp_id, type='echo-reply'))
224 def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
226 Create packet stream for outside network
228 :param out_if: Outside interface
229 :param dst_ip: Destination IP address
230 :param hl: HL of generated packets
234 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
235 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
236 TCP(dport=self.tcp_port_out, sport=20))
240 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
241 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
242 UDP(dport=self.udp_port_out, sport=20))
246 p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
247 IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
248 ICMPv6EchoReply(id=self.icmp_id_out))
253 def verify_capture_out(self, capture, nat_ip=None, same_port=False,
254 packet_num=3, dst_ip=None, is_ip6=False):
256 Verify captured packets on outside network
258 :param capture: Captured packets
259 :param nat_ip: Translated IP address (Default use global NAT address)
260 :param same_port: Source port number is not translated (Default False)
261 :param packet_num: Expected number of packets (Default 3)
262 :param dst_ip: Destination IP address (Default do not verify)
263 :param is_ip6: If L3 protocol is IPv6 (Default False)
267 ICMP46 = ICMPv6EchoRequest
272 nat_ip = self.nat_addr
273 self.assertEqual(packet_num, len(capture))
274 for packet in capture:
277 self.assert_packet_checksums_valid(packet)
278 self.assertEqual(packet[IP46].src, nat_ip)
279 if dst_ip is not None:
280 self.assertEqual(packet[IP46].dst, dst_ip)
281 if packet.haslayer(TCP):
283 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
286 packet[TCP].sport, self.tcp_port_in)
287 self.tcp_port_out = packet[TCP].sport
288 self.assert_packet_checksums_valid(packet)
289 elif packet.haslayer(UDP):
291 self.assertEqual(packet[UDP].sport, self.udp_port_in)
294 packet[UDP].sport, self.udp_port_in)
295 self.udp_port_out = packet[UDP].sport
298 self.assertEqual(packet[ICMP46].id, self.icmp_id_in)
300 self.assertNotEqual(packet[ICMP46].id, self.icmp_id_in)
301 self.icmp_id_out = packet[ICMP46].id
302 self.assert_packet_checksums_valid(packet)
304 self.logger.error(ppp("Unexpected or invalid packet "
305 "(outside network):", packet))
308 def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
309 packet_num=3, dst_ip=None):
311 Verify captured packets on outside network
313 :param capture: Captured packets
314 :param nat_ip: Translated IP address
315 :param same_port: Source port number is not translated (Default False)
316 :param packet_num: Expected number of packets (Default 3)
317 :param dst_ip: Destination IP address (Default do not verify)
319 return self.verify_capture_out(capture, nat_ip, same_port, packet_num,
322 def verify_capture_in(self, capture, in_if, packet_num=3):
324 Verify captured packets on inside network
326 :param capture: Captured packets
327 :param in_if: Inside interface
328 :param packet_num: Expected number of packets (Default 3)
330 self.assertEqual(packet_num, len(capture))
331 for packet in capture:
333 self.assert_packet_checksums_valid(packet)
334 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
335 if packet.haslayer(TCP):
336 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
337 elif packet.haslayer(UDP):
338 self.assertEqual(packet[UDP].dport, self.udp_port_in)
340 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
342 self.logger.error(ppp("Unexpected or invalid packet "
343 "(inside network):", packet))
346 def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
348 Verify captured IPv6 packets on inside network
350 :param capture: Captured packets
351 :param src_ip: Source IP
352 :param dst_ip: Destination IP address
353 :param packet_num: Expected number of packets (Default 3)
355 self.assertEqual(packet_num, len(capture))
356 for packet in capture:
358 self.assertEqual(packet[IPv6].src, src_ip)
359 self.assertEqual(packet[IPv6].dst, dst_ip)
360 self.assert_packet_checksums_valid(packet)
361 if packet.haslayer(TCP):
362 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
363 elif packet.haslayer(UDP):
364 self.assertEqual(packet[UDP].dport, self.udp_port_in)
366 self.assertEqual(packet[ICMPv6EchoReply].id,
369 self.logger.error(ppp("Unexpected or invalid packet "
370 "(inside network):", packet))
373 def verify_capture_no_translation(self, capture, ingress_if, egress_if):
375 Verify captured packets that don't have to be translated
377 :param capture: Captured packets
378 :param ingress_if: Ingress interface
379 :param egress_if: Egress interface
381 for packet in capture:
383 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
384 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
385 if packet.haslayer(TCP):
386 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
387 elif packet.haslayer(UDP):
388 self.assertEqual(packet[UDP].sport, self.udp_port_in)
390 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
392 self.logger.error(ppp("Unexpected or invalid packet "
393 "(inside network):", packet))
396 def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
397 packet_num=3, icmp_type=11):
399 Verify captured packets with ICMP errors on outside network
401 :param capture: Captured packets
402 :param src_ip: Translated IP address or IP address of VPP
403 (Default use global NAT address)
404 :param packet_num: Expected number of packets (Default 3)
405 :param icmp_type: Type of error ICMP packet
406 we are expecting (Default 11)
409 src_ip = self.nat_addr
410 self.assertEqual(packet_num, len(capture))
411 for packet in capture:
413 self.assertEqual(packet[IP].src, src_ip)
414 self.assertTrue(packet.haslayer(ICMP))
416 self.assertEqual(icmp.type, icmp_type)
417 self.assertTrue(icmp.haslayer(IPerror))
418 inner_ip = icmp[IPerror]
419 if inner_ip.haslayer(TCPerror):
420 self.assertEqual(inner_ip[TCPerror].dport,
422 elif inner_ip.haslayer(UDPerror):
423 self.assertEqual(inner_ip[UDPerror].dport,
426 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
428 self.logger.error(ppp("Unexpected or invalid packet "
429 "(outside network):", packet))
432 def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
435 Verify captured packets with ICMP errors on inside network
437 :param capture: Captured packets
438 :param in_if: Inside interface
439 :param packet_num: Expected number of packets (Default 3)
440 :param icmp_type: Type of error ICMP packet
441 we are expecting (Default 11)
443 self.assertEqual(packet_num, len(capture))
444 for packet in capture:
446 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447 self.assertTrue(packet.haslayer(ICMP))
449 self.assertEqual(icmp.type, icmp_type)
450 self.assertTrue(icmp.haslayer(IPerror))
451 inner_ip = icmp[IPerror]
452 if inner_ip.haslayer(TCPerror):
453 self.assertEqual(inner_ip[TCPerror].sport,
455 elif inner_ip.haslayer(UDPerror):
456 self.assertEqual(inner_ip[UDPerror].sport,
459 self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
461 self.logger.error(ppp("Unexpected or invalid packet "
462 "(inside network):", packet))
465 def create_stream_frag(self, src_if, dst, sport, dport, data):
467 Create fragmented packet stream
469 :param src_if: Source interface
470 :param dst: Destination IPv4 address
471 :param sport: Source TCP port
472 :param dport: Destination TCP port
473 :param data: Payload data
476 id = random.randint(0, 65535)
477 p = (IP(src=src_if.remote_ip4, dst=dst) /
478 TCP(sport=sport, dport=dport) /
480 p = p.__class__(str(p))
481 chksum = p['TCP'].chksum
483 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
484 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
485 TCP(sport=sport, dport=dport, chksum=chksum) /
488 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
489 IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
490 proto=IP_PROTOS.tcp) /
493 p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
494 IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=IP_PROTOS.tcp,
500 def create_stream_frag_ip6(self, src_if, dst, sport, dport, data,
501 pref=None, plen=0, frag_size=128):
503 Create fragmented packet stream
505 :param src_if: Source interface
506 :param dst: Destination IPv4 address
507 :param sport: Source TCP port
508 :param dport: Destination TCP port
509 :param data: Payload data
510 :param pref: NAT64 prefix
511 :param plen: NAT64 prefix length
512 :param frag_size: size of fragments
516 dst_ip6 = ''.join(['64:ff9b::', dst])
518 dst_ip6 = self.compose_ip6(dst, pref, plen)
520 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
521 IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
522 IPv6ExtHdrFragment(id=random.randint(0, 65535)) /
523 TCP(sport=sport, dport=dport) /
526 return fragment6(p, frag_size)
528 def reass_frags_and_verify(self, frags, src, dst):
530 Reassemble and verify fragmented packet
532 :param frags: Captured fragments
533 :param src: Source IPv4 address to verify
534 :param dst: Destination IPv4 address to verify
536 :returns: Reassembled IPv4 packet
538 buffer = StringIO.StringIO()
540 self.assertEqual(p[IP].src, src)
541 self.assertEqual(p[IP].dst, dst)
542 self.assert_ip_checksum_valid(p)
543 buffer.seek(p[IP].frag * 8)
544 buffer.write(p[IP].payload)
545 ip = frags[0].getlayer(IP)
546 ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
547 proto=frags[0][IP].proto)
548 if ip.proto == IP_PROTOS.tcp:
549 p = (ip / TCP(buffer.getvalue()))
550 self.assert_tcp_checksum_valid(p)
551 elif ip.proto == IP_PROTOS.udp:
552 p = (ip / UDP(buffer.getvalue()))
555 def reass_frags_and_verify_ip6(self, frags, src, dst):
557 Reassemble and verify fragmented packet
559 :param frags: Captured fragments
560 :param src: Source IPv6 address to verify
561 :param dst: Destination IPv6 address to verify
563 :returns: Reassembled IPv6 packet
565 buffer = StringIO.StringIO()
567 self.assertEqual(p[IPv6].src, src)
568 self.assertEqual(p[IPv6].dst, dst)
569 buffer.seek(p[IPv6ExtHdrFragment].offset * 8)
570 buffer.write(p[IPv6ExtHdrFragment].payload)
571 ip = IPv6(src=frags[0][IPv6].src, dst=frags[0][IPv6].dst,
572 nh=frags[0][IPv6ExtHdrFragment].nh)
573 if ip.nh == IP_PROTOS.tcp:
574 p = (ip / TCP(buffer.getvalue()))
575 elif ip.nh == IP_PROTOS.udp:
576 p = (ip / UDP(buffer.getvalue()))
577 self.assert_packet_checksums_valid(p)
580 def initiate_tcp_session(self, in_if, out_if):
582 Initiates TCP session
584 :param in_if: Inside interface
585 :param out_if: Outside interface
589 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
590 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
591 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
594 self.pg_enable_capture(self.pg_interfaces)
596 capture = out_if.get_capture(1)
598 self.tcp_port_out = p[TCP].sport
600 # SYN + ACK packet out->in
601 p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
602 IP(src=out_if.remote_ip4, dst=self.nat_addr) /
603 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
606 self.pg_enable_capture(self.pg_interfaces)
611 p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
612 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
613 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
616 self.pg_enable_capture(self.pg_interfaces)
618 out_if.get_capture(1)
621 self.logger.error("TCP 3 way handshake failed")
624 def verify_ipfix_nat44_ses(self, data):
626 Verify IPFIX NAT44 session create/delete event
628 :param data: Decoded IPFIX data records
630 nat44_ses_create_num = 0
631 nat44_ses_delete_num = 0
632 self.assertEqual(6, len(data))
635 self.assertIn(ord(record[230]), [4, 5])
636 if ord(record[230]) == 4:
637 nat44_ses_create_num += 1
639 nat44_ses_delete_num += 1
641 self.assertEqual(self.pg0.remote_ip4n, record[8])
642 # postNATSourceIPv4Address
643 self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
646 self.assertEqual(struct.pack("!I", 0), record[234])
647 # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
648 if IP_PROTOS.icmp == ord(record[4]):
649 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
650 self.assertEqual(struct.pack("!H", self.icmp_id_out),
652 elif IP_PROTOS.tcp == ord(record[4]):
653 self.assertEqual(struct.pack("!H", self.tcp_port_in),
655 self.assertEqual(struct.pack("!H", self.tcp_port_out),
657 elif IP_PROTOS.udp == ord(record[4]):
658 self.assertEqual(struct.pack("!H", self.udp_port_in),
660 self.assertEqual(struct.pack("!H", self.udp_port_out),
663 self.fail("Invalid protocol")
664 self.assertEqual(3, nat44_ses_create_num)
665 self.assertEqual(3, nat44_ses_delete_num)
667 def verify_ipfix_addr_exhausted(self, data):
669 Verify IPFIX NAT addresses exhausted event
671 :param data: Decoded IPFIX data records
673 self.assertEqual(1, len(data))
676 self.assertEqual(ord(record[230]), 3)
678 self.assertEqual(struct.pack("!I", 0), record[283])
680 def verify_ipfix_max_sessions(self, data, limit):
682 Verify IPFIX maximum session entries exceeded event
684 :param data: Decoded IPFIX data records
685 :param limit: Number of maximum session entries that can be created.
687 self.assertEqual(1, len(data))
690 self.assertEqual(ord(record[230]), 13)
691 # natQuotaExceededEvent
692 self.assertEqual(struct.pack("I", 1), record[466])
694 self.assertEqual(struct.pack("I", limit), record[471])
696 def verify_ipfix_max_bibs(self, data, limit):
698 Verify IPFIX maximum BIB entries exceeded event
700 :param data: Decoded IPFIX data records
701 :param limit: Number of maximum BIB entries that can be created.
703 self.assertEqual(1, len(data))
706 self.assertEqual(ord(record[230]), 13)
707 # natQuotaExceededEvent
708 self.assertEqual(struct.pack("I", 2), record[466])
710 self.assertEqual(struct.pack("I", limit), record[472])
712 def verify_ipfix_max_fragments_ip6(self, data, limit, src_addr):
714 Verify IPFIX maximum IPv6 fragments pending reassembly exceeded event
716 :param data: Decoded IPFIX data records
717 :param limit: Number of maximum fragments pending reassembly
718 :param src_addr: IPv6 source address
720 self.assertEqual(1, len(data))
723 self.assertEqual(ord(record[230]), 13)
724 # natQuotaExceededEvent
725 self.assertEqual(struct.pack("I", 5), record[466])
726 # maxFragmentsPendingReassembly
727 self.assertEqual(struct.pack("I", limit), record[475])
729 self.assertEqual(src_addr, record[27])
731 def verify_ipfix_max_fragments_ip4(self, data, limit, src_addr):
733 Verify IPFIX maximum IPv4 fragments pending reassembly exceeded event
735 :param data: Decoded IPFIX data records
736 :param limit: Number of maximum fragments pending reassembly
737 :param src_addr: IPv4 source address
739 self.assertEqual(1, len(data))
742 self.assertEqual(ord(record[230]), 13)
743 # natQuotaExceededEvent
744 self.assertEqual(struct.pack("I", 5), record[466])
745 # maxFragmentsPendingReassembly
746 self.assertEqual(struct.pack("I", limit), record[475])
748 self.assertEqual(src_addr, record[8])
750 def verify_ipfix_bib(self, data, is_create, src_addr):
752 Verify IPFIX NAT64 BIB create and delete events
754 :param data: Decoded IPFIX data records
755 :param is_create: Create event if nonzero value otherwise delete event
756 :param src_addr: IPv6 source address
758 self.assertEqual(1, len(data))
762 self.assertEqual(ord(record[230]), 10)
764 self.assertEqual(ord(record[230]), 11)
766 self.assertEqual(src_addr, record[27])
767 # postNATSourceIPv4Address
768 self.assertEqual(self.nat_addr_n, record[225])
770 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
772 self.assertEqual(struct.pack("!I", 0), record[234])
773 # sourceTransportPort
774 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
775 # postNAPTSourceTransportPort
776 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
778 def verify_ipfix_nat64_ses(self, data, is_create, src_addr, dst_addr,
781 Verify IPFIX NAT64 session create and delete events
783 :param data: Decoded IPFIX data records
784 :param is_create: Create event if nonzero value otherwise delete event
785 :param src_addr: IPv6 source address
786 :param dst_addr: IPv4 destination address
787 :param dst_port: destination TCP port
789 self.assertEqual(1, len(data))
793 self.assertEqual(ord(record[230]), 6)
795 self.assertEqual(ord(record[230]), 7)
797 self.assertEqual(src_addr, record[27])
798 # destinationIPv6Address
799 self.assertEqual(socket.inet_pton(socket.AF_INET6,
800 self.compose_ip6(dst_addr,
804 # postNATSourceIPv4Address
805 self.assertEqual(self.nat_addr_n, record[225])
806 # postNATDestinationIPv4Address
807 self.assertEqual(socket.inet_pton(socket.AF_INET, dst_addr),
810 self.assertEqual(IP_PROTOS.tcp, ord(record[4]))
812 self.assertEqual(struct.pack("!I", 0), record[234])
813 # sourceTransportPort
814 self.assertEqual(struct.pack("!H", self.tcp_port_in), record[7])
815 # postNAPTSourceTransportPort
816 self.assertEqual(struct.pack("!H", self.tcp_port_out), record[227])
817 # destinationTransportPort
818 self.assertEqual(struct.pack("!H", dst_port), record[11])
819 # postNAPTDestinationTransportPort
820 self.assertEqual(struct.pack("!H", dst_port), record[228])
823 class TestNAT44(MethodHolder):
824 """ NAT44 Test Cases """
828 super(TestNAT44, cls).setUpClass()
831 cls.tcp_port_in = 6303
832 cls.tcp_port_out = 6303
833 cls.udp_port_in = 6304
834 cls.udp_port_out = 6304
835 cls.icmp_id_in = 6305
836 cls.icmp_id_out = 6305
837 cls.nat_addr = '10.0.0.3'
838 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
839 cls.ipfix_src_port = 4739
840 cls.ipfix_domain_id = 1
841 cls.tcp_external_port = 80
843 cls.create_pg_interfaces(range(10))
844 cls.interfaces = list(cls.pg_interfaces[0:4])
846 for i in cls.interfaces:
851 cls.pg0.generate_remote_hosts(3)
852 cls.pg0.configure_ipv4_neighbors()
854 cls.pg1.generate_remote_hosts(1)
855 cls.pg1.configure_ipv4_neighbors()
857 cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
858 cls.vapi.ip_table_add_del(10, is_add=1)
859 cls.vapi.ip_table_add_del(20, is_add=1)
# Addressing for the overlapping interfaces: pg4 and pg6 deliberately share
# the same local/remote addresses but are placed in different IPv4 tables
# (10 and 20); pg5 uses a distinct subnet in table 10.
#
# The packed (binary) local address is derived from the textual address
# assigned on the line above it. Previously it was computed from
# `i.local_ip4`, where `i` was a stale loop variable left over from an
# earlier loop, so the binary form did not match the textual form.
cls.pg4._local_ip4 = "172.16.255.1"
cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET,
                                       cls.pg4._local_ip4)
cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg4.set_table_ip4(10)
cls.pg5._local_ip4 = "172.17.255.3"
cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET,
                                       cls.pg5._local_ip4)
cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
cls.pg5.set_table_ip4(10)
cls.pg6._local_ip4 = "172.16.255.1"
cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET,
                                       cls.pg6._local_ip4)
cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
cls.pg6.set_table_ip4(20)
873 for i in cls.overlapping_interfaces:
881 cls.pg9.generate_remote_hosts(2)
883 ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
884 cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
888 cls.pg9.resolve_arp()
889 cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
890 cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
891 cls.pg9.resolve_arp()
894 super(TestNAT44, cls).tearDownClass()
897 def clear_nat44(self):
899 Clear NAT44 configuration.
901 # I found no elegant way to do this
902 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
903 dst_address_length=32,
904 next_hop_address=self.pg7.remote_ip4n,
905 next_hop_sw_if_index=self.pg7.sw_if_index,
907 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
908 dst_address_length=32,
909 next_hop_address=self.pg8.remote_ip4n,
910 next_hop_sw_if_index=self.pg8.sw_if_index,
913 for intf in [self.pg7, self.pg8]:
914 neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
916 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
921 if self.pg7.has_ip4_config:
922 self.pg7.unconfig_ip4()
924 self.vapi.nat44_forwarding_enable_disable(0)
926 interfaces = self.vapi.nat44_interface_addr_dump()
927 for intf in interfaces:
928 self.vapi.nat44_add_interface_addr(intf.sw_if_index,
929 twice_nat=intf.twice_nat,
932 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
933 domain_id=self.ipfix_domain_id)
934 self.ipfix_src_port = 4739
935 self.ipfix_domain_id = 1
937 interfaces = self.vapi.nat44_interface_dump()
938 for intf in interfaces:
939 if intf.is_inside > 1:
940 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
943 self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
947 interfaces = self.vapi.nat44_interface_output_feature_dump()
948 for intf in interfaces:
949 self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
953 static_mappings = self.vapi.nat44_static_mapping_dump()
954 for sm in static_mappings:
955 self.vapi.nat44_add_del_static_mapping(
957 sm.external_ip_address,
958 local_port=sm.local_port,
959 external_port=sm.external_port,
960 addr_only=sm.addr_only,
962 protocol=sm.protocol,
963 twice_nat=sm.twice_nat,
964 self_twice_nat=sm.self_twice_nat,
965 out2in_only=sm.out2in_only,
967 external_sw_if_index=sm.external_sw_if_index,
970 lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
971 for lb_sm in lb_static_mappings:
972 self.vapi.nat44_add_del_lb_static_mapping(
977 twice_nat=lb_sm.twice_nat,
978 self_twice_nat=lb_sm.self_twice_nat,
979 out2in_only=lb_sm.out2in_only,
985 identity_mappings = self.vapi.nat44_identity_mapping_dump()
986 for id_m in identity_mappings:
987 self.vapi.nat44_add_del_identity_mapping(
988 addr_only=id_m.addr_only,
991 sw_if_index=id_m.sw_if_index,
993 protocol=id_m.protocol,
996 adresses = self.vapi.nat44_address_dump()
997 for addr in adresses:
998 self.vapi.nat44_add_del_address_range(addr.ip_address,
1000 twice_nat=addr.twice_nat,
1003 self.vapi.nat_set_reass()
1004 self.vapi.nat_set_reass(is_ip6=1)
1006 def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
1007 local_port=0, external_port=0, vrf_id=0,
1008 is_add=1, external_sw_if_index=0xFFFFFFFF,
1009 proto=0, twice_nat=0, self_twice_nat=0,
1010 out2in_only=0, tag=""):
1012 Add/delete NAT44 static mapping
1014 :param local_ip: Local IP address
1015 :param external_ip: External IP address
1016 :param local_port: Local port number (Optional)
1017 :param external_port: External port number (Optional)
1018 :param vrf_id: VRF ID (Default 0)
1019 :param is_add: 1 if add, 0 if delete (Default add)
1020 :param external_sw_if_index: External interface instead of IP address
1021 :param proto: IP protocol (Mandatory if port specified)
1022 :param twice_nat: 1 if translate external host address and port
1023 :param self_twice_nat: 1 if translate external host address and port
1024 whenever external host address equals
1025 local address of internal host
1026 :param out2in_only: if 1 rule is matching only out2in direction
1027 :param tag: Opaque string tag
1030 if local_port and external_port:
1032 l_ip = socket.inet_pton(socket.AF_INET, local_ip)
1033 e_ip = socket.inet_pton(socket.AF_INET, external_ip)
1034 self.vapi.nat44_add_del_static_mapping(
1037 external_sw_if_index,
1049 def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
1051 Add/delete NAT44 address
1053 :param ip: IP address
1054 :param is_add: 1 if add, 0 if delete (Default add)
1055 :param twice_nat: twice NAT address for external hosts
1057 nat_addr = socket.inet_pton(socket.AF_INET, ip)
1058 self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
1060 twice_nat=twice_nat)
1062 def test_dynamic(self):
1063 """ NAT44 dynamic translation test """
1065 self.nat44_add_address(self.nat_addr)
1066 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1067 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1071 pkts = self.create_stream_in(self.pg0, self.pg1)
1072 self.pg0.add_stream(pkts)
1073 self.pg_enable_capture(self.pg_interfaces)
1075 capture = self.pg1.get_capture(len(pkts))
1076 self.verify_capture_out(capture)
1079 pkts = self.create_stream_out(self.pg1)
1080 self.pg1.add_stream(pkts)
1081 self.pg_enable_capture(self.pg_interfaces)
1083 capture = self.pg0.get_capture(len(pkts))
1084 self.verify_capture_in(capture, self.pg0)
1086 def test_dynamic_icmp_errors_in2out_ttl_1(self):
1087 """ NAT44 handling of client packets with TTL=1 """
1089 self.nat44_add_address(self.nat_addr)
1090 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1091 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1094 # Client side - generate traffic
1095 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1096 self.pg0.add_stream(pkts)
1097 self.pg_enable_capture(self.pg_interfaces)
1100 # Client side - verify ICMP type 11 packets
1101 capture = self.pg0.get_capture(len(pkts))
1102 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1104 def test_dynamic_icmp_errors_out2in_ttl_1(self):
1105 """ NAT44 handling of server packets with TTL=1 """
1107 self.nat44_add_address(self.nat_addr)
1108 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1109 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1112 # Client side - create sessions
1113 pkts = self.create_stream_in(self.pg0, self.pg1)
1114 self.pg0.add_stream(pkts)
1115 self.pg_enable_capture(self.pg_interfaces)
1118 # Server side - generate traffic
1119 capture = self.pg1.get_capture(len(pkts))
1120 self.verify_capture_out(capture)
1121 pkts = self.create_stream_out(self.pg1, ttl=1)
1122 self.pg1.add_stream(pkts)
1123 self.pg_enable_capture(self.pg_interfaces)
1126 # Server side - verify ICMP type 11 packets
1127 capture = self.pg1.get_capture(len(pkts))
1128 self.verify_capture_out_with_icmp_errors(capture,
1129 src_ip=self.pg1.local_ip4)
1131 def test_dynamic_icmp_errors_in2out_ttl_2(self):
1132 """ NAT44 handling of error responses to client packets with TTL=2 """
1134 self.nat44_add_address(self.nat_addr)
1135 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1136 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1139 # Client side - generate traffic
1140 pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1141 self.pg0.add_stream(pkts)
1142 self.pg_enable_capture(self.pg_interfaces)
1145 # Server side - simulate ICMP type 11 response
1146 capture = self.pg1.get_capture(len(pkts))
1147 pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1148 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1149 ICMP(type=11) / packet[IP] for packet in capture]
1150 self.pg1.add_stream(pkts)
1151 self.pg_enable_capture(self.pg_interfaces)
1154 # Client side - verify ICMP type 11 packets
1155 capture = self.pg0.get_capture(len(pkts))
1156 self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1158 def test_dynamic_icmp_errors_out2in_ttl_2(self):
1159 """ NAT44 handling of error responses to server packets with TTL=2 """
1161 self.nat44_add_address(self.nat_addr)
1162 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1163 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1166 # Client side - create sessions
1167 pkts = self.create_stream_in(self.pg0, self.pg1)
1168 self.pg0.add_stream(pkts)
1169 self.pg_enable_capture(self.pg_interfaces)
1172 # Server side - generate traffic
1173 capture = self.pg1.get_capture(len(pkts))
1174 self.verify_capture_out(capture)
1175 pkts = self.create_stream_out(self.pg1, ttl=2)
1176 self.pg1.add_stream(pkts)
1177 self.pg_enable_capture(self.pg_interfaces)
1180 # Client side - simulate ICMP type 11 response
1181 capture = self.pg0.get_capture(len(pkts))
1182 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1183 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1184 ICMP(type=11) / packet[IP] for packet in capture]
1185 self.pg0.add_stream(pkts)
1186 self.pg_enable_capture(self.pg_interfaces)
1189 # Server side - verify ICMP type 11 packets
1190 capture = self.pg1.get_capture(len(pkts))
1191 self.verify_capture_out_with_icmp_errors(capture)
1193 def test_ping_out_interface_from_outside(self):
1194 """ Ping NAT44 out interface from outside network """
# Sends an ICMP echo-request from pg1's remote host to pg1.local_ip4 and
# expects exactly one packet back on pg1 whose source/destination are
# swapped and whose ICMP type is 0.
# NOTE(review): the lines that bind `pkts` and `packet` are not visible in
# this listing, and the nat44_interface_add_del_feature() call for pg1 is
# never closed here - check the complete file.
1196 self.nat44_add_address(self.nat_addr)
1197 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1198 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1201 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1202 IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1203 ICMP(id=self.icmp_id_out, type='echo-request'))
1205 self.pg1.add_stream(pkts)
1206 self.pg_enable_capture(self.pg_interfaces)
1208 capture = self.pg1.get_capture(len(pkts))
1209 self.assertEqual(1, len(capture))
1212 self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1213 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
# NOTE(review): the request above is built with self.icmp_id_out while the
# reply is compared with self.icmp_id_in - confirm both attributes are meant
# to hold the same value in this test.
1214 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1215 self.assertEqual(packet[ICMP].type, 0) # echo reply
1217 self.logger.error(ppp("Unexpected or invalid packet "
1218 "(outside network):", packet))
1221 def test_ping_internal_host_from_outside(self):
1222 """ Ping internal host from outside network """
# A static mapping between pg0.remote_ip4 and self.nat_addr is added first.
# The test then sends one echo-request from pg1 to self.nat_addr and checks
# the packet captured on pg0, followed by one echo-reply from pg0 to
# pg1.remote_ip4 that is checked on pg1. Both captures must be ICMP.
# NOTE(review): the nat44_interface_add_del_feature() call for pg1 is never
# closed in this listing - check the complete file.
1224 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1225 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1226 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Echo-request: outside host -> NAT address, expected on the inside (pg0).
1230 pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1231 IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1232 ICMP(id=self.icmp_id_out, type='echo-request'))
1233 self.pg1.add_stream(pkt)
1234 self.pg_enable_capture(self.pg_interfaces)
1236 capture = self.pg0.get_capture(1)
1237 self.verify_capture_in(capture, self.pg0, packet_num=1)
1238 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
# Echo-reply: inside host -> outside host, expected on the outside (pg1).
1241 pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1242 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1243 ICMP(id=self.icmp_id_in, type='echo-reply'))
1244 self.pg0.add_stream(pkt)
1245 self.pg_enable_capture(self.pg_interfaces)
1247 capture = self.pg1.get_capture(1)
1248 self.verify_capture_out(capture, same_port=True, packet_num=1)
1249 self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1251 def test_forwarding(self):
1252 """ NAT44 forwarding test """
# Outline, based on the visible statements:
#   - nat44_forwarding_enable_disable(1) is called and a static mapping
#     real_ip (pg0.remote_ip4n) <-> alias_ip (self.nat_addr_n) is added;
#   - traffic is exchanged in both directions for the mapped host;
#   - pg0.remote_hosts[0] is temporarily replaced by remote_hosts[1] so the
#     same stream helpers generate traffic for a host without a mapping;
#   - the session list of remote_hosts[1] is expected to hold 3 entries, and
#     2 after one nat44_del_session() call;
#   - finally nat44_forwarding_enable_disable(0) is called and
#     nat44_add_del_static_mapping() is invoked again for the same pair.
# NOTE(review): several statements are cut short in this listing (calls that
# are opened but not closed) - check the complete file.
1254 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1255 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1257 self.vapi.nat44_forwarding_enable_disable(1)
1259 real_ip = self.pg0.remote_ip4n
1260 alias_ip = self.nat_addr_n
1261 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1262 external_ip=alias_ip)
1265 # in2out - static mapping match
1267 pkts = self.create_stream_out(self.pg1)
1268 self.pg1.add_stream(pkts)
1269 self.pg_enable_capture(self.pg_interfaces)
1271 capture = self.pg0.get_capture(len(pkts))
1272 self.verify_capture_in(capture, self.pg0)
1274 pkts = self.create_stream_in(self.pg0, self.pg1)
1275 self.pg0.add_stream(pkts)
1276 self.pg_enable_capture(self.pg_interfaces)
1278 capture = self.pg1.get_capture(len(pkts))
1279 self.verify_capture_out(capture, same_port=True)
1281 # in2out - no static mapping match
# Keep the original first host so it can be put back further down.
1283 host0 = self.pg0.remote_hosts[0]
1284 self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1286 pkts = self.create_stream_out(self.pg1,
1287 dst_ip=self.pg0.remote_ip4,
1288 use_inside_ports=True)
1289 self.pg1.add_stream(pkts)
1290 self.pg_enable_capture(self.pg_interfaces)
1292 capture = self.pg0.get_capture(len(pkts))
1293 self.verify_capture_in(capture, self.pg0)
1295 pkts = self.create_stream_in(self.pg0, self.pg1)
1296 self.pg0.add_stream(pkts)
1297 self.pg_enable_capture(self.pg_interfaces)
1299 capture = self.pg1.get_capture(len(pkts))
1300 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1303 self.pg0.remote_hosts[0] = host0
# Session bookkeeping for the host that had no static mapping.
1305 user = self.pg0.remote_hosts[1]
1306 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
1307 self.assertEqual(len(sessions), 3)
1308 self.assertTrue(sessions[0].ext_host_valid)
1309 self.vapi.nat44_del_session(
1310 sessions[0].inside_ip_address,
1311 sessions[0].inside_port,
1312 sessions[0].protocol,
1313 ext_host_address=sessions[0].ext_host_address,
1314 ext_host_port=sessions[0].ext_host_port)
1315 sessions = self.vapi.nat44_user_session_dump(user.ip4n, 0)
1316 self.assertEqual(len(sessions), 2)
1319 self.vapi.nat44_forwarding_enable_disable(0)
1320 self.vapi.nat44_add_del_static_mapping(local_ip=real_ip,
1321 external_ip=alias_ip,
1324 def test_static_in(self):
1325 """ 1:1 NAT initialized from inside network """
# Maps pg0.remote_ip4 to the fixed address "10.0.0.10" and pins the expected
# outside TCP/UDP ports and ICMP id to 6303/6304/6305. The static mapping
# dump must then contain a single entry with an empty tag and zero
# protocol/ports. Traffic is sent inside->outside first, then
# outside->inside.
# NOTE(review): the nat44_interface_add_del_feature() call for pg1 is never
# closed in this listing - check the complete file.
1327 nat_ip = "10.0.0.10"
1328 self.tcp_port_out = 6303
1329 self.udp_port_out = 6304
1330 self.icmp_id_out = 6305
1332 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1333 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1334 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1336 sm = self.vapi.nat44_static_mapping_dump()
1337 self.assertEqual(len(sm), 1)
# Only the part of the tag before the first NUL character is compared.
1338 self.assertEqual((sm[0].tag).split('\0', 1)[0], '')
1339 self.assertEqual(sm[0].protocol, 0)
1340 self.assertEqual(sm[0].local_port, 0)
1341 self.assertEqual(sm[0].external_port, 0)
# Inside -> outside.
1344 pkts = self.create_stream_in(self.pg0, self.pg1)
1345 self.pg0.add_stream(pkts)
1346 self.pg_enable_capture(self.pg_interfaces)
1348 capture = self.pg1.get_capture(len(pkts))
1349 self.verify_capture_out(capture, nat_ip, True)
# Outside -> inside.
1352 pkts = self.create_stream_out(self.pg1, nat_ip)
1353 self.pg1.add_stream(pkts)
1354 self.pg_enable_capture(self.pg_interfaces)
1356 capture = self.pg0.get_capture(len(pkts))
1357 self.verify_capture_in(capture, self.pg0)
1359 def test_static_out(self):
1360 """ 1:1 NAT initialized from outside network """
# Same static 1:1 setup as an inside-initiated test would use, but with
# nat_ip "10.0.0.20" and a tag passed to nat44_add_static_mapping(); the
# dump must return that tag. Traffic is sent outside->inside first, then
# inside->outside.
# NOTE(review): the assignment of `tag` is not visible in this listing and
# the nat44_interface_add_del_feature() call for pg1 is never closed here -
# check the complete file.
1362 nat_ip = "10.0.0.20"
1363 self.tcp_port_out = 6303
1364 self.udp_port_out = 6304
1365 self.icmp_id_out = 6305
1368 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1369 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1370 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1372 sm = self.vapi.nat44_static_mapping_dump()
1373 self.assertEqual(len(sm), 1)
# Only the part of the dumped tag before the first NUL character is compared.
1374 self.assertEqual((sm[0].tag).split('\0', 1)[0], tag)
# Outside -> inside.
1377 pkts = self.create_stream_out(self.pg1, nat_ip)
1378 self.pg1.add_stream(pkts)
1379 self.pg_enable_capture(self.pg_interfaces)
1381 capture = self.pg0.get_capture(len(pkts))
1382 self.verify_capture_in(capture, self.pg0)
# Inside -> outside.
1385 pkts = self.create_stream_in(self.pg0, self.pg1)
1386 self.pg0.add_stream(pkts)
1387 self.pg_enable_capture(self.pg_interfaces)
1389 capture = self.pg1.get_capture(len(pkts))
1390 self.verify_capture_out(capture, nat_ip, True)
1392 def test_static_with_port_in(self):
1393 """ 1:1 NAPT initialized from inside network """
# Adds three per-protocol static mappings (TCP, UDP, ICMP) from
# pg0.remote_ip4 and the inside port/id to self.nat_addr and the outside
# port/id 3606/3607/3608. Traffic is sent inside->outside first, then
# outside->inside.
# NOTE(review): the nat44_interface_add_del_feature() call for pg1 is never
# closed in this listing - check the complete file.
1395 self.tcp_port_out = 3606
1396 self.udp_port_out = 3607
1397 self.icmp_id_out = 3608
1399 self.nat44_add_address(self.nat_addr)
1400 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1401 self.tcp_port_in, self.tcp_port_out,
1402 proto=IP_PROTOS.tcp)
1403 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1404 self.udp_port_in, self.udp_port_out,
1405 proto=IP_PROTOS.udp)
1406 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1407 self.icmp_id_in, self.icmp_id_out,
1408 proto=IP_PROTOS.icmp)
1409 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1410 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Inside -> outside.
1414 pkts = self.create_stream_in(self.pg0, self.pg1)
1415 self.pg0.add_stream(pkts)
1416 self.pg_enable_capture(self.pg_interfaces)
1418 capture = self.pg1.get_capture(len(pkts))
1419 self.verify_capture_out(capture)
# Outside -> inside.
1422 pkts = self.create_stream_out(self.pg1)
1423 self.pg1.add_stream(pkts)
1424 self.pg_enable_capture(self.pg_interfaces)
1426 capture = self.pg0.get_capture(len(pkts))
1427 self.verify_capture_in(capture, self.pg0)
1429 def test_static_with_port_out(self):
1430 """ 1:1 NAPT initialized from outside network """
# Adds three per-protocol static mappings (TCP, UDP, ICMP) from
# pg0.remote_ip4 and the inside port/id to self.nat_addr and the outside
# port/id 30606/30607/30608. Traffic is sent outside->inside first, then
# inside->outside.
# NOTE(review): the nat44_interface_add_del_feature() call for pg1 is never
# closed in this listing - check the complete file.
1432 self.tcp_port_out = 30606
1433 self.udp_port_out = 30607
1434 self.icmp_id_out = 30608
1436 self.nat44_add_address(self.nat_addr)
1437 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1438 self.tcp_port_in, self.tcp_port_out,
1439 proto=IP_PROTOS.tcp)
1440 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1441 self.udp_port_in, self.udp_port_out,
1442 proto=IP_PROTOS.udp)
1443 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1444 self.icmp_id_in, self.icmp_id_out,
1445 proto=IP_PROTOS.icmp)
1446 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1447 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Outside -> inside.
1451 pkts = self.create_stream_out(self.pg1)
1452 self.pg1.add_stream(pkts)
1453 self.pg_enable_capture(self.pg_interfaces)
1455 capture = self.pg0.get_capture(len(pkts))
1456 self.verify_capture_in(capture, self.pg0)
# Inside -> outside.
1459 pkts = self.create_stream_in(self.pg0, self.pg1)
1460 self.pg0.add_stream(pkts)
1461 self.pg_enable_capture(self.pg_interfaces)
1463 capture = self.pg1.get_capture(len(pkts))
1464 self.verify_capture_out(capture)
1466 def test_static_with_port_out2(self):
1467 """ 1:1 NAPT symmetrical rule """
# Calls nat44_forwarding_enable_disable(1) and adds one TCP static mapping
# with out2in_only=1, then exercises six exchanges:
#   - client -> service through self.nat_addr:external_port;
#   - an ICMP type 11 message from the inside host embedding that packet;
#   - service -> client;
#   - an ICMP type 11 message from the outside host embedding that packet;
#   - client -> inside host addressed directly (expected untranslated);
#   - inside host -> client for that direct flow (expected untranslated).
# NOTE(review): the assignments of `local_port`, `external_port`, `ip`,
# `tcp` and `inner` are not visible in this listing, and the logger.error()
# lines appear without the surrounding error handling - check the complete
# file.
1472 self.vapi.nat44_forwarding_enable_disable(1)
1473 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1474 local_port, external_port,
1475 proto=IP_PROTOS.tcp, out2in_only=1)
1476 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1477 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1480 # from client to service
1481 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1482 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1483 TCP(sport=12345, dport=external_port))
1484 self.pg1.add_stream(p)
1485 self.pg_enable_capture(self.pg_interfaces)
1487 capture = self.pg0.get_capture(1)
1492 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1493 self.assertEqual(tcp.dport, local_port)
1494 self.assert_packet_checksums_valid(p)
1496 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 from the inside host, carrying the packet captured above.
1500 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1501 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1502 ICMP(type=11) / capture[0][IP])
1503 self.pg0.add_stream(p)
1504 self.pg_enable_capture(self.pg_interfaces)
1506 capture = self.pg1.get_capture(1)
1509 self.assertEqual(p[IP].src, self.nat_addr)
1511 self.assertEqual(inner.dst, self.nat_addr)
1512 self.assertEqual(inner[TCPerror].dport, external_port)
1514 self.logger.error(ppp("Unexpected or invalid packet:", p))
1517 # from service back to client
1518 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1519 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1520 TCP(sport=local_port, dport=12345))
1521 self.pg0.add_stream(p)
1522 self.pg_enable_capture(self.pg_interfaces)
1524 capture = self.pg1.get_capture(1)
1529 self.assertEqual(ip.src, self.nat_addr)
1530 self.assertEqual(tcp.sport, external_port)
1531 self.assert_packet_checksums_valid(p)
1533 self.logger.error(ppp("Unexpected or invalid packet:", p))
# ICMP type 11 from the outside host, carrying the packet captured above.
1537 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1538 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1539 ICMP(type=11) / capture[0][IP])
1540 self.pg1.add_stream(p)
1541 self.pg_enable_capture(self.pg_interfaces)
1543 capture = self.pg0.get_capture(1)
1546 self.assertEqual(p[IP].dst, self.pg0.remote_ip4)
1548 self.assertEqual(inner.src, self.pg0.remote_ip4)
1549 self.assertEqual(inner[TCPerror].sport, local_port)
1551 self.logger.error(ppp("Unexpected or invalid packet:", p))
1554 # from client to server (no translation)
1555 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1556 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1557 TCP(sport=12346, dport=local_port))
1558 self.pg1.add_stream(p)
1559 self.pg_enable_capture(self.pg_interfaces)
1561 capture = self.pg0.get_capture(1)
1566 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1567 self.assertEqual(tcp.dport, local_port)
1568 self.assert_packet_checksums_valid(p)
1570 self.logger.error(ppp("Unexpected or invalid packet:", p))
1573 # from service back to client (no translation)
1574 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1575 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1576 TCP(sport=local_port, dport=12346))
1577 self.pg0.add_stream(p)
1578 self.pg_enable_capture(self.pg_interfaces)
1580 capture = self.pg1.get_capture(1)
1585 self.assertEqual(ip.src, self.pg0.remote_ip4)
1586 self.assertEqual(tcp.sport, local_port)
1587 self.assert_packet_checksums_valid(p)
1589 self.logger.error(ppp("Unexpected or invalid packet:", p))
1592 def test_static_vrf_aware(self):
1593 """ 1:1 NAT VRF awareness """
# Two static mappings are added: pg4.remote_ip4 -> "10.0.0.30" and
# pg0.remote_ip4 -> "10.0.0.40". Traffic from pg4 towards pg3 is expected to
# leave translated to nat_ip1, while traffic from pg0 towards pg3 is
# expected not to show up on pg3 at all.
# NOTE(review): the trailing arguments of both nat44_add_static_mapping()
# calls and of the pg3 nat44_interface_add_del_feature() call are not
# visible in this listing - check the complete file.
1595 nat_ip1 = "10.0.0.30"
1596 nat_ip2 = "10.0.0.40"
1597 self.tcp_port_out = 6303
1598 self.udp_port_out = 6304
1599 self.icmp_id_out = 6305
1601 self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1603 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1605 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1607 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1608 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1610 # inside interface VRF match NAT44 static mapping VRF
1611 pkts = self.create_stream_in(self.pg4, self.pg3)
1612 self.pg4.add_stream(pkts)
1613 self.pg_enable_capture(self.pg_interfaces)
1615 capture = self.pg3.get_capture(len(pkts))
1616 self.verify_capture_out(capture, nat_ip1, True)
1618 # inside interface VRF don't match NAT44 static mapping VRF (packets
1620 pkts = self.create_stream_in(self.pg0, self.pg3)
1621 self.pg0.add_stream(pkts)
1622 self.pg_enable_capture(self.pg_interfaces)
1624 self.pg3.assert_nothing_captured()
1626 def test_dynamic_to_static(self):
1627 """ Switch from dynamic translation to 1:1NAT """
# First sends inside traffic with only the NAT address configured. A static
# mapping pg0.remote_ip4 -> "10.0.0.10" is then added; afterwards the user's
# session dump is expected to be empty and new inside traffic is expected
# to leave with nat_ip as its translated address.
# NOTE(review): the nat44_interface_add_del_feature() call for pg1 is never
# closed in this listing - check the complete file.
1628 nat_ip = "10.0.0.10"
1629 self.tcp_port_out = 6303
1630 self.udp_port_out = 6304
1631 self.icmp_id_out = 6305
1633 self.nat44_add_address(self.nat_addr)
1634 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1635 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Traffic before the static mapping exists.
1639 pkts = self.create_stream_in(self.pg0, self.pg1)
1640 self.pg0.add_stream(pkts)
1641 self.pg_enable_capture(self.pg_interfaces)
1643 capture = self.pg1.get_capture(len(pkts))
1644 self.verify_capture_out(capture)
# Traffic after the static mapping has been added.
1647 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1648 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
1649 self.assertEqual(len(sessions), 0)
1650 pkts = self.create_stream_in(self.pg0, self.pg1)
1651 self.pg0.add_stream(pkts)
1652 self.pg_enable_capture(self.pg_interfaces)
1654 capture = self.pg1.get_capture(len(pkts))
1655 self.verify_capture_out(capture, nat_ip, True)
1657 def test_identity_nat(self):
1658 """ Identity NAT """
# Adds an identity mapping for pg0.remote_ip4n, then sends one TCP packet
# from pg1's remote host straight to pg0.remote_ip4. The packet captured on
# pg0 must keep the same addresses and ports (12345 -> 56789).
# NOTE(review): the assignments of `ip` and `tcp` are not visible in this
# listing and the nat44_interface_add_del_feature() call for pg1 is never
# closed here - check the complete file.
1660 self.vapi.nat44_add_del_identity_mapping(ip=self.pg0.remote_ip4n)
1661 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1662 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1665 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1666 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1667 TCP(sport=12345, dport=56789))
1668 self.pg1.add_stream(p)
1669 self.pg_enable_capture(self.pg_interfaces)
1671 capture = self.pg0.get_capture(1)
1676 self.assertEqual(ip.dst, self.pg0.remote_ip4)
1677 self.assertEqual(ip.src, self.pg1.remote_ip4)
1678 self.assertEqual(tcp.dport, 56789)
1679 self.assertEqual(tcp.sport, 12345)
1680 self.assert_packet_checksums_valid(p)
1682 self.logger.error(ppp("Unexpected or invalid packet:", p))
1685 def test_static_lb(self):
1686 """ NAT44 local service load balancing """
# Registers a load-balancing static mapping for self.nat_addr with two local
# servers (pg0.remote_hosts[0] and [1]). One TCP packet from the client must
# arrive at either server; the reply from `server` must come back with
# self.nat_addr/external_port as its source. The server's single session is
# then deleted and the dump is expected to be empty.
# NOTE(review): the assignments of `external_port`, `local_port`, `server`,
# `ip` and `tcp`, most of the `locals` entries and the remaining arguments
# of nat44_add_del_lb_static_mapping() are not visible in this listing -
# check the complete file.
1687 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1690 server1 = self.pg0.remote_hosts[0]
1691 server2 = self.pg0.remote_hosts[1]
1693 locals = [{'addr': server1.ip4n,
1696 {'addr': server2.ip4n,
1700 self.nat44_add_address(self.nat_addr)
1701 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1704 local_num=len(locals),
1706 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1707 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1710 # from client to service
1711 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1712 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1713 TCP(sport=12345, dport=external_port))
1714 self.pg1.add_stream(p)
1715 self.pg_enable_capture(self.pg_interfaces)
1717 capture = self.pg0.get_capture(1)
1723 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1724 if ip.dst == server1.ip4:
1728 self.assertEqual(tcp.dport, local_port)
1729 self.assert_packet_checksums_valid(p)
1731 self.logger.error(ppp("Unexpected or invalid packet:", p))
1734 # from service back to client
1735 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1736 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1737 TCP(sport=local_port, dport=12345))
1738 self.pg0.add_stream(p)
1739 self.pg_enable_capture(self.pg_interfaces)
1741 capture = self.pg1.get_capture(1)
1746 self.assertEqual(ip.src, self.nat_addr)
1747 self.assertEqual(tcp.sport, external_port)
1748 self.assert_packet_checksums_valid(p)
1750 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Session bookkeeping for the server that handled the flow.
1753 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
1754 self.assertEqual(len(sessions), 1)
1755 self.assertTrue(sessions[0].ext_host_valid)
1756 self.vapi.nat44_del_session(
1757 sessions[0].inside_ip_address,
1758 sessions[0].inside_port,
1759 sessions[0].protocol,
1760 ext_host_address=sessions[0].ext_host_address,
1761 ext_host_port=sessions[0].ext_host_port)
1762 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
1763 self.assertEqual(len(sessions), 0)
# Runs only when running_extended_tests() is true (see the decorator below).
1765 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
1766 def test_static_lb_multi_clients(self):
1767 """ NAT44 local service load balancing - multiple clients"""
# Registers a load-balancing static mapping with two local servers, sends
# one TCP packet per client address produced by
# ip4_range(self.pg1.remote_ip4, 10, 50), and finally asserts that
# server1_n is greater than server2_n.
# NOTE(review): the `locals` entries, the assignments of `external_port`,
# `pkts`, `server1_n` and `server2_n`, and the loop over the capture are not
# visible in this listing - check the complete file.
1769 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1772 server1 = self.pg0.remote_hosts[0]
1773 server2 = self.pg0.remote_hosts[1]
1775 locals = [{'addr': server1.ip4n,
1778 {'addr': server2.ip4n,
1782 self.nat44_add_address(self.nat_addr)
1783 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1786 local_num=len(locals),
1788 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1789 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1794 clients = ip4_range(self.pg1.remote_ip4, 10, 50)
1796 for client in clients:
1797 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1798 IP(src=client, dst=self.nat_addr) /
1799 TCP(sport=12345, dport=external_port))
1801 self.pg1.add_stream(pkts)
1802 self.pg_enable_capture(self.pg_interfaces)
1804 capture = self.pg0.get_capture(len(pkts))
1806 if p[IP].dst == server1.ip4:
1810 self.assertTrue(server1_n > server2_n)
1812 def test_static_lb_2(self):
1813 """ NAT44 local service load balancing (asymmetrical rule) """
# Calls nat44_forwarding_enable_disable(1) and registers a load-balancing
# static mapping with two local servers, then checks four exchanges:
#   - client -> service through self.nat_addr:external_port, which must
#     reach one of the two servers;
#   - the chosen server -> client, which must carry self.nat_addr and
#     external_port as its source;
#   - client -> server1 addressed directly (expected untranslated);
#   - server1 -> client for that direct flow (expected untranslated).
# NOTE(review): the `locals` entries, the assignments of `external_port`,
# `local_port`, `server`, `ip` and `tcp`, and the remaining arguments of
# nat44_add_del_lb_static_mapping() are not visible in this listing - check
# the complete file.
1814 external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1817 server1 = self.pg0.remote_hosts[0]
1818 server2 = self.pg0.remote_hosts[1]
1820 locals = [{'addr': server1.ip4n,
1823 {'addr': server2.ip4n,
1827 self.vapi.nat44_forwarding_enable_disable(1)
1828 self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1832 local_num=len(locals),
1834 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1835 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1838 # from client to service
1839 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1840 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1841 TCP(sport=12345, dport=external_port))
1842 self.pg1.add_stream(p)
1843 self.pg_enable_capture(self.pg_interfaces)
1845 capture = self.pg0.get_capture(1)
1851 self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1852 if ip.dst == server1.ip4:
1856 self.assertEqual(tcp.dport, local_port)
1857 self.assert_packet_checksums_valid(p)
1859 self.logger.error(ppp("Unexpected or invalid packet:", p))
1862 # from service back to client
1863 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1864 IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1865 TCP(sport=local_port, dport=12345))
1866 self.pg0.add_stream(p)
1867 self.pg_enable_capture(self.pg_interfaces)
1869 capture = self.pg1.get_capture(1)
1874 self.assertEqual(ip.src, self.nat_addr)
1875 self.assertEqual(tcp.sport, external_port)
1876 self.assert_packet_checksums_valid(p)
1878 self.logger.error(ppp("Unexpected or invalid packet:", p))
1881 # from client to server (no translation)
1882 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1883 IP(src=self.pg1.remote_ip4, dst=server1.ip4) /
1884 TCP(sport=12346, dport=local_port))
1885 self.pg1.add_stream(p)
1886 self.pg_enable_capture(self.pg_interfaces)
1888 capture = self.pg0.get_capture(1)
1894 self.assertEqual(ip.dst, server1.ip4)
1895 self.assertEqual(tcp.dport, local_port)
1896 self.assert_packet_checksums_valid(p)
1898 self.logger.error(ppp("Unexpected or invalid packet:", p))
1901 # from service back to client (no translation)
1902 p = (Ether(src=server1.mac, dst=self.pg0.local_mac) /
1903 IP(src=server1.ip4, dst=self.pg1.remote_ip4) /
1904 TCP(sport=local_port, dport=12346))
1905 self.pg0.add_stream(p)
1906 self.pg_enable_capture(self.pg_interfaces)
1908 capture = self.pg1.get_capture(1)
1913 self.assertEqual(ip.src, server1.ip4)
1914 self.assertEqual(tcp.sport, local_port)
1915 self.assert_packet_checksums_valid(p)
1917 self.logger.error(ppp("Unexpected or invalid packet:", p))
1920 def test_multiple_inside_interfaces(self):
1921 """ NAT44 multiple non-overlapping address space inside interfaces """
# nat44_interface_add_del_feature() is called for pg0, pg1 and pg3; pg2 gets
# no such call. Six traffic rounds follow: pg0 -> pg1 and pg0 -> pg2 are
# checked with verify_capture_no_translation(), then in2out/out2in pairs
# are checked between pg0 and pg3 and between pg1 and pg3.
# NOTE(review): the nat44_interface_add_del_feature() call for pg3 is never
# closed in this listing - check the complete file.
1923 self.nat44_add_address(self.nat_addr)
1924 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1925 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1926 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1929 # between two NAT44 inside interfaces (no translation)
1930 pkts = self.create_stream_in(self.pg0, self.pg1)
1931 self.pg0.add_stream(pkts)
1932 self.pg_enable_capture(self.pg_interfaces)
1934 capture = self.pg1.get_capture(len(pkts))
1935 self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1937 # from NAT44 inside to interface without NAT44 feature (no translation)
1938 pkts = self.create_stream_in(self.pg0, self.pg2)
1939 self.pg0.add_stream(pkts)
1940 self.pg_enable_capture(self.pg_interfaces)
1942 capture = self.pg2.get_capture(len(pkts))
1943 self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1945 # in2out 1st interface
1946 pkts = self.create_stream_in(self.pg0, self.pg3)
1947 self.pg0.add_stream(pkts)
1948 self.pg_enable_capture(self.pg_interfaces)
1950 capture = self.pg3.get_capture(len(pkts))
1951 self.verify_capture_out(capture)
1953 # out2in 1st interface
1954 pkts = self.create_stream_out(self.pg3)
1955 self.pg3.add_stream(pkts)
1956 self.pg_enable_capture(self.pg_interfaces)
1958 capture = self.pg0.get_capture(len(pkts))
1959 self.verify_capture_in(capture, self.pg0)
1961 # in2out 2nd interface
1962 pkts = self.create_stream_in(self.pg1, self.pg3)
1963 self.pg1.add_stream(pkts)
1964 self.pg_enable_capture(self.pg_interfaces)
1966 capture = self.pg3.get_capture(len(pkts))
1967 self.verify_capture_out(capture)
1969 # out2in 2nd interface
1970 pkts = self.create_stream_out(self.pg3)
1971 self.pg3.add_stream(pkts)
1972 self.pg_enable_capture(self.pg_interfaces)
1974 capture = self.pg1.get_capture(len(pkts))
1975 self.verify_capture_in(capture, self.pg1)
1977 def test_inside_overlapping_interfaces(self):
1978 """ NAT44 multiple inside interfaces with overlapping address space """
# nat44_interface_add_del_feature() is called for pg3, pg4, pg5 and pg6, and
# a static mapping pg6.remote_ip4 -> "10.0.0.10" is added. The test then
# covers, in order:
#   - pg4 -> pg5 traffic checked with verify_capture_no_translation();
#   - one TCP packet from pg4 to static_nat_ip, expected on pg6 with
#     self.nat_addr as source and a source port different from 1234;
#   - in2out/out2in pairs between pg4 and pg3 and between pg5 and pg3;
#   - the session dump for pg5.remote_ip4n: 3 non-static sessions in the
#     order TCP, UDP, ICMP;
#   - an in2out/out2in pair between pg6 and pg3 using static_nat_ip;
#   - user and session dump checks for pg4 and pg6.
# NOTE(review): several statements are cut short in this listing (unclosed
# calls, the loop that binds `user`, the assignments of `ip` and `tcp`) -
# check the complete file.
1980 static_nat_ip = "10.0.0.10"
1981 self.nat44_add_address(self.nat_addr)
1982 self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1984 self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1985 self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1986 self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1987 self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1990 # between NAT44 inside interfaces with same VRF (no translation)
1991 pkts = self.create_stream_in(self.pg4, self.pg5)
1992 self.pg4.add_stream(pkts)
1993 self.pg_enable_capture(self.pg_interfaces)
1995 capture = self.pg5.get_capture(len(pkts))
1996 self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1998 # between NAT44 inside interfaces with different VRF (hairpinning)
1999 p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
2000 IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
2001 TCP(sport=1234, dport=5678))
2002 self.pg4.add_stream(p)
2003 self.pg_enable_capture(self.pg_interfaces)
2005 capture = self.pg6.get_capture(1)
2010 self.assertEqual(ip.src, self.nat_addr)
2011 self.assertEqual(ip.dst, self.pg6.remote_ip4)
2012 self.assertNotEqual(tcp.sport, 1234)
2013 self.assertEqual(tcp.dport, 5678)
2015 self.logger.error(ppp("Unexpected or invalid packet:", p))
2018 # in2out 1st interface
2019 pkts = self.create_stream_in(self.pg4, self.pg3)
2020 self.pg4.add_stream(pkts)
2021 self.pg_enable_capture(self.pg_interfaces)
2023 capture = self.pg3.get_capture(len(pkts))
2024 self.verify_capture_out(capture)
2026 # out2in 1st interface
2027 pkts = self.create_stream_out(self.pg3)
2028 self.pg3.add_stream(pkts)
2029 self.pg_enable_capture(self.pg_interfaces)
2031 capture = self.pg4.get_capture(len(pkts))
2032 self.verify_capture_in(capture, self.pg4)
2034 # in2out 2nd interface
2035 pkts = self.create_stream_in(self.pg5, self.pg3)
2036 self.pg5.add_stream(pkts)
2037 self.pg_enable_capture(self.pg_interfaces)
2039 capture = self.pg3.get_capture(len(pkts))
2040 self.verify_capture_out(capture)
2042 # out2in 2nd interface
2043 pkts = self.create_stream_out(self.pg3)
2044 self.pg3.add_stream(pkts)
2045 self.pg_enable_capture(self.pg_interfaces)
2047 capture = self.pg5.get_capture(len(pkts))
2048 self.verify_capture_in(capture, self.pg5)
# Session dump for the pg5 host (second argument 10).
2051 addresses = self.vapi.nat44_address_dump()
2052 self.assertEqual(len(addresses), 1)
2053 sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
2054 self.assertEqual(len(sessions), 3)
2055 for session in sessions:
2056 self.assertFalse(session.is_static)
2057 self.assertEqual(session.inside_ip_address[0:4],
2058 self.pg5.remote_ip4n)
2059 self.assertEqual(session.outside_ip_address,
2060 addresses[0].ip_address)
# The assertions below rely on the dump returning the sessions in the order
# TCP, UDP, ICMP.
2061 self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
2062 self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
2063 self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
2064 self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
2065 self.assertEqual(sessions[1].inside_port, self.udp_port_in)
2066 self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
2067 self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
2068 self.assertEqual(sessions[1].outside_port, self.udp_port_out)
2069 self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
2071 # in2out 3rd interface
2072 pkts = self.create_stream_in(self.pg6, self.pg3)
2073 self.pg6.add_stream(pkts)
2074 self.pg_enable_capture(self.pg_interfaces)
2076 capture = self.pg3.get_capture(len(pkts))
2077 self.verify_capture_out(capture, static_nat_ip, True)
2079 # out2in 3rd interface
2080 pkts = self.create_stream_out(self.pg3, static_nat_ip)
2081 self.pg3.add_stream(pkts)
2082 self.pg_enable_capture(self.pg_interfaces)
2084 capture = self.pg6.get_capture(len(pkts))
2085 self.verify_capture_in(capture, self.pg6)
2087 # general user and session dump verifications
2088 users = self.vapi.nat44_user_dump()
2089 self.assertTrue(len(users) >= 3)
2090 addresses = self.vapi.nat44_address_dump()
2091 self.assertEqual(len(addresses), 1)
2093 sessions = self.vapi.nat44_user_session_dump(user.ip_address,
2095 for session in sessions:
2096 self.assertEqual(user.ip_address, session.inside_ip_address)
2097 self.assertTrue(session.total_bytes > session.total_pkts > 0)
2098 self.assertTrue(session.protocol in
2099 [IP_PROTOS.tcp, IP_PROTOS.udp,
2101 self.assertFalse(session.ext_host_valid)
# Session dump for the pg4 host (second argument 10).
2104 sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
2105 self.assertTrue(len(sessions) >= 4)
2106 for session in sessions:
2107 self.assertFalse(session.is_static)
2108 self.assertEqual(session.inside_ip_address[0:4],
2109 self.pg4.remote_ip4n)
2110 self.assertEqual(session.outside_ip_address,
2111 addresses[0].ip_address)
# Session dump for the pg6 host (second argument 20).
2114 sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
2115 self.assertTrue(len(sessions) >= 3)
2116 for session in sessions:
2117 self.assertTrue(session.is_static)
2118 self.assertEqual(session.inside_ip_address[0:4],
2119 self.pg6.remote_ip4n)
# NOTE(review): comparing two map() results only works where map() returns
# lists (Python 2); under Python 3 both sides are distinct iterator objects
# and this assertion would fail - confirm the target interpreter.
2120 self.assertEqual(map(ord, session.outside_ip_address[0:4]),
2121 map(int, static_nat_ip.split('.')))
2122 self.assertTrue(session.inside_port in
2123 [self.tcp_port_in, self.udp_port_in,
2126 def test_hairpinning(self):
2127 """ NAT44 hairpinning - 1:1 NAPT """
# Host (pg0.remote_hosts[0]) and server (pg0.remote_hosts[1]) both sit
# behind pg0. The server is published on self.nat_addr:8765 through a TCP
# static mapping to its port 5678. The host sends one TCP packet to that
# public address; it must come back out of pg0 towards the server with
# self.nat_addr as source and a source port different from host_in_port.
# That translated port is stored in host_out_port and used as destination
# of the server's reply, which must reach the host with source port
# server_out_port and destination port host_in_port.
# NOTE(review): the assignments of `host_in_port`, `ip` and `tcp` are not
# visible in this listing and the nat44_interface_add_del_feature() call for
# pg1 is never closed here - check the complete file.
2129 host = self.pg0.remote_hosts[0]
2130 server = self.pg0.remote_hosts[1]
2133 server_in_port = 5678
2134 server_out_port = 8765
2136 self.nat44_add_address(self.nat_addr)
2137 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2138 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2140 # add static mapping for server
2141 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2142 server_in_port, server_out_port,
2143 proto=IP_PROTOS.tcp)
2145 # send packet from host to server
2146 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2147 IP(src=host.ip4, dst=self.nat_addr) /
2148 TCP(sport=host_in_port, dport=server_out_port))
2149 self.pg0.add_stream(p)
2150 self.pg_enable_capture(self.pg_interfaces)
2152 capture = self.pg0.get_capture(1)
2157 self.assertEqual(ip.src, self.nat_addr)
2158 self.assertEqual(ip.dst, server.ip4)
2159 self.assertNotEqual(tcp.sport, host_in_port)
2160 self.assertEqual(tcp.dport, server_in_port)
2161 self.assert_packet_checksums_valid(p)
2162 host_out_port = tcp.sport
2164 self.logger.error(ppp("Unexpected or invalid packet:", p))
2167 # send reply from server to host
2168 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2169 IP(src=server.ip4, dst=self.nat_addr) /
2170 TCP(sport=server_in_port, dport=host_out_port))
2171 self.pg0.add_stream(p)
2172 self.pg_enable_capture(self.pg_interfaces)
2174 capture = self.pg0.get_capture(1)
2179 self.assertEqual(ip.src, self.nat_addr)
2180 self.assertEqual(ip.dst, host.ip4)
2181 self.assertEqual(tcp.sport, server_out_port)
2182 self.assertEqual(tcp.dport, host_in_port)
2183 self.assert_packet_checksums_valid(p)
2185 self.logger.error(ppp("Unexpected or invalid packet:", p))
2188 def test_hairpinning2(self):
2189 """ NAT44 hairpinning - 1:1 NAT"""
# Hairpinning check in which every stream is injected into pg0 and the
# result is captured from pg0 again. Two exchanges are covered:
# host <-> server1 (host translated through the NAT pool address) and
# server2 <-> server1 (both translated through their 1:1 static addresses).
# NOTE(review): `pkts` is used below but is never assigned in the visible
# lines; presumably each TCP/UDP/ICMP packet built here is collected into
# it - confirm against the full file.
2191 server1_nat_ip = "10.0.0.10"
2192 server2_nat_ip = "10.0.0.11"
2193 host = self.pg0.remote_hosts[0]
2194 server1 = self.pg0.remote_hosts[1]
2195 server2 = self.pg0.remote_hosts[2]
2196 server_tcp_port = 22
2197 server_udp_port = 20
2199 self.nat44_add_address(self.nat_addr)
2200 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2201 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2204 # add static mapping for servers
2205 self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
2206 self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
# host -> server1: addressed to server1's static NAT address.
2210 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2211 IP(src=host.ip4, dst=server1_nat_ip) /
2212 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2214 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2215 IP(src=host.ip4, dst=server1_nat_ip) /
2216 UDP(sport=self.udp_port_in, dport=server_udp_port))
2218 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2219 IP(src=host.ip4, dst=server1_nat_ip) /
2220 ICMP(id=self.icmp_id_in, type='echo-request'))
2222 self.pg0.add_stream(pkts)
2223 self.pg_enable_capture(self.pg_interfaces)
2225 capture = self.pg0.get_capture(len(pkts))
# Expect source rewritten to the pool address, destination rewritten to
# server1's real address, and a source port / ICMP id different from the
# one sent. The translated values are stored for building the replies.
# NOTE(review): checksum validation is visible only in the TCP branch -
# confirm whether UDP and ICMP are meant to be validated as well.
2226 for packet in capture:
2228 self.assertEqual(packet[IP].src, self.nat_addr)
2229 self.assertEqual(packet[IP].dst, server1.ip4)
2230 if packet.haslayer(TCP):
2231 self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
2232 self.assertEqual(packet[TCP].dport, server_tcp_port)
2233 self.tcp_port_out = packet[TCP].sport
2234 self.assert_packet_checksums_valid(packet)
2235 elif packet.haslayer(UDP):
2236 self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
2237 self.assertEqual(packet[UDP].dport, server_udp_port)
2238 self.udp_port_out = packet[UDP].sport
2240 self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
2241 self.icmp_id_out = packet[ICMP].id
2243 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server1 -> host: replies addressed to the pool address and to the
# translated ports / ICMP id recorded above.
2248 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2249 IP(src=server1.ip4, dst=self.nat_addr) /
2250 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2252 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2253 IP(src=server1.ip4, dst=self.nat_addr) /
2254 UDP(sport=server_udp_port, dport=self.udp_port_out))
2256 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2257 IP(src=server1.ip4, dst=self.nat_addr) /
2258 ICMP(id=self.icmp_id_out, type='echo-reply'))
2260 self.pg0.add_stream(pkts)
2261 self.pg_enable_capture(self.pg_interfaces)
2263 capture = self.pg0.get_capture(len(pkts))
# Expect the reply to appear to come from server1's static NAT address and
# to be delivered to the host's original address, ports and ICMP id.
2264 for packet in capture:
2266 self.assertEqual(packet[IP].src, server1_nat_ip)
2267 self.assertEqual(packet[IP].dst, host.ip4)
2268 if packet.haslayer(TCP):
2269 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2270 self.assertEqual(packet[TCP].sport, server_tcp_port)
2271 self.assert_packet_checksums_valid(packet)
2272 elif packet.haslayer(UDP):
2273 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2274 self.assertEqual(packet[UDP].sport, server_udp_port)
2276 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2278 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2281 # server2 to server1
2283 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2284 IP(src=server2.ip4, dst=server1_nat_ip) /
2285 TCP(sport=self.tcp_port_in, dport=server_tcp_port))
2287 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2288 IP(src=server2.ip4, dst=server1_nat_ip) /
2289 UDP(sport=self.udp_port_in, dport=server_udp_port))
2291 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2292 IP(src=server2.ip4, dst=server1_nat_ip) /
2293 ICMP(id=self.icmp_id_in, type='echo-request'))
2295 self.pg0.add_stream(pkts)
2296 self.pg_enable_capture(self.pg_interfaces)
2298 capture = self.pg0.get_capture(len(pkts))
# Here the source is expected to become server2's static NAT address and,
# unlike the host case above, the source port / ICMP id must be unchanged.
2299 for packet in capture:
2301 self.assertEqual(packet[IP].src, server2_nat_ip)
2302 self.assertEqual(packet[IP].dst, server1.ip4)
2303 if packet.haslayer(TCP):
2304 self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2305 self.assertEqual(packet[TCP].dport, server_tcp_port)
2306 self.tcp_port_out = packet[TCP].sport
2307 self.assert_packet_checksums_valid(packet)
2308 elif packet.haslayer(UDP):
2309 self.assertEqual(packet[UDP].sport, self.udp_port_in)
2310 self.assertEqual(packet[UDP].dport, server_udp_port)
2311 self.udp_port_out = packet[UDP].sport
2313 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2314 self.icmp_id_out = packet[ICMP].id
2316 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2319 # server1 to server2
2321 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2322 IP(src=server1.ip4, dst=server2_nat_ip) /
2323 TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2325 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2326 IP(src=server1.ip4, dst=server2_nat_ip) /
2327 UDP(sport=server_udp_port, dport=self.udp_port_out))
2329 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2330 IP(src=server1.ip4, dst=server2_nat_ip) /
2331 ICMP(id=self.icmp_id_out, type='echo-reply'))
2333 self.pg0.add_stream(pkts)
2334 self.pg_enable_capture(self.pg_interfaces)
2336 capture = self.pg0.get_capture(len(pkts))
# Expect the reply to carry server1's static NAT address as source and to
# reach server2's real address with the original ports / ICMP id.
2337 for packet in capture:
2339 self.assertEqual(packet[IP].src, server1_nat_ip)
2340 self.assertEqual(packet[IP].dst, server2.ip4)
2341 if packet.haslayer(TCP):
2342 self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2343 self.assertEqual(packet[TCP].sport, server_tcp_port)
2344 self.assert_packet_checksums_valid(packet)
2345 elif packet.haslayer(UDP):
2346 self.assertEqual(packet[UDP].dport, self.udp_port_in)
2347 self.assertEqual(packet[UDP].sport, server_udp_port)
2349 self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2351 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2354 def test_max_translations_per_user(self):
2355 """ MAX translations per user - recycle the least recently used """
# Sends more TCP flows from one inside address than the configured
# per-user translation limit and checks that the user's session count
# stays at that limit, then repeats the check after adding a static mapping.
# NOTE(review): `pkts`, `user` and `tcp_port` are used below but are not
# bound in the visible lines - confirm against the full file.
2357 self.nat44_add_address(self.nat_addr)
2358 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2359 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2362 # get maximum number of translations per user
2363 nat44_config = self.vapi.nat_show_config()
2365 # send more than maximum number of translations per user packets
2366 pkts_num = nat44_config.max_translations_per_user + 5
# One packet per distinct TCP source port, starting at 1025.
2368 for port in range(0, pkts_num):
2369 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2370 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2371 TCP(sport=1025 + port))
2373 self.pg0.add_stream(pkts)
2374 self.pg_enable_capture(self.pg_interfaces)
2377 # verify number of translated packet
2378 self.pg1.get_capture(pkts_num)
# All packets must be forwarded, yet the session count reported for
# pg0's remote address must equal the limit and no static session exists.
2380 users = self.vapi.nat44_user_dump()
2382 if user.ip_address == self.pg0.remote_ip4n:
2383 self.assertEqual(user.nsessions,
2384 nat44_config.max_translations_per_user)
2385 self.assertEqual(user.nstaticsessions, 0)
2388 self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
2390 proto=IP_PROTOS.tcp)
2391 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2392 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2393 TCP(sport=tcp_port))
2394 self.pg0.add_stream(p)
2395 self.pg_enable_capture(self.pg_interfaces)
2397 self.pg1.get_capture(1)
# After one packet through the static mapping: one static session, and the
# dynamic session count is expected to be one below the limit.
2398 users = self.vapi.nat44_user_dump()
2400 if user.ip_address == self.pg0.remote_ip4n:
2401 self.assertEqual(user.nsessions,
2402 nat44_config.max_translations_per_user - 1)
2403 self.assertEqual(user.nstaticsessions, 1)
2405 def test_interface_addr(self):
2406 """ Acquire NAT44 addresses from interface """
# Registers pg7 as a source of NAT pool addresses and checks that the pool
# follows pg7's IPv4 configuration: empty before an address is configured,
# one entry equal to pg7's local address afterwards, empty again once the
# address is removed.
2407 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2409 # no address in NAT pool
2410 adresses = self.vapi.nat44_address_dump()
2411 self.assertEqual(0, len(adresses))
2413 # configure interface address and check NAT address pool
2414 self.pg7.config_ip4()
2415 adresses = self.vapi.nat44_address_dump()
2416 self.assertEqual(1, len(adresses))
# Only the first four bytes of the dumped address are compared with pg7's
# packed IPv4 address.
2417 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
2419 # remove interface address and check NAT address pool
2420 self.pg7.unconfig_ip4()
2421 adresses = self.vapi.nat44_address_dump()
2422 self.assertEqual(0, len(adresses))
2424 def test_interface_addr_static_mapping(self):
2425 """ Static mapping with addresses from interface """
# Creates a static mapping whose external side is the pg7 interface rather
# than a fixed address, then checks the static-mapping dump while pg7's
# IPv4 address is configured, removed, configured again, and finally after
# the mapping itself is deleted.
# NOTE(review): `tag` and `resolved` are used below but are not assigned in
# the visible lines - confirm against the full file.
2428 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2429 self.nat44_add_static_mapping(
2431 external_sw_if_index=self.pg7.sw_if_index,
2434 # static mappings with external interface
2435 static_mappings = self.vapi.nat44_static_mapping_dump()
2436 self.assertEqual(1, len(static_mappings))
2437 self.assertEqual(self.pg7.sw_if_index,
2438 static_mappings[0].external_sw_if_index)
# The dumped tag is compared only up to its first NUL character.
2439 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2441 # configure interface address and check static mappings
2442 self.pg7.config_ip4()
2443 static_mappings = self.vapi.nat44_static_mapping_dump()
2444 self.assertEqual(2, len(static_mappings))
# A second entry is expected once pg7 has an address; entries whose
# external_sw_if_index is 0xFFFFFFFF must carry pg7's local address
# (presumably these are the address-resolved copies - confirm).
2446 for sm in static_mappings:
2447 if sm.external_sw_if_index == 0xFFFFFFFF:
2448 self.assertEqual(sm.external_ip_address[0:4],
2449 self.pg7.local_ip4n)
2450 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2452 self.assertTrue(resolved)
2454 # remove interface address and check static mappings
2455 self.pg7.unconfig_ip4()
2456 static_mappings = self.vapi.nat44_static_mapping_dump()
2457 self.assertEqual(1, len(static_mappings))
2458 self.assertEqual(self.pg7.sw_if_index,
2459 static_mappings[0].external_sw_if_index)
2460 self.assertEqual((static_mappings[0].tag).split('\0', 1)[0], tag)
2462 # configure interface address again and check static mappings
2463 self.pg7.config_ip4()
2464 static_mappings = self.vapi.nat44_static_mapping_dump()
2465 self.assertEqual(2, len(static_mappings))
2467 for sm in static_mappings:
2468 if sm.external_sw_if_index == 0xFFFFFFFF:
2469 self.assertEqual(sm.external_ip_address[0:4],
2470 self.pg7.local_ip4n)
2471 self.assertEqual((sm.tag).split('\0', 1)[0], tag)
2473 self.assertTrue(resolved)
2475 # remove static mapping
2476 self.nat44_add_static_mapping(
2478 external_sw_if_index=self.pg7.sw_if_index,
# After removal the dump must be empty, i.e. both entries are gone.
2481 static_mappings = self.vapi.nat44_static_mapping_dump()
2482 self.assertEqual(0, len(static_mappings))
2484 def test_interface_addr_identity_nat(self):
2485 """ Identity NAT with addresses from interface """
# Creates a TCP identity mapping bound to the pg7 interface and checks the
# identity-mapping dump before pg7 has an address, after one is configured,
# and after it is removed again.
# NOTE(review): `port` and `resolved` are used below but are not assigned in
# the visible lines - confirm against the full file.
2488 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
2489 self.vapi.nat44_add_del_identity_mapping(
2490 sw_if_index=self.pg7.sw_if_index,
2492 protocol=IP_PROTOS.tcp,
2495 # identity mappings with external interface
2496 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2497 self.assertEqual(1, len(identity_mappings))
2498 self.assertEqual(self.pg7.sw_if_index,
2499 identity_mappings[0].sw_if_index)
2501 # configure interface address and check identity mappings
2502 self.pg7.config_ip4()
2503 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2505 self.assertEqual(2, len(identity_mappings))
# NOTE(review): the loop selects entries by `sm.sw_if_index`, but the
# assertions read `identity_mappings[0]` instead of `sm`. If the selected
# entry is not first in the dump, the wrong entry is checked - this looks
# like it should use `sm`; confirm.
2506 for sm in identity_mappings:
2507 if sm.sw_if_index == 0xFFFFFFFF:
2508 self.assertEqual(identity_mappings[0].ip_address,
2509 self.pg7.local_ip4n)
2510 self.assertEqual(port, identity_mappings[0].port)
2511 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2513 self.assertTrue(resolved)
2515 # remove interface address and check identity mappings
2516 self.pg7.unconfig_ip4()
2517 identity_mappings = self.vapi.nat44_identity_mapping_dump()
2518 self.assertEqual(1, len(identity_mappings))
2519 self.assertEqual(self.pg7.sw_if_index,
2520 identity_mappings[0].sw_if_index)
2522 def test_ipfix_nat44_sess(self):
2523 """ IPFIX logging NAT44 session created/delted """
# Enables NAT IPFIX logging towards a collector on pg3 using a non-default
# collector port, creates sessions with an inside->outside stream, removes
# the NAT address, flushes IPFIX and verifies the exported packets.
# NOTE(review): "delted" in the docstring above looks like a typo for
# "deleted"; left untouched here because the docstring text may be
# displayed by the test runner.
2524 self.ipfix_domain_id = 10
2525 self.ipfix_src_port = 20202
2526 colector_port = 30303
# Teach scapy to dissect UDP traffic to the collector port as IPFIX.
# NOTE(review): the literal 30303 duplicates `colector_port`; the two must
# be kept in sync by hand.
2527 bind_layers(UDP, IPFIX, dport=30303)
2528 self.nat44_add_address(self.nat_addr)
2529 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2530 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2532 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2533 src_address=self.pg3.local_ip4n,
2535 template_interval=10,
2536 collector_port=colector_port)
2537 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2538 src_port=self.ipfix_src_port)
2540 pkts = self.create_stream_in(self.pg0, self.pg1)
2541 self.pg0.add_stream(pkts)
2542 self.pg_enable_capture(self.pg_interfaces)
2544 capture = self.pg1.get_capture(len(pkts))
2545 self.verify_capture_out(capture)
# Removing the pool address comes right before the flush; presumably this
# is what triggers the session-delete events named in the docstring -
# confirm.
2546 self.nat44_add_address(self.nat_addr, is_add=0)
2547 self.vapi.cli("ipfix flush") # FIXME this should be an API call
# Exactly nine exported packets are expected on the collector interface.
2548 capture = self.pg3.get_capture(9)
2549 ipfix = IPFIXDecoder()
2550 # first load template
# NOTE(review): `p` is presumably the loop variable over `capture`; the
# loop header is not in the visible lines.
2552 self.assertTrue(p.haslayer(IPFIX))
2553 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2554 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2555 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2556 self.assertEqual(p[UDP].dport, colector_port)
2557 self.assertEqual(p[IPFIX].observationDomainID,
2558 self.ipfix_domain_id)
2559 if p.haslayer(Template):
2560 ipfix.add_template(p.getlayer(Template))
2561 # verify events in data set
2563 if p.haslayer(Data):
2564 data = ipfix.decode_data_set(p.getlayer(Set))
2565 self.verify_ipfix_nat44_ses(data)
2567 def test_ipfix_addr_exhausted(self):
2568 """ IPFIX logging NAT addresses exhausted """
# Enables the NAT feature and IPFIX logging without adding any NAT pool
# address in the visible lines, sends one inside packet that must not be
# forwarded, then verifies the IPFIX export on pg3.
2569 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2570 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2572 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2573 src_address=self.pg3.local_ip4n,
2575 template_interval=10)
2576 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2577 src_port=self.ipfix_src_port)
2579 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2580 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2582 self.pg0.add_stream(p)
2583 self.pg_enable_capture(self.pg_interfaces)
# Nothing may come out of the outside interface.
2585 capture = self.pg1.get_capture(0)
2586 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2587 capture = self.pg3.get_capture(9)
2588 ipfix = IPFIXDecoder()
2589 # first load template
# NOTE(review): `p` is presumably rebound as the loop variable over
# `capture`; the loop header is not in the visible lines.
2591 self.assertTrue(p.haslayer(IPFIX))
2592 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2593 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2594 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
# No collector_port is passed to set_ipfix_exporter in the visible lines;
# 4739 is presumably the exporter's default destination port - confirm.
2595 self.assertEqual(p[UDP].dport, 4739)
2596 self.assertEqual(p[IPFIX].observationDomainID,
2597 self.ipfix_domain_id)
2598 if p.haslayer(Template):
2599 ipfix.add_template(p.getlayer(Template))
2600 # verify events in data set
2602 if p.haslayer(Data):
2603 data = ipfix.decode_data_set(p.getlayer(Set))
2604 self.verify_ipfix_addr_exhausted(data)
# Runs only when extended tests are enabled.
2606 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2607 def test_ipfix_max_sessions(self):
2608 """ IPFIX logging maximum session entries exceeded """
# Fills the session table with one flow per distinct source address, then
# enables IPFIX logging and sends one more packet that must not be
# forwarded, and verifies the exported event.
# NOTE(review): `pkts` is used below but is not assigned in the visible
# lines - confirm against the full file.
2609 self.nat44_add_address(self.nat_addr)
2610 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2611 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2614 nat44_config = self.vapi.nat_show_config()
# The session limit used by this test is ten times the reported number of
# translation buckets.
2615 max_sessions = 10 * nat44_config.translation_buckets
2618 for i in range(0, max_sessions):
# Spread the index over the last two octets: 10.10.<high byte>.<low byte>.
2619 src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2620 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2621 IP(src=src, dst=self.pg1.remote_ip4) /
2624 self.pg0.add_stream(pkts)
2625 self.pg_enable_capture(self.pg_interfaces)
2628 self.pg1.get_capture(max_sessions)
# IPFIX is enabled only after the table has been filled.
2629 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
2630 src_address=self.pg3.local_ip4n,
2632 template_interval=10)
2633 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
2634 src_port=self.ipfix_src_port)
2636 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2637 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2639 self.pg0.add_stream(p)
2640 self.pg_enable_capture(self.pg_interfaces)
# The extra packet must not be translated and forwarded.
2642 self.pg1.get_capture(0)
2643 self.vapi.cli("ipfix flush") # FIXME this should be an API call
2644 capture = self.pg3.get_capture(9)
2645 ipfix = IPFIXDecoder()
2646 # first load template
2648 self.assertTrue(p.haslayer(IPFIX))
2649 self.assertEqual(p[IP].src, self.pg3.local_ip4)
2650 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2651 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2652 self.assertEqual(p[UDP].dport, 4739)
2653 self.assertEqual(p[IPFIX].observationDomainID,
2654 self.ipfix_domain_id)
2655 if p.haslayer(Template):
2656 ipfix.add_template(p.getlayer(Template))
2657 # verify events in data set
2659 if p.haslayer(Data):
2660 data = ipfix.decode_data_set(p.getlayer(Set))
2661 self.verify_ipfix_max_sessions(data, max_sessions)
2663 def test_pool_addr_fib(self):
2664 """ NAT44 add pool addresses to FIB """
# Checks ARP behaviour for NAT addresses: who-has requests for the pool
# address and for a static-mapping address sent on pg1 must each produce
# one packet on pg1; a request sent on pg2 must produce nothing on pg1;
# after the address and mapping are removed, requests on pg1 must go
# unanswered.
2665 static_addr = '10.0.0.10'
2666 self.nat44_add_address(self.nat_addr)
2667 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2668 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2670 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
# Broadcast ARP who-has for the pool address.
2673 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2674 ARP(op=ARP.who_has, pdst=self.nat_addr,
2675 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2676 self.pg1.add_stream(p)
2677 self.pg_enable_capture(self.pg_interfaces)
2679 capture = self.pg1.get_capture(1)
2680 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): assertTrue() takes (expr, msg), so ARP.is_at is used only as
# the failure message and the opcode is never compared; this looks like it
# was meant to be assertEqual(op, ARP.is_at) - confirm.
2681 self.assertTrue(capture[0][ARP].op, ARP.is_at)
# Broadcast ARP who-has for the static-mapping address.
2684 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2685 ARP(op=ARP.who_has, pdst=static_addr,
2686 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2687 self.pg1.add_stream(p)
2688 self.pg_enable_capture(self.pg_interfaces)
2690 capture = self.pg1.get_capture(1)
2691 self.assertTrue(capture[0].haslayer(ARP))
# NOTE(review): same assertTrue(expr, msg) pattern as above - the opcode is
# not actually checked.
2692 self.assertTrue(capture[0][ARP].op, ARP.is_at)
2694 # send ARP to non-NAT44 interface
2695 p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2696 ARP(op=ARP.who_has, pdst=self.nat_addr,
2697 psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2698 self.pg2.add_stream(p)
2699 self.pg_enable_capture(self.pg_interfaces)
# NOTE(review): the request is sent on pg2 but the empty capture is asserted
# on pg1; verify that pg1 (not pg2) is the interface intended here.
2701 capture = self.pg1.get_capture(0)
2703 # remove addresses and verify
2704 self.nat44_add_address(self.nat_addr, is_add=0)
2705 self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2708 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2709 ARP(op=ARP.who_has, pdst=self.nat_addr,
2710 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2711 self.pg1.add_stream(p)
2712 self.pg_enable_capture(self.pg_interfaces)
2714 capture = self.pg1.get_capture(0)
2716 p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2717 ARP(op=ARP.who_has, pdst=static_addr,
2718 psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2719 self.pg1.add_stream(p)
2720 self.pg_enable_capture(self.pg_interfaces)
2722 capture = self.pg1.get_capture(0)
2724 def test_vrf_mode(self):
2725 """ NAT44 tenant VRF aware address pool mode """
# Places pg0 and pg1 into two separate IP tables, assigns one NAT pool
# address per table, and checks that traffic from each inside interface
# towards pg2 is translated with the address belonging to its own table.
# NOTE(review): `vrf_id1` and `vrf_id2` are used below but are not assigned
# in the visible lines - confirm against the full file.
2729 nat_ip1 = "10.0.0.10"
2730 nat_ip2 = "10.0.0.11"
# Addresses are removed before the interfaces are moved to the new tables
# and configured again afterwards.
2732 self.pg0.unconfig_ip4()
2733 self.pg1.unconfig_ip4()
2734 self.vapi.ip_table_add_del(vrf_id1, is_add=1)
2735 self.vapi.ip_table_add_del(vrf_id2, is_add=1)
2736 self.pg0.set_table_ip4(vrf_id1)
2737 self.pg1.set_table_ip4(vrf_id2)
2738 self.pg0.config_ip4()
2739 self.pg1.config_ip4()
2741 self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2742 self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2743 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2744 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2745 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2 must be translated with nat_ip1.
2749 pkts = self.create_stream_in(self.pg0, self.pg2)
2750 self.pg0.add_stream(pkts)
2751 self.pg_enable_capture(self.pg_interfaces)
2753 capture = self.pg2.get_capture(len(pkts))
2754 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2 must be translated with nat_ip2.
2757 pkts = self.create_stream_in(self.pg1, self.pg2)
2758 self.pg1.add_stream(pkts)
2759 self.pg_enable_capture(self.pg_interfaces)
2761 capture = self.pg2.get_capture(len(pkts))
2762 self.verify_capture_out(capture, nat_ip2)
# Cleanup: move both interfaces back to table 0 and delete the tables.
# NOTE(review): config_ip4() is not called again in the visible lines after
# this unconfig - confirm whether later tests rely on pg0/pg1 addresses.
2764 self.pg0.unconfig_ip4()
2765 self.pg1.unconfig_ip4()
2766 self.pg0.set_table_ip4(0)
2767 self.pg1.set_table_ip4(0)
2768 self.vapi.ip_table_add_del(vrf_id1, is_add=0)
2769 self.vapi.ip_table_add_del(vrf_id2, is_add=0)
2771 def test_vrf_feature_independent(self):
2772 """ NAT44 tenant VRF independent address pool mode """
# Adds one pool address without a VRF and a second one bound to VRF 99,
# then checks that traffic from both pg0 and pg1 towards pg2 is translated
# with the first address (nat_ip1) in each case.
2774 nat_ip1 = "10.0.0.10"
2775 nat_ip2 = "10.0.0.11"
2777 self.nat44_add_address(nat_ip1)
2778 self.nat44_add_address(nat_ip2, vrf_id=99)
2779 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2780 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
2781 self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
# pg0 -> pg2
2785 pkts = self.create_stream_in(self.pg0, self.pg2)
2786 self.pg0.add_stream(pkts)
2787 self.pg_enable_capture(self.pg_interfaces)
2789 capture = self.pg2.get_capture(len(pkts))
2790 self.verify_capture_out(capture, nat_ip1)
# pg1 -> pg2: nat_ip1 is expected here as well, not nat_ip2.
2793 pkts = self.create_stream_in(self.pg1, self.pg2)
2794 self.pg1.add_stream(pkts)
2795 self.pg_enable_capture(self.pg_interfaces)
2797 capture = self.pg2.get_capture(len(pkts))
2798 self.verify_capture_out(capture, nat_ip1)
2800 def test_dynamic_ipless_interfaces(self):
2801 """ NAT44 interfaces without configured IP address """
# Uses pg7 (inside) and pg8 (outside) without calling config_ip4() on
# them; reachability is provided by explicit neighbor entries and /32
# routes. Verifies dynamic translation in both directions.
2803 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2804 mactobinary(self.pg7.remote_mac),
2805 self.pg7.remote_ip4n,
2807 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2808 mactobinary(self.pg8.remote_mac),
2809 self.pg8.remote_ip4n,
# Host routes to each remote peer through its own interface.
2812 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2813 dst_address_length=32,
2814 next_hop_address=self.pg7.remote_ip4n,
2815 next_hop_sw_if_index=self.pg7.sw_if_index)
2816 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2817 dst_address_length=32,
2818 next_hop_address=self.pg8.remote_ip4n,
2819 next_hop_sw_if_index=self.pg8.sw_if_index)
2821 self.nat44_add_address(self.nat_addr)
2822 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2823 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg7 -> pg8 (inside to outside)
2827 pkts = self.create_stream_in(self.pg7, self.pg8)
2828 self.pg7.add_stream(pkts)
2829 self.pg_enable_capture(self.pg_interfaces)
2831 capture = self.pg8.get_capture(len(pkts))
2832 self.verify_capture_out(capture)
# pg8 -> pg7 (outside to inside), addressed to the NAT pool address
2835 pkts = self.create_stream_out(self.pg8, self.nat_addr)
2836 self.pg8.add_stream(pkts)
2837 self.pg_enable_capture(self.pg_interfaces)
2839 capture = self.pg7.get_capture(len(pkts))
2840 self.verify_capture_in(capture, self.pg7)
2842 def test_static_ipless_interfaces(self):
2843 """ NAT44 interfaces without configured IP address - 1:1 NAT """
# Same address-less pg7/pg8 setup as the dynamic variant (explicit
# neighbors and /32 routes), but with a 1:1 static mapping of pg7's remote
# address to the NAT address instead of a pool address. The outside->inside
# direction is exercised first.
2845 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2846 mactobinary(self.pg7.remote_mac),
2847 self.pg7.remote_ip4n,
2849 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2850 mactobinary(self.pg8.remote_mac),
2851 self.pg8.remote_ip4n,
2854 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2855 dst_address_length=32,
2856 next_hop_address=self.pg7.remote_ip4n,
2857 next_hop_sw_if_index=self.pg7.sw_if_index)
2858 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2859 dst_address_length=32,
2860 next_hop_address=self.pg8.remote_ip4n,
2861 next_hop_sw_if_index=self.pg8.sw_if_index)
2863 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2864 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2865 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 (outside to inside)
2869 pkts = self.create_stream_out(self.pg8)
2870 self.pg8.add_stream(pkts)
2871 self.pg_enable_capture(self.pg_interfaces)
2873 capture = self.pg7.get_capture(len(pkts))
2874 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 (inside to outside)
2877 pkts = self.create_stream_in(self.pg7, self.pg8)
2878 self.pg7.add_stream(pkts)
2879 self.pg_enable_capture(self.pg_interfaces)
2881 capture = self.pg8.get_capture(len(pkts))
# NOTE(review): the third positional argument (True) presumably tells the
# verifier that ports are left unchanged by the 1:1 mapping - confirm
# against verify_capture_out's signature.
2882 self.verify_capture_out(capture, self.nat_addr, True)
2884 def test_static_with_port_ipless_interfaces(self):
2885 """ NAT44 interfaces without configured IP address - 1:1 NAPT """
# Address-less pg7/pg8 setup (explicit neighbors and /32 routes) combined
# with per-protocol static mappings that also translate the port / ICMP id.
# The outside->inside direction is exercised first.
# Fixed external ports / ICMP id used by the static mappings below.
2887 self.tcp_port_out = 30606
2888 self.udp_port_out = 30607
2889 self.icmp_id_out = 30608
2891 self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
2892 mactobinary(self.pg7.remote_mac),
2893 self.pg7.remote_ip4n,
2895 self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
2896 mactobinary(self.pg8.remote_mac),
2897 self.pg8.remote_ip4n,
2900 self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
2901 dst_address_length=32,
2902 next_hop_address=self.pg7.remote_ip4n,
2903 next_hop_sw_if_index=self.pg7.sw_if_index)
2904 self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
2905 dst_address_length=32,
2906 next_hop_address=self.pg8.remote_ip4n,
2907 next_hop_sw_if_index=self.pg8.sw_if_index)
2909 self.nat44_add_address(self.nat_addr)
# One mapping per protocol: (pg7 remote address, inside port/id) <->
# (NAT address, outside port/id).
2910 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2911 self.tcp_port_in, self.tcp_port_out,
2912 proto=IP_PROTOS.tcp)
2913 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2914 self.udp_port_in, self.udp_port_out,
2915 proto=IP_PROTOS.udp)
2916 self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2917 self.icmp_id_in, self.icmp_id_out,
2918 proto=IP_PROTOS.icmp)
2919 self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
2920 self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
# pg8 -> pg7 (outside to inside)
2924 pkts = self.create_stream_out(self.pg8)
2925 self.pg8.add_stream(pkts)
2926 self.pg_enable_capture(self.pg_interfaces)
2928 capture = self.pg7.get_capture(len(pkts))
2929 self.verify_capture_in(capture, self.pg7)
# pg7 -> pg8 (inside to outside)
2932 pkts = self.create_stream_in(self.pg7, self.pg8)
2933 self.pg7.add_stream(pkts)
2934 self.pg_enable_capture(self.pg_interfaces)
2936 capture = self.pg8.get_capture(len(pkts))
2937 self.verify_capture_out(capture)
2939 def test_static_unknown_proto(self):
2940 """ 1:1 NAT translate packet with unknown protocol """
# With a 1:1 static mapping for pg0's remote address, sends a packet in
# each direction and expects a GRE-carrying packet to come out with only
# the mapped outer address rewritten.
# NOTE(review): the layer between the outer and inner IP headers is not in
# the visible lines (the assertions below expect GRE), and `packet` is not
# assigned in the visible lines - confirm against the full file.
2941 nat_ip = "10.0.0.10"
2942 self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2943 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2944 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# inside -> outside
2948 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2949 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2951 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2952 TCP(sport=1234, dport=1234))
2953 self.pg0.add_stream(p)
2954 self.pg_enable_capture(self.pg_interfaces)
2956 p = self.pg1.get_capture(1)
# Outer source must become the static NAT address.
2959 self.assertEqual(packet[IP].src, nat_ip)
2960 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2961 self.assertTrue(packet.haslayer(GRE))
2962 self.assert_packet_checksums_valid(packet)
2964 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# outside -> inside, addressed to the static NAT address
2968 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2969 IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2971 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2972 TCP(sport=1234, dport=1234))
2973 self.pg1.add_stream(p)
2974 self.pg_enable_capture(self.pg_interfaces)
2976 p = self.pg0.get_capture(1)
# Outer destination must be rewritten back to pg0's remote address.
2979 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2980 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2981 self.assertTrue(packet.haslayer(GRE))
2982 self.assert_packet_checksums_valid(packet)
2984 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2987 def test_hairpinning_static_unknown_proto(self):
2988 """ 1:1 NAT translate packet with unknown protocol - hairpinning """
# Host and server both sit behind pg0 and each has a 1:1 static mapping.
# Packets sent to the peer's NAT address are expected back on pg0 with the
# sender's NAT address as source and the peer's real address as
# destination, still carrying a GRE layer.
# NOTE(review): the layer between the outer and inner IP headers and the
# assignment of `packet` are not in the visible lines - confirm against the
# full file.
2990 host = self.pg0.remote_hosts[0]
2991 server = self.pg0.remote_hosts[1]
2993 host_nat_ip = "10.0.0.10"
2994 server_nat_ip = "10.0.0.11"
2996 self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2997 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2998 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2999 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# host -> server
3003 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3004 IP(src=host.ip4, dst=server_nat_ip) /
3006 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3007 TCP(sport=1234, dport=1234))
3008 self.pg0.add_stream(p)
3009 self.pg_enable_capture(self.pg_interfaces)
3011 p = self.pg0.get_capture(1)
3014 self.assertEqual(packet[IP].src, host_nat_ip)
3015 self.assertEqual(packet[IP].dst, server.ip4)
3016 self.assertTrue(packet.haslayer(GRE))
3017 self.assert_packet_checksums_valid(packet)
3019 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host
3023 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3024 IP(src=server.ip4, dst=host_nat_ip) /
3026 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3027 TCP(sport=1234, dport=1234))
3028 self.pg0.add_stream(p)
3029 self.pg_enable_capture(self.pg_interfaces)
3031 p = self.pg0.get_capture(1)
3034 self.assertEqual(packet[IP].src, server_nat_ip)
3035 self.assertEqual(packet[IP].dst, host.ip4)
3036 self.assertTrue(packet.haslayer(GRE))
3037 self.assert_packet_checksums_valid(packet)
3039 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3042 def test_unknown_proto(self):
3043 """ NAT44 translate packet with unknown protocol """
# Dynamic NAT variant of the unknown-protocol test: a TCP packet is sent
# first, then a GRE-carrying packet is sent in each direction and the
# outer addresses are checked against the NAT pool address.
# NOTE(review): the layer between the outer and inner IP headers and the
# assignment of `packet` are not in the visible lines - confirm against the
# full file.
3044 self.nat44_add_address(self.nat_addr)
3045 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3046 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# A plain TCP packet goes first; presumably it establishes the translation
# that the unknown-protocol packet then relies on - confirm.
3050 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3051 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3052 TCP(sport=self.tcp_port_in, dport=20))
3053 self.pg0.add_stream(p)
3054 self.pg_enable_capture(self.pg_interfaces)
3056 p = self.pg1.get_capture(1)
# inside -> outside
3058 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3059 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3061 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3062 TCP(sport=1234, dport=1234))
3063 self.pg0.add_stream(p)
3064 self.pg_enable_capture(self.pg_interfaces)
3066 p = self.pg1.get_capture(1)
# Outer source must become the pool address.
3069 self.assertEqual(packet[IP].src, self.nat_addr)
3070 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3071 self.assertTrue(packet.haslayer(GRE))
3072 self.assert_packet_checksums_valid(packet)
3074 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# outside -> inside, addressed to the pool address
3078 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3079 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3081 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3082 TCP(sport=1234, dport=1234))
3083 self.pg1.add_stream(p)
3084 self.pg_enable_capture(self.pg_interfaces)
3086 p = self.pg0.get_capture(1)
# Outer destination must be rewritten back to pg0's remote address.
3089 self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
3090 self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
3091 self.assertTrue(packet.haslayer(GRE))
3092 self.assert_packet_checksums_valid(packet)
3094 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3097 def test_hairpinning_unknown_proto(self):
3098 """ NAT44 translate packet with unknown protocol - hairpinning """
# Hairpinning variant: host (dynamic NAT via the pool address) and server
# (1:1 static mapping) are both behind pg0. A TCP packet is sent first,
# then a GRE-carrying packet in each direction is checked.
# NOTE(review): `host_in_port`, the layer between the outer and inner IP
# headers, and the assignment of `packet` are not in the visible lines -
# confirm against the full file.
3099 host = self.pg0.remote_hosts[0]
3100 server = self.pg0.remote_hosts[1]
3102 server_out_port = 8765
3103 server_nat_ip = "10.0.0.11"
3105 self.nat44_add_address(self.nat_addr)
3106 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3107 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3110 # add static mapping for server
3111 self.nat44_add_static_mapping(server.ip4, server_nat_ip)
# A plain TCP packet host -> server goes first; presumably it establishes
# the host's translation used by the packets below - confirm.
3114 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3115 IP(src=host.ip4, dst=server_nat_ip) /
3116 TCP(sport=host_in_port, dport=server_out_port))
3117 self.pg0.add_stream(p)
3118 self.pg_enable_capture(self.pg_interfaces)
3120 capture = self.pg0.get_capture(1)
# host -> server with the unknown protocol
3122 p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
3123 IP(src=host.ip4, dst=server_nat_ip) /
3125 IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
3126 TCP(sport=1234, dport=1234))
3127 self.pg0.add_stream(p)
3128 self.pg_enable_capture(self.pg_interfaces)
3130 p = self.pg0.get_capture(1)
# Source becomes the pool address, destination the server's real address.
3133 self.assertEqual(packet[IP].src, self.nat_addr)
3134 self.assertEqual(packet[IP].dst, server.ip4)
3135 self.assertTrue(packet.haslayer(GRE))
3136 self.assert_packet_checksums_valid(packet)
3138 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# server -> host, addressed to the pool address
3142 p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
3143 IP(src=server.ip4, dst=self.nat_addr) /
3145 IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
3146 TCP(sport=1234, dport=1234))
3147 self.pg0.add_stream(p)
3148 self.pg_enable_capture(self.pg_interfaces)
3150 p = self.pg0.get_capture(1)
# Source becomes the server's NAT address, destination the host's real
# address.
3153 self.assertEqual(packet[IP].src, server_nat_ip)
3154 self.assertEqual(packet[IP].dst, host.ip4)
3155 self.assertTrue(packet.haslayer(GRE))
3156 self.assert_packet_checksums_valid(packet)
3158 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3161 def test_output_feature(self):
3162 """ NAT44 interface output feature (in2out postrouting) """
# Enables the NAT44 output feature on pg0, pg1 and pg3 and checks three
# paths: pg0 -> pg3 is translated, pg3 -> pg0 is translated back, and
# pg2 -> pg0 (pg2 has no NAT feature enabled here) passes untranslated.
3163 self.nat44_add_address(self.nat_addr)
3164 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3165 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
3166 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg0 -> pg3
3170 pkts = self.create_stream_in(self.pg0, self.pg3)
3171 self.pg0.add_stream(pkts)
3172 self.pg_enable_capture(self.pg_interfaces)
3174 capture = self.pg3.get_capture(len(pkts))
3175 self.verify_capture_out(capture)
# pg3 -> pg0
3178 pkts = self.create_stream_out(self.pg3)
3179 self.pg3.add_stream(pkts)
3180 self.pg_enable_capture(self.pg_interfaces)
3182 capture = self.pg0.get_capture(len(pkts))
3183 self.verify_capture_in(capture, self.pg0)
3185 # from non-NAT interface to NAT inside interface
3186 pkts = self.create_stream_in(self.pg2, self.pg0)
3187 self.pg2.add_stream(pkts)
3188 self.pg_enable_capture(self.pg_interfaces)
3190 capture = self.pg0.get_capture(len(pkts))
3191 self.verify_capture_no_translation(capture, self.pg2, self.pg0)
3193 def test_output_feature_vrf_aware(self):
3194 """ NAT44 interface output feature VRF aware (in2out postrouting) """
# One NAT address per VRF; they are registered below with vrf_id=10 and
# vrf_id=20 respectively.
3195 nat_ip_vrf10 = "10.0.0.10"
3196 nat_ip_vrf20 = "10.0.0.20"
# Two /32 routes to pg3's remote address via pg3. The visible arguments of
# both calls are identical; the trailing arguments are on lines missing from
# this listing (presumably they select different tables - TODO confirm).
3198 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3199 dst_address_length=32,
3200 next_hop_address=self.pg3.remote_ip4n,
3201 next_hop_sw_if_index=self.pg3.sw_if_index,
3203 self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
3204 dst_address_length=32,
3205 next_hop_address=self.pg3.remote_ip4n,
3206 next_hop_sw_if_index=self.pg3.sw_if_index,
3209 self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
3210 self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
3211 self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
3212 self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
3213 self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
# pg4 -> pg3: capture on pg3 is verified against nat_ip_vrf10.
3217 pkts = self.create_stream_in(self.pg4, self.pg3)
3218 self.pg4.add_stream(pkts)
3219 self.pg_enable_capture(self.pg_interfaces)
3221 capture = self.pg3.get_capture(len(pkts))
3222 self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
# pg3 -> nat_ip_vrf10: replies are expected back on pg4.
3225 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
3226 self.pg3.add_stream(pkts)
3227 self.pg_enable_capture(self.pg_interfaces)
3229 capture = self.pg4.get_capture(len(pkts))
3230 self.verify_capture_in(capture, self.pg4)
# pg6 -> pg3: capture on pg3 is verified against nat_ip_vrf20.
3233 pkts = self.create_stream_in(self.pg6, self.pg3)
3234 self.pg6.add_stream(pkts)
3235 self.pg_enable_capture(self.pg_interfaces)
3237 capture = self.pg3.get_capture(len(pkts))
3238 self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
# pg3 -> nat_ip_vrf20: replies are expected back on pg6.
3241 pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
3242 self.pg3.add_stream(pkts)
3243 self.pg_enable_capture(self.pg_interfaces)
3245 capture = self.pg6.get_capture(len(pkts))
3246 self.verify_capture_in(capture, self.pg6)
3248 def test_output_feature_hairpinning(self):
3249 """ NAT44 interface output feature hairpinning (in2out postrouting) """
# Host and server are two different remote hosts on the same interface (pg0).
3250 host = self.pg0.remote_hosts[0]
3251 server = self.pg0.remote_hosts[1]
# NOTE(review): host_in_port is used below but its assignment is on a line
# missing from this listing (numbering jumps 3251 -> 3254).
3254 server_in_port = 5678
3255 server_out_port = 8765
3257 self.nat44_add_address(self.nat_addr)
3258 self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
3259 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3262 # add static mapping for server
3263 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3264 server_in_port, server_out_port,
3265 proto=IP_PROTOS.tcp)
3267 # send packet from host to server
# The packet is addressed to the NAT address / server_out_port and is
# captured back on pg0, the same interface it was sent on.
3268 p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
3269 IP(src=host.ip4, dst=self.nat_addr) /
3270 TCP(sport=host_in_port, dport=server_out_port))
3271 self.pg0.add_stream(p)
3272 self.pg_enable_capture(self.pg_interfaces)
3274 capture = self.pg0.get_capture(1)
# `ip` and `tcp` are bound on lines not shown in this listing.
3279 self.assertEqual(ip.src, self.nat_addr)
3280 self.assertEqual(ip.dst, server.ip4)
3281 self.assertNotEqual(tcp.sport, host_in_port)
3282 self.assertEqual(tcp.dport, server_in_port)
3283 self.assert_packet_checksums_valid(p)
# Remember the translated source port; the server's reply targets it.
3284 host_out_port = tcp.sport
3286 self.logger.error(ppp("Unexpected or invalid packet:", p))
3289 # send reply from server to host
3290 p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
3291 IP(src=server.ip4, dst=self.nat_addr) /
3292 TCP(sport=server_in_port, dport=host_out_port))
3293 self.pg0.add_stream(p)
3294 self.pg_enable_capture(self.pg_interfaces)
3296 capture = self.pg0.get_capture(1)
# The reply must appear to come from the NAT address / server_out_port and
# be delivered to the host's original address and port.
3301 self.assertEqual(ip.src, self.nat_addr)
3302 self.assertEqual(ip.dst, host.ip4)
3303 self.assertEqual(tcp.sport, server_out_port)
3304 self.assertEqual(tcp.dport, host_in_port)
3305 self.assert_packet_checksums_valid(p)
3307 self.logger.error(ppp("Unexpected or invalid packet:", p))
3310 def test_output_feature_and_service(self):
3311 """ NAT44 interface output feature and services """
3312 external_addr = '1.2.3.4'
# NOTE(review): local_port and external_port are used below but assigned on
# lines missing from this listing (numbering jumps 3312 -> 3316).
3316 self.vapi.nat44_forwarding_enable_disable(1)
3317 self.nat44_add_address(self.nat_addr)
3318 self.vapi.nat44_add_del_identity_mapping(ip=self.pg1.remote_ip4n)
3319 self.nat44_add_static_mapping(self.pg0.remote_ip4, external_addr,
3320 local_port, external_port,
3321 proto=IP_PROTOS.tcp, out2in_only=1)
# The feature API is called twice for pg0; the second call and the pg1
# output-feature call continue on lines not shown here.
3322 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3323 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3325 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3328 # from client to service
3329 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3330 IP(src=self.pg1.remote_ip4, dst=external_addr) /
3331 TCP(sport=12345, dport=external_port))
3332 self.pg1.add_stream(p)
3333 self.pg_enable_capture(self.pg_interfaces)
3335 capture = self.pg0.get_capture(1)
# Destination must be rewritten to the mapped local address and port.
3340 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3341 self.assertEqual(tcp.dport, local_port)
3342 self.assert_packet_checksums_valid(p)
3344 self.logger.error(ppp("Unexpected or invalid packet:", p))
3347 # from service back to client
3348 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3349 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3350 TCP(sport=local_port, dport=12345))
3351 self.pg0.add_stream(p)
3352 self.pg_enable_capture(self.pg_interfaces)
3354 capture = self.pg1.get_capture(1)
# Source must be rewritten back to the external address and port.
3359 self.assertEqual(ip.src, external_addr)
3360 self.assertEqual(tcp.sport, external_port)
3361 self.assert_packet_checksums_valid(p)
3363 self.logger.error(ppp("Unexpected or invalid packet:", p))
3366 # from local network host to external network
# NOTE(review): the same pg0 -> pg1 stream is created, sent and verified
# twice in a row with identical statements; confirm the repetition is
# intentional.
3367 pkts = self.create_stream_in(self.pg0, self.pg1)
3368 self.pg0.add_stream(pkts)
3369 self.pg_enable_capture(self.pg_interfaces)
3371 capture = self.pg1.get_capture(len(pkts))
3372 self.verify_capture_out(capture)
3373 pkts = self.create_stream_in(self.pg0, self.pg1)
3374 self.pg0.add_stream(pkts)
3375 self.pg_enable_capture(self.pg_interfaces)
3377 capture = self.pg1.get_capture(len(pkts))
3378 self.verify_capture_out(capture)
3380 # from external network back to local network host
3381 pkts = self.create_stream_out(self.pg1)
3382 self.pg1.add_stream(pkts)
3383 self.pg_enable_capture(self.pg_interfaces)
3385 capture = self.pg0.get_capture(len(pkts))
3386 self.verify_capture_in(capture, self.pg0)
3388 def test_output_feature_and_service2(self):
3389 """ NAT44 interface output feature and service host direct access """
3390 self.vapi.nat44_forwarding_enable_disable(1)
3391 self.nat44_add_address(self.nat_addr)
# The pg1 output-feature call continues on a line missing from this listing.
3392 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
3395 # session initiated from service host - translate
3396 pkts = self.create_stream_in(self.pg0, self.pg1)
3397 self.pg0.add_stream(pkts)
3398 self.pg_enable_capture(self.pg_interfaces)
3400 capture = self.pg1.get_capture(len(pkts))
3401 self.verify_capture_out(capture)
3403 pkts = self.create_stream_out(self.pg1)
3404 self.pg1.add_stream(pkts)
3405 self.pg_enable_capture(self.pg_interfaces)
3407 capture = self.pg0.get_capture(len(pkts))
3408 self.verify_capture_in(capture, self.pg0)
3410 # session initiated from remote host - do not translate
# The outside stream is addressed directly to pg0's remote address and is
# built with use_inside_ports=True.
3411 pkts = self.create_stream_out(self.pg1,
3412 self.pg0.remote_ip4,
3413 use_inside_ports=True)
3414 self.pg1.add_stream(pkts)
3415 self.pg_enable_capture(self.pg_interfaces)
3417 capture = self.pg0.get_capture(len(pkts))
3418 self.verify_capture_in(capture, self.pg0)
3420 pkts = self.create_stream_in(self.pg0, self.pg1)
3421 self.pg0.add_stream(pkts)
3422 self.pg_enable_capture(self.pg_interfaces)
3424 capture = self.pg1.get_capture(len(pkts))
# Verified with nat_ip set to pg0's own remote address; the remaining
# arguments of this call are on a line not shown here.
3425 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
3428 def test_output_feature_and_service3(self):
3429 """ NAT44 interface output feature and DST NAT """
3430 external_addr = '1.2.3.4'
# NOTE(review): local_port and external_port are used below but assigned on
# lines missing from this listing (numbering jumps 3430 -> 3434).
3434 self.vapi.nat44_forwarding_enable_disable(1)
3435 self.nat44_add_address(self.nat_addr)
# Static mapping whose local side is pg1's remote host.
3436 self.nat44_add_static_mapping(self.pg1.remote_ip4, external_addr,
3437 local_port, external_port,
3438 proto=IP_PROTOS.tcp, out2in_only=1)
3439 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3440 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
3442 self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
# pg0 host -> external_addr:external_port, captured on pg1.
3445 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3446 IP(src=self.pg0.remote_ip4, dst=external_addr) /
3447 TCP(sport=12345, dport=external_port))
3448 self.pg0.add_stream(p)
3449 self.pg_enable_capture(self.pg_interfaces)
3451 capture = self.pg1.get_capture(1)
# Source address/port must be unchanged; only the destination is rewritten
# to the mapped local address/port.
3456 self.assertEqual(ip.src, self.pg0.remote_ip4)
3457 self.assertEqual(tcp.sport, 12345)
3458 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3459 self.assertEqual(tcp.dport, local_port)
3460 self.assert_packet_checksums_valid(p)
3462 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Reply from pg1 host -> pg0 host, captured on pg0.
3465 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
3466 IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
3467 TCP(sport=local_port, dport=12345))
3468 self.pg1.add_stream(p)
3469 self.pg_enable_capture(self.pg_interfaces)
3471 capture = self.pg0.get_capture(1)
# Source must be rewritten back to the external address/port; destination
# stays as sent.
3476 self.assertEqual(ip.src, external_addr)
3477 self.assertEqual(tcp.sport, external_port)
3478 self.assertEqual(ip.dst, self.pg0.remote_ip4)
3479 self.assertEqual(tcp.dport, 12345)
3480 self.assert_packet_checksums_valid(p)
3482 self.logger.error(ppp("Unexpected or invalid packet:", p))
3485 def test_one_armed_nat44(self):
3486 """ One armed NAT44 """
# Both endpoints are remote hosts on the single interface pg9.
3487 remote_host = self.pg9.remote_hosts[0]
3488 local_host = self.pg9.remote_hosts[1]
3491 self.nat44_add_address(self.nat_addr)
# The feature API is called twice for pg9; the second call continues on a
# line missing from this listing.
3492 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3493 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
# local_host -> remote_host, sent and captured on pg9.
3497 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3498 IP(src=local_host.ip4, dst=remote_host.ip4) /
3499 TCP(sport=12345, dport=80))
3500 self.pg9.add_stream(p)
3501 self.pg_enable_capture(self.pg_interfaces)
3503 capture = self.pg9.get_capture(1)
# `ip` and `tcp` are bound on lines not shown in this listing.
3508 self.assertEqual(ip.src, self.nat_addr)
3509 self.assertEqual(ip.dst, remote_host.ip4)
3510 self.assertNotEqual(tcp.sport, 12345)
# Keep the translated source port for the return packet.
3511 external_port = tcp.sport
3512 self.assertEqual(tcp.dport, 80)
3513 self.assert_packet_checksums_valid(p)
3515 self.logger.error(ppp("Unexpected or invalid packet:", p))
# remote_host -> NAT address:external_port, again via pg9.
3519 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3520 IP(src=remote_host.ip4, dst=self.nat_addr) /
3521 TCP(sport=80, dport=external_port))
3522 self.pg9.add_stream(p)
3523 self.pg_enable_capture(self.pg_interfaces)
3525 capture = self.pg9.get_capture(1)
# Destination must be restored to the local host's address and port 12345.
3530 self.assertEqual(ip.src, remote_host.ip4)
3531 self.assertEqual(ip.dst, local_host.ip4)
3532 self.assertEqual(tcp.sport, 80)
3533 self.assertEqual(tcp.dport, 12345)
3534 self.assert_packet_checksums_valid(p)
3536 self.logger.error(ppp("Unexpected or invalid packet:", p))
3539 def test_one_armed_nat44_static(self):
3540 """ One armed NAT44 and 1:1 NAPT symmetrical rule """
3541 remote_host = self.pg9.remote_hosts[0]
3542 local_host = self.pg9.remote_hosts[1]
# NOTE(review): local_port and external_port are used below but assigned on
# lines missing from this listing (numbering jumps 3542 -> 3547).
3547 self.vapi.nat44_forwarding_enable_disable(1)
# The NAT address is registered with twice_nat=1.
3548 self.nat44_add_address(self.nat_addr, twice_nat=1)
# Static mapping call; its final arguments are on a line not shown here.
3549 self.nat44_add_static_mapping(local_host.ip4, self.nat_addr,
3550 local_port, external_port,
3551 proto=IP_PROTOS.tcp, out2in_only=1,
3553 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
3554 self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
3557 # from client to service
3558 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3559 IP(src=remote_host.ip4, dst=self.nat_addr) /
3560 TCP(sport=12345, dport=external_port))
3561 self.pg9.add_stream(p)
3562 self.pg_enable_capture(self.pg_interfaces)
3564 capture = self.pg9.get_capture(1)
# Both ends are rewritten: destination to the local host/port, source to the
# NAT address with a port different from 12345.
3569 self.assertEqual(ip.dst, local_host.ip4)
3570 self.assertEqual(ip.src, self.nat_addr)
3571 self.assertEqual(tcp.dport, local_port)
3572 self.assertNotEqual(tcp.sport, 12345)
# Translated client port; the service replies to it.
3573 eh_port_in = tcp.sport
3574 self.assert_packet_checksums_valid(p)
3576 self.logger.error(ppp("Unexpected or invalid packet:", p))
3579 # from service back to client
3580 p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3581 IP(src=local_host.ip4, dst=self.nat_addr) /
3582 TCP(sport=local_port, dport=eh_port_in))
3583 self.pg9.add_stream(p)
3584 self.pg_enable_capture(self.pg_interfaces)
3586 capture = self.pg9.get_capture(1)
# Reply must reach the client's real address/port and appear to originate
# from the NAT address / external_port.
3591 self.assertEqual(ip.src, self.nat_addr)
3592 self.assertEqual(ip.dst, remote_host.ip4)
3593 self.assertEqual(tcp.sport, external_port)
3594 self.assertEqual(tcp.dport, 12345)
3595 self.assert_packet_checksums_valid(p)
3597 self.logger.error(ppp("Unexpected or invalid packet:", p))
3600 def test_del_session(self):
3601 """ Delete NAT44 session """
3602 self.nat44_add_address(self.nat_addr)
3603 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3604 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Send the pg0 -> pg1 stream so that sessions exist for pg0's remote host.
3607 pkts = self.create_stream_in(self.pg0, self.pg1)
3608 self.pg0.add_stream(pkts)
3609 self.pg_enable_capture(self.pg_interfaces)
3611 capture = self.pg1.get_capture(len(pkts))
# Record how many sessions the user has before deleting any.
3613 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3614 nsessions = len(sessions)
# Delete one session keyed by its inside address/port and one keyed by its
# outside address/port (the second call has further arguments on a line not
# shown in this listing). Indexing [0] and [1] requires at least two sessions.
3616 self.vapi.nat44_del_session(sessions[0].inside_ip_address,
3617 sessions[0].inside_port,
3618 sessions[0].protocol)
3619 self.vapi.nat44_del_session(sessions[1].outside_ip_address,
3620 sessions[1].outside_port,
3621 sessions[1].protocol,
# Exactly two sessions must have disappeared.
3624 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
3625 self.assertEqual(nsessions - len(sessions), 2)
3627 def test_set_get_reass(self):
3628 """ NAT44 set/get virtual fragmentation reassembly """
# Read the current reassembly configuration as the baseline.
3629 reas_cfg1 = self.vapi.nat_get_reass()
# Write values derived from the baseline (timeout + 5, limits doubled) so
# the read-back below can only match if the set call took effect.
3631 self.vapi.nat_set_reass(timeout=reas_cfg1.ip4_timeout + 5,
3632 max_reass=reas_cfg1.ip4_max_reass * 2,
3633 max_frag=reas_cfg1.ip4_max_frag * 2)
3635 reas_cfg2 = self.vapi.nat_get_reass()
3637 self.assertEqual(reas_cfg1.ip4_timeout + 5, reas_cfg2.ip4_timeout)
3638 self.assertEqual(reas_cfg1.ip4_max_reass * 2, reas_cfg2.ip4_max_reass)
3639 self.assertEqual(reas_cfg1.ip4_max_frag * 2, reas_cfg2.ip4_max_frag)
# Set drop_frag and check it is reported back as truthy.
3641 self.vapi.nat_set_reass(drop_frag=1)
3642 self.assertTrue(self.vapi.nat_get_reass().ip4_drop_frag)
3644 def test_frag_in_order(self):
3645 """ NAT44 translate fragments arriving in order """
3646 self.nat44_add_address(self.nat_addr)
3647 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3648 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# 23-byte payload and a random inside TCP port for this run.
3651 data = "A" * 4 + "B" * 16 + "C" * 3
3652 self.tcp_port_in = random.randint(1025, 65535)
# Count reassembly entries before traffic; compared at the end of the test.
3654 reass = self.vapi.nat_reass_dump()
3655 reass_n_start = len(reass)
# Fragmented stream from pg0 towards pg1's remote address; the remaining
# arguments of create_stream_frag are on lines not shown in this listing.
3658 pkts = self.create_stream_frag(self.pg0,
3659 self.pg1.remote_ip4,
3663 self.pg0.add_stream(pkts)
3664 self.pg_enable_capture(self.pg_interfaces)
3666 frags = self.pg1.get_capture(len(pkts))
3667 p = self.reass_frags_and_verify(frags,
3669 self.pg1.remote_ip4)
# Destination port 20 is kept, the source port must be translated, and the
# reassembled payload must equal what was sent.
3670 self.assertEqual(p[TCP].dport, 20)
3671 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3672 self.tcp_port_out = p[TCP].sport
3673 self.assertEqual(data, p[Raw].load)
# Fragmented stream in the opposite direction, injected on pg1.
3676 pkts = self.create_stream_frag(self.pg1,
3681 self.pg1.add_stream(pkts)
3682 self.pg_enable_capture(self.pg_interfaces)
3684 frags = self.pg0.get_capture(len(pkts))
3685 p = self.reass_frags_and_verify(frags,
3686 self.pg1.remote_ip4,
3687 self.pg0.remote_ip4)
3688 self.assertEqual(p[TCP].sport, 20)
3689 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3690 self.assertEqual(data, p[Raw].load)
# Two additional reassembly entries are expected after both directions.
3692 reass = self.vapi.nat_reass_dump()
3693 reass_n_end = len(reass)
3695 self.assertEqual(reass_n_end - reass_n_start, 2)
3697 def test_reass_hairpinning(self):
3698 """ NAT44 fragments hairpinning """
3699 server = self.pg0.remote_hosts[1]
# All three ports are drawn at random for each run.
3700 host_in_port = random.randint(1025, 65535)
3701 server_in_port = random.randint(1025, 65535)
3702 server_out_port = random.randint(1025, 65535)
3703 data = "A" * 4 + "B" * 16 + "C" * 3
3705 self.nat44_add_address(self.nat_addr)
3706 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3707 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3709 # add static mapping for server
3710 self.nat44_add_static_mapping(server.ip4, self.nat_addr,
3711 server_in_port, server_out_port,
3712 proto=IP_PROTOS.tcp)
3714 # send packet from host to server
# Fragments are injected on pg0 and captured on pg0 as well; the remaining
# arguments of both helper calls are on lines not shown in this listing.
3715 pkts = self.create_stream_frag(self.pg0,
3720 self.pg0.add_stream(pkts)
3721 self.pg_enable_capture(self.pg_interfaces)
3723 frags = self.pg0.get_capture(len(pkts))
3724 p = self.reass_frags_and_verify(frags,
# Source port must be translated, destination port must be the server's
# inside port, and the payload must survive reassembly unchanged.
3727 self.assertNotEqual(p[TCP].sport, host_in_port)
3728 self.assertEqual(p[TCP].dport, server_in_port)
3729 self.assertEqual(data, p[Raw].load)
3731 def test_frag_out_of_order(self):
3732 """ NAT44 translate fragments arriving out of order """
3733 self.nat44_add_address(self.nat_addr)
3734 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3735 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
3738 data = "A" * 4 + "B" * 16 + "C" * 3
# NOTE(review): the result of this call is discarded. test_frag_in_order
# assigns the same expression to self.tcp_port_in, and the assertions below
# read self.tcp_port_in, so this was probably meant to be
# `self.tcp_port_in = random.randint(1025, 65535)` - confirm and fix.
3739 random.randint(1025, 65535)
# Fragmented stream from pg0 towards pg1's remote address; the remaining
# arguments (and any reordering of pkts) are on lines not shown here.
3742 pkts = self.create_stream_frag(self.pg0,
3743 self.pg1.remote_ip4,
3748 self.pg0.add_stream(pkts)
3749 self.pg_enable_capture(self.pg_interfaces)
3751 frags = self.pg1.get_capture(len(pkts))
3752 p = self.reass_frags_and_verify(frags,
3754 self.pg1.remote_ip4)
3755 self.assertEqual(p[TCP].dport, 20)
3756 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
3757 self.tcp_port_out = p[TCP].sport
3758 self.assertEqual(data, p[Raw].load)
# Fragmented stream in the opposite direction, injected on pg1.
3761 pkts = self.create_stream_frag(self.pg1,
3767 self.pg1.add_stream(pkts)
3768 self.pg_enable_capture(self.pg_interfaces)
3770 frags = self.pg0.get_capture(len(pkts))
3771 p = self.reass_frags_and_verify(frags,
3772 self.pg1.remote_ip4,
3773 self.pg0.remote_ip4)
3774 self.assertEqual(p[TCP].sport, 20)
3775 self.assertEqual(p[TCP].dport, self.tcp_port_in)
3776 self.assertEqual(data, p[Raw].load)
3778 def test_port_restricted(self):
3779 """ Port restricted NAT44 (MAP-E CE) """
3780 self.nat44_add_address(self.nat_addr)
3781 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
3782 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Select the map-e port assignment algorithm via CLI: psid 10, offset 6,
# length 6.
3784 self.vapi.cli("nat addr-port-assignment-alg map-e psid 10 "
3785 "psid-offset 6 psid-len 6")
3787 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3788 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3789 TCP(sport=4567, dport=22))
3790 self.pg0.add_stream(p)
3791 self.pg_enable_capture(self.pg_interfaces)
3793 capture = self.pg1.get_capture(1)
# `ip` and `tcp` are bound on lines not shown in this listing.
3798 self.assertEqual(ip.dst, self.pg1.remote_ip4)
3799 self.assertEqual(ip.src, self.nat_addr)
3800 self.assertEqual(tcp.dport, 22)
3801 self.assertNotEqual(tcp.sport, 4567)
# Bits 6..11 of the translated source port (shift right by 6, mask 63) must
# equal the configured psid value 10.
3802 self.assertEqual((tcp.sport >> 6) & 63, 10)
3803 self.assert_packet_checksums_valid(p)
3805 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Shared driver for the twice-NAT tests below; behaviour is selected by the
# self_twice_nat / same_pg / lb flags (plus parameters on a signature line
# that is not shown here).
# NOTE(review): this listing omits many lines of this method (branch headers,
# several assignments and call arguments), so the comments below describe
# only what the visible statements do.
3808 def twice_nat_common(self, self_twice_nat=False, same_pg=False, lb=False,
3810 twice_nat_addr = '10.0.1.3'
3818 port_in1 = port_in+1
3819 port_in2 = port_in+2
3824 server1 = self.pg0.remote_hosts[0]
3825 server2 = self.pg0.remote_hosts[1]
# eh_translate is compared against nothing visible here; the rest of this
# boolean expression is on a missing line.
3837 eh_translate = ((not self_twice_nat) or (not lb and same_pg) or
# One ordinary NAT address and one registered with twice_nat=1.
3840 self.nat44_add_address(self.nat_addr)
3841 self.nat44_add_address(twice_nat_addr, twice_nat=1)
# twice_nat and self_twice_nat are passed as complementary 0/1 flags.
3843 self.nat44_add_static_mapping(pg0.remote_ip4, self.nat_addr,
3845 proto=IP_PROTOS.tcp,
3846 twice_nat=int(not self_twice_nat),
3847 self_twice_nat=int(self_twice_nat))
# Local back-ends (server1, server2) for the load-balanced static mapping.
3849 locals = [{'addr': server1.ip4n,
3852 {'addr': server2.ip4n,
3855 out_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
3856 self.vapi.nat44_add_del_lb_static_mapping(out_addr_n,
3860 not self_twice_nat),
3863 local_num=len(locals),
3865 self.vapi.nat44_interface_add_del_feature(pg0.sw_if_index)
3866 self.vapi.nat44_interface_add_del_feature(pg1.sw_if_index,
# Choose the client host; the conditions guarding these assignments are only
# partly visible (client_id == 2 selects pg0.remote_hosts[1]).
3873 assert client_id is not None
3875 client = self.pg0.remote_hosts[0]
3876 elif client_id == 2:
3877 client = self.pg0.remote_hosts[1]
3879 client = pg1.remote_hosts[0]
# Client -> NAT address:port_out.
3880 p = (Ether(src=pg1.remote_mac, dst=pg1.local_mac) /
3881 IP(src=client.ip4, dst=self.nat_addr) /
3882 TCP(sport=eh_port_out, dport=port_out))
3884 self.pg_enable_capture(self.pg_interfaces)
3886 capture = pg0.get_capture(1)
3892 if ip.dst == server1.ip4:
3898 self.assertEqual(ip.dst, server.ip4)
# Two alternative destination-port checks and two alternative source checks
# follow; the conditions selecting between them are on missing lines.
3900 self.assertIn(tcp.dport, [port_in1, port_in2])
3902 self.assertEqual(tcp.dport, port_in)
3904 self.assertEqual(ip.src, twice_nat_addr)
3905 self.assertNotEqual(tcp.sport, eh_port_out)
3907 self.assertEqual(ip.src, client.ip4)
3908 self.assertEqual(tcp.sport, eh_port_out)
# Remember the ports seen by the server so the reply can mirror them.
3910 eh_port_in = tcp.sport
3911 saved_port_in = tcp.dport
3912 self.assert_packet_checksums_valid(p)
3914 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Server -> client reply, captured on pg1.
3917 p = (Ether(src=server.mac, dst=pg0.local_mac) /
3918 IP(src=server.ip4, dst=eh_addr_in) /
3919 TCP(sport=saved_port_in, dport=eh_port_in))
3921 self.pg_enable_capture(self.pg_interfaces)
3923 capture = pg1.get_capture(1)
3928 self.assertEqual(ip.dst, client.ip4)
3929 self.assertEqual(ip.src, self.nat_addr)
3930 self.assertEqual(tcp.dport, eh_port_out)
3931 self.assertEqual(tcp.sport, port_out)
3932 self.assert_packet_checksums_valid(p)
3934 self.logger.error(ppp("Unexpected or invalid packet:", p))
# Session checks: exactly one session for the server, flagged ext_host_valid
# and is_twicenat; delete it using the ext-host NAT address/port and verify
# that none remain. (Any condition guarding this section is not visible.)
3938 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3939 self.assertEqual(len(sessions), 1)
3940 self.assertTrue(sessions[0].ext_host_valid)
3941 self.assertTrue(sessions[0].is_twicenat)
3942 self.vapi.nat44_del_session(
3943 sessions[0].inside_ip_address,
3944 sessions[0].inside_port,
3945 sessions[0].protocol,
3946 ext_host_address=sessions[0].ext_host_nat_address,
3947 ext_host_port=sessions[0].ext_host_nat_port)
3948 sessions = self.vapi.nat44_user_session_dump(server.ip4n, 0)
3949 self.assertEqual(len(sessions), 0)
3951 def test_twice_nat(self):
# Runs twice_nat_common with all default arguments. (The docstring line of
# this test is missing from the listing.)
3953 self.twice_nat_common()
3955 def test_self_twice_nat_positive(self):
3956 """ Self Twice NAT44 (positive test) """
# Self-twice-NAT with same_pg=True.
3957 self.twice_nat_common(self_twice_nat=True, same_pg=True)
3959 def test_self_twice_nat_negative(self):
3960 """ Self Twice NAT44 (negative test) """
# Self-twice-NAT with same_pg left at its default (False).
3961 self.twice_nat_common(self_twice_nat=True)
3963 def test_twice_nat_lb(self):
3964 """ Twice NAT44 local service load balancing """
# Twice-NAT through the load-balancing path of twice_nat_common.
3965 self.twice_nat_common(lb=True)
3967 def test_self_twice_nat_lb_positive(self):
3968 """ Self Twice NAT44 local service load balancing (positive test) """
# The visible arguments are identical to the negative variant below; the
# distinguishing argument is on a continuation line not shown in this listing.
3969 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
3972 def test_self_twice_nat_lb_negative(self):
3973 """ Self Twice NAT44 local service load balancing (negative test) """
# The visible arguments are identical to the positive variant above; the
# distinguishing argument is on a continuation line not shown in this listing.
3974 self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
3977 def test_twice_nat_interface_addr(self):
3978 """ Acquire twice NAT44 addresses from interface """
# Bind the twice-NAT address pool to pg7 before pg7 has an IPv4 address.
3979 self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
3981 # no address in NAT pool
3982 adresses = self.vapi.nat44_address_dump()
3983 self.assertEqual(0, len(adresses))
3985 # configure interface address and check NAT address pool
3986 self.pg7.config_ip4()
3987 adresses = self.vapi.nat44_address_dump()
3988 self.assertEqual(1, len(adresses))
# Only the first 4 bytes of the dumped address are compared with pg7's
# packed local IPv4 address.
3989 self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
3990 self.assertEqual(adresses[0].twice_nat, 1)
3992 # remove interface address and check NAT address pool
3993 self.pg7.unconfig_ip4()
3994 adresses = self.vapi.nat44_address_dump()
3995 self.assertEqual(0, len(adresses))
3997 def test_ipfix_max_frags(self):
3998 """ IPFIX logging maximum fragments pending reassembly exceeded """
3999 self.nat44_add_address(self.nat_addr)
4000 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4001 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Allow zero pending fragments.
4003 self.vapi.nat_set_reass(max_frag=0)
# IPFIX exporter: collector is pg3's remote address, source is pg3's local
# address (one further argument is on a line not shown in this listing).
4004 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
4005 src_address=self.pg3.local_ip4n,
4007 template_interval=10)
4008 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
4009 src_port=self.ipfix_src_port)
4011 data = "A" * 4 + "B" * 16 + "C" * 3
4012 self.tcp_port_in = random.randint(1025, 65535)
4013 pkts = self.create_stream_frag(self.pg0,
4014 self.pg1.remote_ip4,
# Only the last fragment of the stream is injected.
4018 self.pg0.add_stream(pkts[-1])
4019 self.pg_enable_capture(self.pg_interfaces)
# Nothing is expected on pg1; nine packets are expected on pg3 after the
# IPFIX flush.
4021 frags = self.pg1.get_capture(0)
4022 self.vapi.cli("ipfix flush") # FIXME this should be an API call
4023 capture = self.pg3.get_capture(9)
4024 ipfix = IPFIXDecoder()
4025 # first load template
# The loop headers binding `p` are on lines not shown in this listing.
4027 self.assertTrue(p.haslayer(IPFIX))
4028 self.assertEqual(p[IP].src, self.pg3.local_ip4)
4029 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
4030 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
4031 self.assertEqual(p[UDP].dport, 4739)
4032 self.assertEqual(p[IPFIX].observationDomainID,
4033 self.ipfix_domain_id)
4034 if p.haslayer(Template):
4035 ipfix.add_template(p.getlayer(Template))
4036 # verify events in data set
4038 if p.haslayer(Data):
# NOTE(review): `data` is rebound here from the payload string defined above
# to the decoded IPFIX records.
4039 data = ipfix.decode_data_set(p.getlayer(Set))
4040 self.verify_ipfix_max_fragments_ip4(data, 0,
4041 self.pg0.remote_ip4n)
4043 def test_tcp_session_close_in(self):
4044 """ Close TCP session from inside network """
4045 self.nat44_add_address(self.nat_addr)
4046 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4047 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for pg0's remote host, compared at the end.
4050 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4051 start_sessnum = len(sessions)
4053 self.initiate_tcp_session(self.pg0, self.pg1)
4055 # close the session from inside
4057 # FIN packet in -> out
4058 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4059 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4060 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4061 flags="FA", seq=100, ack=300))
4062 self.pg0.add_stream(p)
4063 self.pg_enable_capture(self.pg_interfaces)
4065 self.pg1.get_capture(1)
# NOTE(review): `pkts`, sent on pg1 below, is built on lines missing from
# this listing; two packets are then expected on pg0, matching the ACK and
# FIN packets constructed here.
4069 # ACK packet out -> in
4070 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4071 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4072 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4073 flags="A", seq=300, ack=101))
4076 # FIN packet out -> in
4077 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4078 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4079 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4080 flags="FA", seq=300, ack=101))
4083 self.pg1.add_stream(pkts)
4084 self.pg_enable_capture(self.pg_interfaces)
4086 self.pg0.get_capture(2)
4088 # ACK packet in -> out
4089 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4090 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4091 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4092 flags="A", seq=101, ack=301))
4093 self.pg0.add_stream(p)
4094 self.pg_enable_capture(self.pg_interfaces)
4096 self.pg1.get_capture(1)
# Open a session again and require exactly one more session than at the
# start (the dump call continues on a line not shown here).
4098 self.initiate_tcp_session(self.pg0, self.pg1)
4099 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4101 self.assertEqual(len(sessions) - start_sessnum, 1)
4103 self.logger.error("TCP session termination failed")
4106 def test_tcp_session_close_out(self):
4107 """ Close TCP session from outside network """
4108 self.nat44_add_address(self.nat_addr)
4109 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4110 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for pg0's remote host, compared at the end.
4113 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4114 start_sessnum = len(sessions)
4116 self.initiate_tcp_session(self.pg0, self.pg1)
4118 # close the session from outside
4120 # FIN packet out -> in
4121 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4122 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4123 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4124 flags="FA", seq=100, ack=300))
4125 self.pg1.add_stream(p)
4126 self.pg_enable_capture(self.pg_interfaces)
4128 self.pg0.get_capture(1)
4130 # FIN+ACK packet in -> out
4131 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4132 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4133 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4134 flags="FA", seq=300, ack=101))
4136 self.pg0.add_stream(p)
4137 self.pg_enable_capture(self.pg_interfaces)
4139 self.pg1.get_capture(1)
4141 # ACK packet out -> in
4142 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4143 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4144 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4145 flags="A", seq=101, ack=301))
4146 self.pg1.add_stream(p)
4147 self.pg_enable_capture(self.pg_interfaces)
4149 self.pg0.get_capture(1)
# Open a session again and require exactly one more session than at the
# start (the dump call continues on a line not shown here).
4151 self.initiate_tcp_session(self.pg0, self.pg1)
4152 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4154 self.assertEqual(len(sessions) - start_sessnum, 1)
4156 self.logger.error("TCP session termination failed")
4159 def test_tcp_session_close_simultaneous(self):
# NOTE(review): the docstring below is identical to the one on
# test_tcp_session_close_in, but this test sends a FIN from inside, then a
# FIN from outside, before either side ACKs - i.e. a simultaneous close.
# Consider updating the docstring to say so.
4160 """ Close TCP session from inside network """
4161 self.nat44_add_address(self.nat_addr)
4162 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4163 self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
# Baseline session count for pg0's remote host, compared at the end.
4166 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
4167 start_sessnum = len(sessions)
4169 self.initiate_tcp_session(self.pg0, self.pg1)
4171 # close the session from inside
4173 # FIN packet in -> out
4174 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4175 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4176 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4177 flags="FA", seq=100, ack=300))
4178 self.pg0.add_stream(p)
4179 self.pg_enable_capture(self.pg_interfaces)
4181 self.pg1.get_capture(1)
4183 # FIN packet out -> in
# This FIN carries ack=100, i.e. it does not acknowledge the inside FIN
# (seq=100) sent above.
4184 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4185 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4186 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4187 flags="FA", seq=300, ack=100))
4188 self.pg1.add_stream(p)
4189 self.pg_enable_capture(self.pg_interfaces)
4191 self.pg0.get_capture(1)
4193 # ACK packet in -> out
4194 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
4195 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
4196 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
4197 flags="A", seq=101, ack=301))
4198 self.pg0.add_stream(p)
4199 self.pg_enable_capture(self.pg_interfaces)
4201 self.pg1.get_capture(1)
4203 # ACK packet out -> in
4204 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
4205 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
4206 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
4207 flags="A", seq=301, ack=101))
4208 self.pg1.add_stream(p)
4209 self.pg_enable_capture(self.pg_interfaces)
4211 self.pg0.get_capture(1)
# Open a session again and require exactly one more session than at the
# start (the dump call continues on a line not shown here).
4213 self.initiate_tcp_session(self.pg0, self.pg1)
4214 sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
4216 self.assertEqual(len(sessions) - start_sessnum, 1)
4218 self.logger.error("TCP session termination failed")
# NOTE(review): the `def` line of this method is missing from the listing;
# the super() call indicates it is TestNAT44.tearDown.
# After the base-class teardown, and only if VPP is still alive, log the NAT
# state via CLI and reset the address/port assignment algorithm to default.
4222 super(TestNAT44, self).tearDown()
4223 if not self.vpp_dead:
4224 self.logger.info(self.vapi.cli("show nat44 addresses"))
4225 self.logger.info(self.vapi.cli("show nat44 interfaces"))
4226 self.logger.info(self.vapi.cli("show nat44 static mappings"))
4227 self.logger.info(self.vapi.cli("show nat44 interface address"))
4228 self.logger.info(self.vapi.cli("show nat44 sessions detail"))
4229 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# test_port_restricted switches the algorithm to map-e through the same CLI
# command; this puts it back to default.
4230 self.vapi.cli("nat addr-port-assignment-alg default")
4234 class TestNAT44Out2InDPO(MethodHolder):
4235 """ NAT44 Test Cases using out2in DPO """
# Extends the VPP command line with a `nat { out2in dpo }` section after the
# base class has set up its constants. (Any decorator line is not shown in
# this listing.)
4238 def setUpConstants(cls):
4239 super(TestNAT44Out2InDPO, cls).setUpConstants()
4240 cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
# Class-level fixture: fixed port/id constants, NAT address, two pg
# interfaces (pg0 with IPv4, pg1 with IPv6) and an IPv6 default route via pg1.
# NOTE(review): the try/except lines around this body are missing from the
# listing; the final tearDownClass call presumably sits in the error path -
# TODO confirm.
4243 def setUpClass(cls):
4244 super(TestNAT44Out2InDPO, cls).setUpClass()
# Inside and outside values are deliberately identical for each protocol.
4247 cls.tcp_port_in = 6303
4248 cls.tcp_port_out = 6303
4249 cls.udp_port_in = 6304
4250 cls.udp_port_out = 6304
4251 cls.icmp_id_in = 6305
4252 cls.icmp_id_out = 6305
4253 cls.nat_addr = '10.0.0.3'
# Packed (network byte order) form of nat_addr for binary API calls.
4254 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
4255 cls.dst_ip4 = '192.168.70.1'
4257 cls.create_pg_interfaces(range(2))
4260 cls.pg0.config_ip4()
4261 cls.pg0.resolve_arp()
4264 cls.pg1.config_ip6()
4265 cls.pg1.resolve_ndp()
# ::/0 (16 zero bytes, prefix length 0) routed to pg1's remote IPv6 address.
4267 cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
4268 dst_address_length=0,
4269 next_hop_address=cls.pg1.remote_ip6n,
4270 next_hop_sw_if_index=cls.pg1.sw_if_index)
4273 super(TestNAT44Out2InDPO, cls).tearDownClass()
4276 def configure_xlat(self):
4277 self.dst_ip6_pfx = '1:2:3::'
4278 self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4280 self.dst_ip6_pfx_len = 96
4281 self.src_ip6_pfx = '4:5:6::'
4282 self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
4284 self.src_ip6_pfx_len = 96
4285 self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
4286 self.src_ip6_pfx_n, self.src_ip6_pfx_len,
4287 '\x00\x00\x00\x00', 0, is_translation=1,
4290 def test_464xlat_ce(self):
4291 """ Test 464XLAT CE with NAT44 """
4293 self.configure_xlat()
4295 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
4296 self.vapi.nat44_add_del_address_range(self.nat_addr_n, self.nat_addr_n)
4298 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4299 self.dst_ip6_pfx_len)
4300 out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
4301 self.src_ip6_pfx_len)
4304 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4305 self.pg0.add_stream(pkts)
4306 self.pg_enable_capture(self.pg_interfaces)
4308 capture = self.pg1.get_capture(len(pkts))
4309 self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
4312 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
4314 self.pg1.add_stream(pkts)
4315 self.pg_enable_capture(self.pg_interfaces)
4317 capture = self.pg0.get_capture(len(pkts))
4318 self.verify_capture_in(capture, self.pg0)
4320 self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index,
4322 self.vapi.nat44_add_del_address_range(self.nat_addr_n,
4323 self.nat_addr_n, is_add=0)
4325 def test_464xlat_ce_no_nat(self):
4326 """ Test 464XLAT CE without NAT44 """
4328 self.configure_xlat()
4330 out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
4331 self.dst_ip6_pfx_len)
4332 out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
4333 self.src_ip6_pfx_len)
4335 pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
4336 self.pg0.add_stream(pkts)
4337 self.pg_enable_capture(self.pg_interfaces)
4339 capture = self.pg1.get_capture(len(pkts))
4340 self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
4341 nat_ip=out_dst_ip6, same_port=True)
4343 pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
4344 self.pg1.add_stream(pkts)
4345 self.pg_enable_capture(self.pg_interfaces)
4347 capture = self.pg0.get_capture(len(pkts))
4348 self.verify_capture_in(capture, self.pg0)
class TestDeterministicNAT(MethodHolder):
    """ Deterministic NAT Test Cases """

    @classmethod
    def setUpConstants(cls):
        # Append the NAT plugin "deterministic" option to the VPP command
        # line.
        super(TestDeterministicNAT, cls).setUpConstants()
        cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])

    @classmethod
    def setUpClass(cls):
        # Class fixture: three IPv4 packet-generator interfaces and two
        # remote hosts behind pg0.
        super(TestDeterministicNAT, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_external_port = 6303
            cls.udp_port_in = 6304
            cls.udp_external_port = 6304
            cls.icmp_id_in = 6305
            cls.nat_addr = '10.0.0.3'

            cls.create_pg_interfaces(range(3))
            cls.interfaces = list(cls.pg_interfaces)

            # NOTE(review): per-interface bring-up sequence is assumed
            # (admin_up / config_ip4 / resolve_arp) -- confirm.
            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

            cls.pg0.generate_remote_hosts(2)
            cls.pg0.configure_ipv4_neighbors()

        except Exception:
            # Tear the class down before propagating a fixture failure.
            super(TestDeterministicNAT, cls).tearDownClass()
            raise

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        :returns: List with one TCP, one UDP and one ICMP echo-request packet
        """
        pkts = []
        # TCP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
        pkts.append(p)

        # UDP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             UDP(sport=self.udp_port_in, dport=self.udp_external_port))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
             IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)

        return pkts

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global NAT address)
        :param ttl: TTL of generated packets
        :returns: List with one TCP, one UDP and one ICMP echo-reply packet
        """
        if dst_ip is None:
            dst_ip = self.nat_addr
        pkts = []
        # TCP -- tcp_port_out / udp_port_out / icmp_external_id are the
        # values recorded by verify_capture_out()
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
        pkts.append(p)

        # UDP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             UDP(dport=self.udp_port_out, sport=self.udp_external_port))
        pkts.append(p)

        # ICMP
        p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
             IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
             ICMP(id=self.icmp_external_id, type='echo-reply'))
        pkts.append(p)

        return pkts

    def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
        """
        Verify captured packets on outside network

        Records the translated TCP/UDP source port and ICMP id of the
        captured packets on the test instance (tcp_port_out, udp_port_out,
        icmp_external_id).

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global NAT address)
        :param packet_num: Expected number of packets (Default 3)
        """
        if nat_ip is None:
            nat_ip = self.nat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, nat_ip)
                if packet.haslayer(TCP):
                    self.tcp_port_out = packet[TCP].sport
                elif packet.haslayer(UDP):
                    self.udp_port_out = packet[UDP].sport
                else:
                    self.icmp_external_id = packet[ICMP].id
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_ipfix_max_entries_per_user(self, data):
        """
        Verify IPFIX maximum entries per user exceeded event

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 13)
        # natQuotaExceededEvent
        self.assertEqual('\x03\x00\x00\x00', record[466])
        # maxEntriesPerUser
        self.assertEqual('\xe8\x03\x00\x00', record[473])
        # sourceIPv4Address
        self.assertEqual(self.pg0.remote_ip4n, record[8])

    def test_deterministic_mode(self):
        """ NAT plugin run deterministic mode """
        in_addr = '172.16.255.0'
        out_addr = '172.17.255.50'
        in_addr_t = '172.16.255.20'
        in_addr_n = socket.inet_aton(in_addr)
        out_addr_n = socket.inet_aton(out_addr)
        in_addr_t_n = socket.inet_aton(in_addr_t)
        # NOTE(review): prefix lengths assumed to be /24 inside and /32
        # outside -- confirm.
        in_plen = 24
        out_plen = 32

        nat_config = self.vapi.nat_show_config()
        self.assertEqual(1, nat_config.deterministic)

        self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)

        # forward and reverse lookups must agree with each other
        rep1 = self.vapi.nat_det_forward(in_addr_t_n)
        self.assertEqual(rep1.out_addr[:4], out_addr_n)
        rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
        self.assertEqual(rep2.in_addr[:4], in_addr_t_n)

        deterministic_mappings = self.vapi.nat_det_map_dump()
        self.assertEqual(len(deterministic_mappings), 1)
        dsm = deterministic_mappings[0]
        self.assertEqual(in_addr_n, dsm.in_addr[:4])
        self.assertEqual(in_plen, dsm.in_plen)
        self.assertEqual(out_addr_n, dsm.out_addr[:4])
        self.assertEqual(out_plen, dsm.out_plen)

        self.clear_nat_det()
        deterministic_mappings = self.vapi.nat_det_map_dump()
        self.assertEqual(len(deterministic_mappings), 0)

    def test_set_timeouts(self):
        """ Set deterministic NAT timeouts """
        timeouts_before = self.vapi.nat_det_get_timeouts()

        self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
                                       timeouts_before.tcp_established + 10,
                                       timeouts_before.tcp_transitory + 10,
                                       timeouts_before.icmp + 10)

        timeouts_after = self.vapi.nat_det_get_timeouts()

        self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
        self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
        self.assertNotEqual(timeouts_before.tcp_established,
                            timeouts_after.tcp_established)
        self.assertNotEqual(timeouts_before.tcp_transitory,
                            timeouts_after.tcp_transitory)

    def test_det_in(self):
        """ Deterministic NAT translation test (TCP, UDP, ICMP) """

        nat_ip = "10.0.0.10"

        # NOTE(review): /32 -> /32 mapping assumed -- confirm.
        self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                      32,
                                      socket.inet_aton(nat_ip),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)

        # in2out
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture, nat_ip)

        # out2in
        pkts = self.create_stream_out(self.pg1, nat_ip)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg0)

        # session dump test
        sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
        self.assertEqual(len(sessions), 3)

        # NOTE(review): session order (TCP, UDP, ICMP) follows the order in
        # which the packets were sent -- confirm.
        # TCP session
        s = sessions[0]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.tcp_port_in)
        self.assertEqual(s.out_port, self.tcp_port_out)
        self.assertEqual(s.ext_port, self.tcp_external_port)

        # UDP session
        s = sessions[1]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.udp_port_in)
        self.assertEqual(s.out_port, self.udp_port_out)
        self.assertEqual(s.ext_port, self.udp_external_port)

        # ICMP session
        s = sessions[2]
        self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
        self.assertEqual(s.in_port, self.icmp_id_in)
        self.assertEqual(s.out_port, self.icmp_external_id)

    def test_multiple_users(self):
        """ Deterministic NAT multiple users """

        nat_ip = "10.0.0.10"
        # NOTE(review): inside source port value is assumed -- confirm.
        port_in = 80
        external_port = 6303

        host0 = self.pg0.remote_hosts[0]
        host1 = self.pg0.remote_hosts[1]

        # NOTE(review): /24 -> /32 mapping assumed so that both hosts share
        # the single outside address -- confirm.
        self.vapi.nat_det_add_del_map(host0.ip4n,
                                      24,
                                      socket.inet_aton(nat_ip),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)

        # host0 to out
        p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
             IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=port_in, dport=external_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        try:
            p = capture[0]
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, nat_ip)
            self.assertEqual(ip.dst, self.pg1.remote_ip4)
            self.assertEqual(tcp.dport, external_port)
            port_out0 = tcp.sport
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # host1 to out
        p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
             IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
             TCP(sport=port_in, dport=external_port))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        try:
            p = capture[0]
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, nat_ip)
            self.assertEqual(ip.dst, self.pg1.remote_ip4)
            self.assertEqual(tcp.dport, external_port)
            port_out1 = tcp.sport
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(1, len(dms))
        self.assertEqual(2, dms[0].ses_num)

        # out to host0
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
             TCP(sport=external_port, dport=port_out0))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        try:
            p = capture[0]
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.pg1.remote_ip4)
            self.assertEqual(ip.dst, host0.ip4)
            self.assertEqual(tcp.dport, port_in)
            self.assertEqual(tcp.sport, external_port)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # out to host1
        p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
             IP(src=self.pg1.remote_ip4, dst=nat_ip) /
             TCP(sport=external_port, dport=port_out1))
        self.pg1.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(1)
        try:
            p = capture[0]
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.pg1.remote_ip4)
            self.assertEqual(ip.dst, host1.ip4)
            self.assertEqual(tcp.dport, port_in)
            self.assertEqual(tcp.sport, external_port)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet", p))
            raise

        # session close api test
        # NOTE(review): port arguments (port_out1 / port_in, external_port)
        # are assumed from the surrounding flow -- confirm against the
        # nat_det_close_session_* API wrappers.
        self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
                                            port_out1,
                                            self.pg1.remote_ip4n,
                                            external_port)
        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(dms[0].ses_num, 1)

        self.vapi.nat_det_close_session_in(host0.ip4n,
                                           port_in,
                                           self.pg1.remote_ip4n,
                                           external_port)
        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(dms[0].ses_num, 0)

    def test_tcp_session_close_detection_in(self):
        """ Deterministic NAT TCP session close from inside network """
        # NOTE(review): /32 -> /32 mapping assumed -- confirm.
        self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                      32,
                                      socket.inet_aton(self.nat_addr),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)

        self.initiate_tcp_session(self.pg0, self.pg1)

        # close the session from inside
        # NOTE(review): TCP flag values below ("F" / "A") are assumed from
        # the packet comments -- confirm.
        try:
            # FIN packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="F"))
            self.pg0.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg1.get_capture(1)

            pkts = []

            # ACK packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="A"))
            pkts.append(p)

            # FIN packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="F"))
            pkts.append(p)

            self.pg1.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(2)

            # ACK packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            self.pg0.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg1.get_capture(1)

            # Check if deterministic NAT44 closed the session
            dms = self.vapi.nat_det_map_dump()
            self.assertEqual(0, dms[0].ses_num)
        except Exception:
            self.logger.error("TCP session termination failed")
            raise

    def test_tcp_session_close_detection_out(self):
        """ Deterministic NAT TCP session close from outside network """
        # NOTE(review): /32 -> /32 mapping assumed -- confirm.
        self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                      32,
                                      socket.inet_aton(self.nat_addr),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)

        self.initiate_tcp_session(self.pg0, self.pg1)

        # close the session from outside
        # NOTE(review): TCP flag values below ("F" / "A") are assumed from
        # the packet comments -- confirm.
        try:
            # FIN packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="F"))
            self.pg1.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(1)

            pkts = []

            # ACK packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="A"))
            pkts.append(p)

            # FIN packet in -> out
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
                     flags="F"))
            pkts.append(p)

            self.pg0.add_stream(pkts)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg1.get_capture(2)

            # ACK packet out -> in
            p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                 TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
                     flags="A"))
            self.pg1.add_stream(p)
            self.pg_enable_capture(self.pg_interfaces)
            self.pg_start()
            self.pg0.get_capture(1)

            # Check if deterministic NAT44 closed the session
            dms = self.vapi.nat_det_map_dump()
            self.assertEqual(0, dms[0].ses_num)
        except Exception:
            self.logger.error("TCP session termination failed")
            raise

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_timeout(self):
        """ Deterministic NAT session timeouts """
        # NOTE(review): /32 -> /32 mapping assumed -- confirm.
        self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                      32,
                                      socket.inet_aton(self.nat_addr),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)

        self.initiate_tcp_session(self.pg0, self.pg1)
        self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(len(pkts))
        # Wait past the 5 second timeouts set above.
        # NOTE(review): wait duration is assumed -- confirm.
        sleep(15)

        dms = self.vapi.nat_det_map_dump()
        self.assertEqual(0, dms[0].ses_num)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_limit_per_user(self):
        """ Deterministic NAT maximum sessions per user limit """
        # NOTE(review): /32 -> /32 mapping assumed -- confirm.
        self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
                                      32,
                                      socket.inet_aton(self.nat_addr),
                                      32)
        self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
                                                  is_inside=0)
        # NOTE(review): path_mtu value is assumed -- confirm.
        self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
                                     src_address=self.pg2.local_ip4n,
                                     path_mtu=512,
                                     template_interval=10)
        self.vapi.nat_ipfix()

        # 1000 UDP flows from the same inside host
        pkts = []
        for port in range(1025, 2025):
            p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                 UDP(sport=port, dport=port))
            pkts.append(p)

        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.get_capture(len(pkts))

        # one more flow: must not be forwarded
        p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
             IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
             UDP(sport=3001, dport=3002))
        self.pg0.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        self.pg1.assert_nothing_captured()

        # verify ICMP error packet
        capture = self.pg0.get_capture(1)
        p = capture[0]
        self.assertTrue(p.haslayer(ICMP))
        icmp = p[ICMP]
        self.assertEqual(icmp.type, 3)
        self.assertEqual(icmp.code, 1)
        self.assertTrue(icmp.haslayer(IPerror))
        inner_ip = icmp[IPerror]
        self.assertEqual(inner_ip[UDPerror].sport, 3001)
        self.assertEqual(inner_ip[UDPerror].dport, 3002)

        dms = self.vapi.nat_det_map_dump()

        self.assertEqual(1000, dms[0].ses_num)

        # verify IPFIX logging
        self.vapi.cli("ipfix flush")  # FIXME this should be an API call
        # NOTE(review): short wait for the flush is assumed -- confirm.
        sleep(1)
        capture = self.pg2.get_capture(2)
        ipfix = IPFIXDecoder()
        # first load template
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            if p.haslayer(Template):
                ipfix.add_template(p.getlayer(Template))
        # verify events in data set
        for p in capture:
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                self.verify_ipfix_max_entries_per_user(data)

    def clear_nat_det(self):
        """
        Clear deterministic NAT configuration.
        """
        self.vapi.nat_ipfix(enable=0)
        self.vapi.nat_det_set_timeouts()
        deterministic_mappings = self.vapi.nat_det_map_dump()
        for dsm in deterministic_mappings:
            self.vapi.nat_det_add_del_map(dsm.in_addr,
                                          dsm.in_plen,
                                          dsm.out_addr,
                                          dsm.out_plen,
                                          is_add=0)

        interfaces = self.vapi.nat44_interface_dump()
        for intf in interfaces:
            self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
                                                      intf.is_inside,
                                                      is_add=0)

    def tearDown(self):
        # Log the deterministic NAT state for post-mortem, then remove the
        # configuration so the next test starts clean.
        super(TestDeterministicNAT, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show nat44 interfaces"))
            self.logger.info(
                self.vapi.cli("show nat44 deterministic mappings"))
            self.logger.info(
                self.vapi.cli("show nat44 deterministic timeouts"))
            self.logger.info(
                self.vapi.cli("show nat44 deterministic sessions"))
            self.clear_nat_det()
4958 class TestNAT64(MethodHolder):
4959 """ NAT64 Test Cases """
@classmethod
def setUpConstants(cls):
    # Append the NAT64 BIB / session-table hash bucket options to the VPP
    # command line.
    super(TestNAT64, cls).setUpConstants()
    cls.vpp_cmdline.extend(["nat", "{", "nat64 bib hash buckets 128",
                            "nat64 st hash buckets 256", "}"])
@classmethod
def setUpClass(cls):
    # Class fixture: pg0 and pg2 are IPv6 interfaces (pg2 in the vrf1 IPv6
    # table), pg1 is the IPv4 interface and pg3 is dual stack.
    super(TestNAT64, cls).setUpClass()

    try:
        cls.tcp_port_in = 6303
        cls.tcp_port_out = 6303
        cls.udp_port_in = 6304
        cls.udp_port_out = 6304
        cls.icmp_id_in = 6305
        cls.icmp_id_out = 6305
        cls.nat_addr = '10.0.0.3'
        cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
        # NOTE(review): VRF table id value is assumed -- confirm.
        cls.vrf1_id = 10
        cls.vrf1_nat_addr = '10.0.10.3'
        cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
                                               cls.vrf1_nat_addr)
        cls.ipfix_src_port = 4739
        cls.ipfix_domain_id = 1

        cls.create_pg_interfaces(range(5))
        cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
        cls.ip6_interfaces.append(cls.pg_interfaces[2])
        cls.ip4_interfaces = list(cls.pg_interfaces[1:2])

        cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)

        cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)

        cls.pg0.generate_remote_hosts(2)

        # NOTE(review): admin_up()/config_ip*()/resolve_arp() bring-up calls
        # are assumed from the visible pg3 sequence below -- confirm.
        for i in cls.ip6_interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

        for i in cls.ip4_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        cls.pg3.admin_up()
        cls.pg3.config_ip4()
        cls.pg3.resolve_arp()
        cls.pg3.config_ip6()
        cls.pg3.configure_ipv6_neighbors()

    except Exception:
        # Tear the class down before propagating a fixture failure.
        super(TestNAT64, cls).tearDownClass()
        raise
def test_pool(self):
    """ Add/delete address to NAT64 pool """
    pool_addr_n = socket.inet_pton(socket.AF_INET, '1.2.3.4')

    # add a single-address range and check it is the only pool entry
    self.vapi.nat64_add_del_pool_addr_range(pool_addr_n, pool_addr_n)
    pool = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(pool), 1)
    self.assertEqual(pool[0].address, pool_addr_n)

    # delete it again and check the pool is empty
    self.vapi.nat64_add_del_pool_addr_range(pool_addr_n, pool_addr_n,
                                            is_add=0)
    pool = self.vapi.nat64_pool_addr_dump()
    self.assertEqual(len(pool), 0)
def test_interface(self):
    """ Enable/disable NAT64 feature on the interface """
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 2)
    # Both interfaces must be reported, each with the expected direction.
    pg0_found = False
    pg1_found = False
    for intf in interfaces:
        if intf.sw_if_index == self.pg0.sw_if_index:
            self.assertEqual(intf.is_inside, 1)
            pg0_found = True
        elif intf.sw_if_index == self.pg1.sw_if_index:
            self.assertEqual(intf.is_inside, 0)
            pg1_found = True
    self.assertTrue(pg0_found)
    self.assertTrue(pg1_found)

    # The feature nodes must show up in the CLI feature listing.
    features = self.vapi.cli("show interface features pg0")
    self.assertNotEqual(features.find('nat64-in2out'), -1)
    features = self.vapi.cli("show interface features pg1")
    self.assertNotEqual(features.find('nat64-out2in'), -1)

    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)

    interfaces = self.vapi.nat64_interface_dump()
    self.assertEqual(len(interfaces), 0)
def test_static_bib(self):
    """ Add/delete static BIB entry """
    in_addr = socket.inet_pton(socket.AF_INET6,
                               '2001:db8:85a3::8a2e:370:7334')
    out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
    # NOTE(review): port values are assumed -- confirm.
    in_port = 1234
    out_port = 5678
    proto = IP_PROTOS.tcp

    self.vapi.nat64_add_del_static_bib(in_addr,
                                       out_addr,
                                       in_port,
                                       out_port,
                                       proto)
    bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
    # Exactly one static entry must exist and carry the values just added.
    static_bib_num = 0
    for bibe in bib:
        if bibe.is_static:
            static_bib_num += 1
            self.assertEqual(bibe.i_addr, in_addr)
            self.assertEqual(bibe.o_addr, out_addr)
            self.assertEqual(bibe.i_port, in_port)
            self.assertEqual(bibe.o_port, out_port)
    self.assertEqual(static_bib_num, 1)

    self.vapi.nat64_add_del_static_bib(in_addr,
                                       out_addr,
                                       in_port,
                                       out_port,
                                       proto,
                                       is_add=0)
    bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
    # After deletion no static entry may remain.
    static_bib_num = 0
    for bibe in bib:
        if bibe.is_static:
            static_bib_num += 1
    self.assertEqual(static_bib_num, 0)
def test_set_timeouts(self):
    """ Set NAT64 timeouts """
    # (field, expected value) pairs, checked in this order
    default_values = (('udp', 300),
                      ('icmp', 60),
                      ('tcp_trans', 240),
                      ('tcp_est', 7440),
                      ('tcp_incoming_syn', 6))
    custom_values = (('udp', 200),
                     ('icmp', 30),
                     ('tcp_trans', 250),
                     ('tcp_est', 7450),
                     ('tcp_incoming_syn', 10))

    # verify default values
    reply = self.vapi.nat64_get_timeouts()
    for field, expected in default_values:
        self.assertEqual(getattr(reply, field), expected)

    # set and verify custom values
    self.vapi.nat64_set_timeouts(**dict(custom_values))
    reply = self.vapi.nat64_get_timeouts()
    for field, expected in custom_values:
        self.assertEqual(getattr(reply, field), expected)
def test_dynamic(self):
    """ NAT64 dynamic translation test """
    self.tcp_port_in = 6303
    self.udp_port_in = 6304
    self.icmp_id_in = 6305

    ses_num_start = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # in2out
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    # out2in
    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    # Let scapy normalise the 64:ff9b:: prefixed form of the IPv4 peer
    # address; ip[IPv6].src is the expected IPv6 source below.
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

    # in2out
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    # out2in
    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

    ses_num_end = self.nat64_get_ses_num()

    # the repeated round trip must not have created additional sessions
    self.assertEqual(ses_num_end - ses_num_start, 3)

    # tenant with specific VRF
    self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
                                            self.vrf1_nat_addr_n,
                                            vrf_id=self.vrf1_id)
    self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)

    pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
    self.pg2.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg2.get_capture(len(pkts))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
def test_static(self):
    """ NAT64 static translation test """
    self.tcp_port_in = 60303
    self.udp_port_in = 60304
    self.icmp_id_in = 60305
    self.tcp_port_out = 60303
    self.udp_port_out = 60304
    self.icmp_id_out = 60305

    ses_num_start = self.nat64_get_ses_num()

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # One static BIB entry per protocol, mapping the inside host's
    # port / id to the same value on the NAT address.
    self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                       self.nat_addr_n,
                                       self.tcp_port_in,
                                       self.tcp_port_out,
                                       IP_PROTOS.tcp)
    self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                       self.nat_addr_n,
                                       self.udp_port_in,
                                       self.udp_port_out,
                                       IP_PROTOS.udp)
    self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                       self.nat_addr_n,
                                       self.icmp_id_in,
                                       self.icmp_id_out,
                                       IP_PROTOS.icmp)

    # in2out
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture, nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4, same_port=True)

    # out2in
    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    # Let scapy normalise the 64:ff9b:: prefixed form of the IPv4 peer
    # address; ip[IPv6].src is the expected IPv6 source.
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

    ses_num_end = self.nat64_get_ses_num()

    self.assertEqual(ses_num_end - ses_num_start, 3)
@unittest.skipUnless(running_extended_tests(), "part of extended tests")
def test_session_timeout(self):
    """ NAT64 session timeout """
    self.icmp_id_in = 1234
    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
    # Shorten only the ICMP timeout.
    self.vapi.nat64_set_timeouts(icmp=5)

    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    self.pg1.get_capture(len(pkts))

    ses_num_before_timeout = self.nat64_get_ses_num()

    # Wait past the 5 second ICMP timeout set above.
    # NOTE(review): wait duration is assumed -- confirm.
    sleep(15)

    # ICMP session after timeout
    ses_num_after_timeout = self.nat64_get_ses_num()
    self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
def test_icmp_error(self):
    """ NAT64 ICMP Error message translation """
    self.tcp_port_in = 6303
    self.udp_port_in = 6304
    self.icmp_id_in = 6305

    self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                            self.nat_addr_n)
    self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
    self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

    # send some packets to create sessions
    pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture_ip4 = self.pg1.get_capture(len(pkts))
    self.verify_capture_out(capture_ip4,
                            nat_ip=self.nat_addr,
                            dst_ip=self.pg1.remote_ip4)

    pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture_ip6 = self.pg0.get_capture(len(pkts))
    # Let scapy normalise the 64:ff9b:: prefixed form of the IPv4 peer
    # address; ip[IPv6].src / ip.src is the expected IPv6 peer below.
    ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
    self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
                               self.pg0.remote_ip6)

    # in2out: wrap each previously captured IPv6 packet in an ICMPv6
    # destination-unreachable error and send it back
    pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
            IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
            ICMPv6DestUnreach(code=1) /
            packet[IPv6] for packet in capture_ip6]
    self.pg0.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg1.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IP].src, self.nat_addr)
            self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
            self.assertEqual(packet[ICMP].type, 3)
            self.assertEqual(packet[ICMP].code, 13)
            inner = packet[IPerror]
            self.assertEqual(inner.src, self.pg1.remote_ip4)
            self.assertEqual(inner.dst, self.nat_addr)
            self.assert_packet_checksums_valid(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
            elif inner.haslayer(UDPerror):
                self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
            else:
                self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # out2in: wrap each previously captured IPv4 packet in an ICMP
    # type 3 / code 13 error and send it back
    pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
            IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
            ICMP(type=3, code=13) /
            packet[IP] for packet in capture_ip4]
    self.pg1.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)
    self.pg_start()
    capture = self.pg0.get_capture(len(pkts))
    for packet in capture:
        try:
            self.assertEqual(packet[IPv6].src, ip.src)
            self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
            icmp = packet[ICMPv6DestUnreach]
            self.assertEqual(icmp.code, 1)
            inner = icmp[IPerror6]
            self.assertEqual(inner.src, self.pg0.remote_ip6)
            self.assertEqual(inner.dst, ip.src)
            self.assert_icmpv6_checksum_valid(packet)
            if inner.haslayer(TCPerror):
                self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
            elif inner.haslayer(UDPerror):
                self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
            else:
                self.assertEqual(inner[ICMPv6EchoRequest].id,
                                 self.icmp_id_in)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise
5361 def test_hairpinning(self):
5362 """ NAT64 hairpinning """
# Both endpoints are remote hosts on pg0, and every capture in this test is
# taken on pg0 as well.
5364 client = self.pg0.remote_hosts[0]
5365 server = self.pg0.remote_hosts[1]
# Ports for the server's static mappings and for the client. The client's
# translated ("out") ports start at 0 and are learned from the first capture.
5366 server_tcp_in_port = 22
5367 server_tcp_out_port = 4022
5368 server_udp_in_port = 23
5369 server_udp_out_port = 4023
5370 client_tcp_in_port = 1234
5371 client_udp_in_port = 1235
5372 client_tcp_out_port = 0
5373 client_udp_out_port = 0
# IPv6 form of the NAT address: '64:ff9b::' joined with the IPv4 string and
# read back from the scapy IPv6 object.
5374 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5375 nat_addr_ip6 = ip.src
# NAT64 setup: pool address, pg0 (call default) and pg1 (is_inside=0), then
# static BIB entries for the server's TCP and UDP ports.
5377 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5379 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5380 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5382 self.vapi.nat64_add_del_static_bib(server.ip6n,
5385 server_tcp_out_port,
5387 self.vapi.nat64_add_del_static_bib(server.ip6n,
5390 server_udp_out_port,
# Client -> server: a TCP and a UDP packet addressed to the NAT address on the
# server's outside ports.
5395 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5396 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5397 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5399 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5400 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5401 UDP(sport=client_udp_in_port, dport=server_udp_out_port))
5403 self.pg0.add_stream(pkts)
5404 self.pg_enable_capture(self.pg_interfaces)
5406 capture = self.pg0.get_capture(len(pkts))
# Each translated packet must come from the NAT address, target the server's
# real address and inside port, and carry a source port different from the
# client's original one. That new port is remembered for the reply direction.
5407 for packet in capture:
5409 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5410 self.assertEqual(packet[IPv6].dst, server.ip6)
5411 self.assert_packet_checksums_valid(packet)
5412 if packet.haslayer(TCP):
5413 self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
5414 self.assertEqual(packet[TCP].dport, server_tcp_in_port)
5415 client_tcp_out_port = packet[TCP].sport
5417 self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
5418 self.assertEqual(packet[UDP].dport, server_udp_in_port)
5419 client_udp_out_port = packet[UDP].sport
5421 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> client: replies sent to the NAT address on the client's learned
# translated ports.
5426 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5427 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5428 TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
5430 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5431 IPv6(src=server.ip6, dst=nat_addr_ip6) /
5432 UDP(sport=server_udp_in_port, dport=client_udp_out_port))
5434 self.pg0.add_stream(pkts)
5435 self.pg_enable_capture(self.pg_interfaces)
5437 capture = self.pg0.get_capture(len(pkts))
# Replies must reach the client from the NAT address with the server's outside
# ports as source and the client's original ports as destination.
5438 for packet in capture:
5440 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5441 self.assertEqual(packet[IPv6].dst, client.ip6)
5442 self.assert_packet_checksums_valid(packet)
5443 if packet.haslayer(TCP):
5444 self.assertEqual(packet[TCP].sport, server_tcp_out_port)
5445 self.assertEqual(packet[TCP].dport, client_tcp_in_port)
5447 self.assertEqual(packet[UDP].sport, server_udp_out_port)
5448 self.assertEqual(packet[UDP].dport, client_udp_in_port)
5450 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# ICMPv6 Destination Unreachable (code 1) from the client, quoting the IPv6
# part of every packet captured in the previous step.
5455 pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5456 IPv6(src=client.ip6, dst=nat_addr_ip6) /
5457 ICMPv6DestUnreach(code=1) /
5458 packet[IPv6] for packet in capture]
5459 self.pg0.add_stream(pkts)
5460 self.pg_enable_capture(self.pg_interfaces)
5462 capture = self.pg0.get_capture(len(pkts))
# The error must arrive at the server, and the quoted inner packet must be
# rewritten to the server's own view (server address and inside port as source,
# NAT address and client's translated port as destination).
5463 for packet in capture:
5465 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
5466 self.assertEqual(packet[IPv6].dst, server.ip6)
5467 icmp = packet[ICMPv6DestUnreach]
5468 self.assertEqual(icmp.code, 1)
5469 inner = icmp[IPerror6]
5470 self.assertEqual(inner.src, server.ip6)
5471 self.assertEqual(inner.dst, nat_addr_ip6)
5472 self.assert_packet_checksums_valid(packet)
5473 if inner.haslayer(TCPerror):
5474 self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
5475 self.assertEqual(inner[TCPerror].dport,
5476 client_tcp_out_port)
5478 self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
5479 self.assertEqual(inner[UDPerror].dport,
5480 client_udp_out_port)
5482 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5485 def test_prefix(self):
5486 """ NAT64 Network-Specific Prefix """
# Setup: pg0 and pg2 are added with the call default, pg1 with is_inside=0.
# A second pool range (vrf1_nat_addr_n) is registered with vrf_id=self.vrf1_id.
5488 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5490 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5491 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5492 self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
5493 self.vrf1_nat_addr_n,
5494 vrf_id=self.vrf1_id)
5495 self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
# Register a /32 prefix without a vrf_id argument and check that the dump
# reports exactly this one entry, with vrf_id 0.
5498 global_pref64 = "2001:db8::"
5499 global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
5500 global_pref64_len = 32
5501 self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
5503 prefix = self.vapi.nat64_prefix_dump()
5504 self.assertEqual(len(prefix), 1)
5505 self.assertEqual(prefix[0].prefix, global_pref64_n)
5506 self.assertEqual(prefix[0].prefix_len, global_pref64_len)
5507 self.assertEqual(prefix[0].vrf_id, 0)
5509 # Add tenant specific prefix
5510 vrf1_pref64 = "2001:db8:122:300::"
5511 vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
5512 vrf1_pref64_len = 56
5513 self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
5515 vrf_id=self.vrf1_id)
# Both prefixes must now be listed.
5516 prefix = self.vapi.nat64_prefix_dump()
5517 self.assertEqual(len(prefix), 2)
# Traffic entering on pg0: outbound packets must leave pg1 translated to
# self.nat_addr, and the return stream must reach pg0 from the IPv6 address
# composed from pg1's remote IPv4 address.
5520 pkts = self.create_stream_in_ip6(self.pg0,
5523 plen=global_pref64_len)
5524 self.pg0.add_stream(pkts)
5525 self.pg_enable_capture(self.pg_interfaces)
5527 capture = self.pg1.get_capture(len(pkts))
5528 self.verify_capture_out(capture, nat_ip=self.nat_addr,
5529 dst_ip=self.pg1.remote_ip4)
5531 pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
5532 self.pg1.add_stream(pkts)
5533 self.pg_enable_capture(self.pg_interfaces)
5535 capture = self.pg0.get_capture(len(pkts))
5536 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5539 self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
5541 # Tenant specific prefix
# Same round trip for pg2, using the vrf1 prefix length and expecting
# self.vrf1_nat_addr as the translated source.
5542 pkts = self.create_stream_in_ip6(self.pg2,
5545 plen=vrf1_pref64_len)
5546 self.pg2.add_stream(pkts)
5547 self.pg_enable_capture(self.pg_interfaces)
5549 capture = self.pg1.get_capture(len(pkts))
5550 self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
5551 dst_ip=self.pg1.remote_ip4)
5553 pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
5554 self.pg1.add_stream(pkts)
5555 self.pg_enable_capture(self.pg_interfaces)
5557 capture = self.pg2.get_capture(len(pkts))
5558 dst_ip = self.compose_ip6(self.pg1.remote_ip4,
5561 self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
5563 def test_unknown_proto(self):
5564 """ NAT64 translate packet with unknown protocol """
# Setup: pool address, pg0 (call default) and pg1 (is_inside=0). remote_ip6 is
# pg1's remote IPv4 address embedded in the 64:ff9b::/96 prefix.
5566 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5568 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5569 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5570 remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
# First send a plain TCP packet from pg0 and wait for one packet on pg1.
5573 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5574 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
5575 TCP(sport=self.tcp_port_in, dport=20))
5576 self.pg0.add_stream(p)
5577 self.pg_enable_capture(self.pg_interfaces)
5579 p = self.pg1.get_capture(1)
# Inside -> outside packet with IPv6 next header 47 (checked as a GRE layer
# below), carrying an inner IPv4/TCP packet.
5581 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5582 IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
5584 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5585 TCP(sport=1234, dport=1234))
5586 self.pg0.add_stream(p)
5587 self.pg_enable_capture(self.pg_interfaces)
5589 p = self.pg1.get_capture(1)
# The translated IPv4 packet must be sourced from the NAT address, be destined
# to pg1's remote host and still contain the GRE layer.
5592 self.assertEqual(packet[IP].src, self.nat_addr)
5593 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
5594 self.assertTrue(packet.haslayer(GRE))
5595 self.assert_packet_checksums_valid(packet)
5597 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Outside -> inside: IPv4 packet from pg1's remote host to the NAT address.
5601 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
5602 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
5604 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5605 TCP(sport=1234, dport=1234))
5606 self.pg1.add_stream(p)
5607 self.pg_enable_capture(self.pg_interfaces)
5609 p = self.pg0.get_capture(1)
# The packet delivered on pg0 must be IPv6 from remote_ip6 to pg0's remote
# host with next header 47.
5612 self.assertEqual(packet[IPv6].src, remote_ip6)
5613 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
5614 self.assertEqual(packet[IPv6].nh, 47)
5616 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5619 def test_hairpinning_unknown_proto(self):
5620 """ NAT64 translate packet with unknown protocol - hairpinning """
# Client and server are both remote hosts on pg0; all captures are on pg0.
5622 client = self.pg0.remote_hosts[0]
5623 server = self.pg0.remote_hosts[1]
5624 server_tcp_in_port = 22
5625 server_tcp_out_port = 4022
5626 client_tcp_in_port = 1234
5627 client_tcp_out_port = 1235
# Separate NAT addresses for server and client, each in packed IPv4 form and
# as an IPv6 address embedded in 64:ff9b::/96.
5628 server_nat_ip = "10.0.0.100"
5629 client_nat_ip = "10.0.0.110"
5630 server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
5631 client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
5632 server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
5633 client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
# NAT64 setup: pool range starting at the server NAT address, pg0 (call
# default) and pg1 (is_inside=0), then static BIB entries for server and client.
5635 self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
5637 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5638 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5640 self.vapi.nat64_add_del_static_bib(server.ip6n,
5643 server_tcp_out_port,
5646 self.vapi.nat64_add_del_static_bib(server.ip6n,
5652 self.vapi.nat64_add_del_static_bib(client.ip6n,
5655 client_tcp_out_port,
# First send a TCP packet from the client to the server's NAT address and wait
# for one packet on pg0.
5659 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5660 IPv6(src=client.ip6, dst=server_nat_ip6) /
5661 TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
5662 self.pg0.add_stream(p)
5663 self.pg_enable_capture(self.pg_interfaces)
5665 p = self.pg0.get_capture(1)
# Client -> server packet with next header GRE and an inner IPv4/TCP packet.
5667 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5668 IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
5670 IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
5671 TCP(sport=1234, dport=1234))
5672 self.pg0.add_stream(p)
5673 self.pg_enable_capture(self.pg_interfaces)
5675 p = self.pg0.get_capture(1)
# It must reach the server from the client's NAT IPv6 address, still as GRE.
5678 self.assertEqual(packet[IPv6].src, client_nat_ip6)
5679 self.assertEqual(packet[IPv6].dst, server.ip6)
5680 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5682 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Server -> client packet with next header GRE, sent to the client's NAT IPv6
# address.
5686 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
5687 IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
5689 IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
5690 TCP(sport=1234, dport=1234))
5691 self.pg0.add_stream(p)
5692 self.pg_enable_capture(self.pg_interfaces)
5694 p = self.pg0.get_capture(1)
# It must reach the client from the server's NAT IPv6 address, still as GRE.
5697 self.assertEqual(packet[IPv6].src, server_nat_ip6)
5698 self.assertEqual(packet[IPv6].dst, client.ip6)
5699 self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
5701 self.logger.error(ppp("Unexpected or invalid packet:", packet))
5704 def test_one_armed_nat64(self):
5705 """ One armed NAT64 """
# The remote host's IPv6 address is composed from pg3's remote IPv4 address.
5707 remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
# pg3 is registered twice: once with the call default and once with
# is_inside=0, so the same interface carries both directions.
5711 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5713 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
5714 self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
# IPv6 TCP packet sent on pg3; the result is captured on pg3 too.
5717 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5718 IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
5719 TCP(sport=12345, dport=80))
5720 self.pg3.add_stream(p)
5721 self.pg_enable_capture(self.pg_interfaces)
5723 capture = self.pg3.get_capture(1)
# The translated packet must be sourced from the NAT address with a new source
# port, which is kept in external_port for the reply.
5728 self.assertEqual(ip.src, self.nat_addr)
5729 self.assertEqual(ip.dst, self.pg3.remote_ip4)
5730 self.assertNotEqual(tcp.sport, 12345)
5731 external_port = tcp.sport
5732 self.assertEqual(tcp.dport, 80)
5733 self.assert_packet_checksums_valid(p)
5735 self.logger.error(ppp("Unexpected or invalid packet:", p))
# IPv4 reply sent to the NAT address and the learned external port.
5739 p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
5740 IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
5741 TCP(sport=80, dport=external_port))
5742 self.pg3.add_stream(p)
5743 self.pg_enable_capture(self.pg_interfaces)
5745 capture = self.pg3.get_capture(1)
# The reply must come back with the composed IPv6 source and the original
# port 12345 as destination port.
5750 self.assertEqual(ip.src, remote_host_ip6)
5751 self.assertEqual(ip.dst, self.pg3.remote_ip6)
5752 self.assertEqual(tcp.sport, 80)
5753 self.assertEqual(tcp.dport, 12345)
5754 self.assert_packet_checksums_valid(p)
5756 self.logger.error(ppp("Unexpected or invalid packet:", p))
5759 def test_frag_in_order(self):
5760 """ NAT64 translate fragments arriving in order """
# A random inside TCP port is chosen for every run.
5761 self.tcp_port_in = random.randint(1025, 65535)
5763 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5765 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5766 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Number of reassembly entries before any traffic, compared at the end.
5768 reass = self.vapi.nat_reass_dump()
5769 reass_n_start = len(reass)
# Inside -> outside: fragmented IPv6 stream sent on pg0, fragments collected
# on pg1 and reassembled for verification.
5773 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5774 self.tcp_port_in, 20, data)
5775 self.pg0.add_stream(pkts)
5776 self.pg_enable_capture(self.pg_interfaces)
5778 frags = self.pg1.get_capture(len(pkts))
5779 p = self.reass_frags_and_verify(frags,
5781 self.pg1.remote_ip4)
# The destination port is preserved and the source port is translated; the
# translated port is stored in self.tcp_port_out. The payload must be intact.
5782 self.assertEqual(p[TCP].dport, 20)
5783 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5784 self.tcp_port_out = p[TCP].sport
5785 self.assertEqual(data, p[Raw].load)
# Outside -> inside: a 23-byte payload sent as fragments on pg1.
5788 data = "A" * 4 + "b" * 16 + "C" * 3
5789 pkts = self.create_stream_frag(self.pg1,
5794 self.pg1.add_stream(pkts)
5795 self.pg_enable_capture(self.pg_interfaces)
5797 frags = self.pg0.get_capture(len(pkts))
5798 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5799 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5800 self.assertEqual(p[TCP].sport, 20)
5801 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5802 self.assertEqual(data, p[Raw].load)
# Exactly two reassembly entries must have been added by the two directions.
5804 reass = self.vapi.nat_reass_dump()
5805 reass_n_end = len(reass)
5807 self.assertEqual(reass_n_end - reass_n_start, 2)
5809 def test_reass_hairpinning(self):
5810 """ NAT64 fragments hairpinning """
# The server is a remote host on pg0; all ports are picked at random.
5812 server = self.pg0.remote_hosts[1]
5813 server_in_port = random.randint(1025, 65535)
5814 server_out_port = random.randint(1025, 65535)
5815 client_in_port = random.randint(1025, 65535)
# IPv6 form of the NAT address: '64:ff9b::' joined with the IPv4 string and
# read back from the scapy IPv6 object.
5816 ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
5817 nat_addr_ip6 = ip.src
5819 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5821 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5822 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
5824 # add static BIB entry for server
5825 self.vapi.nat64_add_del_static_bib(server.ip6n,
5831 # send packet from host to server
5832 pkts = self.create_stream_frag_ip6(self.pg0,
5837 self.pg0.add_stream(pkts)
5838 self.pg_enable_capture(self.pg_interfaces)
# Fragments come back on pg0. After reassembly the packet must be from the NAT
# address to the server, with a translated source port, the server's inside
# port as destination and an unchanged payload.
5840 frags = self.pg0.get_capture(len(pkts))
5841 p = self.reass_frags_and_verify_ip6(frags, nat_addr_ip6, server.ip6)
5842 self.assertNotEqual(p[TCP].sport, client_in_port)
5843 self.assertEqual(p[TCP].dport, server_in_port)
5844 self.assertEqual(data, p[Raw].load)
5846 def test_frag_out_of_order(self):
5847 """ NAT64 translate fragments arriving out of order """
# A random inside TCP port is chosen for every run.
5848 self.tcp_port_in = random.randint(1025, 65535)
5850 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5852 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5853 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Inside -> outside: fragmented IPv6 stream sent on pg0, fragments collected
# on pg1 and reassembled for verification.
5857 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
5858 self.tcp_port_in, 20, data)
5860 self.pg0.add_stream(pkts)
5861 self.pg_enable_capture(self.pg_interfaces)
5863 frags = self.pg1.get_capture(len(pkts))
5864 p = self.reass_frags_and_verify(frags,
5866 self.pg1.remote_ip4)
# The destination port is preserved and the source port is translated; the
# translated port is stored in self.tcp_port_out. The payload must be intact.
5867 self.assertEqual(p[TCP].dport, 20)
5868 self.assertNotEqual(p[TCP].sport, self.tcp_port_in)
5869 self.tcp_port_out = p[TCP].sport
5870 self.assertEqual(data, p[Raw].load)
# Outside -> inside: a 23-byte payload sent as fragments on pg1.
5873 data = "A" * 4 + "B" * 16 + "C" * 3
5874 pkts = self.create_stream_frag(self.pg1,
5880 self.pg1.add_stream(pkts)
5881 self.pg_enable_capture(self.pg_interfaces)
5883 frags = self.pg0.get_capture(len(pkts))
5884 src = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
5885 p = self.reass_frags_and_verify_ip6(frags, src, self.pg0.remote_ip6)
5886 self.assertEqual(p[TCP].sport, 20)
5887 self.assertEqual(p[TCP].dport, self.tcp_port_in)
5888 self.assertEqual(data, p[Raw].load)
5890 def test_interface_addr(self):
5891 """ Acquire NAT64 pool addresses from interface """
5892 self.vapi.nat64_add_interface_addr(self.pg4.sw_if_index)
5894 # no address in NAT64 pool
5895 adresses = self.vapi.nat44_address_dump()
5896 self.assertEqual(0, len(adresses))
5898 # configure interface address and check NAT64 address pool
5899 self.pg4.config_ip4()
5900 addresses = self.vapi.nat64_pool_addr_dump()
5901 self.assertEqual(len(addresses), 1)
5902 self.assertEqual(addresses[0].address, self.pg4.local_ip4n)
5904 # remove interface address and check NAT64 address pool
5905 self.pg4.unconfig_ip4()
5906 addresses = self.vapi.nat64_pool_addr_dump()
5907 self.assertEqual(0, len(adresses))
# Only runs when running_extended_tests() is true.
5909 @unittest.skipUnless(running_extended_tests(), "part of extended tests")
5910 def test_ipfix_max_bibs_sessions(self):
5911 """ IPFIX logging maximum session and BIB entries exceeded """
5914 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
5918 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
5920 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
5921 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# For each of max_bibs distinct source addresses, build TCP packets to
# destination ports 80 and 22 (max_bibs and max_sessions are defined outside
# the visible lines).
5925 for i in range(0, max_bibs):
5926 src = "fd01:aa::%x" % (i)
5927 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5928 IPv6(src=src, dst=remote_host_ip6) /
5929 TCP(sport=12345, dport=80))
5931 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5932 IPv6(src=src, dst=remote_host_ip6) /
5933 TCP(sport=12345, dport=22))
5935 self.pg0.add_stream(pkts)
5936 self.pg_enable_capture(self.pg_interfaces)
# All of these packets are expected to be forwarded to pg1.
5938 self.pg1.get_capture(max_sessions)
# Enable IPFIX export towards pg3's remote host only after the tables are
# filled.
5940 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
5941 src_address=self.pg3.local_ip4n,
5943 template_interval=10)
5944 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
5945 src_port=self.ipfix_src_port)
# One more packet from the last source address to a new destination port: no
# packet may appear on pg1, and 9 IPFIX packets are expected on pg3.
5947 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5948 IPv6(src=src, dst=remote_host_ip6) /
5949 TCP(sport=12345, dport=25))
5950 self.pg0.add_stream(p)
5951 self.pg_enable_capture(self.pg_interfaces)
5953 self.pg1.get_capture(0)
5954 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5955 capture = self.pg3.get_capture(9)
5956 ipfix = IPFIXDecoder()
5957 # first load template
5959 self.assertTrue(p.haslayer(IPFIX))
5960 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5961 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5962 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5963 self.assertEqual(p[UDP].dport, 4739)
5964 self.assertEqual(p[IPFIX].observationDomainID,
5965 self.ipfix_domain_id)
5966 if p.haslayer(Template):
5967 ipfix.add_template(p.getlayer(Template))
5968 # verify events in data set
5970 if p.haslayer(Data):
5971 data = ipfix.decode_data_set(p.getlayer(Set))
5972 self.verify_ipfix_max_sessions(data, max_sessions)
# A packet from a source address not used above (pg0's remote host): again
# nothing may reach pg1, and one IPFIX packet is expected on pg3.
5974 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
5975 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
5976 TCP(sport=12345, dport=80))
5977 self.pg0.add_stream(p)
5978 self.pg_enable_capture(self.pg_interfaces)
5980 self.pg1.get_capture(0)
5981 self.vapi.cli("ipfix flush") # FIXME this should be an API call
5982 capture = self.pg3.get_capture(1)
5983 # verify events in data set
5985 self.assertTrue(p.haslayer(IPFIX))
5986 self.assertEqual(p[IP].src, self.pg3.local_ip4)
5987 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
5988 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
5989 self.assertEqual(p[UDP].dport, 4739)
5990 self.assertEqual(p[IPFIX].observationDomainID,
5991 self.ipfix_domain_id)
5992 if p.haslayer(Data):
5993 data = ipfix.decode_data_set(p.getlayer(Set))
5994 self.verify_ipfix_max_bibs(data, max_bibs)
5996 def test_ipfix_max_frags(self):
5997 """ IPFIX logging maximum fragments pending reassembly exceeded """
5998 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6000 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6001 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
# Reassembly is configured with max_frag=0 for IPv6 before IPFIX export
# towards pg3's remote host is enabled.
6002 self.vapi.nat_set_reass(max_frag=0, is_ip6=1)
6003 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6004 src_address=self.pg3.local_ip4n,
6006 template_interval=10)
6007 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6008 src_port=self.ipfix_src_port)
# Build a fragmented stream but send only its last fragment. Nothing may be
# forwarded to pg1, and 9 IPFIX packets are expected on pg3.
6011 pkts = self.create_stream_frag_ip6(self.pg0, self.pg1.remote_ip4,
6012 self.tcp_port_in, 20, data)
6013 self.pg0.add_stream(pkts[-1])
6014 self.pg_enable_capture(self.pg_interfaces)
6016 self.pg1.get_capture(0)
6017 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6018 capture = self.pg3.get_capture(9)
6019 ipfix = IPFIXDecoder()
6020 # first load template
6022 self.assertTrue(p.haslayer(IPFIX))
6023 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6024 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6025 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6026 self.assertEqual(p[UDP].dport, 4739)
6027 self.assertEqual(p[IPFIX].observationDomainID,
6028 self.ipfix_domain_id)
6029 if p.haslayer(Template):
6030 ipfix.add_template(p.getlayer(Template))
6031 # verify events in data set
6033 if p.haslayer(Data):
6034 data = ipfix.decode_data_set(p.getlayer(Set))
6035 self.verify_ipfix_max_fragments_ip6(data, 0,
6036 self.pg0.remote_ip6n)
6038 def test_ipfix_bib_ses(self):
6039 """ IPFIX logging NAT64 BIB/session create and delete events """
# A random inside TCP port is chosen for every run.
6040 self.tcp_port_in = random.randint(1025, 65535)
6041 remote_host_ip6 = self.compose_ip6(self.pg1.remote_ip4,
6045 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6047 self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
6048 self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
6049 self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
6050 src_address=self.pg3.local_ip4n,
6052 template_interval=10)
6053 self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
6054 src_port=self.ipfix_src_port)
# Create phase: one TCP packet from pg0. The translated source port seen on
# pg1 is stored in self.tcp_port_out.
6057 p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
6058 IPv6(src=self.pg0.remote_ip6, dst=remote_host_ip6) /
6059 TCP(sport=self.tcp_port_in, dport=25))
6060 self.pg0.add_stream(p)
6061 self.pg_enable_capture(self.pg_interfaces)
6063 p = self.pg1.get_capture(1)
6064 self.tcp_port_out = p[0][TCP].sport
6065 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6066 capture = self.pg3.get_capture(10)
6067 ipfix = IPFIXDecoder()
6068 # first load template
6070 self.assertTrue(p.haslayer(IPFIX))
6071 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6072 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6073 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6074 self.assertEqual(p[UDP].dport, 4739)
6075 self.assertEqual(p[IPFIX].observationDomainID,
6076 self.ipfix_domain_id)
6077 if p.haslayer(Template):
6078 ipfix.add_template(p.getlayer(Template))
6079 # verify events in data set
6081 if p.haslayer(Data):
6082 data = ipfix.decode_data_set(p.getlayer(Set))
# Records are dispatched on field 230 of the first record: 10 goes to the BIB
# check and 6 to the session check, both with flag 1.
# NOTE(review): 230 is presumably the IPFIX natEvent element (RFC 8158: 10 =
# NAT64 BIB create, 6 = NAT64 session create) - confirm.
6083 if ord(data[0][230]) == 10:
6084 self.verify_ipfix_bib(data, 1, self.pg0.remote_ip6n)
6085 elif ord(data[0][230]) == 6:
6086 self.verify_ipfix_nat64_ses(data,
6088 self.pg0.remote_ip6n,
6089 self.pg1.remote_ip4,
6092 self.logger.error(ppp("Unexpected or invalid packet: ", p))
# Delete phase: the pool address range call is issued again (its remaining
# arguments are not visible here), then 2 IPFIX packets are expected on pg3.
6095 self.pg_enable_capture(self.pg_interfaces)
6096 self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
6099 self.vapi.cli("ipfix flush") # FIXME this should be an API call
6100 capture = self.pg3.get_capture(2)
6101 # verify events in data set
6103 self.assertTrue(p.haslayer(IPFIX))
6104 self.assertEqual(p[IP].src, self.pg3.local_ip4)
6105 self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
6106 self.assertEqual(p[UDP].sport, self.ipfix_src_port)
6107 self.assertEqual(p[UDP].dport, 4739)
6108 self.assertEqual(p[IPFIX].observationDomainID,
6109 self.ipfix_domain_id)
6110 if p.haslayer(Data):
6111 data = ipfix.decode_data_set(p.getlayer(Set))
# Same dispatch for the delete phase: 11 goes to the BIB check with flag 0 and
# 7 to the session check.
# NOTE(review): presumably natEvent 11 = NAT64 BIB delete and 7 = NAT64
# session delete - confirm.
6112 if ord(data[0][230]) == 11:
6113 self.verify_ipfix_bib(data, 0, self.pg0.remote_ip6n)
6114 elif ord(data[0][230]) == 7:
6115 self.verify_ipfix_nat64_ses(data,
6117 self.pg0.remote_ip6n,
6118 self.pg1.remote_ip4,
6121 self.logger.error(ppp("Unexpected or invalid packet: ", p))
6123 def nat64_get_ses_num(self):
6125 Return number of active NAT64 sessions.
# The count is taken from the NAT64 session table dump.
6127 st = self.vapi.nat64_st_dump()
6130 def clear_nat64(self):
6132 Clear NAT64 configuration.
# Disable IPFIX logging, then reset the exporter source port and domain id
# kept on the test instance to 4739 and 1.
6134 self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
6135 domain_id=self.ipfix_domain_id)
6136 self.ipfix_src_port = 4739
6137 self.ipfix_domain_id = 1
# Called without arguments, i.e. with the call's default timeout values.
6139 self.vapi.nat64_set_timeouts()
# Remove every NAT64 interface reported by the dump.
# NOTE(review): is_inside > 1 presumably marks an interface registered as both
# inside and outside, which needs a second removal call - confirm.
6141 interfaces = self.vapi.nat64_interface_dump()
6142 for intf in interfaces:
6143 if intf.is_inside > 1:
6144 self.vapi.nat64_add_del_interface(intf.sw_if_index,
6147 self.vapi.nat64_add_del_interface(intf.sw_if_index,
# Walk the BIB dump (requested with argument 255) and issue static BIB calls
# for its entries.
6151 bib = self.vapi.nat64_bib_dump(255)
6154 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
# Issue a pool address range call for every address in the pool dump.
6162 adresses = self.vapi.nat64_pool_addr_dump()
6163 for addr in adresses:
6164 self.vapi.nat64_add_del_pool_addr_range(addr.address,
# Issue a prefix call for every configured prefix, passing its own vrf_id.
6169 prefixes = self.vapi.nat64_prefix_dump()
6170 for prefix in prefixes:
6171 self.vapi.nat64_add_del_prefix(prefix.prefix,
6173 vrf_id=prefix.vrf_id,
# Per-test teardown for TestNAT64: after the base-class teardown, log the
# NAT64 state via CLI show commands, but only while VPP is still alive.
6177 super(TestNAT64, self).tearDown()
6178 if not self.vpp_dead:
6179 self.logger.info(self.vapi.cli("show nat64 pool"))
6180 self.logger.info(self.vapi.cli("show nat64 interfaces"))
6181 self.logger.info(self.vapi.cli("show nat64 prefix"))
6182 self.logger.info(self.vapi.cli("show nat64 bib all"))
6183 self.logger.info(self.vapi.cli("show nat64 session table all"))
6184 self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
# DS-Lite tests: pg0 is the IPv4 side and pg1 the IPv6 side (see setUpClass).
6188 class TestDSlite(MethodHolder):
6189 """ DS-Lite Test Cases """
6192 def setUpClass(cls):
6193 super(TestDSlite, cls).setUpClass()
# NAT pool address used by the test, in text and packed form.
6196 cls.nat_addr = '10.0.0.3'
6197 cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
# Two packet-generator interfaces: pg0 gets IPv4 with ARP resolved, pg1 gets
# IPv6 with two generated remote hosts and configured neighbors.
6199 cls.create_pg_interfaces(range(2))
6201 cls.pg0.config_ip4()
6202 cls.pg0.resolve_arp()
6204 cls.pg1.config_ip6()
6205 cls.pg1.generate_remote_hosts(2)
6206 cls.pg1.configure_ipv6_neighbors()
6209 super(TestDSlite, cls).tearDownClass()
6212 def test_dslite(self):
6213 """ Test DS-Lite """
# Configure the DS-Lite pool and the AFTR tunnel endpoint addresses.
6214 self.vapi.dslite_add_del_pool_addr_range(self.nat_addr_n,
6216 aftr_ip4 = '192.0.0.1'
6217 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6218 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6219 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6220 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
# UDP, tunnel -> IPv4: IPv4-in-IPv6 packet from remote host 0 to the AFTR
# address, with inner source 192.168.1.1.
6223 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6224 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[0].ip6) /
6225 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6226 UDP(sport=20000, dport=10000))
6227 self.pg1.add_stream(p)
6228 self.pg_enable_capture(self.pg_interfaces)
# The packet on pg0 must be plain IPv4 from the NAT address with a translated
# source port, which is kept in out_port for the reply.
6230 capture = self.pg0.get_capture(1)
6231 capture = capture[0]
6232 self.assertFalse(capture.haslayer(IPv6))
6233 self.assertEqual(capture[IP].src, self.nat_addr)
6234 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6235 self.assertNotEqual(capture[UDP].sport, 20000)
6236 self.assertEqual(capture[UDP].dport, 10000)
6237 self.assert_packet_checksums_valid(capture)
6238 out_port = capture[UDP].sport
# UDP reply from the IPv4 side: it must come out on pg1 encapsulated in IPv6
# from the AFTR address to remote host 0, with the original inner address and
# port restored.
6240 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6241 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6242 UDP(sport=10000, dport=out_port))
6243 self.pg0.add_stream(p)
6244 self.pg_enable_capture(self.pg_interfaces)
6246 capture = self.pg1.get_capture(1)
6247 capture = capture[0]
6248 self.assertEqual(capture[IPv6].src, aftr_ip6)
6249 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6250 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6251 self.assertEqual(capture[IP].dst, '192.168.1.1')
6252 self.assertEqual(capture[UDP].sport, 10000)
6253 self.assertEqual(capture[UDP].dport, 20000)
6254 self.assert_packet_checksums_valid(capture)
# TCP round trip, this time from remote host 1 with the same inner source
# address 192.168.1.1.
6257 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6258 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6259 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6260 TCP(sport=20001, dport=10001))
6261 self.pg1.add_stream(p)
6262 self.pg_enable_capture(self.pg_interfaces)
6264 capture = self.pg0.get_capture(1)
6265 capture = capture[0]
6266 self.assertFalse(capture.haslayer(IPv6))
6267 self.assertEqual(capture[IP].src, self.nat_addr)
6268 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6269 self.assertNotEqual(capture[TCP].sport, 20001)
6270 self.assertEqual(capture[TCP].dport, 10001)
6271 self.assert_packet_checksums_valid(capture)
6272 out_port = capture[TCP].sport
6274 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6275 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6276 TCP(sport=10001, dport=out_port))
6277 self.pg0.add_stream(p)
6278 self.pg_enable_capture(self.pg_interfaces)
6280 capture = self.pg1.get_capture(1)
6281 capture = capture[0]
6282 self.assertEqual(capture[IPv6].src, aftr_ip6)
6283 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6284 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6285 self.assertEqual(capture[IP].dst, '192.168.1.1')
6286 self.assertEqual(capture[TCP].sport, 10001)
6287 self.assertEqual(capture[TCP].dport, 20001)
6288 self.assert_packet_checksums_valid(capture)
# ICMP echo round trip from remote host 1: the echo id must be translated on
# the way out (kept in out_id) and restored to 4000 in the reply.
6291 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6292 IPv6(dst=aftr_ip6, src=self.pg1.remote_hosts[1].ip6) /
6293 IP(dst=self.pg0.remote_ip4, src='192.168.1.1') /
6294 ICMP(id=4000, type='echo-request'))
6295 self.pg1.add_stream(p)
6296 self.pg_enable_capture(self.pg_interfaces)
6298 capture = self.pg0.get_capture(1)
6299 capture = capture[0]
6300 self.assertFalse(capture.haslayer(IPv6))
6301 self.assertEqual(capture[IP].src, self.nat_addr)
6302 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6303 self.assertNotEqual(capture[ICMP].id, 4000)
6304 self.assert_packet_checksums_valid(capture)
6305 out_id = capture[ICMP].id
6307 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6308 IP(dst=self.nat_addr, src=self.pg0.remote_ip4) /
6309 ICMP(id=out_id, type='echo-reply'))
6310 self.pg0.add_stream(p)
6311 self.pg_enable_capture(self.pg_interfaces)
6313 capture = self.pg1.get_capture(1)
6314 capture = capture[0]
6315 self.assertEqual(capture[IPv6].src, aftr_ip6)
6316 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6317 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6318 self.assertEqual(capture[IP].dst, '192.168.1.1')
6319 self.assertEqual(capture[ICMP].id, 4000)
6320 self.assert_packet_checksums_valid(capture)
6322 # ping DS-Lite AFTR tunnel endpoint address
6323 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6324 IPv6(src=self.pg1.remote_hosts[1].ip6, dst=aftr_ip6) /
6325 ICMPv6EchoRequest())
6326 self.pg1.add_stream(p)
6327 self.pg_enable_capture(self.pg_interfaces)
# The echo reply must come back on pg1 from the AFTR address.
6329 capture = self.pg1.get_capture(1)
6330 self.assertEqual(1, len(capture))
6331 capture = capture[0]
6332 self.assertEqual(capture[IPv6].src, aftr_ip6)
6333 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[1].ip6)
6334 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
# Teardown: after the base-class teardown, log the DS-Lite state via CLI show
# commands, but only while VPP is still alive.
6337 super(TestDSlite, self).tearDown()
6338 if not self.vpp_dead:
6339 self.logger.info(self.vapi.cli("show dslite pool"))
6341 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6342 self.logger.info(self.vapi.cli("show dslite sessions"))
6345 class TestDSliteCE(MethodHolder):
6346 """ DS-Lite CE Test Cases """
def setUpConstants(cls):
    """Append the DS-Lite CE NAT stanza to the inherited VPP command line."""
    # Let the base class assemble its command line first.
    super(TestDSliteCE, cls).setUpConstants()
    # Tokens of the NAT startup section that is added: nat { dslite ce }
    dslite_ce_section = ["nat", "{", "dslite ce", "}"]
    cls.vpp_cmdline.extend(dslite_ce_section)
6354 def setUpClass(cls):
6355 super(TestDSliteCE, cls).setUpClass()
# Two packet-generator interfaces: pg0 gets IPv4 with ARP resolved, pg1 gets
# IPv6 with one generated remote host and configured neighbors.
6358 cls.create_pg_interfaces(range(2))
6360 cls.pg0.config_ip4()
6361 cls.pg0.resolve_arp()
6363 cls.pg1.config_ip6()
6364 cls.pg1.generate_remote_hosts(1)
6365 cls.pg1.configure_ipv6_neighbors()
# NOTE(review): presumably the class-level teardown call below runs only when
# the setup above fails - the surrounding control flow is not visible; confirm.
6368 super(TestDSliteCE, cls).tearDownClass()
6371 def test_dslite_ce(self):
6372 """ Test DS-Lite CE """
6374 b4_ip4 = '192.0.0.2'
6375 b4_ip4_n = socket.inet_pton(socket.AF_INET, b4_ip4)
6376 b4_ip6 = '2001:db8:62aa::375e:f4c1:1'
6377 b4_ip6_n = socket.inet_pton(socket.AF_INET6, b4_ip6)
6378 self.vapi.dslite_set_b4_addr(b4_ip6_n, b4_ip4_n)
6380 aftr_ip4 = '192.0.0.1'
6381 aftr_ip4_n = socket.inet_pton(socket.AF_INET, aftr_ip4)
6382 aftr_ip6 = '2001:db8:85a3::8a2e:370:1'
6383 aftr_ip6_n = socket.inet_pton(socket.AF_INET6, aftr_ip6)
6384 self.vapi.dslite_set_aftr_addr(aftr_ip6_n, aftr_ip4_n)
6386 self.vapi.ip_add_del_route(dst_address=aftr_ip6_n,
6387 dst_address_length=128,
6388 next_hop_address=self.pg1.remote_ip6n,
6389 next_hop_sw_if_index=self.pg1.sw_if_index,
6393 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
6394 IP(dst=self.pg1.remote_ip4, src=self.pg0.remote_ip4) /
6395 UDP(sport=10000, dport=20000))
6396 self.pg0.add_stream(p)
6397 self.pg_enable_capture(self.pg_interfaces)
6399 capture = self.pg1.get_capture(1)
6400 capture = capture[0]
6401 self.assertEqual(capture[IPv6].src, b4_ip6)
6402 self.assertEqual(capture[IPv6].dst, aftr_ip6)
6403 self.assertEqual(capture[IP].src, self.pg0.remote_ip4)
6404 self.assertEqual(capture[IP].dst, self.pg1.remote_ip4)
6405 self.assertEqual(capture[UDP].sport, 10000)
6406 self.assertEqual(capture[UDP].dport, 20000)
6407 self.assert_packet_checksums_valid(capture)
6410 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6411 IPv6(dst=b4_ip6, src=aftr_ip6) /
6412 IP(dst=self.pg0.remote_ip4, src=self.pg1.remote_ip4) /
6413 UDP(sport=20000, dport=10000))
6414 self.pg1.add_stream(p)
6415 self.pg_enable_capture(self.pg_interfaces)
6417 capture = self.pg0.get_capture(1)
6418 capture = capture[0]
6419 self.assertFalse(capture.haslayer(IPv6))
6420 self.assertEqual(capture[IP].src, self.pg1.remote_ip4)
6421 self.assertEqual(capture[IP].dst, self.pg0.remote_ip4)
6422 self.assertEqual(capture[UDP].sport, 20000)
6423 self.assertEqual(capture[UDP].dport, 10000)
6424 self.assert_packet_checksums_valid(capture)
6426 # ping DS-Lite B4 tunnel endpoint address
6427 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
6428 IPv6(src=self.pg1.remote_hosts[0].ip6, dst=b4_ip6) /
6429 ICMPv6EchoRequest())
6430 self.pg1.add_stream(p)
6431 self.pg_enable_capture(self.pg_interfaces)
6433 capture = self.pg1.get_capture(1)
6434 self.assertEqual(1, len(capture))
6435 capture = capture[0]
6436 self.assertEqual(capture[IPv6].src, b4_ip6)
6437 self.assertEqual(capture[IPv6].dst, self.pg1.remote_hosts[0].ip6)
6438 self.assertTrue(capture.haslayer(ICMPv6EchoReply))
6441 super(TestDSliteCE, self).tearDown()
6442 if not self.vpp_dead:
6444 self.vapi.cli("show dslite aftr-tunnel-endpoint-address"))
6446 self.vapi.cli("show dslite b4-tunnel-endpoint-address"))
class TestNAT66(MethodHolder):
    """ NAT66 Test Cases """

    # NOTE(review): this class was recovered from a numbered listing that
    # had lost its indentation and its short lines.  Restored from the
    # gaps in the numbering: the decorator and try/except scaffolding, the
    # per-interface admin_up()/config_ip6() calls, the bare TCP()/UDP()/
    # ICMPv6EchoReply() payloads, the pg_start() calls, the trailing
    # arguments of the nat66_add_del_* calls, and the tearDown header with
    # its clear_nat66() call -- confirm against the upstream file.

    @classmethod
    def setUpClass(cls):
        """
        Pick the NAT66 external address and bring up two IPv6 pg interfaces.
        """
        super(TestNAT66, cls).setUpClass()

        try:
            cls.nat_addr = 'fd01:ff::2'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET6, cls.nat_addr)

            cls.create_pg_interfaces(range(2))
            cls.interfaces = list(cls.pg_interfaces)

            for i in cls.interfaces:
                i.admin_up()
                i.config_ip6()
                i.configure_ipv6_neighbors()

        except Exception:
            # Undo the class-level setup before propagating the failure.
            super(TestNAT66, cls).tearDownClass()
            raise

    def test_static(self):
        """ 1:1 NAT66 test """
        self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat66_add_del_interface(self.pg1.sw_if_index, is_inside=0)
        self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
                                               self.nat_addr_n)

        # in2out: one packet per payload type, pg0 host -> pg1 host
        payloads = [TCP(), UDP(), ICMPv6EchoRequest(), GRE() / IP() / TCP()]
        pkts = [(Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
                 payload) for payload in payloads]
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        for packet in capture:
            try:
                # Source must have been rewritten to the NAT66 address.
                self.assertEqual(packet[IPv6].src, self.nat_addr)
                self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(packet)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # out2in: one packet per payload type, pg1 host -> NAT66 address
        payloads = [TCP(), UDP(), ICMPv6EchoReply(), GRE() / IP() / TCP()]
        pkts = [(Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
                 payload) for payload in payloads]
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                # Destination must have been rewritten to the inside host.
                self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
                self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
                self.assert_packet_checksums_valid(packet)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # Four packets in each direction went through the single mapping.
        sm = self.vapi.nat66_static_mapping_dump()
        self.assertEqual(len(sm), 1)
        self.assertEqual(sm[0].total_pkts, 8)

    def test_check_no_translate(self):
        """ NAT66 translate only when egress interface is outside interface """
        # Both interfaces are added with the default (inside) role.
        self.vapi.nat66_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat66_add_del_interface(self.pg1.sw_if_index)
        self.vapi.nat66_add_del_static_mapping(self.pg0.remote_ip6n,
                                               self.nat_addr_n)

        # in2out
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
             UDP())
        self.pg0.add_stream([p])
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(1)
        packet = capture[0]
        try:
            # Addresses must be unchanged: no translation took place.
            self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
            self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
        except Exception:
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    def clear_nat66(self):
        """
        Clear NAT66 configuration.
        """
        interfaces = self.vapi.nat66_interface_dump()
        for intf in interfaces:
            self.vapi.nat66_add_del_interface(intf.sw_if_index,
                                              intf.is_inside,
                                              is_add=0)

        static_mappings = self.vapi.nat66_static_mapping_dump()
        for sm in static_mappings:
            self.vapi.nat66_add_del_static_mapping(sm.local_ip_address,
                                                   sm.external_ip_address,
                                                   vrf_id=sm.vrf_id,
                                                   is_add=0)

    def tearDown(self):
        """ Log NAT66 state and remove the NAT66 config if VPP is alive """
        super(TestNAT66, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show nat66 interfaces"))
            self.logger.info(self.vapi.cli("show nat66 static mappings"))
            self.clear_nat66()
# Run the test cases with the VPP test runner when executed as a script.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)